data engineering

elastic kibana 깨달은점. (data visualization)

qkqhxla1 2019. 12. 26. 19:30

키바나는 엘라스틱서치 내의 데이터를 가져와서 시각화를 해주는 도구이다. elk, efk스택 등등이라고 부르는 것들중 하나이고.. 무슨 내용을 적을까 하다가 보면서 깨달은 점을 간단하게 적기로 했다. 키바나의 경우에는 처음에 한번 데이터 넘기고 그 데이터로 그래프 그리면서 개념 이해하는게 너무 어려워서.. 보고 잘 동작하는 코드와 간단한 설명을 적음.


(키바나는 얼마 안해봐서 틀린 개념등이 있으면 댓글로 알려주시면 감사하겠습니다.)


우선 엘라스틱 서치가 기본으로 설치되어 있다고 가정한다. 설치는 찾아보면 있으니 따로 안적음.


이후로 엘라스틱 서치로 데이터를 주기적으로 쏠 프로그램을 짠다. 키바나는 엘라스틱 서치의 데이터를 가져와서 시각화하기때문에 엘라스틱 서치로 데이터를 쏴주고, 키바나에서 가져와야 한다.


1. 데이터를 넣는 부분이다.

# -*- coding: utf-8 -*-

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from datetime import datetime
from pytz import timezone


class MetricHelper:
    def __init__(self):
        self.host = '엘라스틱 서치 url'
        self.port = 9200
        self.es = Elasticsearch([{'host': self.host, 'port': self.port}, ], timeout=10)
        self._index = "item_count-{:%Y.%m}".format(datetime.now(timezone('Asia/Seoul')))

    def document_in_es(self, kind, start_day):
        already_in = self.es.search(index=self._index,
                           body={'query': {
                                    "bool": {
                                       "must": [
                                           {"match": {"start_day": start_day}},
                                           {"match": {"kind": kind}},
                                        ]
                                    }
                               }
                               })['hits']['hits']
        return already_in

    def delete_es_document(self, kind, start_day):
        while True:
            indexing_list = self.document_in_es(kind, start_day)
            if len(indexing_list) > 1:
                self.es.delete(index=self._index, doc_type='document', id=indexing_list[-1]['_id'], refresh=True)
                continue
            break

    def send_to_es(self, es_json_dict):
        es_json = [{
            "_index": self._index,
            "_type": "document",
            "_source": es_json_dict}]
        print es_json
        try:
            bulk(self.es, es_json, refresh=True)
        except:
            print 'error'
        self.delete_es_document(es_json_dict['kind'], es_json_dict['start_day'])


m = MetricHelper()
es_dict = {'kind': kind, 'fail_count_sql': fail_count_sql, 'count_sql': count_sql,
           'item_fail_count': item_fail_count, 'item_count': item_count, 
           'item_success_count': item_success_count,
           'success_rate': rate, 'start_day': today}
m.send_to_es(es_dict)

