반응형

스파크 DF, DS, SQL

  • 효율적인 저장옵션, 옵티마이저, 직렬화된 데이터에 대한 직접적인 연산 등을 지원한다
  • DF와 DS 는 특별한 Row 객체를 가지며, 컴파일시 타입 체크를 제공하지 않는다.
  • DataFrame 이란 특화된 데이터 표현방식, 컬럼 기반 캐시 포맷을 갖고 있다.

SparkSession

  • SparkContext 가 스파크 애플리케이션의 시작점이라면 SparkSession은 SparkSQL 의 시작점을 제공한다.
  • 빌터 패턴에 의해 생성 되며, getOrCreate()가 마지막에 온다
  • config(key,value) 로 문자열 기반의 설정을 할 수 있다.
  • 일상적인 인자들에 대해 미리 지정해 놓는 쇼커트(shortcuts) 도 존재한다.
  • enableHiveSupport() 는 hiveUDF 를 쓸수 있게 해준다. (추가적인 jar 파일 필요)
    • val session = SparSession.builder().enableHiveSupport().getOrCreate()
  • 모든 스파크 코드가 SparkSession 을 쓸수 있게 업데이트 되어 있진 않다.
  • SparkSession > HiveContext > SQLContext 순으로 사용하는것을 고려한다.

스파크 SQL 의 의존성

  • 스파크 SQL 과 하이브 컴포넌트 추0가가 필수다
    • spark-sql, spark-hive
  • 하이브 의존성을 애플리케이션에 포함할 수 없다면, SQLContext 를 생성해서 쓸 수있다.
    • val sqlContext = new SQLContext(sc)
    • import sqlContext.implicits._
  • 하지만 하이브 기반의 특정 사용자 정의함수(UDF)나 사용자 정의 집계함수(UDAF)를 쓸 수 없다.

스키마의 기초

  • RDD, DataSet은 정해진 타입이 없다
  • 스키마는 스파크 SQL에 의해 자동으로 처리되며, 데이터를 로딩할 때 스키마 추측이 이루어지거나 부모 DataFrame과 저용된 트랜스포메이션에 의해 계산된다

DataFrame API

  • 트랜스포메이션
    • RDD 트랜스포메이션과 유사하지만 관계형에 가까운 형태다.
    • 단일 DataFrame, 다중 DataFrame , 키/값, 그룹화/윈도화 트랜스포메이션으로 분류할 수 있다.
  • DataFrame filter 연산
    • 람다 함수 대신 스파크 SQL 표현식을 받아들인다
    • df.filter(조건식)
  • 스파크 SQL 스칼라 연산자
    • !== : not equal
    • && : and
    • === : equal (널 값에 안전하지 않음)
    • <==> : eqNullSafe (널값에 안전함)
  • 스파크 SQL 표준함수
    • lit(값) : 스칼라 심벌을 컬럼 리터럴로 바꾼다.
    • isNaN : 숫자가 아닌 경우를 체크한다
  • 필터와 함꼐 isNan 이나 isNull을 써서 유실 데이터 관리를 범용적으로 할 수 있다
  • 스파크 SQL 은 dropDuplicates() 로 중복되지 않은 레코드만 뽑아낼 수 있다
    • RDD 에서 이런 유의 연산(distinct) 는 셔플을 요구하게 되고 종종 filter를 쓰는것보다 느리다.
  • 스파크 SQL 에서의 집계연산은 RDD 에서 동일한 작업을 하는 것보다 간편하다.
  • 윈도화 - 노이즈 데이터를 포함하는 평균속도 계산, 상대적인 매출 계산 등에 매우 유용하게 쓰인다.
  • 비용에 따른 집합연산들
    • unionAll : 낮음
    • intersect : 높음
    • except : 높음
    • distinct : 높음
  • 하이브 데이터와 상호 연동
    • 하이브 메타스토어에 연결이 되어 있다면, SQL 질의를 직접 작성하거나 임시 테이블로 등록할 수 있다.
    • df.registerTempTable("MyTable")
    • df.write.saveAsTable("perm_MyTable")
    • sqlContext.sql("select * from MyTable")
  • 다중 DataFrame 트랜스포메이션
    • unionall, intersect, except, distinct 와 같은 DataFrame 끼리의 유사 집합 연산을 말한다
  • DataFrame의 테이블 등록
    • df.registerTempTable("테이블이름") 을 통해 sql 질의를 작성하여 사용할 수 있다.

