본문 바로가기

Data Mining & Distributed/Airflow

[ Airflow ] concurrency - 동시 작업 개수 제한하기

반응형

Airflow에서 concurrency는 하나의 DAG 내에서 동시에 실행될 수 있는 작업(Task)의 수를 제한하는 파라미터입니다. 즉, concurrency는 DAG 수준에서 실행되는 작업의 수를 제한하여 시스템 리소스를 효율적으로 관리할 수 있도록 도와줍니다.

1. concurrency 사용 방법

concurrency는 DAG 객체의 파라미터로 설정하며, 해당 DAG 내에서 동시에 실행할 수 있는 최대 작업 수를 지정합니다. 이를 통해 DAG 내에서 실행되는 작업의 수를 제어할 수 있습니다.

예제: concurrency를 사용하는 기본적인 DAG

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

# DAG 정의
dag = DAG(
    'my_dag_with_concurrency',
    start_date=datetime(2023, 1, 1),
    concurrency=3,  # DAG 내에서 동시에 실행될 수 있는 최대 작업 수를 3으로 설정
    schedule_interval=timedelta(days=1),  # 하루에 한 번 실행
)

# Dummy task 정의
task1 = DummyOperator(
    task_id='task1',
    dag=dag,
)

task2 = DummyOperator(
    task_id='task2',
    dag=dag,
)

task3 = DummyOperator(
    task_id='task3',
    dag=dag,
)

task4 = DummyOperator(
    task_id='task4',
    dag=dag,
)

# task 의존성 설정 (task1 -> task2 -> task3 -> task4)
task1 >> task2 >> task3 >> task4

예제 설명

  • concurrency=3: 이 설정은 한 번에 동시에 실행될 수 있는 작업 수를 최대 3개로 제한합니다. 즉, DAG 내에서 동시에 실행되는 작업은 3개까지 가능하고, 나머지 작업은 이전 작업이 완료될 때까지 대기해야 합니다.
  • 작업 task1, task2, task3, task4가 순차적으로 실행되는 구조이지만, concurrency=3 설정에 의해 한 번에 최대 3개의 작업이 동시에 실행될 수 있습니다.
    1. task1, task2, task3가 동시에 실행됩니다.
    2. task1 또는 task2, task3 중 하나가 완료되면 task4가 실행됩니다.
  • 예를 들어:

이렇게 concurrency를 설정하면 시스템 자원(예: CPU, 메모리 등)을 보다 효율적으로 사용할 수 있으며, 한 번에 너무 많은 작업이 실행되는 것을 방지할 수 있습니다.

2. concurrency vs max_active_runs

  • concurrency는 DAG 내에서 동시에 실행할 수 있는 작업의 최대 개수를 설정합니다.
  • max_active_runs는 한 DAG 인스턴스가 동시에 실행될 수 있는 최대 개수를 설정합니다. 즉, 여러 DAG 인스턴스가 동시에 실행되는 것을 제어하는 파라미터입니다.

둘의 차이점:

  • concurrency는 DAG 내에서 동시에 실행되는 작업 수를 제어합니다.
  • max_active_runs는 DAG 인스턴스 수를 제어합니다.

따라서 두 파라미터는 서로 다른 목적을 가지고 있으며, 필요에 따라 각각을 적절히 조정하여 효율적인 작업 실행을 할 수 있습니다.

예제 2: concurrencymax_active_runs를 함께 사용하는 예시

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

# DAG 정의
dag = DAG(
    'my_dag_with_concurrency_and_max_active_runs',
    start_date=datetime(2023, 1, 1),
    max_active_runs=2,  # 동시에 실행될 수 있는 최대 DAG 인스턴스 수
    concurrency=3,  # DAG 내에서 동시에 실행될 수 있는 작업 수를 3으로 제한
    schedule_interval=timedelta(days=1),  # 하루에 한 번 실행
)

# Dummy task 정의
task1 = DummyOperator(
    task_id='task1',
    dag=dag,
)

task2 = DummyOperator(
    task_id='task2',
    dag=dag,
)

task3 = DummyOperator(
    task_id='task3',
    dag=dag,
)

task4 = DummyOperator(
    task_id='task4',
    dag=dag,
)

# task 의존성 설정
task1 >> task2 >> task3 >> task4

예제 설명

  • max_active_runs=2: 동시에 실행되는 DAG 인스턴스 수가 2개로 제한됩니다. 즉, DAG의 인스턴스가 최대 2개까지 동시에 실행될 수 있습니다.
  • concurrency=3: DAG 내에서 동시에 실행될 수 있는 작업은 최대 3개로 제한됩니다.

이렇게 설정하면 동시에 2개의 DAG 인스턴스가 실행되며, 각 DAG 인스턴스 내에서 최대 3개의 작업만 동시에 실행됩니다.

3. concurrency 활용 예시

concurrency는 주로 다음과 같은 경우에 유용합니다:

  1. 리소스 관리: 시스템의 리소스(예: CPU, 메모리)가 한정된 경우, 동시에 실행되는 작업 수를 제한하여 리소스를 과도하게 사용하지 않도록 할 수 있습니다.
  2. 대규모 작업 실행: 여러 개의 작업이 실행될 때, 특정 작업 그룹이 너무 많은 리소스를 소모하는 경우, concurrency를 통해 효율적으로 작업을 분배할 수 있습니다.
  3. 프로세스 간 의존성 관리: 일부 작업들이 서로 의존성이 있을 때, 동시에 실행되는 작업 수를 제한하여 프로세스 간의 충돌이나 리소스 경쟁을 방지할 수 있습니다.

 

4. 결론

  • concurrency는 DAG 내에서 동시에 실행할 수 있는 최대 작업 수를 제한하는 파라미터입니다.
  • 이를 사용하면 여러 작업이 동시에 실행되는 것을 제어할 수 있으며, 시스템 리소스를 효율적으로 사용할 수 있습니다.
  • max_active_runsconcurrency는 서로 다른 목적을 가지고 있지만, 함께 사용하여 작업과 DAG 인스턴스의 실행을 최적화할 수 있습니다.
반응형