[ADVANCED #2][실습] Producer 완전 정복: 기초부터 고급 설정

2025. 9. 8. 16:53·Kafka/Core
kafka-topics --bootstrap-server localhost:9092 --create --topic pizza-topic-partitioner --partitions 5

1. 기본 구현

[1] Kafka Topic 생성

kafka-topics --bootstrap-server localhost:9092 --create --topic simple-topic

[2] Java 코드 작성 및 실행

com/example/kafka/SimpleProducer.java

package com.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;


/**
 * If you wanna make KafkaProducer, ProducerRecord to Integer,
 * you have to declare serializer.class with IntegerSerializer.class
 */
// KafkaProducer Configuration Settings
public class SimpleProducer {
    public static void main(String[] args) {

        String topicName = "simple-topic";

        Properties props = new Properties();

        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); // [1] bootstrap.servers | WHY server"s"? -> Cause we can use multi brokers(servers)
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [2] key.serializer.class
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [3] value.serializer.class

        // Create KafkaProducer Object
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        // Create ProducerRecord Object
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, "id-001", "hello world 2");

        // Send KafkaProducer Message
        kafkaProducer.send(producerRecord);

        // Terminate KafkaProducer
        kafkaProducer.flush();
        kafkaProducer.close();
    }
}

[3] Kafka Topic 확인

kafka-console-consumer --bootstrap-server localhost:9092 --topic simple-topic --from-beginning


2. Producer와 브로커와의 메시지 동기화(SYNC)로 리팩터링

[1] Java 코드 작성 및 실행

package com.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.ExecutionException;


/**
 * If you wanna make KafkaProducer, ProducerRecord to Integer,
 * you have to declare serializer.class with IntegerSerializer.class
 */
// KafkaProducer Configuration Settings
public class SimpleProducerSync {

    public static final Logger logger = LoggerFactory.getLogger(SimpleProducerSync.class.getName());

    public static void main(String[] args) {

        String topicName = "simple-topic";

        Properties props = new Properties();

        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); // [1] bootstrap.servers | WHY server"s"? -> Cause we can use multi brokers(servers)
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [2] key.serializer.class
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [3] value.serializer.class

        // Create KafkaProducer Object
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        // Create ProducerRecord Object
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, "id-001", "hello world 2");

        // Send KafkaProducer Message
        try {
            RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
            logger.info("\n ###### record metadata received ##### \n" +
                    "partition: " + recordMetadata.partition() + "\n" +
                    "offset: " + recordMetadata.offset() + "\n" +
                    "timestamp: " + recordMetadata.timestamp());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            kafkaProducer.close();
        }

        // Terminate KafkaProducer
        kafkaProducer.flush();
        kafkaProducer.close();
    }
}

3. Producer와 브로커와의 메시지 비동기화(ASYNC)로 리팩터링

[1] Java 코드 작성 및 실행

package com.example.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.ExecutionException;


/**
 * If you wanna make KafkaProducer, ProducerRecord to Integer,
 * you have to declare serializer.class with IntegerSerializer.class
 */
// KafkaProducer Configuration Settings
public class SimpleProducerAsync {

    public static final Logger logger = LoggerFactory.getLogger(SimpleProducerAsync.class.getName());

    public static void main(String[] args) {

        String topicName = "simple-topic";

        Properties props = new Properties();

        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); // [1] bootstrap.servers | WHY server"s"? -> Cause we can use multi brokers(servers)
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [2] key.serializer.class
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [3] value.serializer.class

        // Create KafkaProducer Object
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        // Create ProducerRecord Object
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, "id-001", "hello world 2");

        // [1] Send KafkaProducer Message (Java Basic)
//        kafkaProducer.send(producerRecord, new Callback() {
//            @Override
//            public void onCompletion(RecordMetadata metadata, Exception exception) {
//                if (exception == null) {
//                    logger.info("\n ###### record metadata received ##### \n" +
//                            "partition: " + metadata.partition() + "\n" +
//                            "offset: " + metadata.offset() + "\n" +
//                            "timestamp: " + metadata.timestamp());
//                } else {
//                    logger.error("exception error from broker" + exception.getMessage());
//                }
//            }
//        });

        // [2] Send KafkaProducer Message (Java Lambda)
        kafkaProducer.send(producerRecord, (metadata, exception) -> {
            if (exception == null) {
                logger.info("\n ###### record metadata received ##### \n" +
                        "partition: " + metadata.partition() + "\n" +
                        "offset: " + metadata.offset() + "\n" +
                        "timestamp: " + metadata.timestamp());
            } else {
                logger.error("exception error from broker" + exception.getMessage());
            }
        });

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        kafkaProducer.close();

    }
}

4. Producer에서 다중 파티션(Partition) 환경에서의 키(Key)값을 가지는 메시지 전송

지금까지는 메시지 전송의 '결과'에 집중했다면, 이제는 메시지가 토픽 내에서 어디로 저장되는지, 즉 파티션(Partition)에 대해 알아본다. Kafka에서 메시지 Key는 단순히 데이터를 구분하는 식별자를 넘어, 메시지가 어떤 파티션에 저장될지 결정하는 매우 중요한 역할을 한다.

  • Key가 없는 경우: 메시지는 라운드 로빈(Round-Robin) 방식으로 여러 파티션에 순차적으로 분배된다.
  • Key가 있는 경우: Kafka는 Key의 해시(hash) 값을 계산하여 특정 파티션에 메시지를 할당한다. 동일한 Key를 가진 메시지는 항상 동일한 파티션에 저장되는 것이 보장된다.

