반응형

스파크 DF, DS, SQL

  • 효율적인 저장옵션, 옵티마이저, 직렬화된 데이터에 대한 직접적인 연산 등을 지원한다
  • DF와 DS 는 특별한 Row 객체를 가지며, 컴파일시 타입 체크를 제공하지 않는다.
  • DataFrame 이란 특화된 데이터 표현방식, 컬럼 기반 캐시 포맷을 갖고 있다.

SparkSession

  • SparkContext 가 스파크 애플리케이션의 시작점이라면 SparkSession은 SparkSQL 의 시작점을 제공한다.
  • 빌터 패턴에 의해 생성 되며, getOrCreate()가 마지막에 온다
  • config(key,value) 로 문자열 기반의 설정을 할 수 있다.
  • 일상적인 인자들에 대해 미리 지정해 놓는 쇼커트(shortcuts) 도 존재한다.
  • enableHiveSupport() 는 hiveUDF 를 쓸수 있게 해준다. (추가적인 jar 파일 필요)
    • val session = SparSession.builder().enableHiveSupport().getOrCreate()
  • 모든 스파크 코드가 SparkSession 을 쓸수 있게 업데이트 되어 있진 않다.
  • SparkSession > HiveContext > SQLContext 순으로 사용하는것을 고려한다.

스파크 SQL 의 의존성

  • 스파크 SQL 과 하이브 컴포넌트 추0가가 필수다
    • spark-sql, spark-hive
  • 하이브 의존성을 애플리케이션에 포함할 수 없다면, SQLContext 를 생성해서 쓸 수있다.
    • val sqlContext = new SQLContext(sc)
    • import sqlContext.implicits._
  • 하지만 하이브 기반의 특정 사용자 정의함수(UDF)나 사용자 정의 집계함수(UDAF)를 쓸 수 없다.

스키마의 기초

  • RDD, DataSet은 정해진 타입이 없다
  • 스키마는 스파크 SQL에 의해 자동으로 처리되며, 데이터를 로딩할 때 스키마 추측이 이루어지거나 부모 DataFrame과 저용된 트랜스포메이션에 의해 계산된다

DataFrame API

  • 트랜스포메이션
    • RDD 트랜스포메이션과 유사하지만 관계형에 가까운 형태다.
    • 단일 DataFrame, 다중 DataFrame , 키/값, 그룹화/윈도화 트랜스포메이션으로 분류할 수 있다.
  • DataFrame filter 연산
    • 람다 함수 대신 스파크 SQL 표현식을 받아들인다
    • df.filter(조건식)
  • 스파크 SQL 스칼라 연산자
    • !== : not equal
    • && : and
    • === : equal (널 값에 안전하지 않음)
    • <==> : eqNullSafe (널값에 안전함)
  • 스파크 SQL 표준함수
    • lit(값) : 스칼라 심벌을 컬럼 리터럴로 바꾼다.
    • isNaN : 숫자가 아닌 경우를 체크한다
  • 필터와 함꼐 isNan 이나 isNull을 써서 유실 데이터 관리를 범용적으로 할 수 있다
  • 스파크 SQL 은 dropDuplicates() 로 중복되지 않은 레코드만 뽑아낼 수 있다
    • RDD 에서 이런 유의 연산(distinct) 는 셔플을 요구하게 되고 종종 filter를 쓰는것보다 느리다.
  • 스파크 SQL 에서의 집계연산은 RDD 에서 동일한 작업을 하는 것보다 간편하다.
  • 윈도화 - 노이즈 데이터를 포함하는 평균속도 계산, 상대적인 매출 계산 등에 매우 유용하게 쓰인다.
  • 비용에 따른 집합연산들
    • unionAll : 낮음
    • intersect : 높음
    • except : 높음
    • distinct : 높음
  • 하이브 데이터와 상호 연동
    • 하이브 메타스토어에 연결이 되어 있다면, SQL 질의를 직접 작성하거나 임시 테이블로 등록할 수 있다.
    • df.registerTempTable("MyTable")
    • df.write.saveAsTable("perm_MyTable")
    • sqlContext.sql("select * from MyTable")
  • 다중 DataFrame 트랜스포메이션
    • unionall, intersect, except, distinct 와 같은 DataFrame 끼리의 유사 집합 연산을 말한다
  • DataFrame의 테이블 등록
    • df.registerTempTable("테이블이름") 을 통해 sql 질의를 작성하여 사용할 수 있다.

DataFrame과 Dataset 에서의 데이터 표현

  • 텅스텐
    • 바이트 단위 레벨에서 직접 동작하는 스파크 SQL의 새로운 컴포넌트
    • 자바나 크리오 직렬화를 쓰는것보다 데이터 용량을 적게 쓸수 있다.
    • 자바 객체 기반이 아니기 때문에 온힙과 오프힙을 모두 지원한다.
    • 스파크 1.5에서 기본으로 탑재

데이터 적재, 저장함수들

  • DataFrameWriter, DataFrameReader
    • json, jdbc, orc, parquet 메서드들은 reader/writer에서 직접적으로 정의되어 있고, 경로나 연결 정보를 인자로 받는다.
  • JSON
    • 다른 데이터 소스들에 비해 비용이 많이 든다
    • 스파크 SQL은 데이터를 샘플링하여 스키마를 추측한다.
    • api 함수로 스키마 판별을 위해 데이터를 얼마나 읽을 것인지 설정이 가능하다.
      • df = session.read.format("json").option("samplingRatio","1.0").load(path)
    • 입력 데이터가 걸러 내야 할 잘못된 json 레코드를 갖고 있을 경우를 위해 단순히 문자열의 rdd로 읽어 올 수도 있다.
      • rdd:RDD[String] = input.filter(_.contains("panda"))
      • df = session.read.json(rdd)
  • JDBC
    • 여러 데이터베이스 벤더들의 jar 파일을 추가해야한다.
    • spark-submit 으로 실행한다면 --jars 옵션으로 포함시킬 수 있다.
  • 파케이
    • 아파치 파케이는 여러 파일로 쉽게 분할, 압축, 중첩 타입 등의 다양한 기능을 제공한다.
    • 공간 효율성이 뛰어나다.
  • 하이브 테이블
    • df=session.read.table("table")
    • df.write.saveAsTable("table")
  • RDD
    • DataFrame 에서 RDD로 변환하는 것은 트랜스포메이션이다.(액션이 아니다.)
    • RDD를 DataFrame이나 DataSet으로 변환하는 일은 연산 혹은 일부의 샘플링을 필요로 한다.

DataSet

  • 컴파일 타임 타입체크를 제공하는 스파크 SQL의 확장 기능 ( DataFrame은 컴파일 타임 타입 체크가 빠져있다.)
  • DataSet API = 관계형(DataFrame) + 함수형(RDD)이 섞인 강타입 컬렉션 API
  • DataFrame, DataSet, RDD 간의 변환 방법
    • ds=df.as[ElementType]
    • df=ds.rdd
    • df=ds.DF()
  • rdd와 유사한 함수형 트랜스포메이션 지원
    • filter, map , flatMapm, mapPartitions 등의 함수를 제공한다.
    • ds.map{ rp=>rp.attributes.filter(_>0).sum}
  • 관계형 트랜스포메이션 또한 지원한다.
    • ds.select($"id".as[Long], ($"attributes"(0) > 0.5).as[Boolean])
    • intersect, union, substract 등의 표준 집합 연산들도 사용할 수 있다.
  • 타입 안정성이 강점이지만, DataSet Api 가 계속 진화 중이므로 추후 스파크 버전 업데이트 시 코드 수정이 필요할 수 있다.
반응형

+ Recent posts