데이터 엔지니어 이것저것

Airflow DAG 분리하기 본문

오픈소스/airflow

Airflow DAG 분리하기

pastime 2022. 2. 5. 15:11
728x90

airflow의 파이프라인을 관리하다보면 하나의 DAG에 너무 많은 TASK가 들어가게 된다.

 

렇게 진행하다보면 극단적으로는 하나의 DAG에 수십 ~ 수백개의 TASK 또는 하나의 DAG만 존재하게 된다.

 

이를 해결하기 위해 airflow.operators.subdag를 사용하였다.

 

그전에 기존에 방식 -> 변경한 방식을 보면

 

task_start = PythonOperator(
    task_id=f'task_start',
    python_callable=_sleep,
    dag=dag,
)
task_end = PythonOperator(
    task_id=f'task_end',
    python_callable=_sleep,
    dag=dag,
)


a_task_list = []
for i in range(10):
    a_task_list.append(
        PythonOperator(
            task_id=f'a_{i}',
            python_callable=_sleep,
            dag=dag,
        )
    )
    if i == 0:
        task_start >> a_task_list[i]
    else:
        a_task_list[i-1] >> a_task_list[i]

a_task_list[i] >> task_end

 

   
task_start = PythonOperator(
    task_id=f'task_start',
    python_callable=_sleep,
    dag=dag,
)
task_end = PythonOperator(
    task_id=f'task_end',
    python_callable=_sleep,
    dag=dag,
)


a_task_list = []
for i in range(10):
    a_task_list.append(
        PythonOperator(
            task_id=f'a_{i}',
            python_callable=_sleep,
            dag=dag,
        )
    )

chain(task_start, a_task_list, task_end)

 

크게 이렇게 2가지의 방식으로 구성하였다.

 

물론 실제 구성은 직렬,병렬 모두 섞어 사용한다.

 

이렇게 TASK 가 많아지게 되면 결국 관리가 너무 힘들어진다

 

위의 DAG를 SUB DAG로 나누어 구성을 해보면

 

이렇게 심플하게 구성할수있다.

SUB DAG를 보면 위 처럼 Zoom into Sub DAG 를 볼수있다.

 

서브 DAG에서는 특별한 규칙을 주지 않아 이렇게 구성되어있다.

 

서브 DAG 구성방법은 추가적인 file에  sub dag를 구현한뒤, 상위 parent dag에서 SubDagOperator로 호출하면된다.

dag = DAG(dag_id=DAG_NAME, #dag_id, 고유한 이름으로 해야함, 다른 dag와 겹치면 안됨, 한글도 x
    default_args=args,
    schedule_interval="@once", #실행스케줄러, cron기능
    catchup=False #지난 날짜 진행 여부,
)


task_start = DummyOperator(
    task_id=f'task_start',
    dag=dag,
)

task_end = DummyOperator(
    task_id=f'task_end',
    dag=dag,
)

section_1 = SubDagOperator(
    task_id='section-1',
    subdag=subdag(DAG_NAME, 'section-1', args),
    dag=dag,
)

task_start >> section_1 >> task_end
#subdag
def subdag(parent_dag_name, child_dag_name, args):

    dag_subdag = DAG(
        dag_id=f'{parent_dag_name}.{child_dag_name}',
        default_args=args,
        start_date=datetime(2021, 1, 1),
        catchup=False,
        schedule_interval="@once",
    )

    for i in range(5):
        DummyOperator(
            task_id=f'{child_dag_name}-task-{i + 1}',
            default_args=args,
            dag=dag_subdag,
        )

    return dag_subdag
728x90

'오픈소스 > airflow' 카테고리의 다른 글

DockerOperator  (0) 2023.09.19
Airflow DAG간 종속성  (0) 2022.05.15
airflow vs argo  (0) 2021.12.27
왜 airflow를 사용할까  (0) 2021.07.27
airflow helm install  (0) 2021.05.17