data engineering

apache kafka 기본, 삽질 등.

qkqhxla1 2020. 2. 4. 10:41

카프카를 처음 다루게 되면서, 겪었던 삽질이나, 삽질하면서 얻었던 것 등의 과정을 올리고자 한다. 그냥 내가 한 삽질을 따라가면서 기본 튜토리얼만 읽었을때와 다르게 어떤 어려움이 있었는지,있을지 짐작하는 용도로 쓰자.



개발자들이 대부분 어떤 기술이나 이런걸 처음 입문하거나 할때, 아마 대부분 기본적으로 데스크탑에 설치해보고, 튜토리얼을 따라하면서 사용법을 익힐것이다. 나도 동일하게 튜토리얼을 보고 회사 맥에 설치해보려다가 계정 권한 문제인지 실행이 잘 되지 않았다.(순수 admin을 내가 갖고있는게 아님.)


요 글에서 적었듯이 우리 팀에서는 쿠버네티스를 편리하게 컨트롤해주는 gui플랫폼인 랜쳐라는걸 쓴다. 랜쳐에서는(쿠버네티스에서는) 매우 편하게 클릭 몇번으로 서비스를 설치하거나 할수 있는데, 로컬에서는 뭔가 문제가 있는지 실행이 안되니 랜쳐에서 카프카를 설치후 로컬에서 쿠버네티스 안의 카프카에 접속해서 테스트를 해보기로 했다.(그리고 쿠버네티스 안의 이것저것 인자를 바꿈으로서 데브옵스적인 요소들과 좀더 친해지고자 하는것도 있었음.) 

너무 편하다. 그냥 kafka로 검색후 맞는걸 누르고 launch를 누르면 알아서 설치된다. 클릭 두번만에 카프카 설치가 완료되었다.(kafka from Library를 설치함)


카프카를 시작하기 전에, 우리는 현재 일부 시스템의 큐 용도로 redis를 쓴다. 전에 시니어분한테 레디스하고 카프카의 차이점이 뭐에요? 하고 물어본적이 있는데, 그때 시니어분이 간단하게 레디스는 메모리에 데이터를 저장하고, 카프카는 파일로 저장한다. ~~~등등을 말씀하셨다. 자세한 설명은 여기(용도만 보자면 레디스와 카프카는 쓰이는곳이 다르다. 레디스는 주로 데이터를 빠르게 조회하기위한 캐싱 데이터 저장용, 카프카가 큐로 쓰인다.)

어쨌든 레디스를 쓰는데, 레디스는 엄청 편하다. 설치도 간편하고 최소 설치로 설치할경우 올라오는것도 마스터 하나밖에 없다. redis의 list는 push와 pop코드도 명시적이어서 음 뭐 카프카도 비슷하겠지~ 하고 설치를 했는데 아니었다.


일단 보기에는 카프카가 훨씬 더 복잡하다..(아키텍쳐도 그렇고 돌아가는것도 그렇고..)

카프카가 zookeeper위에서 돌아가기에 주키퍼가 필요하다. 그리고 카프카 아키텍쳐 그림을 한번 보자.

https://docs.cloudera.com/documentation/kafka/1-2-x/topics/kafka.html

producer는 카프카에 메시지를 넣는 생산자고, topic은 큐 하나하나, partition은 topic을 나눠놓은거고 consumer는 큐에서 데이터를 가져가는 주체라고보면된다. 위 그림에는 없는데 broker라는게 있는데, producer가 메시지를 생산하면 그 메시지를 받아서 topic에 넣어주는 일꾼같은거라고 보면 된다.


아키텍쳐 글도 한번 읽어보자 : https://epicdevs.com/17https://towardsdatascience.com/kafka-python-explained-in-10-lines-of-code-800e3e07dad1


위 글 말고도 불충분하면 구글링해서 몇개 더 읽어보고, 쿠버네티스안의 카프카를 살펴봤다. 근데 이상하게 end point가 웹이나 31385포트로 밖에 나온것밖에 없다. producer나 cousumer 예제코드를 살펴보면 bootstrap_servers라는 엔드포인트처럼 보이는것에 넣기 위해 9092 포트를 쓰는 서버 목록이 있어서 그걸 넣어야 할것 같은데 없다. 


카프카에서 생성된 서비스 목록들을 보니 이런것들이 있다. 주키퍼 3개, 카프카 3개, 카프카 rest, 카프카 ui.

하나씩 들어가서 로그를 보다가 카프카 rest의 로그에서 bootstrap.servers관련된걸 찾았고, 또 그 서버주소를 보니 그게 그냥 카프카 3개의 리스트이다. 아 그럼 카프카 3개의 주소를 쓰면 밖에서 접속할수 있나??? 했는데 아니었다. 


