[ADVANCED #3] Consumer 심층 분석: 그룹 코디네이터부터 리밸런싱 전략까지

2025. 9. 9. 12:30·Kafka/Core

Kafka Consumer, 핵심 원리 파헤치기

프로듀서가 메시지를 만드는 역할이라면, 컨슈머(Consumer)는 토픽의 메시지를 읽어와 소비하는 역할을 담당한다. 이번 글에서는 컨슈머가 어떻게 동작하고, 효율적으로 데이터를 가져오기 위해 어떤 전략을 사용하는지 깊이 파고들어 보자.

 

1. 컨슈머와 컨슈머 그룹: 함께 일하는 법

 

카프카의 모든 컨슈머는 컨슈머 그룹(Consumer Group)이라는 팀에 소속되어야 한다. group.id로 식별되는 이 그룹은 토픽의 파티션들을 효율적으로 분배받아 처리하는 단위가 된다.

  • 하나의 토픽 파티션은 같은 그룹 내에서 오직 하나의 컨슈머에게만 할당된다. 이는 동일한 메시지가 중복으로 처리되는 것을 막는 카프카의 핵심 원칙이다.
  • 반면, 서로 다른 그룹은 같은 파티션의 데이터를 각자 독립적으로 읽어갈 수 있다.
  • 하나의 컨슈머는 여러 개의 파티션을 할당받을 수 있다.

이상적인 상황은 파티션의 개수와 컨슈머의 개수를 일치시키는 것이다. 이렇게 하면 각 컨슈머가 하나의 파티션을 전담하여 처리 효율을 극대화할 수 있다. 만약 컨슈머가 파티션보다 많아지면, 남는 컨슈머는 아무 일도 하지 않고 대기하게 된다.


2. 컨슈머의 기본 동작 핵심 로직

2.1. Subscribe, Poll, Commit

 

컨슈머의 작업은 크게 세 단계로 이루어진다.

  1. 구독 (Subscribe): subscribe() 메서드를 호출하여 "앞으로 이 토픽의 메시지를 읽겠다"고 카프카에 등록한다. 이 과정을 통해 컨슈머는 자신이 읽어야 할 토픽과 파티션의 메타데이터 정보를 받아온다.
  2. 폴링 (Poll): poll() 메서드를 주기적으로 호출하여 브로커로부터 실제 메시지를 가져온다. 컨슈머는 이 메서드를 통해 루프를 돌며 계속해서 새로운 메시지가 있는지 확인하고 가져오는 작업을 반복한다.
  3. 커밋 (Commit): 메시지 처리를 성공적으로 완료하면, "여기까지 읽었다"는 표시를 남기는 것을 커밋이라고 한다. 이 정보는 __consumer_offsets라는 특별한 토픽에 저장된다. 만약 컨슈머가 A, B, C 메시지를 읽고 커밋하면, 다음 poll()에서는 이전에 읽었던 데이터를 중복으로 가져오지 않고 D부터 읽기 시작한다. 즉, __consumer_offsets는 각 컨슈머 그룹이 파티션별로 어디까지 처리했는지 기록하는 중요한 역할을 한다.
    • 위의 사진에서 첫번째에 0,1,2 offset을 가져오고, 두번째에 3,4,5 offset을 가져온다. 따라서 다음에 가져와야 할 offset은 6이므로 __consumer_offsets의 offset은 6으로 설정된다.

2.2. Offset Commit의 이해

컨슈머의 기본 동작은 poll()로 메시지를 가져와 처리하는 것이다. 하지만 "어디까지 처리했는지"를 정확히 기록하고 관리하는 오프셋 커밋(Offset Commit)이야말로 카프카의 데이터 신뢰성을 지탱하는 가장 중요한 기둥이다.

 

예를 들어, 컨슈머가 0~5번 오프셋의 메시지를 모두 처리했다면, "다음엔 6번부터 주면 돼" 라는 의미로 __consumer_offsets에 오프셋 6을 기록한다. 덕분에 컨슈머가 재시작되어도 중복 처리 없이 다음 작업을 이어갈 수 있다.

 

__consumer_offsets는 누가 관리하고, 리밸런싱 시 어떻게 사용될까? 이 토픽은 개별 컨슈머가 아닌 컨슈머 그룹을 위한 공동 장부와 같다. 이 장부는 브로커의 그룹 코디네이터가 관리한다. 장부에는 [그룹ID, 토픽, 파티션]을 키로 하여 "이 그룹은 이 파티션을 몇 번 오프셋까지 처리했다"는 정보만 기록될 뿐, "어떤 컨슈머가 커밋했는지"는 기록되지 않는다.

이것이 리밸런싱의 핵심이다.

  1. 그룹 내 Consumer #3이 갑자기 종료되어 리밸런싱이 발생했다고 가정하자.
  2. Consumer #3이 담당하던 Partition #2는 이제 Consumer #2에게 할당된다.
  3. Consumer #2는 일을 시작하기 전에 그룹 코디네이터에게 묻는다. "우리 그룹이 Partition #2를 마지막으로 어디까지 처리했나요?"
  4. 코디네이터는 공동 장부(__consumer_offsets)를 보고 "오프셋 10까지 처리했으니, 넌 10번부터 시작하면 돼"라고 알려준다.

즉, 리더 컨슈머가 직접 다른 컨슈머에게 지시하는 것이 아니라, 새로운 담당자가 그룹의 공동 장부를 조회하여 자신의 작업 시작점을 파악하는 방식이다.


3. Offset 내부 토픽 뜯어보기

컨슈머가 어디까지 메시지를 읽었는지 기억하는 것은 카프카의 신뢰성 있는 데이터 처리의 핵심이다. 이 중요한 정보는 __consumer_offsets라는 특별한 내부 토픽에 저장된다. 이번 글에서는 이 내부 토픽의 동작 방식을 파헤치고, 실제로 그 안의 데이터를 어떻게 들여다볼 수 있는지 알아보자.

3.1. Consumer의 auto.offset.reset

컨슈머 그룹이 특정 토픽을 처음으로 구독할 때, 어디서부터 메시지를 읽기 시작해야 할까? __consumer_offsets에 이 그룹에 대한 정보가 아직 없기 때문에 기준점이 필요하다. 이때 동작하는 것이 바로 auto.offset.reset 설정이다.

  • earliest: 파티션의 가장 처음, 즉 0번 오프셋부터 모든 데이터를 읽기 시작한다.
  • latest (기본값): 구독을 시작한 시점 이후에 들어오는, 가장 최신 데이터부터 읽기 시작한다.

중요한 점은 이 설정이 정말 "처음"일 때만 유효하다는 것이다. 일단 한 번이라도 커밋이 발생하여 __consumer_offsets에 오프셋 정보가 기록되면, 컨슈머는 재시작 시 auto.offset.reset 설정과 상관없이 저장된 오프셋 다음부터 데이터를 읽어온다.

컨슈머 그룹의 정보는 그룹 내 컨슈머가 모두 종료되어도 기본적으로 7일(offsets.retention.minutes) 동안 유지된다. 따라서 7일 안에 같은 그룹 ID로 컨슈머가 다시 접속하면 이전에 작업하던 위치를 이어서 처리할 수 있다.

 

3.2. __consumer_offsets 직접 읽기

  • __consumer_offsets는 카프카가 내부적으로 사용하는 토픽(Internal Topic)이라 일반적인 방법으로는 조회할 수 없다. 하지만 약간의 트릭을 사용하면 그 안의 내용을 직접 확인할 수 있다.
  • 카프카 로그 디렉토리를 살펴보면 __consumer_offsets가 수많은 파티션으로 나뉘어 있는 것을 볼 수 있다. 기본적으로 50개의 파티션을 가지는데, 이는 전 세계의 수많은 컨슈머 그룹이 보내는 오프셋 커밋 요청(Write)을 병렬로 처리하여 성능 저하를 막기 위함이다. 이제 이 토픽의 내용을 직접 읽어보자.

3.2.1. 임시 컨슈머 설정 파일 생성

내부 토픽 조회를 허용하는 설정 파일을 하나 만든다.

echo "exclude.internal.topics=false" > consumer_temp.config

3.2.2. kafka-console-consumer로 토픽 읽기

--consumer.config 옵션으로 방금 만든 설정 파일을 지정하고, 오프셋 정보를 예쁘게 볼 수 있도록 --formatter를 추가한다.

kafka-console-consumer --consumer.config /home/ubuntu/consumer_temp.config \
 --bootstrap-server localhost:9092 --topic __consumer_offsets \
 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"

이제 실제로 데이터가 오고 갈 때 __consumer_offsets 토픽과 컨슈머 애플리케이션 로그가 어떻게 변하는지 관찰해보자.

3.2.3. Producer: simple-topic에 메시지 전송

콘솔 프로듀서로 a부터 e까지 5개의 메시지를 보낸다.

$ kafka-console-producer --bootstrap-server localhost:9092 --topic simple-topic
>a
>b
>c
>d
>e

3.2.4. Consumer: Java 애플리케이션 실행

오프셋 정보를 로그로 출력하는 컨슈머 코드를 실행한다.

// ConsumerWakeup.java
// ...
while (true) {
    ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
        logger.info("record key={}, record value={}, partition={}, record offset={}",
                consumerRecord.key(), consumerRecord.value(), consumerRecord.partition(), consumerRecord.offset());
    }
}
// ...

