반응형

1. Pyspark?

파이썬으로 스파크를 사용하기 위한 목적으로 만들어진 인터페이스이다.

파이썬 API 를 이용할 수 있으면서, 스파크 애플리케이션을 작성할 수 있다. 

interactive 하게 pyspark shell 도 제공한다.

Spark SQL, DataFrame, streaming , MLlib 그리고 Spark Core 를 지원한다.

 

2. 왜 PySPark?

java, scala 로 작성된 코드가 성능이 조금 더 뛰어나다고 알려져 있다. 

하지만 빅데이터 진영에서는 파이썬이 친숙한 사람이 많고, 생산성이 더 난다고 생각한다.

 

3. Pyspark 기초 -  Dataframe 생성 및 다루기

** 개발할 때 비교적 자주 사용되는 함수만 다룰 것 이다.

- 조건 조회와 출력, 그리고 withColumn 은 정말 많이 사용되는 것 같다.

 

from pyspark.sql.types import *
from pyspark.sql.functions import col,lit

# 멤버 컬럼
member_columns = ['group_cd','join_dt','name','login_cnt']

# 멤버 데이터
member_data = [
    ('k-00001','2021-07-31','name1',10),
    ('k-00002','2021-06-31','name2',10),
    ('k-00003','2021-08-31','name3',10),
    ('k-00004','2021-03-31','name4',12),
    ('k-00001','2021-04-31','name5',11),
    ('k-00002','2021-05-31','name6',15),
]

# 데이터 프레임 생성
member_df = spark.createDataFrame(data=member_data,schema=member_columns)

#스키마 구조 출력 
member_df.printSchema()

# withColumn  - 새 컬럼 추가. literal 로 초기화해줘야 한다.
member_df= member_df.withColumn("use_flag",lit(False))

member_df.printSchema()

 

#pyspark 에서 countDistinct 사용을 위해서는 lib 를 선언 해줘야 하다니..
from pyspark.sql.functions import countDistinct
from pyspark.sql import functions as F


# 행 출력
member_df.show(10,truncate = False)

# 특정 컬럼 출력
member_df.select('join_dt','name','group_cd').show(10)

# groupby 출력
member_df.groupBy("group_cd").count().sort("count",ascending=True).show()

# filter 조건 출력 
member_df.filter(member_df.login_cnt > 10).show()

# filter 조건을 복수 개로 줘보자
member_df.filter(
    (F.col("group_cd") == "k-00001") &
    (F.col("login_cnt") > 10)
).count()


# where 조건 출력
member_df.where(my_df.acntid=='name1').show()

member_df.filter("name == 'name1'").show()

 

반응형
반응형

1.zeppelin 기반 로컬 개발 환경 구성이란?

spark 를 공부할 때, zeppelin 이라는 노트북을 이용하면 편하다.

python 에서 주피터와 비슷한 역할의 툴이다.

spark 개발을 안한지 오래되서, 다시 공부를 하기 위해 구성해보려고 한다.

요즘 데이터 엔지니어들이 보는 코딩 테스트에는 알고리즘 문제 뿐만 아니라, 스파크를 이용한 문제도 출제 된다.

역시 docker-compose 를 이용해서 구성한다.

 

2.왜 Docker-compose ?

오픈 소스 버전을 그대로 로컬에 받아서 설치하려면, 적지 않은 시간이 소요된다.(java 설치 lib 등등의 환경 설정. 네트워크 설정 이나 연동 등 .. )

누군가 고생해서 만들어 놓은 docker image 와 docker-compose 에 설정이 기술된 yaml 파일을 이용하여 손쉽게 올릴 수 있다. (보통 docker compose [오픈 소스 이름] 을 치면 쉽게 찾을 수 있다. )

 

3. zeppelin, spark 설치 

구성해보기보다는 따라하기에 가깝다. 내가 참고한 블로그를 참조 한다. 여기를 참고하는게 더 정확하니까 아래 링크로 이동하자.

