반응형

1. 쿠버네티스란?

  • 오픈 소스 플랫폼
  • 오케스트레이션 시스템
  • 컨테이너화된 워크로드와 서비스를 관리

는 공식 홈페이지에 소개 내용이지만, 내가 이해한 내용은 이렇다.

  • 런타임 의존성과 함께 애플리케이션을 패키징하는 컨테이너를 많이 쓰기 시작한다. -> 컨테이너 런타임을 지원하면 어디에서 실행하던 동일한 동작을 보장하니까
  • 하지만 컨테이너는 변경이 불가하다. 즉 애플리케이션의 업데이트가 발생하면 새로 이미지를 빌드 해야 한다.
  • 쿠버네티스는 이러한 경우 서비스 중단 없이 업데이트를 가능하게 해주고, 이 밖에 운영 측면의 스케일링, 리소스 관리 등을 해주는 녀석이다.

2. 왜 쿠버네티스를 사용하지?

  • high Availability = no downtime
  • Scalability
  • High Performance
  • Disaster recovery

위 장점때문에 사람들이 많이 사용하는 것 같은데 구성이 멀티 클러스터로 되어있기 때문이라고 생각한다면 저 장점을 더 이해하기 쉬울 것 같다.

 

3. 쿠버네티스 아키텍쳐

  •  
  • 일반적인 쿠버네티스 배포 → 쿠버네티스 클러스터 생성
  • 쿠버네티스 클러스터
    • 모든 클러스터는 최소 한 개의 워커 노드, 1개의 마스터 노드를 가짐
    • 워커 노드
      • 동작중인 파드를 유지시키고, 쿠버네티스 런타임 환경을 제공하며 모든 노드 상에서 동작
    • 컨트롤 플레인 = 마스터 노드
      • 워커노드와 클러스터 내 파드를 관리한다
      • 컨트롤 플레인이 여러 컴퓨터에 걸쳐 실행되고, 클러스터는 여러 노드를 실행하므로 내결함성과 고가용성이 제공됨

 

컨트롤 플레인

  • 주요 컴포넌트
    • kube-apiserver
      • 쿠버네티스 컨트롤 플레인의 프론트 엔드
      • UI, API, CLI 를 제공
    • etcd
      • 모든 클러스터 데이터를 담는 쿠버네티스 뒷단의 저장소
      • 키-벨류 저장소
      • kubernetes backking stroe
    • kube-scheduler
      • 노드가 배정되지 않은 새로 생성된 파드를 감지
      • 실행할 노드를 선택하는 컨트롤 플레인 컴포넌트
      • ensure pods placement
      • 서버의 리소스또한 감지한다. 몇번 노드에서 메모리 30% 쓰고 뭐 이런 정보들
    • kube-controller-manager ( cm )
      • 컨트롤러 프로세스를 실행하는 컨트롤 플레인 컴포넌트
      • 4가지 구성요소
        • 노드 컨트롤러: 노드가 다운되었을때 통지와 대응에 관한 책임
        • 레플리케이션 컨트롤러 : 시스템의 모든 레플리케이션 컨트롤러 오브젝트에 대해 알맞은 수의 파드들을 유지시켜주는 책임
        • 엔드포인트 컨트롤러 : 엔드포인트 오브젝트를 채움 ( 서비스와 파드 연결 )
        • 서비스 어카운트 & 토큰 컨트롤러 : 새로운 네임스페이스에 대한 기본 계정과 api 접근 토큰을 생성
        • keeps track of whats happening in the cluster
    • cloud-contgroller-manager (ccm)
      • 클라우드 별 컨트롤 로직을 포함하는 쿠버네티스 컨트롤 플레인 컴포넌트
      • 3가지 구성요소
        • 노드 컨트롤러 : 노드가 응답을 멈춘 후 클라우드 상에서 삭제되었는지 판별하기 위해 클라우드 제공 사업자에게 확인
        • 라우트 컨트롤러 : 기본 클라우드 인프라에 경로를 구성하는 것
        • 서비스 컨트롤러 : 클라우드 제공 사업자 로드밸런서를 생성, 업데이트, 그리고 삭제 하는 것

워커 노드

  • 주요 컴포넌트
    • kubelet
      • 클러스터 각 노드에서 실행되는 에이전트
      • 파드에서 컨테이너가 동작하도록 관리
    • kube-proxy
      • 클러스터의 각 노드에서 실행되는 네트워크 프록시
      • 노드의 네트워크 규칙을 유지 관리
    • 컨테이너 런타임
      • 컨테이너 실행을 담당하는 소프트 웨어
      • 컨테이너 런타임 인터페이스를 구현한 모든 소프트웨어 지원
반응형
반응형

1. Pyspark?

파이썬으로 스파크를 사용하기 위한 목적으로 만들어진 인터페이스이다.

파이썬 API 를 이용할 수 있으면서, 스파크 애플리케이션을 작성할 수 있다. 

