[ADVANCED #4][실습] Consumer 완전 정복: 기본 컨슈머부터 안전한 종료, 커밋 전략까지

2025. 9. 9. 12:30·Kafka/Core

1. 기본 Consumer 생성

SimpleConsumer.java

더보기
더보기
더보기
package com.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class SimpleConsumer {

    public static final Logger logger = LoggerFactory.getLogger(SimpleConsumer.class.getName());

    public static void main(String[] args) {

        String topicName = "simple-topic";

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_01");

        // kafka consumer 객체 생성
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        // subscribe
        kafkaConsumer.subscribe(List.of(topicName));

        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                logger.info("record key={}, record value={}, partition={}",
                        consumerRecord.key(), consumerRecord.value(), consumerRecord.partition());
            }
        }

//        kafkaConsumer.close();
    }

}

[1] 환경 설정 (Properties)

컨슈머에 필요한 옵션들을 설정한다. bootstrap.servers, key.deserializer, value.deserializer와 함께, 가장 중요한 group.id를 반드시 지정해야 한다.

Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

 

[2] KafkaConsumer 객체 생성

설정한 Properties 객체를 사용하여 KafkaConsumer 인스턴스를 생성한다.

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

 

[3] 토픽 구독 (Subscribe)

subscribe()를 호출하여 읽어올 토픽을 지정한다. 여러 토픽을 구독할 수도 있다.

consumer.subscribe(Arrays.asList("pizza-topic"));

 

[4] 메시지 폴링 (Poll)

무한 루프 안에서 poll()을 지속적으로 호출하여 새로운 메시지를 가져와 처리한다.

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        logger.info("Received message: " + record.value());
    }
}

 

[5] 컨슈머 종료 (Close)

애플리케이션이 종료될 때는 close()를 호출하여 컨슈머가 그룹에서 정상적으로 떠나고 리소스를 정리하도록 해야 한다. 이는 다른 컨슈머들의 불필요한 리밸런싱을 방지한다.

consumer.close();

 

근데 막상 consumer.close();를 해보면 Unreachable statement 오류가 발생한다. 당연히 while (true)에 의해 무한 루프에 빠지면서 바깥에 있는 코드가 실행될 수 없기 때문이다. 컨슈머 애플리케이션은 보통 while(true) 무한 루프 안에서 poll()을 호출하며 동작한다. 그런데 이 무한 루프, 어떻게 중지시켜야 할까? Ctrl+C로 그냥 프로세스를 죽여버리면 될까? 그렇게 하면 컨슈머가 그룹 코디네이터에게 제대로 된 작별 인사도 없이 사라지게 된다. 그룹 코디네이터는 해당 컨슈머가 죽었다는 사실을 세션 타임아웃이 발생할 때까지 알지 못하고, 그동안 불필요한 리밸런싱 지연이 발생할 수 있다.

 


2. Consumer의 close()

[1] 문제는 poll()의 대기 상태

컨슈머는 poll() 메서드를 호출하면, 데이터가 없거나 타임아웃이 될 때까지 스레드가 대기 상태(Blocking)에 빠진다. 이 상태에서는 외부에서 종료 신호를 보내도 즉시 반응하기 어렵다. 무한 루프를 빠져나오기 위한 별도의 장치가 필요한 이유다. 카프카는 이 문제를 해결하기 위해 wakeup()이라는 특별한 메서드를 제공한다.

  • kafkaConsumer.wakeup(): 다른 스레드에서 이 메서드를 호출하면, 대기 중이던 poll() 메서드는 즉시 WakeupException을 던지며 깨어난다.

즉, wakeup()은 잠들어 있는 poll()을 강제로 깨워서 루프를 탈출시키는 알람 시계 같은 역할을 한다.

 

[2] Shutdown Hook과 wakeup()의 콜라보

그렇다면 wakeup()은 어느 시점에 호출해야 할까? 가장 좋은 방법은 Shutdown Hook을 사용하는 것이다. Shutdown Hook은 JVM이 종료 신호(예: Ctrl+C)를 받았을 때, 종료되기 직전에 실행할 코드를 등록하는 기능이다. 아래 코드는 Shutdown Hook을 이용해 wakeup()을 호출하는 전형적인 패턴을 보여준다.

