반응형


빅데이터 환경에서 다양한 애플리케이션, 플랫폼 도구들을 실험할 때 데이터는 항상 필요하다.

실제 운영 환경의 데이터를 활용하면 좋겠지만, 보통은 개발/스테이징/운영의 환경을 나눠놓기 때문에, 데이터도 개발용 데이터가 필요할 때가 있다.

아래 간단하게 yaml 포맷으로 데이터 스키마를 정의하면, 명령어 실행으로 데이터를 출력해주는 간단한 툴을 만들었다.

누구나 가져다 쓸 수 있고, 수정해서 쓸 수 있다.

사실 이미 공개된 오픈 소스 제네레이터는 많지만, 파티셔닝된 데이터출력이라던지 데이터 포맷을 스스로 정의해줘야 하는 특수한 경우를 고려해서 그냥 만들어봤다.

 

만들어볼 줄 도 알아야 하니까.

 

 

1. BigdataSimpleGenerator 소스코드 

https://github.com/jaysooo/bigdata-simple-generator

 

 

2. 빠른 시작

A.데이터 스키마 정의하기

  • 기본적으로 int, string, float( 자리수 포함), 범위, 옵션 리스트 를 정의할 수 있다.
  • S3 , 로컬을 지원한다.
  • 레코드 개수를 정의할 수 있다.
data_spec:
  table_name: stg-event-dummy
  records: 1000
  file_format: csv # csv, parquet, json
  file_prefix: data
  destination: /YourPath/sample_data
  # destination: s3://yourbucket/event-data/
  partition_by:
    name: partition_key
    range:
      min: "2025-01-01"
      max: "2025-01-10"
  table_schema:
    - name: id
      type: int
      prefix: id_
    - name: device_name
      type: string
      prefix: device_
    - name: device_type
      type: string
      range:
        items:
          - "iphone"
          - "galaxy"
          - "xiaomi"
          - "huawei"
    - name: event_time
      type: timestamp
    - name: event_type
      type: string
    - name: event_rate
      type: float
      range:
        min: 0 
        max: 1
      decimal_point: 2

B.실행하기

$ python data-generator.py --config config.yaml --producer pyarrow

 

3. 출력 데이터

예제 데이터

 

4. 참고

  • pyarrow dataframe 으로 데이터를 생산하기 때문에, 데이터량에 따라 머신의 메모리 가용 공간을 확인한다.
  • 인터페이스만 구현한다면 데이터 출력 모듈을 직접 구현해서 출력할 수 있다. (pyspark, pandas 등 )

 

 

 
반응형
반응형

오랜만에 포스팅을 하는 것 같다.

 

오늘은 GPT 를 이용해 간단한 동영상 쇼츠 제작 방법을 소개해보려고 한다.

 

여러 방법들이 있을 거고 정답은 없는 것 같다. 나는 X에서 최근 공개한 Grok 과 openAI 사의 ChatGPT 를 함께 이용해서 만들었다.

 

절차부터 얘기하면 다음과 같다.

 

1) grok 을 이용해 대본만들기

 

2) chatGPT 의 영상 제작 모델에 대본을 프롬프트로 입력해 영상 제작을 의뢰한다.

 

 

