반응형

목차

  • MCP 란 무엇인가 ?
  • MCP 로 뭘 할 수 있는가?
  • MCP 는 어떻게 동작하는가?
  • MCP 의 실체를 알아보고 직접 server 를 만들어보자

1.MCP 란 무엇인가?


  • MCP = Model Context Protocol, 대형 언어 모델 LLM 이 외부 데이터 및 기능과 상호작용할 수 있도록 돕는 공개 프로토콜
  • 부연 설명 !! 2024년 11월 calude 로 유명한 Anthropic 에서 발표한, LLM Agent 가 다른 프로그램과 연동하기 위한 프로토콜이다.
  • 사람들이 쉽게 예를 들어 설명할때 AI USB 로 표현하기도 함
  • AI 가 더 똑똑해지려면 단순한 훈련 데이터에 의존하지 않도록 해야 하는게 필수. 외부데이터를 안전하게 가져와야 하는 방법이 필요했다 그래서 탄생하게 된게 MCP

 

2.MCP로 뭘 할 수 있는가?


 

  • 주요 역할
    • LLM 이 파일, 데이터베이스, API, 시스템 정보 등에 접근 가능
    • 사용자 허가를 통해 안전한 데이터 이동
    • 다양한 LLM 에서 공통적으로 사용할 수 있는 표준 인터페이스 제공
  • 기존에는 AI 가 사전에 학습된 정보에만 의존해야 했지만, MCP 를 사용하면 실시간으로 외부와 연동하여 최신 데이터를 가져와 활용할 수 있다.

3.그래서 MCP는 어떻게 동작하는가?

A. 주요 아키텍쳐 컴포넌트

  • MCP Hosts
    • LLM 모델을 이용하는 AI Tool
    • IDE, Claude Desktop,
  • MCP Client
    • 서버와 1:1 로 연결하는 프로그램 클라이언트
    • LLM 이 실제 실행되는 주체. 다양한 도구나 서비스를 사용할 수 있또록 요청을 보내는 쪽
  • MCP Server
    • 모델의 문맥을 교환할 수 있는 프로토콜로 통신하는 경량화된 프로그램
  • Local Data Sources
    • 컴퓨터 파일, DB 등의 서비스
  • Remote Services
    • 인터넷 API 를 통해 연결 가능한 외부 서비스들

 

B. 동작흐름

  • 클라이언트는 서버에 연결 요청을 보내고, 서버는 승인하여 통신 채널을 설정한다.
  • 사용자의 요청을 MCP Client 가 MCP Server 를 통해 요구하는 사양에 맞는 내용의 데이터를 반환 받고, LLM 제공자( sonnet, grok, gemini, gpt 등) 에게 전달하여 분석과 처리를 한다.

4. MCP 서버의 실체를 알아보고, 직접 만들어보기

A.개발환경 설정

  • MCP SDK 설치
  • MCP 서버 코드 작성
  • MCP 서버 실행 & Inspector 로 테스트
  • Cursor에서 연동해보기

B.시나리오 설정

  • AI Tool (MCP Host) 가 MCP 에 연결하여 회의록 내용을 읽고 회의록의 내용을 요약하거나 질문에 답한다.
  • 빠른 시연을 위해 회의록 데이터는 로컬에 txt 형태로 저장한다.
  • 샘플 데이터
$ mkdir my-mcp
$ cd my-mcp

$ npm init -y

$ npm install @modelcontextprotocol/sdk zod 
$ npm install -D @types/node typescript


# 모듈에 빌드스크립트 내용 추가
$ vi package.json
-> 아래 추가 
"type": "module",
iimport { McpServer, ResourceTemplate } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from '@modelcontextprotocol/sdk/server/stdio.js';
import { z } from "zod";

// MCP 서버를 생성합니다.
const server = new McpServer({
    name: "jssvs-server",
    version: "1.0.0"
    }
);

// registerTool 메서드: 서버에 새로운 툴(기능)을 등록합니다.
// 첫 번째 인자는 툴의 이름, 두 번째는 툴의 메타데이터(설명, 입력값 스키마 등),
// 세 번째는 실제로 동작하는 함수(입력값을 받아 결과를 반환)입니다.
// 아래는 두 숫자를 더하는 add 툴을 등록하는 예시입니다.
server.registerTool("add",
  {
    title: "Addition Tool",
    description: "Add two numbers",
    inputSchema: { a: z.number(), b: z.number() }
  },
  async ({ a, b }) => ({
    content: [{ type: "text", text: String(a + b) }]
  })
);

// registerTool 메서드: 회의록 읽기 툴을 등록합니다.
// 사용자가 날짜(date)를 입력하면, 해당 날짜의 회의록 파일을 읽어 반환합니다.
server.registerTool("readMeeting",
  {
    title: "Meeting Notes Reader",
    description: "Read meeting notes from a specific date",
    inputSchema: { date: z.string() }
  },
  async ({ date }) => {
    try {
      // 파일 시스템과 경로 모듈을 동적으로 import
      const fs = await import('fs/promises');
      const path = await import('path');
      // 회의록 파일 경로를 생성합니다.
      const filePath = path.join('/Users/jssvs/workspace/mcp-demo/mydata', `${date}-meeting.txt`);
      // 파일을 읽어서 내용을 반환합니다.
      const content = await fs.readFile(filePath, 'utf8');
      return {
        content: [{ type: "text", text: content }]
      };
    } catch (error) {
      // 파일 읽기 실패 시 에러 메시지를 반환합니다.
      return {
        content: [{ type: "text", text: `Error reading meeting notes: ${error.message}` }]
      };
    }
  }
);

// registerResource 메서드: 서버에 동적 리소스를 등록합니다.
// 첫 번째 인자는 리소스 이름, 두 번째는 리소스 URI 템플릿,
// 세 번째는 리소스의 메타데이터(설명 등), 네 번째는 실제 동작 함수입니다.
// 아래는 이름에 따라 인사말을 생성하는 greeting 리소스를 등록하는 예시입니다.
server.registerResource(
  "greeting",
  new ResourceTemplate("greeting://{name}", { list: undefined }),
  { 
    title: "Greeting Resource",      // Display name for UI
    description: "Dynamic greeting generator"
  },
  async (uri, { name }) => ({
    contents: [{
      uri: uri.href,
      text: `Hello, ${name}!`
    }]
  })
);

// connect 메서드: 서버와 트랜스포트(통신 채널)를 연결합니다.
// 여기서는 StdioServerTransport를 사용하여 표준 입출력(터미널) 기반으로 외부와 통신할 수 있게 합니다.
const transport = new StdioServerTransport();
await server.connect(transport);

D. MCP Client 와 연결 ( Cursor IDE )

