반응형
이 코드는 Apache Airflow의 Dataset 기반 스케줄링 기능을 활용한 DAG 집합을 정의한 예제입니다. Airflow 2.4부터 도입된 Dataset은 DAG 간 의존성 관리와 트리거 조건을 명확하게 구성할 수 있게 해주는 매우 강력한 기능입니다.
[ CODE ]
from __future__ import annotations
import pendulum
from airflow.datasets import Dataset
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable
# [START dataset_def]
dag1_dataset = Dataset("s3://dag1/output_1.txt", extra={"hi": "bye"})
# [END dataset_def]
dag2_dataset = Dataset("s3://dag2/output_1.txt", extra={"hi": "bye"})
dag3_dataset = Dataset("s3://dag3/output_3.txt", extra={"hi": "bye"})
with DAG(
dag_id="dataset_produces_1",
catchup=False,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule="@daily",
tags=["produces", "dataset-scheduled"],
) as dag1:
# [START task_outlet]
BashOperator(outlets=[dag1_dataset], task_id="producing_task_1", bash_command="sleep 5")
# [END task_outlet]
with DAG(
dag_id="dataset_produces_2",
catchup=False,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=None,
tags=["produces", "dataset-scheduled"],
) as dag2:
BashOperator(outlets=[dag2_dataset], task_id="producing_task_2", bash_command="sleep 5")
# [START dag_dep]
with DAG(
dag_id="dataset_consumes_1",
catchup=False,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=[dag1_dataset],
tags=["consumes", "dataset-scheduled"],
) as dag3:
# [END dag_dep]
BashOperator(
outlets=[Dataset("s3://consuming_1_task/dataset_other.txt")],
task_id="consuming_1",
bash_command="sleep 5",
)
with DAG(
dag_id="dataset_consumes_1_and_2",
catchup=False,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=[dag1_dataset, dag2_dataset],
tags=["consumes", "dataset-scheduled"],
) as dag4:
BashOperator(
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
task_id="consuming_2",
bash_command="sleep 5",
)
with DAG(
dag_id="dataset_consumes_1_never_scheduled",
catchup=False,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=[
dag1_dataset,
Dataset("s3://unrelated/this-dataset-doesnt-get-triggered"),
],
tags=["consumes", "dataset-scheduled"],
) as dag5:
BashOperator(
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
task_id="consuming_3",
bash_command="sleep 5",
)
with DAG(
dag_id="dataset_consumes_unknown_never_scheduled",
catchup=False,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=[
Dataset("s3://unrelated/dataset3.txt"),
Dataset("s3://unrelated/dataset_other_unknown.txt"),
],
tags=["dataset-scheduled"],
) as dag6:
BashOperator(
task_id="unrelated_task",
outlets=[Dataset("s3://unrelated_task/dataset_other_unknown.txt")],
bash_command="sleep 5",
)
with DAG(
dag_id="consume_1_and_2_with_dataset_expressions",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=(dag1_dataset & dag2_dataset),
) as dag5:
BashOperator(
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
task_id="consume_1_and_2_with_dataset_expressions",
bash_command="sleep 5",
)
with DAG(
dag_id="consume_1_or_2_with_dataset_expressions",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=(dag1_dataset | dag2_dataset),
) as dag6:
BashOperator(
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
task_id="consume_1_or_2_with_dataset_expressions",
bash_command="sleep 5",
)
with DAG(
dag_id="consume_1_or_both_2_and_3_with_dataset_expressions",
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),
) as dag7:
BashOperator(
outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
task_id="consume_1_or_both_2_and_3_with_dataset_expressions",
bash_command="sleep 5",
)
with DAG(
dag_id="conditional_dataset_and_time_based_timetable",
catchup=False,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
schedule=DatasetOrTimeSchedule(
timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset)
),
tags=["dataset-time-based-timetable"],
) as dag8:
BashOperator(
outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")],
task_id="conditional_dataset_and_time_based_timetable",
bash_command="sleep 5",
)
1. 핵심 개념 정리: Airflow Dataset
- Dataset: DAG 외부의 공유 리소스를 표현하는 객체로, 보통 파일 경로, DB 테이블, API Endpoint 등으로 설정합니다.
- DAG 간 연결을 의존성이 아닌 데이터 흐름으로 표현할 수 있습니다.
- Dataset을 outlets으로 푸시한 DAG 실행이 끝나면, 해당 Dataset을 schedule로 사용하는 DAG이 자동 실행됩니다.
2. 주요 구성 설명
🔹 Dataset 정의
dag1_dataset = Dataset("s3://dag1/output_1.txt", extra={"hi": "bye"})
dag2_dataset = Dataset("s3://dag2/output_1.txt", extra={"hi": "bye"})
dag3_dataset = Dataset("s3://dag3/output_3.txt", extra={"hi": "bye"})
- Dataset은 S3 경로 등으로 식별되며 extra는 선택적인 메타데이터입니다.
- 여기서는 DAG 1, 2, 3이 각각 다른 Dataset을 생성하도록 설정되어 있습니다.
3. DAG 구성별 설명
📤 DAG1: dataset_produces_1
dag_id="dataset_produces_1", schedule="@daily"
BashOperator(outlets=[dag1_dataset])
- 매일 실행되며 dag1_dataset을 생산 (produce) 합니다.
- 이 DAG이 실행되면 이 Dataset을 의존하는 DAG들이 트리거될 수 있습니다.
📤 DAG2: dataset_produces_2
dag_id="dataset_produces_2", schedule=None
BashOperator(outlets=[dag2_dataset])
- 수동 실행 DAG으로, 실행 시 dag2_dataset을 생성합니다.
📥 DAG3: dataset_consumes_1
dag_id="dataset_consumes_1", schedule=[dag1_dataset]
- dag1_dataset이 변경(업데이트)되면 트리거됩니다.
- BashOperator는 추가적으로 outlets으로 다른 Dataset도 생성합니다.
📥 DAG4: dataset_consumes_1_and_2
dag_id="dataset_consumes_1_and_2", schedule=[dag1_dataset, dag2_dataset]
- dag1_dataset과 dag2_dataset 모두 업데이트 되어야 실행됩니다.
- AND 조건입니다.
📥 DAG5: dataset_consumes_1_never_scheduled
schedule=[dag1_dataset, Dataset("s3://unrelated/this-dataset-doesnt-get-triggered")]
- 두 개의 Dataset 모두 업데이트 되어야 실행됩니다.
- 그런데 하나는 절대 실행되지 않는 Dataset이기 때문에, 이 DAG은 절대 실행되지 않음.
📥 DAG6: dataset_consumes_unknown_never_scheduled
schedule=[Dataset("s3://unrelated/dataset3.txt"), Dataset("s3://unrelated/dataset_other_unknown.txt")]
- 외부에서 생성되지 않는 Dataset에만 의존 → 트리거되지 않음
4. Dataset Expression 기반 DAG들
✅ DAG7: consume_1_and_2_with_dataset_expressions
schedule=(dag1_dataset & dag2_dataset)
- dag1_dataset과 dag2_dataset 둘 다 업데이트되면 실행 (AND)
✅ DAG8: consume_1_or_2_with_dataset_expressions
schedule=(dag1_dataset | dag2_dataset)
- dag1_dataset 또는 dag2_dataset 하나라도 업데이트되면 실행 (OR)
✅ DAG9: consume_1_or_both_2_and_3_with_dataset_expressions
schedule=(dag1_dataset | (dag2_dataset & dag3_dataset))
- dag1_dataset이 업데이트되거나
- dag2_dataset과 dag3_dataset이 모두 업데이트되면 실행
🕒 DAG10: conditional_dataset_and_time_based_timetable
schedule=DatasetOrTimeSchedule(
timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"),
datasets=(dag1_dataset & dag2_dataset)
)
- 다음 중 하나라도 만족하면 실행됨:
- 매주 수요일 01:00 UTC
- dag1_dataset AND dag2_dataset이 둘 다 업데이트
이 DAG은 시간 기반 + Dataset 기반 트리거를 동시에 설정한 예제입니다.
5. 전체 DAG 트리거 흐름 요약
DAG ID 생성 Dataset (Outlets) 스케줄 방식 트리거 대상
dataset_produces_1 | dag1_dataset | @daily | dag3, dag4, dag5, dag7~10 |
dataset_produces_2 | dag2_dataset | 수동 | dag4, dag5, dag7~10 |
dataset_consumes_1 | consuming_1_task/... | dag1_dataset | 실행 시 |
dataset_consumes_1_and_2 | consuming_2_task/... | dag1_dataset AND dag2_dataset | 실행 시 |
consume_1_or_2_with_dataset_expressions | consuming_2_task/... | dag1_dataset OR dag2_dataset | 실행 시 |
conditional_dataset_and_time_based_timetable | ... | 매주 수요일 01시 OR dag1 & dag2 | 실행 시 |
6. 요약
- Dataset은 DAG 간 의존성을 명시적이고 선언적으로 설정하는 도구입니다.
- outlets로 생성 → schedule=[dataset] 또는 표현식으로 소비
- 표현식: |(OR), &(AND), DatasetOrTimeSchedule() 등 복합 스케줄링 가능
- 실전에서는 S3, BigQuery, Hive, DB 테이블, API 등 다양한 리소스를 Dataset으로 표현해 사용 가능
반응형
'Data Mining & Distributed > Airflow' 카테고리의 다른 글
[ Airflow ] EmailOperator에 대해서 알아보기 (0) | 2025.09.25 |
---|---|
[ Airflow ] Operator 종류 알아보기 (0) | 2025.09.25 |
[ Airflow ] Groups에 속한 명령어 알아보기 (0) | 2025.09.21 |
[ Airflow ] Ubuntu + Airflow + Postgresql + Local Executor - 설치하기 (1) | 2025.09.20 |
[ Airflow ] Dag 간에 작업 순서 연결하기 (0) | 2025.04.09 |