HomeAbout
Apache Kafka Pruducer의 동작 방식에 대해서 [Processing method, ack, Idempotence]
DATA
Apache Kafka Pruducer의 동작 방식에 대해서 [Processing method, ack, Idempotence]
NASA1515
NASA1515
December 02, 2022
6 min

목차

01
✔ KAFKA Producer
02
Producer의 Batch Processing 동작
03
멱등성 (idempotence)
04
Message Delivery semantics (Message 전송 방식)
05
Producer delivery semantics (Feat. ack())
06
Producer의 Retry 방식
07
멱등성 보장 producer (Duplicate message detection)
08
Reference

✔ KAFKA Producer

Kafak Client인 Producer의 기본적인 역할은 Source에 있는 Message들을 Broker에 있는 Topic으로 전달하는 역할을 맡고 있습니다.
여기서 Producer는 Kafka Producer API와 구성된 Application을 의미합니다. Producer를 통해 전달되는 Message의 구조는 아래과 같습니다.

image

Producer의 Message는 Record 형식으로 정의하여 전달할 수 있습니다.
Record는 보통 아래 5가지의 정보가 추가될 수 있고, Avro, Json Format으로도 전달이 가능합니다.

  • Topic (Topic)
  • Topic 중 특정 파티션 위치 (Partition)
  • Message 생성 시간 (Timestamp)
  • Message 키 (Key)
  • Message 값 (Value)


Broker 및 Metadata 탐색

Bootstrap server

Kafka Cluster의 모든 Broker들은 각자 Bootstrap Server의 역할을 수행할 수 있습니다.
각 Server별로 정보를 공유할 때 일반적으로 하나의 서버가 아닌 Cluster에 연결되어 있는 Bootstrap Server List와 Metadata가 전달됩니다.
뒤에서 더 자세히 설명하겠지만, 데이터의 가용성 및 안정성을 위해서는 가급적 2개 이상의 Bootstrap Server가 필수입니다.

image

Producer는 Massage Record를 적절한 Brocker에 보내기 위해, Bootstrap Server 중 하나에 대한 Connection을 진행하게 됩니다.
Bootstrap Server에서는 사용 가능한 모든 Brocker List, Topic List, Partition, Replication Factor 등의 MetaData를 반환하고
해당 정보를 기반으로 Producer는 특정 Record의 Leader Partition을 호스팅해서 Leader Broker를 식별하게 됩니다.




Producer 구조와 Message 전달 과정1

Producer는 아래의 4가지 과정을 통해서 Message를 Broker로 Producing 합니다.

  • 직렬화 (Serializer)
  • Partitioning (Partitioner)
  • Message Batch (Record Accumulator)
  • 압축 (Compression)
  • 전달 (Sender)

해당 과정은 Broker에게 Message를 전송할 수 있도록 Message를 변환하거나, 필요한 값을 지정하는 부분입니다.

  • 이때 record는 byte형식으로 serialize 되어 보내지고 broker에 저장됩니다.
    또한 record를 작성할 때 bootstrap.server, key serializer, value serializer는 반드시 설정해야하는 필수 옵션입니다.

구동원리

직렬화(serializing) : Producer가 Send() Method를 호출합니다, Message의 키/값은 Byte 뭉치 형태로 변환됩니다.

파티셔닝(Partitioning) : 직렬화 된 Message는 Partitioner를 통해 Key 값을 기준으로 Topic의 어떤 파티션에 저장되는 지 결정됩니다.

  • Partitioner 시 Key 값이 정의되어 있지 않으면 RR(Round Robbin) 형태로 Partitioning을 진행합니다. image
    그러나 Key가 정의되어 있다면 murmur2 알고리즘을 기본으로 Partitioning에 사용됩니다.
    Key를 정의해서 Record에 전달함으로 Kafka는 동일한 Partition에 대해서는 순서를 보장합니다.

압축 (Compression) : Message Compression이 설정되어있으면, 해당 Format으로 압축합니다. (Compression은 broker가 아닌 client에서 진행됩니다)

  • 압축 옵션은 전달 속도 및 Broker 내부의 복제 성능과도 관련이 있습니다. Kafka 주요 Format은 아래 그림과 같습니다.


전달 (Sender) : 위의 로직이 완료되었다면, Producer는 Broker의 Leader Partition으로 Message를 전송합니다.

  • 그러나 Message마다 매번 네트워크를 통해 전달하는 것은 굉장히 비효율적입니다.
    그래서 Producer는 정의된 만큼 Message를 모아뒀다가 한번에 Brocker에게 전달하는 Batch 전송을 사용할 수 있습니다.


Producer의 Batch Processing 동작


Recode Accumulator (Buffer)

image

