반응형
🌱TriggerDagRunOperator
와 ExternalTaskSensor
Airflow에서 DAG 간의 실행 순서나 의존성을 간접적으로 연결하기 위해 사용하는 도구들입니다.
둘 다 DAG와 DAG 사이를 이어주는 역할을 하지만, 의도와 쓰임새가 완전히 다릅니다.
1. 두 연산자의 목적과 차이
항목 | TriggerDagRunOperator | ExternalTaskSensor |
주 목적 | 다른 DAG을 실행시키는 것 | 다른 DAG의 Task 상태를 기다리는 것 |
실행 여부 | 직접 다른 DAG을 실행함 | 스스로는 실행하지 않고, 기다림 |
의존 방식 | 능동적(Trigger): A DAG → B DAG 실행 | 수동적(Sensor): B DAG ← A DAG 완료 대기 |
사용 예 | "A DAG이 끝나면 B DAG을 시작해라" | "B DAG은 A DAG이 끝났는지 확인하고 시작해라" |
트리거 시간 | 즉시 | polling/poke 주기마다 확인 |
DAG 실행 관계 | 한 DAG에서 다른 DAG을 호출 | 한 DAG에서 다른 DAG을 감시 |
2. 각각의 설명 & 예제
1. TriggerDagRunOperator
- 내가 직접 다른 DAG을 실행시킨다.
예시:
from airflow.operators.dagrun_operator import TriggerDagRunOperator
trigger = TriggerDagRunOperator(
task_id='trigger_dag_b',
trigger_dag_id='dag_b', # 실행할 다른 DAG의 ID
)
dag_a
가 실행되면dag_b
를 트리거(실행)함dag_b
가 실행되든 말든,dag_a
입장에서는 그걸 기다리지 않음- 즉시 실행되며, 스케줄 상의 시간과는 다를 수 있음
2. ExternalTaskSensor
- 나는 다른 DAG/Task가 끝났는지 기다릴게
예시:
from airflow.sensors.external_task import ExternalTaskSensor
wait = ExternalTaskSensor(
task_id='wait_for_dag_a_task',
external_dag_id='dag_a',
external_task_id='some_task', # A DAG의 특정 Task가 완료될 때까지 대기
mode='poke', # or 'reschedule'
timeout=600,
)
- 현재 DAG은
dag_a
의some_task
가 끝날 때까지 기다림 - DAG 스케줄 상 같은 execution_date를 기준으로 감시
- 동기적 제어가 필요할 때 사용
3. 어떤 상황에서 쓰는가?
상황 | 쓰는 연산자 |
A DAG이 끝나면 B DAG을 자동 실행시키고 싶다 | TriggerDagRunOperator |
B DAG이 시작되기 전에 A DAG의 어떤 Task가 완료되었는지 확인하고 싶다 | ExternalTaskSensor |
DAG 간 실행 순서를 엄격하게 연결하고 싶다 (동기적) | 둘 다 사용해서 구성 (트리거 + 센서) |
4. 둘 다 함께 사용하는 경우
실무에서는 두 개를 조합해서 DAG 간의 트리거 + 완료 확인 구조를 만들기도 합니다.
- DAG A →
TriggerDagRunOperator
→ DAG B 실행 - DAG C →
ExternalTaskSensor
→ DAG A의 완료 확인
이런 식으로 DAG 간에 완벽한 의존성을 구성할 수 있어요.
좋습니다! 이제 말씀하신 대로 DAG A에서 DAG B를 트리거하고,
DAG B는 DAG A의 특정 Task가 끝난 것을 감지하고 시작하는 구조를 함께 만들어보겠습니다.
3. 실제 예
1. 목표 구조
[DAG A]
└── task_a1
↓
└── trigger_dag_b (TriggerDagRunOperator)
[DAG B]
└── wait_for_task_a1 (ExternalTaskSensor)
↓
└── task_b1
- DAG A는
task_a1
실행 후 DAG B를 트리거 - DAG B는 시작되더라도
task_a1
이 끝나기 전까지 기다림 - 이렇게 해서 DAG 간 정확한 순서와 제어를 만듦
2. DAG A: dag_a.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime
def do_task_a1():
print("DAG A - Task A1 실행 완료")
with DAG(
dag_id='dag_a',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False
) as dag:
task_a1 = PythonOperator(
task_id='task_a1',
python_callable=do_task_a1
)
trigger_b = TriggerDagRunOperator(
task_id='trigger_dag_b',
trigger_dag_id='dag_b', # DAG B를 트리거
)
task_a1 >> trigger_b
3. DAG B: dag_b.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta
def do_task_b1():
print("DAG B - Task B1 실행")
with DAG(
dag_id='dag_b',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False
) as dag:
wait_for_a1 = ExternalTaskSensor(
task_id='wait_for_task_a1',
external_dag_id='dag_a',
external_task_id='task_a1',
allowed_states=['success'],
mode='poke', # poke 또는 reschedule
timeout=600,
poke_interval=30,
execution_delta=timedelta(seconds=0), # 동일한 execution_date를 기준으로 감시
)
task_b1 = PythonOperator(
task_id='task_b1',
python_callable=do_task_b1
)
wait_for_a1 >> task_b1
4. 작동 흐름 요약
- 사용자가 DAG A를 실행하면
task_a1
실행 후 → DAG B를 TriggerDagRunOperator로 실행- DAG B는 바로 실행되지만, 처음엔
ExternalTaskSensor
로 DAG A의 task_a1이 성공했는지 확인 - 확인되면
task_b1
이 실행됨
5. 주의할 점
execution_date
를 맞추기 위해 DAG A와 B의 start_date, schedule_interval은 동일하거나 적절히 맞춰야 합니다.- 만약 DAG A에서 특정 날짜에 실행되고, DAG B는 동적으로 트리거되며 schedule이 없을 경우,
execution_delta
조정이 필요할 수도 있습니다. - 실무에서는
execution_date
대신 run_id 기준으로 감시하는 TriggerRule 방식이나 XCom을 활용하는 방식도 사용되기도 합니다.
반응형
'Data Mining & Distributed > Airflow' 카테고리의 다른 글
[ Airflow ] task 실패처리를 위한 예외 처리하기 (0) | 2025.04.08 |
---|---|
[ Airflow ] Ubuntu + Airflow + Postgresql + Dask - 설치하기 (0) | 2025.01.14 |
[ Airflow ] 3가지 방법으로 동시 실행 제한하기 (0) | 2025.01.02 |
[ Airflow ] max_active_runs - 동시 작업 개수 제한하기 (0) | 2025.01.02 |
[ Airflow ] concurrency - 동시 작업 개수 제한하기 (0) | 2025.01.02 |