반응형
🌱Airflow 예외 처리를 위해서 raise 사용하기
Airflow에서 예외 처리를 할 때 Python의 기본 예외 처리 문법인 raise
키워드를 사용합니다. Airflow 자체도 Python 기반이기 때문에, 일반적인 Python 예외 처리 방식을 그대로 따릅니다.
raise
는 특정 조건에서 예외를 강제로 발생시켜 작업의 흐름을 멈추거나, 상위 시스템(예: Airflow 스케줄러)에게 문제가 발생했음을 알Airflow 예외 처리를 위해서 raise 사용하기
리는 용도로 사용됩니다.
1. raise
기본 설명
raise
는 예외(Exception)를 강제로 발생시킵니다.- 일반적으로
try-except
블록과 함께 사용되며, 특정 상황에서 에러를 발생시키거나 다시 던질 수 있습니다.
기본 사용 예시
def divide(a, b):
if b == 0:
raise ValueError("0으로 나눌 수 없습니다.")
return a / b
2. Airflow에서의 사용 예시
Airflow에서는 DAG(Task) 내부에서 문제가 생겼을 때 raise
를 사용해 Task를 실패로 처리할 수 있습니다.
예제 1: 단순한 raise 사용
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def my_task():
# 어떤 조건이 충족되면 예외를 발생시킴
should_fail = True
if should_fail:
raise Exception("강제로 에러를 발생시킵니다.") # Task 실패 처리
with DAG(
dag_id='raise_example_dag',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False
) as dag:
task = PythonOperator(
task_id='raise_task',
python_callable=my_task
)
이 DAG를 실행하면 my_task
함수 내에서 강제로 Exception
이 발생하고, 해당 태스크는 실패(Failure) 상태가 됩니다.
예제 2: try-except 내에서 raise 사용
def safe_task():
try:
# 예외 발생 가능 코드
result = 10 / 0
except ZeroDivisionError as e:
print(f"오류 발생: {e}")
raise # 예외를 다시 발생시켜 Task 실패로 처리
with DAG(
dag_id='try_except_raise_dag',
start_date=datetime(2023, 1, 1),
schedule_interval=None,
catchup=False
) as dag:
task = PythonOperator(
task_id='safe_raise_task',
python_callable=safe_task
)
이 경우, 예외를 로깅하면서도 raise
를 통해 상위에 알리기 때문에 Airflow는 이 Task를 실패로 인식합니다.
필요하시면 실제 Airflow에서 사용하는 사용자 정의 예외나, 특정 예외만 처리하는 고급 예시도 도와드릴 수 있어요. 더 필요한 부분 있으신가요?
반응형
'Data Mining & Distributed > Airflow' 카테고리의 다른 글
[ Airflow ] Dag 간에 작업 순서 연결하기 (0) | 2025.04.09 |
---|---|
[ Airflow ] Ubuntu + Airflow + Postgresql + Dask - 설치하기 (0) | 2025.01.14 |
[ Airflow ] 3가지 방법으로 동시 실행 제한하기 (0) | 2025.01.02 |
[ Airflow ] max_active_runs - 동시 작업 개수 제한하기 (0) | 2025.01.02 |
[ Airflow ] concurrency - 동시 작업 개수 제한하기 (0) | 2025.01.02 |