Grok (https://x.ai/) 으로 브라우저를 옮겼다. 그리고 나는 대본을 만들어달라는 프롬프트를 아래 처럼 주문했다.

 

Grok 화면

답변을 복사하고 chatgpt (https://chatgpt.com/) 로 브라우저를 넘겨보자.

GPT 탐색 을 눌러보면 video maker 와 관련된 여러 모델들을 검색할 수 있는데, 나는 1위에 랭크되있는 모델을 택했다.

 

 

 

나는 데이터엔지니어기 때문에, apache airflow 를 30초 로 소개하는 영상을 주문해보았다.

 

대본을 프롬프트에 입력하고 영상 제작을 주문하면 연계된 서비스 페이지로 링크를 하나 보내줄 것이다.

 

링크를 타고 들어가면, 영상을 제작하는 과정을 조금 기다려야 한다.

 

물론 !!! 자동으로 생성된 영상은 한 20% 정도 부족해보인다.

 

그래서 동영상을 편집하는 모드에서 자막과 영상을 업로드할 수도 있다. (여기서 노가다가 시작되지 않을까.. ) 

 

퀄리티가 아주 만족할 만한 수준은 아니지만, 그래도 짧은 시간에 제작한 것 치곤 괜찮다.

 

AI 가 제작해준 영상을 아래 업로드하면서 마무리한다.

(아.. 용량 제한으로 영상 업로드가 안되네.. ) 

 

 

참고) 프롬프트

**** GROK Prompt ****

이제부터 너 유튜브 대본을 작성하는 작가이자 데이터 엔지니어인 제인이야 

난 한국에서 유튜브 채널을 만들어 영상을 올릴 계획이야. 

이번에는 apache airflow 를 소개하는 영상을 만들거야

다음 요구사항을 만족하도록 대본과 정보를 만들어줘

- 영상 시간은 30초야
- airflow 의 소개와, 기능 그리고 장점으로 구성해줘
- airflow 를 사용자에게 홍보하는 느낌으로 만들어줘



**** ChatGPT Prompt ****

이제부터 너 유튜브 대본을 작성하는 작가이자 데이터 엔지니어인 제인이야

난 한국에서 유튜브 채널을 만들어 영상을 올릴 계획이야.

이번에는 apache airflow 를 소개하는 영상을 만들거야

다음 대본을 줄테니 영상을 제작해줘

대본 (30초)

"안녕하세요! 오늘은 데이터 엔지니어링의 필수 도구, Apache Airflow를 소개해드릴게요. Airflow는 복잡한 데이터 파이프라인을 쉽게 관리하고 스케줄링할 수 있는 오픈소스 플랫폼이에요. DAG를 통해 작업의 의존성과 순서를 명확히 정의하고, AWS, Google Cloud 같은 서비스와도 간편하게 연동할 수 있죠. 확장성이 뛰어나고, 활발한 커뮤니티 지원으로 계속 발전 중이에요. 게다가 직관적인 UI로 워크플로우를 한눈에 모니터링할 수 있어요. 데이터 작업을 효율적으로 관리하고 싶다면, Airflow를 꼭 사용해보세요!"

정보 요약

소개: Apache Airflow는 데이터 파이프라인을 관리하고 스케줄링하는 오픈소스 플랫폼입니다.

기능: DAG(Directed Acyclic Graph)로 작업 흐름을 정의하고, 다양한 서비스(AWS, Google Cloud 등)와 유연하게 통합됩니다.

장점: 확장성이 뛰어나고, 커뮤니티 지원이 활발하며, UI로 워크플로우를 시각적으로 관리할 수 있어 사용자 친화적입니다.
 
반응형
반응형

1. Yunikorn 이란 ?

  • 쿠버네티스에서 동작하는 Batch , Data & ML 서비스를 위한 리소스의 파워를 unleash(해방,촉발시키는) 하는 스케쥴러다.(1)
  • k8s 기본 스케쥴러의 대안으로 활용될 수 있는 스케쥴러로, 특히 다양한 리소스의 요구사항을 만족시킬 수 있는 기능들을 제공한다.


2.왜 Yunikorn?

  • 기존의 k8s 스케쥴러는 CPU, Memory, GPU 등의 리소스만을 고려하여 스케쥴링을 수행하는데, 만약 다양하고 많은 양의 Pod 가 배포되고 회수되는 환경에서 필요하게되는 미세한 조정이 어려운 단점이 있다. 
  • 그런 부분에서 Yunikorn 은 아래에서 소개할 다양한 기능으로 단점을 보완해준다.
  • yunikorn 과 함께 많이 사용되는 volcano(2) 스케쥴러가 있다.(개인적으로 volcano 가 더 어려워보여서 유니콘을 선택했다.. 유니콘은 대시보드를 제공해주는 점이 더 매력적이었고)
  • yunikorn 은 apache incubator 프로젝트고 volcano 는 cnf(cloud native computing foundation) 프로젝트이다.)


3. Features

  • App-aware 스케쥴링
    • k8s 기본 스케쥴러와 달리 컨텍스트 정보(사용자, 애플리케이션, 큐 등) 기반의 스케쥴링을 지원한다.
    • 리소스 총량, 공정성 정책, 우선순위 정책으로 스케쥴링 fine-grained controls (적은 범위의 제어)를 할 수 있다.
  • 계층적 구조의 리소스 큐
    • 큐 간의 계층적 구조를 통해 multi-tenancy 환경에서 효율적인 리소스 할당을 할 수 있다.
  • Job Ordering and Queuing
    • 작업의 우선순위를 지정하고, 작업이 큐에 들어갈 때 우선순위에 따라 큐에 배치된다.
    • FIFO, Fair, StateAware, piriority 정책을 지원한다.
  • 갱 스케쥴
    • 쉽게 말해 분산 환경에서 All or Nothing 스케쥴 방식이다.
    • 애플리케이션이 필요한 리소스의 세트(묶음)을 요청하고, 이 리소스가 모두 확보될 때 한 번에 스케쥴링한다.
    • 갱 스케쥴러가 placeholder pod(임시 pod)를 배정하고, UpScaling이 일어난 후 실제 pod 와 교체되는 방식이다.
    • 갱 스케쥴을 사용할 경우 FIFO 정책을 사용하게 된다. (왜냐하면 policy 는 리소스를 부분적으로 예약하기 때문에 리소스 세그먼트가 발생할 수 있다.)

갱 스케쥴링 방식의 배포 흐름

 

 

쿠버네티스 기본 스케쥴링의 파드배포
유니콘 스케쥴링의 파드 배포

 

 