이를 통해 특정 Key에 대한 메시지 처리 순서를 보장할 수 있다.

[1] 다중 파티션 토픽 생성

먼저, 실습을 위해 3개의 파티션을 가진 토픽을 생성한다.

kafka-topics --bootstrap-server localhost:9092 --create --topic multipart-topic --partitions 3

[2] Key를 사용한 메시지 전송 코드 작성 및 실행

이제 0부터 19까지 순차적으로 증가하는 Key를 가진 메시지 20개를 비동기 방식으로 전송하는 코드를 작성한다.

package com.example.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

// KafkaProducer Configuration Settings
public class ProducerAsyncWithKey {

    public static final Logger logger = LoggerFactory.getLogger(ProducerAsyncWithKey.class.getName());

    public static void main(String[] args) {

        String topicName = "multipart-topic";

        Properties props = new Properties();

        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // Create KafkaProducer Object
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        for (int seq = 0; seq < 20; seq++) {

            // Create ProducerRecord Object with a key
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, String.valueOf(seq), "hello world " + seq);

            // Send KafkaProducer Message (Java Lambda)
            kafkaProducer.send(producerRecord, (metadata, exception) -> {
                if (exception == null) {
                    logger.info("\n ###### record metadata received ##### \n" +
                            "partition: " + metadata.partition() + "\n" +
                            "offset: " + metadata.offset() + "\n" +
                            "timestamp: " + metadata.timestamp());
                } else {
                    logger.error("exception error from broker" + exception.getMessage());
                }
            });
        }
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        kafkaProducer.close();
    }
}

[3] 실행 로그 분석

코드를 실행하면 Producer의 설정값과 함께 각 메시지가 어느 파티션으로 전송되었는지 콜백 로그를 통해 확인할 수 있다.

Producer 설정 로그:

partitioner.class가 DefaultPartitioner로 설정되어 있음을 주목해야 한다. 이 기본 파티셔너가 바로 메시지 Key의 해시 값을 계산하여 파티션을 결정하는 역할을 한다. 또한 batch.size와 linger.ms 설정에 따라 프로듀서는 메시지를 모아 배치(batch) 단위로 전송한다.

partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
batch.size = 16384
linger.ms = 0

메시지 전송 결과 (Callback 로그):

로그를 자세히 보면, Key 값(0~19)에 따라 메시지들이 파티션 0, 1, 2에 골고루 분산되어 들어간 것을 확인할 수 있다. 이는 DefaultPartitioner가 각 Key의 해시 값을 계산하고, 그 결과를 총 파티션 개수(3)로 나눈 나머지를 통해 목적지 파티션을 정하기 때문이다.

[kafka-producer-network-thread | producer-1] INFO com.example.kafka.ProducerAsyncWithKey -
 ###### record metadata received #####
partition: 2
offset: 1
timestamp: 1757298130194
...
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.ProducerAsyncWithKey -
 ###### record metadata received #####
partition: 0
offset: 0
timestamp: 1757298130216
...
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.ProducerAsyncWithKey -
 ###### record metadata received #####
partition: 1
offset: 0
timestamp: 1757298130217
...

[4] Consumer를 통한 최종 확인

이제 컨슈머를 통해 토픽의 메시지를 확인해본다. Key와 Value를 함께 출력하기 위해 --property print.key=true 옵션을 추가한다.

kafka-console-consumer --bootstrap-server localhost:9092 --group group-01 --topic multipart-topic \
--property print.key=true --property print.value=true

[실행 결과]

0        hello world0
2        hello world2
3        hello world3
9        hello world9
...
1        hello world1
5        hello world5
...

가장 중요한 점은 컨슈머가 출력한 메시지의 Key 순서가 0, 1, 2, ... 19가 아니라는 것이다. 컨슈머는 3개의 파티션으로부터 동시에 메시지를 가져오기 때문에, 전체 메시지의 순서는 보장되지 않는다. 이 결과는 메시지 Key에 따라 데이터가 실제로 여러 파티션에 분산 저장되었음을 명확하게 보여준다.


5. Producer에서 키(Key) 타입 변경 및 Custom Callback 구현

이전 단계에서는 메시지 Key를 사용해 데이터를 여러 파티션에 분산했지만, 컨슈머에서 확인했을 때 전체 메시지의 순서가 보장되지 않는다는 점을 확인했다. 또한, 람다(Lambda) 콜백의 로그만으로는 어떤 Key를 가진 메시지가 어느 파티션으로 들어갔는지 정확히 추적하기 어려웠다. 이번 단계에서는 두 가지 개선 작업을 진행한다.

  1. 메시지 Key의 타입을 String에서 Integer로 변경한다.
  2. 재사용 가능하고 더 상세한 로그를 남길 수 있는 별도의 CustomCallback 클래스를 구현한다

[1] 메시지 Key 타입을 Integer로 변경하기

메시지 Key 타입을 String에서 Integer로 변경하기 위해서는 프로듀서의 코드 몇 군데를 수정해야 한다.

  • Key Serializer 변경: ProducerConfig에서 Key 직렬화 방식을 StringSerializer가 아닌 IntegerSerializer로 지정해야 한다.
  • 제네릭 타입 변경: KafkaProducer와 ProducerRecord 객체를 생성할 때 사용한 제네릭 타입을 <String, String>에서 <Integer, String>으로 변경해야 한다.
// ProducerAsyncWithKey.java를 기반으로 수정

