반응형
Airflow에서 특정 주기로 DAG을 실행하려면 schedule_interval
속성에 적절한 값을 설정해야 합니다. 아래는 요청하신 주기를 설정하는 방법입니다:
1. 2분에 한 번 실행
크론 표현식:
schedule_interval="*/2 * * * *"
*/2
: 매 2분마다 실행.- 예: 00:02, 00:04, 00:06, ...
2. 10분에 한 번 실행
크론 표현식:
schedule_interval="*/10 * * * *"
*/10
: 매 10분마다 실행.- 예: 00:10, 00:20, 00:30, ...
3. 2시간에 한 번 실행
크론 표현식:
schedule_interval="0 */2 * * *"
*/2
: 매 2시간마다 실행.0
: 매 2시간마다 정확히 정각(분 0)에 실행.- 예: 02:00, 04:00, 06:00, ...
4. 6시간에 한 번 실행
크론 표현식:
schedule_interval="0 */6 * * *"
*/6
: 매 6시간마다 실행.- 예: 06:00, 12:00, 18:00, ...
5. 동일 DAG에서 여러 주기를 설정하고 싶을 경우
Airflow는 하나의 DAG에서 여러 주기를 직접 설정할 수는 없지만, 이를 우회하려면 다음과 같은 방법을 사용할 수 있습니다:
a) 여러 DAG으로 나누기
각 주기에 따라 다른 DAG을 생성하고, 다른 schedule_interval
을 지정:
with DAG(
"dag_every_2_minutes",
schedule_interval="*/2 * * * *",
start_date=datetime(2024, 1, 1),
) as dag_2min:
...
with DAG(
"dag_every_10_minutes",
schedule_interval="*/10 * * * *",
start_date=datetime(2024, 1, 1),
) as dag_10min:
...
b) DAG 내에서 동적 실행 관리
단일 DAG을 생성한 후, 실행 시 주기를 분기 처리:
from datetime import datetime
def dynamic_schedule():
now = datetime.now()
if now.minute % 2 == 0:
return "2-minute task"
elif now.minute % 10 == 0:
return "10-minute task"
# 추가 조건 작성 가능
with DAG(
"dynamic_schedule_dag",
schedule_interval="* * * * *", # 매 분 실행
start_date=datetime(2024, 1, 1),
) as dag:
task = PythonOperator(
task_id="dynamic_task",
python_callable=dynamic_schedule,
)
6. 주의사항
- 스케줄러 과부하: 짧은 간격(예: 2분)으로 실행할 경우 스케줄러가 과부하될 수 있으니 성능 모니터링이 필요합니다.
start_date
와catchup
옵션: 스케줄이 겹치지 않도록catchup=False
를 설정하는 것이 좋습니다.
반응형
'Data Mining & Distributed > Airflow' 카테고리의 다른 글
[ Airflow ] operator - 여러 개의 operator를 생성해서 병렬 처리 확인하는 Dag 등록하기 (1) | 2024.12.02 |
---|---|
[ Airflow ] db init - PostgreSQL 과 함께 초기 설정하기 (0) | 2024.12.02 |
[ Airflow ] schedule - 스케쥴러 시간 설정하기 (0) | 2024.12.02 |
[ Airflow ] Celery, Dask, Kubernetes, Ray - Executor 비교 설명하기 (0) | 2024.11.18 |
[ Airflow ] 버전별 특징 알아보기 (1) | 2024.11.16 |