interactive 하게 pyspark shell 도 제공한다.

Spark SQL, DataFrame, streaming , MLlib 그리고 Spark Core 를 지원한다.

 

2. 왜 PySPark?

java, scala 로 작성된 코드가 성능이 조금 더 뛰어나다고 알려져 있다. 

하지만 빅데이터 진영에서는 파이썬이 친숙한 사람이 많고, 생산성이 더 난다고 생각한다.

 

3. Pyspark 기초 -  Dataframe 생성 및 다루기

** 개발할 때 비교적 자주 사용되는 함수만 다룰 것 이다.

- 조건 조회와 출력, 그리고 withColumn 은 정말 많이 사용되는 것 같다.

 

from pyspark.sql.types import *
from pyspark.sql.functions import col,lit

# 멤버 컬럼
member_columns = ['group_cd','join_dt','name','login_cnt']

# 멤버 데이터
member_data = [
    ('k-00001','2021-07-31','name1',10),
    ('k-00002','2021-06-31','name2',10),
    ('k-00003','2021-08-31','name3',10),
    ('k-00004','2021-03-31','name4',12),
    ('k-00001','2021-04-31','name5',11),
    ('k-00002','2021-05-31','name6',15),
]

# 데이터 프레임 생성
member_df = spark.createDataFrame(data=member_data,schema=member_columns)

#스키마 구조 출력 
member_df.printSchema()

# withColumn  - 새 컬럼 추가. literal 로 초기화해줘야 한다.
member_df= member_df.withColumn("use_flag",lit(False))

member_df.printSchema()

 

#pyspark 에서 countDistinct 사용을 위해서는 lib 를 선언 해줘야 하다니..
from pyspark.sql.functions import countDistinct
from pyspark.sql import functions as F


# 행 출력
member_df.show(10,truncate = False)

# 특정 컬럼 출력
member_df.select('join_dt','name','group_cd').show(10)

# groupby 출력
member_df.groupBy("group_cd").count().sort("count",ascending=True).show()

# filter 조건 출력 
member_df.filter(member_df.login_cnt > 10).show()

# filter 조건을 복수 개로 줘보자
member_df.filter(
    (F.col("group_cd") == "k-00001") &
    (F.col("login_cnt") > 10)
).count()


# where 조건 출력
member_df.where(my_df.acntid=='name1').show()

member_df.filter("name == 'name1'").show()

 

반응형
반응형

1.EDA란?

Exploratory Data Analysis 의 줄임말로, 탐색적 데이터 분석이란 의미다.

내가 이해한바로는 데이터에서 유의미한 인사이트를 찾기 위해 다각도로 시각화하는 작업을 포함한 분석 정도로 이해했다.

 

2.왜 EDA 툴을 사용하지?

EDA 작업은 시간과 노력이 많이 들어간다. 파이썬 패키지를 통해 코드 몇줄로 이러한 시간 및 고생을 줄일 수 있다.

 

3. 레포트 생성

 a) 사전 준비 하기

  • DataPrep 이라는 패키지를 이용해서 레포트를 만들어 볼 것이다.
  • jupyter 노트북이 설치 되어 있어야 한다
  • pandas,dataprep 이 설치 되어 있어야 한다.
# pandas 설치
$ pip install pandas


# dataprep 설치 https://pypi.org/project/dataprep/
$ pip install -U dataprep
  • 데이터를 준비한다
    • 공공 데이터 포털에서 쉽게 얻을 수 있다.  ( https://www.data.go.kr/index.do )
    • csv 파일로 준비하면 좋다. 엑셀이나 다른 파일도 가능은 한데 pandas로 읽기 위한 작업을 추가로 해야 할 것이다.

b)  데이터 읽기 

  • 공공 데이터 포털에서 받았다면, csv 를 읽어 pandas 로 Load 할 때 인코딩 타입은 cp949 로 설정해야 문제 없이 실행될 것이다
  • 패키지 import와 데이터를 읽어서 눈으로 확인해보자

c) 레포트 생성하기

  • create_report 함수를 이용해서 레포트를 생성한다.
  • 자동으로 생성된 레포트에는 데이터의 기초 통계, 변수별 통계, 상관관계 및 Interactions 차트 등 을 확인 할 수 있다. 

4. 코드 

# 설치 https://pypi.org/project/dataprep/
$ pip install -U dataprep

# 패키지 선언
from dataprep.eda import create_report

#csv 읽기, dataframe 생성하기
sample_df = pd.read_csv('sample_data.csv',encoding='cp949')

#리포트 생성하기
create_report(sample_df)

 

반응형

'Data Engineer' 카테고리의 다른 글

kubernetes 기초 (1)  (0) 2021.12.01
pyspark 기초 (1)  (0) 2021.10.03
[airflow] HA - health 체크를 이용한 운영  (0) 2021.06.05
Elasticsearch (ES) 기초 (1) - CRUD  (0) 2021.06.03
Apache Spark 기초 (2)  (0) 2021.06.02
반응형