// ...
// [1] Key Serializer를 IntegerSerializer로 변경
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// [2] KafkaProducer의 제네릭 타입을 <Integer, String>으로 변경
KafkaProducer<Integer, String> kafkaProducer = new KafkaProducer<>(props);

for (int seq = 0; seq < 20; seq++) {
    // [3] ProducerRecord의 제네릭 타입과 Key 타입을 Integer로 변경
    ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(topicName, seq, "hello world" + seq);
    // ...
}
// ...

[2] Consumer에서 Key Deserializer 설정하기

프로듀서가 Key를 Integer 타입으로 직렬화해서 보냈으므로, 컨슈머 또한 Key를 Integer 타입으로 역직렬화(Deserializer)해야 올바르게 읽을 수 있다. kafka-console-consumer는 기본적으로 Key와 Value를 모두 String으로 처리하기 때문에, 다음과 같이 --key-deserializer 옵션을 명시적으로 추가해야 한다.

kafka-console-consumer --bootstrap-server localhost:9092 --group group-01 --topic multipart-topic \
--property print.key=true --property print.value=true \
--key-deserializer "org.apache.kafka.common.serialization.IntegerDeserializer"

[3] Custom Callback 클래스 구현하기

람다 콜백은 간결하지만, 콜백이 실행되는 시점에는 for 루프의 seq 변수에 접근할 수 없어 "어떤 메시지(seq)가 성공했는지" 로그로 남기기 어렵다. 이 문제를 해결하기 위해, Callback 인터페이스를 구현하는 별도의 클래스를 만든다.

CustomCallback 클래스는 생성자를 통해 메시지의 seq 번호를 멤버 변수로 저장한다. 덕분에 콜백의 onCompletion 메서드가 호출될 때, 저장해 둔 seq 번호와 브로커로부터 받은 메타데이터(파티션, 오프셋)를 함께 출력할 수 있다.

package com.example.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CustomCallback implements Callback {

    public static final Logger logger = LoggerFactory.getLogger(CustomCallback.class.getName());

    private int seq;

    public CustomCallback(int seq) {
        this.seq = seq;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {

        if (exception == null) {
            logger.info("seq:{} partition:{} offset:{}", this.seq, metadata.partition(), metadata.offset());
        } else {
            logger.error("exception error from broker" + exception.getMessage());
        }

    };
}

[4] Custom Callback을 적용한 최종 Producer 코드

이제 send() 메서드에 람다 대신 new CustomCallback(seq) 인스턴스를 전달하도록 Producer 코드를 리팩토링한다.

package com.example.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;

public class ProducerAsyncCustomCallback {

    public static final Logger logger = LoggerFactory.getLogger(ProducerAsyncCustomCallback.class.getName());

    public static void main(String[] args) {
        // ... (Properties 설정은 동일) ...
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());


        KafkaProducer<Integer, String> kafkaProducer = new KafkaProducer<>(props);

        for (int seq = 0; seq < 20; seq++) {
            ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>("multipart-topic", seq, "hello world" + seq);
            Callback callback = new CustomCallback(seq);

            // 람다 대신 CustomCallback 인스턴스를 전달
            kafkaProducer.send(producerRecord, callback);
        }

        // ... (Thread.sleep 및 close는 동일) ...
         try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        kafkaProducer.close();
    }
}

[5] 로그 결과 확인

코드를 실행하면, CustomCallback 덕분에 어떤 seq(Key)가 어느 파티션과 오프셋에 저장되었는지 명확하게 추적할 수 있다. 즉, 로그를 통해 Key 2번 메시지가 파티션 2번에, Key 1번 메시지가 파티션 0번에 들어가는 등, Key에 따라 파티션이 결정되는 과정을 상세히 확인할 수 있다.