# mcp.json
{
"my-demo-mcp-server": {
"type": "studio",
"command": "npx",
    "args": [
    "-y",
    "tsx",
    "/Users/jssvs/workspace/mcp-demo/main.ts"
    ],
"env": {}
}

** MCP 서버는 오픈 소스 커뮤니티에서 공유가 활발히 일어나고 있음

https://smithery.ai/

 

Smithery - Model Context Protocol Registry

Your Agent's Gateway to the World Integrate your AI with 7983 skills and extensions built by the community.

smithery.ai

느낀점

  • MCP 를 개인의 수준에서 활용하는 것은 바로 가능할 것 같다. 하지만 서비스 수준에서 활용하는 것은 아래의 부분들과 사이드 이펙트를 고려해야 하고, 경험이 필요 해보인다.
    • 연동 시스템에 대한 보안 ( DB 접속 정보가 Host의 학습 데이터로 넘어간다거나, 매출, 사내 보안 데이터가 유출 되는 것 )
    • 권한에 대한 문제 ( Drop, Delete, Rm등 권한을 갖게된 MCP Client 는 잘못된 명령으로 시스템을 망가뜨릴 수 있는 힘을 갖게 됨 )
    • 네트워크 구성에 대한 문제 ( 서비스 수준에서의 MCP 서버는 원격 서버 구성을 해야 하는데, 현재는 레퍼런스가 많아 보이지 않음 )
    • 어떤 LLM 모델을 연결할 것이고, 어떤 서비스를 사용할 것인가 ? ( AI 서비스는 다양하고, 서비스의 장단점을 아직 알지 못함. Bedrock? Azure OpenAIService? )

6. 참고

https://zdnet.co.kr/view/?no=20250630161811

https://techblog.woowahan.com/22342/

https://www.claudemcp.com/ko/blog/mcp-vs-api

https://medium.com/snowflake/using-mcp-with-snowflake-cortex-ai-f04e0b840958

https://aws.amazon.com/ko/blogs/tech/amazon-bedrock-agents-mcp-model-context-protocol/

반응형
반응형

 

 

apache spark 은 데이터를 다루는 사람들에게는 필수적으로 알아야 하는 기술이다.

내 방식대로 정의하자면 spark 은 framework 이면서, 빅 데이터 처리 엔진이라고 할 수 있을 것 같다.

요즘엔 Snowflake 같은 SaaS 로 SQL Like 기반의 데이터 처리도 많이 하지만, 오픈소스 진영에서는 무조건 spark 이다.

 

아래 예전에 정리했던 기술을 다시 보면서, 블로그 포스팅을 해본다

 

 


1.스파크에 대해

  • 하나의 서버에서 처리할 수 있는 수준 이상의 대용량 데이터 처리를 가능하게 해준다
  • 고수준 API 를 제공한다
  • 동종 시스템 중 가장 빠른 축에 속한다.

스파크 버전 규칙

  • [메이저].[마이너].[유지보수]

왜 스칼라인가

  • 스파크는 스칼라로 쓰여져 있다.
  • 스파크는 함수형 프레임워크이며, 불변성이나 람다 정의 같은 개념에 크게 의존하고 있기 때문에 함수형 프로그래밍 언어인 스칼라가 좋다
  • 자바 API 보다 훨씬 사용하기 쉽다
  • JVM 과의 통신 비용이 랭기지중 가장 좋다. (객체 변환에 드는 비용정도)

2.스파크 동작 개념

스파크의 독보적 장점

  • 메모리 기반 처리
  • 지연 평가 ( lazy operation )

스파크 위치

  • JVM 위에서 연산을 수행하는 것일 뿐, 데이터 저장 솔루션은 아니다
  • 클러스터 매니저 종류 : 단독 클러스터매니저(StandAlone Cluster Manager), 아파치 메소스, 하둡 얀

스파크 컴포넌트

  • 스파크 코어 - Java, Scala, Python, R 로 API 제, RDD 와 RDD API, JVM 사이에 데이터를 읽고 쓰는 I/O 제공
  • 스파크 SQL - DataFrame ( 반구조화의 데이터 타입을 위한 인터페이스) 제공
  • 스파크 스트리밍 등

병렬 연산 모델 : RDD

  • 스파크의 드라이버 혹은 마스터 노드를 위한 프로그램을 사용자가 만들어야 한다
  • RDD는 익스큐터 혹은 슬레이브 노드에 저장된다
  • RDD를 구성하는 객체는 파티션이라 하며, 경우에 따라 다른 노드에서 계산될 수 있다.
  • 클러스터 매니저는 애플리케이션에서 설정한 파라미터에 따라 익스큐터를 실행하고 분산해 주는 역할을 한다
  • 드라이버가 RDD 데이터의 처리 계획을 결정하면 실행을 지연하고, 최종 RDD를 계산해야 하는 시점에 실행된다.( 보통 저장 장치에 써야 할 시점이나 결과 데이터를 드라이버에 보내야 할 때)

지연평가

  • RDD의 연산 종류는 transformation 과 action 이 있다.
  • 액션은 데이터를 드라이버로 되돌려주든지(count, collect 등) 데이터를 외부 저장 시스템에 쓰는것(CopyToHadoop) 등의 일이다.
  • 액션은 스케쥴러를 시작하게 하며, RDD 트랜스포메이션 간의 종속성에 바탕을 둔 DAG 를 생성한다.
  • 트랜스포메이션의 로직 에러가 발생할 때, 지연 평가 때문에, 액션이 수행된 지점에서 실패한 것으로 나타나는 경우에 유의하자
    • word count 프로그램에서 null pointer exception 이 발생한다고 가정할때, 코드가 contains를 체크하는 시점이 아니라 collect 단계에서 예외가 발생한다.

메모리 영속화 & 메모리 관리

  • 맵리듀스와 비교해 스파크 성능상 이점은 반복 연산이 들어있는 사례이다.
  • 스파크가 처리하는 데이터를 디스크에 기록하지 않고 익스큐터 메모리에 데이터를 로드해 놓는 것이다.
  • 메모리 관리의 3가지 옵션
    1. 메모리에 직렬화되지 않은 자바 객체 - RDD 객체를 직렬화 하지 않고 그대로 저장한다. ( 직렬화 비용이 안드는 대신 메모리 공간 사용이 비 효율이다. )
    2. 메모리에 직렬화된 데이터 - RDD 객체를 네트워크 전송이 가능한 바이트 스트림으로 변환한다. ( 데이터를 읽는데 CPU 가 더 많이 사용되므로 접근방식은 더 느리지만 메모리 공간 사용 측면에서 효율적이다. 크리오(Kryo) 직렬화를 쓰면 공간 측면에서도 효과적이다.)
    3. 디스크 - 익스큐터 램에 담기에 파티션이 큰 RDD 라면 디스크에 데이터를 쓸 수 있다. ( 반복 연산시 속도 면에서 불리하지만 장애에 안전하다.)
  • persist() 의 기본 옵션은 RDD를 메모리에 직렬화되지 않은 상태로 저장한다.

불변성과 RDD 인터페이스

  • RDD는 정적인 타입인 데다 불변한 성격을 가지고 있어, Transformation 을 호출하는 것이 새롭게 정의한 속성들을 가진 새로운 RDD를 리턴하는 행위다.
  • RDD 생성 방식
    • 기존 RDD에 Transformation 호출
    • SparkContext 객체로부터 생성
    • DataSet이나 DataFrame을 변환 ( 이것들은 SparkSession 으로 부터 만들어짐)
  • SparkContext는 스파크 클러스터와 실행중인 스파크 애플리케이션 하나와의 연결을 나타낸다.
  • RDD 속성을 알 수 있는 함수
    • partitions() - 분산 데이터 셋의 부분들을 구성하는 파티션 객체들의 배열을 리턴한다. getPartition()의 결괏값과 같다
    • iterator(p,parentIters) - 각각의 부모 파티션을 순회하는 반복자가 주어지면 파티션 p의 구성요소들을 계산해낸다.
    • dependencies() - 종속성 객체의 목록을 리턴한다. 스케쥴러가 현재의 RDD가 어떤식으로 다른 RDD에 종속될지 알려준다.
    • partitioner() - element 와 partition 사이에 연관되는 함수를 갖고 있는 RDD라면 스칼라의 option 타입으로 partitioner 객체를 리턴한다.
    • perferredLocations(p) - 파티션 p의 데이터 지역성에 대한 정보를 리턴한다. p가 저장된 각 노드의 정보를 문자열로 표현한 시퀀스를 리턴한다

RDD의 종류

  • RDD는 정적인 타입인 데다 불변한 성격을 가지고 있어, Transformation 을 호출하는 것이 새롭게 정의한 속성들을 가진 새로운 RDD를 리턴하는 행위다.

넓은 종속성 vs 좁은 종속성

  • 종속성이 넓으냐 좁으냐는 트랜스포메이션 평가에 중요한 영향을 끼치며 성능에도 크게 작용한다.
  • 좁은 종속성
    • 자식 RDD 의 각 파티션이 부모 RDD의 파티션들에 대해 단순하고 한정적인 종속성을 갖는다
    • 부모가 최대 하나의 자식파티션을 갖는 경우
    • map, filter, mapPartitions 등의 연산이 이 패턴을 따른다.
  • 넓은 종속성
    • 자식 RDD가 다수의 부모 RDD의 파티션들과 관계를 맺고 있는 경우
    • groupbykey, sort, reducebykey 등과 같이 Shuffle 을 일으키는 함수가 이 패턴을 따른다.
    • 셔플 비용이 가장 크다.
  • map 은 파티션 간 이동이 없는 연산, coalesce 는 파티션을 합치는 연산으로 파티션 개수를 줄이는 목적의 함수이다.
  • join 함수는 두개의 부모 RDD 가 어떻게 파티션되었는지에 따라 좁거나 넓은 종속성을 가질 수 있다.

스파크 잡 스케쥴링

  • 잡 실행 과정
    • 스파크 프로그램 자체는 드라이버 노드에서 실행되며 일련의 명령들을 익스큐터에게 보낸다.
    • 애플리케이션들은 클러스터 매니저에 의해 스케쥴링되고, 각각 하나의 SparkContext를 가진다.
    • 스파크 애플리케이션들은 공존하는 여러 개의 잡을 차례로 실행할 수 있다.
    • Job들은 애플리케이션의 한 RDD가 호출하는 각 액션에 대응한다.
  • 자원할당
    • 정적할당과 동적 할당이 가능
  • 스파크 애플리케이션
    • 잡들은 드라이버 프로그램의 SparkContext에 정의되어 있다.
    • SparkContext가 생기면 스파크 애플리케이션이 구동한다.
    • SparkContext를 실행하면 드라이버와 익스큐터들이 각 작업 노드에서 구동된다.
    • 각 익스큐터는 JVM을 가지며 한 노드에 여러 개의 익스큐터가 존재할 수 있다.
    • 스파크 잡이 실행될 때 익스큐터는 RDD를 계산할 태스크 실행을 위한 슬롯을 가진다.
  • 스파크 잡 해부
    • 각 액션마다 스파크 스케쥴러는 실행 그래프를 만들고 스파크 잡을 실행한다.
    • 각 잡은 최종 RDD를 만들어 내는데 필요한 데이터 변환의 각 단계를 의미하는 스테이지(Stage)들로 구성된다.
    • 각 스테이지는 각 병렬 연산의 한 단위를 의미하며 익스큐터들 위에서 실행되는 다수의 태스크(Task)들로 구성된다.
    • Spark Aplication -> 잡 -> 스테이지1 & 스테이지2 ... -> 태스크 1& 태스크 2...
    • SparkContext / SparkSession -> 액션 -> 넓은 포메이션 -> 하나의 파티션 평가를 위한 연산
    • 스파크 실행 구성에서 가장 높은 단계
    • 하나의 잡 = 하나의 액션에 대응하며, 액션은 스파크 애플리케이션의 드라이버의 프로그램에서 호출한다.
  • 스테이지
    • 잡의 부분들을 스테이지로 정의한다.
    • 넓은 트랜스포메이션에 의해 생성되는 셔플 의존성에 대응한다.
    • 하나의 스테이지는 다른 익스큐터나 드라이버와의 통신 없이 계산 가능한 태스크들의 집합으로 생각할 수 있다.
  • 태스크
    • 하나의 스테이지는 여러 개의 태스크로 이루어진다.
    • 실행 계층에서 가장 작은 단위이며, 익스큐터가 태스크 실행을 위해 동적으로 할당된 여러개의 슬롯을 가진다.
    • 스테이지당 태스크의 개수는 해당 스테이지의 결과 RDD의 파티션 개수에 대응된다.
    • 익스큐터 코어 개수의 총합 = 익스큐터당 코어의 개수 X 익스큐터 개수를 쓰면 sparkConf로 부터 동시 실행의 태스크 개수를 구할 수 있다.
    • 태스크 분산 과정은 TaskScheduler 가 담당하는데, 페어 스케쥴러인지 FIFO 스케쥴러인지 등에 따라 다르다.

 

 

 
반응형
반응형

쿠버네티스 복습을 하면서 스토리지 관련해 요약했던 내용을 포스팅 해보겠다.

 

요약

 

1.EBS란 ?

  • Elastic Block Store (EBS)
  • EC2 에서 쉽게 사용할 수 있는 영구적인 Volume (linux 가 볼륨을 디바이스로 인식하는)
  • mkfs 명령어를 사용하여 mount 가능
  • 다양한 유형
    • ssd 기반 - gp2
    • iops 속성이 존재. io1, io2., st1, sc1 등.
    • iops 조절이 가능한 gp3

2.PV란 ?

  • Persistent Volume
  • 물리적인 볼륨을 나타냄
  • Cluster 단위의 resource (node 처럼)
  • 관리자가 생성하거나 pvc 로 부터 동적으로 생성됨
  • 예를 들어 pv 는 요리사가 만든 피자 한 판을 굽는 것이다.

3.PVC란?

  • namespace 단위의 resource (pod 처럼)
  • 사용할 용량, access mode, 등의 속성을 갖고 있음
  • PV의 요청을 의미하는 추상 오브젝트
  • 예를 들어 pvc 는 피자 한 판에서 피자 한 조각을 손님이 접시에 담아 가져오는 것이다.

4.provisioner 란?

  • storageclass 의 설정과 pvc 의 설정을 읽어 aws ebs 등 물리적인 volume 과 pv obeject 생성을 담당.

5.csi Driver 란?

  • Container Storage Interface Driver
  • EKS 에서는 쿠버네티스에 내장된 provisioner 를 모두 삭제하고 별도의 컨트롤러 파드를 통해 동적으로 프로비저닝을 사용할 수 있도록 만들었음. 그것이 바로 csi driver

 

 

아래 실습을 간단히 해본다

 

먼저 스토리지 클래스를 만들고

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: my-gp2
provisioner: kubernetes.io/aws-ebs
parameters:
  type: gp2
  fstype: ext4

 

PV를 만든다.

apiVersion: v1
kind: PersistentVolume
metadata:
  name: my-pv
spec:
  capacity:
    storage: 10Gi
  accessModes:
    - RewadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: my-gp2
hostPath:
    path: /mnt/data


---

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: my-pvc-ebs
  namespace: nextcloud

spec:
  storageClassName: my-gp2
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
  persistentVolumeReclaimPolicy: Retain
 

간단하게 Pod 를 만들어 마운트해보자

apiVersion: v1
kind: Pod
metadata:
  name: task-pv-pod
spec:
  nodeSelector:
    kubernetes.io/hostname: kube-01
  volumes:
    - name: task-pv-storage
      persistentVolumeClaim:
        claimName: task-pv-claim
  containers:
    - name: task-pv-container
      image: nginx
      ports:
        - containerPort: 80
          name: "http-server"
      volumeMounts:
        - mountPath: "/usr/share/nginx/html"
          name: task-pv-storage

 

반응형
반응형

카프카 구축 전에 예전에 정리했던 내용 기반으로 이론을 다시 정리해봤다. 

요즘에는 운영을 쉽게 하려고 MSK, 컨플루언트 카프카를 많이 사용 하는 것 같다.

아무튼 본론으로..

로고가 참 이쁘다..

 

1.카프카란?

  • 분산 이벤트 큐.
  • 분산 이벤트 스트리밍 플랫폼
  • 카프카 컨슈머, 프로듀서, 스트림즈, 커넥트 등 연동 API 제공
  • 초당 수백만개 데이터를 처리할수 있으므로 빅데이터에 적합
  • 분산 데이터를 통해 24시간 365일 안전하게 데이터를 처리할 수 있는 고가용성 기능 제공

2.왜 카프카?

  • 고가용성
    • 서비스에 지장없는 운영을 보장.
  • 낮은 지연
  • 확장성
  • 높은 처리량
    • 높은 처리량을 감당하지 못한다면, 서비스를 유지하기 힘듦

RabbitMQ , Redis 와의 차이점

  • 이벤트 브로커
    • 서비스에서 발생한 이벤트를 브로커의 큐에 저장함
    • 딱 한번 일어난 이벤트 데이터를 브로커에 저장함으로써 단일 진실 공급원으로 사용 및 재처리가 가능
    • 마이크로 서비스 아키텍쳐에서 중요한 역할을 함
    • kafka , kinesis
  • 메세지 브로커
    • 대규모 메세지 기반 미들웨어 아키텍쳐에서 사용
    • RabbitMQ, Redis

 

 

 

카프카 구조

  • 기존 1:1 매칭으로 개발하고 운영하던 데이터 파이프라인은 커플링으로 인해 한쪽 이슈가 생기면 다른 한쪽에도 영향이 간다. → 카프카는 이러한 의존도를 타파하였다. (디커플링)
  • 큐에 데이터를 보내는 것이 프로듀서이고 큐에서 데이터를 가져가는 것이 컨슈머다

카프카 특징

  • 높은 처리량
    • 높은 처리량을 감당하지 못한다면, 서비스를 유지하기 힘듦
    • 우리 비지니스의 성공여부는 어떤 Threash hold 에 걸쳐지면 안된다.
    • 파티션 단위를 통해 동일 목적의 데이터를 여러 파티션에 분배하고, 이런 파티션을 컨슈머로 병렬처리할수 있는것이 큰 특징
    • 파티션 개슈만큼 컨슈머 개슈를 늘릴수 있다
  • 확장성
  • 영속성
    • 파일 io 성능 향상을 위해 os 에서 담당하는 페이지 캐시를 이용한다. 그래서 파일을 쓰고 읽는데도 빠를수 있다.
  • 고가용성

3.카프카 구성 요소

토픽

  • 구체화된 이벤트 스트림 = 쉽게 큐로 이해하면 됨
  • 하나의 토픽에 여러 Producer / Consumer 가 존재할 수 있다.
  • 토픽은 담는 데이터에 따라 이름을 줄 수 있다.

컨슈머

  • 기본적으로 가장 오래된 순서대로 가져감 - 0번 오프셋부터
  • 새로운 컨슈머가 구독을 하게 되도 가장 오래된 순서대로 가져감
    • auto.offset.reset = earliest 인경우

파티션

  • 카프카의 토픽들은 여러 파티션으로 나눠짐.
  • 파티션의 끝에서 0번 부터 차곡차곡 쌓이게 됨
  • 토픽 = 논리적인 개념이라면, 파티션은 물리적인 저장소에 저장하는 단위
  • 각 파티션은 Append-only 방식으로 기록됨
  • 특정 파티션으로 데이터를 쓸수 있고, 명시되있지 않으면 RoundRobin 방식으로 파티션을 배정한다
  • 파티션을 늘린다면?
    • 파티션을 다시 줄일수는 없다.
    • 컨슈머 개수가 늘어날때 분산 처리할 수 있다.
    • 신규 데이터는 2개의 파티션 중어디로 들어갈까?
    • 보통은 라운드로빈으로 파티션을 할당함
    • 키의 해시값으로
  • 파티션 삭제 주기는?
    • log.retention.ms : 최대 record 보존 시간
    • log.retension.byte : 최대 record 보존 크기

오프셋

  • 각각 파티션의 레코드는 Offset 식별자 정보를 가짐, 데이터 번호
  • 카프카는 메세지 순서를 보장 하지 않음. 하지만 파티션이 1개라면 보장할지도?
 

4.카프카 클러스터

카프카 클러스터

  • 카프카 브로커
    • 설치된 카프카의 서버 단위
    • 보통은 3대로 구성
  • replication
    • replication 이 1인 상태라면 파티션이 브로커 서버에 1개만 저장된다.
    • replicaion 이 2라면 원본 하나, 복제본 1개의 파티션이 각각의 브로커 서버에 저장된다.
    • 따라서 replication 개수 ≤ 브로커 서버 개수
    • 원본 파티션 = Leader 파티션, 복제본 파티션 = follow 파티션
    • replication 의 설정된 값에 따라 서로 다른 브로커 서버에 파티션의 복제본이 생긴다.
  • ISR(In-SyncReplica)
    • 리터 파티션의 레코드 개수 만큼 팔로워 파티션의 개수가 동일하게 복제가 된 안정된 상태
  • ACK
    • 카프카 프로듀서는 ack 를 이용해 고 가용성 보장
      • ack = 0 , response 무시. 속도는 빠르지만 유실이 있음.
      • ack = 1, reponse 를 받음, 파티션 복제는 보장 못함. 유실 가능성 있음
      • ack = all , response 받고, follwer partition 저장확인 절차를 거침. 유실 가능성 없음
    • replication 개수가 늘어난다면 성능 저하.
  • 파티셔너
    • 데이터를 토픽에 어떤 파티션에 넣는지 결정하는 역할을 함
    • 메세지 키 또는 메세지 값에 따라 파티션이 결정됨
    • hash(키) = 파티션 넘버
  • 카프카 lag
    • 운영시에는 consumer lag이 발생
    • lag 이란 = 컨슈머가 마지막으로 읽은 offset - 프로듀서가 마지막으로 넣은 offset
    • 한개의 토픽과 컨슈머 그룹에 대한 lag 이 여러개 존재 하게 된다.
    • max lag 에 대한 모니터링이 운영시에는 필요하다
  • lag burrow
    • golang 으로 개발된 오픈 소스
    • 컨슈머 lag 모니터링을 도와주는 독립적인 애플리케이션
    • 멀티 카프카 클러스터 지원
      • 2개이상의 카프카 클러스터를 운영할때, 하나의 burrow 로 운영 가능
  • 주키퍼
    • 코디네이션 애플리케이션
    • 브로커 서버와 통신하며 상태관리, 컨슈머와의 통신, 카프카 메타데이터 정보를 저장함.

 

반응형
반응형

1. Yunikorn 이란 ?

  • 쿠버네티스에서 동작하는 Batch , Data & ML 서비스를 위한 리소스의 파워를 unleash(해방,촉발시키는) 하는 스케쥴러다.(1)
  • k8s 기본 스케쥴러의 대안으로 활용될 수 있는 스케쥴러로, 특히 다양한 리소스의 요구사항을 만족시킬 수 있는 기능들을 제공한다.


2.왜 Yunikorn?

  • 기존의 k8s 스케쥴러는 CPU, Memory, GPU 등의 리소스만을 고려하여 스케쥴링을 수행하는데, 만약 다양하고 많은 양의 Pod 가 배포되고 회수되는 환경에서 필요하게되는 미세한 조정이 어려운 단점이 있다. 
  • 그런 부분에서 Yunikorn 은 아래에서 소개할 다양한 기능으로 단점을 보완해준다.
  • yunikorn 과 함께 많이 사용되는 volcano(2) 스케쥴러가 있다.(개인적으로 volcano 가 더 어려워보여서 유니콘을 선택했다.. 유니콘은 대시보드를 제공해주는 점이 더 매력적이었고)
  • yunikorn 은 apache incubator 프로젝트고 volcano 는 cnf(cloud native computing foundation) 프로젝트이다.)