1. HA (High Availability)

고가용성이라고도 하는데, 얼마나 운영이 오래될 수 있는가에 대한 성질이다. 에어플로우 또한 고가용성을 높이기 위해 클러스터로 운영하기도 하고, 나의 경우처럼 어떤 대안을 생각하기도 해야 한다.

2. 왜 헬스체크 (HealthCheck)를 ? 

단일 서버로 에어플로우를 운영해보니, 스케쥴러 프로세스가 내려가는 일이 자주는 아니지만 4~5개월에 한번 씩 간헐적으로 발생했다. ( 마지막 로그에는 메세지 큐로 이용중인 레디스와의 커넥션 타임아웃 로그가 마지막에 찍혀있었다. )

 

워크플로우의 작업 누락을 막으려면 어떤 조치가 필요했다. 에어플로우에서는 Rest API 로 헬스체크 기능을 제공했다.

>request
curl -XGET "http://에어플로우 서버 :5000/health"

>response
{
    "metadatabase": {
        "status": "healthy"
    },
    "scheduler": {
        "status": "healthy",
        "latest_scheduler_heartbeat": "2020-04-13T01:38:51.283786+00:00"
    }
}

3. go ticker를 이용하여 스케쥴러 알림 및 실행

일정 시간 마다 헬스 체크 Rest API로 상태를 받아와 스케쥴러가 UnHealthy 상태로 내려갈 때, 알림을 주거나 프로세스를 다시 올리는 프로그램을 만들어 볼 수 있다.

 

물론 작성한 프로그램이 죽어버리면 안되기 때문에, 서버에 sudo 권한이 있다면 ticker 부분을 제외하고 crontab 으로 걸어 더 안전하게 운영을 가져갈 수 있다.

 

더 좋은 방법은 managed service 를 이용하는 것이다. aws, gcp 모두 검색해보면 서비스가 있다.

package main

import (
	"net/http"
	"fmt"
	"time"
	"bytes"
	"io/ioutil"
	"github.com/multiplay/go-cticker"	// ref. https://github.com/multiplay/go-cticker
)

type AFStatus struct {
    Metadatabase  Status  `json:"metadatabase"`
    Scheduler Status `json:"scheduler"`
}

type Status struct {
    Status  string `json:"status"`
    Latest_scheduler_heartbeat  string `json:"latest_scheduler_heartbeat"`
}

func sendWatchAlert(msg string){
	// 장애 처리 부분 작성 
}

func healthCheck(){
    resp,err:=http.Get("https://myairflow_host/health")
    msg_tmpl:="[장애] airflow healthcheck ..\\n\\nscheduler status : {scheduler_status} \\nmetadatabase status : {metadatabase_status}\\nLatest_scheduler_heartbeat: {Latest_scheduler_heartbeat}"
    var  af  AFStatus
    if err!=nil{
        fmt.Println("[E] http request error .. ")
        fmt.Println(err)
    }

    if(resp.StatusCode != 200) {
        sendWatchAlert("[장애] airflow healthcheck \\n\\nwebserver status: unHealthy")
        fmt.Println("[E] http response 200 error .. ")
        return
    }
    buf := new(bytes.Buffer)
    buf.ReadFrom(resp.Body)
    newStr := buf.String()
    err = json.Unmarshal([]byte(newStr),&af)
    if err != nil {
        fmt.Println("[E] Unmarshal  Error .. ")
        fmt.Println(err)
    }

    msg:=strings.Replace(msg_tmpl,"{scheduler_status}",af.Scheduler.Status,1)
    msg=strings.Replace(msg,"{metadatabase_status}",af.Metadatabase.Status,1)
    msg=strings.Replace(msg,"{Latest_scheduler_heartbeat}",af.Scheduler.Latest_scheduler_heartbeat,1)

    if af.Scheduler.Status != "healthy" || af.Metadatabase.Status != "healthy" {
        sendWatchAlert(msg)
    }
}



func main() {
    fmt.Println("[L] airflow health checker start..")
	
//  interval:=30
//  flag.IntVar(&interval,"-i",180,"ticker interval")
//  flag.Parse()

    healthCheck()
    t := cticker.New(time.Second * 60 ,time.Second) //duration , accuracy
    for tick := range t.C {
        // Process tick
        healthCheck()
        //fmt.Println("tick:", tick)
    }
}

ref. https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/check-health.html

 

Checking Airflow Health Status — Airflow Documentation

 

airflow.apache.org

 

 

반응형
반응형

1. ES 데이터 구조

  • Document
    • 데이터 최소 단위
    • 관계형 Database 에서 Row 단위
  • Type
    • 관계형 Database에서 table 단위
    • 1Type = N개 * Documents
  • Index
    • 관계형 Database에서 DB단위
    • 1 Index = N개 * Types

**ES 7.0 버전 부터는 도큐먼트 타입 개념이 사라짐. 우리는 6.x 버전 대네요.. (업그레이드도 가능 )

