반응형

1.ChatGPT 란?

  • GPT3.5 , GPT4 모델을 기반으로 하는 대화형 인공지능 서비스
  • 인공지능 챗봇
  • 인간의 피드백을 통한 강화학습으로 훈련
  • openAI 라는 회사가 만듦
  • 일론머스크, 샘알트만이 공동 설립.
  • 서비스는 프롬프트 + 응답의 구조

 

2.ChatGPT Simple Application 

  • open ai 의 api 를 이용하여나만의 챗봇 애플리케이션 만들기
  • 구현 내용
    • 파이썬 이용
    • streamlit 패키지를 이용하여 웹 애플리케이션으로 띄움
    • 간단한 입출력 폼 작성
    • openai 의 api 를 이용하여 서비스 로직 작성

3. code (api 호출 부분만)

from dotenv import load_dotenv
from chatgpt_logger import logger
import openai
import os



def get_openai_options():
    openai_model = os.environ.get("OPENAI_MODEL")
    openai_temperature = os.environ.get("OPENAI_TEMPERATURE")
    oepnai_max_token =os.environ.get("OPENAI_MAX_TOKEN") 

    args = {
        'model': openai_model,
        'temperature' : openai_temperature,
        'max_token' : oepnai_max_token,
    }

    return args

def load_env():

    # set environment for application
    load_dotenv()
    version = os.environ.get("VERSION")
    openai_token = os.environ.get("OPENAI_TOKEN")

    version = os.environ.get("VERSION")

    # set openai connection
    openai.api_key=openai_token

    logger.info(f"app version :  {version} \t")


def answer_from_chatgpt(query):
    #query = 'yarn cluster manager의 개념을 알려줘'
    answer = ''
    if query is None or len(query) < 1:
        answer = 'No Response..'
        return answer


    options = get_openai_options()
    response = openai.Completion.create(model=options['model'], prompt=query, temperature=float(options['temperature']),max_tokens= int(options['max_token']))
    res = response['choices'][0]['text']
    answer = res

    return answer

 전체 코드 : https://github.com/jaysooo/chatgpt_streamlit_app

 

GitHub - jaysooo/chatgpt_streamlit_app: Simple Streamlit Application of chatGPT

Simple Streamlit Application of chatGPT. Contribute to jaysooo/chatgpt_streamlit_app development by creating an account on GitHub.

github.com

 

 

 

4. 여담

결혼 전에 구매했던 컴퓨터(mini pc) 에 리눅스를 올려서 홈 서버를 구축했다. nextcloud, vscode server 등 서버에 오픈소스들 올리고, 회사나 외부에서 원격으로 붙어 이것저것 해보는 중이다. 위에 만든 chatGPT app 도 기능 좀 추가하고 나만의 모델을 만들어 학습해서 써볼 생각이다.

 

최근 바빠서 블로그 포스팅, 알고리즘 스터디를 소홀히 하고 있다. 안그래도 바쁜데 파트 내에 함께 했던 좋은 동료들이 이직을 많이 하고 있다. ㅠㅠ .. 좋은 데이터 엔지니어 동료가 필요한 상황이다...  

 

시간이 되면 회사에서 PoC 했던 airbyte on EKS 내용도 올리겠다.

 

 

반응형
반응형

0. 실습 내용

  • airbyte 를 docker container 로 실행한다
  • airbyte 커넥션
    • mysql --> s3 (csv) 
    • sync mode : full refresh
  • mysql 역시 docker container 로 실행한다.
  • 데이터는 공공 데이터를 활용한다.

 

1.install & quick start

  • mysql docker compose
version: "3.7"

services:
  mysql:
    image: mysql:latest
    environment:
      - MYSQL_ROOT_PASSWORD=jssvs
    volumes:
      - ./data:/var/lib/mysql
    ports:
      - 3306:3306
  • airbyte docker compose
## airbyte clone & up 
$ git clone https://github.com/airbytehq/airbyte.git
$ docker-compose -f docker-compose.yaml up -d 



## mysql up
$ docker-compose -f docker-compose.yaml up -d

  • 서비스 진입

2.Sample 데이터 로드 

 

서울교통공사_지하철혼잡도정보_20211231

