본문 바로가기

Data Mining & Distributed/Airflow

[ Airflow ] operator - 여러 개의 operator를 생성해서 병렬 처리 확인하는 Dag 등록하기

반응형

Airflow에서 부하 테스트를 수행하려면 DAG(Directed Acyclic Graph)를 생성하여 여러 작업(Task)을 병렬로 실행하는 워크플로를 설계할 수 있습니다. LocalExecutor는 병렬 처리를 지원하므로 부하 테스트를 수행하기에 적합합니다. 아래는 Airflow DAG 예제입니다.

 

1. Airflow 설정 확인

  • airflow.cfg 파일에서 executor 설정이 LocalExecutor로 설정되어 있는지 확인하세요:
  [core]
  executor = LocalExecutor
  • parallelism, dag_concurrency, max_active_runs_per_dag, max_active_tasks_per_dag 등을 설정하여 부하를 조정할 수 있습니다.

 

2. DAG 파일 생성

/dags 디렉터리에 다음과 같은 DAG 파일을 생성합니다.

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import random
import time

# Python 함수: 부하 테스트용 작업
def task_function(task_id):
    print(f"Task {task_id} started.")
    time.sleep(random.uniform(2, 5))  # 2~5초 동안 작업 실행
    print(f"Task {task_id} finished.")

# 기본 DAG 설정
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(seconds=30),
}

# DAG 정의
with DAG(
    "load_test_dag",
    default_args=default_args,
    description="A simple DAG for load testing",
    schedule_interval=None,  # 실행은 수동으로 트리거
    start_date=datetime(2023, 1, 1),
    catchup=False,
    max_active_runs=5,  # 동시에 실행되는 DAG 인스턴스 수 제한
) as dag:

    # 다수의 태스크 생성
    tasks = []
    for i in range(50):  # 50개의 병렬 태스크 생성
        task = PythonOperator(
            task_id=f"task_{i}",
            python_callable=task_function,
            op_kwargs={"task_id": i},
        )
        tasks.append(task)

    # 모든 태스크 병렬 실행
    tasks

 

3. DAG 실행

  1. Airflow 웹 UI에서 load_test_dag를 활성화한 후 실행하거나,
  2. 터미널에서 다음 명령을 사용하여 수동으로 실행합니다:
   airflow dags trigger load_test_dag

 

4. 부하 조정

  • DAG 내부에서 생성하는 태스크 수(range(50))를 늘리거나 줄여 부하를 조정하세요.
  • airflow.cfg의 다음 설정을 조정하여 실행 환경의 병렬성을 높일 수 있습니다:
    • parallelism: 전체 병렬 실행 가능한 태스크 수.
    • dag_concurrency: 한 DAG에서 실행 가능한 최대 태스크 수.
    • worker_concurrency: LocalExecutor에서 실행 가능한 병렬 태스크 수.

 

5. 모니터링

Airflow의 웹 UI에서 DAG의 상태와 각 태스크의 실행 시간을 모니터링하여 부하를 분석하세요. 또한, CPU와 메모리 사용량을 관찰하여 시스템 리소스가 적절히 활용되고 있는지 확인합니다.

이 DAG는 부하 테스트의 기본 골격으로, 태스크 개수 및 실행 시간을 변경하며 다양한 시나리오를 테스트할 수 있습니다.

반응형