2.도규먼트 삽입

PUT 방식 (document id를 선택해서 삽입할 때)
curl -XPUT http://localhost:9200/books/book/1 -d
'{
"tile": "Nesoy Elastic Guide",
"author": "Nesoy",
"date": "2019-01-15",
"pages": 250
}'


PUT 방식 (document id를 선택해서 삽입할 때)
http://localhost:9200/jayden_idx01/jayden_type01/1
{
  "name":"jayden",
  "message":"안녕하세요 Elasticsearch"
}

POST 방식 (document id를 임의로 줄때)
http://localhost:9200/jayden_idx01/jayden_type01
{
  "name":"jayden",
  "message":"test1"
}

3.도큐먼트 조회

POST 전체 조회 (인덱스 별)
http://localhost:9200/jayden_test01/_search
{
  "query": {
    "match_all": {}
  }
}



POST /인덱스/_search?pretty 조건 조회
http:/localhost:9200/jayden_idx01/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": { "name": "jayden" } }
      ]
    }
  }}

POST
http://localhost:9200/jayden_idx01/_search
{ 
   "query" : {
        "term" : { "name" : "jayden" }
    }
}


POST 정렬 조건 조회
http://localhost:9200/jayden_idx01/_search
{
"size":1,
"sort":[
{ "@timestamp" : { "order":"desc"} }
],
"query":{
"term":{ "조건 필드": "mill" }
}

}

4.도큐먼트 삭제

DELETE /인덱스
http://localhost:9200/jayden_idx01/

 

반응형
반응형

스파크 DF, DS, SQL

  • 효율적인 저장옵션, 옵티마이저, 직렬화된 데이터에 대한 직접적인 연산 등을 지원한다
  • DF와 DS 는 특별한 Row 객체를 가지며, 컴파일시 타입 체크를 제공하지 않는다.
  • DataFrame 이란 특화된 데이터 표현방식, 컬럼 기반 캐시 포맷을 갖고 있다.

SparkSession

  • SparkContext 가 스파크 애플리케이션의 시작점이라면 SparkSession은 SparkSQL 의 시작점을 제공한다.
  • 빌터 패턴에 의해 생성 되며, getOrCreate()가 마지막에 온다
  • config(key,value) 로 문자열 기반의 설정을 할 수 있다.
  • 일상적인 인자들에 대해 미리 지정해 놓는 쇼커트(shortcuts) 도 존재한다.
  • enableHiveSupport() 는 hiveUDF 를 쓸수 있게 해준다. (추가적인 jar 파일 필요)
    • val session = SparSession.builder().enableHiveSupport().getOrCreate()
  • 모든 스파크 코드가 SparkSession 을 쓸수 있게 업데이트 되어 있진 않다.
  • SparkSession > HiveContext > SQLContext 순으로 사용하는것을 고려한다.

스파크 SQL 의 의존성

  • 스파크 SQL 과 하이브 컴포넌트 추0가가 필수다
    • spark-sql, spark-hive
  • 하이브 의존성을 애플리케이션에 포함할 수 없다면, SQLContext 를 생성해서 쓸 수있다.
    • val sqlContext = new SQLContext(sc)
    • import sqlContext.implicits._
  • 하지만 하이브 기반의 특정 사용자 정의함수(UDF)나 사용자 정의 집계함수(UDAF)를 쓸 수 없다.

스키마의 기초

  • RDD, DataSet은 정해진 타입이 없다
  • 스키마는 스파크 SQL에 의해 자동으로 처리되며, 데이터를 로딩할 때 스키마 추측이 이루어지거나 부모 DataFrame과 저용된 트랜스포메이션에 의해 계산된다

DataFrame API

  • 트랜스포메이션
    • RDD 트랜스포메이션과 유사하지만 관계형에 가까운 형태다.
    • 단일 DataFrame, 다중 DataFrame , 키/값, 그룹화/윈도화 트랜스포메이션으로 분류할 수 있다.
  • DataFrame filter 연산
    • 람다 함수 대신 스파크 SQL 표현식을 받아들인다
    • df.filter(조건식)
  • 스파크 SQL 스칼라 연산자
    • !== : not equal
    • && : and
    • === : equal (널 값에 안전하지 않음)
    • <==> : eqNullSafe (널값에 안전함)
  • 스파크 SQL 표준함수
    • lit(값) : 스칼라 심벌을 컬럼 리터럴로 바꾼다.
    • isNaN : 숫자가 아닌 경우를 체크한다
  • 필터와 함꼐 isNan 이나 isNull을 써서 유실 데이터 관리를 범용적으로 할 수 있다
  • 스파크 SQL 은 dropDuplicates() 로 중복되지 않은 레코드만 뽑아낼 수 있다
    • RDD 에서 이런 유의 연산(distinct) 는 셔플을 요구하게 되고 종종 filter를 쓰는것보다 느리다.
  • 스파크 SQL 에서의 집계연산은 RDD 에서 동일한 작업을 하는 것보다 간편하다.
  • 윈도화 - 노이즈 데이터를 포함하는 평균속도 계산, 상대적인 매출 계산 등에 매우 유용하게 쓰인다.
  • 비용에 따른 집합연산들
    • unionAll : 낮음
    • intersect : 높음
    • except : 높음
    • distinct : 높음
  • 하이브 데이터와 상호 연동
    • 하이브 메타스토어에 연결이 되어 있다면, SQL 질의를 직접 작성하거나 임시 테이블로 등록할 수 있다.
    • df.registerTempTable("MyTable")
    • df.write.saveAsTable("perm_MyTable")
    • sqlContext.sql("select * from MyTable")
  • 다중 DataFrame 트랜스포메이션
    • unionall, intersect, except, distinct 와 같은 DataFrame 끼리의 유사 집합 연산을 말한다
  • DataFrame의 테이블 등록
    • df.registerTempTable("테이블이름") 을 통해 sql 질의를 작성하여 사용할 수 있다.