3.2.5.관찰 결과

__consumer_offsets 토픽의 내용 오프셋 정보가 계속 업데이트되는 것을 볼 수 있다. [그룹ID, 토픽, 파티션] 형식으로 키가 구성되고, OffsetAndMetadata에 커밋된 오프셋 정보가 담긴다.

[group_01,simple-topic,0]::OffsetAndMetadata(offset=3, ...
[group_01,simple-topic,0]::OffsetAndMetadata(offset=4, ...
[group_01,simple-topic,0]::OffsetAndMetadata(offset=5, ...

 

Java 컨슈머 로그 컨슈머는 메시지를 성공적으로 가져와 로그를 출력한다.

[main] INFO com.example.ConsumerWakeup - record key=null, record value=d, partition=0, record offset=3
[main] INFO com.example.ConsumerWakeup - record key=null, record value=e, partition=0, record

 

여기서 흥미로운 점을 발견할 수 있다. 컨슈머 로그를 보면 offset=4인 메시지(e)까지 처리가 완료되었다. 그런데 __consumer_offsets 토픽에 최종적으로 기록된 오프셋은 offset=5다. 왜 숫자가 다를까?

이는 커밋된 오프셋이 "다음에 읽어야 할 메시지의 위치"를 의미하기 때문이다. 즉, 컨슈머는 "내가 4번 오프셋까지 처리를 완료했으니, 다음에는 5번 오프셋을 줘"라는 의미로 offset=5를 커밋하는 것이다. 이 간단한 원리를 이해하는 것이 컨슈머의 동작을 정확히 파악하는 핵심이다.


4. KafkaConsumer 의 주요 수행 개요

  • KafkaConsumer는 Fetcher, ConsumerClientNetwork등의 주요 내부 객체와 별도의 Heart Beat Thread를 생성 
  • Fetcher, ConsumerClientNetwork 객체는 Broker의 토픽 파티션에서 메시지를 Fetch 및 Poll 수행
  • Heart Beat Thread는 Consumer의 정상적인 활동을 Group Coordinator에 보고하는 역할을 수행(Group Coordinator는 주어진 시간동안 Heart Beat을 받지 못하면 Consumer들의 Rebalance를 수행 명령)

5. Java Consumer Client API 처리 로직 개요

[1] Consumer 환경 설정(Properties 객체를 이용) - 반드시 group.id 추가해야
[2] 1에서 설정한 환경 설정값을 반영하여 KafkaConsumer 객체 생성.
[3] 읽어들일 Topic을 subscribe()를 호출하여 설정
[4] 지속적으로 poll() 을 호출하여 Topic의 새로운 메시지를 계속 읽어 들임.
[5] KafkaConsumer객체를 명확하게 close() 수행

6. poll() 메서드, 그 내부를 들여다보다

 

KafkaConsumer.poll()은 단순히 메시지를 가져오는 것 이상의 복잡한 내부 동작을 포함한다. poll()이 호출되면 Fetcher, ConsumerNetworkClient와 같은 내부 객체들이 유기적으로 움직이며, 별도의 하트비트 스레드(Heartbeat Thread)가 동작을 시작한다.

6.1. 주요 내부 구성 요소

  • Fetcher: 브로커로부터 데이터를 가져오는 실질적인 역할을 한다.
  • ConsumerNetworkClient: Fetcher의 요청을 받아 브로커와 실제 네트워크 통신을 수행한다.
  • ConsumerCoordinator: 그룹 내 컨슈머들을 관리하고 파티션을 재조정(Rebalance)하는 역할을 담당한다.
  • Heartbeat Thread: "나 아직 살아있어!"라는 신호를 주기적으로 그룹 코디네이터에게 보낸다. 만약 코디네이터가 일정 시간 동안 이 신호를 받지 못하면 해당 컨슈머에 문제가 생겼다고 판단하고, 남아있는 다른 컨슈머들에게 파티션을 재할당하는 리밸런싱을 실행한다.

6.2. poll()의 데이터 처리 흐름

poll()의 동작은 내부에 있는 LinkedBlockingQueue라는 데이터 버퍼를 중심으로 이루어진다.

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));

 

poll(1000)이 호출되었을 때의 흐름을 따라가 보자.

  1. poll()이 호출되면, Fetcher는 먼저 내부 큐(LinkedBlockingQueue)에 데이터가 있는지 확인한다.
  2. (데이터가 있을 경우): Fetcher는 큐에서 데이터를 즉시 꺼내 사용자에게 반환하고 poll()은 바로 종료된다. 이 과정에서 ConsumerNetworkClient는 백그라운드에서 계속해서 브로커로부터 다음 데이터를 가져와 큐에 채워 넣는다.
  3. (데이터가 없을 경우): Fetcher는 ConsumerNetworkClient에게 브로커로부터 데이터를 가져오라고 요청한다. ConsumerNetworkClient는 브로커에 새로운 데이터가 들어올 때까지 poll()에 지정된 시간(여기서는 1000ms)만큼 대기(리스닝)한다.
  4. 만약 대기 시간 안에 브로커에 새 데이터가 도착하면, ConsumerNetworkClient는 즉시 데이터를 가져와 큐에 넣고, Fetcher가 이 데이터를 사용자에게 반환한다. 만약 1000ms가 다 지날 때까지 데이터가 도착하지 않으면, poll()은 빈 데이터를 반환하고 종료된다.

7. 똑똑하게 데이터 가져오기: Fetch 옵션 정복하기

컨슈머는 브로커로부터 얼마나 많은 데이터를, 어떤 주기로 가져올지 세부적인 옵션을 통해 제어할 수 있다. 이는 시스템의 부하와 응답성을 조절하는 중요한 설정이다.

7.1. Fetcher 관련 주요 설정

  • fetch.min.bytes (기본값: 1): poll() 요청에 대해 브로커가 응답하기 위해 기다리는 데이터의 최소량. 브로커는 이만큼의 데이터가 쌓이거나, fetch.max.wait.ms 시간이 될 때까지 응답을 지연시킨다.
  • fetch.max.wait.ms (기본값: 500ms): fetch.min.bytes를 만족시키지 못하더라도, 브로커가 응답을 보내기까지 기다리는 최대 시간.
  • fetch.max.bytes (기본값: 50MB): poll() 한 번에 가져올 수 있는 데이터의 최대 크기.
  • max.partition.fetch.bytes (기본값: 1MB): 파티션 하나당 한 번에 가져올 수 있는 데이터의 최대 크기.
  • max.poll.records (기본값: 500): poll() 한 번에 반환되는 레코드의 최대 개수.
Q. poll(1000)과 fetch.max.wait.ms=500은 뭐가 다른가?
A. 둘은 서로 다른 레이어에서 동작한다. poll(1000)은 컨슈머 관점에서 데이터가 내부 큐에 없을 때, 데이터가 도착할 때까지 최대 1000ms를 기다리겠다는 의미다. 반면, fetch.max.wait.ms는 브로커 관점에서 fetch.min.bytes 만큼의 데이터가 쌓이지 않았을 때, 응답을 최대 500ms까지 지연시키겠다는 의미다. 즉, 네트워크 효율을 위한 브로커의 배치(batch) 대기 시간이다.

7.2. 시나리오로 이해하는 Fetch 동작

가정을 통해 Fetcher가 어떻게 동작하는지 알아보자.

  • fetch.min.bytes = 16KB
  • fetch.max.wait.ms = 500ms
  • max.partition.fetch.bytes = 1MB

시나리오 1: 실시간 데이터 처리 (최신 offset을 따라잡은 경우)

프로듀서가 메시지를 계속 보내고 있고, 컨슈머는 거의 실시간으로 데이터를 처리하고 있다. 이때 poll()이 호출되면, 브로커는 fetch.min.bytes(16KB)만큼 데이터가 쌓일 때까지 기다린다. 만약 500ms가 지나도 16KB가 채워지지 않고 5KB만 쌓였다면, 브로커는 더 이상 기다리지 않고 5KB의 데이터만 컨슈머에게 보낸다. 반대로, 500ms가 되기 전에 16KB가 쌓이면 즉시 응답한다. 이는 불필요한 네트워크 요청을 줄여 효율을 높이는 방식이다.

 

시나리오 2: 과거 데이터 처리 (따라잡아야 할 offset이 많은 경우)

컨슈머가 오랫동안 중지되었다가 다시 시작되어 처리해야 할 과거 데이터가 수백만 건 쌓여있다. 이 경우 fetch.min.bytes는 거의 항상 충족되므로 의미가 없다. 대신 컨슈머는 한 번의 poll() 요청으로 최대한 많은 데이터를 가져와야 효율적이다. 이때는 max.partition.fetch.bytes(1MB) 설정이 동작한다. 컨슈머는 각 파티션으로부터 최대 1MB 단위로 데이터를 '싹 긁어오는' 방식으로 빠르게 과거 데이터를 따라잡는다.


8. Group Coordinator와 Rebalancing

Kafka Consumer Group은 살아있는 유기체처럼 동작한다. 새로운 멤버가 합류하고, 기존 멤버가 떠나기도 하며, 처리해야 할 일(파티션)이 늘어나기도 한다. 이 모든 변화에 맞춰 각 멤버의 역할을 재조정하는 과정이 바로 리밸런싱(Rebalancing)이다. 그리고 이 복잡한 과정을 지휘하는 오케스트라의 지휘자가 바로 그룹 코디네이터(Group Coordinator)다.

8.1. 그룹 코디네이터는 누구인가?

그룹 코디네이터는 카프카 브로커 중 하나로, 특정 컨슈머 그룹의 상태를 관리하고 리밸런싱을 총괄하는 책임을 진다. 각 컨슈머 그룹은 해시 함수를 통해 자신을 담당할 그룹 코디네이터를 배정받는다. 컨슈머들은 이 코디네이터에게 하트비트(heartbeat)를 보내 자신의 상태를 알리고, 코디네이터의 지시에 따라 움직인다.

8.2. 리밸런싱은 언제 일어나는가?

 

리밸런싱은 컨슈머 그룹에 변화가 생겼을 때 발생하는 "일시 정지" 이벤트다. 이 기간 동안 그룹의 모든 컨슈머는 메시지 처리를 중단하고 새로운 파티션 할당을 기다린다. 리밸런싱이 발생하는 주요 원인은 다음과 같다.

  1. 컨슈머의 변화: 그룹에 새로운 컨슈머가 추가되거나 기존 컨슈머가 정상적으로 종료될 때.
  2. 장애 발생: 컨슈머가 비정상적으로 종료되거나, 네트워크 문제 등으로 코디네이터에게 하트비트를 보내지 못해 세션 타임아웃이 발생할 때.
  3. 토픽의 변화: 구독 중인 토픽에 새로운 파티션이 추가될 때.

8.3. 리밸런싱 동작 흐름

 

Consumer의 Leader는 Group Coordinator가 설정해주고, Consumer Group 내에서 각 Consumer가 어떤 Partition 담당할지는 Leader Consumer가 파티션 할당 전략에 따라 파티션을 할당해준다.

 

Consumer #1만 있을 때는 Partition 3개 모두 Consumer #1이 담당한다. 이후에 Consumer #2가 들어오면, Group Coordinator한테 "저는 Consumer #2입니다. Join 받아주세요" 라고 요청을 하게 되고, 이에 대해 허락을 해준다. Consumer #2가 그룹에 참여하면 리밸런싱이 시작된다. 그룹 코디네이터는 리더 컨슈머(예: Consumer #1)에게 파티션 재할당 계획을 요청한다. 리더는 계획을 세워 코디네이터에게 제출하고, 코디네이터가 이 계획에 따라 각 컨슈머에게 담당할 파티션을 전달해 준다.

 

그러다 만약 Consumer #2가 종료되면, Heartbeat가 오지 않기 때문에 Group Coordinator가 바로 인지하여 Rebalancing 수행을 지시한다.

 

Group에는 각 상태(Consumer Group Status)가 존재한다. 아래 사진을 통해 empty, rebalance, stable 상태를 확인해보자.


 

9. Consumer Static Group Membership

 컨슈머 그룹의 리밸런싱은 안정적인 데이터 처리를 위한 필수 기능이지만, 때로는 비효율의 주범이 되기도 한다. 컨슈머가 30~40개씩 있는 대규모 그룹을 생각해보자. 여기서 단 하나의 컨슈머를 유지보수 차원에서 잠깐 재시작했을 뿐인데, 그룹의 모든 컨슈머가 하던 일을 멈추고 대규모 리밸런싱에 참여해야 한다. 이런 불필요한 "전체 멈춤"은 데이터 처리 지연(Lag)을 유발하는 골칫거리다. 이 문제를 해결하기 위해 등장한 것이 바로 정적 그룹 멤버십(Static Group Membership)이다.

9.1. Static Membership은 왜 필요한가?

기존의 동적 멤버십(Dynamic Membership)에서는 컨슈머가 그룹을 떠났다가 다시 합류하면, 그룹 코디네이터는 이를 완전히 새로운 멤버로 인식한다. 따라서 멤버가 떠날 때 한 번, 다시 합류할 때 또 한 번, 총 두 번의 리밸런싱이 발생한다.

 

하지만 Static Membership을 사용하면 각 컨슈머에게 고유한 ID 카드(group.instance.id)를 발급해 줄 수 있다. 덕분에 컨슈머가 잠시 그룹을 떠났다가 돌아와도, 코디네이터는 "아, 아까 그 친구구나" 하고 알아보고 기존에 하던 일을 그대로 다시 맡긴다. 즉, 불필요한 리밸런싱을 건너뛸 수 있게 된다.

9.2. 동작 방식: 기다림의 미학

tatic Membership의 핵심은 "기다림"이다.

  • 동적 그룹: 컨슈머가 떠나면 즉시 리밸런싱이 시작된다.
  • 정적 그룹: 컨슈머가 떠나면, 그룹 코디네이터는 리밸런싱을 바로 시작하지 않고 session.timeout.ms 동안 기다린다.

 

위 그림의 시나리오를 통해 동작 흐름을 살펴보자.

  1. group.instance.id=3을 가진 Consumer #3이 정상적으로 또는 비정상적으로 종료되었다.
  2. 그룹 코디네이터는 즉시 리밸런싱을 실행하지 않는다. 그 결과, Consumer #3이 담당하던 Partition #3은 일시적으로 아무에게도 할당되지 않은 상태가 된다. Consumer #1과 #2는 자신의 파티션을 계속 처리할 뿐, Partition #3의 일을 넘겨받지 않는다.
  3. 여기서 두 가지 시나리오로 나뉜다.
    • (성공) Consumer #3이 session.timeout.ms(보통 45초 이상으로 설정) 안에 재시작한다. 코디네이터는 ID를 확인하고 Partition #3을 다시 할당해준다. 리밸런싱은 일어나지 않았고, 모든 것이 평화롭게 해결된다.
    • (실패) Consumer #3이 session.timeout.ms가 지나도 돌아오지 않는다. 코디네이터는 그제서야 "이 친구는 정말 떠났구나"라고 판단하고, 남아있는 Consumer #1, #2를 대상으로 리밸런싱을 실행하여 Partition #3을 재할당한다.

9.3. 주요 설정

Static Membership을 활성화하려면 컨슈머 설정에 group.instance.id를 추가하기만 하면 된다. 이 ID는 그룹 내에서 유일해야 한다.

Properties props = new Properties();
// ... other properties
props.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "my-static-consumer-01");

 