[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:2 partition:2 offset:13
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:5 partition:2 offset:14
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:6 partition:2 offset:15
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:12 partition:2 offset:16
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:16 partition:2 offset:17
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:18 partition:2 offset:18
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:19 partition:2 offset:19
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:1 partition:0 offset:13
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:7 partition:0 offset:14
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:8 partition:0 offset:15
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:14 partition:0 offset:16
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:15 partition:0 offset:17
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:17 partition:0 offset:18
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:0 partition:1 offset:15
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:3 partition:1 offset:16
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:4 partition:1 offset:17
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:9 partition:1 offset:18
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:10 partition:1 offset:19
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:11 partition:1 offset:20
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:13 partition:1 offset:21

6. 피자 주문 시뮬레이션 Producer 구현

지금까지는 단순한 문자열 메시지를 보내는 예제를 다뤘다면, 이번에는 좀 더 실용적인 시나리오를 시뮬레이션해본다. 가상의 피자 주문 데이터를 실시간으로 생성하여 Kafka 토픽으로 전송하는 Producer를 구현한다.

이 과정은 두 개의 핵심 클래스로 나뉜다.

  1. PizzaMessage.java: Faker 라이브러리를 사용해 현실적인 피자 주문 데이터를 생성하는 역할.
  2. PizzaProducer.java: PizzaMessage를 통해 생성된 주문 데이터를 Kafka 브로커로 전송하는 역할.

[1] PizzaMessage - 가상 주문 데이터 생성하기

먼저, 다양한 피자 가게, 메뉴, 고객 정보를 조합하여 가상의 주문 메시지를 만드는 PizzaMessage 클래스를 작성한다.

  • 주요 로직:
    • 미리 정의된 피자 이름(pizzaNames)과 가게 ID(pizzaShop) 리스트를 가진다.
    • Faker 라이브러리를 사용하여 고객 이름, 전화번호, 주소 등 현실적인 가상 데이터를 생성한다.
    • produce_msg() 메서드는 Faker로 생성된 데이터와 임의의 가게 ID, 피자 이름을 조합하여 key와 message를 포함하는 HashMap을 반환한다. 이때 메시지의 Key는 가게 ID(shopId)로 설정된다.
package com.example.kafka;

import com.github.javafaker.Faker;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Random;

public class PizzaMessage {
    private static final List<String> pizzaNames = List.of("Potato Pizza", "Cheese Pizza", "Super Supreme", "Peperoni");
    private static final List<String> pizzaShop = List.of("A001", "B001", "C001", "D001", "E001");

    private String getRandomValueFromList(List<String> list, Random random) {
        int size = list.size();
        int index = random.nextInt(size);
        return list.get(index);
    }

    public HashMap<String, String> produce_msg(Faker faker, Random random, int id) {
        String shopId = getRandomValueFromList(pizzaShop, random);
        String pizzaName = getRandomValueFromList(pizzaNames, random);

        String ordId = "ord" + id;
        String customerName = faker.name().fullName();
        String address = faker.address().streetAddress();
        LocalDateTime now = LocalDateTime.now();
        String message = String.format("order_id:%s, shop:%s, pizza_name:%s, customer_name:%s, ...",
                ordId, shopId, pizzaName, customerName);

        HashMap<String, String> messageMap = new HashMap<>();
        messageMap.put("key", shopId);
        messageMap.put("message", message);
        return messageMap;
    }
}

[2] PizzaProducer - 주문 메시지를 Kafka로 전송하기

PizzaProducer는 PizzaMessage를 이용해 생성된 주문 데이터를 Kafka로 전송하는 메인 클래스이다.

  • 주요 로직:
    • sendPizzaMessage(): 지정된 횟수만큼 루프를 돌며 PizzaMessage.produce_msg()를 호출하여 메시지를 생성하고 sendMessage()를 통해 전송한다. 중간에 Thread.sleep()을 두어 실시간으로 데이터가 들어오는 것처럼 시뮬레이션할 수 있다.
    • sendMessage(): sync 파라미터 값에 따라 동기(Sync) 또는 비동기(Async) 전송을 선택할 수 있도록 분기 처리되어 있다.
    • main(): Producer의 각종 설정을 정의하고 sendPizzaMessage()를 호출하여 시뮬레이션을 시작한다.
package com.example.kafka;

import com.github.javafaker.Faker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Properties;
import java.util.Random;

public class PizzaProducer {

    public static final Logger logger = LoggerFactory.getLogger(PizzaProducer.class.getName());

    public static void sendPizzaMessage(KafkaProducer<String, String> kafkaProducer, String topicName, int iterCount,
                                        int interIntervalMillis, int intervalMillis, int intervalCount, Boolean sync) {
        PizzaMessage pizzaMessage = new PizzaMessage();
        int iterSeq = 0;
        long seed = 2022;
        Random random = new Random(seed);
        Faker faker = Faker.instance(random);

        while (iterSeq != iterCount) {
            HashMap<String, String> pMessage = pizzaMessage.produce_msg(faker, random, iterSeq);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, pMessage.get("key"), pMessage.get("message"));
            sendMessage(kafkaProducer, producerRecord, pMessage, sync);
            iterSeq++;
            // ... (interval 로직) ...
        }
    }

    public static void sendMessage(KafkaProducer<String, String> kafkaProducer, ProducerRecord<String, String> producerRecord,
                                   HashMap<String, String> pMessage, boolean sync) {
        if (!sync) { // ASYNC
            kafkaProducer.send(producerRecord, (metadata, exception) -> {
                // ... (callback 로직) ...
            });
        } else { // SYNC
            try {
                RecordMetadata metadata = kafkaProducer.send(producerRecord).get();
                // ... (sync 로직) ...
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) {
        String topicName = "pizza-topic";
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        // -1: 무한 반복, 10ms 간격, 100건마다 100ms 휴식, 동기 방식
        sendPizzaMessage(kafkaProducer, topicName, -1, 10, 100, 100, true);

        kafkaProducer.close();
    }
}

[3] Consumer Group을 이용한 메시지 소비 확인

이제 이 시뮬레이션 데이터를 소비(Consume)하는 방법을 알아본다. Kafka의 강력한 기능 중 하나인 컨슈머 그룹(Consumer Group)을 활용하여 여러 컨슈머가 작업을 분산 처리하도록 구성할 수 있다.

  • 실행 방법:
    1. 새로운 터미널 창을 3개 연다.
    2. 각 터미널에서 아래의 동일한 명령어를 실행한다.
kafka-console-consumer --bootstrap-server localhost:9092 --group group_01 --topic pizza-topic \
--property print.key=true --property print.value=true \
--property print.partition=true

 

  • --group group_01: 모든 컨슈머가 group_01이라는 동일한 그룹에 속하도록 지정한다.
  • --property print.partition=true: 메시지가 어느 파티션에서 왔는지 함께 출력한다.

이렇게 여러 컨슈머를 동일한 그룹으로 묶어 실행하면, Kafka는 토픽의 파티션들을 각 컨슈머에게 자동으로 분배(Rebalance)한다. 결과적으로 하나의 컨슈머가 모든 메시지를 처리하는 대신, 여러 컨슈머가 작업을 나누어 처리하므로 전체 처리량을 높일 수 있다. 각 터미널에 서로 다른 파티션의 로그가 출력되는 것을 확인할 수 있다.


7. Producer의 acks 설정 관련 실습

package com.example.kafka;

import com.github.javafaker.Faker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;


/**
 * If you wanna make KafkaProducer, ProducerRecord to Integer,
 * you have to declare serializer.class with IntegerSerializer.class
 */
// KafkaProducer Configuration Settings
public class PizzaProducer {

    public static final Logger logger = LoggerFactory.getLogger(PizzaProducer.class.getName());


    public static void sendPizzaMessage(KafkaProducer<String, String> kafkaProducer, String topicName, int iterCount,
                                        int interIntervalMillis, int intervalMillis, int intervalCount, Boolean sync) {

        PizzaMessage pizzaMessage = new PizzaMessage();
        int iterSeq = 0;

        // seed값을 고정하여 Random 객체와 Faker 객체를 생성.
        long seed = 2022;
        Random random = new Random(seed);
        Faker faker = Faker.instance(random);

        while (iterSeq != iterCount) {
            HashMap<String, String> pMessage = pizzaMessage.produce_msg(faker, random, iterSeq);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, pMessage.get("key"), pMessage.get("message"));

            sendMessage(kafkaProducer, producerRecord, pMessage, sync);

            if ((intervalCount > 0) && (iterSeq % intervalCount == 0)) {
                try {
                    logger.info("###### intervalCount:" + intervalCount + " intervalMillis" + intervalMillis + "######");
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage());
                }
            }

            if (interIntervalMillis > 0) {
                try {
                    logger.info("interIntervalMillis: " + interIntervalMillis);
                    Thread.sleep(interIntervalMillis);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage());
                }
            }
        }

    }

    // 동기, 비동기 분기처리
    public static void sendMessage(KafkaProducer<String, String> kafkaProducer, ProducerRecord<String, String> producerRecord,
                                   HashMap<String, String> pMessage, boolean sync) {

        if (!sync) { // ASYNC (비동기)
            kafkaProducer.send(producerRecord, (metadata, exception) -> {
                if (exception == null) {
                    logger.info("async message: " + pMessage.get("key") + "partition: " + metadata.partition() + "\n" +
                            "offset: " + metadata.offset());
                } else {
                    logger.error("exception error from broker" + exception.getMessage());
                }
            });
        } else { // SYNC (동기)
            try {
                RecordMetadata metadata = kafkaProducer.send(producerRecord).get();
                logger.info("async message: " + pMessage.get("key") + "partition: " + metadata.partition() + "\n" +
                        "offset: " + metadata.offset());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

    }


    public static void main(String[] args) {

        String topicName = "pizza-topic";

        Properties props = new Properties();

        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); // [1] bootstrap.servers | WHY server"s"? -> Cause we can use multi brokers(servers)
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [2] key.serializer.class
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [3] value.serializer.class
        props.setProperty(ProducerConfig.ACKS_CONFIG, "0");

        // Create KafkaProducer Object
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        sendPizzaMessage(kafkaProducer, topicName, -1, 10,
                100, 100, false);

        kafkaProducer.close();

    }

}

