본문 바로가기

Data Mining & Distributed/Airflow

[ Airflow ] Dag 간에 작업 순서 연결하기

반응형

🌱TriggerDagRunOperatorExternalTaskSensor

Airflow에서 DAG 간의 실행 순서나 의존성을 간접적으로 연결하기 위해 사용하는 도구들입니다.
둘 다 DAG와 DAG 사이를 이어주는 역할을 하지만, 의도와 쓰임새가 완전히 다릅니다.


1. 두 연산자의 목적과 차이

항목 TriggerDagRunOperator ExternalTaskSensor
주 목적 다른 DAG을 실행시키는 것 다른 DAG의 Task 상태를 기다리는 것
실행 여부 직접 다른 DAG을 실행함 스스로는 실행하지 않고, 기다림
의존 방식 능동적(Trigger): A DAG → B DAG 실행 수동적(Sensor): B DAG ← A DAG 완료 대기
사용 예 "A DAG이 끝나면 B DAG을 시작해라" "B DAG은 A DAG이 끝났는지 확인하고 시작해라"
트리거 시간 즉시 polling/poke 주기마다 확인
DAG 실행 관계 한 DAG에서 다른 DAG을 호출 한 DAG에서 다른 DAG을 감시

 

2. 각각의 설명 & 예제

1. TriggerDagRunOperator

  • 내가 직접 다른 DAG을 실행시킨다.

예시:

from airflow.operators.dagrun_operator import TriggerDagRunOperator

trigger = TriggerDagRunOperator(
    task_id='trigger_dag_b',
    trigger_dag_id='dag_b',  # 실행할 다른 DAG의 ID
)
  • dag_a가 실행되면 dag_b를 트리거(실행)함
  • dag_b가 실행되든 말든, dag_a 입장에서는 그걸 기다리지 않음
  • 즉시 실행되며, 스케줄 상의 시간과는 다를 수 있음

2. ExternalTaskSensor

  • 나는 다른 DAG/Task가 끝났는지 기다릴게

예시:

from airflow.sensors.external_task import ExternalTaskSensor

wait = ExternalTaskSensor(
    task_id='wait_for_dag_a_task',
    external_dag_id='dag_a',
    external_task_id='some_task',  # A DAG의 특정 Task가 완료될 때까지 대기
    mode='poke',  # or 'reschedule'
    timeout=600,
)
  • 현재 DAG은 dag_asome_task끝날 때까지 기다림
  • DAG 스케줄 상 같은 execution_date를 기준으로 감시
  • 동기적 제어가 필요할 때 사용

3. 어떤 상황에서 쓰는가?

상황 쓰는 연산자
A DAG이 끝나면 B DAG을 자동 실행시키고 싶다 TriggerDagRunOperator
B DAG이 시작되기 전에 A DAG의 어떤 Task가 완료되었는지 확인하고 싶다 ExternalTaskSensor
DAG 간 실행 순서를 엄격하게 연결하고 싶다 (동기적) 둘 다 사용해서 구성 (트리거 + 센서)

4. 둘 다 함께 사용하는 경우

실무에서는 두 개를 조합해서 DAG 간의 트리거 + 완료 확인 구조를 만들기도 합니다.

  1. DAG A → TriggerDagRunOperator → DAG B 실행
  2. DAG C → ExternalTaskSensor → DAG A의 완료 확인

이런 식으로 DAG 간에 완벽한 의존성을 구성할 수 있어요.

좋습니다! 이제 말씀하신 대로 DAG A에서 DAG B를 트리거하고,
DAG B는 DAG A의 특정 Task가 끝난 것을 감지하고 시작하는 구조를 함께 만들어보겠습니다.


3. 실제 예

1. 목표 구조

[DAG A]
  └── task_a1
        ↓
  └── trigger_dag_b (TriggerDagRunOperator)

[DAG B]
  └── wait_for_task_a1 (ExternalTaskSensor)
        ↓
  └── task_b1
  • DAG A는 task_a1 실행 후 DAG B를 트리거
  • DAG B는 시작되더라도 task_a1이 끝나기 전까지 기다림
  • 이렇게 해서 DAG 간 정확한 순서와 제어를 만듦

2. DAG A: dag_a.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime

def do_task_a1():
    print("DAG A - Task A1 실행 완료")

with DAG(
    dag_id='dag_a',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
) as dag:

    task_a1 = PythonOperator(
        task_id='task_a1',
        python_callable=do_task_a1
    )

    trigger_b = TriggerDagRunOperator(
        task_id='trigger_dag_b',
        trigger_dag_id='dag_b',  # DAG B를 트리거
    )

    task_a1 >> trigger_b

3. DAG B: dag_b.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor
from datetime import datetime, timedelta

def do_task_b1():
    print("DAG B - Task B1 실행")

with DAG(
    dag_id='dag_b',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
) as dag:

    wait_for_a1 = ExternalTaskSensor(
        task_id='wait_for_task_a1',
        external_dag_id='dag_a',
        external_task_id='task_a1',
        allowed_states=['success'],
        mode='poke',  # poke 또는 reschedule
        timeout=600,
        poke_interval=30,
        execution_delta=timedelta(seconds=0),  # 동일한 execution_date를 기준으로 감시
    )

    task_b1 = PythonOperator(
        task_id='task_b1',
        python_callable=do_task_b1
    )

    wait_for_a1 >> task_b1

4. 작동 흐름 요약

  1. 사용자가 DAG A를 실행하면
  2. task_a1 실행 후 → DAG B를 TriggerDagRunOperator로 실행
  3. DAG B는 바로 실행되지만, 처음엔 ExternalTaskSensor로 DAG A의 task_a1이 성공했는지 확인
  4. 확인되면 task_b1이 실행됨

5. 주의할 점

  • execution_date를 맞추기 위해 DAG A와 B의 start_date, schedule_interval은 동일하거나 적절히 맞춰야 합니다.
  • 만약 DAG A에서 특정 날짜에 실행되고, DAG B는 동적으로 트리거되며 schedule이 없을 경우, execution_delta 조정이 필요할 수도 있습니다.
  • 실무에서는 execution_date 대신 run_id 기준으로 감시하는 TriggerRule 방식이나 XCom을 활용하는 방식도 사용되기도 합니다.

반응형