이 기능을 효과적으로 사용하려면 session.timeout.ms 값을 기본값(45초)보다 넉넉하게 설정하는 것이 좋다. 애플리케이션이 재시작되거나, 쿠버네티스(Kubernetes) 환경에서 포드(Pod)가 다시 스케줄링되는 데 걸리는 시간을 충분히 고려하여, 불필요한 리밸런싱이 발생하지 않도록 시간을 벌어주는 것이 핵심이다.

자세한 내용은 [12] Kafka: Consumer 4. Static Group Membership 실습을 확인하자.

 

10. Heartbeat Thread: Consumer의 생명 신호

컨슈머 애플리케이션을 Ctrl+C로 종료하면 wakeup() 메서드가 호출되어 그룹 코디네이터가 즉시 알아차리고 리밸런싱을 시작한다. 하지만 세상일이 항상 계획대로 되지는 않는다. 애플리케이션 프로세스는 살아있는데, 내부 로직의 문제로 컨슈머가 아무 일도 하지 않고 멈춰버리는, 이른바 '좀비 컨슈머' 상태가 될 수 있다.

 

이런 좀비 컨슈머는 메시지를 소비하지 못하고 계속 쌓이게 만들어 전체 시스템에 장애를 유발한다. 그룹 코디네이터는 어떻게 이런 '살아있는 시체'를 감지하고 그룹에서 퇴출시킬까? 그 해답이 바로 하트비트 스레드(Heartbeat Thread)에 있다.

 

