반응형

 

1.Dag Factory 란?

  • yaml 포맷 기반의 워크플로우 정의를 읽어 dag 을 자동으로 생성해주는 에어플로우 라이브러리이다. (1)(2)

 

2. 왜 Dag Factory ? 

  • Airflow 를 사용하려면 아래와 같은 절차로 Workflow 를 만드는 작업을 해야 하는데, 이런 수고를 줄여준다.
  • workflow 는 dag 객체를 생성하는 파이썬 스크립트 파일을 만들어야 하기 때문에 파이썬 문법을 알아야 한다.
  • dag 파일을 작성하려면 파이썬의 문법과 airflow 에서 제공하는 core 패키지 문서를 학습하여 작성해야 한다.
  • 워크플로우들이 많아질때는 관리하기 힘들어지기 때문에, Dag의 메타데이터화와 동시에 자동화를 고려해야 하는 시기가 온다. 
    • python 과 airflow 의 주요 내용들의 학습 없이 DAG 구조체를 만들어준다
    • 중복된 코드를 피해준다



3. Features

  • Multiple Configuration Files
  • Custom Operators
  • Callbacks 지원


4. Dag Factory 사용하기

절차

1) dag factory 를 설치한다.
2) 쿠버네티스를 사용할 경우 2가지 방법이 있다.
           a.패키지 설치 버전의 이미지를 직접 빌드하는 방법
           b.pod 내 airflow 가 init 할때 패키지 설치 명령어를 추가하는 방법
3) yaml 포맷의 워크플로우 파일을 작성한다.
4) dagfactory 객체를 이용해 yaml 리소스를 로드할 python 파일을 작성한다.
5) 작성한 2개의 파일을 dag 폴더로 이동시킨다

 


 

A. 설치

$ pip install dag-factory

 

 

 


B.쿠버네티스에서 helm 을 이용해 배포할 경우

쿠버네티스에서 사용할 경우 아래 두 가지 선택이 있을 수 있다.
첫 번째 ) 직접 이미지를 빌드하는 방법
두 번째 ) pod 내 airflow 가  init 할때 패키지 설치 명령어를 추가하는 방법

 

1)  dag-factory 를 설치완료한 이미지를 빌드하여 올려서 사용한다.

 

Docker file 예시 

FROM apache/airflow:2.8.2

RUN pip install dag-factory==0.19.0

ENTRYPOINT ["/usr/bin/dumb-init", "--", "/entrypoint"]
CMD []



 2) airflow chart values 파일 내 worker 가 bootstrap 할 때 패키지 설치 명령어를 추가한다.

workers:
  # Number of airflow celery workers in StatefulSet
  replicas: 1
  # Max number of old replicasets to retain
  revisionHistoryLimit: ~

  # Command to use when running Airflow workers (templated).
  command: ~
  # Args to use when running Airflow workers (templated).
  args:
    - "bash"
    - "-c"
    # The format below is necessary to get `helm lint` happy
    - |-
      python -m pip install --upgrade pip & pip install --no-cache-dir dag-factory && exec \
      airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "celery worker" "worker" }}

 

 

 

C.사용

절차
1 ) yaml 포맷의 워크플로우 파일 작성한다.
2) dagfactory 객체를 이용해 yaml 리소스를 로드할 python 파일을 작성한다.
3) 작성한 2개의 파일을 dag 폴더로 이동시킨다

 

1) yaml 포맷의 워크플로우 파일 작성하기

default:
  default_args:
    catchup: false,
    start_date: 2024-11-11

basic_example_dag:
  default_args:
    owner: "custom_owner"
  description: "this is an example dag"
  schedule_interval: "0 3 * * *"
  render_template_as_native_obj: True
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [task_1]
    task_3:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [task_1]



2) python 파일 작성하기

import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG
import dagfactory

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"

CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "example_dag_factory.yml")

example_dag_factory = dagfactory.DagFactory(config_file)

# Creating task dependencies
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())

 


3) 파일을 dag 디렉토리로 이동시킨다.

mv * /usr/local/airflow/dags/

 

** 이전에 포스팅한 GitSync 를 같이 이용할 수 있습니다. 그렇게 되면 dag 파일을 디렉토리로 이동할 필요 없이 git repository 로 이동하면 되는거죠. :) [3]

 

 

 

화면 참고용 스크린샷





5.References


[1]  https://github.com/astronomer/dag-factory
[2]  https://astronomer.github.io/dag-factory/latest/getting-started/quick-start-airflow-standalone/

[3] https://jssvs.tistory.com/101

반응형
반응형