8. Producer의 메시지 배치 전송 내부 실습 (파라미터 수정)

[1] batch size와 linger.ms 변경

package com.example.kafka;

import com.github.javafaker.Faker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;


/**
 * If you wanna make KafkaProducer, ProducerRecord to Integer,
 * you have to declare serializer.class with IntegerSerializer.class
 */
// KafkaProducer Configuration Settings
public class PizzaProducer {

    public static final Logger logger = LoggerFactory.getLogger(PizzaProducer.class.getName());


    public static void sendPizzaMessage(KafkaProducer<String, String> kafkaProducer, String topicName, int iterCount,
                                        int interIntervalMillis, int intervalMillis, int intervalCount, Boolean sync) {

        PizzaMessage pizzaMessage = new PizzaMessage();
        int iterSeq = 0;

        // seed값을 고정하여 Random 객체와 Faker 객체를 생성.
        long seed = 2022;
        Random random = new Random(seed);
        Faker faker = Faker.instance(random);

        while (iterSeq != iterCount) {
            HashMap<String, String> pMessage = pizzaMessage.produce_msg(faker, random, iterSeq);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, pMessage.get("key"), pMessage.get("message"));

            sendMessage(kafkaProducer, producerRecord, pMessage, sync);

            if ((intervalCount > 0) && (iterSeq % intervalCount == 0)) {
                try {
                    logger.info("###### intervalCount:" + intervalCount + " intervalMillis" + intervalMillis + "######");
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage());
                }
            }

            if (interIntervalMillis > 0) {
                try {
                    logger.info("interIntervalMillis: " + interIntervalMillis);
                    Thread.sleep(interIntervalMillis);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage());
                }
            }
        }

    }

    // 동기, 비동기 분기처리
    public static void sendMessage(KafkaProducer<String, String> kafkaProducer, ProducerRecord<String, String> producerRecord,
                                   HashMap<String, String> pMessage, boolean sync) {

        if (!sync) { // ASYNC (비동기)
            kafkaProducer.send(producerRecord, (metadata, exception) -> {
                if (exception == null) {
                    logger.info("async message: " + pMessage.get("key") + "partition: " + metadata.partition() + "\n" +
                            "offset: " + metadata.offset());
                } else {
                    logger.error("exception error from broker" + exception.getMessage());
                }
            });
        } else { // SYNC (동기)
            try {
                RecordMetadata metadata = kafkaProducer.send(producerRecord).get();
                logger.info("async message: " + pMessage.get("key") + "partition: " + metadata.partition() + "\n" +
                        "offset: " + metadata.offset());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

    }


    public static void main(String[] args) {

        String topicName = "pizza-topic";

        Properties props = new Properties();

        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); // [1] bootstrap.servers | WHY server"s"? -> Cause we can use multi brokers(servers)
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [2] key.serializer.class
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [3] value.serializer.class
//        props.setProperty(ProducerConfig.ACKS_CONFIG, "0"); // [4] props 설정
        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "32000"); // [5] Batch Size 설정
        props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20"); // [6] linger.ms 설정

        // Create KafkaProducer Object
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        sendPizzaMessage(kafkaProducer, topicName, -1, 10,
                100, 100, true);

        kafkaProducer.close();

    }

}