3. Features

  • App-aware 스케쥴링
    • k8s 기본 스케쥴러와 달리 컨텍스트 정보(사용자, 애플리케이션, 큐 등) 기반의 스케쥴링을 지원한다.
    • 리소스 총량, 공정성 정책, 우선순위 정책으로 스케쥴링 fine-grained controls (적은 범위의 제어)를 할 수 있다.
  • 계층적 구조의 리소스 큐
    • 큐 간의 계층적 구조를 통해 multi-tenancy 환경에서 효율적인 리소스 할당을 할 수 있다.
  • Job Ordering and Queuing
    • 작업의 우선순위를 지정하고, 작업이 큐에 들어갈 때 우선순위에 따라 큐에 배치된다.
    • FIFO, Fair, StateAware, piriority 정책을 지원한다.
  • 갱 스케쥴
    • 쉽게 말해 분산 환경에서 All or Nothing 스케쥴 방식이다.
    • 애플리케이션이 필요한 리소스의 세트(묶음)을 요청하고, 이 리소스가 모두 확보될 때 한 번에 스케쥴링한다.
    • 갱 스케쥴러가 placeholder pod(임시 pod)를 배정하고, UpScaling이 일어난 후 실제 pod 와 교체되는 방식이다.
    • 갱 스케쥴을 사용할 경우 FIFO 정책을 사용하게 된다. (왜냐하면 policy 는 리소스를 부분적으로 예약하기 때문에 리소스 세그먼트가 발생할 수 있다.)