4. Yunikorn 사용하기

***아래 가이드는 쿠버네티스 사용이 익숙한 사람들 기준으로 작성된 점 참고바랍니다.

 

A.yunikorn 설치 

 

2가지 설치 모드

  • embeddedAdmissionController
    • 쿠버네티스 기본 스케쥴러 대신 유니콘 스케쥴러를 사용하는 모드
    • api server 와 통신하는 모든 트래픽을 유니콘 스케쥴러로 라우팅한다
    • 성능이 뛰어나다고 하다..ㅇㅅㅇ
  • plugin Mode
    • 기본 스케쥴러의 framework 일부로 작동하는 모드
    •  mixed workload 에 적합하다.

 

 

 


그리고 helm repo values.yaml 파일 작성 (4)

service:
  type: NodePort
  targetPort: 9080
  portWeb: 9889


** 다른 설정은 하지 않았고, Dashboard ingress 생성을 위해 service 만 NodePort 로 변경했다

 

 


Ingress 생성

apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: yunikorn-ingress
  namespace: yunikorn
  annotations:
    alb.ingress.kubernetes.io/scheme: internet-facing
    alb.ingress.kubernetes.io/target-type: instance
    alb.ingress.kubernetes.io/subnets: [ 사용하는 서브넷 ]
    alb.ingress.kubernetes.io/security-groups: [보안 그룹 ]
    alb.ingress.kubernetes.io/load-balancer-name: [로드밸랜서 이름]

spec:
  ingressClassName: alb
  rules:
    - http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: yunikorn-service
                port:
                  number: 9889




helm 설치 스크립트

#!/bin/bash
helm repo add yunikorn https://apache.github.io/yunikorn-release
helm repo update


echo "install yunikorn by helm.."

helm upgrade --cleanup-on-fail \
  --install yunikorn yunikorn/yunikorn \
  --namespace yunikorn \
  --create-namespace \
  --version=1.6.0 \
  --values values.yaml


**혹시 필요한 사람을 위해 설치스크립트도 공유한다.** 

 

대시보드 화면

대시보드화면



B. Yunikorn 사용

  • Standard mode 로 설치했을 경우 어떤 파드로 배포하더라도 yunikorn 스케쥴러가 파드를 스케쥴링한다.
  • 필자의 경우 처럼 Spark Job 을 실행할때는 아래와 같이 속성값을 추가해줘야 한다
- controller.batchScheduler.enable=true
- controller.batchScheduler.default=yunikorn


* EKS 기준으로 yunikorn 을 이용해 spark job 을 배포하는 가이드는 별도로 포스팅하고, 잘 작성된 가이드로 마무리하겠다. (3)

 


5.출처

  •  이 포스팅에 대한 설명은 함께 작업했던 강인호(aws 코리아)님께서 공유해주신 자료를 복습하고 요약하였다.

6.References


(1) https://yunikorn.apache.org/
(2) https://volcano.sh/en/
(3) https://docs.aws.amazon.com/ko_kr/emr/latest/EMR-on-EKS-DevelopmentGuide/tutorial-yunikorn.html

(4)  https://artifacthub.io/packages/helm/yunikorn/yunikorn

 
반응형
반응형

 

1.Dag Factory 란?

  • yaml 포맷 기반의 워크플로우 정의를 읽어 dag 을 자동으로 생성해주는 에어플로우 라이브러리이다. (1)(2)

 

2. 왜 Dag Factory ? 

  • Airflow 를 사용하려면 아래와 같은 절차로 Workflow 를 만드는 작업을 해야 하는데, 이런 수고를 줄여준다.
  • workflow 는 dag 객체를 생성하는 파이썬 스크립트 파일을 만들어야 하기 때문에 파이썬 문법을 알아야 한다.
  • dag 파일을 작성하려면 파이썬의 문법과 airflow 에서 제공하는 core 패키지 문서를 학습하여 작성해야 한다.
  • 워크플로우들이 많아질때는 관리하기 힘들어지기 때문에, Dag의 메타데이터화와 동시에 자동화를 고려해야 하는 시기가 온다. 
    • python 과 airflow 의 주요 내용들의 학습 없이 DAG 구조체를 만들어준다
    • 중복된 코드를 피해준다



3. Features

  • Multiple Configuration Files
  • Custom Operators
  • Callbacks 지원


4. Dag Factory 사용하기

절차

1) dag factory 를 설치한다.
2) 쿠버네티스를 사용할 경우 2가지 방법이 있다.
           a.패키지 설치 버전의 이미지를 직접 빌드하는 방법
           b.pod 내 airflow 가 init 할때 패키지 설치 명령어를 추가하는 방법
3) yaml 포맷의 워크플로우 파일을 작성한다.
4) dagfactory 객체를 이용해 yaml 리소스를 로드할 python 파일을 작성한다.
5) 작성한 2개의 파일을 dag 폴더로 이동시킨다

 


 