DataFrame과 Dataset 에서의 데이터 표현

  • 텅스텐
    • 바이트 단위 레벨에서 직접 동작하는 스파크 SQL의 새로운 컴포넌트
    • 자바나 크리오 직렬화를 쓰는것보다 데이터 용량을 적게 쓸수 있다.
    • 자바 객체 기반이 아니기 때문에 온힙과 오프힙을 모두 지원한다.
    • 스파크 1.5에서 기본으로 탑재

데이터 적재, 저장함수들

  • DataFrameWriter, DataFrameReader
    • json, jdbc, orc, parquet 메서드들은 reader/writer에서 직접적으로 정의되어 있고, 경로나 연결 정보를 인자로 받는다.
  • JSON
    • 다른 데이터 소스들에 비해 비용이 많이 든다
    • 스파크 SQL은 데이터를 샘플링하여 스키마를 추측한다.
    • api 함수로 스키마 판별을 위해 데이터를 얼마나 읽을 것인지 설정이 가능하다.
      • df = session.read.format("json").option("samplingRatio","1.0").load(path)
    • 입력 데이터가 걸러 내야 할 잘못된 json 레코드를 갖고 있을 경우를 위해 단순히 문자열의 rdd로 읽어 올 수도 있다.
      • rdd:RDD[String] = input.filter(_.contains("panda"))
      • df = session.read.json(rdd)
  • JDBC
    • 여러 데이터베이스 벤더들의 jar 파일을 추가해야한다.
    • spark-submit 으로 실행한다면 --jars 옵션으로 포함시킬 수 있다.
  • 파케이
    • 아파치 파케이는 여러 파일로 쉽게 분할, 압축, 중첩 타입 등의 다양한 기능을 제공한다.
    • 공간 효율성이 뛰어나다.
  • 하이브 테이블
    • df=session.read.table("table")
    • df.write.saveAsTable("table")
  • RDD
    • DataFrame 에서 RDD로 변환하는 것은 트랜스포메이션이다.(액션이 아니다.)
    • RDD를 DataFrame이나 DataSet으로 변환하는 일은 연산 혹은 일부의 샘플링을 필요로 한다.

DataSet

  • 컴파일 타임 타입체크를 제공하는 스파크 SQL의 확장 기능 ( DataFrame은 컴파일 타임 타입 체크가 빠져있다.)
  • DataSet API = 관계형(DataFrame) + 함수형(RDD)이 섞인 강타입 컬렉션 API
  • DataFrame, DataSet, RDD 간의 변환 방법
    • ds=df.as[ElementType]
    • df=ds.rdd
    • df=ds.DF()
  • rdd와 유사한 함수형 트랜스포메이션 지원
    • filter, map , flatMapm, mapPartitions 등의 함수를 제공한다.
    • ds.map{ rp=>rp.attributes.filter(_>0).sum}
  • 관계형 트랜스포메이션 또한 지원한다.
    • ds.select($"id".as[Long], ($"attributes"(0) > 0.5).as[Boolean])
    • intersect, union, substract 등의 표준 집합 연산들도 사용할 수 있다.
  • 타입 안정성이 강점이지만, DataSet Api 가 계속 진화 중이므로 추후 스파크 버전 업데이트 시 코드 수정이 필요할 수 있다.
반응형
반응형

1. library 설치

  • 커버로스 인증을 타고 하이브에 접근을 할 경우 아래 패키지들이 설치되어 있어야 한다.
--os레벨 설치 패키지

$ yum install libsasl2-dev
$ yum install cyrus-sasl-devel
$ yum install cyrus-sasl-gssapi


--python 라이브러리 패키지
impyla==0.16.2
PyHive==0.6.1
PyMySQL==0.9.3

sasl==0.2.1
thrift==0.10.0  -> issue 시 pip install thrift==0.9.3
thrift-sasl==0.4.1
thriftpy==0.3.9
thriftpy2==0.4.10


 