https://omrisk.medium.com/apache-spark-3-playground-back-to-school-with-zeppelin-notebooks-pt-3-4ebc18da68f7

 

Apache Spark 3 playground — Back to school with Zeppelin notebooks! Pt.3

In this third and final part of our “Apache Spark 3 playground” guide we will be adding a Zeppelin notebook into the mix.

omrisk.medium.com

a) 설치 스크립트

# git clone 다운로드
$ https://omrisk.medium.com/apache-spark-3-playground-back-to-school-with-zeppelin-notebooks-pt-3-4ebc18da68f7

# 컨테이너 올리기
$ docker-compose -f docker-compose.yml -f docker-compose-zeppelin.yml up -d

 

b) 브라우저 접속

  • http://localhost:9090 접속, (zeppelin) 
  • 마찬가지로 spark mster GUI 에도 접근이 가능하다.
  • 기본적으로 예제 코드와 스파크 앱을 띄우는 노트가 있다. ( spark server 입장에서는 zeppelin 도 하나의 appication 이기 때문에 spark 를 사용하기 위해서는 인터프리터가 구동되어야 한다.) 

** 보통 zeppelin 과 spark 를 연동 할때 spark_home ,master 엔드포인트를 잡아주는데, 아래에서 처럼 노트 안에서 잡아줄수도 있는 것 같다. 

 

 

시간이 생기면 추가로 보충하겠다... 너무 바쁘다 요즘.

반응형
반응형

1.apache superset 이란?

비지니스 인텔리젼스 웹 애플리케이션다.

에어비엔비에서 사용하고 있는 시각화 툴이다.

 

 

2. 왜 superset ?

오픈소스이면서 다양한 데이터소스를 지원한다.

UI 가 직관적이다.

에어비엔비가 쓰니까 공신력? 있어보인다.

 

3. spuerset 설치 (docker-compose)

how ? ( 구성 방법 )

a) 설치 스크립트

  • non-dev 버전으로 띄웠더니 app, worker, db, cache (redis) 프로세스가 분리되어있다.
    # yaml 소스 클론
    $ git clone https://github.com/apache/superset.git
    
    # 릴리즈 버전으로 체크아웃
    $ git checkout 1.2.0
    $ git status
    
    # 컨테이너로 올리는 명령 실행
    $ docker-compose -f docker-compose-non-dev.yml up -d​

b) 브라우저 접속

 

  • 초기 비밀번호는 admin/admin
  • http://localhost:8088 접속 (서버에 설치하신 분들은 localhost에 설치한 서버의 host 로 대체)
  • 기본적으로 예제 데이터가 추가되어있는데, yaml 을 찾아보면 off 할 수 있다.

 

 

c) 데이터소스 추가 ( mysql)

차트 또는 대시보드를 구성하기 전에, dataset 을 생성해야 한다.

CSV 등의 데이터 업로드를 위해서는 database advanced 에서 권한 을 줘야 한다.

대시보드 구성은 다음 편에 ..  :D

반응형
반응형

1. lambda 란?

간단하게 서버리스로 코드를 실행할 수 있는 aws 컴퓨팅 서비스라고 할 수 있다.

2. lambda 를 왜 쓰는가?

프로그램 실행을 위한 런타임 환경이나 실행가능한 서버가 필요없다.  - 서버리스

동시성등을 고려한 개발을 프로그래머가 직접 해줄 필요가 없다 - 옵션 에서 동시성이나 병렬 제어가 가능하기 때문에

모니터링이 편하다. 

데이터 엔지니어 입장에서 aws lambda 를 통한 스트리밍 처리도 가능하다고 생각한다. (허용되는 데이터 발생량이나 규모에 따라)

 

3. lambda 만들기

람다는 웹 콘솔에서 GUI 로도 손쉽게 만들 수 있다.

하지만 이번 포스팅은 로컬 서버에서 aws cli 를 이용하여 lambda 함수를 만들어 본다.