서울교통공사 1-8호선 30분 단위 평균 혼잡도로 30분간 지나는 열차들의 평균 혼잡도(정원대비 승차인원으로, 승차인과 좌석수가 일치할 경우를 혼잡도 34%로 산정) 입니다.(단위: %). 서울교통공사

www.data.go.kr

 

 

 

3.connection 생성 및 테스트

  • Source 생성하기

  • Destination 생성하기

 

  • 연결 생성하기

  • 연결에서 볼 수 있는 설정 정보들
    • Transfer
      • 복제 주기 - cron , manual 등.
    • Streams
      • 목적지 네임스페이스 설정
      • 목적지 스트림의 prefix 네이밍 설정
      • airbyte 에서는 stream 이 옮겨질 데이터의 대상이고, mysql 의 경우 sync 될 테이블을 의미한다.
    • 원하는 sync source 를 선택할 수 있다.
  • sync Job 및 로그 확인

  • sync 된 데이터 확인

 

반응형
반응형

1.airbyte 란?

  • 오픈소스 데이터 통합 플랫폼 (ELT)
  • api , database, warehouse , application 간 데이터 sync, 즉 동기화 를 돕는 , 가능하게 해주는 소프트웨어 라고 하지만 내생각엔 아직 툴..
  • 데이터 통합 상품
  • 장점
    • built for extensibility
      • 커넥터 추가가 쉽고 확장성을 제공한다.
    • optional nomalized schemas
      • 선택적으로는 스키마를 정규화 할 수 있다.
    • Full grade schfeduler
      • 필요한 만큼 replication 을 자동화 할 수 있다.
    • real - time monitoring
      • 모든 로그를 모니터링 하고, 기능으로 제공한다.
    • incremental updates
      • replication 이 증분 업데이트를 기반으로 동작하여 transfer cost 를 줄여준다.
    • manual full refresh
      • 원할 때 수동으로 refresh 가 가능하다.
  • 단점
    • Stable 릴리즈 버전 없음, 아직 알파
    • 사용자 액세스 관리에 대한 지원 부족

 

2. 왜 airbyte ?

  • 기존 ETL 기반의 아키텍쳐에서 ELT 기반으로 리아키텍쳐링 할 때, 원본 데이터 소스를 이관 및 sync 해주는 역할이 중요해졌고, airbyte 가 시장에 빠르게 진입 했다고 생각한다.
  • 손쉽게 커넥터 설치만으로 데이터 연동 및 sync 가 가능해졌고, 충분한 UI 를 제공하고 있다.
  • airflow 로도 비슷한 구현을 할 수 있지만 개발이 필요한데.. 얘는 개발도 안해도 되고 CDC 옵션까지 제공한다.
  • 커넥터가 정말 많고 다양하게 지원한다.

3. airbyte 구성 요소

  • UI
    • airbyte 사용자를 위한 GUI
  • WebServer
    • UI 와 API 사이에서 발생하는 이벤트를 핸들링 하는 웹 서버
  • Config Store
    • 커넥션 정보들을 담고 있음 ( credential, 주기 등등.)
  • Config API
    • airbyte 의 main controle plane. 직역하면 관리영역이고, airbyte 의 모든 operation(동작) 들. API 콜 포함하여 설정하고 Inovoke 를 수행한다.
  • Scheduler
    • API 로 요청을 받고 Temporal Service 로 병렬적으로 보낸다. 잡의 성공/실패를 트래킹 하는 역할도 있다.
  • Scheduler Store
    • 예약된 스케쥴 job 정보가 저장된 곳
  • Temporal Service
    • 큐에 쌓이는 Task 와 workflow. 를 관리한다.
  • Worker
    • 소스 커넥터에 연결하고, data 를 받아와 목적지에 쓰는 역할을 수행한다
  • 지원되는 동기화 모드
    • full refresh - overwrite - 전체 새로고침 덮어쓰기 , 모든 데이터를 다시 동기화 하고 교체
    • full refresh - append - 전체 새로고침 추가 , 모든 행을 다시 동기화 하고 복제
    • incremental append - 증분 추가 , 새 행을 동기화하고 이미 동기화된 행뒤에 추가
    • incremental dedupe history - 중복 제거된 증분 추가, 새 행을 동기화 하고 동기화된 행을 추가하며 증분 중복제거를 기록

