HomeAbout
Apache Kafka Consumer의 동작 방식에 대해서 [Poll method, Offset, Commit]
DATA
Apache Kafka Consumer의 동작 방식에 대해서 [Poll method, Offset, Commit]
NASA1515
NASA1515
December 18, 2022
4 min

목차

01
✔ KAFKA Consumer
02
Consumer는 어떤 방식으로 Messages를 가져올까?
03
Commit/offset Architecture
04
Reference

✔ KAFKA Consumer

image

Kafka Producer가 Record를 생산하고, Brocker의 Topic으로 전송하는 역할을 담당하고 있다면,
Kafka Consumer는 Record를 가져와서 소비하는 역할을 담당하는 Application or Client를 지칭합니다.
Consumer의 주요 기능은 특정 Partition을 관리하는 Partition Leader에게 Record Polling 요청을 하는 것 입니다.
Broker는 실제 Topic의 Record를 Local에 존재하는 Segment File에 저장하기 떄문에 실제로는 Segment File의 내용을 가져온다고 볼 수 있습니다.


Consumer는 어떤 방식으로 Messages를 가져올까?


Polling Method

image

일반적인 Messaging Queue들은 Queue에서 Record를 Push 하는 방식을 사용합니다.
예를들면 Brocker가 Consumer로 Record를 보냅니다. 하지만 Push 방식의 단점은 Queue가 Consumer의 처리 성능을 미리 염두해둬야 합니다.
즉, Consumer가 이 정도는 처리할 수 있겠지!” 등 의 Consumer의 사전 환경 정보를 미리 알고 고려해 전송해야 합니다.

반대로 Kafka는 Consumer가 Broker로부터 Record를 요청하는 Polling 구조입니다.
Consumer는 원하는 Record를 Broker로 요청합니다. 이 구조의 장점Consumer의 Subscription 성능을 최적화할 수 있다는 것입니다.
추가로 Broker는 Consumer가 요청하는 것만큼 Messages를 전달하기만 하면 되기 때문에 Consumer의 환경을 고려할 필요가 없습니다.


Consumer Offset / Commit

Kafka Consumer의 동작 방식을 요약하자면 아래와 같습니다.

  1. Consumer는 Record 요청 Poll()을 호출합니다.
  2. Consumer Group은 Kafka Topic Partition에 저장된 아직 읽어오지 않은 Record를 가져옵니다.

아직 읽어오지 않은 Record를 구분해서 가져올 수 있는 이유

Consumer Group의 Consumer들은 각 Partitino에 가져간 Record의 위치정보 (offset)를 기록하기 때문입니다.


Offset 저장은…?

각 Consumer Group의 Partition별로 Offset 정보를 저장하기 위한 저장소가 별도로 필요합니다.
offset은 Kafka 내부 broker의 “__consumer_offset” Topic에 저장/호출되며 관리되고, Commit이 일어나면 값이 변경됩니다.


Consumer Project???

Kafka는 분산 처리 클러스터를 관리한다는 이유로 어쩔 수 없는 Zookeper의 의존도 있었습니다.
최신 Version의 Kafka Client에서는 Producer, Consumer, BrokerZookeper에 대한 의존도를 제거한 Version으로 통용되어
Consumer는 0.9 version을 기점으로 Old ConsumerNew Consumer 두 가지로 나뉘게 되었습니다.
두가지 Version의 가장 큰 차이는 Offset을 저장하는 방식에 있습니다. Old Consumer는 Consumer의 offset을 주키퍼의 지노드에 저장하는 방식을 지원했었지만, 0.9 version 이후로는 offset을 주키퍼가 아닌 카프카 Topic에 저장하는 방식으로 변경되었습니다.


Commit..?

그렇다면 commit은 언제 일어나게 될까요…?
기본적으로 consumer에서 poll() 메서드가 실행되면 auto commit이 일어나고 이때 offset의 값 또한 변경됩니다.
auto commit은 enable.auto.commit값이 true로 설정되어 있어 일어나게 되고 필요에 따라 false로 설정해 사용합니다.
auto commit을 사용하면 발생할 수 있는 문제점이 있지만, 후반부에서 다시 설명하도록 하겠습니다.


Commit/offset Architecture

위에서 설명한 부분에 대해서 간단하게 그림으로 예를 들어 이해를 쉽게 해봤습니다.
Commit/Offset의 동작은 설명한 그대로입니다.

image

  • Consumer의 poll()는 이전에 Commit한 offset이 있으면 그 offset 이후의 record를 읽어옴
  • 읽어온 다음 마지막 읽어온 record의 offset을 Commit


Commit된 Offset이 없는 경우