[2] 결과 확인

[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
	
    ...
    
	batch.size = 32000
	
    ...
    
	linger.ms = 20
    
    ...

9. Producer의 재전송 관련 주요 파라미터 설정 실습

[1] PizzaProducer 기본 Config가 적용하여 실행

package com.example.kafka;

import com.github.javafaker.Faker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;


/**
 * If you wanna make KafkaProducer, ProducerRecord to Integer,
 * you have to declare serializer.class with IntegerSerializer.class
 */
// KafkaProducer Configuration Settings
public class PizzaProducer {

    public static final Logger logger = LoggerFactory.getLogger(PizzaProducer.class.getName());


    public static void sendPizzaMessage(KafkaProducer<String, String> kafkaProducer, String topicName, int iterCount,
                                        int interIntervalMillis, int intervalMillis, int intervalCount, Boolean sync) {

        PizzaMessage pizzaMessage = new PizzaMessage();
        int iterSeq = 0;

        // seed값을 고정하여 Random 객체와 Faker 객체를 생성.
        long seed = 2022;
        Random random = new Random(seed);
        Faker faker = Faker.instance(random);

        while (iterSeq != iterCount) {
            HashMap<String, String> pMessage = pizzaMessage.produce_msg(faker, random, iterSeq);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, pMessage.get("key"), pMessage.get("message"));

            sendMessage(kafkaProducer, producerRecord, pMessage, sync);

            if ((intervalCount > 0) && (iterSeq % intervalCount == 0)) {
                try {
                    logger.info("###### intervalCount:" + intervalCount + " intervalMillis" + intervalMillis + "######");
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage());
                }
            }

            if (interIntervalMillis > 0) {
                try {
                    logger.info("interIntervalMillis: " + interIntervalMillis);
                    Thread.sleep(interIntervalMillis);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage());
                }
            }
        }

    }

    // 동기, 비동기 분기처리
    public static void sendMessage(KafkaProducer<String, String> kafkaProducer, ProducerRecord<String, String> producerRecord,
                                   HashMap<String, String> pMessage, boolean sync) {

        if (!sync) { // ASYNC (비동기)
            kafkaProducer.send(producerRecord, (metadata, exception) -> {
                if (exception == null) {
                    logger.info("async message: " + pMessage.get("key") + "partition: " + metadata.partition() + "\n" +
                            "offset: " + metadata.offset());
                } else {
                    logger.error("exception error from broker" + exception.getMessage());
                }
            });
        } else { // SYNC (동기)
            try {
                RecordMetadata metadata = kafkaProducer.send(producerRecord).get();
                logger.info("async message: " + pMessage.get("key") + "partition: " + metadata.partition() + "\n" +
                        "offset: " + metadata.offset());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

    }


    public static void main(String[] args) {

        String topicName = "pizza-topic";

        Properties props = new Properties();

        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); // [1] bootstrap.servers | WHY server"s"? -> Cause we can use multi brokers(servers)
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [2] key.serializer.class
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [3] value.serializer.class
//        props.setProperty(ProducerConfig.ACKS_CONFIG, "0"); // [4] props 설정
//        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "32000"); // [5] Batch Size 설정
//        props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20"); // [6] linger.ms 설정

        // Create KafkaProducer Object
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        sendPizzaMessage(kafkaProducer, topicName, -1, 10,
                100, 100, false);

        kafkaProducer.close();

    }

}

[2] 로그 확인

[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
    ...
    delivery.timeout.ms = 120000
    ...
    max.block.ms = 60000
    ...
    request.timeout.ms = 30000
    ...
    retry.backoff.ms = 100
    ...
    retries = 2147483647
    ...

[3] DELIVRY_TIMEOUT_MS_CONFIG 변경

	...
    
	public static void main(String[] args) {

        String topicName = "pizza-topic";

        Properties props = new Properties();

        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); // [1] bootstrap.servers | WHY server"s"? -> Cause we can use multi brokers(servers)
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [2] key.serializer.class
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [3] value.serializer.class
//        props.setProperty(ProducerConfig.ACKS_CONFIG, "0"); // [4] props 설정
//        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "32000"); // [5] Batch Size 설정
//        props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20"); // [6] linger.ms 설정
        props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "29000");


        // Create KafkaProducer Object
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        sendPizzaMessage(kafkaProducer, topicName, -1, 10,
                100, 100, false);

        kafkaProducer.close();

    }

 

위처럼 delivery.timeout.ms를 30초가 아니라 29초로 변경하면 어떻게 될까? 아래 결과를 확인해보자. delivery.timeout.ms should be equal to or larger than linger.ms + request.timeout.ms 로그를 확인할 수 있는데, delivery.time.out.ms는 linger.ms와 request.timeout.ms를 합한 값 보다 크거나 같아야 한다는 조건 때문에 오류가 발생한것을 확인할 수 있다.


10. 커스텀 파티셔너 (Custom Partitioner) 실습