2. pyhive 연동 예제 코드

from pyhive import hive

conn = hive.Connection(host='hadoop.host.io', port=10000, database='jssvs_tmp',auth='KERBEROS',kerberos_service_name='hive')

    cursor=conn.cursor()
    cursor.execute('s select * from hive_tbl')
    for r in cursor.fetchall():
        print(r)

 

3. ph2 연동 예제 코드

import pyhs2
 
with pyhs2.connect (host='hadoop.host.io',
                port=10000,
                authMechanism='KERBEROS') as conn:
        with conn.cursor() as cur:
        #Show databases
                print cur.getDatabases()
 
        #Execute query
                cur.execute("select * from table")
 
        #Return column info from query
                print cur.getSchema()
 
        #Fetch table results
        for i in cur.fetch():
                print i
  
  
---- or --------------
  
import pyhs2
conn = pyhs2.connect(host='hadoop.host.io', port=10000, authMechanism='KERBEROS' )
cursor = conn.cursor()
print cursor.getDatabases()

 

반응형
반응형

Docker 이용하여 mysql, redis 컨테이너 올리기

$ mkdir /usr/local/db_data
$ mkdir /usr/local/redis_data

$ docker pull mysql:8.0
$ sudo docker run -d -p 3306:3306 -e MYSQL_ROOT_PASSWORD=test --volume /usr/local/db_data/:/var/lib/mysql --name mysql_bi mysql:8  --default-authentication-plugin=mysql_native_password 

$ docker pull redis:5.0
$ sudo docker run -d --name redis_bi -p 6379:6379 --volume /usr/local/redis_data/:/data -e REDIS_PASSWORD=airflow redis:5.0

$ sudo docker ps

virtualenv 설치 및 가상 환경 만들기

# Python 을 이용하여 virtualenv 설치
$ python3 -m pip install virtualenv virtualenvwrapper --user

# 가상 환경 만들기
$ mkdir myenv
$ virtualenv --python=python3.5 myenv

#아래 명령어로도 가능하다
$ virtualenv -p `which python3` venv 

# 가상환경 진입
$ source myenv/bin/activate

#가상환경 빠져 나오기
$ deactivate

airflow 설치 및 서버 동작 스크립트

$ source myenv/bin/activate
$ pip install --upgrade pip
$ pip install pymysql

# airflow
pip install apache-airflow[mysql,redis,celery]==1.10.7

# 에어플로우 홈 설정
$ mkdir myenv/airflow_home
$ export AIRFLOW_HOME=`pwd`/airflow_home # virtualenv 바깥에서도 동일하게 설정

# 에어플로우 데이터베이스 초기화
$ airflow initdb
$ airflow resetdb #load_examples False 가 적용이 안될때 다시 db 리셋

# 웹서버 구동
$ airflow webserver -p 5000
$ airflow scheduler
$ airflow worker

$ airflow flower

 

airflow.cfg 설정 파일 수정

$ vi airflow.cfg

dags_folder = /home/deploy/work/airflow/dags

# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /home/deploy/work/airflow/logs

# executor = SequentialExecutor
executor = CeleryExecutor

# sql_alchemy_conn = sqlite:////home/airflow/airflow/airflow.db
sql_alchemy_conn =  mysql+pymysql://airflow:airflow@127.0.0.1:3306/airflow

# catchup_by_default = True
catchup_by_default = False

# broker_url = sqla+mysql://airflow:airflow@127.0.0.1:3306/airflow
broker_url = redis://airflow@127.0.0.1:6379/0

# result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
result_backend = db+mysql://airflow:airflow@127.0.0.1:3306/airflow

# load_examples = True
load_examples = False

 

airflow 자주 쓰는 커맨드

# print the list of active DAGs
airflow list_dags

# prints the list of tasks the "tutorial" dag_id
airflow list_tasks jayden_tutorial

# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks jayden_tutorial --tree


# command layout: command subcommand dag_id task_id date
$ airflow test jayden_tutorial print_date 2020-03-02

 

다음에는 Hive Operator 연동을 올리겠다. :D 

반응형
반응형


스파크에 대해
  • 하나의 서버에서 처리할 수 있는 수준 이상의 대용량 데이터 처리를 가능하게 해준다
  • 고수준 API 를 제공한다
  • 동종 시스템 중 가장 빠른 축에 속한다.
  • 스파크는 스칼라로 쓰여져 있다. 
  • 스파크는 함수형 프레임워크이며, 불변성이나 람다 정의 같은 개념에 크게 의존하고 있기 때문에 함수형 프로그래밍 언어인 스칼라가 좋다
  • 자바 API 보다 훨씬 사용하기 쉽다
  • JVM 과의 통신 비용이 랭기지중 가장 좋다. (객체 변환에 드는 비용정도)


스파크의 독보적 장점
  • 메모리 기반 처리
  • 지연 평가 ( lazy operation )