위 구성 컴포넌트들의 이름이 조금 다르게 되있지만 컨테이너의 로그를 보면 대략적으로 어떤 데몬이 해당 컴포넌트 역할을 하는지 알 수 있다.

 

** airbyte 는 job 이 수행될 때 동적으로 해당 job의 컨테이너가 새로 생성되서 동작한다. 관리형 컨테이너 서비스를 써서 구축한다면 참고하길 바란다.

 

 

참조 

https://airbytehq.github.io/understanding-airbyte/high-level-view/

 

Architecture overview | Airbyte Documentation

A high level view of Airbyte's components.

docs.airbyte.com

 

 

 

반응형
반응형

1.사전준비

  • Docker , Docker Compose 가 설치되어 있어야 한다.
  • 카프카 cli  사용을 위해 kafka binary 버전을 미리 받아 실행 준비를 해둔다.
 

Apache Kafka

Apache Kafka: A Distributed Streaming Platform.

kafka.apache.org

2. Docker-compose 작성

.env

CONFLUENT_VERSION=7.0.1

docker-compose.yml

version: '3'

networks:
  jssvs-net:
    external: true

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:${CONFLUENT_VERSION}
    hostname: zookeeper
    restart: on-failure
    ports:
      - 2181:2181
    volumes:
      - ./zookeeper/data:/data
      - ./zookeeper/logs:/datalog
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    networks:
      - jssvs-net

  kafka-1:
    image: confluentinc/cp-kafka:${CONFLUENT_VERSION}
    hostname: kafka-1
    restart: on-failure
    ports:
      - 9091:9091
    depends_on:
      - zookeeper
    volumes:
      - ./kafka-1/data:/var/lib/kafka/data
    environment:
      KAFKA_BROKER_ID: 101
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-1:29092,EXTERNAL://localhost:9091"
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9091"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'false'
    networks:
      - jssvs-net
  kafka-2:
    image: confluentinc/cp-kafka:${CONFLUENT_VERSION}
    hostname: kafka-2
    restart: on-failure
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    volumes:
      - ./kafka-2/data:/var/lib/kafka/data
    environment:
      KAFKA_BROKER_ID: 102
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-2:29092,EXTERNAL://localhost:9092"
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9092"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'false'
    networks:
      - jssvs-net

  kafka-3:
    image: confluentinc/cp-kafka:${CONFLUENT_VERSION}
    hostname: kafka-3
    restart: on-failure
    ports:
      - 9093:9093
    depends_on:
      - zookeeper
    volumes:
      - ./kafka-3/data:/var/lib/kafka/data
    environment:
      KAFKA_BROKER_ID: 103
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: "INTERNAL"
      KAFKA_ADVERTISED_LISTENERS: "INTERNAL://kafka-3:29092,EXTERNAL://localhost:9093"
      KAFKA_LISTENERS: "INTERNAL://:29092,EXTERNAL://:9093"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'false'
    networks:
      - jssvs-net



  manager:
    image: obsidiandynamics/kafdrop
    restart: "no"
    ports:
      - 9000:9000
    environment:
      KAFKA_BROKERCONNECT: kafka-1:29092
      JVM_OPTS: "-Xms16M -Xmx48M -Xss180K -XX:-TieredCompilation -XX:+UseStringDeduplication -noverify"
    depends_on:
      - zookeeper
      - kafka-1
      - kafka-2
      - kafka-3
    networks:
      - jssvs-net

 

