Kafka Producer가 Record를 생산하고, Brocker의 Topic으로 전송하는 역할을 담당하고 있다면,
Kafka Consumer는 Record를 가져와서 소비하는 역할을 담당하는 Application or Client를 지칭합니다.
Consumer의 주요 기능은 특정 Partition을 관리하는 Partition Leader에게 Record Polling 요청을 하는 것 입니다.
Broker는 실제 Topic의 Record를 Local에 존재하는 Segment File에 저장하기 떄문에 실제로는 Segment File의 내용을 가져온다고 볼 수 있습니다.
일반적인 Messaging Queue들은 Queue에서 Record를 Push
하는 방식을 사용합니다.
예를들면 Brocker가 Consumer로 Record를 보냅니다. 하지만 Push 방식의 단점은 Queue가 Consumer의 처리 성능을 미리 염두해둬야 합니다.
즉, Consumer가 이 정도는 처리할 수 있겠지!” 등 의 Consumer의 사전 환경 정보를 미리 알고 고려해 전송해야 합니다.
반대로 Kafka는 Consumer가 Broker로부터 Record를 요청하는 Polling 구조
입니다.
Consumer는 원하는 Record를 Broker로 요청합니다. 이 구조의 장점은 Consumer의 Subscription 성능을 최적화할 수 있다는 것입니다.
추가로 Broker는 Consumer가 요청하는 것만큼 Messages를 전달하기만 하면 되기 때문에 Consumer의 환경을 고려할 필요가 없습니다.
Kafka Consumer의 동작 방식을 요약하자면 아래와 같습니다.
Consumer Group의 Consumer들은 각 Partitino에 가져간 Record의 위치정보 (offset)를 기록하기 때문입니다.
각 Consumer Group의 Partition별로 Offset 정보를 저장하기 위한 저장소가 별도로 필요합니다.
offset은 Kafka 내부 broker의 “__consumer_offset” Topic에 저장/호출되며 관리되고, Commit이 일어나면 값이 변경됩니다.
Kafka는 분산 처리 클러스터를 관리한다는 이유로 어쩔 수 없는 Zookeper의 의존도 있었습니다.
최신 Version의 Kafka Client에서는 Producer, Consumer, Broker 등 Zookeper에 대한 의존도를 제거한 Version으로 통용되어
Consumer는 0.9 version을 기점으로 Old Consumer와 New Consumer 두 가지로 나뉘게 되었습니다.
두가지 Version의 가장 큰 차이는 Offset을 저장하는 방식에 있습니다. Old Consumer는 Consumer의 offset을 주키퍼의 지노드에 저장하는 방식을 지원했었지만, 0.9 version 이후로는 offset을 주키퍼가 아닌 카프카 Topic에 저장하는 방식으로 변경되었습니다.
그렇다면 commit은 언제 일어나게 될까요…?
기본적으로 consumer에서 poll() 메서드가 실행되면 auto commit이 일어나고 이때 offset의 값 또한 변경됩니다.
auto commit은 enable.auto.commit값이 true로 설정되어 있어 일어나게 되고 필요에 따라 false로 설정해 사용합니다.
auto commit을 사용하면 발생할 수 있는 문제점이 있지만, 후반부에서 다시 설명하도록 하겠습니다.
위에서 설명한 부분에 대해서 간단하게 그림으로 예를 들어 이해를 쉽게 해봤습니다.
Commit/Offset의 동작은 설명한 그대로입니다.
poll()로 record를 읽어오기 위해 접근을 하는데 처음 접근이거나 Commit 된 Offset이 없는 경우
auto.offset.reset
설정 사용earliest
: 맨 처음 Offset 사용latest
: 가장 마지막 Offset 사용 (기본값)none
: Consumer Group에 대한 이전 Commit이 없으면 Exception 발생 주로 earliest, latest 두 옵션을 사용하는 편입니다.
Consumer Commit은 자동 Commit/수동 Commit 두가지 방법으로 관리할 수 있습니다.
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSecond(1)); for (ConsumerRecord<String, String> record : records) { ... 처리 } try { consumer.commitSync(); } catch(Exception ex) { // Commit 실패시 에러 발생 }
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSecond(1)); for (ConsumerRecord<String, String> record : records) { ... 처리 } consumer.commitAsync(); // commitAsync(OffsetCommitCallback callback)
enable.auto.commit 설정
auto.commit.interval.ms: 자동 Commit 주기
poll(), close() 메서드 호출시 자동 Commit 실행
Kafka Consumer의 특징 중 하나는 하나의 Topic에 서로 다른 Consumer Application이 동시에 구독할 수 있다는 것입니다.
이렇게 단일 Topic에 대한 멀티 Consuming이 가능한 이유는 Consumer가 Record를 읽을 때 Broker의 Messages가 삭제되는 것이 아니라
Consumer Application들의 Messages는 Consumer Offset으로 관리되기 때문입니다. 또한, Consumer Application이 Record 구독중 중단되었다가
다시 구동되는 경우에도 Offset을 이용해서 Record 관리가 가능합니다. 즉. Consumer 상태와 관계 없이 안정적인 Messages 구독이 가능해집니다.
public class SampleConsumer { public static void main(String[] args) { // 기본 설정 부분 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "nasa1515-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // Consumer 초기화 Consumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singleton("nasa1515-topic")); // Consuming while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2_000)); records.forEach(record -> System.out.println(record.value())); } } }
Consumer를 생성 및 사용하기 위한 필수 설정은 아래 3가지 입니다.
또한 Consumer는 반드시 하나의 Consumer Group에 속해 있어야 하고, 때문에 Consumer Group에는 하나 이상의 Consumer가 존재하게 됩니다.
위 예제 코드의 Conumser는 로컬(localhost:9092) Broker를 통해 Metadata를 초기화하고, nasa1515-group이라는 Consumer Group에 속합니다.
또한 구독하는 Messages의 키와 값은 StringDeserializer를 통해 역직렬화합니다(기본 설정 부분).
위 설정 값으로 생성된 Properties를 이용하여 Conumser를 생성, Consumer의 subscribe 메소드를 이용해 nasa1515-topic을 구독합니다.
반복문과 poll 메소드를 이용하여 Messages를 무한히 구독하고, Consumer로 읽은 Messages의 Offset과 값을 표준 출력합니다
from kafka import KafkaConsumer # To consume latest messages and auto-commit offsets consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092']) for message in consumer: # message value and key are raw bytes -- decode if necessary! # e.g., for unicode: `message.value.decode('utf-8')` print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, message.key, message.value)) # consume earliest available messages, don't commit offsets KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False) # consume json messages KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii'))) # consume msgpack KafkaConsumer(value_deserializer=msgpack.unpackb) # StopIteration if no message after 1sec KafkaConsumer(consumer_timeout_ms=1000) # Subscribe to a regex topic pattern consumer = KafkaConsumer() consumer.subscribe(pattern='^awesome.*') # Use multiple consumers in parallel w/ 0.9 kafka brokers # typically you would run each on a different server / process / CPU consumer1 = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers='my.server.com')
Consumer는 얼마 만큼의 Record의 양을 가져와야 하고, 만일 Consumer가 한번에 읽어들이는 양이 적다면 Fetch(Messages Read)가 반복적으로 일어나게 되면서 성능 저하가 발생할 수 있습니다. 이를 해결하기 위해 많은 옵션이 있지만, 주로 아래의 내용이 처리 성능을 높이는 데 사용됩니다.
옵션 | 설명 |
---|---|
fetch.max.bytes | (Batch) 한번에 가져 올 수 있는 최소 크기, 다 채워지지 않는다면 기다린다. |
fetch.max.wait.ms | 이 설정 값보다 적은 경우 요청에 대한 응답을 기다리는 최대 시간 |
max.partition.fetch.bytes | Partition당 가져올 수 잇는 최대 크기 |
max.poll.records poll | 가져오는 최대 Record 수 |
https://always-kimkim.tistory.com/entry/kafka101-consumer https://www.linkedin.com/pulse/kafka-consumer-overview-sylvester-daniel/