본문 바로가기

Data Mining & Distributed/Airflow

[ Airflow ] schedule - 일정 주기로 dag 실행하기

반응형

Airflow에서 특정 주기로 DAG을 실행하려면 schedule_interval 속성에 적절한 값을 설정해야 합니다. 아래는 요청하신 주기를 설정하는 방법입니다:


 

1. 2분에 한 번 실행

크론 표현식:

schedule_interval="*/2 * * * *"
  • */2: 매 2분마다 실행.
  • 예: 00:02, 00:04, 00:06, ...

 

2. 10분에 한 번 실행

크론 표현식:

schedule_interval="*/10 * * * *"
  • */10: 매 10분마다 실행.
  • 예: 00:10, 00:20, 00:30, ...

 

3. 2시간에 한 번 실행

크론 표현식:

schedule_interval="0 */2 * * *"
  • */2: 매 2시간마다 실행.
  • 0: 매 2시간마다 정확히 정각(분 0)에 실행.
  • 예: 02:00, 04:00, 06:00, ...

 

4. 6시간에 한 번 실행

크론 표현식:

schedule_interval="0 */6 * * *"
  • */6: 매 6시간마다 실행.
  • 예: 06:00, 12:00, 18:00, ...

 

5. 동일 DAG에서 여러 주기를 설정하고 싶을 경우

Airflow는 하나의 DAG에서 여러 주기를 직접 설정할 수는 없지만, 이를 우회하려면 다음과 같은 방법을 사용할 수 있습니다:

a) 여러 DAG으로 나누기

각 주기에 따라 다른 DAG을 생성하고, 다른 schedule_interval을 지정:

with DAG(
    "dag_every_2_minutes",
    schedule_interval="*/2 * * * *",
    start_date=datetime(2024, 1, 1),
) as dag_2min:
    ...

with DAG(
    "dag_every_10_minutes",
    schedule_interval="*/10 * * * *",
    start_date=datetime(2024, 1, 1),
) as dag_10min:
    ...

b) DAG 내에서 동적 실행 관리

단일 DAG을 생성한 후, 실행 시 주기를 분기 처리:

from datetime import datetime

def dynamic_schedule():
    now = datetime.now()
    if now.minute % 2 == 0:
        return "2-minute task"
    elif now.minute % 10 == 0:
        return "10-minute task"
    # 추가 조건 작성 가능

with DAG(
    "dynamic_schedule_dag",
    schedule_interval="* * * * *",  # 매 분 실행
    start_date=datetime(2024, 1, 1),
) as dag:
    task = PythonOperator(
        task_id="dynamic_task",
        python_callable=dynamic_schedule,
    )

6. 주의사항

  • 스케줄러 과부하: 짧은 간격(예: 2분)으로 실행할 경우 스케줄러가 과부하될 수 있으니 성능 모니터링이 필요합니다.
  • start_datecatchup 옵션: 스케줄이 겹치지 않도록 catchup=False를 설정하는 것이 좋습니다.
반응형