1.Dag Factory 란?
- yaml 포맷 기반의 워크플로우 정의를 읽어 dag 을 자동으로 생성해주는 에어플로우 라이브러리이다. (1)(2)
2. 왜 Dag Factory ?
- Airflow 를 사용하려면 아래와 같은 절차로 Workflow 를 만드는 작업을 해야 하는데, 이런 수고를 줄여준다.
- workflow 는 dag 객체를 생성하는 파이썬 스크립트 파일을 만들어야 하기 때문에 파이썬 문법을 알아야 한다.
- dag 파일을 작성하려면 파이썬의 문법과 airflow 에서 제공하는 core 패키지 문서를 학습하여 작성해야 한다.
- 워크플로우들이 많아질때는 관리하기 힘들어지기 때문에, Dag의 메타데이터화와 동시에 자동화를 고려해야 하는 시기가 온다.
- python 과 airflow 의 주요 내용들의 학습 없이 DAG 구조체를 만들어준다
- 중복된 코드를 피해준다
3. Features
- Multiple Configuration Files
- Custom Operators
- Callbacks 지원
4. Dag Factory 사용하기
절차
1) dag factory 를 설치한다.
2) 쿠버네티스를 사용할 경우 2가지 방법이 있다.
a.패키지 설치 버전의 이미지를 직접 빌드하는 방법
b.pod 내 airflow 가 init 할때 패키지 설치 명령어를 추가하는 방법
3) yaml 포맷의 워크플로우 파일을 작성한다.
4) dagfactory 객체를 이용해 yaml 리소스를 로드할 python 파일을 작성한다.
5) 작성한 2개의 파일을 dag 폴더로 이동시킨다
A. 설치
$ pip install dag-factory
B.쿠버네티스에서 helm 을 이용해 배포할 경우
쿠버네티스에서 사용할 경우 아래 두 가지 선택이 있을 수 있다.
첫 번째 ) 직접 이미지를 빌드하는 방법
두 번째 ) pod 내 airflow 가 init 할때 패키지 설치 명령어를 추가하는 방법
1) dag-factory 를 설치완료한 이미지를 빌드하여 올려서 사용한다.
Docker file 예시
FROM apache/airflow:2.8.2
RUN pip install dag-factory==0.19.0
ENTRYPOINT ["/usr/bin/dumb-init", "--", "/entrypoint"]
CMD []
2) airflow chart values 파일 내 worker 가 bootstrap 할 때 패키지 설치 명령어를 추가한다.
workers:
# Number of airflow celery workers in StatefulSet
replicas: 1
# Max number of old replicasets to retain
revisionHistoryLimit: ~
# Command to use when running Airflow workers (templated).
command: ~
# Args to use when running Airflow workers (templated).
args:
- "bash"
- "-c"
# The format below is necessary to get `helm lint` happy
- |-
python -m pip install --upgrade pip & pip install --no-cache-dir dag-factory && exec \
airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "celery worker" "worker" }}
C.사용
절차
1 ) yaml 포맷의 워크플로우 파일 작성한다.
2) dagfactory 객체를 이용해 yaml 리소스를 로드할 python 파일을 작성한다.
3) 작성한 2개의 파일을 dag 폴더로 이동시킨다
1) yaml 포맷의 워크플로우 파일 작성하기
default:
default_args:
catchup: false,
start_date: 2024-11-11
basic_example_dag:
default_args:
owner: "custom_owner"
description: "this is an example dag"
schedule_interval: "0 3 * * *"
render_template_as_native_obj: True
tasks:
task_1:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 1"
task_2:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 2"
dependencies: [task_1]
task_3:
operator: airflow.operators.bash_operator.BashOperator
bash_command: "echo 2"
dependencies: [task_1]
2) python 파일 작성하기
import os
from pathlib import Path
# The following import is here so Airflow parses this file
# from airflow import DAG
import dagfactory
DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"
CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))
config_file = str(CONFIG_ROOT_DIR / "example_dag_factory.yml")
example_dag_factory = dagfactory.DagFactory(config_file)
# Creating task dependencies
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())
3) 파일을 dag 디렉토리로 이동시킨다.
mv * /usr/local/airflow/dags/
** 이전에 포스팅한 GitSync 를 같이 이용할 수 있습니다. 그렇게 되면 dag 파일을 디렉토리로 이동할 필요 없이 git repository 로 이동하면 되는거죠. :) [3]
화면 참고용 스크린샷
5.References
[1] https://github.com/astronomer/dag-factory
[2] https://astronomer.github.io/dag-factory/latest/getting-started/quick-start-airflow-standalone/
'Data Engineer' 카테고리의 다른 글
airflow - gitSync 기능 연동 (0) | 2025.02.17 |
---|---|
쿠버네티스 -스테이트풀셋(statefulset)를 이용해 ElasticSearch 배포 (0) | 2024.12.29 |
쿠버네티스 - 디플로이먼트(deployment)를 이용해 MySQL 배포 (1) | 2024.12.09 |
파이썬 데일리코딩 - 다이나믹 프로그래밍 (0) | 2024.11.25 |
파이썬 데일리코딩 - 덕 타이핑 (1) | 2024.11.17 |