- 참조 (https://github.com/jaysooo/kafka-flink-stack-docker-compose)

3. 컨테이너 생성 및 실행

# 1) network 생성
$ docker network create jssvs-net

# 2) docker-compose up
$ docker-compose -f docker-compose.yml up -d

-> 브라우저에서 http://localhost:9000 로 접속하면 kafdrop 이라는 web UI 로 카프카 정보를 확인할 수 있다

 

4. 간단한 실습

카프카 binary 버전을 다운로드 받은후 하위  bin 디렉토리에 CLI 쉘 스크립트를 이용한다.

kafdrop 에서도 토픽 생성은 가능하다.

 

# 1) 토픽 생성
$ ./kafka-topics.sh --bootstrap-server 127.0.0.1:9091 --topic my-topic1 --create --partitions 1 --replication-factor 1

# 2) 토픽 정보 보기
$ ./kafka-topics.sh --bootstrap-server 127.0.0.1:9091 --topic my-topic1 --describe

# 3) 토픽 구독하기 (consumer)
$ ./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9091 --from-beginning --topic my-topic1

# 4) 토픽 삭제 하기
$ ./kafka-topics.sh --bootstrap-server 127.0.0.1:9091 --topic my-topic1 --delete

 

 

5. 마치며

  • 회사에서 운영 목적으로 사용할 flink 를 공부하면서 카프카도 함께 공부 하고 있는데, 어렵기도 하고 시간이 너무 부족하다.
  • 주로 실시간 데이터는 kinesis 로 받아서, firehose + lambda 기반 ETL 처리로 손쉽게 구성했었는데 아무래도 현업에서는 카프카가 더 많이 쓰이기도 하고, 최근에 트렌드가 데비지움 카프카 커넥트로 CDC -> kafka 구성을 많이 하는 것 같다.
  • 다음 포스팅에서 카프카와 연동하는 producer application 과 consumer application 을 간단하게 작성해서 업로드 하겠다. 
  • 바쁘다.. 바뻐  
반응형

'Data Engineer' 카테고리의 다른 글

airbyte(에어바이트) 구축 및 실습  (0) 2023.01.02
airbyte (에어바이트) 기초  (1) 2023.01.02
Apache kafka (카프카) 기초  (1) 2022.08.19
kubernetes 기초 (1)  (0) 2021.12.01
pyspark 기초 (1)  (0) 2021.10.03
반응형

1. 카프카란?

  • 분산 이벤트 큐.
  • 분산 이벤트 스트리밍 플랫폼
  • 카프카 컨슈머, 프로듀서, 스트림즈, 커넥트 등 연동 API 제공
  • 초당 수백만개 데이터를 처리할수 있으므로 빅데이터에 적합
  • 분산 데이터를 통해 24시간 365일 안전하게 데이터를 처리할 수 있는 고가용성 기능 제공

2. 왜 카프카?

  • 고가용성
    • 서비스에 지장없는 운영을 보장.
  • 낮은 지연
  • 확장성
  • 높은 처리량
    • 높은 처리량을 감당하지 못한다면, 서비스를 유지하기 힘듦

3. 카프카 구성 요소

브로커

  • 설치된 카프카의 서버 단위
  • 보통은 3대로 구성

주키퍼

  • 코디네이션 애플리케이션
  • 브로커 서버와 통신하며 상태관리, 컨슈머와의 통신, 카프카 메타데이터 정보를 저장함.

토픽

  • 구체화된 이벤트 스트림 = 쉽게 큐로 이해하면 됨
  • 하나의 토픽에 여러 Producer / Consumer 가 존재할 수 있다.
  • 토픽은 담는 데이터에 따라 이름을 줄 수 있다.

컨슈머

  • 카프카와 통신하면서 메세지를 구독함
  • 기본적으로 가장 오래된 순서대로 가져감 - 0번 오프셋부터
  • 새로운 컨슈머가 구독을 하게 되도 가장 오래된 순서대로 가져감
    • auto.offset.reset = earliest 인경우

프로듀서

  • 카프카와 통신하면서 메세지를 생산함

 

파티션

  • 카프카의 토픽들은 여러 파티션으로 나눠짐.
  • 파티션의 끝에서 0번 부터 차곡차곡 쌓이게 됨
  • 토픽 = 논리적인 개념이라면, 파티션은 물리적인 저장소에 저장하는 단위
  • 각 파티션은 Append-only 방식으로 기록됨
  • 특정 파티션으로 데이터를 쓸수 있고, 명시되있지 않으면 RoundRobin 방식으로 파티션을 배정한다
  • 파티션을 늘린다면?
    • 파티션을 다시 줄일수는 없다.
    • 컨슈머 개수가 늘어날때 분산 처리할 수 있다.
    • 신규 데이터는 2개의 파티션 중어디로 들어갈까?
    • 보통은 라운드로빈으로 파티션을 할당함
    • 키의 해시값으로 저장.
  • 파티션 삭제 주기는?
    • log.retention.ms : 최대 record 보존 시간
    • log.retension.byte : 최대 record 보존 크기

오프셋

  • 각각 파티션의 레코드는 Offset 식별자 정보를 가짐, 데이터 번호
  • 카프카는 메세지 순서를 보장 하지 않음. 하지만 파티션이 1개라면 보장할지도?

 

파티셔너 

  • 데이터를 토픽에 어떤 파티션에 넣는지 결정하는 역할을 함
  • 메세지 키 또는 메세지 값에 따라 파티션이 결정됨
  • hash(키) = 파티션 넘버

레플리케이션

  • replication 이 1인 토픽은 하나의 브로커에만 저장됨.
  • replicaion 이 2라면 원본 하나, 복제본 1개의 포티션이 2개의 브로커에 저장됨
  • 따라서 replication 개수 ≤ 브로커 서버 개수
  • 원본 파티션 = Leader 파티션, 복제본 파티션 = follow 파티션
반응형
반응형

1. 쿠버네티스란?

  • 오픈 소스 플랫폼
  • 오케스트레이션 시스템
  • 컨테이너화된 워크로드와 서비스를 관리

는 공식 홈페이지에 소개 내용이지만, 내가 이해한 내용은 이렇다.

  • 런타임 의존성과 함께 애플리케이션을 패키징하는 컨테이너를 많이 쓰기 시작한다. -> 컨테이너 런타임을 지원하면 어디에서 실행하던 동일한 동작을 보장하니까
  • 하지만 컨테이너는 변경이 불가하다. 즉 애플리케이션의 업데이트가 발생하면 새로 이미지를 빌드 해야 한다.
  • 쿠버네티스는 이러한 경우 서비스 중단 없이 업데이트를 가능하게 해주고, 이 밖에 운영 측면의 스케일링, 리소스 관리 등을 해주는 녀석이다.

2. 왜 쿠버네티스를 사용하지?

  • high Availability = no downtime
  • Scalability
  • High Performance
  • Disaster recovery

위 장점때문에 사람들이 많이 사용하는 것 같은데 구성이 멀티 클러스터로 되어있기 때문이라고 생각한다면 저 장점을 더 이해하기 쉬울 것 같다.

 

3. 쿠버네티스 아키텍쳐

  •  
  • 일반적인 쿠버네티스 배포 → 쿠버네티스 클러스터 생성
  • 쿠버네티스 클러스터
    • 모든 클러스터는 최소 한 개의 워커 노드, 1개의 마스터 노드를 가짐
    • 워커 노드
      • 동작중인 파드를 유지시키고, 쿠버네티스 런타임 환경을 제공하며 모든 노드 상에서 동작
    • 컨트롤 플레인 = 마스터 노드
      • 워커노드와 클러스터 내 파드를 관리한다
      • 컨트롤 플레인이 여러 컴퓨터에 걸쳐 실행되고, 클러스터는 여러 노드를 실행하므로 내결함성과 고가용성이 제공됨

 

컨트롤 플레인

  • 주요 컴포넌트
    • kube-apiserver
      • 쿠버네티스 컨트롤 플레인의 프론트 엔드
      • UI, API, CLI 를 제공
    • etcd
      • 모든 클러스터 데이터를 담는 쿠버네티스 뒷단의 저장소
      • 키-벨류 저장소
      • kubernetes backking stroe
    • kube-scheduler
      • 노드가 배정되지 않은 새로 생성된 파드를 감지
      • 실행할 노드를 선택하는 컨트롤 플레인 컴포넌트
      • ensure pods placement
      • 서버의 리소스또한 감지한다. 몇번 노드에서 메모리 30% 쓰고 뭐 이런 정보들
    • kube-controller-manager ( cm )
      • 컨트롤러 프로세스를 실행하는 컨트롤 플레인 컴포넌트
      • 4가지 구성요소
        • 노드 컨트롤러: 노드가 다운되었을때 통지와 대응에 관한 책임
        • 레플리케이션 컨트롤러 : 시스템의 모든 레플리케이션 컨트롤러 오브젝트에 대해 알맞은 수의 파드들을 유지시켜주는 책임
        • 엔드포인트 컨트롤러 : 엔드포인트 오브젝트를 채움 ( 서비스와 파드 연결 )
        • 서비스 어카운트 & 토큰 컨트롤러 : 새로운 네임스페이스에 대한 기본 계정과 api 접근 토큰을 생성
        • keeps track of whats happening in the cluster
    • cloud-contgroller-manager (ccm)
      • 클라우드 별 컨트롤 로직을 포함하는 쿠버네티스 컨트롤 플레인 컴포넌트
      • 3가지 구성요소
        • 노드 컨트롤러 : 노드가 응답을 멈춘 후 클라우드 상에서 삭제되었는지 판별하기 위해 클라우드 제공 사업자에게 확인
        • 라우트 컨트롤러 : 기본 클라우드 인프라에 경로를 구성하는 것
        • 서비스 컨트롤러 : 클라우드 제공 사업자 로드밸런서를 생성, 업데이트, 그리고 삭제 하는 것

워커 노드

  • 주요 컴포넌트
    • kubelet
      • 클러스터 각 노드에서 실행되는 에이전트
      • 파드에서 컨테이너가 동작하도록 관리
    • kube-proxy
      • 클러스터의 각 노드에서 실행되는 네트워크 프록시
      • 노드의 네트워크 규칙을 유지 관리
    • 컨테이너 런타임
      • 컨테이너 실행을 담당하는 소프트 웨어
      • 컨테이너 런타임 인터페이스를 구현한 모든 소프트웨어 지원
반응형
반응형

1. Pyspark?

파이썬으로 스파크를 사용하기 위한 목적으로 만들어진 인터페이스이다.

파이썬 API 를 이용할 수 있으면서, 스파크 애플리케이션을 작성할 수 있다. 

interactive 하게 pyspark shell 도 제공한다.

Spark SQL, DataFrame, streaming , MLlib 그리고 Spark Core 를 지원한다.

 

2. 왜 PySPark?

java, scala 로 작성된 코드가 성능이 조금 더 뛰어나다고 알려져 있다. 

하지만 빅데이터 진영에서는 파이썬이 친숙한 사람이 많고, 생산성이 더 난다고 생각한다.

 

3. Pyspark 기초 -  Dataframe 생성 및 다루기

** 개발할 때 비교적 자주 사용되는 함수만 다룰 것 이다.

- 조건 조회와 출력, 그리고 withColumn 은 정말 많이 사용되는 것 같다.

 

from pyspark.sql.types import *
from pyspark.sql.functions import col,lit

# 멤버 컬럼
member_columns = ['group_cd','join_dt','name','login_cnt']

# 멤버 데이터
member_data = [
    ('k-00001','2021-07-31','name1',10),
    ('k-00002','2021-06-31','name2',10),
    ('k-00003','2021-08-31','name3',10),
    ('k-00004','2021-03-31','name4',12),
    ('k-00001','2021-04-31','name5',11),
    ('k-00002','2021-05-31','name6',15),
]

# 데이터 프레임 생성
member_df = spark.createDataFrame(data=member_data,schema=member_columns)

#스키마 구조 출력 
member_df.printSchema()

# withColumn  - 새 컬럼 추가. literal 로 초기화해줘야 한다.
member_df= member_df.withColumn("use_flag",lit(False))

member_df.printSchema()

 

#pyspark 에서 countDistinct 사용을 위해서는 lib 를 선언 해줘야 하다니..
from pyspark.sql.functions import countDistinct
from pyspark.sql import functions as F


# 행 출력
member_df.show(10,truncate = False)

# 특정 컬럼 출력
member_df.select('join_dt','name','group_cd').show(10)

# groupby 출력
member_df.groupBy("group_cd").count().sort("count",ascending=True).show()

# filter 조건 출력 
member_df.filter(member_df.login_cnt > 10).show()

# filter 조건을 복수 개로 줘보자
member_df.filter(
    (F.col("group_cd") == "k-00001") &
    (F.col("login_cnt") > 10)
).count()


# where 조건 출력
member_df.where(my_df.acntid=='name1').show()

member_df.filter("name == 'name1'").show()

 

반응형
반응형

1.EDA란?

Exploratory Data Analysis 의 줄임말로, 탐색적 데이터 분석이란 의미다.

내가 이해한바로는 데이터에서 유의미한 인사이트를 찾기 위해 다각도로 시각화하는 작업을 포함한 분석 정도로 이해했다.

 

2.왜 EDA 툴을 사용하지?

EDA 작업은 시간과 노력이 많이 들어간다. 파이썬 패키지를 통해 코드 몇줄로 이러한 시간 및 고생을 줄일 수 있다.

 

3. 레포트 생성

 a) 사전 준비 하기

  • DataPrep 이라는 패키지를 이용해서 레포트를 만들어 볼 것이다.
  • jupyter 노트북이 설치 되어 있어야 한다
  • pandas,dataprep 이 설치 되어 있어야 한다.