package com.example;

// ... imports

public class ConsumerWakeup {

    public static final Logger logger = LoggerFactory.getLogger(ConsumerWakeup.class.getName());

    public static void main(String[] args) {

        // ... 컨슈머 설정 ...
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
        kafkaConsumer.subscribe(List.of("simple-topic"));

        // [1] 현재 실행 중인 메인 스레드를 가져온다.
        Thread mainThread = Thread.currentThread();

        // [2] Shutdown Hook 등록
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            logger.info("main program starts to exit by calling wakeup");
            // poll()을 깨우기 위해 wakeup() 호출
            kafkaConsumer.wakeup();

            // 메인 스레드가 모든 정리를 마칠 때까지 기다린다.
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

        try {
            // [3] 무한 루프 시작
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    logger.info("record key={}, record value={}, partition={}",
                            consumerRecord.key(), consumerRecord.value(), consumerRecord.partition());
                }
            }
        } catch (WakeupException e) {
            // [4] wakeup()이 호출되면 이 예외가 발생한다.
            logger.error("wakeup exception has been called");
        } finally {
            // [5] 루프를 빠져나온 후, 안전하게 컨슈머를 닫는다.
            logger.info("finally consumer is closing");
            kafkaConsumer.close();
        }
    }
}

 

[3] 코드 흐름 분석

  1. 메인 스레드 저장: Shutdown Hook은 별도의 스레드에서 실행되므로, 나중에 메인 스레드가 종료될 때까지 기다리기 위해 현재 스레드(mainThread)를 변수에 저장해 둔다.
  2. Shutdown Hook 등록: Ctrl+C와 같은 종료 신호가 들어오면 실행될 로직을 등록한다. 이 로직의 핵심은 kafkaConsumer.wakeup()을 호출하는 것이다.
  3. 무한 루프: 컨슈머는 평소처럼 poll()을 호출하며 메시지를 처리한다.
  4. WakeupException 처리: 사용자가 Ctrl+C를 누르면 Shutdown Hook이 동작하여 wakeup()을 호출하고, poll()은 즉시 WakeupException을 던진다. catch 블록이 이 예외를 잡으면서 무한 루프를 정상적으로 탈출하게 된다.
  5. 자원 정리: finally 블록에서 kafkaConsumer.close()를 호출한다. 이 과정에서 컨슈머는 자신이 처리하던 메시지의 마지막 오프셋을 커밋하고, 그룹 코디네이터에게 "나 이제 떠난다"는 신호를 보낸다. 덕분에 그룹은 즉시 리밸런싱을 시작할 수 있다.

mainThread.join()은 Shutdown Hook 스레드가 메인 스레드의 finally 블록 실행이 모두 끝날 때까지 기다리게 하는 역할을 한다. 이를 통해 모든 정리 작업이 완료된 후 프로그램이 완전히 종료되도록 보장한다.

 

이처럼 wakeup()과 Shutdown Hook을 함께 사용하면, 예기치 않은 종료 상황에서도 데이터를 안전하게 커밋하고 다른 컨슈머들에게 영향을 최소화하며 시스템을 안정적으로 운영할 수 있다.

[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group_01-1, groupId=group_01] Revoke previously assigned partitions simple-topic-0
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group_01-1, groupId=group_01] Member consumer-group_01-1-f88fa422-2659-4676-aab3-12257a510131 sending LeaveGroup request to coordinator ubuntu-VirtualBox:9092 (id: 2147483647 rack: null) due to the consumer is being closed
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group_01-1, groupId=group_01] Resetting generation due to: consumer pro-actively leaving the group
[main] INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-group_01-1, groupId=group_01] Request joining group due to: consumer pro-actively leaving the group

 


3. 신규 Consumer 생성에 따른 Rebalance 실습

[1] CLI 환경에서 Intellij 환경으로 전환

Running Program 토글(우측 상단) > Edit Configurations > Modify options > Allow Multiple instances

 

[2] PizzaProducer.java 실행

더보기
더보기
더보기
public class PizzaProducer {