10.1. 하트비트란 무엇인가?

하트비트 스레드는 컨슈머의 메인 스레드와는 별개로 동작하는 백그라운드 스레드다. 이 스레드의 유일한 임무는 그룹 코디네이터에게 주기적으로 "나 아직 건강하게 살아있어!" 라는 생명 신호를 보내는 것이다.

 

만약 코디네이터가 이 신호를 정해진 시간 동안 받지 못하면, "이 컨슈머에게 문제가 생겼구나"라고 판단하고 해당 컨슈머를 그룹에서 강제로 제외시킨 뒤 리밸런싱을 실행하여 다른 컨슈머에게 일을 재분배한다.

10.2. 동작 플로우

아래 사진처럼 하트비트 스레드는 kafka한테 주기적으로 계속 하트비트를 보낸다.

 

10.3. Heart beat와 poll( ) 관련 주요 Consumer 파라미터

그룹 코디네이터는 컨슈머의 건강 상태를 확인하기 위해 두 가지 주요한 시간제한 장치를 사용한다. 이 둘의 차이를 이해하는 것이 장애 없는 컨슈머를 만드는 핵심이다

Consumer 파라미터명  기본값(ms) 기본값(ms) 설명
heartbeat.interval.ms 3000 Heart Beat Thread가 Heart Beat을 보내는 간격.
session.timeout.ms보다 낮게 설정되어야 함.
보통 session.timeout.ms의 1/3 이하로 설정 권장.
session.timeout.ms 45000 브로커가 Consumer로부터 Heart Beat을 기다리는 최대 시간.
이 시간 동안 Heart Beat을 받지 못하면 해당 Consumer를 Group에서 제외하고 rebalance 명령을 실행.
max.poll.interval.ms 300000 이전 poll() 호출 후 다음 poll() 호출까지 브로커가 기다리는 시간.
이 시간 동안 poll() 호출이 없으면 해당 Consumer에 문제가 있다고 판단하여 rebalance 명령을 실행.
[ 시나리오로 이해하는 세 가지 타임아웃 ]

이 세 가지 파라미터는 서로 다른 역할을 하는 두 명의 감시자와 같다.

  • 감시자 A (하트비트 & 세션 타임아웃): "너 살아는 있니?"
    • 역할: 컨슈머 애플리케이션 자체가 살아있는지, 네트워크 연결은 정상인지 감시한다.
    • 스토리: 컨슈머는 3초(heartbeat.interval.ms)마다 코디네이터에게 "살아있음!" 하고 문자를 보낸다. 코디네이터는 45초(session.timeout.ms) 동안 이 문자가 오지 않으면, "아, 이 친구는 전원이 꺼졌거나 통신이 끊겼구나"라고 판단하고 그룹에서 퇴출시킨다. 이 감시자는 컨슈머가 무슨 일을 하는지에는 관심이 없다. 오직 생존 신호에만 집중한다.

  • 감시자 B (폴 간격 타임아웃): "너 일은 하고 있니?"
    • 역할: 컨슈머가 메시지를 가져와서 처리하는 '실제 작업'을 제대로 수행하고 있는지 감시한다.
    • 스토리: 컨슈머는 poll()을 호출할 때마다 코디네이터에게 "이제 새 일감(메시지) 받으러 왔습니다!"라고 보고하는 것과 같다. 코디네이터는 이 보고가 5분(max.poll.interval.ms) 동안 없으면, "이 친구, 살아는 있는데 무한 루프에 빠졌거나 DB 작업이 길어져서 멈춰있구나. 일할 능력이 없으니 퇴출시켜야겠다"고 판단하고 리밸런싱을 시작한다. 이 감시자는 생존 여부와 상관없이 '업무 처리 능력'을 감시한다.

10.4. Consumer에서 읽은 데이터 처리 시 유의 사항

컨슈머 개발 시 가장 주의해야 할 점은 poll()로 가져온 데이터를 처리하는 로직이 max.poll.interval.ms를 넘기지 않도록 설계하는 것이다. 즉, poll() 루프 안의 무거운 작업을 최소화 해야 한다.

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        // 이 작업이 5분(기본값)을 초과하면 리밸런싱이 발생한다!
        heavyDatabaseJob(record.value());
        externalApiCall(record.value());
    }
}

 