DataFrame과 Dataset 에서의 데이터 표현

  • 텅스텐
    • 바이트 단위 레벨에서 직접 동작하는 스파크 SQL의 새로운 컴포넌트
    • 자바나 크리오 직렬화를 쓰는것보다 데이터 용량을 적게 쓸수 있다.
    • 자바 객체 기반이 아니기 때문에 온힙과 오프힙을 모두 지원한다.
    • 스파크 1.5에서 기본으로 탑재

데이터 적재, 저장함수들

  • DataFrameWriter, DataFrameReader
    • json, jdbc, orc, parquet 메서드들은 reader/writer에서 직접적으로 정의되어 있고, 경로나 연결 정보를 인자로 받는다.
  • JSON
    • 다른 데이터 소스들에 비해 비용이 많이 든다
    • 스파크 SQL은 데이터를 샘플링하여 스키마를 추측한다.
    • api 함수로 스키마 판별을 위해 데이터를 얼마나 읽을 것인지 설정이 가능하다.
      • df = session.read.format("json").option("samplingRatio","1.0").load(path)
    • 입력 데이터가 걸러 내야 할 잘못된 json 레코드를 갖고 있을 경우를 위해 단순히 문자열의 rdd로 읽어 올 수도 있다.
      • rdd:RDD[String] = input.filter(_.contains("panda"))
      • df = session.read.json(rdd)
  • JDBC
    • 여러 데이터베이스 벤더들의 jar 파일을 추가해야한다.
    • spark-submit 으로 실행한다면 --jars 옵션으로 포함시킬 수 있다.
  • 파케이
    • 아파치 파케이는 여러 파일로 쉽게 분할, 압축, 중첩 타입 등의 다양한 기능을 제공한다.
    • 공간 효율성이 뛰어나다.
  • 하이브 테이블
    • df=session.read.table("table")
    • df.write.saveAsTable("table")
  • RDD
    • DataFrame 에서 RDD로 변환하는 것은 트랜스포메이션이다.(액션이 아니다.)
    • RDD를 DataFrame이나 DataSet으로 변환하는 일은 연산 혹은 일부의 샘플링을 필요로 한다.

DataSet

  • 컴파일 타임 타입체크를 제공하는 스파크 SQL의 확장 기능 ( DataFrame은 컴파일 타임 타입 체크가 빠져있다.)
  • DataSet API = 관계형(DataFrame) + 함수형(RDD)이 섞인 강타입 컬렉션 API
  • DataFrame, DataSet, RDD 간의 변환 방법
    • ds=df.as[ElementType]
    • df=ds.rdd
    • df=ds.DF()
  • rdd와 유사한 함수형 트랜스포메이션 지원
    • filter, map , flatMapm, mapPartitions 등의 함수를 제공한다.
    • ds.map{ rp=>rp.attributes.filter(_>0).sum}
  • 관계형 트랜스포메이션 또한 지원한다.
    • ds.select($"id".as[Long], ($"attributes"(0) > 0.5).as[Boolean])
    • intersect, union, substract 등의 표준 집합 연산들도 사용할 수 있다.
  • 타입 안정성이 강점이지만, DataSet Api 가 계속 진화 중이므로 추후 스파크 버전 업데이트 시 코드 수정이 필요할 수 있다.
반응형
반응형

1. library 설치

  • 커버로스 인증을 타고 하이브에 접근을 할 경우 아래 패키지들이 설치되어 있어야 한다.
--os레벨 설치 패키지

$ yum install libsasl2-dev
$ yum install cyrus-sasl-devel
$ yum install cyrus-sasl-gssapi


--python 라이브러리 패키지
impyla==0.16.2
PyHive==0.6.1
PyMySQL==0.9.3

sasl==0.2.1
thrift==0.10.0  -> issue 시 pip install thrift==0.9.3
thrift-sasl==0.4.1
thriftpy==0.3.9
thriftpy2==0.4.10


 

2. pyhive 연동 예제 코드

from pyhive import hive

conn = hive.Connection(host='hadoop.host.io', port=10000, database='jssvs_tmp',auth='KERBEROS',kerberos_service_name='hive')

    cursor=conn.cursor()
    cursor.execute('s select * from hive_tbl')
    for r in cursor.fetchall():
        print(r)

 

3. ph2 연동 예제 코드

import pyhs2
 
with pyhs2.connect (host='hadoop.host.io',
                port=10000,
                authMechanism='KERBEROS') as conn:
        with conn.cursor() as cur:
        #Show databases
                print cur.getDatabases()
 
        #Execute query
                cur.execute("select * from table")
 
        #Return column info from query
                print cur.getSchema()
 
        #Fetch table results
        for i in cur.fetch():
                print i
  
  
---- or --------------
  