    public static final Logger logger = LoggerFactory.getLogger(PizzaProducer.class.getName());


    public static void sendPizzaMessage(KafkaProducer<String, String> kafkaProducer, String topicName, int iterCount,
                                        int interIntervalMillis, int intervalMillis, int intervalCount, Boolean sync) {

        PizzaMessage pizzaMessage = new PizzaMessage();
        int iterSeq = 0;

        // seed값을 고정하여 Random 객체와 Faker 객체를 생성.
        long seed = 2022;
        Random random = new Random(seed);
        Faker faker = Faker.instance(random);

        while (iterSeq != iterCount) {
            HashMap<String, String> pMessage = pizzaMessage.produce_msg(faker, random, iterSeq);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, pMessage.get("key"), pMessage.get("message"));

            sendMessage(kafkaProducer, producerRecord, pMessage, sync);

            if ((intervalCount > 0) && (iterSeq % intervalCount == 0)) {
                try {
                    logger.info("###### intervalCount:" + intervalCount + " intervalMillis" + intervalMillis + "######");
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage());
                }
            }

            if (interIntervalMillis > 0) {
                try {
                    logger.info("interIntervalMillis: " + interIntervalMillis);
                    Thread.sleep(interIntervalMillis);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage());
                }
            }
        }

    }

    // 동기, 비동기 분기처리
    public static void sendMessage(KafkaProducer<String, String> kafkaProducer, ProducerRecord<String, String> producerRecord,
                                   HashMap<String, String> pMessage, boolean sync) {

        if (!sync) { // ASYNC (비동기)
            kafkaProducer.send(producerRecord, (metadata, exception) -> {
                if (exception == null) {
                    logger.info("async message: " + pMessage.get("key") + "partition: " + metadata.partition() + "\n" +
                            "offset: " + metadata.offset());
                } else {
                    logger.error("exception error from broker" + exception.getMessage());
                }
            });
        } else { // SYNC (동기)
            try {
                RecordMetadata metadata = kafkaProducer.send(producerRecord).get();
                logger.info("async message: " + pMessage.get("key") + "partition: " + metadata.partition() + "\n" +
                        "offset: " + metadata.offset());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

    }


    public static void main(String[] args) {

        String topicName = "pizza-topic";

        Properties props = new Properties();

        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); // [1] bootstrap.servers | WHY server"s"? -> Cause we can use multi brokers(servers)
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [2] key.serializer.class
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [3] value.serializer.class
//        props.setProperty(ProducerConfig.ACKS_CONFIG, "0"); // [4] props 설정
//        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "32000"); // [5] Batch Size 설정
//        props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20"); // [6] linger.ms 설정
//        props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "50000");


        // Create KafkaProducer Object
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        sendPizzaMessage(kafkaProducer, topicName,
                -1, 500, 0, 0, true);

        kafkaProducer.close();

    }

}

 

[3] ConsumerWakeup.java 실행

더보기
더보기
더보기
public class ConsumerWakeup {

    public static final Logger logger = LoggerFactory.getLogger(ConsumerWakeup.class.getName());

    public static void main(String[] args) {

        String topicName = "pizza-topic";

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_01");
//        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        // kafka consumer 객체 생성
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        // subscribe
        kafkaConsumer.subscribe(List.of(topicName));

        // Main Thread 선언
        Thread mainThread = Thread.currentThread();

        // Runtime.getRuntime().addShutdownHook: Main Thread 종료 전 실행
        // kafkaConsumer.wakeup(): poll 시점에 Exception 발생 용도
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                logger.info("main program starts to exit by calling wakeup");
                kafkaConsumer.wakeup();

                // Main Thread 죽을 때 같이 죽어야 해서 대기 상태로 놔야함
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    logger.info("record key={}, partition={}, record offset={}, record value={}",
                            consumerRecord.key(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.value());
                }
            }
        } catch (WakeupException e) {
            logger.error("wakeup exception has been called");
        } finally {
            logger.info("finally consumer is closing");
            kafkaConsumer.close();
        }
    }

}

Consumer가 1개라서 3개의 모든 파티션으로부터 가져온다.

 

[4] Kafka server log 확인