A. 설치

$ pip install dag-factory

 

 

 


B.쿠버네티스에서 helm 을 이용해 배포할 경우

쿠버네티스에서 사용할 경우 아래 두 가지 선택이 있을 수 있다.
첫 번째 ) 직접 이미지를 빌드하는 방법
두 번째 ) pod 내 airflow 가  init 할때 패키지 설치 명령어를 추가하는 방법

 

1)  dag-factory 를 설치완료한 이미지를 빌드하여 올려서 사용한다.

 

Docker file 예시 

FROM apache/airflow:2.8.2

RUN pip install dag-factory==0.19.0

ENTRYPOINT ["/usr/bin/dumb-init", "--", "/entrypoint"]
CMD []



 2) airflow chart values 파일 내 worker 가 bootstrap 할 때 패키지 설치 명령어를 추가한다.

workers:
  # Number of airflow celery workers in StatefulSet
  replicas: 1
  # Max number of old replicasets to retain
  revisionHistoryLimit: ~

  # Command to use when running Airflow workers (templated).
  command: ~
  # Args to use when running Airflow workers (templated).
  args:
    - "bash"
    - "-c"
    # The format below is necessary to get `helm lint` happy
    - |-
      python -m pip install --upgrade pip & pip install --no-cache-dir dag-factory && exec \
      airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "celery worker" "worker" }}

 

 

 

C.사용

절차
1 ) yaml 포맷의 워크플로우 파일 작성한다.
2) dagfactory 객체를 이용해 yaml 리소스를 로드할 python 파일을 작성한다.
3) 작성한 2개의 파일을 dag 폴더로 이동시킨다

 

1) yaml 포맷의 워크플로우 파일 작성하기

default:
  default_args:
    catchup: false,
    start_date: 2024-11-11

basic_example_dag:
  default_args:
    owner: "custom_owner"
  description: "this is an example dag"
  schedule_interval: "0 3 * * *"
  render_template_as_native_obj: True
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [task_1]
    task_3:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [task_1]



2) python 파일 작성하기

import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG
import dagfactory

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"

CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "example_dag_factory.yml")

example_dag_factory = dagfactory.DagFactory(config_file)

# Creating task dependencies
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())

 


3) 파일을 dag 디렉토리로 이동시킨다.

mv * /usr/local/airflow/dags/

 

** 이전에 포스팅한 GitSync 를 같이 이용할 수 있습니다. 그렇게 되면 dag 파일을 디렉토리로 이동할 필요 없이 git repository 로 이동하면 되는거죠. :) [3]

 

 

 

화면 참고용 스크린샷





5.References


[1]  https://github.com/astronomer/dag-factory
[2]  https://astronomer.github.io/dag-factory/latest/getting-started/quick-start-airflow-standalone/

[3] https://jssvs.tistory.com/101

반응형
반응형

1. Git-Sync란?

  • Git-Sync 는 Kubernetes 클러스터 내에서 git repository 를 동기화하는 sidecar 기능이다 (1)
  • airflow 와 같은 워크플로우 도구를 사용할 때, DAG 파일들을 git 에서 관리하고 있을 때 사용한다.
  • helm 을 통한 쿠버네티스 배포 시에만 지원하는 기능으로 보인다.

 

2. 왜 GitSync 기능을? 

  • 일반적으로 python 으로 작성한 airflow dag 파일은 airflow component(worker, scheduler) 에서 access 할 수 있는 파일 시스템에 저장되어야 한다.
  • git sync 기능을 사용하면, git repository 에 저장된 DAG 파일을 kubernetes pod 내부에 동기화 할 수 있다.
  • 반대로 git sync 를 사용하지 않는다면, DAG 파일을 kubernetes pod 에 복사하는 방식을 사용해야 한다.
    • s3 로 부터 주기적으로 sync 를 받는 sidecar 컨테이너를 개발
    • efs 와 같은 네트워크 파일 시스템을 사용 

 

3. Git-Sync 사용하기 

절차

  • git repository 를 생성한다.
  • 2가지 인증 방식 중 한 가지를 선택한다.
  • 선택한 인증방식의 credential 을 준비한다.
  • 인증 정보를 쿠버네티스 secret(시크릿) 으로 생성한다
  • helm chart 에서 git-sync 를 활성화 한다. (2)

 

인증 2가지 방식 소개 

  • SSH 프로토콜을 이용한 인증 방식 (Github 기준 )
  • HTTPS 프로토콜을 이용한 인증 방식

 

A.SSH  프로토콜을 이용한 인증 방식

SSH 키 생성

$ ssh-keygen -t rsa -b 4096 -C "jssvs@test.com"

# 복제할 키 문자열
$ cat ~/.ssh/id_rsa.pub

 

 

Git에 SSH 키 등록 

GIthub 키등록

 

쿠버네티스 시크릿 리소스 생성

