반응형

1. lambda 란?

간단하게 서버리스로 코드를 실행할 수 있는 aws 컴퓨팅 서비스라고 할 수 있다.

2. lambda 를 왜 쓰는가?

프로그램 실행을 위한 런타임 환경이나 실행가능한 서버가 필요없다.  - 서버리스

동시성등을 고려한 개발을 프로그래머가 직접 해줄 필요가 없다 - 옵션 에서 동시성이나 병렬 제어가 가능하기 때문에

모니터링이 편하다. 

데이터 엔지니어 입장에서 aws lambda 를 통한 스트리밍 처리도 가능하다고 생각한다. (허용되는 데이터 발생량이나 규모에 따라)

 

3. lambda 만들기

람다는 웹 콘솔에서 GUI 로도 손쉽게 만들 수 있다.

하지만 이번 포스팅은 로컬 서버에서 aws cli 를 이용하여 lambda 함수를 만들어 본다.

실습 예제는 aws 공식 홈페이지에 나와있는 내용들이다. ( 공식 홈페이지 가이드가 더 내용이 풍부하니까 그 쪽을 먼저 보길 바란다) 

 

a) 사전 준비 (시간이 부족해서 생략) 

  • 로컬 PC 에서 aws cli 가 설치 및 IAM 구성이 되어 있어야 한다.
  • lambda 생성과 실행 등 필요한 IAM 권한이 미리 준비되어 있어야 한다 .
  • 실행권한 arn 이 필요하다.

b) lambda 생성

# 워킹 디렉토리 생성 및 이동
$ mkdir lambda_python_sample 
$ cd lambda_python_sample

# 람다 코드 작성
$ vi lambda_function.py  


# 사용되는 패키지를 working 디렉토리에 다운로드
$ pip install --target ./package requests 
$ cd package

# 패키지 압축
$ zip -r ../jssvs-development-package.zip .

# lambda 코드 압축
$ zip -g jssvs-deployment-package.zip lambda_function.py  

# lambda 생성 명령어  - 함수이름, 압축 파일 경로, 이벤트 핸들러 명(function 이름) # 역할 arn 순으로.
$ aws lambda create-function --function-name lambda-jssvs-dev \
--zip-file fileb://[home dir]/WorkSpace/aws/lambda_python_sample/jssvs-deployment-package.zip \ 
--handler lambda_function.main --runtime python3.7  \
--role arn:aws:iam::[ars code]:role/gamebi-lambda-role

-- lambda 코드 

import requests
def main(event, context):
    response = requests.get("https://www.test.com/")
    print(response.text)
    return response.text
if __name__ == "__main__":
    main('', '')

c) lambda 실행

# 기본 실행 및 출력 
$ aws lambda invoke --function-name lambda-jssvs-dev out \
--log-type Tail --query 'LogResult' --output text | base64 -d

lambda 의 출력 포맷은 base64로 인코딩 되어 있기 때문에, 우리가 보려면 base64 로 디코딩 해야 한다.

 

d) lambda 삭제 

$ aws lambda delete-function --function-name lambda-jssvs-dev

e) lambda 리스트 조회 및 검색

# 검색 
$ aws lambda list-functions --max-items 10
# 조회
$ aws lambda get-function --function-name lambda-jssvs-dev

 

 

** lambda 기본적으로 지원할 것 같지만 지원하지 않는 라이브러리들이 (requests 등 ) 있기 때문에, 개발자가 직접 소스를 올려줘야 한다.

** 웹 콘솔에서는 레이어 라는 이름으로 외부 패키지나 라이브러리를 올려서 사용할 수 있다.

** 함수를 생성할때 파이썬 버전은 꼭 맞춰주길 바란다. (다른 언어도 마찬가지. )   

반응형
반응형

1. Kinesis(키네시스)란?

리얼 타임 스트리밍 데이터를 수집하고, 처리하기 위한 도구. 분산 메시징 시스템

2. 파이프라인 

kinesis -> lambda -> s3 

 

3. 사전 준비 

  • 키네시스 스트림이 생성되어 있어야 한다.
  • 다음 권한을 가진 IAM 이 필요하다
    • 키네시스 CRUD
    • 키네시스와 S3에 접근 및 읽기 쓰기가 가능한 Lamda Role
  • kiner를 이용하여 테스트 레코드를 전송한다.

4.코드

a) Lambda Code

from __future__ import print_function
import boto3
import base64
import json

print('Loading function')
AWS_BUCKET_NAME = 'jssvs-bucket'
s3 = boto3.client('s3')


