반응형

1.이슈 내용

  • 최근 EMR Cluster 에서 도커 이미지를 이용해 Spark - submit 실행하는 job 을 추가함.
  • 테스트 시에는 문제가 되지 않았으나, 새벽 배치에서 장애 발생

2.장애 또는 에러 내용

  • Docker inspect command : /usr/bin/docker inspect —format {{.State.ExitCode}} container_iD_appid_…..
  • Exit code from docker inspect : 1

3.이슈 원인

  • EMR 에서 Scale Out 시 Task 노드를 임의로 할당 받게 된다. (Spot instance 사용 중)
  • Task 노드의 커널 아키텍쳐가 이미지 내 빌드된 프로세서 아키텍쳐와 호환이 안될 가능성이 있다.
    • 내가 amd64 아키텍쳐로 빌드를 했는데 실제로 arm64 아키텍쳐의 노드 위에서 실행 될 경우..
  • docker inspect 는 실행 전 이미지를 검사하는 단계로 보여진다. 

4.해결방법

  • buildx 를 이용하여 멀티 프로세서 아키텍쳐에 맞게 빌드한다.
  • 참고로 buildx 를 이용할 때 빌드와 push 만 가능하다.
1. buildx 활성화 및 확인
$ docker buildx version
$ docker buildx ls

2. buildx 초기화 및 빌더 인스턴스 생성
$ docker buildx create --name multi-archi-builder
$ docker buildx use multi-archi-builder


$ docker buildx inpsect --bootstrap 
-- bootstrap 은 초기화 프로세스 플래그

3. build 와 동시에 push 하기
$ docker buildx build --platform linux/arm64,linux/amd64 -t myapp:latest --push .

4. 이미지 확인하기
$ docker manifest inpsect [이미지]:[태그]
 

5.회고

- 사실 aws 코리아 기술자 분들과 미팅을 하면서,쿠버네티스 환경에서는 꼭 컨테이너 이미지의 멀티 아키텍쳐 빌드가 필요하다는 내용을 팁으로 들었었다. 그때는 잘 끄덕였는데.. 귀로만 흘려들었더니, 새벽 4시에 일어나 장애 대응을 해야 했던 결과를 맞았다.

최근 배치 장애도 잦고, 더 꼼꼼해야 겠다. 그리고 반성하자

반응형
반응형

[개발자 일상] 두 번의 깨달음

 

사회 초년생일 때는 야근이 잦았다. 

그때 항상 사무실에 남아 멀리서 나와 야근을 함께 하는 40대 동료 사우가 있었다. (40대 동료라고 하면 놀랄 수 있겠다. 첫 회사는 수평 문화의 회사였다.) 

 

왜 저 사람은 맨날 집에 늦게 갈까? 매일 야근 할 정도로 일이 많을까? 혹시 아기 보기 싫어서, 늦게 퇴근하는 거 아냐?  정말 별로인 아빠네.. 라고, 속으로 욕했던 기억도 난다. 

 

이런… 그 사람의 모습을 내가 갖게 됐다…

 

이상하게 저녁에 업무 집중이 잘 되고, 야근 후에 집에 돌아가면 아이가 자고 있어 몸이 편했다. 왜 아빠들이 늦게 퇴근하는지 깨달은 것 같아..  

 

애기가 생기면서 주말과 개인 시간이 사실상 소멸했고, 게임은 커녕 공부할 시간도 모자라다보니 집중할 수 있는 시간이 보장된 회사에서 채우게 되고, 그러다 보니 퇴근 시간이 늦어지는 것 같다는 나의 그럴듯한 핑계다.

 

그렇게 점점 회사에서 보내는 시간이 길어지면서 육아 시간도 줄었다.

 

너무 더워진 요즘 아기 방의 온도를 체크하러 잠깐 들어갔는데 자고 있는 얼굴을 보다 행복을 느꼈다. 

 