[2025-09-09 19:17:50,179] INFO [GroupCoordinator 0]: Dynamic member with unknown member id joins group group_01 in Empty state. Created a new member id consumer-group_01-1-34f73a17-83d8-43bd-a849-b854ece9f943 and request the member to   rejoin with this id. (kafka.coordinator.group.GroupCoordinator)
[2025-09-09 19:17:50,186] INFO [GroupCoordinator 0]: Preparing to rebalance group group_01 in state PreparingRebalance with old generation 19 (__consumer_offsets-45) (reason: Adding new member consumer-group_01-1-34f73a17-83d8-43bd-a8  49-b854ece9f943 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2025-09-09 19:17:50,188] INFO [GroupCoordinator 0]: Stabilized group group_01 generation 20 (__consumer_offsets-45) with 1 members (kafka.coordinator.group.GroupCoordinator)
[2025-09-09 19:17:50,196] INFO [GroupCoordinator 0]: Assignment received from leader consumer-group_01-1-34f73a17-83d8-43bd-a849-b854ece9f943 for group group_01 for generation 20. The group has 1 members, 0 of which are static. (kafka  .coordinator.group.GroupCoordinator)
[2025-09-09 19:20:43,619] INFO [GroupMetadataManager brokerId=0] Group console-consumer-9977 transitioned to Dead in generation 2 (kafka.coordinator.group.GroupMetadataManager)

 

[5] Consumer 하나 더 실행

1) Consumer #1

 

2) Consumer #2

Consumer #1은 0번, 1번 파티션을 담당하고, Consumer #2는 2번 파티션을 담당하는 모습을 확인할 수 있다.

 


4. Static Group Membership 실습

[1] Kafka Topic(pizza-topic) 생성

ubuntu@ubuntu-VirtualBox:~$ kafka-topics --bootstrap-server localhost:9092 --delete --topic pizza-topic
ubuntu@ubuntu-VirtualBox:~$ kafka-topics --bootstrap-server localhost:9092 --create --topic pizza-topic --partitions 3
Created topic pizza-topic.

[2] ConsumerWakeup.java 실행 (세번 실행하되, GROUP_INSTANCE_ID_CONFIG 값을 실행마다 변경)

// 첫 번째 실행
public class ConsumerWakeup {

    public static final Logger logger = LoggerFactory.getLogger(ConsumerWakeup.class.getName());

    public static void main(String[] args) {

        String topicName = "pizza-topic";

        Properties props = new Properties();
        ...
        props.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "1");
		...
}
// 두 번째 실행
public class ConsumerWakeup {

    public static final Logger logger = LoggerFactory.getLogger(ConsumerWakeup.class.getName());

    public static void main(String[] args) {

        String topicName = "pizza-topic";

        Properties props = new Properties();
        ...
        props.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "2");
		...
}
// 세 번째 실행
public class ConsumerWakeup {

    public static final Logger logger = LoggerFactory.getLogger(ConsumerWakeup.class.getName());

    public static void main(String[] args) {

        String topicName = "pizza-topic";

        Properties props = new Properties();
        ...
        props.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "3");
		...
}

[3] Kafka CLI 확인

ubuntu@ubuntu-VirtualBox:~$ kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group_01_static
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                            HOST            CLIENT-ID
group_01_static pizza-topic     0          12              12              0               1-221d9d25-6ba9-4a18-aa70-09cbfb0d33d7 /192.168.56.1   consumer-group_01_static-1
group_01_static pizza-topic     2          6               6               0               3-9faca238-eb28-40e3-a923-d2d8c4bf844e /192.168.56.1   consumer-group_01_static-3
group_01_static pizza-topic     1          11              11              0               2-7bb9634e-8cb4-4d7c-96d1-7e3c8539ef08 /192.168.56.1   consumer-group_01_static-2

[4] Rebalance 확인

 

강제로 Consumer #3 (id가 3인 Consumer)를 중지한다. 이때 kafka-server에는 어떠한 로그도 발생하지 않는다. 이후 45초 안에 한번 더 실행한다. kafka-server에는 아래와 같은 로그가 출력된다. 요약하면 static member가 coordinator에게 join 요청을 해서 리밸런싱 없이 Consumer와 Partition이 원래처럼 연결되었다는 의미이다.