import pyhs2
conn = pyhs2.connect(host='hadoop.host.io', port=10000, authMechanism='KERBEROS' )
cursor = conn.cursor()
print cursor.getDatabases()

 

반응형
반응형

Docker 이용하여 mysql, redis 컨테이너 올리기

$ mkdir /usr/local/db_data
$ mkdir /usr/local/redis_data

$ docker pull mysql:8.0
$ sudo docker run -d -p 3306:3306 -e MYSQL_ROOT_PASSWORD=test --volume /usr/local/db_data/:/var/lib/mysql --name mysql_bi mysql:8  --default-authentication-plugin=mysql_native_password 

$ docker pull redis:5.0
$ sudo docker run -d --name redis_bi -p 6379:6379 --volume /usr/local/redis_data/:/data -e REDIS_PASSWORD=airflow redis:5.0

$ sudo docker ps

virtualenv 설치 및 가상 환경 만들기

# Python 을 이용하여 virtualenv 설치
$ python3 -m pip install virtualenv virtualenvwrapper --user

# 가상 환경 만들기
$ mkdir myenv
$ virtualenv --python=python3.5 myenv

#아래 명령어로도 가능하다
$ virtualenv -p `which python3` venv 

# 가상환경 진입
$ source myenv/bin/activate

#가상환경 빠져 나오기
$ deactivate

airflow 설치 및 서버 동작 스크립트

$ source myenv/bin/activate
$ pip install --upgrade pip
$ pip install pymysql

# airflow
pip install apache-airflow[mysql,redis,celery]==1.10.7

# 에어플로우 홈 설정
$ mkdir myenv/airflow_home
$ export AIRFLOW_HOME=`pwd`/airflow_home # virtualenv 바깥에서도 동일하게 설정

# 에어플로우 데이터베이스 초기화
$ airflow initdb
$ airflow resetdb #load_examples False 가 적용이 안될때 다시 db 리셋

# 웹서버 구동
$ airflow webserver -p 5000
$ airflow scheduler
$ airflow worker

$ airflow flower

 

airflow.cfg 설정 파일 수정

$ vi airflow.cfg

dags_folder = /home/deploy/work/airflow/dags

# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /home/deploy/work/airflow/logs

# executor = SequentialExecutor
executor = CeleryExecutor

# sql_alchemy_conn = sqlite:////home/airflow/airflow/airflow.db
sql_alchemy_conn =  mysql+pymysql://airflow:airflow@127.0.0.1:3306/airflow

# catchup_by_default = True
catchup_by_default = False

# broker_url = sqla+mysql://airflow:airflow@127.0.0.1:3306/airflow
broker_url = redis://airflow@127.0.0.1:6379/0

# result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
result_backend = db+mysql://airflow:airflow@127.0.0.1:3306/airflow

# load_examples = True
load_examples = False

 

airflow 자주 쓰는 커맨드

# print the list of active DAGs
airflow list_dags

# prints the list of tasks the "tutorial" dag_id
airflow list_tasks jayden_tutorial

# prints the hierarchy of tasks in the tutorial DAG
airflow list_tasks jayden_tutorial --tree


# command layout: command subcommand dag_id task_id date
$ airflow test jayden_tutorial print_date 2020-03-02

 

다음에는 Hive Operator 연동을 올리겠다. :D 

반응형
반응형


스파크에 대해
  • 하나의 서버에서 처리할 수 있는 수준 이상의 대용량 데이터 처리를 가능하게 해준다
  • 고수준 API 를 제공한다
  • 동종 시스템 중 가장 빠른 축에 속한다.
  • 스파크는 스칼라로 쓰여져 있다. 
  • 스파크는 함수형 프레임워크이며, 불변성이나 람다 정의 같은 개념에 크게 의존하고 있기 때문에 함수형 프로그래밍 언어인 스칼라가 좋다
  • 자바 API 보다 훨씬 사용하기 쉽다
  • JVM 과의 통신 비용이 랭기지중 가장 좋다. (객체 변환에 드는 비용정도)


스파크의 독보적 장점
  • 메모리 기반 처리
  • 지연 평가 ( lazy operation )

스파크 위치
  • JVM 위에서 연산을 수행하는 것일 뿐, 데이터 저장 솔루션은 아니다
  • 클러스터 매니저 종류 : 단독 클러스터매니저(StandAlone Cluster Manager), 아파치 메소스, 하둡 얀

스파크 컴포넌트
  • 스파크 코어 - Java, Scala, Python, R 로 API 제, RDD 와 RDD API, JVM 사이에 데이터를 읽고 쓰는 I/O 제공
  • 스파크 SQL - DataFrame ( 반구조화의 데이터 타입을 위한 인터페이스)  제공
  • 스파크 스트리밍 등