Java 코드 작성: Custom Partitioner, PizzaProducerCustomPartitioner

[1] CustomPartitioner.java

package com.example.kafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.internals.StickyPartitionCache;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;


// 피자 집 중에 특정 피자집은 대형 피자집이라서 따로 파티셔닝이 필요하다는 시나리오에 기반하여 작성
public class CustomPartitioner implements Partitioner {

    public static final Logger logger = LoggerFactory.getLogger(CustomPartitioner.class.getName());

    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    private String specialKeyName;

    @Override
    public void configure(Map<String, ?> configs) {
        specialKeyName = configs.get("custom.specialKey").toString();
    }

    /**
     * byte[] keyBytes, byte[] valueBytes
     * ㄴ Serialized된 byte code
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic); // Cluster cluset: Broker들의 정보 -> partition들의 정보 추출
        int numPartitions = partitionInfoList.size();   // partition 개수 추출 (5개)
        int numSpecialPartitions = (int) (numPartitions * (0.5)); // special topic에 대해서는 5개 중에 2개의 partition 할당 (2개)
        int partitionIndex = 0;


        if (keyBytes == null) {
//            return stickyPartitionCache.partition(topic, cluster);
            throw new InvalidRecordException("key shoud not be null");
        }

        if (((String) key).equals(specialKeyName)) {
            // (keyBytes -> valueBytes)
            partitionIndex = Utils.toPositive(Utils.murmur2(valueBytes)) % numSpecialPartitions; // 0,1 설정
        } else {
            // specialKey에 대한 파티셔닝이 0,1로 설정되었기 때문에 specialKey가 아닌 Key에 대해서는 2,3,4로 설정해줘야 함
            partitionIndex = Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - numSpecialPartitions) + 2;
        }

        logger.info("key:{} is sent to partition: {}", key.toString(), partitionIndex);

        return partitionIndex;
    }

    @Override
    public void close() {
    }

}

[2]  PizzaProducerCustomPartitioner.java

package com.example.kafka;

import com.github.javafaker.Faker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;


/**
 * If you wanna make KafkaProducer, ProducerRecord to Integer,
 * you have to declare serializer.class with IntegerSerializer.class
 */
// KafkaProducer Configuration Settings
public class PizzaProducerCustomPartitioner {

    public static final Logger logger = LoggerFactory.getLogger(PizzaProducerCustomPartitioner.class.getName());


    public static void sendPizzaMessage(KafkaProducer<String, String> kafkaProducer, String topicName, int iterCount,
                                        int interIntervalMillis, int intervalMillis, int intervalCount, Boolean sync) {

        PizzaMessage pizzaMessage = new PizzaMessage();
        int iterSeq = 0;

        // seed값을 고정하여 Random 객체와 Faker 객체를 생성.
        long seed = 2022;
        Random random = new Random(seed);
        Faker faker = Faker.instance(random);

        while (iterSeq != iterCount) {
            HashMap<String, String> pMessage = pizzaMessage.produce_msg(faker, random, iterSeq);
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, pMessage.get("key"), pMessage.get("message"));

            sendMessage(kafkaProducer, producerRecord, pMessage, sync);

            if ((intervalCount > 0) && (iterSeq % intervalCount == 0)) {
                try {
                    logger.info("###### intervalCount:" + intervalCount + " intervalMillis" + intervalMillis + "######");
                    Thread.sleep(intervalMillis);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage());
                }
            }

            if (interIntervalMillis > 0) {
                try {
                    logger.info("interIntervalMillis: " + interIntervalMillis);
                    Thread.sleep(interIntervalMillis);
                } catch (InterruptedException e) {
                    logger.error(e.getMessage());
                }
            }
        }

    }

    // 동기, 비동기 분기처리
    public static void sendMessage(KafkaProducer<String, String> kafkaProducer, ProducerRecord<String, String> producerRecord,
                                   HashMap<String, String> pMessage, boolean sync) {

        if (!sync) { // ASYNC (비동기)
            kafkaProducer.send(producerRecord, (metadata, exception) -> {
                if (exception == null) {
                    logger.info("async message: " + pMessage.get("key") + "partition: " + metadata.partition() + "\n" +
                            "offset: " + metadata.offset());
                } else {
                    logger.error("exception error from broker" + exception.getMessage());
                }
            });
        } else { // SYNC (동기)
            try {
                RecordMetadata metadata = kafkaProducer.send(producerRecord).get();
                logger.info("async message: " + pMessage.get("key") + "partition: " + metadata.partition() + "\n" +
                        "offset: " + metadata.offset());
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

    }


    public static void main(String[] args) {

        String topicName = "pizza-topic-partitioner";

        Properties props = new Properties();

        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); // [1] bootstrap.servers | WHY server"s"? -> Cause we can use multi brokers(servers)
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [2] key.serializer.class
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [3] value.serializer.class

        // custom partitioner 등록
        props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.kafka.CustomPartitioner");
        // custom partitioner을 실행시키기 위한 특별 키 등록
        props.setProperty("custom.specialKey", "P001");


        // Create KafkaProducer Object
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        sendPizzaMessage(kafkaProducer, topicName, -1, 100,
                0, 0, true);

        kafkaProducer.close();

    }

}

Kafka Client Custom Partitioner 적용

1. 5개의 파티션을 가지는 pizza-topic-partitioner 토픽 생성. 

kafka-topics --bootstrap-server localhost:9092 --create --topic pizza-topic-partitioner --partitions 5

2. kafka-dump-log 명령어로 파티션별로 메시지 확인하기