만약 poll()로 가져온 데이터를 처리하는 데 시간이 오래 걸린다면, 아무리 하트비트를 잘 보내고 있어도 코디네이터는 "이 컨슈머는 일을 안 한다"고 판단하여 그룹에서 제외시켜 버린다. 이 경우, 처리 중이던 메시지는 다른 컨슈머에게 다시 할당되어 중복 처리될 위험이 있다.

 

따라서 poll() 루프 안에서는 데이터를 빠르게 내부 큐에 넣고, 실제 무거운 작업은 별도의 스레드 풀에서 비동기적으로 처리하는 아키텍처를 고려해야 한다.

 


11. Rebalancing 전략: Eager vs Cooperative

컨슈머 그룹의 안정성을 보장하는 리밸런싱. 하지만 이 과정은 때로 시스템 전체의 성능을 저하시키는 '전체 멈춤'을 유발하기도 한다. 카프카는 이러한 리밸런싱을 더 효율적으로 처리하기 위해 두 가지 다른 방식의 프로토콜을 제공한다. 바로 Eager와 Cooperative다.

11.1. Rebalancing , 다시 한번 짚고 넘어가기

리밸런싱이 정확히 언제 발생하는지 다시 한번 정리해보자. 다음 네 가지 상황 중 하나라도 발생하면 그룹 코디네이터는 리밸런싱을 지시한다.

  • 그룹 구성원 변경: 새로운 컨슈머가 추가되거나, 기존 컨슈머가 close()를 호출하며 정상적으로 종료될 때.
  • 토픽 파티션 변경: 구독 중인 토픽의 파티션 개수가 늘어날 때.
  • 세션 타임아웃: 컨슈머의 장애나 네트워크 문제로 session.timeout.ms 이내에 하트비트가 도착하지 않을 때.
  • 폴링 타임아웃: poll()로 가져온 데이터 처리 시간이 너무 길어져 max.poll.interval.ms 이내에 다음 poll()이 호출되지 않을 때.

11.2. Consumer Rebalancing Protocol

리밸런싱이 시작되면, 컨슈머들은 어떤 방식으로 파티션을 재할당받게 될까? 카프카는 두 가지 프로토콜을 제공하며, partition.assignment.strategy 설정을 통해 선택할 수 있다.

11.2.1. Eager 모드: "다 비켜! 처음부터 다시 시작한다!"

 

Eager(열성적인, 조급한)라는 이름처럼, 이 방식은 매우 급진적으로 동작한다.

  • 동작 방식: 리밸런싱이 발생하면, 그룹 내 모든 컨슈머는 자신이 담당하던 모든 파티션의 소유권을 즉시 포기하고 일시적으로 작업을 중단한다. 마치 "전원 작업 중지!" 명령이 내려진 것과 같다. 이후 그룹 리더가 모든 파티션을 처음부터 다시 할당하고, 컨슈머들은 새로운 임무를 받아 작업을 재개한다.
  • 단점: 이 "전체 멈춤(Stop-the-world)" 시간 동안에는 어떤 컨슈머도 메시지를 처리할 수 없으므로, 데이터 처리 지연(Lag)이 크게 발생할 수 있다. 특히 컨슈머가 많은 대규모 그룹에서는 이 중단 시간이 길어져 시스템에 부담을 줄 수 있다.
  • 해당 전략: Range, RoundRobin, Sticky 파티션 할당 전략이 이 모드로 동작한다.

11.2.1. (Incremental) Cooperative 모드: "각자 할 일 하면서, 바뀐 것만 처리하자!"

 

Cooperative(협력적인) 모드는 Eager의 단점을 보완하기 위해 도입된 더 세련된 방식이다.

  • 동작 방식: 리밸런싱이 발생해도, 모든 컨슈머가 작업을 중단하지 않는다. 대신, 변경이 필요한 파티션만 기존 담당자로부터 회수하고, 새로운 담당자에게 점진적으로(Incremental) 재할당한다. 다른 컨슈머들은 리밸런싱의 영향을 받지 않고 자신의 파티션을 계속 처리한다.
  • 장점: 전체 멈춤이 없으므로 리밸런싱으로 인한 서비스 중단 시간을 최소화할 수 있다. 대규모 컨슈머 그룹에서 일부 컨슈머의 재시작이나 추가가 빈번하게 발생할 때 매우 효과적이다.
  • 해당 전략: CooperativeSticky 파티션 할당 전략이 이 모드로 동작한다.

12. Consumer Partition 할당 전략

컨슈머 그룹의 진정한 힘은 '분산 처리'에 있다. 여러 컨슈머가 토픽의 파티션들을 나눠 가짐으로써 막대한 양의 데이터를 병렬로 처리할 수 있다. 그렇다면, 어떤 컨슈머가 어떤 파티션을 가져갈지는 어떻게 결정될까? 이 중요한 임무를 수행하는 것이 바로 파티션 할당 전략(Partition Assignment Strategy)이다. `partition.assignment.strategy` 설정을 통해 선택할 수 있는 네 가지 주요 전략을 알아보자.

12.1. Range 할당 (Default)

 

Range 전략은 각 토픽을 기준으로 파티션을 순서대로 나열한 뒤, 컨슈머들에게 균등하게 범위를 할당한다.

  • 동작 방식: 예를 들어, 파티션이 6개인 TopicA와 4개인 TopicB를 2개의 컨슈머가 구독한다고 가정해보자.
    • TopicA: Partition 0-2는 Consumer #1에게, Partition 3-5는 Consumer #2에게 할당된다.
    • TopicB: Partition 0-1은 Consumer #1에게, Partition 2-3은 Consumer #2에게 할당된다.
  • 특징: 이 방식은 결과적으로 여러 토픽에서 같은 ID를 가진 파티션들이 같은 컨슈머에게 할당될 가능성이 높아진다.
  • 장점: 만약 TopicA와 TopicB가 동일한 키(예: user_id)를 기준으로 파티셔닝(Co-partitioning)되어 있다면, 특정 user_id에 대한 데이터는 항상 같은 컨슈머에게 모이게 된다. 이는 두 토픽의 데이터를 조인(Join)하거나 연관 분석을 할 때, 불필요한 네트워크 통신 없이 메모리 내에서 효율적으로 처리할 수 있게 해주는 강력한 장점이다.
partition.assignment.strategy 설정 없이 Consumer 코드를 실행시키면 아래와 같이 Deafult로 Range가 할당된다.

12.2. Round Robin 할당

 

Round Robin 전략은 모든 토픽과 파티션을 하나의 긴 줄로 세운 뒤, 컨슈머들에게 카드 돌리듯 순서대로 하나씩 할당한다.

  • 동작 방식: TopicA(P0, P1, P2)와 TopicB(P0, P1)를 2개의 컨슈머가 구독한다면, 할당은 다음과 같이 이루어진다.
    • TopicA-P0 -> Consumer #1
    • TopicA-P1 -> Consumer #2
    • TopicA-P2 -> Consumer #1
    • TopicB-P0 -> Consumer #2
    • TopicB-P1 -> Consumer #1
  • 특징: 토픽에 상관없이 파티션을 순차적으로 분배하므로, 컨슈머들이 담당하는 파티션의 개수가 거의 균등하게 맞춰진다.
  • 단점: 리밸런싱이 발생하면 기존의 할당 내역이 완전히 무시되고 처음부터 다시 카드를 돌리기 때문에, 컨슈머와 파티션의 매핑 관계가 크게 변경될 수 있다.

12.3.Sticky 할당

- 최초에 할당된 파티션과 Consumer 매핑을 Rebalance 수행되어도 가급적 그대로 유지할 수 있도록 지원하는 전략
- 하지만 Eager Protocol 기반이므로 Rebalance 시 모든 Consumer의 파티션 매핑이 해제된 후에 다시 매핑되는 형태

 

 