poll()로 record를 읽어오기 위해 접근을 하는데 처음 접근이거나 Commit 된 Offset이 없는 경우

  • auto.offset.reset 설정 사용
    • earliest: 맨 처음 Offset 사용
    • latest: 가장 마지막 Offset 사용 (기본값)
    • none: Consumer Group에 대한 이전 Commit이 없으면 Exception 발생

주로 earliest, latest 두 옵션을 사용하는 편입니다.

image



How Consumer Offset is Managed

Consumer Commit은 자동 Commit/수동 Commit 두가지 방법으로 관리할 수 있습니다.


자동 Commit (Auto Commit)

  • Auto Commit을 사용하려면 Conumser 옵션 중 enable.auto.commit을 true로 설정해주어야 합니다.
    이 경우, Consumer에서 poll()을 호출할 때 auto.commit.interval.ms(default: 5s)이 지났는지를 확인 한 뒤 Commit이 가능 하면 가장 마지막 offset을 commit 합니다.
    다만, 주의해야 할 점은 중복이나, Messages 손실이 발생할 수 있다는 점 입니다.
    예를 들어, auto.commit.interval.ms가 5초이고, 이 5초가 지나기 전에 consumer group rebalancing이 일어난 경우를 생각해보면

image

  • 위의 그림에서, Commit된 offset은 2번 offset이고, 8번 offset을 처리하는 도중 consumer가 rebalance가 되었다고 가정해서 예를 들어보자면,
    7번 offset은 이미 처리가 완료된 상태이지만, offset 2번 이후로 offset은 commit 되지 않았습니다.
    때문에 reblance 후 partition에 할당된 consumer가 consuming을 시작할 땐 7번부터 다시 consuming을 합니다. 즉, 중복이 발생할 가능성이 있다는 것입니다.
    auto.commit.interval.ms를 줄여서 이런 상황이 발생할 확률을 낮출 수는 있겠지만, 해당 가능성은 항상 존재하게 됩니다.
  • Messages가 손실되는 경우는. Auto Commit의 경우, poll()을 호출하는 시점에 auto.commit.interval.ms가 지났으면 마지막 offset을 commit하게 되는데.
    이 경우, commit은 되어버렸지만 아직 Messages의 처리가 끝나지 못한 상태에서 Consumer에 장애가 발생하면 해당 Messages는 손실될 수 있습니다


수동 Commit (Manual Commit)

  • Manual Commit은 Messages 처리가 완전히 완료되기 전까지 Messages를 가져온 것으로 간주되면 안 되는 경우 사용합니다.
    Messages 손실을 최대한 방지하기 위해, Manual Commit을 통해 Messages의 처리가 완전히 끝났다고 마킹할 수 있는 시점에 직접 offset을 Commit 할 수 있습니다.
    하지만 Manual Commit의 경우에도, 작업을 처리하다가 에러가 나는 경우 중복이 발생할 수 있습니다.
    예를 들어, message 처리 과정이 A -> B -> C -> commit일 때에 B에서 장애가 난 경우, 해당 message는 commit이 되지 않아 Retry 될 것이기 때문에
    A 과정은 중복처리가 될 수 있습니다. 이와 같은 특성을 고려하여 각 operation을 최대한 idempotent하게 동작할 수 있도록 고려하여 설계해야 합니다.
    또 주의할 점은, consumer에 retry로 처리할 수 없는 Messages가 들어온 경우 (ex. parsing이 불가능한 Messages) 해당 Messages는 절대로 처리될 수 없고,
    이 offset은 영원히 Commit 되지 못해, 다음 Messages들 또한 처리되지 못해서 Messages이 막혀있는 상황이 발생할 수도 있습니다.
    이런 경우 처리할 수 없는 Messages를 다른 Topic으로 보내는 DLQ등을 고려하여 설계해야 합니다.

또한 Manual Commit에는 동기Commit, 비동기 Commit이 있는데 아래에서 간단하게 알아보겠습니다.


동기 Offst Commit (commitSync())
  • Commit에 성공하면 Exception 이 발생하지 않고 실패하면 Exception 발생
  • Commit에 실패했을 때는 Exception 을 catch 해서 알맞은 처리가 필요함
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSecond(1));
for (ConsumerRecord<String, String> record : records) {
  ... 처리
}
try {
  consumer.commitSync();
} catch(Exception ex) {
  // Commit 실패시 에러 발생
}

비동기 Offset Commit (commitAsync())
  • 코드 자체에서 바로 실패여부를 알 수 없음
  • 성공 실패 여부를 알고 싶다면 callback을 받아서 처리
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSecond(1));
for (ConsumerRecord<String, String> record : records) {
  ... 처리
}
consumer.commitAsync(); // commitAsync(OffsetCommitCallback callback)

