data engineering

kafka consuming 기본 흐름제어 플로우

qkqhxla1 2020. 12. 28. 21:56

운영하는 시스템중에 카프카가 있는데 일부 데이터는 프로듀싱만 하고 다른곳에서는 컨슈밍만 한다.

현재 운영환경에서 하나의 토픽에 파티션이 9개로 나뉘어져있는데, 프로듀싱 시에는 파티션을 딱히 신경 쓰지 않아도 된다. 프로듀싱하면 알아서 나눠지기 때문이다.

근데 컨슈밍시에는 기본적으로 신경쓸게 더 많다. 중요한 데이터의 경우 토픽이 파티션으로 나뉘어져있으면 파티션별 어디까지 읽었는지에 대한 정보인 오프셋도 신경을 써야 한다.

일단 카프카에 데이터가 있는지는 아래의 카프카 콘솔로 컨슈밍해봐서 확인이 가능하다.

./kafka-console-consumer.sh --bootstrap-server 카프카서버:9092 --topic 토픽이름 --from-beginning

데이터 양이 적거나 중간에 데이터가 유실되어도 크게 문제가 없으면 버리고 이후부터 다시 재개하면 되는데 그런 경우가 아니면 오프셋을 신경 써야 한다.

 

https://devidea.tistory.com/entry/Kafka-consumer-group-offset-change-by-python 글을 개념만 간단하게 한번 읽어보자. 데이터가 저장되어있고 오프셋이 0~10까지 있다고 하면 컨슈머가 5까지 가져갔다고 가정했을때 컨슈머의 commited된 오프셋은 5이다. 

 

파이썬 코드로 보자. 

from kafka import KafkaConsumer

consumer = KafkaConsumer(bootstrap_servers=[host],
                             group_id='test',
                             auto_offset_reset='latest',
                             value_deserializer=lambda x: x.decode(),
                             )

consumer.subscribe([topic_name]) 

이런식으로 컨슈머를 만들었으면 poll함수로 컨슈밍을 할 수 있다. poll함수는 컨슈밍할 데이터가 있으면 딕셔너리를 리턴하고, 키:값은 파티션:컨슈밍된 값이다. poll함수의 인자로는 타임아웃이 들어간다.

for partition, messages in consumer.poll(300).iteritems():
    print partition, messages

파티셔닝된 토픽 컨슈밍의 기본인데 poll()함수는 내부에서 계속 돌면서 컨슈밍할 데이터가 있으면 딕셔터리를 계속 리턴한다. 그런데.

count = 0
for partition, messages in consumer.poll(300).iteritems():   
    count += 1
    print count

이렇게 호출해보면 count는 내부에서 1씩 증가하도록 해놨는데 증가하지 않고 1로 계속 출력된다. 동작이 오묘해서 확실히 동작하려면 컨슈밍 카운트용 변수를 하나 만들어두고 그 변수가 일정 수치에 도달할 때까지 컨슈밍하도록 코드를 짜야 한다. 예시는 아래에 적었다.


poll으로 데이터를 가져올시 아래 예시처럼 짜지 않으면 데이터를 잘 못 가져온다. 처음에 아마 엄청 삽질하게 된다면 아마 이 부분에서 하게될거다.. 그냥 컨슈밍한 데이터를 바로 출력하도록 하게하면 파티션 하나에 데이터가 없는경우 공백을 리턴하는데 poll함수의 동작상(?) 그대로 그냥 공백을 리턴하는것처럼 보여서 마치 프로듀싱된 데이터가 하나도 없는것처럼? 보이는 현상이 있다. 주의하자.

poll함수가 polling방식으로 가져오느라 이름도 poll인것 같은데 polling에 관해서 찾아보면.. 이 블로그를 읽어보자.

 

'시스템 내에 동작 중에 폴링 방식과 이벤트 방식이 있다. 폴링 방식은 어떤 상태인지를 주기적으로 확인해보는 것이다. 폴링 방식을 예를 든다면 우편물이 왔는지를 매번 내가 가서 보는 것이다. 이렇게 매번 오가는게 폴링이다. 주기적으로 알아보는 만큼 오지 않았을 때 나가보는 동안 비효율이 발생을 한다.

이벤트 방식은 어떤 상태가 되면 알려주는 것이다. 매번 가는 것이 아니라 우편물이 도착했을 때 문자를 보내는 것이다. 훨씬 효율적일 수 있다. 이벤트 방식로 해당 사람이 오면 알려 주는 방식이다. 두 방식에는 차이가 있지만, 언듯 폴링 방식은 비효율적일 거 같다는 생각을 할 수 있다. 하지만 정기적으로 뭔가를 감시하거나 검사를 해야 한다면 폴링방식도 필요할 것이다. 하지만 이벤트 방식을 통해서 트리거를 발생 시켜서 인지를 하게 되면 그 비효율이 줄어들어서 효율적으로 처리 할 수 있다.'

 

그러니 poll로 데이터를 가져오고 일정 데이터만 컨슈밍해올때

는 poll가져오는 부분을 함수로 구현해서 yield로 필요할때 리턴하도록 조금 더 세분해서 구현하는게 괜찮다.

 

그리고 원하는 만큼 컨슈밍이 완료되었으면 commit()함수를 실행해 내가 이 오프셋까지 데이터를 가져왔다는 표시를 해두자. rollback의 경우에는 적당히 아래처럼 구현한다.