“아빠”라는 말을 마지막으로 들은 지 꽤 된 것 같다. 휴대폰에는 마지막 아이의 사진이 2주 전이다. 

 

무엇이 중요한지 깨달았다. 힘들어도 아이와 보내는 시간과 추억이 더 소중할 것 같다. 

 

주말에는 책을 많이 읽어줘야겠다. 

 

반응형
반응형

1.목표 (what & why)

  • fast 를 이용해본다
  • 유지보수와 확장 가능한 구조의 프로젝트 구조를 만든다.
  • 쿠버네티스 기반 컨테이너 서비스로 동작하도록 만든다.
  • fast api application   
    • 별도 SMTP 서버로 이메일 전송을 대신할 수 있는 api 서버 구현 ( 상세한 구현 로직은 포함하지 않음. )
    • 인증은 기본 토큰 방식을 사용한다.
  • 만드는 목적과 이유
    • 보통 이런 REST API + Application 실행 요구사항은 AWS 람다를 대부분 이용했다. 그리고 이용하면 편하고..
    • AWS 람다는 서버리스기반으로 실행되기 때문에 고정 IP 를 부여하기 힘들다. (서브넷의 CIDR 를 조정하면 될것 같긴 하지만 그렇게는 아무도 안할 것 같다)
    • 화이트 리스트로 등록되어야만 실행을 하게 하는 서비스 들이 있다. 예를 들어 SMTP 서비스 들. 

 

2.과정 (Step)

  • fastapi application 을 작성한다
  • Dockerfile 을 작성하고 컨테이너 이미지를 생성한다
  • 쿠버네티스 Deployment yaml 을 작성하고, 파드, 서비스를 작성한다.
  • 배포와 자동화 스크립트를 만들어본다.
  • 사전 준비 과정
    • Docker , 쿠버네티스, 파이썬 개발 환경의 준비가 되어있어야 한다.

 

3. 방법 (How)

fastapi application 작성한다.

- github  -https://github.com/jaysooo/service-common-api

 



 

Dockerfile 을 작성하고 컨테이너 이미지를 생성한다

from python:3.11-alpine

# set env
WORKDIR /usr/src
# source copy
COPY ./app ./app

ENV PYTHONPATH /usr/src/app

# package install
RUN pip install --upgrade pip && pip install --no-cache-dir -r ./app/requirements.txt

EXPOSE 9000

# container execution
CMD python -m uvicorn app.main:app --reload --host=0.0.0.0 --port 9000

 

이미지 생성과 실행 테스트를 해본다.

#!/bin/bash
echo "[LOG] 01. local deploy.."
IMAGE_NAME=demoapp
IMAGE_VERSION=1.0
TARGET_PORT=9000
#echo ${IMAGE_NAME}:${IMAGE_VERSION}


echo "[LOG] 02. container cleansing.."
docker stop `docker ps | grep -e "${IMAGE_NAME}:${IMAGE_VERSION}" | awk '{print $1}'`
docker rmi -f ${IMG_NAMME}:${IMAGE_VERSION}

echo "[LOG] 03. image build.."
docker build -t ${IMAGE_NAME}:${IMAGE_VERSION} .

echo "[LOG] 04. container run.."
docker run -p ${TARGET_PORT}:${TARGET_PORT} -d ${IMAGE_NAME}:${IMAGE_VERSION}

docker ps | grep -e ${IMAGE_NAME}

 

이미지 빌드 및 실행 화면

 

 

API 실행 화면

 

반응형
반응형

 

친한 선배이자 개발자 동료인 지인이 1일 1글쓰기를 시작 했다며 블로그를 공유해줬다. 그는 교육 강사, 개발자, 또는 회사 대표로서 꾸준히 일한다.

 