Batch의 경우 Producer 내부의 Record Accumulator(RA)가 RA는 각 Topic 파티션에 대응하는 배치 큐(Batch Queue)를 구성하고
Message를 레코드 Batch(Record Batch) 형태로 묶어 Queue에 저장합니다. 각 Batch Queue에 저장된 레코드 Batch들은 설정 값에 따라 각 Broker에 전달됩니다.
이 과정은 Sender가 처리합니다. Sender는 스레드 형태로 구성되며, 관리자가 설정한 특정 조건에 만족한 레코드 Batch를 Broker로 전송합니다.
결론적으로 batch size property에 따라서 그룹화되고, Topic의 각 Partition은 독립된 Buffer를 가지게 됩니다.


Sender thread

위에서 각 Message마다 매번 네트워크를 통해 전달하는 것이 굉장히 비효율적이라고 설명을 했는데 그 원인은 Send() Method에 있습니다.
기본적으로 send비동기 메서드이다. Kafka Client doc에서의 send 메서드 주석을 보면 아래와 같이 설명되어 있습니다.

The send is asynchronous and this method will return immediately once the record has been stored in the buffer of records waiting to be sent.
Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.

send() Method비동기로 동작하며, 전달된 ProducerRecord Message는 Broker로 즉시 전송되는 것이 아니라
Producer 프로세스의 buffer에 저장된 후 전송된다. 이와 같은 구현을 한 이유는 바로 다음 줄에 적혀있습니다.

This allows sending many records in parallel without blocking to wait for the response after each one.

동기 방식으로 Method를 구현했다면 send()가 호출될 때마다 Message가 하나씩 Broker로 전송될 것이다. 이는 아래와 같은 단점이 존재합니다.

  • 매번 send() 메서드 호출 시 많은 요청으로 인한 I/O Block이 발생하여 병렬성이 떨어짐
  • Bulk로 보내는 것보다 Network Overhead가 발생

때문에 KafkaProducer의 send를 호출하게 되면 ProducerRecord를 KafkaProducer의 내부 Buffer에 저장해놓은 후,
어느정도 Message가 모이게 되면 여러 개의 Message를 한번에 보내고 Callback을 호출하는 Batch 전송을 사용합니다.

image

이 단계에서 Recode Accumulator(RA)의 Partition Batch는 전송 될 Brocker 별로 그룹화되게 되고.
Send Method를 위한 batch 옵션을 제공하는데 아래 표의 batch.sizelinger.ms 두 설정을 통해 Brocker로 Message가 전송됩니다.

OptionDescription
batch.size(default 16kb)Batch size를 정의, Message의 용량이 정의된 size에 도달 할 때 까지 기다렸다가 보낸다
linger.ms(default 0)batch.size가 도달하지 않으면 쌓여있는 Message를 처리할 수가 없다. 때문에 유휴 시간을 설정.
size가 도달하지 않더라도 시간에 도달하면 Message를 보내게 된다.


멱등성 (idempotence)

멱등성이란 여러번의 연산이 수행되더라도 동일한 결과가 나타내는 것을 뜻합니다.
즉 멱등성 Kafka Producer라 함은 동일한 데이터를 여러번 전송하는 과정이 수행되더라도, Kafka Cluster에는 단 한번만 저장됨을 의미합니다.
기본적으로 Producer의 동작 방식은 적어도 한번 전달(at least once delivery)을 지원합니다.
at least once란, Producer가 Cluster에 데이터를 전송하여 저장할 때 적어도 하나 이상의 데이터는 적재되는 것을 보장한다는 의미입니다.
그러나 중복 발생의 가능성은 존재합니다., 아래서 멱등성, 즉 중복이 없는 전송을 위한 Message 전송 방식을 보겠습니다.


Message Delivery semantics (Message 전송 방식)

Kafka에서는 Broker와 Producer의 설정 및 구성에 따라 아래 세가지의 방식을 모두 지원합니다.

image


At Most Once

image

At Most Once 방식에서 Message는 최대 한 번만 전송되어야 합니다.
결론적으로 Message 응답에 대한 Ack를 받지 못하더라도 재전송하지 않습니다.
중복 가능성을 회피하기 위해서 재전송을 하지 않기 떄문에 네트워크 이슈 등이 발생해 Message를 받지 못한 경우
해당 Message를 유실할 수 있습니다. 즉 Message의 손실을 감안하더라돠 중복 전송을 하지 않는 경우 입니다.
해당 처리 방식은 더 높은 처리량과 낮은 대기 시간을 쉽게 달성할 수 있어, 대량 로그 수집 or IoT 같은 환경에서 사용 됩니다.


At Least Once

image

