반응형
1. HA (High Availability)
고가용성이라고도 하는데, 얼마나 운영이 오래될 수 있는가에 대한 성질이다. 에어플로우 또한 고가용성을 높이기 위해 클러스터로 운영하기도 하고, 나의 경우처럼 어떤 대안을 생각하기도 해야 한다.
2. 왜 헬스체크 (HealthCheck)를 ?
단일 서버로 에어플로우를 운영해보니, 스케쥴러 프로세스가 내려가는 일이 자주는 아니지만 4~5개월에 한번 씩 간헐적으로 발생했다. ( 마지막 로그에는 메세지 큐로 이용중인 레디스와의 커넥션 타임아웃 로그가 마지막에 찍혀있었다. )
워크플로우의 작업 누락을 막으려면 어떤 조치가 필요했다. 에어플로우에서는 Rest API 로 헬스체크 기능을 제공했다.
>request
curl -XGET "http://에어플로우 서버 :5000/health"
>response
{
"metadatabase": {
"status": "healthy"
},
"scheduler": {
"status": "healthy",
"latest_scheduler_heartbeat": "2020-04-13T01:38:51.283786+00:00"
}
}
3. go ticker를 이용하여 스케쥴러 알림 및 실행
일정 시간 마다 헬스 체크 Rest API로 상태를 받아와 스케쥴러가 UnHealthy 상태로 내려갈 때, 알림을 주거나 프로세스를 다시 올리는 프로그램을 만들어 볼 수 있다.
물론 작성한 프로그램이 죽어버리면 안되기 때문에, 서버에 sudo 권한이 있다면 ticker 부분을 제외하고 crontab 으로 걸어 더 안전하게 운영을 가져갈 수 있다.
더 좋은 방법은 managed service 를 이용하는 것이다. aws, gcp 모두 검색해보면 서비스가 있다.
package main
import (
"net/http"
"fmt"
"time"
"bytes"
"io/ioutil"
"github.com/multiplay/go-cticker" // ref. https://github.com/multiplay/go-cticker
)
type AFStatus struct {
Metadatabase Status `json:"metadatabase"`
Scheduler Status `json:"scheduler"`
}
type Status struct {
Status string `json:"status"`
Latest_scheduler_heartbeat string `json:"latest_scheduler_heartbeat"`
}
func sendWatchAlert(msg string){
// 장애 처리 부분 작성
}
func healthCheck(){
resp,err:=http.Get("https://myairflow_host/health")
msg_tmpl:="[장애] airflow healthcheck ..\\n\\nscheduler status : {scheduler_status} \\nmetadatabase status : {metadatabase_status}\\nLatest_scheduler_heartbeat: {Latest_scheduler_heartbeat}"
var af AFStatus
if err!=nil{
fmt.Println("[E] http request error .. ")
fmt.Println(err)
}
if(resp.StatusCode != 200) {
sendWatchAlert("[장애] airflow healthcheck \\n\\nwebserver status: unHealthy")
fmt.Println("[E] http response 200 error .. ")
return
}
buf := new(bytes.Buffer)
buf.ReadFrom(resp.Body)
newStr := buf.String()
err = json.Unmarshal([]byte(newStr),&af)
if err != nil {
fmt.Println("[E] Unmarshal Error .. ")
fmt.Println(err)
}
msg:=strings.Replace(msg_tmpl,"{scheduler_status}",af.Scheduler.Status,1)
msg=strings.Replace(msg,"{metadatabase_status}",af.Metadatabase.Status,1)
msg=strings.Replace(msg,"{Latest_scheduler_heartbeat}",af.Scheduler.Latest_scheduler_heartbeat,1)
if af.Scheduler.Status != "healthy" || af.Metadatabase.Status != "healthy" {
sendWatchAlert(msg)
}
}
func main() {
fmt.Println("[L] airflow health checker start..")
// interval:=30
// flag.IntVar(&interval,"-i",180,"ticker interval")
// flag.Parse()
healthCheck()
t := cticker.New(time.Second * 60 ,time.Second) //duration , accuracy
for tick := range t.C {
// Process tick
healthCheck()
//fmt.Println("tick:", tick)
}
}
ref. https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/check-health.html
Checking Airflow Health Status — Airflow Documentation
airflow.apache.org
반응형
'Data Engineer' 카테고리의 다른 글
pyspark 기초 (1) (0) | 2021.10.03 |
---|---|
[python] dataprep을 이용하여 EDA (데이터 분석) 레포트 쉽게 만들기 (0) | 2021.06.06 |
Elasticsearch (ES) 기초 (1) - CRUD (0) | 2021.06.03 |
Apache Spark 기초 (2) (0) | 2021.06.02 |
[hive] pyhive, ph2 라이브러리를 이용하여 python hive 연동 (0) | 2021.06.02 |