갱 스케쥴링 방식의 배포 흐름

 

 

쿠버네티스 기본 스케쥴링의 파드배포
유니콘 스케쥴링의 파드 배포

 

 

4. Yunikorn 사용하기

***아래 가이드는 쿠버네티스 사용이 익숙한 사람들 기준으로 작성된 점 참고바랍니다.

 

A.yunikorn 설치 

 

2가지 설치 모드

  • embeddedAdmissionController
    • 쿠버네티스 기본 스케쥴러 대신 유니콘 스케쥴러를 사용하는 모드
    • api server 와 통신하는 모든 트래픽을 유니콘 스케쥴러로 라우팅한다
    • 성능이 뛰어나다고 하다..ㅇㅅㅇ
  • plugin Mode
    • 기본 스케쥴러의 framework 일부로 작동하는 모드
    •  mixed workload 에 적합하다.

 

 

 


그리고 helm repo values.yaml 파일 작성 (4)

service:
  type: NodePort
  targetPort: 9080
  portWeb: 9889


** 다른 설정은 하지 않았고, Dashboard ingress 생성을 위해 service 만 NodePort 로 변경했다

 

 


Ingress 생성

apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: yunikorn-ingress
  namespace: yunikorn
  annotations:
    alb.ingress.kubernetes.io/scheme: internet-facing
    alb.ingress.kubernetes.io/target-type: instance
    alb.ingress.kubernetes.io/subnets: [ 사용하는 서브넷 ]
    alb.ingress.kubernetes.io/security-groups: [보안 그룹 ]
    alb.ingress.kubernetes.io/load-balancer-name: [로드밸랜서 이름]

