Kafak Client인 Producer의 기본적인 역할은 Source에 있는 Message들을 Broker에 있는 Topic으로 전달하는 역할을 맡고 있습니다.
여기서 Producer는 Kafka Producer API와 구성된 Application을 의미합니다. Producer를 통해 전달되는 Message의 구조는 아래과 같습니다.
Producer의 Message는 Record 형식으로 정의하여 전달할 수 있습니다.
Record는 보통 아래 5가지의 정보가 추가될 수 있고, Avro, Json Format으로도 전달이 가능합니다.
- Topic (Topic)
- Topic 중 특정 파티션 위치 (Partition)
- Message 생성 시간 (Timestamp)
- Message 키 (Key)
- Message 값 (Value)
Kafka Cluster의 모든 Broker들은 각자 Bootstrap Server의 역할을 수행할 수 있습니다.
각 Server별로 정보를 공유할 때 일반적으로 하나의 서버가 아닌 Cluster에 연결되어 있는 Bootstrap Server List와 Metadata가 전달됩니다.
뒤에서 더 자세히 설명하겠지만, 데이터의 가용성 및 안정성을 위해서는 가급적 2개 이상의 Bootstrap Server가 필수입니다.
Producer는 Massage Record를 적절한 Brocker에 보내기 위해, Bootstrap Server 중 하나에 대한 Connection을 진행하게 됩니다.
Bootstrap Server에서는 사용 가능한 모든 Brocker List, Topic List, Partition, Replication Factor 등의 MetaData를 반환하고
해당 정보를 기반으로 Producer는 특정 Record의 Leader Partition을 호스팅해서 Leader Broker를 식별하게 됩니다.
Producer는 아래의 4가지 과정을 통해서 Message를 Broker로 Producing 합니다.
- 직렬화 (Serializer)
- Partitioning (Partitioner)
- Message Batch (Record Accumulator)
- 압축 (Compression)
- 전달 (Sender)
해당 과정은 Broker에게 Message를 전송할 수 있도록 Message를 변환하거나, 필요한 값을 지정하는 부분입니다.
byte형식으로 serialize
되어 보내지고 broker에 저장됩니다.bootstrap.server
, key serializer
, value serializer
는 반드시 설정해야하는 필수 옵션입니다.
직렬화(serializing)
: Producer가 Send() Method를 호출합니다, Message의 키/값은 Byte 뭉치 형태로 변환됩니다.파티셔닝(Partitioning)
: 직렬화 된 Message는 Partitioner를 통해 Key 값을 기준으로 Topic의 어떤 파티션에 저장되는 지 결정됩니다.RR(Round Robbin)
형태로 Partitioning을 진행합니다.
Kafka는 동일한 Partition에 대해서는 순서를 보장합니다.
압축 (Compression)
: Message Compression이 설정되어있으면, 해당 Format으로 압축합니다. (Compression은 broker가 아닌 client에서 진행됩니다)
전달 (Sender)
: 위의 로직이 완료되었다면, Producer는 Broker의 Leader Partition으로 Message를 전송합니다.Message마다 매번 네트워크를 통해 전달하는 것은 굉장히 비효율적입니다.
Batch
전송을 사용할 수 있습니다.
Batch의 경우 Producer 내부의 Record Accumulator(RA)가 RA는 각 Topic 파티션에 대응하는 배치 큐(Batch Queue)를 구성하고
Message를 레코드 Batch(Record Batch) 형태로 묶어 Queue에 저장합니다. 각 Batch Queue에 저장된 레코드 Batch들은 설정 값에 따라 각 Broker에 전달됩니다.
이 과정은 Sender가 처리합니다. Sender는 스레드 형태로 구성되며, 관리자가 설정한 특정 조건에 만족한 레코드 Batch를 Broker로 전송합니다.
결론적으로 batch size property에 따라서 그룹화되고, Topic의 각 Partition은 독립된 Buffer를 가지게 됩니다.
위에서 각 Message마다 매번 네트워크를 통해 전달하는 것이 굉장히 비효율적이라고 설명을 했는데 그 원인은 Send() Method에 있습니다.
기본적으로 send는 비동기 메서드이다. Kafka Client doc에서의 send 메서드 주석을 보면 아래와 같이 설명되어 있습니다.
The send is asynchronous and this method will return immediately once the record has been stored in the buffer of records waiting to be sent.
Asynchronously send a record to a topic and invoke the provided callback when the send has been acknowledged.
즉 send() Method는 비동기로 동작하며, 전달된 ProducerRecord Message는 Broker로 즉시 전송되는 것이 아니라
Producer 프로세스의 buffer에 저장된 후 전송된다. 이와 같은 구현을 한 이유는 바로 다음 줄에 적혀있습니다.
This allows sending many records in parallel without blocking to wait for the response after each one.
동기 방식으로 Method를 구현했다면 send()가 호출될 때마다 Message가 하나씩 Broker로 전송될 것이다. 이는 아래와 같은 단점이 존재합니다.
- 매번 send() 메서드 호출 시 많은 요청으로 인한 I/O Block이 발생하여 병렬성이 떨어짐
- Bulk로 보내는 것보다 Network Overhead가 발생
때문에 KafkaProducer의 send를 호출하게 되면 ProducerRecord를 KafkaProducer의 내부 Buffer에 저장해놓은 후,
어느정도 Message가 모이게 되면 여러 개의 Message를 한번에 보내고 Callback을 호출하는 Batch 전송을 사용합니다.
이 단계에서 Recode Accumulator(RA)의 Partition Batch는 전송 될 Brocker 별로 그룹화되게 되고.
Send Method를 위한 batch 옵션을 제공하는데 아래 표의 batch.size와 linger.ms 두 설정을 통해 Brocker로 Message가 전송됩니다.
Option | Description |
---|---|
batch.size(default 16kb) | Batch size를 정의, Message의 용량이 정의된 size에 도달 할 때 까지 기다렸다가 보낸다 |
linger.ms(default 0) | batch.size가 도달하지 않으면 쌓여있는 Message를 처리할 수가 없다. 때문에 유휴 시간을 설정. size가 도달하지 않더라도 시간에 도달하면 Message를 보내게 된다. |
멱등성이란 여러번의 연산이 수행되더라도 동일한 결과가 나타내는 것을 뜻합니다.
즉 멱등성 Kafka Producer라 함은 동일한 데이터를 여러번 전송하는 과정이 수행되더라도, Kafka Cluster에는 단 한번만 저장됨을 의미합니다.
기본적으로 Producer의 동작 방식은 적어도 한번 전달(at least once delivery)을 지원합니다.
at least once
란, Producer가 Cluster에 데이터를 전송하여 저장할 때 적어도 하나 이상의 데이터는 적재되는 것을 보장한다는 의미입니다.
그러나 중복 발생의 가능성은 존재합니다., 아래서 멱등성, 즉 중복이 없는 전송을 위한 Message 전송 방식을 보겠습니다.
Kafka에서는 Broker와 Producer의 설정 및 구성에 따라 아래 세가지의 방식을 모두 지원합니다.
At Most Once 방식에서 Message는 최대 한 번만 전송되어야 합니다.
결론적으로 Message 응답에 대한 Ack를 받지 못하더라도 재전송하지 않습니다.
중복 가능성을 회피하기 위해서 재전송을 하지 않기 떄문에 네트워크 이슈 등이 발생해 Message를 받지 못한 경우
해당 Message를 유실할 수 있습니다. 즉 Message의 손실을 감안하더라돠 중복 전송을 하지 않는 경우 입니다.
해당 처리 방식은 더 높은 처리량과 낮은 대기 시간을 쉽게 달성할 수 있어, 대량 로그 수집 or IoT 같은 환경에서 사용 됩니다.
At Least Once 방식에서는 Message의 중복은 허용되지만, Message의 손실은 허용하지 않습니다.
간단하게 정리하면 Producer는 Brocker에서 Ack를 보내지 않은 경우 (특정 이슈 상황에 의해)
Broker가 Message는 저장되고 Ack만 전송하지 못했는지, Message 저장과 Ack 전송 둘다 못한 것인지 판단하지 못합니다.
떄문에 위와 같은 상황에서 만약 Message는 저장된 전자의 경우에는 중복 Message가 저장될 수도 있는 것입니다.
멱등성이라는 의미에 가장 적합한 방식입니다.
Message는 한번만 전달되어야 하고, 어떤 Message도 손실되어서는 안됩니다.
Producer ID, Sequence Number를 사용해서, Ack()의 응답을 받고 Message를 보내는 과정이 필수적이기 때문에
다른 응답 방식에 비해서 응답 대기시간이 길고, 처리량이 낮습니다.
At Most Once | At least Once | Exactly Once | |
---|---|---|---|
Duplicates | No | Yes | No |
Data Loss | Yes | No | No |
Processing | Zero or One Time | One or More Time | Exactly One Time |
Discription | 전달하는 과정에서 누락 가능 | 저장하는 과정에서 중복 체크에 대한 유실 | 모두 만족 |
여기서 조금 더 자세하게 짚고 넘어가야 하는 부분은 ack()라고 하는 Message 처리에 대한 응답 옵션 입니다.
아래의 Producer의 Acks 속성과 Brocker의 min.insync.replica 속성을 사용해서 kafka에서는 여러가지 delivery semantics 처리가 가능합니다.
ACK=0 일 경우 At most once 해당 하는 방식이 적용됩니다.
Kafka Producer는 Brocer에 레코드를 보내고 응답을 기다리지 않습니다. 즉 응답을 기다리지 않는 상태입니다.
위에서 설명한 것과 동일하게, 빠른 속도의 데이터 전송에서만 사용해야 합니다.
ACK=1 일 경우도 At most once 해당 하는 방식이 적용됩니다.
다만 다른 점은 Leader에게만 전달되었는지 확인하고, 다른 follower에게까지 전달되었는지는 확인하지 않습니다.
exactly once 에 해당 하는 방식이 적용됩니다.
leader에 레코드가 전송되고, 다른 follower에게까지 Replica가 모두 적용된 후에 ack()를 보내는 옵션입니다.
exactly once을 구현하기 위해선 Brocker의 idempotent 설정, Acks = all, 또한 min.insync.replicas Option이 필요합니다.
Acks | 지연율 | 처리량 | 데이터 보장 |
---|---|---|---|
0 | Low | High | 보장 하지 않음 |
1 | Medium | Medium | Leader Only |
all | High | Low | All Replicas |
위에서 Message 그리고 Producer의 semantics에 대해서 전체적으로 살펴 보았습니다.
여기서 한번 더 정확하게 짚고 넘어가야 하는 부분은 Producer에선 레코드 전송에 실패 시 Retry에 대한 시나리오입니다.
옵션 | 설명 | Default |
---|---|---|
retries | Message를 send하기 위해 재시도 되는 횟수 | MAX_INT |
retry.backoff.ms | 재시도 사이에 추가되는 대기 시간 | 100 |
request.timout.ms | Producer가 응답을 기다리는 최대 시간 | 30.0000(30초) |
delevery.timeout.ms | send 후 성공 또는 실패를 보고하는 시간의 총 시간 | 120.0000(2분) |
속성에 대한 Message 관련 주기는 아래와 같은 그림을 가진다.
delivery.timeout.ms는 Message를 완료 보고 받는데 걸리는 총 시간 입니다.
때문에 linger.ms, retry.backoff.ms, request.timeout.ms가 합쳐진 시간 보다 커야 합니다.
위에서 멱등성을 보장한다는게 kafka에서는 어떤 의미를 갖고 있는지에 대한 설명을 진행했습니다.
간단하게 요약하자면 Producer에서 레코드를 전송했지만, 기타 이슈가 발생하여 ack를 받지 못한 경우 중복 Message가 전달 및 저장될 수 있습니다.
여에 멱등성 보장 Producer는 중복 방지를 위한 PID, Sequence Number 기반의 Message를 추적, 동일한 Commit에 대해서 중복으로 처리합니다.
멱등성을 위한 필수 설정
Configuration Default Description acks all Producer 가 요청을 보내고 Leader 가 Replication의 수신을 확인해야되는 개수 enable.idempotence True Producer 가 Record 쓰기 작업을 단 한번만 허용할 것인지 멱등성을 보장 max.in.flight.requests.per.connection <=5 한 번에 몇 개의 요청(Request)을 전송할 것인가를 결정 retries 0 이상 Message를 send하기 위해 재시도 되는 횟수 위의 설정값을 모두 설정하지 않으면 ConfigException이 발생하므로 모두 적용이 필요하다.
enable.idempotence는 PID 라는 ProducerId 를 통해서 어떤 Producer 가 어떤 record 를 기록하게 되는지 체크하게 됩니다.
Producer 가 PID 를 지정하여 보내게 되면 해당 레코드를 처리하는 Broker 에서 PID 값을 기반으로 Record에 대한 쓰기 작업을
단 한번만 하도록 중복 체크할 수 있게 되는 것입니다. (그림의 4,5번) 위 그림처럼 데이터를 전달 할때 마다 시퀀스 넘버를 1씩 증가합니다.
동일한 세션 멱등성 Producer는 동일한 세션에서만 Exactly Once을 보장합니다.
여기에서 동일한 세션이란, PID의 생명주기를 뜻합니다.
PID의 생명주기는 Producer Application에 이슈가 발생해 재시작되면 PID가 달라지게 되는 부분 입니다.
동일한 데이터를 보내도, PID가 달라지게 된다면 Broker는 다른 Producer Application이 다른 데이터를 보냈다고 판단합니다.
때문에 중복이 발생할 수 있습니다.
OutofOrderSequenceException
멱등성 Producer의 시퀀스 넘버는 0부터 시작하여 +1씩 증가합니다.
Broker에서 멱등성 Producer가 전송한 레코드의 PID와 시퀀스 넘버를 확인하는 과정에서
저장되어 있는 시퀀스 넘버가 일정하는 않은 경우에 발생하는 에러입니다.
ex) 시퀀스 넘버 0 다음에 1이 와야 하는데, 다음에 시퀀스 2로 데이터가 존재할 경우 (즉 순서보장이 되지 않은 경우)
max.in.flight.requests.per.connection Option은 위에 설명한 것 처럼 Batch 처리를 할때 한번에 보내는 레코드의 양을 정의 합니다.
여기서 Default 설정은 “5” 입니다. 즉 한번의 Connection 당 요청으로 전송될 수 있는 최대 갯수가 5개라는 의미 입니다.
그러나 Batch 5개의 Message를 보내는 도중 실패가 발생하면 어떤식으로 동작하게 될까요??
결론은 Commit되는 Message의 순서가 뒤죽박죽 되게 됩니다. 즉 멱등성의 조건이 충족되지 않는다는 말입니다.
때문에 위에서 설명한 것 처럼 Topic Partition의 순서와 멱등성을 보장하기 위해서는
enable.idempotence, max.in.flight.requests.per.connection Option이 같이 쓰여야 하는 것입니다.
Kafka Producer는 기본적으로 한번에 Message를 많이 처리(전송)하려고 합니다.
하지만 이러한 설정은 각자 요구되는 전송 환경에 따라 적합하게 맞춰가야 하는 부분입니다. (개발자의 역량)
즉 처리량과 지연율 사이에서 혹은 지연율과 전송 안정성 사이에서 트레이드오프(Trade off)를 고려해야 합니다.
Compression.type, batch.size, linger.ms
세개의 설정을 변경하면 Batch 처리량 증가를 얻을 수 있습니다.acks 설정이 0 -> 1 -> all
로 설정될수록 Message 전송의 안정성은 증가하지만, 그만큼 응답을 대기시간이 증가하므로 지연율이 높아집니다. min.insync.replicas
설정이 1인 경우, acks 설정이 1인 경우와 동일합니다. Replication이 없으니…