# pandas 설치
$ pip install pandas


# dataprep 설치 https://pypi.org/project/dataprep/
$ pip install -U dataprep
  • 데이터를 준비한다
    • 공공 데이터 포털에서 쉽게 얻을 수 있다.  ( https://www.data.go.kr/index.do )
    • csv 파일로 준비하면 좋다. 엑셀이나 다른 파일도 가능은 한데 pandas로 읽기 위한 작업을 추가로 해야 할 것이다.

b)  데이터 읽기 

  • 공공 데이터 포털에서 받았다면, csv 를 읽어 pandas 로 Load 할 때 인코딩 타입은 cp949 로 설정해야 문제 없이 실행될 것이다
  • 패키지 import와 데이터를 읽어서 눈으로 확인해보자

c) 레포트 생성하기

  • create_report 함수를 이용해서 레포트를 생성한다.
  • 자동으로 생성된 레포트에는 데이터의 기초 통계, 변수별 통계, 상관관계 및 Interactions 차트 등 을 확인 할 수 있다. 

4. 코드 

# 설치 https://pypi.org/project/dataprep/
$ pip install -U dataprep

# 패키지 선언
from dataprep.eda import create_report

#csv 읽기, dataframe 생성하기
sample_df = pd.read_csv('sample_data.csv',encoding='cp949')