spec:
  ingressClassName: alb
  rules:
    - http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: yunikorn-service
                port:
                  number: 9889




helm 설치 스크립트

#!/bin/bash
helm repo add yunikorn https://apache.github.io/yunikorn-release
helm repo update


echo "install yunikorn by helm.."

helm upgrade --cleanup-on-fail \
  --install yunikorn yunikorn/yunikorn \
  --namespace yunikorn \
  --create-namespace \
  --version=1.6.0 \
  --values values.yaml


**혹시 필요한 사람을 위해 설치스크립트도 공유한다.** 

 

대시보드 화면

대시보드화면



B. Yunikorn 사용

  • Standard mode 로 설치했을 경우 어떤 파드로 배포하더라도 yunikorn 스케쥴러가 파드를 스케쥴링한다.
  • 필자의 경우 처럼 Spark Job 을 실행할때는 아래와 같이 속성값을 추가해줘야 한다
- controller.batchScheduler.enable=true
- controller.batchScheduler.default=yunikorn


* EKS 기준으로 yunikorn 을 이용해 spark job 을 배포하는 가이드는 별도로 포스팅하고, 잘 작성된 가이드로 마무리하겠다. (3)

 


5.출처

  •  이 포스팅에 대한 설명은 함께 작업했던 강인호(aws 코리아)님께서 공유해주신 자료를 복습하고 요약하였다.

6.References


(1) https://yunikorn.apache.org/
(2) https://volcano.sh/en/
(3) https://docs.aws.amazon.com/ko_kr/emr/latest/EMR-on-EKS-DevelopmentGuide/tutorial-yunikorn.html