실습 예제는 aws 공식 홈페이지에 나와있는 내용들이다. ( 공식 홈페이지 가이드가 더 내용이 풍부하니까 그 쪽을 먼저 보길 바란다) 

 

a) 사전 준비 (시간이 부족해서 생략) 

  • 로컬 PC 에서 aws cli 가 설치 및 IAM 구성이 되어 있어야 한다.
  • lambda 생성과 실행 등 필요한 IAM 권한이 미리 준비되어 있어야 한다 .
  • 실행권한 arn 이 필요하다.

b) lambda 생성

# 워킹 디렉토리 생성 및 이동
$ mkdir lambda_python_sample 
$ cd lambda_python_sample

# 람다 코드 작성
$ vi lambda_function.py  


# 사용되는 패키지를 working 디렉토리에 다운로드
$ pip install --target ./package requests 
$ cd package

# 패키지 압축
$ zip -r ../jssvs-development-package.zip .

# lambda 코드 압축
$ zip -g jssvs-deployment-package.zip lambda_function.py  

# lambda 생성 명령어  - 함수이름, 압축 파일 경로, 이벤트 핸들러 명(function 이름) # 역할 arn 순으로.
$ aws lambda create-function --function-name lambda-jssvs-dev \
--zip-file fileb://[home dir]/WorkSpace/aws/lambda_python_sample/jssvs-deployment-package.zip \ 
--handler lambda_function.main --runtime python3.7  \
--role arn:aws:iam::[ars code]:role/gamebi-lambda-role

-- lambda 코드 

import requests
def main(event, context):
    response = requests.get("https://www.test.com/")
    print(response.text)
    return response.text
if __name__ == "__main__":
    main('', '')

c) lambda 실행

# 기본 실행 및 출력 
$ aws lambda invoke --function-name lambda-jssvs-dev out \
--log-type Tail --query 'LogResult' --output text | base64 -d

lambda 의 출력 포맷은 base64로 인코딩 되어 있기 때문에, 우리가 보려면 base64 로 디코딩 해야 한다.

 

d) lambda 삭제 

$ aws lambda delete-function --function-name lambda-jssvs-dev

e) lambda 리스트 조회 및 검색

# 검색 
$ aws lambda list-functions --max-items 10
# 조회
$ aws lambda get-function --function-name lambda-jssvs-dev

 

 

** lambda 기본적으로 지원할 것 같지만 지원하지 않는 라이브러리들이 (requests 등 ) 있기 때문에, 개발자가 직접 소스를 올려줘야 한다.

** 웹 콘솔에서는 레이어 라는 이름으로 외부 패키지나 라이브러리를 올려서 사용할 수 있다.

** 함수를 생성할때 파이썬 버전은 꼭 맞춰주길 바란다. (다른 언어도 마찬가지. )   

반응형
반응형

1. 문제 소개

  • 구현 문제에 해당된다
  • undo 명령구간에 모든 명령어는 취소하고, 남은 type 명령의 문자를 저장해서 출력하는 문제다.
  • undo 명령도 undo 에 의해 취소 될 수 있다.

2.코드

#입력 테스트 케이스 처리
def setTestCase():
    N=input()
    COMMAND_LIST = []
    for i in range(int(N)):
        COMMAND_LIST.append(input())
    return COMMAND_LIST

#커맨드 라인 파싱 
def command_parser(input_command_list):
    command_list = []
    for i in input_command_list:
        split_command = i.split(" ")
        command_type,command_content,seconds = split_command[0],split_command[1],int(split_command[2])
        command_list.append((seconds,command_content))
    
    return command_list