#리포트 생성하기
create_report(sample_df)

 

반응형

'Data Engineer' 카테고리의 다른 글

kubernetes 기초 (1)  (0) 2021.12.01
pyspark 기초 (1)  (0) 2021.10.03
[airflow] HA - health 체크를 이용한 운영  (0) 2021.06.05
Elasticsearch (ES) 기초 (1) - CRUD  (0) 2021.06.03
Apache Spark 기초 (2)  (0) 2021.06.02
반응형

1. HA (High Availability)

고가용성이라고도 하는데, 얼마나 운영이 오래될 수 있는가에 대한 성질이다. 에어플로우 또한 고가용성을 높이기 위해 클러스터로 운영하기도 하고, 나의 경우처럼 어떤 대안을 생각하기도 해야 한다.

2. 왜 헬스체크 (HealthCheck)를 ? 

단일 서버로 에어플로우를 운영해보니, 스케쥴러 프로세스가 내려가는 일이 자주는 아니지만 4~5개월에 한번 씩 간헐적으로 발생했다. ( 마지막 로그에는 메세지 큐로 이용중인 레디스와의 커넥션 타임아웃 로그가 마지막에 찍혀있었다. )

 

워크플로우의 작업 누락을 막으려면 어떤 조치가 필요했다. 에어플로우에서는 Rest API 로 헬스체크 기능을 제공했다.