(4)  https://artifacthub.io/packages/helm/yunikorn/yunikorn

 
반응형
반응형

 

1.Dag Factory 란?

  • yaml 포맷 기반의 워크플로우 정의를 읽어 dag 을 자동으로 생성해주는 에어플로우 라이브러리이다. (1)(2)

 

2. 왜 Dag Factory ? 

  • Airflow 를 사용하려면 아래와 같은 절차로 Workflow 를 만드는 작업을 해야 하는데, 이런 수고를 줄여준다.
  • workflow 는 dag 객체를 생성하는 파이썬 스크립트 파일을 만들어야 하기 때문에 파이썬 문법을 알아야 한다.
  • dag 파일을 작성하려면 파이썬의 문법과 airflow 에서 제공하는 core 패키지 문서를 학습하여 작성해야 한다.
  • 워크플로우들이 많아질때는 관리하기 힘들어지기 때문에, Dag의 메타데이터화와 동시에 자동화를 고려해야 하는 시기가 온다. 
    • python 과 airflow 의 주요 내용들의 학습 없이 DAG 구조체를 만들어준다
    • 중복된 코드를 피해준다



3. Features

  • Multiple Configuration Files
  • Custom Operators
  • Callbacks 지원


4. Dag Factory 사용하기

절차

1) dag factory 를 설치한다.
2) 쿠버네티스를 사용할 경우 2가지 방법이 있다.
           a.패키지 설치 버전의 이미지를 직접 빌드하는 방법
           b.pod 내 airflow 가 init 할때 패키지 설치 명령어를 추가하는 방법
3) yaml 포맷의 워크플로우 파일을 작성한다.
4) dagfactory 객체를 이용해 yaml 리소스를 로드할 python 파일을 작성한다.
5) 작성한 2개의 파일을 dag 폴더로 이동시킨다

 


 

A. 설치

$ pip install dag-factory

 

 

 


B.쿠버네티스에서 helm 을 이용해 배포할 경우

쿠버네티스에서 사용할 경우 아래 두 가지 선택이 있을 수 있다.
첫 번째 ) 직접 이미지를 빌드하는 방법
두 번째 ) pod 내 airflow 가  init 할때 패키지 설치 명령어를 추가하는 방법

 

1)  dag-factory 를 설치완료한 이미지를 빌드하여 올려서 사용한다.

 

Docker file 예시 

FROM apache/airflow:2.8.2

RUN pip install dag-factory==0.19.0

ENTRYPOINT ["/usr/bin/dumb-init", "--", "/entrypoint"]
CMD []



 2) airflow chart values 파일 내 worker 가 bootstrap 할 때 패키지 설치 명령어를 추가한다.

workers:
  # Number of airflow celery workers in StatefulSet
  replicas: 1
  # Max number of old replicasets to retain
  revisionHistoryLimit: ~

  # Command to use when running Airflow workers (templated).
  command: ~
  # Args to use when running Airflow workers (templated).
  args:
    - "bash"
    - "-c"
    # The format below is necessary to get `helm lint` happy
    - |-
      python -m pip install --upgrade pip & pip install --no-cache-dir dag-factory && exec \
      airflow {{ semverCompare ">=2.0.0" .Values.airflowVersion | ternary "celery worker" "worker" }}

 

 

 

C.사용

절차
1 ) yaml 포맷의 워크플로우 파일 작성한다.
2) dagfactory 객체를 이용해 yaml 리소스를 로드할 python 파일을 작성한다.
3) 작성한 2개의 파일을 dag 폴더로 이동시킨다

 

1) yaml 포맷의 워크플로우 파일 작성하기

default:
  default_args:
    catchup: false,
    start_date: 2024-11-11

basic_example_dag:
  default_args:
    owner: "custom_owner"
  description: "this is an example dag"
  schedule_interval: "0 3 * * *"
  render_template_as_native_obj: True
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [task_1]
    task_3:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [task_1]



2) python 파일 작성하기

import os
from pathlib import Path

# The following import is here so Airflow parses this file
# from airflow import DAG
import dagfactory

DEFAULT_CONFIG_ROOT_DIR = "/usr/local/airflow/dags/"

CONFIG_ROOT_DIR = Path(os.getenv("CONFIG_ROOT_DIR", DEFAULT_CONFIG_ROOT_DIR))

config_file = str(CONFIG_ROOT_DIR / "example_dag_factory.yml")

example_dag_factory = dagfactory.DagFactory(config_file)

# Creating task dependencies
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())

 


3) 파일을 dag 디렉토리로 이동시킨다.

mv * /usr/local/airflow/dags/

 

** 이전에 포스팅한 GitSync 를 같이 이용할 수 있습니다. 그렇게 되면 dag 파일을 디렉토리로 이동할 필요 없이 git repository 로 이동하면 되는거죠. :) [3]

 

 

 

화면 참고용 스크린샷





5.References


[1]  https://github.com/astronomer/dag-factory
[2]  https://astronomer.github.io/dag-factory/latest/getting-started/quick-start-airflow-standalone/

[3] https://jssvs.tistory.com/101

반응형
반응형

1. Git-Sync란?

  • Git-Sync 는 Kubernetes 클러스터 내에서 git repository 를 동기화하는 sidecar 기능이다 (1)
  • airflow 와 같은 워크플로우 도구를 사용할 때, DAG 파일들을 git 에서 관리하고 있을 때 사용한다.
  • helm 을 통한 쿠버네티스 배포 시에만 지원하는 기능으로 보인다.

 

2. 왜 GitSync 기능을? 

  • 일반적으로 python 으로 작성한 airflow dag 파일은 airflow component(worker, scheduler) 에서 access 할 수 있는 파일 시스템에 저장되어야 한다.
  • git sync 기능을 사용하면, git repository 에 저장된 DAG 파일을 kubernetes pod 내부에 동기화 할 수 있다.
  • 반대로 git sync 를 사용하지 않는다면, DAG 파일을 kubernetes pod 에 복사하는 방식을 사용해야 한다.
    • s3 로 부터 주기적으로 sync 를 받는 sidecar 컨테이너를 개발
    • efs 와 같은 네트워크 파일 시스템을 사용 

 

3. Git-Sync 사용하기 

절차

  • git repository 를 생성한다.
  • 2가지 인증 방식 중 한 가지를 선택한다.
  • 선택한 인증방식의 credential 을 준비한다.
  • 인증 정보를 쿠버네티스 secret(시크릿) 으로 생성한다
  • helm chart 에서 git-sync 를 활성화 한다. (2)

 

인증 2가지 방식 소개 

  • SSH 프로토콜을 이용한 인증 방식 (Github 기준 )
  • HTTPS 프로토콜을 이용한 인증 방식

 

A.SSH  프로토콜을 이용한 인증 방식

SSH 키 생성

$ ssh-keygen -t rsa -b 4096 -C "jssvs@test.com"

# 복제할 키 문자열
$ cat ~/.ssh/id_rsa.pub

 

 

Git에 SSH 키 등록 

GIthub 키등록

 

쿠버네티스 시크릿 리소스 생성

kubectl create secret generic airflow-git-ssh-secret --from-file=gitSshKey=~/.ssh/id_rsa -네임스페이스

 