At Least Once 방식에서는 Message의 중복은 허용되지만, Message의 손실은 허용하지 않습니다.
간단하게 정리하면 Producer는 Brocker에서 Ack를 보내지 않은 경우 (특정 이슈 상황에 의해)
Broker가 Message는 저장되고 Ack만 전송하지 못했는지, Message 저장과 Ack 전송 둘다 못한 것인지 판단하지 못합니다.
떄문에 위와 같은 상황에서 만약 Message는 저장된 전자의 경우에는 중복 Message가 저장될 수도 있는 것입니다.


Exactly Once

image

멱등성이라는 의미에 가장 적합한 방식입니다.
Message는 한번만 전달되어야 하고, 어떤 Message도 손실되어서는 안됩니다.
Producer ID, Sequence Number를 사용해서, Ack()의 응답을 받고 Message를 보내는 과정이 필수적이기 때문에
다른 응답 방식에 비해서 응답 대기시간이 길고, 처리량이 낮습니다.


Summary

At Most OnceAt least OnceExactly Once
DuplicatesNoYesNo
Data LossYesNoNo
ProcessingZero or One TimeOne or More TimeExactly One Time
Discription전달하는 과정에서 누락 가능저장하는 과정에서 중복 체크에 대한 유실모두 만족



Producer delivery semantics (Feat. ack())

여기서 조금 더 자세하게 짚고 넘어가야 하는 부분은 ack()라고 하는 Message 처리에 대한 응답 옵션 입니다.
아래의 Producer의 Acks 속성과 Brocker의 min.insync.replica 속성을 사용해서 kafka에서는 여러가지 delivery semantics 처리가 가능합니다.


ACK=0

image

ACK=0 일 경우 At most once 해당 하는 방식이 적용됩니다.
Kafka Producer는 Brocer에 레코드를 보내고 응답을 기다리지 않습니다. 즉 응답을 기다리지 않는 상태입니다.
위에서 설명한 것과 동일하게, 빠른 속도의 데이터 전송에서만 사용해야 합니다.


ACK=1

image

ACK=1 일 경우도 At most once 해당 하는 방식이 적용됩니다.
다만 다른 점은 Leader에게만 전달되었는지 확인하고, 다른 follower에게까지 전달되었는지는 확인하지 않습니다.


ACK=-1 or ACK=all

image

exactly once 에 해당 하는 방식이 적용됩니다.
leader에 레코드가 전송되고, 다른 follower에게까지 Replica가 모두 적용된 후에 ack()를 보내는 옵션입니다.
exactly once을 구현하기 위해선 Brocker의 idempotent 설정, Acks = all, 또한 min.insync.replicas Option이 필요합니다.


Ack Summary

Acks지연율처리량데이터 보장
0LowHigh보장 하지 않음
1MediumMediumLeader Only
allHighLowAll Replicas


Producer의 Retry 방식

위에서 Message 그리고 Producer의 semantics에 대해서 전체적으로 살펴 보았습니다.
여기서 한번 더 정확하게 짚고 넘어가야 하는 부분은 Producer에선 레코드 전송에 실패 시 Retry에 대한 시나리오입니다.

image

옵션설명Default
retriesMessage를 send하기 위해 재시도 되는 횟수MAX_INT
retry.backoff.ms재시도 사이에 추가되는 대기 시간100
request.timout.msProducer가 응답을 기다리는 최대 시간30.0000(30초)
delevery.timeout.mssend 후 성공 또는 실패를 보고하는 시간의 총 시간120.0000(2분)

Message send 주기

속성에 대한 Message 관련 주기는 아래와 같은 그림을 가진다.
delivery.timeout.ms는 Message를 완료 보고 받는데 걸리는 총 시간 입니다.
때문에 linger.ms, retry.backoff.ms, request.timeout.ms가 합쳐진 시간 보다 커야 합니다. image



멱등성 보장 producer (Duplicate message detection)

위에서 멱등성을 보장한다는게 kafka에서는 어떤 의미를 갖고 있는지에 대한 설명을 진행했습니다.
간단하게 요약하자면 Producer에서 레코드를 전송했지만, 기타 이슈가 발생하여 ack를 받지 못한 경우 중복 Message가 전달 및 저장될 수 있습니다.
여에 멱등성 보장 Producer는 중복 방지를 위한 PID, Sequence Number 기반의 Message를 추적, 동일한 Commit에 대해서 중복으로 처리합니다.

멱등성을 위한 필수 설정

ConfigurationDefaultDescription
acksallProducer 가 요청을 보내고 Leader 가 Replication의 수신을 확인해야되는 개수
enable.idempotenceTrueProducer 가 Record 쓰기 작업을 단 한번만 허용할 것인지 멱등성을 보장
max.in.flight.requests.per.connection<=5한 번에 몇 개의 요청(Request)을 전송할 것인가를 결정
retries0 이상Message를 send하기 위해 재시도 되는 횟수

위의 설정값을 모두 설정하지 않으면 ConfigException이 발생하므로 모두 적용이 필요하다.


