반응형

1. Pyspark?

파이썬으로 스파크를 사용하기 위한 목적으로 만들어진 인터페이스이다.

파이썬 API 를 이용할 수 있으면서, 스파크 애플리케이션을 작성할 수 있다. 

interactive 하게 pyspark shell 도 제공한다.

Spark SQL, DataFrame, streaming , MLlib 그리고 Spark Core 를 지원한다.

 

2. 왜 PySPark?

java, scala 로 작성된 코드가 성능이 조금 더 뛰어나다고 알려져 있다. 

하지만 빅데이터 진영에서는 파이썬이 친숙한 사람이 많고, 생산성이 더 난다고 생각한다.

 

3. Pyspark 기초 -  Dataframe 생성 및 다루기

** 개발할 때 비교적 자주 사용되는 함수만 다룰 것 이다.

- 조건 조회와 출력, 그리고 withColumn 은 정말 많이 사용되는 것 같다.

 

from pyspark.sql.types import *
from pyspark.sql.functions import col,lit

# 멤버 컬럼
member_columns = ['group_cd','join_dt','name','login_cnt']

# 멤버 데이터
member_data = [
    ('k-00001','2021-07-31','name1',10),
    ('k-00002','2021-06-31','name2',10),
    ('k-00003','2021-08-31','name3',10),
    ('k-00004','2021-03-31','name4',12),
    ('k-00001','2021-04-31','name5',11),
    ('k-00002','2021-05-31','name6',15),
]

# 데이터 프레임 생성
member_df = spark.createDataFrame(data=member_data,schema=member_columns)

#스키마 구조 출력 
member_df.printSchema()

# withColumn  - 새 컬럼 추가. literal 로 초기화해줘야 한다.
member_df= member_df.withColumn("use_flag",lit(False))

member_df.printSchema()

 

#pyspark 에서 countDistinct 사용을 위해서는 lib 를 선언 해줘야 하다니..
from pyspark.sql.functions import countDistinct
from pyspark.sql import functions as F


# 행 출력
member_df.show(10,truncate = False)

# 특정 컬럼 출력
member_df.select('join_dt','name','group_cd').show(10)

# groupby 출력
member_df.groupBy("group_cd").count().sort("count",ascending=True).show()

# filter 조건 출력 
member_df.filter(member_df.login_cnt > 10).show()

# filter 조건을 복수 개로 줘보자
member_df.filter(
    (F.col("group_cd") == "k-00001") &
    (F.col("login_cnt") > 10)
).count()


# where 조건 출력
member_df.where(my_df.acntid=='name1').show()

member_df.filter("name == 'name1'").show()

 

반응형

+ Recent posts