Notice
Recent Posts
Recent Comments
Link
250x250
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
Tags
- KubernetesPodOperator
- bar cahrt
- 윈도우
- proerty
- 크롤링
- mysql
- polars
- dbt_project
- freshness
- Python
- spark
- 도커
- spring boot
- 모바일
- 카프카
- ksql
- docker
- query history
- kafka
- airflow
- k9s
- Materializations
- CDC
- UI for kafka
- Java
- numpartitions
- 동적 차트
- 쿠버네티스
- 파이썬
- DBT
Archives
- Today
- Total
데이터 엔지니어 이것저것
Airflow DAG 분리하기 본문
728x90
airflow의 파이프라인을 관리하다보면 하나의 DAG에 너무 많은 TASK가 들어가게 된다.
렇게 진행하다보면 극단적으로는 하나의 DAG에 수십 ~ 수백개의 TASK 또는 하나의 DAG만 존재하게 된다.
이를 해결하기 위해 airflow.operators.subdag를 사용하였다.
그전에 기존에 방식 -> 변경한 방식을 보면
task_start = PythonOperator(
task_id=f'task_start',
python_callable=_sleep,
dag=dag,
)
task_end = PythonOperator(
task_id=f'task_end',
python_callable=_sleep,
dag=dag,
)
a_task_list = []
for i in range(10):
a_task_list.append(
PythonOperator(
task_id=f'a_{i}',
python_callable=_sleep,
dag=dag,
)
)
if i == 0:
task_start >> a_task_list[i]
else:
a_task_list[i-1] >> a_task_list[i]
a_task_list[i] >> task_end
task_start = PythonOperator(
task_id=f'task_start',
python_callable=_sleep,
dag=dag,
)
task_end = PythonOperator(
task_id=f'task_end',
python_callable=_sleep,
dag=dag,
)
a_task_list = []
for i in range(10):
a_task_list.append(
PythonOperator(
task_id=f'a_{i}',
python_callable=_sleep,
dag=dag,
)
)
chain(task_start, a_task_list, task_end)
크게 이렇게 2가지의 방식으로 구성하였다.
물론 실제 구성은 직렬,병렬 모두 섞어 사용한다.
이렇게 TASK 가 많아지게 되면 결국 관리가 너무 힘들어진다
위의 DAG를 SUB DAG로 나누어 구성을 해보면
이렇게 심플하게 구성할수있다.
SUB DAG를 보면 위 처럼 Zoom into Sub DAG 를 볼수있다.
서브 DAG에서는 특별한 규칙을 주지 않아 이렇게 구성되어있다.
서브 DAG 구성방법은 추가적인 file에 sub dag를 구현한뒤, 상위 parent dag에서 SubDagOperator로 호출하면된다.
dag = DAG(dag_id=DAG_NAME, #dag_id, 고유한 이름으로 해야함, 다른 dag와 겹치면 안됨, 한글도 x
default_args=args,
schedule_interval="@once", #실행스케줄러, cron기능
catchup=False #지난 날짜 진행 여부,
)
task_start = DummyOperator(
task_id=f'task_start',
dag=dag,
)
task_end = DummyOperator(
task_id=f'task_end',
dag=dag,
)
section_1 = SubDagOperator(
task_id='section-1',
subdag=subdag(DAG_NAME, 'section-1', args),
dag=dag,
)
task_start >> section_1 >> task_end
#subdag
def subdag(parent_dag_name, child_dag_name, args):
dag_subdag = DAG(
dag_id=f'{parent_dag_name}.{child_dag_name}',
default_args=args,
start_date=datetime(2021, 1, 1),
catchup=False,
schedule_interval="@once",
)
for i in range(5):
DummyOperator(
task_id=f'{child_dag_name}-task-{i + 1}',
default_args=args,
dag=dag_subdag,
)
return dag_subdag
728x90
'오픈소스 > airflow' 카테고리의 다른 글
DockerOperator (0) | 2023.09.19 |
---|---|
Airflow DAG간 종속성 (0) | 2022.05.15 |
airflow vs argo (0) | 2021.12.27 |
왜 airflow를 사용할까 (0) | 2021.07.27 |
airflow helm install (0) | 2021.05.17 |