liminfo

Apache Airflow Reference

Airflow 워크플로우 오케스트레이션 레퍼런스

26개 결과

Apache Airflow Reference 소개

Apache Airflow 레퍼런스는 Apache Airflow로 데이터 파이프라인을 구축하는 데 사용되는 핵심 개념과 API를 다루는 검색 가능한 치트 시트입니다. 6개 카테고리로 구성됩니다: DAG(방향성 비순환 그래프 정의), 오퍼레이터(PythonOperator, BashOperator, EmailOperator, BranchPythonOperator), 센서(FileSensor, ExternalTaskSensor, HttpSensor, S3KeySensor), 스케줄러(DAG 목록 조회, 트리거, 테스트, 백필 CLI 명령), 변수(Airflow 변수 API와 XCom 태스크 간 통신), 커넥션(CLI 기반 커넥션 설정, PostgreSQL 및 S3 훅 사용).

이 레퍼런스는 Apache Airflow에서 자동화된 워크플로를 구축하는 데이터 엔지니어, 파이프라인 개발자, MLOps 실무자가 사용합니다. 클래식 DAG 생성자 패턴과 @dag 및 @task 데코레이터를 사용하는 현대적인 TaskFlow API를 모두 다룹니다. 코드 예제는 실제 변수명과 패턴을 사용합니다. 예를 들어 schedule_interval 예제는 실제 cron 표현식을 명명된 프리셋(@daily, @hourly, @weekly)과 함께 보여주고, BranchPythonOperator 예제는 날짜 기반 조건부 로직을 구현합니다.

레퍼런스의 각 항목에는 Python 구문 또는 CLI 명령, 기능에 대한 간결한 설명, 완전히 실행 가능한 코드 스니펫이 포함됩니다. 센서 예제는 poke_interval과 timeout 값을 구성하는 방법을 보여줍니다. 커넥션 예제는 CLI airflow connections add 명령과 BaseHook.get_connection() 및 공급자별 훅(PostgresHook, S3Hook)을 사용하는 프로그래밍 방식 모두를 다룹니다. XCom 섹션에서는 push/pull 패턴과 TaskFlow 자동 값 전달을 모두 다룹니다.

주요 기능

  • 검색 가능한 6개 카테고리: DAG, 오퍼레이터, 센서, 스케줄러, 변수, 커넥션
  • DAG 정의: 클래식 생성자 패턴과 현대적인 @dag/@task TaskFlow API 데코레이터
  • 오퍼레이터: PythonOperator, BashOperator, EmailOperator, Jinja 템플릿 포함 BranchPythonOperator
  • 센서: FileSensor, ExternalTaskSensor, HttpSensor, poke/timeout 설정 포함 S3KeySensor
  • 스케줄러 CLI: airflow dags list/trigger/backfill 및 날짜 인수 포함 airflow tasks test
  • 변수 API: JSON 역직렬화와 Jinja 템플릿 구문을 사용한 Variable.get/set
  • 태스크 인스턴스 컨텍스트를 통한 태스크 간 데이터 전달을 위한 XCom push/pull 패턴
  • 커넥션 훅: conn_id 참조를 사용한 PostgresHook.get_pandas_df 및 S3Hook.load_file

자주 묻는 질문

클래식 DAG 생성자와 TaskFlow API의 차이점은 무엇인가요?

클래식 패턴은 DAG 객체를 생성한 다음 dag=dag로 참조하는 오퍼레이터 인스턴스를 정의합니다. TaskFlow API(@dag 및 @task 데코레이터)를 사용하면 Python 함수를 직접 태스크로 작성할 수 있으며, XCom 전달이 자동으로 처리됩니다. 함수 반환값이 다운스트림 태스크의 입력이 됩니다.

Airflow에서 태스크 의존성을 설정하는 방법은?

>> 연산자로 태스크를 연결합니다: extract_task >> transform_task >> load_task. 팬아웃 및 팬인 패턴에는 리스트를 사용합니다: [task_a, task_b] >> task_c >> [task_d, task_e]. 이렇게 하면 스케줄러가 준수하는 업스트림/다운스트림 의존성 그래프가 설정됩니다.

센서에서 poke_interval과 timeout의 차이점은 무엇인가요?

poke_interval은 센서가 조건을 확인하는 빈도(초 단위)입니다. 예를 들어 60초마다 확인합니다. timeout은 센서가 타임아웃 오류를 발생시키기 전까지 기다리는 최대 총 시간(초 단위)입니다. 적절한 timeout을 설정하면 중단된 DAG 실행이 스케줄러를 무한정 차단하는 것을 방지합니다.

XCom은 태스크 간 데이터 전달에 어떻게 작동하나요?

태스크는 context["ti"].xcom_push(key="result", value=42)로 값을 푸시하고, 다운스트림 태스크는 context["ti"].xcom_pull(task_ids="push_task", key="result")로 검색합니다. TaskFlow API에서는 함수 반환값이 자동으로 XCom으로 푸시되고 다운스트림 함수의 매개변수로 주입됩니다.

DAG 정의에서 catchup=False의 목적은 무엇인가요?

catchup=False이면 Airflow는 start_date와 오늘 사이의 누락된 간격이 아닌 현재 스케줄 간격에 대해서만 DAG를 실행합니다. catchup=True(기본값)로 설정하면 Airflow가 start_date 이후 누락된 모든 실행을 백필하는데, 이는 프로덕션 파이프라인에 종종 바람직하지 않습니다.

API 키 같은 비밀값을 Airflow에서 어떻게 저장하고 접근하나요?

Variable.set("api_key", "your-secret")으로 값을 저장하고 Variable.get("api_key")로 런타임에 검색합니다. Jinja 템플릿에서는 {{ var.value.api_key }}로 참조합니다. 프로덕션 환경에서는 Airflow 메타데이터 DB에 비밀을 저장하지 않도록 시크릿 백엔드(AWS Secrets Manager, HashiCorp Vault)를 구성하세요.

airflow dags backfill 명령은 어떤 용도로 사용하나요?

backfill은 지정된 과거 날짜 범위에 대해 DAG를 재실행합니다. airflow dags backfill my_pipeline --start-date 2024-01-01 --end-date 2024-01-31은 해당 범위 내 스케줄 간격마다 한 번씩 실행을 트리거합니다. 실패한 실행을 유발한 DAG 버그를 수정한 후에 유용합니다.

전체 DAG를 실행하지 않고 단일 태스크를 테스트하는 방법은?

airflow tasks test <dag_id> <task_id> <execution_date>를 사용합니다. 예: airflow tasks test my_pipeline run_python 2024-01-01. 이렇게 하면 메타데이터 데이터베이스와 상호작용하지 않고 로컬에서 태스크를 실행하여 개발 중 빠른 반복 작업에 이상적입니다.