kubectl create secret generic airflow-git-ssh-secret --from-file=gitSshKey=~/.ssh/id_rsa -네임스페이스

 

helm chart 값 작성

..... 생략
dags:
  gitSync:
    enabled: true
    repo: git@git repository url 
    branch: main
    rev: HEAD
    depth: 1
    maxFailures: 0
    subPath: ""

    sshKeySecret: airflow-git-ssh-secret
    
.... 생략

 

B.HTTPS 프로토콜을 이용한 인증 방식

PAT 생성 (Setting -> Developer settings -> Personal access tokens -> Generate new token)

Github PAT 생성

 

 

 

시크릿 yaml 작성하기
** 주의 : username 과 pat 는 base64 인코딩을 해서 저장해야 한다. 
** 또 주의: linux 에서 echo 를 이용할 경우 -n 옵션을 추가하여 뉴라인을 제거한다 

apiVersion: v1
kind: Secret
metadata:
  name: git-credentials
data:
  GIT_SYNC_USERNAME: github username
  GIT_SYNC_PASSWORD: PAT 비밀번호
  GITSYNC_USERNAME: github username
  GITSYNC_PASSWORD: PAT 비밀번호

 

 

helm chart값 작성

.....
dags:
  gitSync:
    enabled: true
    repo: https://git repository url 
    branch: main
    rev: HEAD
    depth: 1
    maxFailures: 0
    subPath: ""

    credentialsSecret: git-credentials
 .....

 

helm 을 이용한 설치 후 git-syn-init 컨테이너에서 아래와 같은 로그가 찍혔다면 성공이다.

 

 

 

동작 스크린샷

Git Repository

 

 

Sample Dag
Airflow Dag 등록 화면

 

 

4. Reference

(1) https://airflow.apache.org/docs/helm-chart/stable/manage-dags-files.html#using-git-sync

 

Manage DAGs files — helm-chart Documentation

 

airflow.apache.org

(2) https://artifacthub.io/packages/helm/apache-airflow/airflow

 

airflow 1.15.0 · apache-airflow/apache-airflow

The official Helm chart to deploy Apache Airflow, a platform to programmatically author, schedule, and monitor workflows

artifacthub.io

 

 

반응형
반응형
오늘은 쿠버네티스의 스테이트풀셋 리소스를 이용해 오픈소스 ElasticSearch 를 배포해보겠습니다.

스테이트풀셋은 디플로이먼트와 성질이 비슷하지만, 파드에 고유한 이름이 부여되고 여러 개의 파드 사이에 순서를 지정해서 실행할 수 있습니다.

ElasticSearch 는 RestAPI 기반으로 json 도큐먼트 데이터를 분산 저장하여, 빠르게 조회할 수 있는 검색 엔진이죠.

빅데이터 엔지니어링 분야에서 kibana, Logstash 와 함께 ELK 로 많이 사용되는 소프트웨어 스택이기도 합니다.

아래 과정을 통해 손쉽게 쿠버네티스로 배포해보겠습니다.

 

 

1. 준비 과정

  • 데이터 영속성을 보장하기 위해 영구 볼륨 (PersistentVolume)생성
  • ElasticSearch 애플리케이션 배포를 위한 스테이트풀셋 생성
  • ElasticSearch 파드를 외부로 노출하기 위한 Service(쿠버네티스의 서비스) 생성
  • curlImage로 기본 파드를 띄우고 Elastic Search 의 서비스 확인

 

2. Statefulset 이용해 ElasticSearch 배포

 
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: sf-es
  namespace: work
spec:
  replicas: 2
  serviceName: "my-es"
  selector:
    matchLabels:
      app: sf-es
  volumeClaimTemplates:
  - metadata:
      name: vol-es-data
    spec:
      accessModes: ["ReadWriteOnce"]
      storageClassName: gp3
      resources:
        requests:
          storage: 1Gi
  template:
    metadata:
      labels:
        app: sf-es
    spec:
      containers:
      - name: sf-es
        image: docker.elastic.co/elasticsearch/elasticsearch:7.8.0
        env:
        - name: discovery.type
          value: "single-node" # Elasticsearch 노드 타입 설정
        - name: cluster.name
          value: my-es # Elasticsearch 클러스터 이름 설정
        - name: network.host
          value: 0.0.0.0 # 모든 네트워크 인터페이스에서 수신하도록 설정
        ports:
        - containerPort: 9200
          name: es1 # HTTP 포트
        - containerPort: 9300
          name: es2 # Transport 포트
        volumeMounts:
        - name: vol-es-data
          mountPath: /usr/share/elasticsearch/data
        livenessProbe:
          httpGet:
            path: /
            port: 9200
          initialDelaySeconds: 60 # 초기 지연 시간
          periodSeconds: 10 # 주기 시간
          timeoutSeconds: 5 # 타임아웃 시간
          failureThreshold: 3 # 실패 허용 임계값
      initContainers:
      - name: init-container
        image: busybox:latest
        command: ["sh","-c","chown -R 1000:1000 /usr/share/elasticsearch/data"]
        securityContext:
          privileged: true # 루트 권한으로 실행
        volumeMounts:
        - name: vol-es-data
          mountPath: /usr/share/elasticsearch/data # 데이터 볼륨 마운트 경로