# 람다 트리거 발생 시 호출
def lambda_handler(event, context):
    #print("Received event: " + json.dumps(event, indent=2))
    
    for record in event['Records']:
        # Kinesis data is base64 encoded so decode here
        
        #base64 decode 
        payload = base64.b64decode(record['kinesis']['data']).decode("UTF-8")
		
        #json 타입으로 읽어 변수에 저장
        jsonObj = json.loads(payload)
        
        # s3 적재에 필요한 변수 선언
        s3 = boto3.resource('s3')
        bucket=s3.Bucket(AWS_BUCKET_NAME)
        path='kinesis_test/parsed_log/'
        
        
        # 데이터를 S3 에 적재 
        s3.Object(AWS_BUCKET_NAME,path).put(Body=json.dumps(jsonObj[1]))
        
        print(jsonObj[1])
    return 'Successfully processed {} records.'.format(len(event['Records']))

b) 키네시스 Bulk Put Record

from uuid import uuid4
from kiner.producer import KinesisProducer

def on_flush(count, last_flushed_at, Data=b'', PartitionKey='', Metadata=()):
    print("Flushed {count} messages at timestamp {last_flushed_at}\nLast message was {Metadata['id']} paritioned by {PartitionKey} ({len(Data)} bytes)".format(count,last_flushed_at,Data,PartitonKey,Metadata))

p = KinesisProducer('[키네시스 이름]', flush_callback=on_flush)

for i in range(10):
    p.put_record('["dummy_log",{"jsonKey":"jsonValue"}]\n', metadata={'id': uuid4()}, partition_key=f"{i % 2}")



p.close()

** lambda 앞단에 firehose를 배치하여 파이프라인을 구성하는 방법도 있습니다.

** lambda에 키네시스 트리거 이벤트를 바로 붙히는데 레코드를 모아서 처리할 경우, baes64로 decode 된 값이 정상 포맷이 아닐 수 있습니다. deaggregation 과정이 필요하다는 이야기. 

 

*** kinesis , lambda 생성과정, 트리거 생성 부분등의 과정이 생략되었는데, 추후에 스크린샷을 떠서 올리겠습니다.  :D

반응형

'Cloud Platform Service' 카테고리의 다른 글

AWS lambda - cli를 이용해서 만들기  (0) 2021.08.10
AWS CLI 설치 및 s3 관련 기본 명령어  (0) 2021.05.31
GoogleCloudSDK (1)  (0) 2018.10.27
반응형

1. aws cli 설치 ( virtualenv 환경 )

$ curl "https://s3.amazonaws.com/aws-cli/awscli-bundle.zip" -o "awscli-bundle.zip"
$ unzip awscli-bundle.zip
$ sudo ./awscli-bundle/install -i /usr/local/aws -b /usr/local/bin/aws

2. aws 계정 configure 설정

$ pip install --user virtualenv

$ virtualenv ~/cli-ve

$ virtualenv -p /usr/bin/python37 ~/cli-ve

$ pip install --upgrade awscli

$ aws configure

3. s3 관련 기본 명령어들

버킷 생성
$ aws s3 mb s3://fruits
# mb = makebucket

버킷 조회
$aws s3 ls

파일 복사하기 (단일 파일)
# aws s3 cp s3://[source] [target]
$ aws s3 cp ./jayden_sample.csv s3://jayden_test/jayden/sample/

파일 복사하기 (멀티플 파일)
$ aws s3 cp s3://[source] [target] --recursive



파일 목록 조회하기
$ aws s3 ls s3://jayden_test

--recursive 옵션
: 하위 디렉토리 까지 적용
$ aws cp . s3://jayden_test --recursive

$ aws s3 cp [source] [target]

용량 확인하기
aws s3 ls s3://[버켓]/디렉토리 --summarize --human-readable
반응형
반응형
  •  GoogleCloud SDK
    • GoogleCloud 서비스에 필요한 커맨드를 제공한다.
  • 설치방법(Linux)
  • bq 
    • $ bq load --nosync --autodetenct [projectId]:[datasetId].[tableName] [버킷/파일경로] [테이블 스키마]         
    • --autodetect          //자동스키마타입
    • --skip_leading_rows=1         //헤더라인 스킵
    • --nosync          //비동기 방식
    • 테이블 스키마
      • [{"name":"name","type":"string","mode":"nullable"}]
      • 타입은 소문자만 허용
      • csv-gz 포맷일 경우 타입의 순서도 중요
    • $ bq shell           //대화형 모드
    • $ bq mk [dataset 이름]
    • $ bq query "[쿼리]"
  • gsutil
    • $ gsutil ls  gs://nmlog/
    • $ gsuil du gs://nmlog/
    • $ gsutil cp -m [파일] [gs경로]
    • -m           //병렬로드 옵션
  • gcloud
    • $ gcloud auth login          //구글 클라우드 계정 로그인
    • $ gcloud projects list         //프로젝트 리스트
    • $ gcloud auth list                   //계정 리스트

 

반응형

+ Recent posts