Sticky(끈끈한) 전략은 리밸런싱이 발생하더라도 기존의 할당을 최대한 유지하려는 똑똑한 방식이다.

  • 동작 방식: 리밸런싱 시, 꼭 재할당이 필요한 최소한의 파티션만 이동시키고, 나머지 할당은 그대로 유지한다. 이를 통해 파티션의 대규모 이동을 막아 리밸런싱의 부하를 줄인다.
  • 한계: 이 전략은 Eager 프로토콜 기반에서 동작한다. 즉, 리밸런싱이 시작되면 일단 모든 컨슈머가 작업을 멈추고 파티션 소유권을 전부 반납했다가, Sticky 전략에 따라 다시 재할당받는 "전체 멈춤" 과정은 피할 수 없다.

12.4. Cooperative(협력적) Sticky 할당 (Sticky 할당의 개선 버전)

 

Cooperative Sticky는 Sticky 전략의 장점은 그대로 가져오면서, Eager 프로토콜의 단점을 극복한 가장 진보된 방식이다.

  • 동작 방식: Sticky 전략처럼 기존 할당을 최대한 유지하려 노력한다. 결정적인 차이점은 Cooperative 프로토콜 위에서 동작한다는 것이다.
  • 장점: 리밸런싱이 발생해도 "전체 멈춤"이 없다. 재할당이 필요한 파티션과 관련된 컨슈머들만 점진적으로 리밸런싱에 참여하고, 나머지 컨슈머들은 아무런 영향 없이 계속해서 메시지를 처리한다. 이는 서비스 중단을 최소화하는 가장 이상적인 리밸런싱 방식이다.


13. Kafka Consumer 고급 기술: 특정 파티션 할당과 오프셋 제어 (assign & seek)

대부분의 경우, 우리는 컨슈머 그룹의 리밸런싱 기능에 모든 것을 맡겨두는 것이 좋다. 카프카가 알아서 파티션을 분배하고, 장애가 나면 재조정까지 해주니 편리하다. 하지만 때로는 이런 자동화된 방식에서 벗어나 특정 파티션의 데이터를 수동으로 제어해야 하는 특별한 상황이 생긴다. 이럴 때 사용하는 두 가지 강력한 무기가 바로 assign()과 seek()다.

13.1. assign() - 내 파티션은 내가 정한다

 

subscribe()가 "이 토픽을 구독할테니, 그룹 내에서 적절한 파티션을 자동으로 할당해줘"라는 의미라면, assign()은 "리밸런싱은 신경쓰지 말고, 내가 지정하는 이 파티션만 읽을게" 라고 직접 선언하는 방식이다.

즉, assign()을 사용하는 순간 해당 컨슈머는 컨슈머 그룹의 자동 리밸런싱 멤버에서 제외된다.

  • 언제 사용할까?: 주로 특정 키를 가진 데이터가 모여있는 파티션을 항상 동일한 컨슈머가 처리해야 하는 상태 기반(Stateful) 처리나 배치(Batch) 작업에 유용하다. 일반적인 마이크로서비스 환경에서는 잘 사용되지 않는다.
  • 사용법: TopicPartition 객체로 원하는 토픽과 파티션 번호를 지정한 뒤, assign() 메서드의 인자로 넘겨준다.
String topicName = "my-topic";
// my-topic의 0번 파티션을 지정
TopicPartition topicPartition = new TopicPartition(topicName, 0);

// 0번 파티션만 구독하도록 수동 할당
kafkaConsumer.assign(List.of(topicPartition));

13.2. seek()  - 시간 여행을 떠나는 법

 

seek()는 컨슈머의 읽기 위치, 즉 오프셋(Offset)을 강제로 이동시키는 강력한 기능이다. 이를 통해 이미 처리한 데이터를 다시 읽거나, 특정 지점까지 건너뛸 수 있다.

  • 언제 사용할까?: 주로 유지보수 목적으로 사용된다. 예를 들어, 데이터 처리 로직에 버그가 발견되어 특정 구간의 메시지를 재처리해야 할 때, seek()를 사용해 오프셋을 과거로 되돌릴 수 있다.
  • 사용법: 먼저 assign()으로 파티션을 수동 할당한 뒤, seek() 메서드로 이동하고 싶은 오프셋 번호를 지정한다.
String topicName = "my-topic";
TopicPartition topicPartition = new TopicPartition(topicName, 1);

// 1번 파티션을 수동 할당
kafkaConsumer.assign(List.of(topicPartition));

// 1번 파티션의 6번 오프셋으로 읽기 위치 이동
kafkaConsumer.seek(topicPartition, 6L);
⚠️ 주의! seek와 커밋의 위험한 만남
seek()는 매우 유용하지만, 운영 중인 컨슈머 그룹과 함께 사용할 때는 치명적인 실수를 유발할 수 있다.

만약 재처리 작업을 위해 임시 컨슈머를 띄우면서, 운영 중인 컨슈머와 동일한 group.id를 사용했다고 가정해보자. 이 임시 컨슈머가 seek()로 오프셋을 과거로 돌려 데이터를 처리하고 커밋을 하면, __consumer_offsets에 기록된 그룹의 공식적인 오프셋 정보가 과거 시점으로 덮어씌워진다.

그 결과, 아무것도 모르고 열심히 일하던 운영 컨슈머들이 갑자기 리밸런싱 후 과거로 돌아가 이미 처리했던 수많은 데이터를 중복으로 처리하는 대참사가 발생할 수 있다. 따라서 seek를 사용할 때는 반드시 다음 원칙을 지켜야 한다.

재처리용 컨슈머는 반드시 새로운 임시 group.id를 사용한다.또는, enable.auto.commit=false로 설정하고 절대 커밋을 하지 않도록 하여 기존 그룹의 오프셋에 영향을 주지 않도록 한다.

 


14. Offset Commit과 신뢰성의 위협: 중복과 유실 시나리오

위에서 offset에 대한 기본적인 내용에 대해 다루어봤다. 이번에는 Offset Commit 방식에 따른 신뢰성 문제에 대해 알아보자. offset commit 시점을 잘못 제어하면 데이터 정합성에 심각한 문제가 생길 수 있다.

14.1. 중복(duplicate) 읽기 상황

 

가장 흔하게 발생하는 시나리오로, "처리했지만, 커밋하지 못한" 상황이다.

  1. Poll: poll()을 통해 오프셋 5~8번 메시지를 가져온다.
  2. 처리: 가져온 메시지를 DB에 성공적으로 저장한다.
  3. 장애: DB 저장은 끝났지만, 오프셋 8을 커밋하기 직전에 컨슈머가 비정상 종료된다.
  4. 리밸런싱: 다른 컨슈머가 이 파티션을 할당받는다.
  5. 재처리: 새로운 컨슈머는 마지막 커밋 위치인 5번부터 다시 메시지를 읽기 시작한다. 결국 5~8번 메시지가 DB에 중복으로 저장될 수 있다.

이는 "최소 한 번 이상 처리"를 보장하는 카프카의 기본 특성이며, 중복 처리가 비즈니스에 치명적이라면 애플리케이션 단에서 멱등성(Idempotence)을 확보하는 로직이 반드시 필요하다.

💡 이러한 중복은 카프카 시스템 자체의 문제라기보다는, 메시지 처리는 완료했으나 커밋에 실패한 채 컨슈머 애플리케이션이 비정상적으로 종료되는 경우에 주로 발생한다. 이는 분산 시스템에서 데이터 유실을 막기 위한 자연스러운 현상이며, "최소 한 번 이상 처리(At-least-once)"를 보장하는 카프카의 기본 특성이다.

 

14.2. 데이터 유실 (At-Most-Once)

 