주변 사람에게 선한 영향력을 주고, 그를 보면서 자극을 많이 받아 열심히 노력하게 된다. ( 여기 -> https://www.alghost.co.kr/ )

 

그래서..   

 

나 역시 내가 겪는 경험과 생각을 정리하면서, 자유롭게 글을 써보기로 마음먹었다. (마침 창 밖에 빗소리가 내 감성을 채워준다.)

 

 

 

마침 회고할 것이 있다. 가끔 머릿 속 생각을 입밖으로 꺼낼 때가 있다. 그리고 대부분 후회한다.

팀이 하반기 해야 할 일과 방향성을 고민해보고 나눠보는 자리에서, 타부서의 업무지만 우리가 함께 해야 할 것 같아 이야기를 꺼냈다가 싫은 소리를 듣게 됐다.

 

부서가 담당한 역할과 일이 분명히 구분되있다. 그리고 회사에서는 업무 하나라도 A팀에서 할지 B팀에서 할지.. 담당자들의 보이지 않는 줄다리기를 해야 한다.

 

난 데이터 엔지니어 입장에서 본다면, 데이터를 수집하는 영역은 우리가 참여해야 할 일이라고 생각했다. 그 데이터는 팀에서 활용 의존도가 높아지고 있다. 우리도 직접 만들면 요청하고 기다리지 않아도 되고, 모두가 좋은거 아닌가 싶었다.

 

그렇게 오전 회의에서 하지 않아도 될 말을 입 밖으로 꺼내 긁어 부스럼을 만드는 꼴이 됐다. 이런 부분에서 내가 아직 많이 부족하다고 느꼈다. 사실 이전 직장에서도 나의 이런 부분은 고쳐야 한다고 팀장님께서 많이 말씀하셨다. (쉽게 고쳐지지 않지만…)

 

데이터 엔지니어로서의 나와 빅데이터 플랫폼팀에서 팀원으로서 나의 역할은 같은 것 같지만 다르다. 난 두가지 Role 안에서 신중히 고민하고 의견을 내야 한다.

 

아직은 R&R 을 따져가며 정치적으로 일하고 싶지 않은 심리가 솔직히 어느정도 있다.

 

현실적인 것보다 이상적인 것에 더 끌리는 편이다.

 

내 머릿속 고민의 총량이 있다면, 정치적인 고민보다는 기술적인 고민으로 채우고 싶은 거다.

 

결론은.. 난 팀장을 하면 안되는 사람이다. 팀원이 피곤해질거니까

 
 
반응형
반응형

1. 문제 소개

  • prefix sum 과 투포인터 문제다.

2. 코드

import sys
input = sys.stdin.readline


def solution():
    input_str = input().split(" ")
    n, s = int(input_str[0]), int(input_str[1])
    input_str = input().split(" ")
    arr = [int(x) for x in input_str]
    prefix_sum = [0] * (n+1)

    # set two pointer
    sp = 0
    ep = 1
    min_cnt = n

    # set prefix_sum
    while(ep <= n):
        prefix_sum[ep] = prefix_sum[ep-1] + arr[ep-1]
        ep += 1

    ep = 0

    if prefix_sum[-1] < s:
        min_cnt = 0
    else:
        while(sp <= ep and ep <= n):
            if (prefix_sum[ep] - prefix_sum[sp]) >= s:
                if min_cnt > (ep-sp):
                    min_cnt = (ep-sp)
                sp += 1
            else:
                ep += 1

    print(min_cnt)


if __name__ == '__main__':
    solution()

3.코멘트

  • 문제에 부분합의 충족조건을 만족하지 않는 경우 0 을 출력하는 조건을 놓쳐 시간을 꽤 썼다.
  • 문제를 대충 읽지 말자..
 
반응형
반응형

1.ChatGPT 란?

  • GPT3.5 , GPT4 모델을 기반으로 하는 대화형 인공지능 서비스
  • 인공지능 챗봇
  • 인간의 피드백을 통한 강화학습으로 훈련
  • openAI 라는 회사가 만듦
  • 일론머스크, 샘알트만이 공동 설립.
  • 서비스는 프롬프트 + 응답의 구조

 

2.ChatGPT Simple Application 

  • open ai 의 api 를 이용하여나만의 챗봇 애플리케이션 만들기
  • 구현 내용
    • 파이썬 이용
    • streamlit 패키지를 이용하여 웹 애플리케이션으로 띄움
    • 간단한 입출력 폼 작성
    • openai 의 api 를 이용하여 서비스 로직 작성

3. code (api 호출 부분만)

from dotenv import load_dotenv
from chatgpt_logger import logger
import openai
import os



def get_openai_options():
    openai_model = os.environ.get("OPENAI_MODEL")
    openai_temperature = os.environ.get("OPENAI_TEMPERATURE")
    oepnai_max_token =os.environ.get("OPENAI_MAX_TOKEN") 

    args = {
        'model': openai_model,
        'temperature' : openai_temperature,
        'max_token' : oepnai_max_token,
    }

    return args

def load_env():

    # set environment for application
    load_dotenv()
    version = os.environ.get("VERSION")
    openai_token = os.environ.get("OPENAI_TOKEN")

    version = os.environ.get("VERSION")

    # set openai connection
    openai.api_key=openai_token

    logger.info(f"app version :  {version} \t")


def answer_from_chatgpt(query):
    #query = 'yarn cluster manager의 개념을 알려줘'
    answer = ''
    if query is None or len(query) < 1:
        answer = 'No Response..'
        return answer


    options = get_openai_options()
    response = openai.Completion.create(model=options['model'], prompt=query, temperature=float(options['temperature']),max_tokens= int(options['max_token']))
    res = response['choices'][0]['text']
    answer = res

    return answer

 전체 코드 : https://github.com/jaysooo/chatgpt_streamlit_app

 

GitHub - jaysooo/chatgpt_streamlit_app: Simple Streamlit Application of chatGPT

Simple Streamlit Application of chatGPT. Contribute to jaysooo/chatgpt_streamlit_app development by creating an account on GitHub.

github.com

 

 

 

4. 여담

결혼 전에 구매했던 컴퓨터(mini pc) 에 리눅스를 올려서 홈 서버를 구축했다. nextcloud, vscode server 등 서버에 오픈소스들 올리고, 회사나 외부에서 원격으로 붙어 이것저것 해보는 중이다. 위에 만든 chatGPT app 도 기능 좀 추가하고 나만의 모델을 만들어 학습해서 써볼 생각이다.

 

최근 바빠서 블로그 포스팅, 알고리즘 스터디를 소홀히 하고 있다. 안그래도 바쁜데 파트 내에 함께 했던 좋은 동료들이 이직을 많이 하고 있다. ㅠㅠ .. 좋은 데이터 엔지니어 동료가 필요한 상황이다...  

 

시간이 되면 회사에서 PoC 했던 airbyte on EKS 내용도 올리겠다.

 

 

반응형
반응형

1.문제 소개

  • DP 문제에 해당한다.

 

2.코드

import math
# 이친수 여부
def is_ichin(source):
    result = True
    offset = 1
    while(offset<len(source)):
        if source[offset] == '1' and source[offset-1] == '1':
            result=False
            break
        offset +=1
        
    return result

# 문자열 탐색 방식
def solution():
    N = int(input())
    S = int(math.pow(2,N))
    cnt = 0 
    for num in range(0,S):
        s = bin(num)[2:]
        if len(s) == N and is_ichin(s):
            cnt+=1
        #print(f"num : {num}\t binary : {s} ichin : {is_ichin(s)}")
    
    print(cnt)
          
                  
ichin_array = [0,1,1,2]

def get_ichin(n):
    if n < len(ichin_array):
        return ichin_array[n]
    else:
        ichin = get_ichin(n-1)+get_ichin(n-2)
        ichin_array.append(ichin)
        return ichin
    

    
    
# DP 방식 
def solution():
    N = int(input())
    print(get_ichin(N))
    
solution()

 

3.코멘트

  • 이진수를 만들어 문자열 탐색으로 이친수를 여부를 구하는 함수를 만들어 규칙을 찾아봤다.
  • 이친수는 피보나치 수열과 동일한 규칙을 갖고 있었다.
  • 중복 호출을 줄이는 방식으로 DP 를 이용해 정답을 출력했다.

 

반응형
반응형

0. 실습 내용

  • airbyte 를 docker container 로 실행한다
  • airbyte 커넥션
    • mysql --> s3 (csv) 
    • sync mode : full refresh
  • mysql 역시 docker container 로 실행한다.
  • 데이터는 공공 데이터를 활용한다.

 

1.install & quick start

  • mysql docker compose
version: "3.7"

services:
  mysql:
    image: mysql:latest
    environment:
      - MYSQL_ROOT_PASSWORD=jssvs
    volumes:
      - ./data:/var/lib/mysql
    ports:
      - 3306:3306
  • airbyte docker compose
## airbyte clone & up 
$ git clone https://github.com/airbytehq/airbyte.git
$ docker-compose -f docker-compose.yaml up -d 



## mysql up
$ docker-compose -f docker-compose.yaml up -d

  • 서비스 진입

2.Sample 데이터 로드 

 

서울교통공사_지하철혼잡도정보_20211231

서울교통공사 1-8호선 30분 단위 평균 혼잡도로 30분간 지나는 열차들의 평균 혼잡도(정원대비 승차인원으로, 승차인과 좌석수가 일치할 경우를 혼잡도 34%로 산정) 입니다.(단위: %). 서울교통공사

www.data.go.kr

 

 

 

3.connection 생성 및 테스트

  • Source 생성하기

  • Destination 생성하기

 

  • 연결 생성하기

  • 연결에서 볼 수 있는 설정 정보들
    • Transfer
      • 복제 주기 - cron , manual 등.
    • Streams
      • 목적지 네임스페이스 설정
      • 목적지 스트림의 prefix 네이밍 설정
      • airbyte 에서는 stream 이 옮겨질 데이터의 대상이고, mysql 의 경우 sync 될 테이블을 의미한다.
    • 원하는 sync source 를 선택할 수 있다.
  • sync Job 및 로그 확인

  • sync 된 데이터 확인

 

반응형
반응형

1.airbyte 란?

  • 오픈소스 데이터 통합 플랫폼 (ELT)
  • api , database, warehouse , application 간 데이터 sync, 즉 동기화 를 돕는 , 가능하게 해주는 소프트웨어 라고 하지만 내생각엔 아직 툴..
  • 데이터 통합 상품
  • 장점
    • built for extensibility
      • 커넥터 추가가 쉽고 확장성을 제공한다.
    • optional nomalized schemas
      • 선택적으로는 스키마를 정규화 할 수 있다.
    • Full grade schfeduler
      • 필요한 만큼 replication 을 자동화 할 수 있다.
    • real - time monitoring
      • 모든 로그를 모니터링 하고, 기능으로 제공한다.
    • incremental updates
      • replication 이 증분 업데이트를 기반으로 동작하여 transfer cost 를 줄여준다.
    • manual full refresh
      • 원할 때 수동으로 refresh 가 가능하다.
  • 단점
    • Stable 릴리즈 버전 없음, 아직 알파
    • 사용자 액세스 관리에 대한 지원 부족

 

2. 왜 airbyte ?

  • 기존 ETL 기반의 아키텍쳐에서 ELT 기반으로 리아키텍쳐링 할 때, 원본 데이터 소스를 이관 및 sync 해주는 역할이 중요해졌고, airbyte 가 시장에 빠르게 진입 했다고 생각한다.
  • 손쉽게 커넥터 설치만으로 데이터 연동 및 sync 가 가능해졌고, 충분한 UI 를 제공하고 있다.
  • airflow 로도 비슷한 구현을 할 수 있지만 개발이 필요한데.. 얘는 개발도 안해도 되고 CDC 옵션까지 제공한다.
  • 커넥터가 정말 많고 다양하게 지원한다.

3. airbyte 구성 요소

  • UI
    • airbyte 사용자를 위한 GUI
  • WebServer
    • UI 와 API 사이에서 발생하는 이벤트를 핸들링 하는 웹 서버
  • Config Store
    • 커넥션 정보들을 담고 있음 ( credential, 주기 등등.)
  • Config API
    • airbyte 의 main controle plane. 직역하면 관리영역이고, airbyte 의 모든 operation(동작) 들. API 콜 포함하여 설정하고 Inovoke 를 수행한다.
  • Scheduler
    • API 로 요청을 받고 Temporal Service 로 병렬적으로 보낸다. 잡의 성공/실패를 트래킹 하는 역할도 있다.
  • Scheduler Store
    • 예약된 스케쥴 job 정보가 저장된 곳
  • Temporal Service
    • 큐에 쌓이는 Task 와 workflow. 를 관리한다.
  • Worker
    • 소스 커넥터에 연결하고, data 를 받아와 목적지에 쓰는 역할을 수행한다
  • 지원되는 동기화 모드
    • full refresh - overwrite - 전체 새로고침 덮어쓰기 , 모든 데이터를 다시 동기화 하고 교체
    • full refresh - append - 전체 새로고침 추가 , 모든 행을 다시 동기화 하고 복제
    • incremental append - 증분 추가 , 새 행을 동기화하고 이미 동기화된 행뒤에 추가
    • incremental dedupe history - 중복 제거된 증분 추가, 새 행을 동기화 하고 동기화된 행을 추가하며 증분 중복제거를 기록

위 구성 컴포넌트들의 이름이 조금 다르게 되있지만 컨테이너의 로그를 보면 대략적으로 어떤 데몬이 해당 컴포넌트 역할을 하는지 알 수 있다.

 

** airbyte 는 job 이 수행될 때 동적으로 해당 job의 컨테이너가 새로 생성되서 동작한다. 관리형 컨테이너 서비스를 써서 구축한다면 참고하길 바란다.

 

 

참조 

https://airbytehq.github.io/understanding-airbyte/high-level-view/

 

Architecture overview | Airbyte Documentation

A high level view of Airbyte's components.

docs.airbyte.com

 

 

 

반응형
반응형

1. 문제 소개

  • 이분탐색 문제에 해당한다.

2. 코드

import sys

input = sys.stdin.readline



def set_test_case():
    N = int(input())
    input_str = input().split()
    cards = [int(x) for x in input_str]
    M = int(input())
    input_str = input().split()
    your_cards = [int(x) for x in input_str]
    return cards , your_cards

def binary_search(arr,search_value):
    start = 0 
    end = len(arr)
    
    while(start<end):
        mid = (start + end) // 2
        if arr[mid] == search_value:
            return mid
        elif arr[mid] < search_value:
            start = mid+1
        else:
            end = mid
    
    return -1
    
def solution(): 
    cards,your_cards = set_test_case()
    cache = dict()
    for i in cards:
        if i not in cache:
            cache[i] = 1
        else:
            cache[i] +=1
    sorted_card = sorted(cards)
    answer = []
    for i in your_cards:
        if binary_search(sorted_card,i) > -1:
            answer += [str(cache[i])]
        else:
            answer += [str(0)]
    
    print(" ".join(answer))
    
    
            
    
            
    

solution()

 

3. 코멘트

  • 알고리즘을 정리하면 숫자카드를 자료구조 map 에 저장하고, 상근이의 카드를 이분탐색으로 조회하되, 결과는 map 에서 출력한다.
  • 육아로 알고리즘 스터디를 진행할 시간과 여유가 없어 아쉽다. 여유가 곧 생길수 있기를..  

 

 

반응형

+ Recent posts