kafka-dump-log --deep-iteration --files /home/ubuntu/data/kafka-logs/pizza-topic-partitioner-0/00000000000000000000.log --print-data-log
kafka-dump-log --deep-iteration --files /home/ubuntu/data/kafka-logs/pizza-topic-partitioner-1/00000000000000000000.log --print-data-log
# 1번 Partition 조회해보면 아래처럼 P001(특정 키)만 들어오는것을 알 수 있다.
:403 Carma Vista, time:2025-09-09 12:24:24
baseOffset: 62 lastOffset: 62 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 14798 CreateTime: 1757388272098 size: 241 magic: 2 compresscodec: none crc: 2587438241 isvalid: true
| offset: 62 CreateTime: 1757388272098 keySize: 4 valueSize: 167 sequence: -1 headerKeys: [] key: P001 payload: order_id:ord0, shop:P001, pizza_name:Cheese Pizza, customer_name:Russ Huel PhD, phone_number:120-673-8367 x73988, address:484 Krista Mountain, time:2025-09-09 12:24:32
baseOffset: 63 lastOffset: 63 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 15039 CreateTime: 1757388278807 size: 235 magic: 2 compresscodec: none crc: 2748402471 isvalid: true
| offset: 63 CreateTime: 1757388278807 keySize: 4 valueSize: 161 sequence: -1 headerKeys: [] key: P001 payload: order_id:ord0, shop:P001, pizza_name:Peperoni, customer_name:Saul Rippin, phone_number:1-339-197-5397 x55301, address:122 Numbers Lodge, time:2025-09-09 12:24:38
baseOffset: 64 lastOffset: 64 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 15274 CreateTime: 1757388286258 size: 239 magic: 2 compresscodec: none crc: 630731090 isvalid: true
| offset: 64 CreateTime: 1757388286258 keySize: 4 valueSize: 165 sequence: -1 headerKeys: [] key: P001 payload: order_id:ord0, shop:P001, pizza_name:Cheese Pizza, customer_name:Hans Sipes, phone_number:436-217-8007 x5900, address:626 Clarence Turnpike, time:2025-09-09 12:24:46
# 아래처럼 0번과 1번 파티션을 제외하고 
# 2,3,4 파티션 중에 P001을 갖는지 "grep" 명령어로 찾아보면 아무것도 뜨지 않는다.
ubuntu@ubuntu-VirtualBox:~$ kafka-dump-log --deep-iteration --files /home/ubuntu/data/kafka-logs/pizza-topic-partitioner-2/00000000000000000000.log --print-data-log | grep P001
ubuntu@ubuntu-VirtualBox:~$

3. Consumer를 partition 별로 접속하여 확인 (--group 인자 설정 X)

kafka-console-consumer --bootstrap-server localhost:9092 --topic pizza-topic-partitioner \
--property print.key=true --property print.value=true --partition 0

'Kafka > Core' 카테고리의 다른 글

[ADVANCED #4][실습] Consumer 완전 정복: 기본 컨슈머부터 안전한 종료, 커밋 전략까지  (0) 2025.09.09
[ADVANCED #3] Consumer 심층 분석: 그룹 코디네이터부터 리밸런싱 전략까지  (0) 2025.09.09
[ADVANCED #1] Producer 심층 분석: 내부 동작 원리와 고급 설정  (0) 2025.09.07
[BASIC #8] Kafka: Java 클라이언트 구현 환경 구축  (0) 2025.09.07
[BASIC #7] Config 구분 및 이해: 카프카 설정의 계층 구조 파악하기  (0) 2025.09.07
'Kafka/Core' 카테고리의 다른 글
  • [ADVANCED #4][실습] Consumer 완전 정복: 기본 컨슈머부터 안전한 종료, 커밋 전략까지
  • [ADVANCED #3] Consumer 심층 분석: 그룹 코디네이터부터 리밸런싱 전략까지
  • [ADVANCED #1] Producer 심층 분석: 내부 동작 원리와 고급 설정
  • [BASIC #8] Kafka: Java 클라이언트 구현 환경 구축
h6bro
h6bro
백엔드 개발자의 기술 블로그
  • h6bro
    Jun's Tech Blog
    h6bro
  • 전체
    오늘
    어제
    • 분류 전체보기 (250) N
      • Java (18)
        • Core (9)
        • Design Pattern (9)
      • Spring (80)
        • Core (24)
        • MVC (6)
        • DB (10)
        • JPA (26)
        • Monitoring (3)
        • Security (11)
        • WebSocket (0)
      • Database (33)
        • Redis (15)
        • MySQL (18)
      • MSA (25) N
        • MSA 기본 (11)
        • MSA 아키텍처 (14) N
      • Kafka (30)
        • Core (18)
        • Connect (12)
      • ElasticSearch (11)
        • Search (11)
        • Logging (0)
      • Test (4)
        • k6 (4)
      • Docker (9)
      • CI&CD (10)
        • GitHub Actions (6)
        • ArgoCD (4)
      • Kubernetes (18)
        • Core (12)
        • Ops (6)
      • Cloud Engineering (4)
        • AWS Infrastructure (3)
        • AWS EKS (1)
        • Terraform (0)
      • Project (8)
        • LinkFolio (1)
        • Secondhand Market (7)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

    • Cloud Engineering 포스팅 정리
  • 인기 글

  • 태그

    ㅈ
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.5
h6bro
[ADVANCED #2][실습] Producer 완전 정복: 기초부터 고급 설정
상단으로

티스토리툴바