본문 바로가기

Data Mining & Distributed/Airflow

[ Airflow ] task 실패처리를 위한 예외 처리하기

반응형

 

 

🌱Airflow 예외 처리를 위해서 raise 사용하기

Airflow에서 예외 처리를 할 때 Python의 기본 예외 처리 문법인 raise 키워드를 사용합니다. Airflow 자체도 Python 기반이기 때문에, 일반적인 Python 예외 처리 방식을 그대로 따릅니다.

raise는 특정 조건에서 예외를 강제로 발생시켜 작업의 흐름을 멈추거나, 상위 시스템(예: Airflow 스케줄러)에게 문제가 발생했음을 알Airflow 예외 처리를 위해서 raise 사용하기
리는 용도로 사용됩니다.


1. raise 기본 설명

  • raise는 예외(Exception)를 강제로 발생시킵니다.
  • 일반적으로 try-except 블록과 함께 사용되며, 특정 상황에서 에러를 발생시키거나 다시 던질 수 있습니다.

기본 사용 예시

def divide(a, b):
    if b == 0:
        raise ValueError("0으로 나눌 수 없습니다.")
    return a / b

2. Airflow에서의 사용 예시

Airflow에서는 DAG(Task) 내부에서 문제가 생겼을 때 raise를 사용해 Task를 실패로 처리할 수 있습니다.

예제 1: 단순한 raise 사용

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def my_task():
    # 어떤 조건이 충족되면 예외를 발생시킴
    should_fail = True
    if should_fail:
        raise Exception("강제로 에러를 발생시킵니다.")  # Task 실패 처리

with DAG(
    dag_id='raise_example_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
) as dag:

    task = PythonOperator(
        task_id='raise_task',
        python_callable=my_task
    )

이 DAG를 실행하면 my_task 함수 내에서 강제로 Exception이 발생하고, 해당 태스크는 실패(Failure) 상태가 됩니다.


예제 2: try-except 내에서 raise 사용

def safe_task():
    try:
        # 예외 발생 가능 코드
        result = 10 / 0
    except ZeroDivisionError as e:
        print(f"오류 발생: {e}")
        raise  # 예외를 다시 발생시켜 Task 실패로 처리

with DAG(
    dag_id='try_except_raise_dag',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
) as dag:

    task = PythonOperator(
        task_id='safe_raise_task',
        python_callable=safe_task
    )

이 경우, 예외를 로깅하면서도 raise를 통해 상위에 알리기 때문에 Airflow는 이 Task를 실패로 인식합니다.


필요하시면 실제 Airflow에서 사용하는 사용자 정의 예외나, 특정 예외만 처리하는 고급 예시도 도와드릴 수 있어요. 더 필요한 부분 있으신가요?

 

 

 

반응형