# 입력 처리 
def process_command(command_list):
    text = ''   # 결과 문자열을 담을 변수
    rollback_point=-1   # undo 의 분기를 찾을 카운트
    
    # 명령어는 역순으로 처리
    for command_set in reversed(list(command_list)):
        seconds,command = command_set
        # undo 가 적용되는 마지막 명령어 분기
        if seconds == rollback_point:       
            rollback_point = -1 
            continue
        # undo 가 해당되는 구간의 명령어 skip 분기
        elif rollback_point != -1 and seconds > rollback_point:
            continue
        # undo 명령어를 만났을 때 분기
        elif command.isdigit():
            rollback_point = seconds - int(command)
            if rollback_point < 0:
                rollback_point = 0
            continue
        # 문자 저장
        text+=command
    
    return text[::-1]   # 역순으로 결과 문자열이 저장되었기 때문에 reverse 처리 
        


def solution():
    input_command_list = setTestCase()
    command_list=command_parser(input_command_list)
    result = process_command(command_list)

    print(result)

if __name__=='__main__':
    solution()

3.코멘트

  • 스택구조의 프로그램 호출 처리 흐름을 모방하여, 명령어를 역순으로 undo 범위에 해당되는 명령은 취소하여 문제를 해결했다.  
  • 생각한 테스트 케이스는 문제가 없었는데 실제 제출시 실패가 있었다.
  • 함께 스터디한 스터디원에게 반례 케이스 정보를 얻어 문제를 해결할 수 있었다.
  • type/undo 가 0초일때 , 그리고 같은 초에 여러 명령어의 입력이 들어올 경우를 추가하여 테스트 했다. 
  • 언제나 그랬지만 함께 열심히 공부한 스터디원에게서 지식을 얻는다. .. (스터디원들에게 고맙다 ) 
반응형
반응형

1.  에러 로그

-- 유실..

 

2. 원인 또는 현상

Werkzeug 모듈이 버전에 따라 secure_filename() 의 위치가 달라져, 발생하는 이슈

 

3.해결방법

해당 패키지 업그레이드.

$pip install --upgrade werkzeug==0.16.1
반응형
반응형

1.문제소개

  • 트리 유형의 문제다
  • 이진 트리 배열의 구조를 알면 좋다.
  • 입력 노드( 예제에서는 입력된 오리의 위치) 까지의 탐색 경로 중에, 이미 방문 ( 예제에서는 오리의 집 선점) 된 노드가 있다면 가장 처음 마주치는 노드를 출력하고, 아니면 0 을 출력하는 문제다. 

https://www.acmicpc.net/problem/20364

2.코드

import sys

def inputCase():
    N,Q = sys.stdin.readline().split(' ')
    N,Q  = int(N),int(Q)
    ducks = []
    for i in range(0,Q):
        ducks.append(int(sys.stdin.readline()))

    return N,Q,ducks

def getParentNode(childNode):
    return (childNode)//2
        
def solution():
    # 입력 전처리
    N,Q,ducks = inputCase()

    BT_F = [False for x in range(0,N+1)] # 이진 트리 배열, 오리의 땅의 점유 여부를 체크

    for e in ducks:
        p = e
        hasLand = False
        preDuckLoc = 0
        
        while(p>1):

            if BT_F[p] == True:
               hasLand = True
               preDuckLoc = p
            
            p = getParentNode(p) 

        if hasLand:
            print(preDuckLoc)
        else:
            print(preDuckLoc)
            BT_F[e]=True


solution()

3.코멘트

  • 처음 제출한 알고리즘이 틀렸는데도, 백준에서는 시간 초과로 출력이 됐다. 
  • 해당 문제는 기본 입력 함수인 input() 을 이용할 경우 시간 초과로 출력이 되며, 성능면에서 더 빠른 sys 패키지 내 readline() 을 이용해야 한다 . ( 함께 스터디한 재민님에 의해 알게 됐다.)
  • 처음에 부모 노드의 방문 체크만 해서 틀렸는데, 이 역시 스터디를 통해 해결 할 수 있었다.
반응형
반응형