스파크 위치
  • JVM 위에서 연산을 수행하는 것일 뿐, 데이터 저장 솔루션은 아니다
  • 클러스터 매니저 종류 : 단독 클러스터매니저(StandAlone Cluster Manager), 아파치 메소스, 하둡 얀

스파크 컴포넌트
  • 스파크 코어 - Java, Scala, Python, R 로 API 제, RDD 와 RDD API, JVM 사이에 데이터를 읽고 쓰는 I/O 제공
  • 스파크 SQL - DataFrame ( 반구조화의 데이터 타입을 위한 인터페이스)  제공
  • 스파크 스트리밍 등

병렬 연산 모델 : RDD
  • 스파크의 드라이버 혹은 마스터 노드를 위한 프로그램을 사용자가 만들어야 한다
  • RDD는 익스큐터 혹은 슬레이브 노드에 저장된다
  • RDD를 구성하는 객체는 파티션이라 하며, 경우에 따라 다른 노드에서 계산될 수 있다.
  • 클러스터 매니저는 애플리케이션에서 설정한 파라미터에 따라 익스큐터를 실행하고 분산해 주는 역할을 한다
  • 드라이버가 RDD 데이터의 처리 계획을 결정하면 실행을 지연하고, 최종 RDD를 계산해야 하는 시점에 실행된다.( 보통 저장 장치에 써야 할 시점이나 결과 데이터를 드라이버에 보내야 할 때)

지연평가
  • RDD의 연산 종류는 transformation 과 action 이 있다.
  • 액션은 데이터를 드라이버로 되돌려주든지(count, collect 등) 데이터를 외부 저장 시스템에 쓰는것(CopyToHadoop) 등의 일이다.
  • 액션은 스케쥴러를 시작하게 하며, RDD 트랜스포메이션 간의 종속성에 바탕을 둔 DAG 를 생성한다.
  • 트랜스포메이션의 로직 에러가 발생할 때, 지연 평가 때문에, 액션이 수행된 지점에서 실패한 것으로 나타나는 경우에 유의하자
    • word count 프로그램에서 null pointer exception 이 발생한다고 가정할때, 코드가 contains를 체크하는 시점이 아니라 collect 단계에서 예외가 발생한다.

메모리 영속화 & 메모리 관리
  • 맵리듀스와 비교해 스파크 성능상 이점은 반복 연산이 들어있는 사례이다.
  • 스파크가 처리하는 데이터를 디스크에 기록하지 않고 익스큐터 메모리에 데이터를 로드해 놓는 것이다.
  • 메모리 관리의 3가지 옵션
    1. 메모리에 직렬화되지 않은 자바 객체 - RDD 객체를 직렬화 하지 않고 그대로 저장한다. ( 직렬화 비용이 안드는 대신 메모리 공간 사용이 비 효율이다.)
    2. 메모리에 직렬화된 데이터 - RDD 객체를 네트워크 전송이 가능한 바이트 스트림으로 변환한다. ( 데이터를 읽는데 CPU 가 더 많이 사용되므로 접근방식은 더 느리지만 메모리 공간 사용 측면에서 효율적이다. 크리오(Kryo) 직렬화를 쓰면 공간 측변에서도 효과적이다.)
    3. 디스크 - 익스큐터 램에 담기에 파티션이 큰 RDD 라면 디스크에 데이터를 쓸 수 있다. ( 반복 연산시 속도 면에서 불리하지만 장애에 안전하다.)
  • persist() 의 기본 옵션은 RDD를 메모리에 직렬화되지 않은 상태로 저장한다.

불변성과 RDD 인터페이스
  • RDD는 정적인 타입인 데다 불변한 성격을 가지고 있어, Transformation 을 호출하는 것이 새롭게 정의한 속성들을 가진 새로운 RDD를 리턴하는 행위다.
  • RDD 생성 방식
    • 기존 RDD에 Transformation 호출
    • SparkContext 객체로부터 생성
    • DataSet이나 DataFrame을 변환 ( 이것들은 SparkSession 으로 부터 만들어짐)
  • SparkContext는 스파크 클러스터와 실행중인 스파크 애플리케이션 하나와의 연결을 나타낸다.
  • RDD 속성을 알 수 있는 함수
    • partitions() - 분산 데이터 셋의 부분들을 구성하는 파티션 객체들의 배열을 리턴한다. getPartition()의 결괏값과 같다
    • iterator(p,parentIters) - 각각의 부모 파티션을 순회하는 반복자가 주어지면 파티션 p의 구성요소들을 계산해낸다.
    • dependencies() - 종속성 객체의 목록을 리턴한다. 스케쥴러가 현재의 RDD가 어떤식으로 다른 RDD에 종속될지 알려준다.
    • partitioner() - element 와 partition 사이에 연관되는 함수를 갖고 있는 RDD라면 스칼라의 option 타입으로 partitioner 객체를 리턴한다. 
    • perferredLocations(p) - 파티션 p의 데이터 지역성에 대한 정보를 리턴한다. p가 저장된 각 노드의 정보를 문자열로 표현한 시퀀스를 리턴한다
