본문 바로가기

Data Mining & Distributed/Airflow

[ Airflow ] 예제 분석 Dataset을 사용한 스케쥴링

반응형

 

 

 

이 코드는 Apache Airflow의 Dataset 기반 스케줄링 기능을 활용한 DAG 집합을 정의한 예제입니다. Airflow 2.4부터 도입된 Dataset은 DAG 간 의존성 관리와 트리거 조건을 명확하게 구성할 수 있게 해주는 매우 강력한 기능입니다.

[ CODE ]

from __future__ import annotations

import pendulum

from airflow.datasets import Dataset
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

# [START dataset_def]
dag1_dataset = Dataset("s3://dag1/output_1.txt", extra={"hi": "bye"})
# [END dataset_def]
dag2_dataset = Dataset("s3://dag2/output_1.txt", extra={"hi": "bye"})
dag3_dataset = Dataset("s3://dag3/output_3.txt", extra={"hi": "bye"})

with DAG(
    dag_id="dataset_produces_1",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule="@daily",
    tags=["produces", "dataset-scheduled"],
) as dag1:
    # [START task_outlet]
    BashOperator(outlets=[dag1_dataset], task_id="producing_task_1", bash_command="sleep 5")
    # [END task_outlet]

with DAG(
    dag_id="dataset_produces_2",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    tags=["produces", "dataset-scheduled"],
) as dag2:
    BashOperator(outlets=[dag2_dataset], task_id="producing_task_2", bash_command="sleep 5")

# [START dag_dep]
with DAG(
    dag_id="dataset_consumes_1",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[dag1_dataset],
    tags=["consumes", "dataset-scheduled"],
) as dag3:
    # [END dag_dep]
    BashOperator(
        outlets=[Dataset("s3://consuming_1_task/dataset_other.txt")],
        task_id="consuming_1",
        bash_command="sleep 5",
    )

with DAG(
    dag_id="dataset_consumes_1_and_2",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[dag1_dataset, dag2_dataset],
    tags=["consumes", "dataset-scheduled"],
) as dag4:
    BashOperator(
        outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
        task_id="consuming_2",
        bash_command="sleep 5",
    )

with DAG(
    dag_id="dataset_consumes_1_never_scheduled",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[
        dag1_dataset,
        Dataset("s3://unrelated/this-dataset-doesnt-get-triggered"),
    ],
    tags=["consumes", "dataset-scheduled"],
) as dag5:
    BashOperator(
        outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
        task_id="consuming_3",
        bash_command="sleep 5",
    )

with DAG(
    dag_id="dataset_consumes_unknown_never_scheduled",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[
        Dataset("s3://unrelated/dataset3.txt"),
        Dataset("s3://unrelated/dataset_other_unknown.txt"),
    ],
    tags=["dataset-scheduled"],
) as dag6:
    BashOperator(
        task_id="unrelated_task",
        outlets=[Dataset("s3://unrelated_task/dataset_other_unknown.txt")],
        bash_command="sleep 5",
    )

with DAG(
    dag_id="consume_1_and_2_with_dataset_expressions",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=(dag1_dataset & dag2_dataset),
) as dag5:
    BashOperator(
        outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
        task_id="consume_1_and_2_with_dataset_expressions",
        bash_command="sleep 5",
    )
with DAG(
    dag_id="consume_1_or_2_with_dataset_expressions",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=(dag1_dataset | dag2_dataset),
) as dag6:
    BashOperator(
        outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
        task_id="consume_1_or_2_with_dataset_expressions",
        bash_command="sleep 5",
    )
with DAG(
    dag_id="consume_1_or_both_2_and_3_with_dataset_expressions",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),
) as dag7:
    BashOperator(
        outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
        task_id="consume_1_or_both_2_and_3_with_dataset_expressions",
        bash_command="sleep 5",
    )
with DAG(
    dag_id="conditional_dataset_and_time_based_timetable",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset)
    ),
    tags=["dataset-time-based-timetable"],
) as dag8:
    BashOperator(
        outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")],
        task_id="conditional_dataset_and_time_based_timetable",
        bash_command="sleep 5",
    )

 

 

1. 핵심 개념 정리: Airflow Dataset

  • Dataset: DAG 외부의 공유 리소스를 표현하는 객체로, 보통 파일 경로, DB 테이블, API Endpoint 등으로 설정합니다.
  • DAG 간 연결을 의존성이 아닌 데이터 흐름으로 표현할 수 있습니다.
  • Dataset을 outlets으로 푸시한 DAG 실행이 끝나면, 해당 Dataset을 schedule로 사용하는 DAG이 자동 실행됩니다.

2. 주요 구성 설명

🔹 Dataset 정의