---
apiVersion: v1
kind: Service
metadata:
  name: svc-es
  namespace: work
spec:
  clusterIP: None
  ports:
  - port: 9200
    name: es1
  - port: 9300
    name: es2
  selector:
    app: sf-es

 

몇가지 설정에 대한 설명을 추가해보겠습니다.

디플로이먼트와 다르게 VolumeClaimTemplate 속성을 주면서 동적 프로비저닝을 할 수 있구요.

9200,9300 port는 엘라스틱서치가 사용되는 named port 입니다.

livenessProbe 속성값으로 실행된 컨테이너의 서비스 healthcheck 를 파드 차원에서 할 수 있습니다.

서비스 리소스를 생성하면 서비스 이름의 DNS 로 다른 파드에서 접근할 수 있습니다.

 

아래처럼 statefulset 을 배포하구요.

 

kubectl 또는 k9s 로 배포 상태를 확인해줍니다.

앞서 설명한 것 처럼 pod 의 이름에 임의의 문자열로 되어있지 않고, 이름과 인덱스로 고유한 이름을 갖고 있습니다.

 

 

3. 서비스 확인

별도의 파드를 띄워서 조금 전에 띄운 ElasticSearch 파드가 정상적으로 운영되는지, 접근이 되는지 확인해보겠습니다.

apiVersion: v1
kind: Pod
metadata:
  name: pod-conn-test
  namespace: work
spec:
  containers:
  - name: es-conn
    image: curlimages/curl
    command: ["sleep","3600"]

 

ElasticSearch는 RestAPI 기반으로 데이터를 핸들링할 수 있기 때문에, 리눅스 curl 이미지를 이용해 아래와 같이 테스트 파드를 띄웠습니다.

 

 

해당 파드의 컨테이너에 쉘에 진입해서 서비스 정보를 출력해봤습니다.

 

여기서 중요한 것은 ElasticSearch 의 DNS 입니다.

 

제가 만든 서비스의 이름으로 접근이 되는 것을 볼 수 있습니다.

 

 

미리 생성했던 인덱스도 조회해봤습니다.

 

추가적인 API 는 https://www.elastic.co/guide/en/elasticsearch/reference/current/rest-apis.html 에서 확인하실 수 있습니다.

 

 

4. 마치며

과거 프로젝트 진행으로 ElasticSearch를  여러 서버에 멀티클러스터링으로 직접 설치해본 경험도 있고, Docker 를 이용해 구축해본 경험도 있습니다.  쿠버네티스를 알게되고 나서 구축의 편의성을 크게 느낍니다..

느낌적으로 앞으로는 데이터엔지니어링의 서비스 배포 및 관리도 쿠버네티스로 많이 넘어갈 것 같아요.

 

 

오늘은 여기까지 쓰겠습니다.

 

끝 

 

 

반응형
반응형

쿠버네티스 데이터 엔지니어링을 시작한지 1년 정도가 지난 것 같습니다.

지금까지 학습하며 경험한 내용을 바탕으로 지식을 정리하면서 관련 포스팅을 시작하려고 합니다.

 

이론적인 내용도 다루겠지만, 시간이 되면 실제 운용 사례 기반으로 내용을 작성하려고 합니다.

이번 MySQL 배포도, 실제 서비스의 메타 데이터로 활용해봤는데 큰 문제가 없었습니다.

 

관리형 DB(예를 들어 AWS RDS 등) 를 사용하시는 분들께서 비용절감의 고민이 있으시다면, 게다가 쿠버네티스 환경이 준비된 분들이라면 스테이트풀셋을 이용한 MySQL 파드를 배포해서 사용하시는 것을 추천드립니다. 

 

스테이트풀셋을 이용해 MySQL 을 배포하는 방법도 가능합니다만 이번 글에서는 디플로이먼트를 이용해보겠습니다.

 

1. 준비 과정 

  • 데이터 영속성을 보장하기 위해 영구 볼륨 (PersistentVolume)생성
  • MySQL 애플리케이션 배포를 위한 디플로이먼트 생성
  • MySQL 파드를 외부로 노출하기 위한 Service(쿠버네티스의 서비스) 생성

** 아시는 것 처럼 쿠버네티스의 모든 리소스는 yaml 파일로 작성할겁니다.

 

2. 디플로이먼트를 이용해 MySQL 배포  

a. 데이터 영속성을 보장하기 위해 영구 볼륨 (PersistentVolume)생성

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: pvc-study-mysql
  namespace: work