helm chart 값 작성

..... 생략
dags:
  gitSync:
    enabled: true
    repo: git@git repository url 
    branch: main
    rev: HEAD
    depth: 1
    maxFailures: 0
    subPath: ""

    sshKeySecret: airflow-git-ssh-secret
    
.... 생략

 

B.HTTPS 프로토콜을 이용한 인증 방식

PAT 생성 (Setting -> Developer settings -> Personal access tokens -> Generate new token)

Github PAT 생성

 

 

 

시크릿 yaml 작성하기
** 주의 : username 과 pat 는 base64 인코딩을 해서 저장해야 한다. 
** 또 주의: linux 에서 echo 를 이용할 경우 -n 옵션을 추가하여 뉴라인을 제거한다 

apiVersion: v1
kind: Secret
metadata:
  name: git-credentials
data:
  GIT_SYNC_USERNAME: github username
  GIT_SYNC_PASSWORD: PAT 비밀번호
  GITSYNC_USERNAME: github username
  GITSYNC_PASSWORD: PAT 비밀번호

 

 

helm chart값 작성

.....
dags:
  gitSync:
    enabled: true
    repo: https://git repository url 
    branch: main
    rev: HEAD
    depth: 1
    maxFailures: 0
    subPath: ""

    credentialsSecret: git-credentials
 .....

 

helm 을 이용한 설치 후 git-syn-init 컨테이너에서 아래와 같은 로그가 찍혔다면 성공이다.

 

 

 

동작 스크린샷

Git Repository

 

 

Sample Dag
Airflow Dag 등록 화면

 

 

4. Reference

(1) https://airflow.apache.org/docs/helm-chart/stable/manage-dags-files.html#using-git-sync

 

Manage DAGs files — helm-chart Documentation

 

airflow.apache.org

(2) https://artifacthub.io/packages/helm/apache-airflow/airflow

 

airflow 1.15.0 · apache-airflow/apache-airflow

The official Helm chart to deploy Apache Airflow, a platform to programmatically author, schedule, and monitor workflows

artifacthub.io

 

 

반응형
반응형
오늘은 쿠버네티스의 스테이트풀셋 리소스를 이용해 오픈소스 ElasticSearch 를 배포해보겠습니다.

스테이트풀셋은 디플로이먼트와 성질이 비슷하지만, 파드에 고유한 이름이 부여되고 여러 개의 파드 사이에 순서를 지정해서 실행할 수 있습니다.

ElasticSearch 는 RestAPI 기반으로 json 도큐먼트 데이터를 분산 저장하여, 빠르게 조회할 수 있는 검색 엔진이죠.

빅데이터 엔지니어링 분야에서 kibana, Logstash 와 함께 ELK 로 많이 사용되는 소프트웨어 스택이기도 합니다.

아래 과정을 통해 손쉽게 쿠버네티스로 배포해보겠습니다.

 

 

1. 준비 과정

  • 데이터 영속성을 보장하기 위해 영구 볼륨 (PersistentVolume)생성
  • ElasticSearch 애플리케이션 배포를 위한 스테이트풀셋 생성
  • ElasticSearch 파드를 외부로 노출하기 위한 Service(쿠버네티스의 서비스) 생성
  • curlImage로 기본 파드를 띄우고 Elastic Search 의 서비스 확인

 

2. Statefulset 이용해 ElasticSearch 배포

 
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: sf-es
  namespace: work
spec:
  replicas: 2
  serviceName: "my-es"
  selector:
    matchLabels:
      app: sf-es
  volumeClaimTemplates:
  - metadata:
      name: vol-es-data
    spec:
      accessModes: ["ReadWriteOnce"]
      storageClassName: gp3
      resources:
        requests:
          storage: 1Gi
  template:
    metadata:
      labels:
        app: sf-es
    spec:
      containers:
      - name: sf-es
        image: docker.elastic.co/elasticsearch/elasticsearch:7.8.0
        env:
        - name: discovery.type
          value: "single-node" # Elasticsearch 노드 타입 설정
        - name: cluster.name
          value: my-es # Elasticsearch 클러스터 이름 설정
        - name: network.host
          value: 0.0.0.0 # 모든 네트워크 인터페이스에서 수신하도록 설정
        ports:
        - containerPort: 9200
          name: es1 # HTTP 포트
        - containerPort: 9300
          name: es2 # Transport 포트
        volumeMounts:
        - name: vol-es-data
          mountPath: /usr/share/elasticsearch/data
        livenessProbe:
          httpGet:
            path: /
            port: 9200
          initialDelaySeconds: 60 # 초기 지연 시간
          periodSeconds: 10 # 주기 시간
          timeoutSeconds: 5 # 타임아웃 시간
          failureThreshold: 3 # 실패 허용 임계값
      initContainers:
      - name: init-container
        image: busybox:latest
        command: ["sh","-c","chown -R 1000:1000 /usr/share/elasticsearch/data"]
        securityContext:
          privileged: true # 루트 권한으로 실행
        volumeMounts:
        - name: vol-es-data
          mountPath: /usr/share/elasticsearch/data # 데이터 볼륨 마운트 경로
---
apiVersion: v1
kind: Service
metadata:
  name: svc-es
  namespace: work
spec:
  clusterIP: None
  ports:
  - port: 9200
    name: es1
  - port: 9300
    name: es2
  selector:
    app: sf-es

 

몇가지 설정에 대한 설명을 추가해보겠습니다.

디플로이먼트와 다르게 VolumeClaimTemplate 속성을 주면서 동적 프로비저닝을 할 수 있구요.

9200,9300 port는 엘라스틱서치가 사용되는 named port 입니다.

livenessProbe 속성값으로 실행된 컨테이너의 서비스 healthcheck 를 파드 차원에서 할 수 있습니다.

서비스 리소스를 생성하면 서비스 이름의 DNS 로 다른 파드에서 접근할 수 있습니다.

 

아래처럼 statefulset 을 배포하구요.

 

kubectl 또는 k9s 로 배포 상태를 확인해줍니다.

앞서 설명한 것 처럼 pod 의 이름에 임의의 문자열로 되어있지 않고, 이름과 인덱스로 고유한 이름을 갖고 있습니다.

 

 

3. 서비스 확인

별도의 파드를 띄워서 조금 전에 띄운 ElasticSearch 파드가 정상적으로 운영되는지, 접근이 되는지 확인해보겠습니다.

apiVersion: v1
kind: Pod
metadata:
  name: pod-conn-test
  namespace: work
spec:
  containers:
  - name: es-conn
    image: curlimages/curl
    command: ["sleep","3600"]

 

ElasticSearch는 RestAPI 기반으로 데이터를 핸들링할 수 있기 때문에, 리눅스 curl 이미지를 이용해 아래와 같이 테스트 파드를 띄웠습니다.

 

 

해당 파드의 컨테이너에 쉘에 진입해서 서비스 정보를 출력해봤습니다.

 

여기서 중요한 것은 ElasticSearch 의 DNS 입니다.

 

제가 만든 서비스의 이름으로 접근이 되는 것을 볼 수 있습니다.

 

 

미리 생성했던 인덱스도 조회해봤습니다.

 