병렬 연산 모델 : RDD
  • 스파크의 드라이버 혹은 마스터 노드를 위한 프로그램을 사용자가 만들어야 한다
  • RDD는 익스큐터 혹은 슬레이브 노드에 저장된다
  • RDD를 구성하는 객체는 파티션이라 하며, 경우에 따라 다른 노드에서 계산될 수 있다.
  • 클러스터 매니저는 애플리케이션에서 설정한 파라미터에 따라 익스큐터를 실행하고 분산해 주는 역할을 한다
  • 드라이버가 RDD 데이터의 처리 계획을 결정하면 실행을 지연하고, 최종 RDD를 계산해야 하는 시점에 실행된다.( 보통 저장 장치에 써야 할 시점이나 결과 데이터를 드라이버에 보내야 할 때)

지연평가
  • RDD의 연산 종류는 transformation 과 action 이 있다.
  • 액션은 데이터를 드라이버로 되돌려주든지(count, collect 등) 데이터를 외부 저장 시스템에 쓰는것(CopyToHadoop) 등의 일이다.
  • 액션은 스케쥴러를 시작하게 하며, RDD 트랜스포메이션 간의 종속성에 바탕을 둔 DAG 를 생성한다.
  • 트랜스포메이션의 로직 에러가 발생할 때, 지연 평가 때문에, 액션이 수행된 지점에서 실패한 것으로 나타나는 경우에 유의하자
    • word count 프로그램에서 null pointer exception 이 발생한다고 가정할때, 코드가 contains를 체크하는 시점이 아니라 collect 단계에서 예외가 발생한다.

메모리 영속화 & 메모리 관리
  • 맵리듀스와 비교해 스파크 성능상 이점은 반복 연산이 들어있는 사례이다.
  • 스파크가 처리하는 데이터를 디스크에 기록하지 않고 익스큐터 메모리에 데이터를 로드해 놓는 것이다.
  • 메모리 관리의 3가지 옵션
    1. 메모리에 직렬화되지 않은 자바 객체 - RDD 객체를 직렬화 하지 않고 그대로 저장한다. ( 직렬화 비용이 안드는 대신 메모리 공간 사용이 비 효율이다.)
    2. 메모리에 직렬화된 데이터 - RDD 객체를 네트워크 전송이 가능한 바이트 스트림으로 변환한다. ( 데이터를 읽는데 CPU 가 더 많이 사용되므로 접근방식은 더 느리지만 메모리 공간 사용 측면에서 효율적이다. 크리오(Kryo) 직렬화를 쓰면 공간 측변에서도 효과적이다.)
    3. 디스크 - 익스큐터 램에 담기에 파티션이 큰 RDD 라면 디스크에 데이터를 쓸 수 있다. ( 반복 연산시 속도 면에서 불리하지만 장애에 안전하다.)
  • persist() 의 기본 옵션은 RDD를 메모리에 직렬화되지 않은 상태로 저장한다.

불변성과 RDD 인터페이스
  • RDD는 정적인 타입인 데다 불변한 성격을 가지고 있어, Transformation 을 호출하는 것이 새롭게 정의한 속성들을 가진 새로운 RDD를 리턴하는 행위다.
  • RDD 생성 방식
    • 기존 RDD에 Transformation 호출
    • SparkContext 객체로부터 생성
    • DataSet이나 DataFrame을 변환 ( 이것들은 SparkSession 으로 부터 만들어짐)
  • SparkContext는 스파크 클러스터와 실행중인 스파크 애플리케이션 하나와의 연결을 나타낸다.
  • RDD 속성을 알 수 있는 함수
    • partitions() - 분산 데이터 셋의 부분들을 구성하는 파티션 객체들의 배열을 리턴한다. getPartition()의 결괏값과 같다
    • iterator(p,parentIters) - 각각의 부모 파티션을 순회하는 반복자가 주어지면 파티션 p의 구성요소들을 계산해낸다.
    • dependencies() - 종속성 객체의 목록을 리턴한다. 스케쥴러가 현재의 RDD가 어떤식으로 다른 RDD에 종속될지 알려준다.
    • partitioner() - element 와 partition 사이에 연관되는 함수를 갖고 있는 RDD라면 스칼라의 option 타입으로 partitioner 객체를 리턴한다. 
    • perferredLocations(p) - 파티션 p의 데이터 지역성에 대한 정보를 리턴한다. p가 저장된 각 노드의 정보를 문자열로 표현한 시퀀스를 리턴한다