spec:
  accessModes:
  - ReadWriteOnce
  storageClassName: gp3
  resources:
    requests:
      storage: 1Gi

** pvc 를 동적 볼륨 프로비저닝 하기 위해서는 스토리지클래스 에 provisioner 를 명시해줘야 합니다. 저는 미리 만들어둔 스토리지 클래스를 사용했고, 이 내용은 다음에 자세히 다루겠습니다.

 

b. MySQL 애플리케이션 배포를 위한 디플로이먼트 생성

 

apiVersion: apps/v1
kind: Deployment
metadata:
  name: study-mysql
  namespace: work
  labels:
    app: mysql
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mysql
  template:
    metadata:
      labels:
        app: mysql
    spec:
      volumes:
      - name: study-volume
        persistentVolumeClaim:
          claimName: pvc-study-mysql
      containers:
        - name: study-mysql
          image: mysql:latest
          env:
          - name: MYSQL_ROOT_PASSWORD
            value: root
          - name: MYSQL_DATABASE
            value: test
          ports:
          - containerPort: 3306
          volumeMounts:
          - name: study-volume
            mountPath: /var/lib/mysql

 

 

c. MySQL 파드를 외부로 노출하기 위한 Service(쿠버네티스의 서비스) 생성

apiVersion: v1
kind: Service
metadata:
  name: svc-study-mysql
  namespace: work
spec:
  selector:
    app: mysql
  ports:
  - port: 3306

 

d. 서비스 노출 확인을 위한 파드 생성과 통신 테스트

apiVersion: v1
kind: Pod
metadata:
  name: mysql-client
  namespace: work
  labels:
    app: mysql-client
spec:
  containers:
  - name: mysql-client
    image: mysql:latest
    command: [ "sleep", "infinity" ] # 파드가 무기한으로 동작하도록 설정

 

 

** 위 3개의 리소스 yaml 파일을 kubectl 을 이용해 배포해줍니다.

 

3. 서비스 확인

a.파드의 상태 확인 

 

 

b.통신테스트를 위해 띄운 파드에 쉘에 진입해 서비스 노출 확인

 

 

c.mysql 파드에 쉘에 진입해 데이터베이스 확인

 

 

4. 마치며

configmap 을 이용해서 database 의 설정을 동적으로 적용하며 운영을 할 수 있습니다.

stastefulset 으로 배포하면 운영의 안정성을 높일 수 있습니다. ( 파드의 고유성이 부여되고 영구적인 호스트 이름이 부여됩니다.) 

다음에는  statefulset 을 이용해 엘라스틱 서치를 배포해보겠습니다.

반응형
반응형

 

 

안녕하세요. 오늘 다뤄볼 주제는 다이나믹 프로그래밍입니다.

 

 

위키에 따르면 동적 프로그래밍은 "복잡한 문제를 더 작은 하위 문제로 나누어 해결하는 알고리즘 설계 기법" 이라고 설명하고 있습니다.

 

나무위키에 조금 더 와닿는 설명이 있는데, 답을 재활용하는 것입니다. 앞에서 구했던 답을 뒤에서 이용하고, 옆에서도 이용해서 더 빠르게 답을 구하는 문제 해결 패러다임이라고 볼 수 있습니다.

 

가장 많이 사용하는 예제는 피보나치 수열입니다.

 

아래 피보나치를 재귀를 이용해 동적프로그래밍으로 구현한 코드입니다.

import time





def fibonacci(n):

    if n<=1:

        return n

    

    return fibonacci(n-1) + fibonacci(n-2)







def fibonacci_with_cache(n,cache={}):

    if n in cache:

        return cache[n]



    if n <=1:

        return n

    

    cache[n] = fibonacci_with_cache(n-1) + fibonacci_with_cache(n-2)



    return cache[n]





def benchmark():

    start_time = time.time()

    print(fibonacci_with_cache(40))

    end_time = time.time()

    execution_time = end_time - start_time

    print(f"Execution time: {execution_time:.6f} seconds")



# 102334155

# (fibonacci - Execution time: 21.109457 seconds



#102334155

# Execution time: 0.000057 seconds



benchmark()

 

 

위 예제에서 피보나치 함수를 2 가지 버전으로 만들었는데요. 하나는 일반 피보나치 함수이고, 다른 하나는 캐시라는 이름이 붙어있습니다.

fibonacci_with_cache 함수는 dictionary 타입(k,v) 의 변수를 캐시라고 가정해서 피보나치의 결과를 캐시에 저장했습니다.

이렇게 답을 구한 결과를 메모리에 저장하는 방법을 메모이제이션(memoization) 이라고 합니다..

 

응답속도의 결과를 비교해보면 차이가 꽤 납니다.

 

.

반응형
반응형

 

오늘 다뤄볼 주제는 팩토리 메소드 패턴입니다.

 

이 디자인의 특징은 클라이언트 코드 수준에서 객체를 직접 생성하지 않고, 팩토리 메소드에게 위임합니다.

 