훨씬 더 위험한 시나리오로, "처리하기 전에, 먼저 커밋한" 상황이다.

  1. Poll: poll()을 통해 오프셋 7, 8번 메시지를 가져온다.
  2. 선 커밋: 로직상 실수로, 메시지를 처리하기도 전에 다음 오프셋인 9를 먼저 커밋해버린다.
  3. 장애: 오프셋 7번 메시지를 처리하던 중 컨슈머가 비정상 종료된다.
  4. 리밸런싱: 다른 컨슈머가 이 파티션을 할당받는다.
  5. 데이터 유실: 새로운 컨슈머는 마지막 커밋 위치인 9번부터 메시지를 읽기 시작한다. 결국 아무도 처리하지 못한 7, 8번 메시지는 영원히 유실된다.

이런 실수는 피해야 하며, 메시지 처리가 완벽히 끝난 후에 커밋하는 것이 철칙이다.

 


15. 오프셋 커밋 전략 선택하기

카프카는 자동과 수동, 두 가지 커밋 방식을 제공한다.

  • Auto Offset: 사용자가 명시적으로 코드로 commi을 기술하지 않아도 Consumer가 자 동으로 지정된 기간마다 commi을 수행
  • Manual Offset: 사용자가 명시적으로 commit을 기술. Sync/Async 방식이 있음
Kafka의 기본 Commit 전략은 Auto Commit(자동 커밋)이다. 하지만 실무에서는 수동 커밋 중 commit Sync가 80% 이상, commit Async가 15% 이상을 차지한다. 

15.1. 자동 커밋 (Auto Commit)

 

enable.auto.commit=true로 설정하면, poll()이 호출될 때마다 auto.commit.interval.ms(기본 5초) 간격으로 자동으로 커밋이 수행된다.

  • 장점: 코드가 매우 단순해진다.
  • 위험성: 커밋 시점을 내가 제어할 수 없으므로 데이터 중복의 가능성이 항상 존재한다. 예를 들어, 첫 번째 poll()로 가져온 데이터를 4초에 걸쳐 처리하고, 두 번째 poll()을 호출하는 순간 5초가 지나 자동 커밋이 발생할 수 있다. 만약 두 번째 poll()로 가져온 데이터를 처리하다 장애가 발생하면, 이미 커밋된 첫 번째 데이터까지 다시 처리하게 된다.

15.2. 수동 커밋 (Manual Commit)

enable.auto.commit=false로 설정하고, 코드에서 명시적으로 commitSync() 또는 commitAsync()를 호출한다.

15.2.1. 동기 커밋 (commitSync)

가장 안전한 방식. commitSync()는 브로커로부터 커밋이 성공했다는 응답을 받을 때까지 코드를 멈춘다(Blocking). 커밋이 확실히 보장되지만, 응답을 기다리는 시간만큼 처리량이 감소한다. 실패 시 자동으로 재시도한다.

15.2.1. 비동기 커밋 (commitAsync): 

가장 빠른 방식. commitAsync()는 커밋 요청만 보내고 응답을 기다리지 않고(Non-blocking) 바로 다음 작업을 수행한다. 처리량은 높지만, 커밋이 실패해도 재시도를 하지 않으므로 데이터 중복의 가능성이 있다. 보통 마지막 종료 직전에는 commitSync를 호출하여 확실한 마무리를 하는 것이 좋다.

[ Consumer의 Manual 동기/비동기 Commit 구현 방법 ]
• enable.auto.commit = false로 Consumer Property 설정
• 동기 commit은 KafkaConsumer의 commitSync( )메소드를 호출
• 비동기 commit은 KafkaConsumer의 commitAsync( )메소드를 호출
구분 자동 커밋 (Auto)  수동-동기 (Manual Sync)  수동-비동기 (Manual Async)
데이터 정확성 ⚠️ 나쁨 (중복 발생 가능성 높음) ✅ 매우 좋음 (가장 안전함) ⚠️ 보통 (가끔 중복 발생 가능)
처리 속도 ✅ 매우 빠름 ❌ 느림 ✅ 매우 빠름
실무 사용처 🚫 거의 사용하지 않음(단순 모니터링, 로그 수집 등데이터 소실/중복이 큰 문제 안 되는 경우) ✅ 많이 사용함(금융 거래, 주문 처리 등데이터 1건의 오차도 허용하지 않는 경우) ✅ 가장 많이 사용함(대부분의 비즈니스 로직,빠른 속도가 필요하고약간의 중복은 처리 가능한 경우)

16. ✅ Broker <-> Consumer 동작 흐름 

카프카 컨슈머가 브로커로부터 메시지를 가져와 처리하는 내부 동작 과정의 핵심은 애플리케이션의 메인 스레드(Main Thread)와 컨슈머 내부에서 독립적으로 동작하는 백그라운드 스레드(Heartbeat, Fetch 등)의 명확한 역할 분담에 있다.

16.1. 준비 과정: 그룹 가입과 파티션 할당

컨슈머 애플리케이션이 시작되어 메시지를 가져오기 전, 가장 먼저 '어떤 파티션의 메시지를 가져올 것인가'를 할당받는 과정이 일어난다.

  1. subscribe() 호출 (by Main Thread) 개발자가 작성한 메인 스레드의 코드에서 구독할 토픽을 지정하여 kafkaConsumer.subscribe()를 호출한다.
  2. 그룹 코디네이터 탐색 (Discover Group Coordinator) 컨슈머는 클러스터 내의 브로커 중 하나에게 자신이 속한 컨슈머 그룹을 관리하는 '그룹 코디네이터' 브로커가 누구인지 물어보고 그 주소를 알아낸다.
  3. 그룹 가입 요청 및 리밸런싱 (Join Group & Rebalancing) 컨슈머는 그룹 코디네이터에게 JoinGroup 요청을 보내 그룹 참여 의사를 밝힌다. 그룹에 새로운 컨슈머가 들어오거나 기존 컨슈머가 나가는 등 변경이 생기면, 코디네이터는 리밸런싱(Rebalancing)을 발동시켜 각 컨슈머에게 어떤 파티션을 담당할지 재분배한다. 이 과정이 끝나면 각 컨슈머는 자신이 소비해야 할 파티션 목록을 할당받게 된다.

16.2. 메인 스레드의 핵심 역할: poll() 루프

파티션 할당이 완료되면, 메인 스레드는 보통 무한 루프 안에서 poll() 메소드를 반복적으로 호출하며 실제 메시지 처리를 담당한다.

  1. poll() 호출 (by Main Thread) 메인 스레드는 kafkaConsumer.poll()을 호출하여 브로커로부터 메시지를 가져오려고 시도한다.
  2. 데이터 인출 및 역직렬화 (Fetch & Deserialization) poll()이 호출되면, 컨슈머는 (정확히는 백그라운드의 Fetch 스레드가 미리 가져다 놓은) 내부 버퍼에서 메시지를 가져온다. 이때 브로커로부터 받은 바이트 배열(byte array) 형태의 메시지를 설정된 역직렬화기(Deserializer)를 사용하여 원래의 데이터 객체(예: String, JSON)로 변환한다.
  3. 메시지 처리 (Process Records) poll() 메소드는 역직렬화된 메시지들, 즉 ConsumerRecord의 묶음을 반환한다. 이제 개발자가 작성한 비즈니스 로직에 따라 이 레코드들을 하나씩 처리하는 과정이 수행된다.
  4. 오프셋 커밋 (Offset Commit) 메시지 처리가 완료되면, "여기까지 메시지를 성공적으로 처리했다"는 것을 브로커에 기록해야 다음번에 중복 처리하는 것을 막을 수 있다. 이 기록을 오프셋 커밋이라고 하며, 처리 방식에 따라 자동 또는 수동으로 이루어진다.

16.3. 백그라운드 작업자: 하트비트와 페치 스레드