추가적인 API 는 https://www.elastic.co/guide/en/elasticsearch/reference/current/rest-apis.html 에서 확인하실 수 있습니다.

 

 

4. 마치며

과거 프로젝트 진행으로 ElasticSearch를  여러 서버에 멀티클러스터링으로 직접 설치해본 경험도 있고, Docker 를 이용해 구축해본 경험도 있습니다.  쿠버네티스를 알게되고 나서 구축의 편의성을 크게 느낍니다..

느낌적으로 앞으로는 데이터엔지니어링의 서비스 배포 및 관리도 쿠버네티스로 많이 넘어갈 것 같아요.

 

 

오늘은 여기까지 쓰겠습니다.

 

끝 

 

 

반응형
반응형

쿠버네티스 데이터 엔지니어링을 시작한지 1년 정도가 지난 것 같습니다.

지금까지 학습하며 경험한 내용을 바탕으로 지식을 정리하면서 관련 포스팅을 시작하려고 합니다.

 

이론적인 내용도 다루겠지만, 시간이 되면 실제 운용 사례 기반으로 내용을 작성하려고 합니다.

이번 MySQL 배포도, 실제 서비스의 메타 데이터로 활용해봤는데 큰 문제가 없었습니다.

 

관리형 DB(예를 들어 AWS RDS 등) 를 사용하시는 분들께서 비용절감의 고민이 있으시다면, 게다가 쿠버네티스 환경이 준비된 분들이라면 스테이트풀셋을 이용한 MySQL 파드를 배포해서 사용하시는 것을 추천드립니다. 

 

스테이트풀셋을 이용해 MySQL 을 배포하는 방법도 가능합니다만 이번 글에서는 디플로이먼트를 이용해보겠습니다.

 

1. 준비 과정 

  • 데이터 영속성을 보장하기 위해 영구 볼륨 (PersistentVolume)생성
  • MySQL 애플리케이션 배포를 위한 디플로이먼트 생성
  • MySQL 파드를 외부로 노출하기 위한 Service(쿠버네티스의 서비스) 생성

** 아시는 것 처럼 쿠버네티스의 모든 리소스는 yaml 파일로 작성할겁니다.

 

2. 디플로이먼트를 이용해 MySQL 배포  

a. 데이터 영속성을 보장하기 위해 영구 볼륨 (PersistentVolume)생성

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: pvc-study-mysql
  namespace: work
spec:
  accessModes:
  - ReadWriteOnce
  storageClassName: gp3
  resources:
    requests:
      storage: 1Gi

** pvc 를 동적 볼륨 프로비저닝 하기 위해서는 스토리지클래스 에 provisioner 를 명시해줘야 합니다. 저는 미리 만들어둔 스토리지 클래스를 사용했고, 이 내용은 다음에 자세히 다루겠습니다.

 

b. MySQL 애플리케이션 배포를 위한 디플로이먼트 생성

 

apiVersion: apps/v1
kind: Deployment
metadata:
  name: study-mysql
  namespace: work
  labels:
    app: mysql
spec:
  replicas: 1
  selector:
    matchLabels:
      app: mysql
  template:
    metadata:
      labels:
        app: mysql
    spec:
      volumes:
      - name: study-volume
        persistentVolumeClaim:
          claimName: pvc-study-mysql
      containers:
        - name: study-mysql
          image: mysql:latest
          env:
          - name: MYSQL_ROOT_PASSWORD
            value: root
          - name: MYSQL_DATABASE
            value: test
          ports:
          - containerPort: 3306
          volumeMounts:
          - name: study-volume
            mountPath: /var/lib/mysql

 

 

c. MySQL 파드를 외부로 노출하기 위한 Service(쿠버네티스의 서비스) 생성

apiVersion: v1
kind: Service
metadata:
  name: svc-study-mysql
  namespace: work
spec:
  selector:
    app: mysql
  ports:
  - port: 3306

 

d. 서비스 노출 확인을 위한 파드 생성과 통신 테스트

apiVersion: v1
kind: Pod
metadata:
  name: mysql-client
  namespace: work
  labels:
    app: mysql-client
spec:
  containers:
  - name: mysql-client
    image: mysql:latest
    command: [ "sleep", "infinity" ] # 파드가 무기한으로 동작하도록 설정

 

 

** 위 3개의 리소스 yaml 파일을 kubectl 을 이용해 배포해줍니다.

 

3. 서비스 확인

a.파드의 상태 확인 

 

 

b.통신테스트를 위해 띄운 파드에 쉘에 진입해 서비스 노출 확인

 

 

c.mysql 파드에 쉘에 진입해 데이터베이스 확인

 

 

4. 마치며

configmap 을 이용해서 database 의 설정을 동적으로 적용하며 운영을 할 수 있습니다.

stastefulset 으로 배포하면 운영의 안정성을 높일 수 있습니다. ( 파드의 고유성이 부여되고 영구적인 호스트 이름이 부여됩니다.) 

다음에는  statefulset 을 이용해 엘라스틱 서치를 배포해보겠습니다.

반응형
반응형

 

 

안녕하세요. 오늘 다뤄볼 주제는 다이나믹 프로그래밍입니다.

 

 

위키에 따르면 동적 프로그래밍은 "복잡한 문제를 더 작은 하위 문제로 나누어 해결하는 알고리즘 설계 기법" 이라고 설명하고 있습니다.

 

나무위키에 조금 더 와닿는 설명이 있는데, 답을 재활용하는 것입니다. 앞에서 구했던 답을 뒤에서 이용하고, 옆에서도 이용해서 더 빠르게 답을 구하는 문제 해결 패러다임이라고 볼 수 있습니다.

 

가장 많이 사용하는 예제는 피보나치 수열입니다.

 

아래 피보나치를 재귀를 이용해 동적프로그래밍으로 구현한 코드입니다.

import time





def fibonacci(n):

    if n<=1:

        return n

    

    return fibonacci(n-1) + fibonacci(n-2)







def fibonacci_with_cache(n,cache={}):

    if n in cache:

        return cache[n]



    if n <=1:

        return n

    

    cache[n] = fibonacci_with_cache(n-1) + fibonacci_with_cache(n-2)



    return cache[n]





def benchmark():

    start_time = time.time()

    print(fibonacci_with_cache(40))

    end_time = time.time()

    execution_time = end_time - start_time

    print(f"Execution time: {execution_time:.6f} seconds")



# 102334155

# (fibonacci - Execution time: 21.109457 seconds



#102334155

# Execution time: 0.000057 seconds



benchmark()

 

 

위 예제에서 피보나치 함수를 2 가지 버전으로 만들었는데요. 하나는 일반 피보나치 함수이고, 다른 하나는 캐시라는 이름이 붙어있습니다.

fibonacci_with_cache 함수는 dictionary 타입(k,v) 의 변수를 캐시라고 가정해서 피보나치의 결과를 캐시에 저장했습니다.

이렇게 답을 구한 결과를 메모리에 저장하는 방법을 메모이제이션(memoization) 이라고 합니다..

 

응답속도의 결과를 비교해보면 차이가 꽤 납니다.

 

.

반응형

+ Recent posts