반응형
1. Kinesis(키네시스)란?
리얼 타임 스트리밍 데이터를 수집하고, 처리하기 위한 도구. 분산 메시징 시스템
2. 파이프라인
kinesis -> lambda -> s3
3. 사전 준비
- 키네시스 스트림이 생성되어 있어야 한다.
- 다음 권한을 가진 IAM 이 필요하다
- 키네시스 CRUD
- 키네시스와 S3에 접근 및 읽기 쓰기가 가능한 Lamda Role
- kiner를 이용하여 테스트 레코드를 전송한다.
4.코드
a) Lambda Code
from __future__ import print_function
import boto3
import base64
import json
print('Loading function')
AWS_BUCKET_NAME = 'jssvs-bucket'
s3 = boto3.client('s3')
# 람다 트리거 발생 시 호출
def lambda_handler(event, context):
#print("Received event: " + json.dumps(event, indent=2))
for record in event['Records']:
# Kinesis data is base64 encoded so decode here
#base64 decode
payload = base64.b64decode(record['kinesis']['data']).decode("UTF-8")
#json 타입으로 읽어 변수에 저장
jsonObj = json.loads(payload)
# s3 적재에 필요한 변수 선언
s3 = boto3.resource('s3')
bucket=s3.Bucket(AWS_BUCKET_NAME)
path='kinesis_test/parsed_log/'
# 데이터를 S3 에 적재
s3.Object(AWS_BUCKET_NAME,path).put(Body=json.dumps(jsonObj[1]))
print(jsonObj[1])
return 'Successfully processed {} records.'.format(len(event['Records']))
b) 키네시스 Bulk Put Record
from uuid import uuid4
from kiner.producer import KinesisProducer
def on_flush(count, last_flushed_at, Data=b'', PartitionKey='', Metadata=()):
print("Flushed {count} messages at timestamp {last_flushed_at}\nLast message was {Metadata['id']} paritioned by {PartitionKey} ({len(Data)} bytes)".format(count,last_flushed_at,Data,PartitonKey,Metadata))
p = KinesisProducer('[키네시스 이름]', flush_callback=on_flush)
for i in range(10):
p.put_record('["dummy_log",{"jsonKey":"jsonValue"}]\n', metadata={'id': uuid4()}, partition_key=f"{i % 2}")
p.close()
** lambda 앞단에 firehose를 배치하여 파이프라인을 구성하는 방법도 있습니다.
** lambda에 키네시스 트리거 이벤트를 바로 붙히는데 레코드를 모아서 처리할 경우, baes64로 decode 된 값이 정상 포맷이 아닐 수 있습니다. deaggregation 과정이 필요하다는 이야기.
*** kinesis , lambda 생성과정, 트리거 생성 부분등의 과정이 생략되었는데, 추후에 스크린샷을 떠서 올리겠습니다. :D
반응형
'Cloud Platform Service' 카테고리의 다른 글
AWS lambda - cli를 이용해서 만들기 (0) | 2021.08.10 |
---|---|
AWS CLI 설치 및 s3 관련 기본 명령어 (0) | 2021.05.31 |
GoogleCloudSDK (1) (0) | 2018.10.27 |