메인 스레드가 poll()을 통해 메시지를 처리하는 동안, 컨슈머 내부의 백그라운드 스레드들은 다음과 같은 중요한 임무를 수행한다.

  1. 하트비트 스레드 (Heartbeat Thread) 이 스레드는 그룹 코디네이터에게 주기적으로 "나 아직 살아있다"는 신호, 즉 하트비트(Heartbeat)를 보낸다. 만약 session.timeout.ms 설정 시간 동안 하트비트가 도착하지 않으면, 코디네이터는 해당 컨슈머에 문제가 생겼다고 판단하고 그룹에서 제외시킨 후 리밸런싱을 트리거한다.
  2. 페치 스레드 (Fetch Thread) 메인 스레드가 poll()을 호출할 때마다 매번 네트워크를 통해 브로커에 데이터를 요청하면 비효율적이다. 페치 스레드는 메인 스레드가 메시지를 처리하는 동안 미리 브로커에 접속하여 할당된 파티션으로부터 메시지를 가져와 내부 버퍼에 채워 넣는 역할을 한다. 덕분에 poll()은 네트워크 지연 없이 버퍼에서 빠르게 데이터를 가져올 수 있다.

16.4. 오프셋 커밋 방식

오프셋을 커밋하는 방식은 컨슈머의 메시지 처리 보장 수준을 결정하는 매우 중요한 요소이다.

16.4.1. 자동 커밋 (Auto Commit): enable.auto.commit=true

Auto Commit

 

이것은 "신경 쓰지 않아도 알아서 커밋해주는 방식"이다.

  1. poll() 호출 후 자동 커밋 메인 스레드가 poll()을 호출하면, 카프카 컨슈머 라이브러리는 이전 poll()에서 반환했던 메시지 중 가장 높은 오프셋을 기준으로 커밋을 준비한다.
  2. 주기적인 커밋 실행 auto.commit.interval.ms 설정값에 따라 정해진 시간 간격으로 백그라운드에서 자동으로 오프셋 커밋을 브로커에 전송한다. 이 과정은 메인 스레드의 메시지 처리 로직과 완전히 비동기적으로 동작하는 것이다.
  • 특징: 사용하기 매우 편리하지만, 데이터 유실이나 중복의 위험이 있다. 예를 들어, poll()로 메시지를 가져와 처리하는 도중에 자동 커밋이 먼저 실행되고, 그 직후 애플리케이션이 비정상 종료되면, 해당 메시지는 커밋된 것으로 간주되어 다음 실행 시 유실될 수 있다. (최소 한 번 이상 처리 보장, At-least-once)

16.4.2. 수동 커밋 (Manual Commit): enable.auto.commit=false

이것은 "개발자가 원하는 정확한 시점에 커밋을 명령하는 방식"이다.

 

1) 동기 커밋 (Sync Commit): commitSync()

Manual Commit (SYNC)

  1. commitSync() 호출 (by Main Thread) 메인 스레드에서 메시지 처리를 완전히 마친 후, kafkaConsumer.commitSync()를 명시적으로 호출한다.
  2. 메인 스레드 멈춤 (Blocking) commitSync()가 호출되면 메인 스레드는 브로커로부터 "오프셋 커밋을 성공적으로 받았다"는 응답이 올 때까지 모든 작업을 멈추고 대기한다. 만약 커밋이 일시적인 네트워크 문제 등으로 실패하면, 성공할 때까지 자동으로 재시도한다.
  3. 메인 스레드 작업 재개 브로커로부터 성공 응답을 받아야 비로소 메인 스레드는 다음 코드를 실행한다.
  • 특징: 오프셋이 확실하게 커밋된 것을 보장하므로 데이터 처리 신뢰도가 가장 높다. 하지만 커밋이 완료될 때까지 메인 스레드가 멈추기 때문에 전체적인 처리량은 감소한다.

2) 비동기 커밋 (Async Commit): commitAsync()

  1. commitAsync() 호출 (by Main Thread) 메인 스레드에서 kafkaConsumer.commitAsync()를 호출한다. 콜백 함수를 함께 전달하여 커밋 성공/실패 시의 후속 조치를 정의할 수 있다.
  2. 메인 스레드 즉시 다음 작업 수행 (Non-Blocking) commitAsync()는 브로커에게 커밋 요청을 보낸 뒤, 응답을 기다리지 않고 메인 스레드가 즉시 다음 작업을 수행하도록 제어권을 넘긴다.
  3. 결과 처리 (by I/O Thread) 추후 브로커로부터 커밋 응답이 오면, 백그라운드의 I/O 스레드가 이를 받아 지정된 콜백 함수를 실행하여 결과를 처리한다.
  • 특징: 메인 스레드가 멈추지 않아 동기 커밋 방식보다 높은 처리량을 가진다. 다만, commitSync()와 달리 실패 시 자동 재시도를 하지 않으며, 비동기 요청의 순서가 꼬일 경우(예: offset 100 커밋 요청 후 재시도 중에 offset 110 커밋이 먼저 성공하는 경우) 문제가 발생할 수 있다.

'Kafka > Core' 카테고리의 다른 글

[ADVANCED #5][실습] Producer & Consumer 연동 프로젝트: 파일 기반 주문 데이터를 DB로 저장하기  (0) 2025.09.11
[ADVANCED #4][실습] Consumer 완전 정복: 기본 컨슈머부터 안전한 종료, 커밋 전략까지  (0) 2025.09.09
[ADVANCED #2][실습] Producer 완전 정복: 기초부터 고급 설정  (0) 2025.09.08
[ADVANCED #1] Producer 심층 분석: 내부 동작 원리와 고급 설정  (0) 2025.09.07
[BASIC #8] Kafka: Java 클라이언트 구현 환경 구축  (0) 2025.09.07
'Kafka/Core' 카테고리의 다른 글
  • [ADVANCED #5][실습] Producer & Consumer 연동 프로젝트: 파일 기반 주문 데이터를 DB로 저장하기
  • [ADVANCED #4][실습] Consumer 완전 정복: 기본 컨슈머부터 안전한 종료, 커밋 전략까지
  • [ADVANCED #2][실습] Producer 완전 정복: 기초부터 고급 설정
  • [ADVANCED #1] Producer 심층 분석: 내부 동작 원리와 고급 설정
h6bro
h6bro
백엔드 개발자의 기술 블로그
  • h6bro
    Jun's Tech Blog
    h6bro
  • 전체
    오늘
    어제
    • 분류 전체보기 (250) N
      • Java (18)
        • Core (9)
        • Design Pattern (9)
      • Spring (80)
        • Core (24)
        • MVC (6)
        • DB (10)
        • JPA (26)
        • Monitoring (3)
        • Security (11)
        • WebSocket (0)
      • Database (33)
        • Redis (15)
        • MySQL (18)
      • MSA (25) N
        • MSA 기본 (11)
        • MSA 아키텍처 (14) N
      • Kafka (30)
        • Core (18)
        • Connect (12)
      • ElasticSearch (11)
        • Search (11)
        • Logging (0)
      • Test (4)
        • k6 (4)
      • Docker (9)
      • CI&CD (10)
        • GitHub Actions (6)
        • ArgoCD (4)
      • Kubernetes (18)
        • Core (12)
        • Ops (6)
      • Cloud Engineering (4)
        • AWS Infrastructure (3)
        • AWS EKS (1)
        • Terraform (0)
      • Project (8)
        • LinkFolio (1)
        • Secondhand Market (7)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

    • Cloud Engineering 포스팅 정리
  • 인기 글

  • 태그

    ㅈ
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.5
h6bro
[ADVANCED #3] Consumer 심층 분석: 그룹 코디네이터부터 리밸런싱 전략까지
상단으로

티스토리툴바