그냥 send_to_es함수 내부의 bulk()부분에서 데이터가 들어가면 된다고 생각하면 된다.(이후 부분은 나중에.


bulk시 refresh=True인자를 봐둬야 하는데 이건 다른부분에도 있다.(delete부분에도.) es에 데이터를 넣을때 저 코드가 실행되고 나서 바로 반영이 되는게 아니라서 데이터를 넣고 바로 조회를 하면 데이터가 들어가는 도중이라 안 들어간 것처럼 보인다. 한 3초?후에 조회하면 다시 보이는데 데이터가 완전히 들어가거나, 삭제되거나, 업데이트될때까지 기다리는게 저 옵션이라고 보면 된다.(처음에는 몰라서 무식하게 코드 사이에 time.sleep을 줬었음. 여기보니 나와 동일한 행동을 한 사람이..)


bulk에 넣을 es_json의 형태도 중요한데, 딕셔너리 안의 딕셔너리 형태이다. 이걸 주의하지 않고 넣게 되면 데이터가 이상하게 들어가고 시각화가 뭔가 잘 안되니... 데이터를 넣을때 잘 확인해두자.


데이터를 넣는 bulk()는 try except로 감싸놨는데 이건 종종 타임아웃인지 에러가 떠서 죽어버리는 경우가 있기 때문이다. 근데 더웃긴건 에러가 떠서 프로그램은 죽는데 데이터는 잘 들어가있다. -_-; 그리고 집계를 낼 때에 동일한 데이터가 두번 들어가면 하나의 종류에 대해서 두개의 도큐먼트가 들어가는 결과가 되어 버리므로 항상 데이터를 보내고 난 후에 들어간 데이터가 두개 이상이면 삭제하기 위해 delete_es_document함수를 사용했다.(분명히 디비의 유니크 인덱스처럼 한개만 들어가게 할수 있을것 같은데 잘 못찾겠어서 이렇게 구현 ㅠ)


이미 es내부에 있는 데이터를 가져오기 위해서 document_in_es함수를 사용했는데, 쿼리 짜는게 정말 요상하다.. 더 설명은 안하겠는데 쿼리를 짤때 주의해서 짜야 한다.(반드시 실행해서 결과값의 카운트가 맞는지도 확인하고.) 내가 짠 쿼리는 start_day와 kind가 둘다 일치하는 도큐먼트를 가져오는 쿼리이다.

그리고 delete_es_document에서 한개가 남을때까지 나머지를 지워준다.


결과적으로 es에 데이터를 넣는데, start_day와 kind를 기초로 1개씩만 들어가는 프로그램을 간단하게 짰다.



2. 키바나 부분.

키바나에서 저기로 가서 Index Patterns -> Create Index Pattern를 눌러서 인덱스 패턴을 만든다.

여기서 개념적으로 잘 알아둬야하는데 인덱스 패턴은 위의 소스코드에서 내가 만든 es_json의 _index값들을 다 가져오기 위해 만들어야 한다. 와일드카드 문자는 당연히 되고, 나같은 경우는

self._index = "item_count-{:%Y.%m}".format(datetime.now(timezone('Asia/Seoul')))

요렇게 '이름_이름-년.월' 포맷으로 만들었는데 년.월.일까지 붙여도 되지만 그경우 성능이 떨어질수 있다고 들어서 월까지만 만들었다.(실제 데이터는 하루에 한번씩 들어감.)


내가 만든 인덱스 데이터를 인덱스 패턴이 모두 가져올수 있어야 하므로 item_count-*로 인덱스 패턴을 생성했다. 시간값을 start_day필드에 넣어줬었는데,(datetime.datetime.now()객체) 이 경우 자동으로 시간값이 인덱스 생성할때 잡힌다.


이후 Visualize탭에서 +를 눌러 새로 그래프를 만들어준다.


아래의 탭에는 Data와 Panel options가 있는데 Panel options에 먼저 들어간다.

Panel options에서는 데이터 소스를 정한다. items_count-*라는 이름으로 인덱스 패턴을 만들었으니 Index pattern에 저렇게 적어주고, Time field에는 내가 시간을 넣은 start_day필드를 넣어준다. 하루에 한번씩 들어가니까 Interval은 1d이다.


이어서 데이터 탭이다.

데이터 탭에서 내가 보여줄건 전체대비 fail의 %비율이다. 

아이템 갯수를 구하기 위해 Sum of item_count를 만들어놨고, fail갯수를 구하기 위해 Sum of item_fail_count를 만들어놨다. 이거 한 탭이 Aggregation인데, 왜 하루에 한개씩만 들어갈거고 한 데이터에 대해서 값을 구하면 될텐데 Sum of item_count같은 개념으로 할까? 라고 생각할수도 있다.

그런데 이건 하루에 여러개의 document가 들어올 경우 item_count들의 합을 구하기 위해 사용되는 개념이다. 그래서 1개가 들어오나 n개가 들어오나 합을 구해놓으면 어차피 같으므로 저리 만들었다. 

그리고 %만 보여주면 될거라 눈 모양을 클릭해서 안보이게 해놨다. 


그리고 3번째 탭에서는 Aggregation을 Sum of item_fail_count와 Sum of item_count를 만든다음 이것에 대해 식을 썼다. 저 식대로면 fail비율이 나온다. (params는 위의 변수 집합)


위의 Aggregation은 전부 하루치의 데이터에 대해서만 데이터를 더하거나 구하거나 한다. 하루치가 기준인지는 Panel options에서 1d라고 값을 줘서 알아서 기준치를 잡는다고 생각하면 된다.


아주 기초적인 꺾은선 그래프만 그렸는데.. 위에 스샷엔 없는데 탭만 바꾸면 글씨만 나오거나 비율만 나오는것도 많다. 그리고 파이차트 등 그래프도 원하는대로 그릴 수 있다. 


근데 일단 개념적인 걸 좀 개인적으로 나중에 한번 더 보고 숙지하기 위함이므로... 한 방향에 대해서만 적었다.


적고 나니 나말고는 아무에게도 도움이 별로 되지는 않을듯 싶다..