1.앤서블(Ansible) 이란?

  • python으로 개발됨
  • 환경설정, 배포를 가능하게 하는 툴
  • 깃허브 파이썬 랭킹중 상위.
  • 장점자동 배포 환경이 쉽다
  • 멱등성(여러번 적용해도 결과가 바뀌지 않음)
  • 빠른 ssh 통신. 빠른 provision이 가능

2.왜 앤서블(Ansible) ?

아직 실무에 앤서블을 적용해보기 전이고, 편리함, 이점이 피부로 와닿지는 않지만, 아래의 상황에서 좋을 것 같다

-> 작업자가 서버 셋팅을 반복적으로 해야 하거나,다수의 서버에 클러스터로 동일한 설정을 해줘야 할 경우

 

2.앤서블 설치

pip 을 통해 설치가 가능하다

$pip install ansible

3. 앤서블 사용하기

a) Inventory 설정

  • 타겟이 되는 host(managed node)들에 대한 메타정보를 기술한 파일이다. -> 쉽게 말하면 내가 어떤 서버들에 작업을 할건지, 서버 
  • 호스트마다 변수를 지정할 수 있으며 호스트 변수라고 한다
  • yaml , ini 포맷으로 작성해야 한다
  • ansible을 설치경로에 ansible.cfg 설정파일을 통해 mannual 하게 경로를 줄 수 있다

ansible.cfg

[defaults]
inventory=/home/jssvs/work/ansible_inventory

 

hosts

[airflow_worker]
airflow-worker-01  --> 이 부분은 /etc/hosts 에 등록되어 있으면 호스트명을 쓰면 된다.
airflow-worker-02

[airflow_scheduler]
localhost

 

ansible에 등록된 host 간 네트워크 통신이 되는지 ping 으로 체크 방법

$ ansible all -m ping

 

b)Playbook 작성

  • 등록된 호스트에 뭘 하게 할건지를 기술하는 파일이다. 워크플로우 작업들을 작성하면 된다.
  • 룰커맨드 실행/ 스크립트 실행 / 인스톨 패키지 등등을 정의할 수 있음특수문자( - ) 으로 플레이 단위를 구분한다.탭을 쓰면 안된당
  • 시작은 —-로 한다.
  • command, script, yum, service 등의 모듈을 지원한다.
  • 호스트에서 동작할 task 활동을 정의
  • yml 작성후 웹에서 yaml lint 사이트를 이용하여 들여쓰기를 정리하면 편하다.
  • http://www.yamllint.com
 

YAMLlint - The YAML Validator

YAML Lint Paste in your YAML and click "Go" - we'll tell you if it's valid or not, and give you a nice clean UTF-8 version of it. Optimized for Ruby.

www.yamllint.com

Playbook1.yml

#Simple PlayBook1.yml
---
-
	name : play1
	hosts : localhost
	tasks :
		- name : Execute command 'date'
			command : date
		
		- name : Execute run script
			script: test_script.sh

		- name : test2
      shell: "echo 'helloworld' >> /home/deploy/ansible_print.log


-
	name : play2
	hosts : airflow_worker01
	tasks : 
		- name : Install Yum package
			yum : 
				name: httpd
				state : present
		- name : Run httpd server
			service :
				name : httpd
				state : started

c) Playbook 실행

$ansible-playbook <playbook-filename>
반응형
반응형

1. Kinesis(키네시스)란?

리얼 타임 스트리밍 데이터를 수집하고, 처리하기 위한 도구. 분산 메시징 시스템

2. 파이프라인 

kinesis -> lambda -> s3 

 

3. 사전 준비 

  • 키네시스 스트림이 생성되어 있어야 한다.
  • 다음 권한을 가진 IAM 이 필요하다
    • 키네시스 CRUD
    • 키네시스와 S3에 접근 및 읽기 쓰기가 가능한 Lamda Role
  • kiner를 이용하여 테스트 레코드를 전송한다.

4.코드

a) Lambda Code

from __future__ import print_function
import boto3
import base64
import json