RDD의 종류
  • RDD는 정적인 타입인 데다 불변한 성격을 가지고 있어, Transformation 을 호출하는 것이 새롭게 정의한 속성들을 가진 새로운 RDD를 리턴하는 행위다.


넓은 종속성 vs 좁은 종속성
  • 종속성이 넓으냐 좁으냐는 트랜스포메이션 평가에 중요한 영향을 끼치며 성능에도 크게 작용한다.
  • 좁은 종속성
    • 자식 RDD 의 각 파티션이 부모 RDD의 파티션들에 대해 단순하고 한정적인 종속성을 갖는다
    • 부모가 최대 하나의 자식파티션을 갖는 경우
    • map, filter, mapPartitions 등의 연산이 이 패턴을 따른다.
  • 넓은 종속성
    • 자식 RDD가 다수의 부모 RDD의 파티션들과 관계를 맺고 있는 경우
    • groupbykey, sort, reducebykey 등과 같이 Shuffle 을 일으키는 함수가 이 패턴을 따른다.
    • 셔플 비용이 가장 크다.
  • map 은 파티션 간 이동이 없는 연산, coalesce 는 파티션을 합치는 연산으로 파티션 개수를 줄이는 목적의 함수이다.
  • join 함수는 두개의 부모 RDD 가 어떻게 파티션되었는지에 따라 좁거나 넓은 종속성을 가질 수 있다.


스파크 잡 스케쥴링
  • 잡 실행 과정
    • 스파크 프로그램 자체는 드라이버 노드에서 실행되며 일련의 명령들을 익스큐터에게 보낸다.
    • 애플리케이션들은 클러스터 매니저에 의해 스케쥴링되고, 각각 하나의 SparkContext를 가진다.
    • 스파크 애플리케이션들은 공존하는 여러 개의 잡을 차례로 실행할 수 있다.
    • Job들은 애플리케이션의 한 RDD가 호출하는 각 액션에 대응한다.
  • 자원할당
    • 정적할당과 동적 할당이 가능
  • 스파크 애플리케이션
    • 잡들은 드라이버 프로그램의 SparkContext에 정의되어 있다.
    • SparkContext가 생기면 스파크 애플리케이션이 구동한다.
    • SparkContext를 실행하면 드라이버와 익스큐터들이 각 작업 노드에서 구동된다.
    • 각 익스큐터는 JVM을 가지며 한 노드에 여러 개의 익스큐터가 존재할 수 있다.
    • 스파크 잡이 실행될 때 익스큐터는 RDD를 계산할 태스크 실행을 위한 슬롯을 가진다.
  • 스파크 잡 해부
    • 각 액션마다 스파크 스케쥴러는 실행 그래프를 만들고 스파크 잡을 실행한다. 
    • 각 잡은 최종 RDD를 만들어 내는데 필요한 데이터 변환의 각 단계를 의미하는 스테이지(Stage)들로 구성된다.
    • 각 스테이지는 각 병렬 연산의 한 단위를 의미하며 익스큐터들 위에서 실행되는 다수의 태스크(Task)들로 구성된다.
    • Spark Aplication -> 잡 -> 스테이지1 & 스테이지2 ... -> 태스크 1& 태스크 2... 
    • SparkContext / SparkSession -> 액션 -> 넓은 포메이션 -> 하나의 파티션 평가를 위한 연산 
    • 스파크 실행 구성에서 가장 높은 단계
    • 하나의 잡 = 하나의 액션에 대응하며, 액션은 스파크 애플리케이션의 드라이버의 프로그램에서 호출한다.
  • 스테이지
    • 잡의 부분들을 스테이지로 정의한다.
    • 넓은 트랜스포메이션에 의해 생성되는 셔플 의존성에 대응한다.
    • 하나의 스테이지는 다른 익스큐터나 드라이버와의 통신 없이 계산 가능한 태스크들의 집합으로 생각할 수 있다.
  • 태스크
    • 하나의 스테이지는 여러 개의 태스크로 이루어진다.
    • 실행 계층에서 가장 작은 단위이며, 익스큐터가 태스크 실행을 위해 동적으로 할당된 여러개의 슬롯을 가진다.
    • 스테이지당 태스크의 개수는 해당 스테이지의 결과 RDD의 파티션 개수에 대응된다.
    • 익스큐터 코어 개수의 총합 = 익스큐터당 코어의 개수 X 익스큐터 개수를 쓰면 sparkConf로 부터 동시 실행의 태스크 개수를 구할 수 있다.
    • 태스크 분산 과정은 TaskScheduler 가 담당하는데, 페어 스케쥴러인지 FIFO 스케쥴러인지 등에 따라 다르다.


반응형

+ Recent posts