Auto Commit/Manual Commit Result

  • enable.auto.commit 설정

    • true: 일정 주기로 Consumer가 읽은 Offset을 Commit (기본값)
    • false: 수동으로 Commit 실행
  • auto.commit.interval.ms: 자동 Commit 주기

    • 기본값: 5000 (5초)
  • poll(), close() 메서드 호출시 자동 Commit 실행



단일 Topic의 멀티 컨슈밍 (Multi-Consuming)

image

Kafka Consumer의 특징 중 하나는 하나의 Topic에 서로 다른 Consumer Application이 동시에 구독할 수 있다는 것입니다.
이렇게 단일 Topic에 대한 멀티 Consuming이 가능한 이유는 Consumer가 Record를 읽을 때 Broker의 Messages가 삭제되는 것이 아니라
Consumer Application들의 Messages는 Consumer Offset으로 관리되기 때문입니다. 또한, Consumer Application이 Record 구독중 중단되었다가
다시 구동되는 경우에도 Offset을 이용해서 Record 관리가 가능합니다. 즉. Consumer 상태와 관계 없이 안정적인 Messages 구독이 가능해집니다.


Consumer API 예제 코드 (JAVA 8 기준)

public class SampleConsumer {
    public static void main(String[] args) {
        // 기본 설정 부분
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "nasa1515-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Consumer 초기화
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("nasa1515-topic"));

        // Consuming
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2_000));
            records.forEach(record -> System.out.println(record.value()));
        }
    }
}

Consumer를 생성 및 사용하기 위한 필수 설정은 아래 3가지 입니다.

  • bootstrap.servers : 메타데이터 초기화를 위한 설정
  • group.id : Consumer의 그룹 아이디
  • key.deserializer.class : Messages의 키를 역직렬화하기 위한 설정
  • value.deserializer.class : Messages의 값을 역직렬화하기 위한 설정

또한 Consumer는 반드시 하나의 Consumer Group에 속해 있어야 하고, 때문에 Consumer Group에는 하나 이상의 Consumer가 존재하게 됩니다.

위 예제 코드의 Conumser는 로컬(localhost:9092) Broker를 통해 Metadata를 초기화하고, nasa1515-group이라는 Consumer Group에 속합니다.
또한 구독하는 Messages의 키와 값은 StringDeserializer를 통해 역직렬화합니다(기본 설정 부분).
위 설정 값으로 생성된 Properties를 이용하여 Conumser를 생성, Consumer의 subscribe 메소드를 이용해 nasa1515-topic을 구독합니다.
반복문과 poll 메소드를 이용하여 Messages를 무한히 구독하고, Consumer로 읽은 Messages의 Offset과 값을 표준 출력합니다


Python Sample (JAVA 와 동일)

from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)

# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))

# consume msgpack
KafkaConsumer(value_deserializer=msgpack.unpackb)

# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)

# Subscribe to a regex topic pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')

# Use multiple consumers in parallel w/ 0.9 kafka brokers
# typically you would run each on a different server / process / CPU
consumer1 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')


Consumer Record Read Option

Consumer는 얼마 만큼의 Record의 양을 가져와야 하고, 만일 Consumer가 한번에 읽어들이는 양이 적다면 Fetch(Messages Read)가 반복적으로 일어나게 되면서 성능 저하가 발생할 수 있습니다. 이를 해결하기 위해 많은 옵션이 있지만, 주로 아래의 내용이 처리 성능을 높이는 데 사용됩니다.

image

옵션설명
fetch.max.bytes(Batch) 한번에 가져 올 수 있는 최소 크기, 다 채워지지 않는다면 기다린다.
fetch.max.wait.ms이 설정 값보다 적은 경우 요청에 대한 응답을 기다리는 최대 시간
max.partition.fetch.bytesPartition당 가져올 수 잇는 최대 크기
max.poll.records poll가져오는 최대 Record 수

다음 포스트 - Consumer Group


Reference

https://always-kimkim.tistory.com/entry/kafka101-consumer https://www.linkedin.com/pulse/kafka-consumer-overview-sylvester-daniel/


Tags

#KAFKA#DATA
NASA1515

NASA1515

Data Engineer

Hello I'M Wonseok aka NASA1515

Expertise

Public Cloud
k8s/Docker
Python

Social Media

instagramwebsitelinkedingithub

Related Posts

Lock 기반의 DataBase Transaction의 Isolation Level
2024-07-03
5 min

Topics

CloudDevelop

Social Media