dag1_dataset = Dataset("s3://dag1/output_1.txt", extra={"hi": "bye"})
dag2_dataset = Dataset("s3://dag2/output_1.txt", extra={"hi": "bye"})
dag3_dataset = Dataset("s3://dag3/output_3.txt", extra={"hi": "bye"})
  • Dataset은 S3 경로 등으로 식별되며 extra는 선택적인 메타데이터입니다.
  • 여기서는 DAG 1, 2, 3이 각각 다른 Dataset을 생성하도록 설정되어 있습니다.

3. DAG 구성별 설명

📤 DAG1: dataset_produces_1

dag_id="dataset_produces_1", schedule="@daily"
BashOperator(outlets=[dag1_dataset])
  • 매일 실행되며 dag1_dataset을 생산 (produce) 합니다.
  • 이 DAG이 실행되면 이 Dataset을 의존하는 DAG들이 트리거될 수 있습니다.

📤 DAG2: dataset_produces_2

dag_id="dataset_produces_2", schedule=None
BashOperator(outlets=[dag2_dataset])
  • 수동 실행 DAG으로, 실행 시 dag2_dataset을 생성합니다.

📥 DAG3: dataset_consumes_1

dag_id="dataset_consumes_1", schedule=[dag1_dataset]
  • dag1_dataset이 변경(업데이트)되면 트리거됩니다.
  • BashOperator는 추가적으로 outlets으로 다른 Dataset도 생성합니다.

📥 DAG4: dataset_consumes_1_and_2

dag_id="dataset_consumes_1_and_2", schedule=[dag1_dataset, dag2_dataset]
  • dag1_dataset과 dag2_dataset 모두 업데이트 되어야 실행됩니다.
  • AND 조건입니다.

📥 DAG5: dataset_consumes_1_never_scheduled

schedule=[dag1_dataset, Dataset("s3://unrelated/this-dataset-doesnt-get-triggered")]
  • 두 개의 Dataset 모두 업데이트 되어야 실행됩니다.
  • 그런데 하나는 절대 실행되지 않는 Dataset이기 때문에, 이 DAG은 절대 실행되지 않음.

📥 DAG6: dataset_consumes_unknown_never_scheduled

schedule=[Dataset("s3://unrelated/dataset3.txt"), Dataset("s3://unrelated/dataset_other_unknown.txt")]
  • 외부에서 생성되지 않는 Dataset에만 의존 → 트리거되지 않음

4. Dataset Expression 기반 DAG들

✅ DAG7: consume_1_and_2_with_dataset_expressions

schedule=(dag1_dataset & dag2_dataset)
  • dag1_dataset과 dag2_dataset 둘 다 업데이트되면 실행 (AND)

✅ DAG8: consume_1_or_2_with_dataset_expressions

schedule=(dag1_dataset | dag2_dataset)
  • dag1_dataset 또는 dag2_dataset 하나라도 업데이트되면 실행 (OR)

✅ DAG9: consume_1_or_both_2_and_3_with_dataset_expressions

schedule=(dag1_dataset | (dag2_dataset & dag3_dataset))
  • dag1_dataset이 업데이트되거나
  • dag2_dataset과 dag3_dataset이 모두 업데이트되면 실행

🕒 DAG10: conditional_dataset_and_time_based_timetable

schedule=DatasetOrTimeSchedule(
    timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"),
    datasets=(dag1_dataset & dag2_dataset)
)
  • 다음 중 하나라도 만족하면 실행됨:
    1. 매주 수요일 01:00 UTC
    2. dag1_dataset AND dag2_dataset이 둘 다 업데이트

이 DAG은 시간 기반 + Dataset 기반 트리거를 동시에 설정한 예제입니다.


5. 전체 DAG 트리거 흐름 요약

DAG ID 생성 Dataset (Outlets) 스케줄 방식 트리거 대상

dataset_produces_1 dag1_dataset @daily dag3, dag4, dag5, dag7~10
dataset_produces_2 dag2_dataset 수동 dag4, dag5, dag7~10
dataset_consumes_1 consuming_1_task/... dag1_dataset 실행 시
dataset_consumes_1_and_2 consuming_2_task/... dag1_dataset AND dag2_dataset 실행 시
consume_1_or_2_with_dataset_expressions consuming_2_task/... dag1_dataset OR dag2_dataset 실행 시
conditional_dataset_and_time_based_timetable ... 매주 수요일 01시 OR dag1 & dag2 실행 시

6. 요약

  • Dataset은 DAG 간 의존성을 명시적이고 선언적으로 설정하는 도구입니다.
  • outlets로 생성 → schedule=[dataset] 또는 표현식으로 소비
  • 표현식: |(OR), &(AND), DatasetOrTimeSchedule() 등 복합 스케줄링 가능
  • 실전에서는 S3, BigQuery, Hive, DB 테이블, API 등 다양한 리소스를 Dataset으로 표현해 사용 가능

 

반응형