>request
curl -XGET "http://에어플로우 서버 :5000/health"

>response
{
    "metadatabase": {
        "status": "healthy"
    },
    "scheduler": {
        "status": "healthy",
        "latest_scheduler_heartbeat": "2020-04-13T01:38:51.283786+00:00"
    }
}

3. go ticker를 이용하여 스케쥴러 알림 및 실행

일정 시간 마다 헬스 체크 Rest API로 상태를 받아와 스케쥴러가 UnHealthy 상태로 내려갈 때, 알림을 주거나 프로세스를 다시 올리는 프로그램을 만들어 볼 수 있다.

 

물론 작성한 프로그램이 죽어버리면 안되기 때문에, 서버에 sudo 권한이 있다면 ticker 부분을 제외하고 crontab 으로 걸어 더 안전하게 운영을 가져갈 수 있다.

 

더 좋은 방법은 managed service 를 이용하는 것이다. aws, gcp 모두 검색해보면 서비스가 있다.

package main

import (
	"net/http"
	"fmt"
	"time"
	"bytes"
	"io/ioutil"
	"github.com/multiplay/go-cticker"	// ref. https://github.com/multiplay/go-cticker
)

type AFStatus struct {
    Metadatabase  Status  `json:"metadatabase"`
    Scheduler Status `json:"scheduler"`
}

