순서보장의 문제
가 발생할 수 있습니다.Consumer 그룹에 속한 Consumer가 하나일 경우에는, Consumer 1이 모든 Partition의 Record를 소비합니다.
위의 그림에서 보이는 것 처럼 하나의 Consumer 4개의 Partition에서 Data를 읽어갑니다.
참고) Kafka Topic의 Partion과 Consumer의 연결은 소유권(Ownership)이라고 하며, “Consumer 1”이 Partition 0을 소유(Own)한다고 표현합니다.
위의 설명처럼, Consumer가 구성되게 된다면, Producer가 전송하는 모든 Record를 하나의 Consumer가 처리하기에는 무리가 있습니다.
위와 같다면 Producer는 Record를 보냈지만 Consumer는 모든 Record를 처리하지 못해 Partition에는 처리되지 않은 Record가 쌓이게 됩니다.
이 Record의 간극과 병목을 LAG이라고 부르고, LAG의 갯수가 많을 수록 파이프라인의 실시간성은 떨어지고, 장애 발생확률이 올라가게 됩니다.
때문에 이런 문제를 해결하기 위해서는 Record를 읽어가는 Consumer의 Scale-Out이 필요합니다.
간단하게 말하면 Consumer의 갯수를 늘려 Topic A에 쌓이는 Record의 처리량을 늘리는 작업이 필요합니다.
Consumer의 병목을 해결하기 위해서 “Consumer 2”를 추가했습니다.
이제 Consumer Group A에는 기존에 존재하던 “Consumer 1”과 더불어 “Consumer 2”가 존재하게 되었습니다.
위처럼 Consumer Group에 Consumer가 추가되면, Consumer Group에 속한 Consumer 들은 기존에 가지고 있던 Partition의 소유권을 분배합니다.
아래서 더 자세하게 알아보겠지만, 이 과정을 리밸런싱(Rebalancing) 합니다.
Consumer 1개가 추가된 위 그림은 4개의 파티션의 소유권을 2개의 Consumer가 각각 2개씩 나눠갖는 이상적인 시나리오가 됩니다.
이 경우 Consumer 별 담당하는 저장 Topic Record의 양이 줄어, 처리 속도도 증가하고, 기존보다 시나리오보다 지연이 덜 발생하게 됩니다.
그렇다면 처리량(Throughput)을 늘리기 위해서는 Consumer를 계속 추가하면 될까????
아닙니다. 그렇지는 않습니다.
위의 그림처럼, 4개의 Consumer로 구성된 Consumer Group으로 예를 들어 보겠습니다.
해당 Consumer Group이 Topic A를 구독하게 된다면 그림과 동일하게 Partition의 소유권이 Consumer에 할당될 것 입니다.
즉, 각자 하나씩 Partition을 담당해서 Data를 읽어가는 이상적인 시나리오가 됩니다.
그러나 이 시나리오에서도 지연이 발생한다고 가정한다면, 또 Consumer를 추가해서 처리량을 늘리면 되겠지? 라고 생각할 것입니다.
하지만 처리량 증가의 효과는 없습니다.
왜냐하면 하나의 Partition에는 동일한 Consumer Group에서 반드시 하나의 Consumer만 할당이 됩니다. (Partition : Consumer = N : 1 관계)
즉, Consumer Group의 Consumer가 Topic Partition보다 많으면, Consumer Group의 초과 할당 된 Consumer는 유휴 상태로 대기하게 됩니다.
따라서 Consumer를 늘려도, LAG이 발생한다면 Consumer만 추가하는 것이 아니라, Partition의 갯수도 같이 늘려줘야 합니 다.
Kafka가 이와 같은 제약을 만들게 된 이유는 “Partition 내에서의 순서보장”을 위해서 입니다.
Kakfa에서 Producer는 Broker의 Topic에 레코드를 전송하고, Topic은 한개 이상의 Partition으로 나눠지게 된다고 계속 말하고 있습니다.
Partition은 Topic을 분할한 단위이고, Partition이 여러개일 경우 Producer가 보낸 Messages의 순서는 보장 될 수 없습니다.
Messages의 순서는 각 Partition 안에만 보장됩니다. 이해가 쉽게 아래에 제가 간단한 예를 들었습니다.
① Producer는 “1,2,3,4,5,6,7,8,9”의 순서로 레코드를 Publish 했습니다.
② Consumer는 각 Partition 마다 가지고 있는 Offset (마지막으로 읽은 Record)부터 차례로 Messages를 읽습니다.
③ 그러나, 하나의 Consumer가 세개의 Partition을 모두 구독(Subscribe)했을 경우에는 Messages는 원래 전송된 순서가 보장되지 않습니다.
이유는 consumer는 연결되어 있는 세개의 partition에서 동시에 Messages를 Polling하기 때문입니다.
결론적으로는 각 partition에서 돌아가면서 레코드를 가져와도 “1,3,4,2,5,6,7,9,8” 로 Messages의 순서가 보장되지 않은 채로 도착하게 될 것입니다.
④ 그렇지만, 각 Partition 내에서는 레코드 순서가 보장됩니다. 즉, 정리해보면 아래와 같이 보장됩니다.
- partition 1에서의 1,2,7,8
- partition2에서의 3,5
- partition3에서의 4,6,9
Messages는 key에 따라 각자 다른 partition으로 assign되도록 설정할 수 있으므로, 이는 Messages의 순서가 중요한 경우 유용하게 사용될 수 있습니다.
Ex)
nasa1515라는 User의 action Event가 A -> B -> C의 순서로 발생하고, Kafka Consumer가 반드시 순서를 보장해서 subscribe 해야 합니다.
이때 nasa1515 userId를 partition key로 설정해, nasa1515 user의 Event는 모두 같은 partition에 전송되게 설정해 순서를 보장할 수 있습니다.
이처럼 레코드의 순서가 중요한 경우, partition의 수를 1로 조절하거나, partition key를 적절히 설정하여야 합니다.
Kakfa의 큰 장점 중 하나를 꼽자면 다중 Consumer 그룹(Multiple Consumer Group)을 지원한다는 것 입니다.
일반적인 Messages 큐 시스템에서는 한 Consumer가 Record를 읽어가면 다른 Consumer는 해당 Record를 제외한 다음 Record를 읽어갑니다.
즉, Topic에 [1], [2], [3]이 있으면 첫 번째 Consumer가 [1]을 읽어가고 두 번째 Consumer는 [2]를 읽어가게 되는 것입니다..
하지만 Kakfa는 Consumer 그룹 단위로 다른 Consumer 그룹과는 독립적으로 Data를 읽어갈 수 있습니다.
A 서비스를 하는 팀이 Record를 nasa Topic으로 보내고 Consumer Group A로 이 Record를 처리하고 있습니다.
얼마 후 B 서비스팀에서 A 서비스의 Record를 필요하게 되었습니다.
원래는 A 서비스 팀이 직접 로그 메시지를 전달해주었지만 (이러한 상황이 반복되어 복잡한 시스템 흐름도가 만들어짐)
Kafka를 사용해서 A 팀의 Topic 정보를 기반으로 B팀의 새로운 Consumer 그룹이 Topic에 접근하여 동일한 메시지를 가져갈 수 있습니다.
이렇게 여러 Consumer 그룹들이 하나의 Topic의 메시지를 가져갈 수 있는 이유는 우선 Kafka는 Record 읽었다고 Record를 삭제하지 않고, Conumser Group마다 각 Group의 offset을 별도로 관리되어 운영되기 때문입니다.
그렇기 때문에 하나의 Topic에 여러개의 Consumer Group이 연결되어도 영향 없이 Record를 읽어 가져갈 수 있게 됩니다.
위에서 잠깐 언급했었던 리밸런싱(Rebalancing)에 대해서 살펴보도록 하겠습니다.
위에서 병렬로 여러개(4개)로 나뉘어진 Partition을 한개의 Consumer로 처리하는 구성 시나리오를 예로들며 Consumer Group을 설명했었습니다.
해당 시나리오를 설명하며, 병목 현상 (LAG 발생)으로 인해 Consumer를 하나 추가 구성에 대해서도 설명을 했었습니다.
하나의 Consumer가 추가된 이후 Partition의 소유권(owner)를 Consumer끼리 나눠 갖는 과정을 리밸런싱(Rebalancing)?이라고 지칭합니다.
위의 시나리오에서 리밸런싱(Rebalancing)은 병목 해소를 위한 Consumer가 추가되었을 발생했습니다.
그러나 리밸런싱(Rebalancing)은 Consumer 추가 상황에서 뿐만 아니라 보통 대표적으로 아래 상황을 기준으로 발생합니다.
아래 그림은 Consumer가 Consumer group에서 없어졌을 때 (퇴출) 경우 즉, Consumer Application에 장애가 발생했을 경우로 설명하겠습니다.
4개의 파티션을 두 개의 Conumser가 잘 소비하고 있다가 “Consumer 2”가 비정상적인 상황으로 동작하지 못하는 경우가 발생할 수 있습니다.
이 경우 “Consumer 2”에 할당되었던 Partition 1, Partition 3의 Record는 소비되지 않고 남아 있게됩니다.
결론적으로는 Partition 1, Partition 3의 Record는 계속해서 처리되지 않고 쌓이는 지연, 병목 현상이 발생하게 됩니다.
이런 상황은 바람직하지 않은 장애 상황이기 때문에 살아있는 Conumser끼리 다시 Partition의 소유권을 다시 나눠 갖게 됩니다.
그러나 이 경우 Consumer Group내에 살아있는 Consumer는 “Consumer 1” 밖에 없기 때문에 “Consumer 1”이 모든 소유권을 가져가게 됩니다.
리밸런싱(Rebalancing)을 다른 관점으로 바라보자면, 리벨런싱을 통해서는 새로 참여하게 되거나 제외시키고자 하는 클라이언트를 dynamically 하게 반영할 뿐 아니라 클라이언트의 최신 상황에 맞추어 Data를 지속적으로 처리해갈 수 있도록 합니다. 따라서 카프카의 특징 중 확장성과 가용성을 보장해주는 수단이라고 할 수 있을 것 같습니다.
이렇게 Consumer가 Group cooperating 할 수 있도록, Kafka Rebalance Protocol (Group Membership Protocol 및 Client Embedded Protocol)을 제공합니다. Consumer Clinet의 입장에서 보면, Topic을 Consumer할 논리적인 그룹을 형성하고 Leader 역할을 할 클라이언트를 선출하며 파티션을 분배하는 과정이라고 할 수 있습니다. (Group의 Leader 노드는 멤버들에게 자원을 분배하는 역할을 가지고 있습니다.)
즉, Kafka Client는 Group Membership Protocol을 사용해서 Client들을 논리적인 그룹에 참여 시킴으로서 cooperating 하려고 합니다.
Cooperating 과정에서, 그룹 코디네이터(Group Coordinator)는 GroupCoordinator 인스턴스를 백그라운드 프로세스로 실행하면서 Consumer 그룹을 관리하는 역할을 가진 카프카 브로커입니다. Consumer Group Membership에 변화가 발생했을 때, 리벨런싱 과정에서 Consumer 클라이언트와 그룹 코디네이터 간의 요청/응답 Messages 전달이 이루어집니다.
하지만 Kafka Rebalance Protocol 에서는 그룹 코디네이터의 개입을 여기까지만으로 제한시키면서 역할을 축소시키는 특징이 있습니다. 사실상 실질적인 리소스 재분배 작업은 Consumer 클라이언트끼리만 하는 일이고, 코디네이터는 그룹의 형성 자체만을 담당한다고 봐도 무방합니다.
Kafka Rebalance Protocol에서는 리소스가 그룹 멤버들 (Consumer 클라이언트) 사이에서 적절하게 분리되어야 할 때마다 리벨런싱의 새로운 라운드가 시작되어야 한다는 원칙이 지켜져야 합니다. 즉, Rebalancing의 발생은 곧 어떤 Rebalancing Round가 1회 완료되었음을 뜻합니다. 위에서 언급했듯이 이 과정에서 그룹 Group Membership Protocol 조건을 충족시켜가면서 Consumer 그룹 내에서 파티션을 분배하는 작업을 수행하게 됩니다.
group id = A 인 Consumer 그룹에 C-3 이라는 새로운 Consumer 멤버가 추가되었다고 가정해보면, Rebalancing Round는 아래와 같이 동작하게 됩니다.
C-3 이 스타트되면, 그룹 멤버십 구성에 변화가 생겼으므로 변화를 반영시켜주기 위해 리벨런싱 라운드를 시작해야 합니다.
따라서 group.id = A 에 참여할 Consumer들은 특정 카프카 브로커를 Group Coordinator 자격으로 얻기 위해 Broker의 Coordinator를 찾는 FindCoordinator request 를 브로커에게 보냅니다.
그 뒤로 Consumer 클라이언트들은 JoinGroup request 를 전송하며 Rebalance Protocol 을 초기화합니다.
여기서 JoinGroup request 메시지에선 다음 필드를 포함합니다.
(JoinGroup request 는 자바 애플리케이션에서 클라이언트가 poll() 메소드를 호출하며 이루어집니다.)
JoinGroup 요청은 session.timeout.ms와 max.poll.interval.ms 등의 클라이언트 설정들을 포함합니다.
coordinator는 이 설정들을 그룹에서 Consumer들로부터 응답이 없을 경우 제외시키기 위한 기준으로 사용합니다. (아래에서 다루겠습니다.)
group protocols에서는 protocol name 과 protocol metadata 필드를 포함해서 사용 가능한 client protocol의 목록, 그리고 프로토콜을 실행시킬 때 필요한 메타Data들을 보냅니다. client-protocol 은 partition.assignment.strategy 와 같은 Consumer의 partition assignor 의 목록입니다. 메타 Data들은 Consumer가 구독하고 있는 topic 들입니다.
JoinGroup 은 일종의 베리어 역할을 합니다. 즉, coordinator는 모든 Consumer로부터 해당 요청을 받기 전(group.initial.rebalance.delay.ms)까지, 혹은 rebalance timeout 이 발생하기 전까지 응답하지 않습니다.
이렇게 Group Management 에 참여하는 모든 Consumer들이 코디네이터에게 JoinGroup 요청을 전송하고 나면, 코디네이터 브로커는 이에 대한 Response 를 전송합니다. 이 과정에서 Consumer 그룹 내 첫 번째 Consumer는 group leader로 선출되며, leader Consumer는 작동중인 구성원들의 목록, 그리고 선택된 assignment strategy를 응답을 받고, group leader는 파티션 배정의 책임을 지게 됩니다.
이제 Group Management 에 참여하고 있는 모든 Consumer 멤버들은 코디네이터에게 SyncGroup Request 를 전송합니다.
이때 그룹 리더는 계산한 파티션 배치를 함께 보내며, 다른 Consumer들은 그냥 빈 요청을 보냅니다.
즉, 위의 그림처럼 Leader Consumer의 경우, Group_assignment 속성에 각 Consumer의 Partition 할당 정보를 전달합니다.
(Member id에 따른 member_assignment) 결론적으로는 파티션 배치를 보냅니다.
coordinator는 모든 SyncGroup 요청에 응답(Response)을 보내게 되는데 (이 응답엔 Consumer들에게 각자 할당받은 Partition 정보를 Member_assignment에 담겨 있습니다.) Cleint 단에서는 해당 Messages를 받게되면, 연계된 Application에서는 ConsumerRebalanceListener의 onPartitionsAssignmedMethod()가 트리거되면서 Record를 fecth 해오기 시작합니다.
위의 JoinGroup 과 SyncGroup 요청 / 응답 과정으로 보면, 코디네이터의 역할은 Consumer 그룹 내의 리더를 선출하고, 리더가 전송한 새로운 파티션 할당 정보를 다른 멤버들에게 분배해주는 것으로 나뉘어져 한정되어 있습니다.
이제 Consumer 그룹에선 Rebalancing Round를 완료하고 Record를 가져올 수 있습니다. 이 때 Fetcher Class를 활용하게 됩니다.
Fetching 과정에서 poll() 메소드가 호출되면 Fetcher 는 자기가 갖고 있는 레코드를 max.poll.records 속성값만큼 반환하고, 이 때 Fetcher 가 레코드를 가지고 있지 않아서 빈 Map 이 반환된 경우에만 브로커에서 호스팅하는, Consumer가 할당받은 토픽의 파티션 리더에게 요청이 전송됩니다.
즉, Consumer의 poll() 호출은 보통 Fetcher 가 가지고 있는 레코드를 반환하지만, Fetcher 가 레코드를 가지고 있지 않는 경우에만 브로커로 요청이 전송되는 구조입니다.
Coordinator 는 Rebalancing Round에서 어떤 Consumer를 그룹에 참여시킬지 결정하게 되는데, 그걸 판단하는 방법 중 하나는 Heartbeat 입니다.
각 Consumer는 heartbeat.interval.ms로 설정된 시간마다 한번씩 코디네이터 브로커에게 Hearbeat 메시지를 전송해 Session Alive 신호를 보냅니다. Rebalancing Process에 들어가게 되면 코디네이터는 그 Heartbeat를 기준으로 Consumer를 그룹에 (re)join 시킬지 판단하게 됩니다..
옵션 | 설명 | default |
---|---|---|
session.timeout.ms | Consumer가 종료된 것인지 파악하는 간격. 코디네이터는 heartbeat을 수신하지 못하면 Consumer의 판단하고 리밸런싱 한다. | 45 Second |
heartbeat.intermal.ms | Consumer가 코디네이터에게 active 상태임을 전달하는 간격 일반적으로 session.timeout.ms 1/3로 설정 | 3 Second |
max.poll.interval.ms | Consumer가 실패한 것을 판단하는 Record Polling 간격 | 5 Min |
Rebalancing Process가 시작된다면 Clinet 단의 JoinGroup Request 메시지에 session.timeout.ms 속성을 포함하며 시작되며,
해당 설정에서 한번 더 Heartbeat 응답이 없다면 그룹 코디네이터는 Consumer 클라이언트 이슈로 판단하게 됩니다.
예를 들어, Consumer를 의도적으로 Stop 하기 위해 LeaveGroup Request 를 코디네이터에게 전송했다고 가정하면,
이 때 나머지 Consumer들은 다음 Heartbeat 때에 리벨런싱이 수행될 것을 알 수 있고, 새로운 리벨런싱 라운드로 파티션 재분배가 발생합니다.
heartbeat.interval.ms는 session.timeout.ms 의 1/3 이하로 구성하는 것이 좋습니다.
이 경우, 일시적인 이슈로 인한 약간의 Heartbeat 손실에 대한 경우를 대비할 수 있습니다. 즉, 소비자가 실패한 것으로 간주되지 않습니 다.
아래 그림에서 두 개의 Heartbeat가 손실되었지만 session.timeout.ms이 시간 초과되기 전에 세 번째 Heartbeat가 도착했으므로 Group Coordinator는 소비자를 정상으로 판단합니다.
아래 그림과 같이 Consumer의 이슈로 Heartbeat이 전달되지 않아 session.timeout.ms이 만료되면
소비자 그룹에서 제거되어 Rebalancing Round가 발생합니다.
위에서 Rebalancing에 대해서 장황하게 설명을 했는데, 중요한 부분은 언제 Rebalancing이 일어나는지?, Rebalancing 동작이란 무엇인지 입니다.
최종적으로 요약하자면 Rebalancing Process가 완료되면 다음과 같은 일이 완료된 것이라고 파악하면 될 것 같습니다.
1. Logical 그룹 멤버십 재정의
2. 리더 선출
3. 리소스 재할당
그러나 위에서 미처 설명하지 못한 Rebalancing Process 한계점들이 존재하는데…
전체 Process를 멈추지 않고는 Rebalancing Round를 완료할 수 없다는 부분 입니다. (Stop-the-World Effect)
풀어보자면, Rebalancing Process 도중 그 어떤 Consumer들도 정상적인 Data 처리를 하지 못한다는 문제 입니다.
예를 들어보자면 천 개의 Connect Task 가 Group에 존재한다면, 그 천 개의 프로세스가 전부 정상 동작하지 못하게 되는 상황을 맞이하게 됩니다.
또한 이렇게 리벨런싱이 초래한 Stop The World 는 일반적인 하드웨어나 네트워크 손실 문제로 발생한 일시적인 client fail 과 더불어, scale up / down 의 상황이나 계획적인 클라이언트 start / stop / restart 의 상황에서 전부 발생할 수 있습니다.
조금 더 쉬운 이해를 위해서, 하나의 Consumer Instance가 M1(C-1) 종료되었다고 가정해보고 예를 들겠습니다.
첫 번째 리벨런싱은 Consumer가 coordinator에게 멈추기 전 LeaveGroup 요청을 보내면서 시작됩니다.
남은 Consumer들은 다음 번 Heartbeat 때에 이러서야 리벨런싱이 필요하다는 사실을 알게 되고,
파티션 재배치를 위해 또 새로운 JoinGroup/SyncGroup 을 시작하게 됩니다.
이떄, 전체 Consumer의 파티션 재조정이 끝나기 전까지는, Consumer들은 아무런 Data도 처리할 수 없습니다. 기본적으로 rebalance timeout은 5분으로 설정되어 있으며 상황에 따라서는 매우 큰 consumer-lag을 유발할 수 있습니다.
여기서 또 다른 문제는 해당 Consumer M1(C-1)가 다시 시작되면 어떤 일이 발생할까요?
Consumer는 다시 Consumer 그룹에 참여하기 위해 또 다시 새로운 리벨런싱을 유발할 것이고, 또 다시 전체 Consumer가 멈추는 일이 발생할 것입니다.
다음과 같은 재시작은 rolling update 시에도 발생할 수 있고, 해당 시나리오는 Consumer에게 매우 좋지 않습니다.
이유는, 실제로 위의 그림에서 예를 든 세 개의 Consumer들이 재시작 되면 총 6번의 파티션 재조정이 발생하기 떄문입니다.
최종적으로 Consumer에서 가장 흔하게 발생하는 문제는 다음과 같습니다.
첫 번째 경우는 coordinator 는 session.timeout.ms 만큼의 시간동안 heartbeat 요청을 못받으면 Consumer가 죽었다고 생각합니다.
두 번째의 경우, Data를 처리하는데 걸리는 시간은 max.poll.interval.ms 시간보다 적어야 합니다.
이전 포스트 : Producer의 Delivery semantics에서 언급했듯이, Delivery semantics은 다음 세가지가 존재합니다.
1. At Most Once, 2. At Least Once, 3. Exactly Once 다만 Producer와는 다르게. consumer는 아래 두 가지 의미 Sementics만 지원됩니다.
At most once Sementics 에서는 Record는 최대 한 번만 배달되어야 합니다.
해당 Sementics을 요약하자면 Record를 여러번 전달해서 중복을 허용하는 것보다는 Record를 읽어버리는 손실을 우선적으로 택한 방식입니다.
기본적으로 Consumer는 “enable.autu.commit”이 True로 설정되어 있으므로 자동적으로 At Most once를 사용하도록 설정이 됩니다.
Record를 읽고, Commit을 진행, Record의 처리 전에 Consumer에 장애가 발생하는 경우 처리되지 않은 Record는 손실되고 다시 읽지 않습니다.
그러나 Partition Rebalancing으로 인해 다른 Consumer가 할당되면 마지막 Commit Offset에서 Record를 읽게됩니다.
아래 그림처럼 Record는 Batch로 읽히고, Batch Record의 일부 또는 전체 Record는 처리되지 않을 수 있지만 정상 상태입니다.
At least once Sementics 의미론에서는 Record를 여러번 전달해서 Record의 손실은 없지만, 중복을 허용하는 방식입니다.
해당 방법론에서 Consumer는 Record의 중복이 발생하더라도 모든 Record를 확실히 읽고 처리하도록 합니다. 때문에 Application은 보다 높은 중간 처리량, 대기 시간이 존재합니다.
“enable.auto.commit” 값을 false로 설정하면 Record 처리 후 수동으로 Commit이 가능합니다.
처리하기 전에 Consumer에 장애가 발생하더라도 Offset이 읽은 상태로 Commit되지 않으므로, 처리되지 않은 Record가 손실되지 않습니다.
그러나 Partition Rebalancing으로 다른 Consumer가 마지막 Commit된 Offset에서 동일한 Record를 읽어 Messages의 중복이 발생할 수 있습니다.
총 3개의 포스트를 통해서 Kafka의 Producer와 Consumer가 어떤 방식으로 동작하는 지 간단하게 설명해봤습니다.
이제 다음 포스트에서는 kafka brocker에 대한 내용에 대해서 설명하도록 하겠습니다.
https://www.linkedin.com/pulse/kafka-consumer-delivery-semantics-sylvester-daniel/ https://joooootopia.tistory.com/30 https://medium.com/streamthoughts/apache-kafka-rebalance-protocol-or-the-magic-behind-your-streams-applications-e94baf68e4f2