enable.idempotence 동작

image

enable.idempotence는 PID 라는 ProducerId 를 통해서 어떤 Producer 가 어떤 record 를 기록하게 되는지 체크하게 됩니다.
Producer 가 PID 를 지정하여 보내게 되면 해당 레코드를 처리하는 Broker 에서 PID 값을 기반으로 Record에 대한 쓰기 작업을
단 한번만 하도록 중복 체크할 수 있게 되는 것입니다. (그림의 4,5번) 위 그림처럼 데이터를 전달 할때 마다 시퀀스 넘버를 1씩 증가합니다.

동일한 세션 멱등성 Producer는 동일한 세션에서만 Exactly Once을 보장합니다.
여기에서 동일한 세션이란, PID의 생명주기를 뜻합니다.
PID의 생명주기는 Producer Application에 이슈가 발생해 재시작되면 PID가 달라지게 되는 부분 입니다.
동일한 데이터를 보내도, PID가 달라지게 된다면 Broker는 다른 Producer Application이 다른 데이터를 보냈다고 판단합니다.
때문에 중복이 발생할 수 있습니다.

즉, 멱등성 Producer는 장애가 발생하지 않을 경우에만 정확히 한번 적재하는 것을 보장합니다.

OutofOrderSequenceException

멱등성 Producer의 시퀀스 넘버는 0부터 시작하여 +1씩 증가합니다.
Broker에서 멱등성 Producer가 전송한 레코드의 PID와 시퀀스 넘버를 확인하는 과정에서
저장되어 있는 시퀀스 넘버가 일정하는 않은 경우에 발생하는 에러입니다.
ex) 시퀀스 넘버 0 다음에 1이 와야 하는데, 다음에 시퀀스 2로 데이터가 존재할 경우 (즉 순서보장이 되지 않은 경우)


max.in.flight.requests.per.connection 설정의 동작

max.in.flight.requests.per.connection Option은 위에 설명한 것 처럼 Batch 처리를 할때 한번에 보내는 레코드의 양을 정의 합니다.
여기서 Default 설정은 “5” 입니다. 즉 한번의 Connection 당 요청으로 전송될 수 있는 최대 갯수가 5개라는 의미 입니다.
그러나 Batch 5개의 Message를 보내는 도중 실패가 발생하면 어떤식으로 동작하게 될까요??
결론은 Commit되는 Message의 순서가 뒤죽박죽 되게 됩니다. 즉 멱등성의 조건이 충족되지 않는다는 말입니다.

image

때문에 위에서 설명한 것 처럼 Topic Partition의 순서와 멱등성을 보장하기 위해서는
enable.idempotence, max.in.flight.requests.per.connection Option이 같이 쓰여야 하는 것입니다.


Producer 설정 유의점

Kafka Producer는 기본적으로 한번에 Message를 많이 처리(전송)하려고 합니다.
하지만 이러한 설정은 각자 요구되는 전송 환경에 따라 적합하게 맞춰가야 하는 부분입니다. (개발자의 역량)
처리량과 지연율 사이에서 혹은 지연율과 전송 안정성 사이에서 트레이드오프(Trade off)를 고려해야 합니다.

아래는 주요한 설정 유의점에 관한 예시입니다.

  • Compression.type, batch.size, linger.ms 세개의 설정을 변경하면 Batch 처리량 증가를 얻을 수 있습니다.
    설정 값이 높을수록 처리량은 증가하지만 그만큼 저장되었다가 전송되므로 지연율이 높아집니다(안 좋아집니다).
    지연 없는 전송이 목표라면 두 설정값을 낮게 설정해야 대기(지연) 없이 전송이 가능하니, 개발자의 역량이 중요합니다.
  • acks 설정이 0 -> 1 -> all 로 설정될수록 Message 전송의 안정성은 증가하지만, 그만큼 응답을 대기시간이 증가하므로 지연율이 높아집니다.
  • acks 설정이 all 이더라도 Broker의 min.insync.replicas 설정이 1인 경우, acks 설정이 1인 경우와 동일합니다. Replication이 없으니…
  • retries 설정이 1 이상인 경우 Message 전송 안정성이 증가합니다. 하지만 max.in.flight.request.per.connection 설정과 연관된 설정으로
    해당 설정이 2 이상일 경우 부분 실패에 따른 재전송 과정에서 Message 순서가 보장되지 않습니다.

Reference


Tags

#KAFKA#DATA
NASA1515

NASA1515

Data Engineer

Hello I'M Wonseok aka NASA1515

Expertise

Public Cloud
k8s/Docker
Python

Social Media

instagramwebsitelinkedingithub

Related Posts

Lock 기반의 DataBase Transaction의 Isolation Level
2024-07-03
4 min

Topics

CloudDevelop

Social Media