이렇게 하면 클라이언트 코드와 사용하는 객체간의 거리를 만들어 유연성과 유지관리성을 높일 수 있게 되는데, 이런 표현을 책에서는 loose coupling(느슨한 결합)이라고 합니다.

 

아래 팩토리 메소드 패턴의 주요 구성요소와 함께 예제코드를 작성해보겠습니다

 

 

구성 요소 

1) Generator = abstract class or interface 

2) SubClass of Generator = Product 유형의 오브젝트를 생성할 팩토리 메소드가 구현될 클래스

3) Product = abstract class or interface

4) SubClass of Product = 팩토리 메소드에 의해 생성되는 오브젝트의 클래스

 

 

 

 

예제 코드 설명

- (1)Generator 는 팩토리 메소드가 구현될 추상화 클래스이며, ABC 클래스를 상속받아 Base 클래스의 메소드의 구현을 강제화 합니다

- (2) SubClass 는 Generator 클래스를 상속받아, 유형별 데이터 수집기 클래스의 객체를 생성하는 팩토리 메소드를 구현합니다.

- (3) Product 는 데이터 수집기의 추상화 클래스입니다.

- (4) SubClass 는 Product 클래스를 상속받아, 유형별 데이터 수집기의 동작을 구현합니다.

- (5) create_collector 함수는 유형별 데이터 수집기의 객체를 생성하는 팩토리 메소드를 구현합니다.

 

 

예제 코드 

from abc import ABC, abstractmethod





class Collector(ABC):

    @abstractmethod

    def collect(self, data):

        pass







class DBCollector(Collector):

    def collect(self, data:str):

        print(f"Collecting data from database: {data}")





class APICollector(Collector):

    def collect(self, data:str):

        print(f"Collecting data from API: {data}")





class Generator(ABC):

    

    @abstractmethod

    def generate(self):

        pass



class DBCollectorGenerator(Generator):

    

    def generate(self):

        return DBCollector()

    

class APICollectorGenerator(Generator):



    def generate(self):

        return APICollector()

    



def create_collector(name:str):

    generator = {

        "db" : DBCollectorGenerator, 

        "api" : APICollectorGenerator

    }



    return generator[name]().generate()





def main():

    db_collector = create_collector("db")

    api_collector = create_collector("api")



    target_data = ["user_information","product_information"]



    for collecting_data in target_data:

        db_collector.collect(collecting_data)

        api_collector.collect(collecting_data)



if __name__ == "__main__":

    main()

 

 

출력 결과 

Collecting data from database: user_information

Collecting data from API: user_information

Collecting data from database: product_information

Collecting data from API: product_information

 

 

만약 제가 카카오 API 를 연동해서 어떤 데이터를 수집해야 하는 요구상황이 생긴다고 가정해볼게요. 

 

저는 위 구조에서 KaKaoAPICollector 클래스를 추가하고 collect 메소드를 구현할거에요.

 

기존 코드의 영향을 적게 주면서 새로운 클래스만 추가하면 되기 때문에, 이런 부분에서 유연성과 확장성이 올라갔다고도 볼 수 있을 겁니다.

 

 

.

 

 

 

반응형
반응형

 

 

오늘 다뤄볼 주제는 덕 타이핑입니다.

 

 

덕 타이핑은 프로그래밍 언어에서 클래스의 행동에 중점을 둔 컨셉중 하나에요. 

 

클래스의 상속, 유형보다 객체의 동작 즉 행동을 강조해서 유연성을 높이고 캐스팅의 필요성을 줄여주는 프로그래밍 스타일입니다.

 

아래 코드로 예를 들어 보겠습니다.

 

class Duck:

    def sound(self):

        print("Quack!!!")



class Person:

    def sound(self):

        print("I'm Quaking like a duck ! ")





def make_it_sound(obj):

    obj.sound()

 

만약 Dog, Cat 여러 클래스가 추가 개발되야 된다고 해도, make it sound 함수로 주요 로직을 수행하면 되기 때문에, 유사한 여러 객체를 처리할 때 유용합니다.

 

저의 경우에는 데이터를 수집하는 ETL 프로그램에서 덕타이핑을 유용하게 쓰고 있습니다. RestAPI 로 수집하거나, DB 로부터 수집하거나, 파일로부터 수집하거나 “수집”이라는 행동은 공통된 작업이기 때문입니다.

 

그러나 덕 타이핑은 종종 런타임 에러를 낼 수 있는 위험도 있습니다. 예를들어 메소드의 이름이이나 로직이 변경될 때 AttributeError 가 발생할 겁니다.

 

“오리처럼 생겼고, 오리처럼 헤엄치고, 오리처럼 꽥꽥 거린다면 아마도 그것은 오리일 것입니다.”

 

덕 타이핑이란 이름은 위 속담에서 유래가 되었다고 합니다. 

 

끝.

반응형

+ Recent posts