1. Git-Sync란?

  • Git-Sync 는 Kubernetes 클러스터 내에서 git repository 를 동기화하는 sidecar 기능이다 (1)
  • airflow 와 같은 워크플로우 도구를 사용할 때, DAG 파일들을 git 에서 관리하고 있을 때 사용한다.
  • helm 을 통한 쿠버네티스 배포 시에만 지원하는 기능으로 보인다.

 

2. 왜 GitSync 기능을? 

  • 일반적으로 python 으로 작성한 airflow dag 파일은 airflow component(worker, scheduler) 에서 access 할 수 있는 파일 시스템에 저장되어야 한다.
  • git sync 기능을 사용하면, git repository 에 저장된 DAG 파일을 kubernetes pod 내부에 동기화 할 수 있다.
  • 반대로 git sync 를 사용하지 않는다면, DAG 파일을 kubernetes pod 에 복사하는 방식을 사용해야 한다.
    • s3 로 부터 주기적으로 sync 를 받는 sidecar 컨테이너를 개발
    • efs 와 같은 네트워크 파일 시스템을 사용 

 

3. Git-Sync 사용하기 

절차

  • git repository 를 생성한다.
  • 2가지 인증 방식 중 한 가지를 선택한다.
  • 선택한 인증방식의 credential 을 준비한다.
  • 인증 정보를 쿠버네티스 secret(시크릿) 으로 생성한다
  • helm chart 에서 git-sync 를 활성화 한다. (2)

 

인증 2가지 방식 소개 

  • SSH 프로토콜을 이용한 인증 방식 (Github 기준 )
  • HTTPS 프로토콜을 이용한 인증 방식

 

A.SSH  프로토콜을 이용한 인증 방식

SSH 키 생성

$ ssh-keygen -t rsa -b 4096 -C "jssvs@test.com"

# 복제할 키 문자열
$ cat ~/.ssh/id_rsa.pub

 

 

Git에 SSH 키 등록 

GIthub 키등록

 

쿠버네티스 시크릿 리소스 생성

kubectl create secret generic airflow-git-ssh-secret --from-file=gitSshKey=~/.ssh/id_rsa -네임스페이스

 

helm chart 값 작성

..... 생략
dags:
  gitSync:
    enabled: true
    repo: git@git repository url 
    branch: main
    rev: HEAD
    depth: 1
    maxFailures: 0
    subPath: ""

    sshKeySecret: airflow-git-ssh-secret
    
.... 생략

 

B.HTTPS 프로토콜을 이용한 인증 방식

PAT 생성 (Setting -> Developer settings -> Personal access tokens -> Generate new token)

Github PAT 생성

 

 

 

시크릿 yaml 작성하기
** 주의 : username 과 pat 는 base64 인코딩을 해서 저장해야 한다. 
** 또 주의: linux 에서 echo 를 이용할 경우 -n 옵션을 추가하여 뉴라인을 제거한다 

apiVersion: v1
kind: Secret
metadata:
  name: git-credentials
data:
  GIT_SYNC_USERNAME: github username
  GIT_SYNC_PASSWORD: PAT 비밀번호
  GITSYNC_USERNAME: github username
  GITSYNC_PASSWORD: PAT 비밀번호

 

 

helm chart값 작성

.....
dags:
  gitSync:
    enabled: true
    repo: https://git repository url 
    branch: main
    rev: HEAD
    depth: 1
    maxFailures: 0
    subPath: ""

    credentialsSecret: git-credentials
 .....

 

helm 을 이용한 설치 후 git-syn-init 컨테이너에서 아래와 같은 로그가 찍혔다면 성공이다.

 

 

 

동작 스크린샷

Git Repository

 

 

Sample Dag
Airflow Dag 등록 화면

 

 

4. Reference

(1) https://airflow.apache.org/docs/helm-chart/stable/manage-dags-files.html#using-git-sync

 

Manage DAGs files — helm-chart Documentation

 

airflow.apache.org

(2) https://artifacthub.io/packages/helm/apache-airflow/airflow

 

airflow 1.15.0 · apache-airflow/apache-airflow

The official Helm chart to deploy Apache Airflow, a platform to programmatically author, schedule, and monitor workflows

artifacthub.io

 

 

반응형
반응형

1.  에러 로그

-- 유실..

 

2. 원인 또는 현상

Werkzeug 모듈이 버전에 따라 secure_filename() 의 위치가 달라져, 발생하는 이슈

 

3.해결방법

해당 패키지 업그레이드.

$pip install --upgrade werkzeug==0.16.1
반응형

+ Recent posts