type Status struct {
    Status  string `json:"status"`
    Latest_scheduler_heartbeat  string `json:"latest_scheduler_heartbeat"`
}

func sendWatchAlert(msg string){
	// 장애 처리 부분 작성 
}

func healthCheck(){
    resp,err:=http.Get("https://myairflow_host/health")
    msg_tmpl:="[장애] airflow healthcheck ..\\n\\nscheduler status : {scheduler_status} \\nmetadatabase status : {metadatabase_status}\\nLatest_scheduler_heartbeat: {Latest_scheduler_heartbeat}"
    var  af  AFStatus
    if err!=nil{
        fmt.Println("[E] http request error .. ")
        fmt.Println(err)
    }

    if(resp.StatusCode != 200) {
        sendWatchAlert("[장애] airflow healthcheck \\n\\nwebserver status: unHealthy")
        fmt.Println("[E] http response 200 error .. ")
        return
    }
    buf := new(bytes.Buffer)
    buf.ReadFrom(resp.Body)
    newStr := buf.String()
    err = json.Unmarshal([]byte(newStr),&af)
    if err != nil {
        fmt.Println("[E] Unmarshal  Error .. ")
        fmt.Println(err)
    }

    msg:=strings.Replace(msg_tmpl,"{scheduler_status}",af.Scheduler.Status,1)
    msg=strings.Replace(msg,"{metadatabase_status}",af.Metadatabase.Status,1)
    msg=strings.Replace(msg,"{Latest_scheduler_heartbeat}",af.Scheduler.Latest_scheduler_heartbeat,1)

    if af.Scheduler.Status != "healthy" || af.Metadatabase.Status != "healthy" {
        sendWatchAlert(msg)
    }
}



func main() {
    fmt.Println("[L] airflow health checker start..")
	
//  interval:=30
//  flag.IntVar(&interval,"-i",180,"ticker interval")
//  flag.Parse()

    healthCheck()
    t := cticker.New(time.Second * 60 ,time.Second) //duration , accuracy
    for tick := range t.C {
        // Process tick
        healthCheck()
        //fmt.Println("tick:", tick)
    }
}

ref. https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/check-health.html

 

Checking Airflow Health Status — Airflow Documentation

 

airflow.apache.org

 

 

반응형
반응형

1. ES 데이터 구조

  • Document
    • 데이터 최소 단위
    • 관계형 Database 에서 Row 단위
  • Type
    • 관계형 Database에서 table 단위
    • 1Type = N개 * Documents
  • Index
    • 관계형 Database에서 DB단위
    • 1 Index = N개 * Types

**ES 7.0 버전 부터는 도큐먼트 타입 개념이 사라짐. 우리는 6.x 버전 대네요.. (업그레이드도 가능 )

2.도규먼트 삽입

PUT 방식 (document id를 선택해서 삽입할 때)
curl -XPUT http://localhost:9200/books/book/1 -d
'{
"tile": "Nesoy Elastic Guide",
"author": "Nesoy",
"date": "2019-01-15",
"pages": 250
}'


PUT 방식 (document id를 선택해서 삽입할 때)
http://localhost:9200/jayden_idx01/jayden_type01/1
{
  "name":"jayden",
  "message":"안녕하세요 Elasticsearch"
}

POST 방식 (document id를 임의로 줄때)
http://localhost:9200/jayden_idx01/jayden_type01
{
  "name":"jayden",
  "message":"test1"
}

3.도큐먼트 조회

POST 전체 조회 (인덱스 별)
http://localhost:9200/jayden_test01/_search
{
  "query": {
    "match_all": {}
  }
}



POST /인덱스/_search?pretty 조건 조회
http:/localhost:9200/jayden_idx01/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": { "name": "jayden" } }
      ]
    }
  }}

POST
http://localhost:9200/jayden_idx01/_search
{ 
   "query" : {
        "term" : { "name" : "jayden" }
    }
}


POST 정렬 조건 조회
http://localhost:9200/jayden_idx01/_search
{
"size":1,
"sort":[
{ "@timestamp" : { "order":"desc"} }
],
"query":{
"term":{ "조건 필드": "mill" }
}

}

4.도큐먼트 삭제

DELETE /인덱스
http://localhost:9200/jayden_idx01/

 

반응형

+ Recent posts