반응형

1. HA (High Availability)

고가용성이라고도 하는데, 얼마나 운영이 오래될 수 있는가에 대한 성질이다. 에어플로우 또한 고가용성을 높이기 위해 클러스터로 운영하기도 하고, 나의 경우처럼 어떤 대안을 생각하기도 해야 한다.

2. 왜 헬스체크 (HealthCheck)를 ? 

단일 서버로 에어플로우를 운영해보니, 스케쥴러 프로세스가 내려가는 일이 자주는 아니지만 4~5개월에 한번 씩 간헐적으로 발생했다. ( 마지막 로그에는 메세지 큐로 이용중인 레디스와의 커넥션 타임아웃 로그가 마지막에 찍혀있었다. )

 

워크플로우의 작업 누락을 막으려면 어떤 조치가 필요했다. 에어플로우에서는 Rest API 로 헬스체크 기능을 제공했다.

>request
curl -XGET "http://에어플로우 서버 :5000/health"

>response
{
    "metadatabase": {
        "status": "healthy"
    },
    "scheduler": {
        "status": "healthy",
        "latest_scheduler_heartbeat": "2020-04-13T01:38:51.283786+00:00"
    }
}

3. go ticker를 이용하여 스케쥴러 알림 및 실행

일정 시간 마다 헬스 체크 Rest API로 상태를 받아와 스케쥴러가 UnHealthy 상태로 내려갈 때, 알림을 주거나 프로세스를 다시 올리는 프로그램을 만들어 볼 수 있다.

 

물론 작성한 프로그램이 죽어버리면 안되기 때문에, 서버에 sudo 권한이 있다면 ticker 부분을 제외하고 crontab 으로 걸어 더 안전하게 운영을 가져갈 수 있다.

 

더 좋은 방법은 managed service 를 이용하는 것이다. aws, gcp 모두 검색해보면 서비스가 있다.

package main

import (
	"net/http"
	"fmt"
	"time"
	"bytes"
	"io/ioutil"
	"github.com/multiplay/go-cticker"	// ref. https://github.com/multiplay/go-cticker
)

type AFStatus struct {
    Metadatabase  Status  `json:"metadatabase"`
    Scheduler Status `json:"scheduler"`
}

type Status struct {
    Status  string `json:"status"`
    Latest_scheduler_heartbeat  string `json:"latest_scheduler_heartbeat"`
}

func sendWatchAlert(msg string){
	// 장애 처리 부분 작성 
}

func healthCheck(){
    resp,err:=http.Get("https://myairflow_host/health")
    msg_tmpl:="[장애] airflow healthcheck ..\\n\\nscheduler status : {scheduler_status} \\nmetadatabase status : {metadatabase_status}\\nLatest_scheduler_heartbeat: {Latest_scheduler_heartbeat}"
    var  af  AFStatus
    if err!=nil{
        fmt.Println("[E] http request error .. ")
        fmt.Println(err)
    }

    if(resp.StatusCode != 200) {
        sendWatchAlert("[장애] airflow healthcheck \\n\\nwebserver status: unHealthy")
        fmt.Println("[E] http response 200 error .. ")
        return
    }
    buf := new(bytes.Buffer)
    buf.ReadFrom(resp.Body)
    newStr := buf.String()
    err = json.Unmarshal([]byte(newStr),&af)
    if err != nil {
        fmt.Println("[E] Unmarshal  Error .. ")
        fmt.Println(err)
    }

    msg:=strings.Replace(msg_tmpl,"{scheduler_status}",af.Scheduler.Status,1)
    msg=strings.Replace(msg,"{metadatabase_status}",af.Metadatabase.Status,1)
    msg=strings.Replace(msg,"{Latest_scheduler_heartbeat}",af.Scheduler.Latest_scheduler_heartbeat,1)

    if af.Scheduler.Status != "healthy" || af.Metadatabase.Status != "healthy" {
        sendWatchAlert(msg)
    }
}



func main() {
    fmt.Println("[L] airflow health checker start..")
	
//  interval:=30
//  flag.IntVar(&interval,"-i",180,"ticker interval")
//  flag.Parse()

    healthCheck()
    t := cticker.New(time.Second * 60 ,time.Second) //duration , accuracy
    for tick := range t.C {
        // Process tick
        healthCheck()
        //fmt.Println("tick:", tick)
    }
}

ref. https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/check-health.html

 

Checking Airflow Health Status — Airflow Documentation

 

airflow.apache.org

 

 

반응형

+ Recent posts