결국 실패해서 찾아봤는데 쿠버네티스에서 카프카를 설치한 경우, 외부에서 쿠버네티스 안의 카프카에 접근하는게 되게 복잡한 문제라는걸 깨달았다. 그냥 설치된 카프카에 아이피 입력해서 접속 단순하게 이게 아니다.


참고링크.

https://argus-sec.com/external-communication-with-apache-kafka-deployed-in-kubernetes-cluster/


유용한 링크라서 읽어보길 추천함.

첫번째 링크에는 3가지 방법이 나와있다. 

1. rest를 사용해서 각각의 broker에 접근.

2. 각각의 broker를 nodeport로 포트포워딩처럼 열어줌.

3. 각각의 broker마다 로드밸런서를 달아서 접근

1번의 해결책을 위해 쿠버네티스에서 카프카를 설치했을때 기본적으로 rest가 설치되어있는것같다.(위에서 언급한 31385포트)


여기까지 왔을때 아니 동작 테스트 한번하려고 별짓을 다하게되네..? 하는 생각이 들었다. 슬슬 귀찮아지려는 찰나,

https://medium.com/@tsuyoshiushio/configuring-kafka-on-kubernetes-makes-available-from-an-external-client-with-helm-96e9308ee9f4


요 글을 보게되었다. 일본 개발자같은데 현재 내가 직면한 문제에 대한 해결법을 적어놨다. 내려가니까... yml을 추가해야하는것은 물론 쿠버네티스에 버그가 있는지 어떻게 저떻게 더 해야한다고 한다.

여기까지 읽고 깔끔하게 '쿠버네티스에서 kafka를 설치한 후 외부에서 접속해보는 실습해보기'는 포기했다.(너무귀찮)


대신 바로 위의 링크에서 유용한 정보를 얻었는데 당연한 얘기지만 쿠버네티스 안에서는 같은 네트워크이므로 테스트가 가능하다. 쿠버네티스 안에 우분투를 하나 설치하고, 그 우분투에서 kafka로 접속하면 된다!(너무 당연함)

위에 링크에 headless에 대해서 적혀있는데, headless에 대해서 찾아보면(설명) kubernetes에서 statefulset들의 네트워크를 컨트롤하기 위한 그런 로드 밸런서같은거다. 

그러니 쿠버네티스 내부에서 headless주소를 이용해서도 카프카 3개(statefulset)에 접근 가능하다.

당연히 여기서 떠오르는 의문인 나처럼 쿠버네티스 바깥에서 접속하는 사람도 많을텐데 왜 바깥에서는 접근하기가 불편하게 만들었을까? 에 대한 답은 https://stackoverflow.com/questions/59782188/why-headless-service-to-be-used-for-kafka-in-kubernetes-why-not-cluster-ip-with 여기 있다.

요약하면 : statefulset이라서 그런다. statefulset은 각각의 인스턴스에서 본인의 데이터에 대해 책임이 있다. 각각의 인스턴스들은 정확히 모두 replica가 아니기 때문에 이것들을 로드밸런싱하는데는 headless가 쓰인다.. -> 카프카가 statefulset이기 때문에 이럴수밖에 없다.

여기서 혹시나 stateful과 stateless에 대한 차이가 궁금한 분들을 위한 링크 : https://linuxhint.com/stateful-vs-stateless-kubernetes/


여기서 producer의 예제 코드를 하나 올림.(참고로 아래 코드를 돌리려면 test라는 토픽이 있어야 함.)

# -*- coding: utf-8 -*-

from kafka import KafkaProducer


if __name__ == "__main__":
    host = "kafka-kafka-headless.kafka:9092"  # broker주소(headless주소)
    topic_name = 'test'
    kafka_producer = KafkaProducer(bootstrap_servers=[host],
                                   value_serializer=lambda m: m.encode('utf-8'))

    for i in range(50):
        try:
            kafka_producer.send(topic=topic_name, value='rarara~~~')
        except Exception as e:
            print(str(e))
        kafka_producer.flush()

이번엔 consumer코드.

from kafka import KafkaConsumer
import logging
import pprint

logging.basicConfig(level=logging.INFO)

def consume():
    host = "kafka-kafka-headless.kafka:9092"
    topic_name = 'test'
    consumer = KafkaConsumer(bootstrap_servers=[host], auto_offset_reset='latest',
                             value_deserializer=lambda x: x.decode())
    consumer.subscribe([topic_name])
    while True:
        for message in consumer:
            pprint.pprint(message.value)


if __name__ == '__main__':
    consume()