print('Loading function')
AWS_BUCKET_NAME = 'jssvs-bucket'
s3 = boto3.client('s3')


# 람다 트리거 발생 시 호출
def lambda_handler(event, context):
    #print("Received event: " + json.dumps(event, indent=2))
    
    for record in event['Records']:
        # Kinesis data is base64 encoded so decode here
        
        #base64 decode 
        payload = base64.b64decode(record['kinesis']['data']).decode("UTF-8")
		
        #json 타입으로 읽어 변수에 저장
        jsonObj = json.loads(payload)
        
        # s3 적재에 필요한 변수 선언
        s3 = boto3.resource('s3')
        bucket=s3.Bucket(AWS_BUCKET_NAME)
        path='kinesis_test/parsed_log/'
        
        
        # 데이터를 S3 에 적재 
        s3.Object(AWS_BUCKET_NAME,path).put(Body=json.dumps(jsonObj[1]))
        
        print(jsonObj[1])
    return 'Successfully processed {} records.'.format(len(event['Records']))

b) 키네시스 Bulk Put Record

from uuid import uuid4
from kiner.producer import KinesisProducer

def on_flush(count, last_flushed_at, Data=b'', PartitionKey='', Metadata=()):
    print("Flushed {count} messages at timestamp {last_flushed_at}\nLast message was {Metadata['id']} paritioned by {PartitionKey} ({len(Data)} bytes)".format(count,last_flushed_at,Data,PartitonKey,Metadata))

p = KinesisProducer('[키네시스 이름]', flush_callback=on_flush)

for i in range(10):
    p.put_record('["dummy_log",{"jsonKey":"jsonValue"}]\n', metadata={'id': uuid4()}, partition_key=f"{i % 2}")



p.close()

** lambda 앞단에 firehose를 배치하여 파이프라인을 구성하는 방법도 있습니다.

** lambda에 키네시스 트리거 이벤트를 바로 붙히는데 레코드를 모아서 처리할 경우, baes64로 decode 된 값이 정상 포맷이 아닐 수 있습니다. deaggregation 과정이 필요하다는 이야기. 

 

*** kinesis , lambda 생성과정, 트리거 생성 부분등의 과정이 생략되었는데, 추후에 스크린샷을 떠서 올리겠습니다.  :D

반응형

'Cloud Platform Service' 카테고리의 다른 글

AWS lambda - cli를 이용해서 만들기  (0) 2021.08.10
AWS CLI 설치 및 s3 관련 기본 명령어  (0) 2021.05.31
GoogleCloudSDK (1)  (0) 2018.10.27
반응형

1.문제소개

  • 모든 정점들간의 최단경로를 구하는 플로이드 와샬을 사용하면서, 출력 요구사항만 맞추면 된다.
  • 시간복잡도는 for 루프가 3중으로 수행되기 때문에 O(n3) 이다.
  • 공간복잡도는 행렬의 크기만큼 O(n2)이다.

2.코드


def floydWarshall(dag, N):
    for i in range(0, N):
        for j in range(0, N):
            if dag[i][j] == 0 and i != j:
                dag[i][j] = -1  # INF

    for k in range(0, N):
        for i in range(0, N):
            for j in range(0, N):
                if dag[i][k] > 0 and dag[k][j] > 0:
                    dag[i][j] = 1

    for i in range(0, N):
        for j in range(0, N):
            if dag[i][j] <= 0:
                print('{} '.format(0), end='')
            else:
                print('{} '.format(dag[i][j]), end='')
        print()


def main():
    N = int(input())
    dag = []
    for i in range(0, N):
        line = input().split(' ')
        dag += [[int(x) for x in line]]

    floydWarshall(dag, N)


if __name__ == '__main__':
    main()

3.코멘트

  • 그래프 문제는 알고리즘도 어렵지만 언제나 전처리부분 작성이 성가시다..
반응형

+ Recent posts