[2025-09-09 21:36:26,224] INFO [GroupCoordinator 0]: Static member with groupInstanceId=3 and unknown member id joins group group_01_static in Stable state. Replacing previously mapped member 3-9faca238-eb28-40e3-a923-d2d8c4bf844e with this groupInstanceId. (kafka.coordinator.group.GroupCoordinator)
[2025-09-09 21:36:26,228] INFO [GroupCoordinator 0]: Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance. (kafka.coordinator.group.GroupCoordinator)

5. Consumer에서 여러 개의 Topic 읽기 실습

지금까지는 하나의 Topic만 읽었음. 근데 Consumer는 여러개의 Topic을 읽을 수 있어야 함. Consumer의 역할 중 하나가 Topic 데이터 읽어서 Processing하는건데, 여러 개의 Topic에 있는 데이터를 함께 가공해서 다른 데이터 소스에 넣는다거나 필터링 하는 작업을 하기 때문에 결국에는 Producer도 여러 Topic에 데이터 넣을 수 있어야 하고 Consumer도 여러 Topic에 있는 데이터를 읽을 수 있어야 한다.

[1] Java 코드 작성

public class ConsumerMTopicRebalance {

    public static final Logger logger = LoggerFactory.getLogger(ConsumerMTopicRebalance.class.getName());

    public static void main(String[] args) {

        // properties 설정 ...

        // kafka consumer 객체 생성
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        // subscribe
        kafkaConsumer.subscribe(List.of("topic-p3-t1", "topic-p3-t2")); // ✅ 멀티 토픽 설정

        // add shutdown 코드 ...

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                // ✅ 토픽 출력 (consumerRecord.topic())
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    logger.info("topic:{}, record key={}, partition={}, record offset={}, record value={}",
                            consumerRecord.topic(), consumerRecord.key(), consumerRecord.partition(), consumerRecord.offset(), consumerRecord.value());
                }
            }
        } 
        
        ...

}

[2] 토픽 생성

ubuntu@ubuntu-VirtualBox:~$ kafka-topics --bootstrap-server localhost:9092 --create --topic topic-p3-t1 --partitions 3
Created topic topic-p3-t1.
ubuntu@ubuntu-VirtualBox:~$ kafka-topics --bootstrap-server localhost:9092 --create --topic topic-p3-t2 --partitions 3
Created topic topic-p3-t2.

[3] 메시지 전송 후 결과 확인

1)topic-p3-t1

Kafka Producer Console
Java 로그

2) topic-p3-t2

Kafka Producer Console
Java 로그


6. 파티션 할당 전략 실습(1) - Range와 Round Robin 방식 할당 실습

[1] Range

1) ConsumerMTopicRebalance.java 코드 실행 (2번 실행)

package com.example;

ipmort ...

public class ConsumerMTopicRebalance {

    public static final Logger logger = LoggerFactory.getLogger(ConsumerMTopicRebalance.class.getName());

    public static void main(String[] args) {

        String topicName = "pizza-topic";

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_assign");

        // kafka consumer 객체 생성
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        // subscribe
        kafkaConsumer.subscribe(List.of("topic-p3-t1", "topic-p3-t2"));

        ...
    }

}

 

2) Java 로그 확인

Consumer #1
Consumer #2

 

위에서 ConsumerMTopicRebalance.java의 코드에서 properties 설정을 보면 따로 파티셔닝 전략을 지정해주지 않았는데, 각 Consumer의 로그를 확인해보면 range로 설정되어 있는것을 확인할 수 있다. 또한 Consumer #1를 확인해보면 Consumer #2가 실행됨과 동시에 기존 파티셔닝에 대한 Revoke가 시행되어, Rebalancing을 수행한다. Consumer #2 또한 마찬가지로 Revoke가 시행되어 Rebalancing이 수행된다.

[2] Round Robin

1) ConsumerMTopicRebalance.java 코드 실행 (2번 실행)

package com.example;

ipmort ...

public class ConsumerMTopicRebalance {