def rollback(self):
    if self.consumer is not None:
        for partition in self.partitions:
            committed_offset = self.consumer.committed(partition)
            if committed_offset is None:
                self.consumer.seek_to_beginning(partition)
            else:
                self.consumer.seek(partition, committed_offset)
                del self.dirty_partitions
                self.partitions = set()

참고로 consumer의 seek함수를 사용하려면 subscribe를 하면 안되고 assign받은 다음 seek을 사용해야 정상적으로 먹힌다.(버그인듯) 참고 https://github.com/dpkp/kafka-python/issues/648

 

간단하게 동작을 확인하기 위해 완성된 코드는 아래와 같다. 

# -*- coding: utf-8 -*-

from kafka import KafkaConsumer
from kafka import TopicPartition

import json


class KafkaConsumerExample:
    def __init__(self):
        self.host = '카프카주소:9092'
        self.topic_name = '토픽명'
        self.consumer = KafkaConsumer(bootstrap_servers=[self.host],
                                      group_id='test',
                                      auto_offset_reset='latest',
                                      value_deserializer=lambda x: x.decode(),
                                      security_protocol='SASL_PLAINTEXT',
                                      sasl_mechanism='PLAIN',
                                      sasl_plain_username='admin',
                                      sasl_plain_password='admin'
                                      )
        self.consumer.subscribe([self.topic_name])
        self.partitions = set()

    # 나중을 위해 예시만.
    def rollback(self):
        # seek함수를 쓰려면 consumer가 subscribe가 아니라 아래처럼 assign으로 읽고있어야 변경됨.
        # self.consumer.assign([TopicPartition(self.topic_name, 0)])
        # self.consumer.seek(TopicPartition(self.topic_name, 0), 805000)

        if self.consumer:
            for partition in self.partitions:
                committed_offset = self.consumer.committed(partition)
                if not committed_offset:
                    self.consumer.seek_to_beginning(partition)
                else:
                    self.consumer.seek(partition, committed_offset)
                    self.partitions = set()

    def _consume(self):  # _consume함수에서는 yield로 리턴하도록 해놓고..
        for partition, messages in self.consumer.poll(timeout_ms=300).iteritems():
            self.consume_count += 1
            self.partitions.add(partition)
            for message in messages:
                yield json.loads(message.value)

    def run(self):
        try:
            print 'run!'
            consumed_data = []
            self.consume_count = 0
            print 'offset before =',self.consumer.committed(TopicPartition(self.topic_name, 0))
            while self.consume_count < 10:  # 10개단위로 컨슈밍
                for message in self._consume():  # 실질적으로 컨슈밍 시에는 바로 윗줄의 self.consume_count < 10일동안 동작하도록 한다. 이런식으로 안짜고 바로 받으면 어떻게 될지는 직접 돌려보자.
                    consumed_data.append(message)
            self.consumer.commit()  # 컨슈밍이 완료되면 오프셋을 커밋한다.
            print 'offset after =', self.consumer.committed(TopicPartition(self.topic_name, 0))
        except:
            pass
            # self.rollback()  # 만약 에러가 생겼을때에는 롤백을 구현해준다.


if __name__ == '__main__':
    kafka = KafkaConsumerExample()
    kafka.run()

+ 카프카로 데이터를 프로듀싱하고 컨슈밍 하는 경우 데이터가 파이프라인으로 잘 흐르고 있는지 메트릭을 만드는데.. 우리같은경우는 프로듀싱 수와 컨슈밍 수 카운트, lag count를 집계해서 es로 보내고 키바나로 시각화한다. 프로듀싱과 컨슈밍 카운트의 경우는 단순하게 보낸 만큼, 받은 만큼 찍어서 보내면 되니 따로 문제가 되진 않지만 lag이 지속적으로 낮게 유지되는지 알기 위해 잘 계산해서 보내야 한다.

lag계산은 https://github.com/dpkp/kafka-python/issues/1673 여기 코드를 보고 하자.

 

lag계산 함수도 나중을 위해 넣어놓음.

    def get_lag(self):
        consumer = KafkaConsumer(bootstrap_servers=self.host,
                                 group_id='test',
                                 enable_auto_commit=False,
                                 consumer_timeout_ms=30000,
                                 value_deserializer=lambda x: json.loads(x),
                                 security_protocol='SASL_PLAINTEXT',
                                 sasl_mechanism='PLAIN',
                                 sasl_plain_username=id,
                                 sasl_plain_password=pw,
                                 )
        topic_list = ['topic_a', 'topic_b', 'topic_c']
        lag_dict = {}
        total_lag = 0
        for topic in topic_list:
            lag_count = 0
            for partition in consumer.partitions_for_topic(topic):
                tp = TopicPartition(topic, partition)
                consumer.assign([tp])
                committed = consumer.committed(tp)
                consumer.seek_to_end(tp)
                last_offset = consumer.position(tp)
                lag_count += (last_offset - committed)*100  # lag 1당 100개씩 묶어서 보내서 *100
            lag_dict[topic] = lag_count
            total_lag += lag_count
        lag_dict['total_lag'] = total_lag
        consumer.close(autocommit=False)
        return lag_dict