Data Engineer
Apache Spark 기초 (2)
jssvs
2021. 6. 2. 22:56
반응형
스파크 DF, DS, SQL
- 효율적인 저장옵션, 옵티마이저, 직렬화된 데이터에 대한 직접적인 연산 등을 지원한다
- DF와 DS 는 특별한 Row 객체를 가지며, 컴파일시 타입 체크를 제공하지 않는다.
- DataFrame 이란 특화된 데이터 표현방식, 컬럼 기반 캐시 포맷을 갖고 있다.
SparkSession
- SparkContext 가 스파크 애플리케이션의 시작점이라면 SparkSession은 SparkSQL 의 시작점을 제공한다.
- 빌터 패턴에 의해 생성 되며, getOrCreate()가 마지막에 온다
- config(key,value) 로 문자열 기반의 설정을 할 수 있다.
- 일상적인 인자들에 대해 미리 지정해 놓는 쇼커트(shortcuts) 도 존재한다.
- enableHiveSupport() 는 hiveUDF 를 쓸수 있게 해준다. (추가적인 jar 파일 필요)
- val session = SparSession.builder().enableHiveSupport().getOrCreate()
- 모든 스파크 코드가 SparkSession 을 쓸수 있게 업데이트 되어 있진 않다.
- SparkSession > HiveContext > SQLContext 순으로 사용하는것을 고려한다.
스파크 SQL 의 의존성
- 스파크 SQL 과 하이브 컴포넌트 추0가가 필수다
- spark-sql, spark-hive
- 하이브 의존성을 애플리케이션에 포함할 수 없다면, SQLContext 를 생성해서 쓸 수있다.
- val sqlContext = new SQLContext(sc)
- import sqlContext.implicits._
- 하지만 하이브 기반의 특정 사용자 정의함수(UDF)나 사용자 정의 집계함수(UDAF)를 쓸 수 없다.
스키마의 기초
- RDD, DataSet은 정해진 타입이 없다
- 스키마는 스파크 SQL에 의해 자동으로 처리되며, 데이터를 로딩할 때 스키마 추측이 이루어지거나 부모 DataFrame과 저용된 트랜스포메이션에 의해 계산된다
DataFrame API
- 트랜스포메이션
- RDD 트랜스포메이션과 유사하지만 관계형에 가까운 형태다.
- 단일 DataFrame, 다중 DataFrame , 키/값, 그룹화/윈도화 트랜스포메이션으로 분류할 수 있다.
- DataFrame filter 연산
- 람다 함수 대신 스파크 SQL 표현식을 받아들인다
- df.filter(조건식)
- 스파크 SQL 스칼라 연산자
- !== : not equal
- && : and
- === : equal (널 값에 안전하지 않음)
- <==> : eqNullSafe (널값에 안전함)
- 스파크 SQL 표준함수
- lit(값) : 스칼라 심벌을 컬럼 리터럴로 바꾼다.
- isNaN : 숫자가 아닌 경우를 체크한다
- 필터와 함꼐 isNan 이나 isNull을 써서 유실 데이터 관리를 범용적으로 할 수 있다
- 스파크 SQL 은 dropDuplicates() 로 중복되지 않은 레코드만 뽑아낼 수 있다
- RDD 에서 이런 유의 연산(distinct) 는 셔플을 요구하게 되고 종종 filter를 쓰는것보다 느리다.
- 스파크 SQL 에서의 집계연산은 RDD 에서 동일한 작업을 하는 것보다 간편하다.
- 윈도화 - 노이즈 데이터를 포함하는 평균속도 계산, 상대적인 매출 계산 등에 매우 유용하게 쓰인다.
- 비용에 따른 집합연산들
- unionAll : 낮음
- intersect : 높음
- except : 높음
- distinct : 높음
- 하이브 데이터와 상호 연동
- 하이브 메타스토어에 연결이 되어 있다면, SQL 질의를 직접 작성하거나 임시 테이블로 등록할 수 있다.
- df.registerTempTable("MyTable")
- df.write.saveAsTable("perm_MyTable")
- sqlContext.sql("select * from MyTable")
- 다중 DataFrame 트랜스포메이션
- unionall, intersect, except, distinct 와 같은 DataFrame 끼리의 유사 집합 연산을 말한다
- DataFrame의 테이블 등록
- df.registerTempTable("테이블이름") 을 통해 sql 질의를 작성하여 사용할 수 있다.
DataFrame과 Dataset 에서의 데이터 표현
- 텅스텐
- 바이트 단위 레벨에서 직접 동작하는 스파크 SQL의 새로운 컴포넌트
- 자바나 크리오 직렬화를 쓰는것보다 데이터 용량을 적게 쓸수 있다.
- 자바 객체 기반이 아니기 때문에 온힙과 오프힙을 모두 지원한다.
- 스파크 1.5에서 기본으로 탑재
데이터 적재, 저장함수들
- DataFrameWriter, DataFrameReader
- json, jdbc, orc, parquet 메서드들은 reader/writer에서 직접적으로 정의되어 있고, 경로나 연결 정보를 인자로 받는다.
- JSON
- 다른 데이터 소스들에 비해 비용이 많이 든다
- 스파크 SQL은 데이터를 샘플링하여 스키마를 추측한다.
- api 함수로 스키마 판별을 위해 데이터를 얼마나 읽을 것인지 설정이 가능하다.
- df = session.read.format("json").option("samplingRatio","1.0").load(path)
- 입력 데이터가 걸러 내야 할 잘못된 json 레코드를 갖고 있을 경우를 위해 단순히 문자열의 rdd로 읽어 올 수도 있다.
- rdd:RDD[String] = input.filter(_.contains("panda"))
- df = session.read.json(rdd)
- JDBC
- 여러 데이터베이스 벤더들의 jar 파일을 추가해야한다.
- spark-submit 으로 실행한다면 --jars 옵션으로 포함시킬 수 있다.
- 파케이
- 아파치 파케이는 여러 파일로 쉽게 분할, 압축, 중첩 타입 등의 다양한 기능을 제공한다.
- 공간 효율성이 뛰어나다.
- 하이브 테이블
- df=session.read.table("table")
- df.write.saveAsTable("table")
- RDD
- DataFrame 에서 RDD로 변환하는 것은 트랜스포메이션이다.(액션이 아니다.)
- RDD를 DataFrame이나 DataSet으로 변환하는 일은 연산 혹은 일부의 샘플링을 필요로 한다.
DataSet
- 컴파일 타임 타입체크를 제공하는 스파크 SQL의 확장 기능 ( DataFrame은 컴파일 타임 타입 체크가 빠져있다.)
- DataSet API = 관계형(DataFrame) + 함수형(RDD)이 섞인 강타입 컬렉션 API
- DataFrame, DataSet, RDD 간의 변환 방법
- ds=df.as[ElementType]
- df=ds.rdd
- df=ds.DF()
- rdd와 유사한 함수형 트랜스포메이션 지원
- filter, map , flatMapm, mapPartitions 등의 함수를 제공한다.
- ds.map{ rp=>rp.attributes.filter(_>0).sum}
- 관계형 트랜스포메이션 또한 지원한다.
- ds.select($"id".as[Long], ($"attributes"(0) > 0.5).as[Boolean])
- intersect, union, substract 등의 표준 집합 연산들도 사용할 수 있다.
- 타입 안정성이 강점이지만, DataSet Api 가 계속 진화 중이므로 추후 스파크 버전 업데이트 시 코드 수정이 필요할 수 있다.
반응형