    public static final Logger logger = LoggerFactory.getLogger(ConsumerMTopicRebalance.class.getName());

    public static void main(String[] args) {

        String topicName = "pizza-topic";

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_assign");
        
        // ✅✅파티션 할당 전략 명시적으로 Round Robin으로 설정✅✅
        props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());

        // kafka consumer 객체 생성
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        // subscribe
        kafkaConsumer.subscribe(List.of("topic-p3-t1", "topic-p3-t2"));

        ...
    }

}

partition.assignment.strategy가 Round Robin으로 설정됨

 

2) Java 로그 확인

Consumer #1
Consumer #2


7. 파티션 할당 전략 실습(2) - Cooperative Sticky 방식 할당 실습

1) ConsumerMTopicRebalance.java 코드 실행 (2번 실행)

package com.example;

ipmort ...

public class ConsumerMTopicRebalance {

    public static final Logger logger = LoggerFactory.getLogger(ConsumerMTopicRebalance.class.getName());

    public static void main(String[] args) {

        String topicName = "pizza-topic";

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_assign");
        
        // ✅✅파티션 할당 전략 명시적으로 cooperative sticky로 설정✅✅
        props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());

        // kafka consumer 객체 생성
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        // subscribe
        kafkaConsumer.subscribe(List.of("topic-p3-t1", "topic-p3-t2"));

        ...
    }

}

partition.assignment.strategy가 Cooperative Sticky로 설정됨

2) Java 로그 확인

Consumer #1 (Consumer #2 실행 전)
Consumer #1 (Consumer #2 실행 중)
Consumer #1 (Consumer #2 실행 후)
Consumer #2 (Consumer #2 실행 중)
Consumer #2 (Consumer #2 실행 후)

 

Cooperative Sticky는 이론에서 설명했듯이 모든 연결을 Revoke하지 않는다. 따라서 로그를 확인해보면 6개의 파티션을 revoke하는 것이 아니라 3개의 파티션만 revoke하는 것을 확인할 수 있다.

9.  Manual Commit 구현 실습 (Sync / Async)

package com.example.kafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class ConsumerCommit {

    public static final Logger logger = LoggerFactory.getLogger(ConsumerCommit.class.getName());

    public static void main(String[] args) {

        String topicName = "pizza-topic";

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group_03");
        props.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());

        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");


        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        kafkaConsumer.subscribe(List.of(topicName));

        //main thread
        Thread mainThread = Thread.currentThread();

        //main thread 종료시 별도의 thread로 KafkaConsumer wakeup()메소드를 호출하게 함.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                logger.info(" main program starts to exit by calling wakeup");
                kafkaConsumer.wakeup();

                try {
                    mainThread.join();
                } catch(InterruptedException e) { e.printStackTrace();}
            }
        });

        //kafkaConsumer.close();
        //pollAutoCommit(kafkaConsumer);
        //pollCommitSync(kafkaConsumer);
        pollCommitAsync(kafkaConsumer);



    }

    private static void pollCommitAsync(KafkaConsumer<String, String> kafkaConsumer) {
        int loopCnt = 0;

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
                for (ConsumerRecord record : consumerRecords) {
                    logger.info("record key:{},  partition:{}, record offset:{} record value:{}",
                            record.key(), record.partition(), record.offset(), record.value());
                }
                kafkaConsumer.commitAsync(new OffsetCommitCallback() {
                    @Override
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                        if(exception != null) {
                            logger.error("offsets {} is not completed, error:{}", offsets, exception.getMessage());
                        }
                    }
                });

            }
        }catch(WakeupException e) {
            logger.error("wakeup exception has been called");
        }catch(Exception e) {
            logger.error(e.getMessage());
        }finally {
            logger.info("##### commit sync before closing");
            kafkaConsumer.commitSync();
            logger.info("finally consumer is closing");
            kafkaConsumer.close();
        }

    }

    private static void pollCommitSync(KafkaConsumer<String, String> kafkaConsumer) {
        int loopCnt = 0;

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
                for (ConsumerRecord record : consumerRecords) {
                    logger.info("record key:{},  partition:{}, record offset:{} record value:{}",
                            record.key(), record.partition(), record.offset(), record.value());
                }
                try {
                    if(consumerRecords.count() > 0 ) {
                        kafkaConsumer.commitSync();
                        logger.info("commit sync has been called");
                    }
                } catch(CommitFailedException e) {
                    logger.error(e.getMessage());
                }

            }
        }catch(WakeupException e) {
            logger.error("wakeup exception has been called");
        }catch(Exception e) {
            logger.error(e.getMessage());
        }finally {
            logger.info("finally consumer is closing");
            kafkaConsumer.close();
        }

    }


    public static void pollAutoCommit(KafkaConsumer<String, String> kafkaConsumer) {
        int loopCnt = 0;

        try {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
                logger.info(" ######## loopCnt: {} consumerRecords count:{}", loopCnt++, consumerRecords.count());
                for (ConsumerRecord record : consumerRecords) {
                    logger.info("record key:{},  partition:{}, record offset:{} record value:{}",
                            record.key(), record.partition(), record.offset(), record.value());
                }
                try {
                    logger.info("main thread is sleeping {} ms during while loop", 10000);
                    Thread.sleep(10000);
                }catch(InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }catch(WakeupException e) {            logger.error("wakeup exception has been called");
        }finally {
            logger.info("finally consumer is closing");
            kafkaConsumer.close();
        }

    }
}

 

 