RDD의 종류
  • RDD는 정적인 타입인 데다 불변한 성격을 가지고 있어, Transformation 을 호출하는 것이 새롭게 정의한 속성들을 가진 새로운 RDD를 리턴하는 행위다.


넓은 종속성 vs 좁은 종속성
  • 종속성이 넓으냐 좁으냐는 트랜스포메이션 평가에 중요한 영향을 끼치며 성능에도 크게 작용한다.
  • 좁은 종속성
    • 자식 RDD 의 각 파티션이 부모 RDD의 파티션들에 대해 단순하고 한정적인 종속성을 갖는다
    • 부모가 최대 하나의 자식파티션을 갖는 경우
    • map, filter, mapPartitions 등의 연산이 이 패턴을 따른다.
  • 넓은 종속성
    • 자식 RDD가 다수의 부모 RDD의 파티션들과 관계를 맺고 있는 경우
    • groupbykey, sort, reducebykey 등과 같이 Shuffle 을 일으키는 함수가 이 패턴을 따른다.
    • 셔플 비용이 가장 크다.
  • map 은 파티션 간 이동이 없는 연산, coalesce 는 파티션을 합치는 연산으로 파티션 개수를 줄이는 목적의 함수이다.
  • join 함수는 두개의 부모 RDD 가 어떻게 파티션되었는지에 따라 좁거나 넓은 종속성을 가질 수 있다.


스파크 잡 스케쥴링
  • 잡 실행 과정
    • 스파크 프로그램 자체는 드라이버 노드에서 실행되며 일련의 명령들을 익스큐터에게 보낸다.
    • 애플리케이션들은 클러스터 매니저에 의해 스케쥴링되고, 각각 하나의 SparkContext를 가진다.
    • 스파크 애플리케이션들은 공존하는 여러 개의 잡을 차례로 실행할 수 있다.
    • Job들은 애플리케이션의 한 RDD가 호출하는 각 액션에 대응한다.
  • 자원할당
    • 정적할당과 동적 할당이 가능
  • 스파크 애플리케이션
    • 잡들은 드라이버 프로그램의 SparkContext에 정의되어 있다.
    • SparkContext가 생기면 스파크 애플리케이션이 구동한다.
    • SparkContext를 실행하면 드라이버와 익스큐터들이 각 작업 노드에서 구동된다.
    • 각 익스큐터는 JVM을 가지며 한 노드에 여러 개의 익스큐터가 존재할 수 있다.
    • 스파크 잡이 실행될 때 익스큐터는 RDD를 계산할 태스크 실행을 위한 슬롯을 가진다.
  • 스파크 잡 해부
    • 각 액션마다 스파크 스케쥴러는 실행 그래프를 만들고 스파크 잡을 실행한다. 
    • 각 잡은 최종 RDD를 만들어 내는데 필요한 데이터 변환의 각 단계를 의미하는 스테이지(Stage)들로 구성된다.
    • 각 스테이지는 각 병렬 연산의 한 단위를 의미하며 익스큐터들 위에서 실행되는 다수의 태스크(Task)들로 구성된다.
    • Spark Aplication -> 잡 -> 스테이지1 & 스테이지2 ... -> 태스크 1& 태스크 2... 
    • SparkContext / SparkSession -> 액션 -> 넓은 포메이션 -> 하나의 파티션 평가를 위한 연산 
    • 스파크 실행 구성에서 가장 높은 단계
    • 하나의 잡 = 하나의 액션에 대응하며, 액션은 스파크 애플리케이션의 드라이버의 프로그램에서 호출한다.
  • 스테이지
    • 잡의 부분들을 스테이지로 정의한다.
    • 넓은 트랜스포메이션에 의해 생성되는 셔플 의존성에 대응한다.
    • 하나의 스테이지는 다른 익스큐터나 드라이버와의 통신 없이 계산 가능한 태스크들의 집합으로 생각할 수 있다.
  • 태스크
    • 하나의 스테이지는 여러 개의 태스크로 이루어진다.
    • 실행 계층에서 가장 작은 단위이며, 익스큐터가 태스크 실행을 위해 동적으로 할당된 여러개의 슬롯을 가진다.
    • 스테이지당 태스크의 개수는 해당 스테이지의 결과 RDD의 파티션 개수에 대응된다.
    • 익스큐터 코어 개수의 총합 = 익스큐터당 코어의 개수 X 익스큐터 개수를 쓰면 sparkConf로 부터 동시 실행의 태스크 개수를 구할 수 있다.
    • 태스크 분산 과정은 TaskScheduler 가 담당하는데, 페어 스케쥴러인지 FIFO 스케쥴러인지 등에 따라 다르다.


반응형

+ Recent posts