1. Pyspark?
파이썬으로 스파크를 사용하기 위한 목적으로 만들어진 인터페이스이다.
파이썬 API 를 이용할 수 있으면서, 스파크 애플리케이션을 작성할 수 있다.
interactive 하게 pyspark shell 도 제공한다.
Spark SQL, DataFrame, streaming , MLlib 그리고 Spark Core 를 지원한다.
2. 왜 PySPark?
java, scala 로 작성된 코드가 성능이 조금 더 뛰어나다고 알려져 있다.
하지만 빅데이터 진영에서는 파이썬이 친숙한 사람이 많고, 생산성이 더 난다고 생각한다.
3. Pyspark 기초 - Dataframe 생성 및 다루기
** 개발할 때 비교적 자주 사용되는 함수만 다룰 것 이다.
- 조건 조회와 출력, 그리고 withColumn 은 정말 많이 사용되는 것 같다.
from pyspark.sql.types import *
from pyspark.sql.functions import col,lit
# 멤버 컬럼
member_columns = ['group_cd','join_dt','name','login_cnt']
# 멤버 데이터
member_data = [
('k-00001','2021-07-31','name1',10),
('k-00002','2021-06-31','name2',10),
('k-00003','2021-08-31','name3',10),
('k-00004','2021-03-31','name4',12),
('k-00001','2021-04-31','name5',11),
('k-00002','2021-05-31','name6',15),
]
# 데이터 프레임 생성
member_df = spark.createDataFrame(data=member_data,schema=member_columns)
#스키마 구조 출력
member_df.printSchema()
# withColumn - 새 컬럼 추가. literal 로 초기화해줘야 한다.
member_df= member_df.withColumn("use_flag",lit(False))
member_df.printSchema()
#pyspark 에서 countDistinct 사용을 위해서는 lib 를 선언 해줘야 하다니..
from pyspark.sql.functions import countDistinct
from pyspark.sql import functions as F
# 행 출력
member_df.show(10,truncate = False)
# 특정 컬럼 출력
member_df.select('join_dt','name','group_cd').show(10)
# groupby 출력
member_df.groupBy("group_cd").count().sort("count",ascending=True).show()
# filter 조건 출력
member_df.filter(member_df.login_cnt > 10).show()
# filter 조건을 복수 개로 줘보자
member_df.filter(
(F.col("group_cd") == "k-00001") &
(F.col("login_cnt") > 10)
).count()
# where 조건 출력
member_df.where(my_df.acntid=='name1').show()
member_df.filter("name == 'name1'").show()
'Data Engineer' 카테고리의 다른 글
Apache kafka (카프카) 기초 (1) | 2022.08.19 |
---|---|
kubernetes 기초 (1) (0) | 2021.12.01 |
[python] dataprep을 이용하여 EDA (데이터 분석) 레포트 쉽게 만들기 (0) | 2021.06.06 |
[airflow] HA - health 체크를 이용한 운영 (0) | 2021.06.05 |
Elasticsearch (ES) 기초 (1) - CRUD (0) | 2021.06.03 |