'Kafka > Core' 카테고리의 다른 글

[ADVANCED #6] 클러스터 운영의 핵심: 리플리케이션, ISR, 그리고 리더 선출의 모든 것  (0) 2025.09.11
[ADVANCED #5][실습] Producer & Consumer 연동 프로젝트: 파일 기반 주문 데이터를 DB로 저장하기  (0) 2025.09.11
[ADVANCED #3] Consumer 심층 분석: 그룹 코디네이터부터 리밸런싱 전략까지  (0) 2025.09.09
[ADVANCED #2][실습] Producer 완전 정복: 기초부터 고급 설정  (0) 2025.09.08
[ADVANCED #1] Producer 심층 분석: 내부 동작 원리와 고급 설정  (0) 2025.09.07
'Kafka/Core' 카테고리의 다른 글
  • [ADVANCED #6] 클러스터 운영의 핵심: 리플리케이션, ISR, 그리고 리더 선출의 모든 것
  • [ADVANCED #5][실습] Producer & Consumer 연동 프로젝트: 파일 기반 주문 데이터를 DB로 저장하기
  • [ADVANCED #3] Consumer 심층 분석: 그룹 코디네이터부터 리밸런싱 전략까지
  • [ADVANCED #2][실습] Producer 완전 정복: 기초부터 고급 설정
h6bro
h6bro
백엔드 개발자의 기술 블로그
  • h6bro
    Jun's Tech Blog
    h6bro
  • 전체
    오늘
    어제
    • 분류 전체보기 (250)
      • Java (18)
        • Core (9)
        • Design Pattern (9)
      • Spring (80)
        • Core (24)
        • MVC (6)
        • DB (10)
        • JPA (26)
        • Monitoring (3)
        • Security (11)
        • WebSocket (0)
      • Database (33)
        • Redis (15)
        • MySQL (18)
      • MSA (25)
        • MSA 기본 (11)
        • MSA 아키텍처 (14)
      • Kafka (30)
        • Core (18)
        • Connect (12)
      • ElasticSearch (11)
        • Search (11)
        • Logging (0)
      • Test (4)
        • k6 (4)
      • Docker (9)
      • CI&CD (10)
        • GitHub Actions (6)
        • ArgoCD (4)
      • Kubernetes (18)
        • Core (12)
        • Ops (6)
      • Cloud Engineering (4)
        • AWS Infrastructure (3)
        • AWS EKS (1)
        • Terraform (0)
      • Project (8)
        • LinkFolio (1)
        • Secondhand Market (7)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

    • Cloud Engineering 포스팅 정리
  • 인기 글

  • 태그

    ㅈ
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.5
h6bro
[ADVANCED #4][실습] Consumer 완전 정복: 기본 컨슈머부터 안전한 종료, 커밋 전략까지
상단으로

티스토리툴바