위의 producer 코드에서 value_serializer는 말 그대로 보낼 value(위에서는 rarara~~~)를 하나하나 어떤 규칙에 의해서 serialize하는것을 뜻한다. 딕셔너리 등의 값이 들어가도 알아서 인코딩할수 있도록 설정해주는게 좋다. 꼭 있어야 작동하는건 아닌데 그래도 적어놓음.

host는 위에서 언급한 headless주소이다. '{headless이름}.{namespace}:9092'의 포맷으로 이루어져있다. 랜쳐에서 보면 아래처럼 보인다.

flush()함수는 이름에서 예측가능하지만 응답을 모았다가 보내거나 그러지 않고 바로 보내는 함수이다. 그런데 여기서 이상한 점이있다. 위의 코드는 한 쿠버네티스 클러스터 안에서 돌리다보니 flush()가 필요없어도 그냥 데이터가 잘 가고 잘 받는다. 그런데 나중에 카프카 데브 서버가 있다는걸 알고 로컬에서 위와 동일한 코드를 실행시켜봤다.


그런데 희안하게 flush()가 없으면 데이터가 안간다. 데브서버 설정이나 버전 정보 이런걸 모르겠는데 flush()가 없으면 producer, consumer가 모두 정상인데 consumer가 데이터를 못 받는다. 더 희안한건 producer에서 데이터를 매번 send할때마다 flush()대신 time.sleep(1)로 1초 쉬게 하면 데이터가 또 간다(?????) 왜그러는지는 모르겠다... flush()를 안해주면 모았다가 가느라 그런가? 하고 consumer를 한참을 켜놔도 안오는걸 보니 뭔가 이상하긴 하다. 

그래서 일단 코드에 적어놓음.


이거말고도 consumer쪽에서 auto_offset_reset인자가 있다. earliest와 latest가 있는데 earliest는 처음부터 읽겠다는거고, latest는 내가 안읽은 가장 최신 메시지부터 읽겠다는 flag다. 예시보는게 더 좋을테니 여기 참조.


이거말고도 consumer group등등 여러가지 인자가 있는데 여기까지만 적겠음.

partitions 참고 읽어보기 : https://stackoverflow.com/questions/38024514/understanding-kafka-topics-and-partitions



이후 운영하면서 찾은 팁 추가.

카프카로 데이터를 프로듀싱하고, 단순하게 컨슈밍하는 간단한 용도로만 사용하고 있다.(초당 300개의 데이터 프로듀싱, 컨슈밍) 카프카를 쓰면서 찾은 필수적인 옵션 몇가지만 적음.


retention.ms, delete.retention.ms : 토픽 내의 데이터를 얼마나 유지할지에 대한 리텐션 기간이다. 중요한 이유는 프로듀싱만 계속 하고 컨슈밍쪽에 문제가 생겨서 컨슈밍이 안되는 경우 데이터가 계속 쌓인다. 근데 리텐션을 설정 안해주면 당연히 데이터가 쌓이니까 disk full나서 카프카가 죽을 수 있다. 그러니 컨슈밍이 안되어도 계속 오래된 데이터를 지워줄 수 있도록 리텐션을 설정해야 한다.

max.message.bytes : 프로듀싱을 rest로 하는데 데이터 1개당 request 1개로 하면네트워크 통신비용이 너무 크다.(오래걸린다.) 그래서 bulk방식으로 1000개씩 묶어서 1번의 요청에 보내는데, 이 경우에 용량이 너무 크면 부담스러워질 경우가 있어서 설정해줬다.

파티셔닝 : 안정성을 위해서 추가해줬다. 카프카의 파티션에 관해 찾아보면 파티션 1개당 컨슈머 1개를 붙여서 컨슈밍 성능 향상을 꾀할 수 있는데 rest에서는 그게 안되는것같다(내가 못찾은거일수도) 일단은 안정성을 위해서 추가해주었다.


컨슈머쪽에서는 컨슈밍 시 timeout과, fetch.message.max.bytes를 설정해줬다. fetch.message.max.bytes설정이 매우 중요했는데, 처음에 컨슈밍 시에 fetch.message.max.bytes를 매우 높게 해서 request한번에 많은 데이터를 가져오려고 했었다. 그런데 매트릭을 그려보니 프로듀싱되는갯수보다 컨슈밍되는갯수가 현저히 적었다. 거기에 몇일 지나자 카프카가 터져버렸다. 이유는 모르겠는데 또 프로듀싱은 되는데 컨슈밍시 블로킹된것처럼 컨슈밍이 되지않았다. 결국 재설치하고 이유를 찾았는데, fetch.message.max.bytes가 너무 높아서 그런거였다. fetch.message.max.bytes값을 매우 낮게 주고 여러번의 요청으로 데이터를 가져오는 방식으로 변경해더니, 프로듀싱갯수와 컨슈밍 갯수가 비슷해지고, 시스템도 안정을 찾았다.