kafka-topics --bootstrap-server localhost:9092 --create --topic pizza-topic-partitioner --partitions 5
1. 기본 구현
[1] Kafka Topic 생성
kafka-topics --bootstrap-server localhost:9092 --create --topic simple-topic
[2] Java 코드 작성 및 실행
com/example/kafka/SimpleProducer.java
package com.example.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* If you wanna make KafkaProducer, ProducerRecord to Integer,
* you have to declare serializer.class with IntegerSerializer.class
*/
// KafkaProducer Configuration Settings
public class SimpleProducer {
public static void main(String[] args) {
String topicName = "simple-topic";
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); // [1] bootstrap.servers | WHY server"s"? -> Cause we can use multi brokers(servers)
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [2] key.serializer.class
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [3] value.serializer.class
// Create KafkaProducer Object
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
// Create ProducerRecord Object
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, "id-001", "hello world 2");
// Send KafkaProducer Message
kafkaProducer.send(producerRecord);
// Terminate KafkaProducer
kafkaProducer.flush();
kafkaProducer.close();
}
}

[3] Kafka Topic 확인
kafka-console-consumer --bootstrap-server localhost:9092 --topic simple-topic --from-beginning

2. Producer와 브로커와의 메시지 동기화(SYNC)로 리팩터링
[1] Java 코드 작성 및 실행
package com.example.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* If you wanna make KafkaProducer, ProducerRecord to Integer,
* you have to declare serializer.class with IntegerSerializer.class
*/
// KafkaProducer Configuration Settings
public class SimpleProducerSync {
public static final Logger logger = LoggerFactory.getLogger(SimpleProducerSync.class.getName());
public static void main(String[] args) {
String topicName = "simple-topic";
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); // [1] bootstrap.servers | WHY server"s"? -> Cause we can use multi brokers(servers)
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [2] key.serializer.class
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [3] value.serializer.class
// Create KafkaProducer Object
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
// Create ProducerRecord Object
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, "id-001", "hello world 2");
// Send KafkaProducer Message
try {
RecordMetadata recordMetadata = kafkaProducer.send(producerRecord).get();
logger.info("\n ###### record metadata received ##### \n" +
"partition: " + recordMetadata.partition() + "\n" +
"offset: " + recordMetadata.offset() + "\n" +
"timestamp: " + recordMetadata.timestamp());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
} finally {
kafkaProducer.close();
}
// Terminate KafkaProducer
kafkaProducer.flush();
kafkaProducer.close();
}
}
3. Producer와 브로커와의 메시지 비동기화(ASYNC)로 리팩터링
[1] Java 코드 작성 및 실행
package com.example.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
* If you wanna make KafkaProducer, ProducerRecord to Integer,
* you have to declare serializer.class with IntegerSerializer.class
*/
// KafkaProducer Configuration Settings
public class SimpleProducerAsync {
public static final Logger logger = LoggerFactory.getLogger(SimpleProducerAsync.class.getName());
public static void main(String[] args) {
String topicName = "simple-topic";
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); // [1] bootstrap.servers | WHY server"s"? -> Cause we can use multi brokers(servers)
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [2] key.serializer.class
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [3] value.serializer.class
// Create KafkaProducer Object
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
// Create ProducerRecord Object
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, "id-001", "hello world 2");
// [1] Send KafkaProducer Message (Java Basic)
// kafkaProducer.send(producerRecord, new Callback() {
// @Override
// public void onCompletion(RecordMetadata metadata, Exception exception) {
// if (exception == null) {
// logger.info("\n ###### record metadata received ##### \n" +
// "partition: " + metadata.partition() + "\n" +
// "offset: " + metadata.offset() + "\n" +
// "timestamp: " + metadata.timestamp());
// } else {
// logger.error("exception error from broker" + exception.getMessage());
// }
// }
// });
// [2] Send KafkaProducer Message (Java Lambda)
kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception == null) {
logger.info("\n ###### record metadata received ##### \n" +
"partition: " + metadata.partition() + "\n" +
"offset: " + metadata.offset() + "\n" +
"timestamp: " + metadata.timestamp());
} else {
logger.error("exception error from broker" + exception.getMessage());
}
});
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
kafkaProducer.close();
}
}
4. Producer에서 다중 파티션(Partition) 환경에서의 키(Key)값을 가지는 메시지 전송
지금까지는 메시지 전송의 '결과'에 집중했다면, 이제는 메시지가 토픽 내에서 어디로 저장되는지, 즉 파티션(Partition)에 대해 알아본다. Kafka에서 메시지 Key는 단순히 데이터를 구분하는 식별자를 넘어, 메시지가 어떤 파티션에 저장될지 결정하는 매우 중요한 역할을 한다.
- Key가 없는 경우: 메시지는 라운드 로빈(Round-Robin) 방식으로 여러 파티션에 순차적으로 분배된다.
- Key가 있는 경우: Kafka는 Key의 해시(hash) 값을 계산하여 특정 파티션에 메시지를 할당한다. 동일한 Key를 가진 메시지는 항상 동일한 파티션에 저장되는 것이 보장된다.
이를 통해 특정 Key에 대한 메시지 처리 순서를 보장할 수 있다.
[1] 다중 파티션 토픽 생성
먼저, 실습을 위해 3개의 파티션을 가진 토픽을 생성한다.
kafka-topics --bootstrap-server localhost:9092 --create --topic multipart-topic --partitions 3
[2] Key를 사용한 메시지 전송 코드 작성 및 실행
이제 0부터 19까지 순차적으로 증가하는 Key를 가진 메시지 20개를 비동기 방식으로 전송하는 코드를 작성한다.
package com.example.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
// KafkaProducer Configuration Settings
public class ProducerAsyncWithKey {
public static final Logger logger = LoggerFactory.getLogger(ProducerAsyncWithKey.class.getName());
public static void main(String[] args) {
String topicName = "multipart-topic";
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Create KafkaProducer Object
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
for (int seq = 0; seq < 20; seq++) {
// Create ProducerRecord Object with a key
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, String.valueOf(seq), "hello world " + seq);
// Send KafkaProducer Message (Java Lambda)
kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception == null) {
logger.info("\n ###### record metadata received ##### \n" +
"partition: " + metadata.partition() + "\n" +
"offset: " + metadata.offset() + "\n" +
"timestamp: " + metadata.timestamp());
} else {
logger.error("exception error from broker" + exception.getMessage());
}
});
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
kafkaProducer.close();
}
}
[3] 실행 로그 분석
코드를 실행하면 Producer의 설정값과 함께 각 메시지가 어느 파티션으로 전송되었는지 콜백 로그를 통해 확인할 수 있다.
Producer 설정 로그:
partitioner.class가 DefaultPartitioner로 설정되어 있음을 주목해야 한다. 이 기본 파티셔너가 바로 메시지 Key의 해시 값을 계산하여 파티션을 결정하는 역할을 한다. 또한 batch.size와 linger.ms 설정에 따라 프로듀서는 메시지를 모아 배치(batch) 단위로 전송한다.
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
batch.size = 16384
linger.ms = 0
메시지 전송 결과 (Callback 로그):
로그를 자세히 보면, Key 값(0~19)에 따라 메시지들이 파티션 0, 1, 2에 골고루 분산되어 들어간 것을 확인할 수 있다. 이는 DefaultPartitioner가 각 Key의 해시 값을 계산하고, 그 결과를 총 파티션 개수(3)로 나눈 나머지를 통해 목적지 파티션을 정하기 때문이다.
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.ProducerAsyncWithKey -
###### record metadata received #####
partition: 2
offset: 1
timestamp: 1757298130194
...
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.ProducerAsyncWithKey -
###### record metadata received #####
partition: 0
offset: 0
timestamp: 1757298130216
...
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.ProducerAsyncWithKey -
###### record metadata received #####
partition: 1
offset: 0
timestamp: 1757298130217
...
[4] Consumer를 통한 최종 확인
이제 컨슈머를 통해 토픽의 메시지를 확인해본다. Key와 Value를 함께 출력하기 위해 --property print.key=true 옵션을 추가한다.
kafka-console-consumer --bootstrap-server localhost:9092 --group group-01 --topic multipart-topic \
--property print.key=true --property print.value=true
[실행 결과]
0 hello world0
2 hello world2
3 hello world3
9 hello world9
...
1 hello world1
5 hello world5
...
가장 중요한 점은 컨슈머가 출력한 메시지의 Key 순서가 0, 1, 2, ... 19가 아니라는 것이다. 컨슈머는 3개의 파티션으로부터 동시에 메시지를 가져오기 때문에, 전체 메시지의 순서는 보장되지 않는다. 이 결과는 메시지 Key에 따라 데이터가 실제로 여러 파티션에 분산 저장되었음을 명확하게 보여준다.
5. Producer에서 키(Key) 타입 변경 및 Custom Callback 구현
이전 단계에서는 메시지 Key를 사용해 데이터를 여러 파티션에 분산했지만, 컨슈머에서 확인했을 때 전체 메시지의 순서가 보장되지 않는다는 점을 확인했다. 또한, 람다(Lambda) 콜백의 로그만으로는 어떤 Key를 가진 메시지가 어느 파티션으로 들어갔는지 정확히 추적하기 어려웠다. 이번 단계에서는 두 가지 개선 작업을 진행한다.
- 메시지 Key의 타입을 String에서 Integer로 변경한다.
- 재사용 가능하고 더 상세한 로그를 남길 수 있는 별도의 CustomCallback 클래스를 구현한다
[1] 메시지 Key 타입을 Integer로 변경하기
메시지 Key 타입을 String에서 Integer로 변경하기 위해서는 프로듀서의 코드 몇 군데를 수정해야 한다.
- Key Serializer 변경: ProducerConfig에서 Key 직렬화 방식을 StringSerializer가 아닌 IntegerSerializer로 지정해야 한다.
- 제네릭 타입 변경: KafkaProducer와 ProducerRecord 객체를 생성할 때 사용한 제네릭 타입을 <String, String>에서 <Integer, String>으로 변경해야 한다.
// ProducerAsyncWithKey.java를 기반으로 수정
// ...
// [1] Key Serializer를 IntegerSerializer로 변경
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// [2] KafkaProducer의 제네릭 타입을 <Integer, String>으로 변경
KafkaProducer<Integer, String> kafkaProducer = new KafkaProducer<>(props);
for (int seq = 0; seq < 20; seq++) {
// [3] ProducerRecord의 제네릭 타입과 Key 타입을 Integer로 변경
ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>(topicName, seq, "hello world" + seq);
// ...
}
// ...
[2] Consumer에서 Key Deserializer 설정하기
프로듀서가 Key를 Integer 타입으로 직렬화해서 보냈으므로, 컨슈머 또한 Key를 Integer 타입으로 역직렬화(Deserializer)해야 올바르게 읽을 수 있다. kafka-console-consumer는 기본적으로 Key와 Value를 모두 String으로 처리하기 때문에, 다음과 같이 --key-deserializer 옵션을 명시적으로 추가해야 한다.
kafka-console-consumer --bootstrap-server localhost:9092 --group group-01 --topic multipart-topic \
--property print.key=true --property print.value=true \
--key-deserializer "org.apache.kafka.common.serialization.IntegerDeserializer"
[3] Custom Callback 클래스 구현하기
람다 콜백은 간결하지만, 콜백이 실행되는 시점에는 for 루프의 seq 변수에 접근할 수 없어 "어떤 메시지(seq)가 성공했는지" 로그로 남기기 어렵다. 이 문제를 해결하기 위해, Callback 인터페이스를 구현하는 별도의 클래스를 만든다.
CustomCallback 클래스는 생성자를 통해 메시지의 seq 번호를 멤버 변수로 저장한다. 덕분에 콜백의 onCompletion 메서드가 호출될 때, 저장해 둔 seq 번호와 브로커로부터 받은 메타데이터(파티션, 오프셋)를 함께 출력할 수 있다.
package com.example.kafka;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CustomCallback implements Callback {
public static final Logger logger = LoggerFactory.getLogger(CustomCallback.class.getName());
private int seq;
public CustomCallback(int seq) {
this.seq = seq;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
logger.info("seq:{} partition:{} offset:{}", this.seq, metadata.partition(), metadata.offset());
} else {
logger.error("exception error from broker" + exception.getMessage());
}
};
}
[4] Custom Callback을 적용한 최종 Producer 코드
이제 send() 메서드에 람다 대신 new CustomCallback(seq) 인스턴스를 전달하도록 Producer 코드를 리팩토링한다.
package com.example.kafka;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class ProducerAsyncCustomCallback {
public static final Logger logger = LoggerFactory.getLogger(ProducerAsyncCustomCallback.class.getName());
public static void main(String[] args) {
// ... (Properties 설정은 동일) ...
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<Integer, String> kafkaProducer = new KafkaProducer<>(props);
for (int seq = 0; seq < 20; seq++) {
ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>("multipart-topic", seq, "hello world" + seq);
Callback callback = new CustomCallback(seq);
// 람다 대신 CustomCallback 인스턴스를 전달
kafkaProducer.send(producerRecord, callback);
}
// ... (Thread.sleep 및 close는 동일) ...
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
kafkaProducer.close();
}
}
[5] 로그 결과 확인
코드를 실행하면, CustomCallback 덕분에 어떤 seq(Key)가 어느 파티션과 오프셋에 저장되었는지 명확하게 추적할 수 있다. 즉, 로그를 통해 Key 2번 메시지가 파티션 2번에, Key 1번 메시지가 파티션 0번에 들어가는 등, Key에 따라 파티션이 결정되는 과정을 상세히 확인할 수 있다.
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:2 partition:2 offset:13
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:5 partition:2 offset:14
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:6 partition:2 offset:15
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:12 partition:2 offset:16
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:16 partition:2 offset:17
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:18 partition:2 offset:18
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:19 partition:2 offset:19
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:1 partition:0 offset:13
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:7 partition:0 offset:14
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:8 partition:0 offset:15
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:14 partition:0 offset:16
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:15 partition:0 offset:17
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:17 partition:0 offset:18
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:0 partition:1 offset:15
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:3 partition:1 offset:16
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:4 partition:1 offset:17
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:9 partition:1 offset:18
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:10 partition:1 offset:19
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:11 partition:1 offset:20
[kafka-producer-network-thread | producer-1] INFO com.example.kafka.CustomCallback - seq:13 partition:1 offset:21
6. 피자 주문 시뮬레이션 Producer 구현
지금까지는 단순한 문자열 메시지를 보내는 예제를 다뤘다면, 이번에는 좀 더 실용적인 시나리오를 시뮬레이션해본다. 가상의 피자 주문 데이터를 실시간으로 생성하여 Kafka 토픽으로 전송하는 Producer를 구현한다.
이 과정은 두 개의 핵심 클래스로 나뉜다.
- PizzaMessage.java: Faker 라이브러리를 사용해 현실적인 피자 주문 데이터를 생성하는 역할.
- PizzaProducer.java: PizzaMessage를 통해 생성된 주문 데이터를 Kafka 브로커로 전송하는 역할.
[1] PizzaMessage - 가상 주문 데이터 생성하기
먼저, 다양한 피자 가게, 메뉴, 고객 정보를 조합하여 가상의 주문 메시지를 만드는 PizzaMessage 클래스를 작성한다.
- 주요 로직:
- 미리 정의된 피자 이름(pizzaNames)과 가게 ID(pizzaShop) 리스트를 가진다.
- Faker 라이브러리를 사용하여 고객 이름, 전화번호, 주소 등 현실적인 가상 데이터를 생성한다.
- produce_msg() 메서드는 Faker로 생성된 데이터와 임의의 가게 ID, 피자 이름을 조합하여 key와 message를 포함하는 HashMap을 반환한다. 이때 메시지의 Key는 가게 ID(shopId)로 설정된다.
package com.example.kafka;
import com.github.javafaker.Faker;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
public class PizzaMessage {
private static final List<String> pizzaNames = List.of("Potato Pizza", "Cheese Pizza", "Super Supreme", "Peperoni");
private static final List<String> pizzaShop = List.of("A001", "B001", "C001", "D001", "E001");
private String getRandomValueFromList(List<String> list, Random random) {
int size = list.size();
int index = random.nextInt(size);
return list.get(index);
}
public HashMap<String, String> produce_msg(Faker faker, Random random, int id) {
String shopId = getRandomValueFromList(pizzaShop, random);
String pizzaName = getRandomValueFromList(pizzaNames, random);
String ordId = "ord" + id;
String customerName = faker.name().fullName();
String address = faker.address().streetAddress();
LocalDateTime now = LocalDateTime.now();
String message = String.format("order_id:%s, shop:%s, pizza_name:%s, customer_name:%s, ...",
ordId, shopId, pizzaName, customerName);
HashMap<String, String> messageMap = new HashMap<>();
messageMap.put("key", shopId);
messageMap.put("message", message);
return messageMap;
}
}
[2] PizzaProducer - 주문 메시지를 Kafka로 전송하기
PizzaProducer는 PizzaMessage를 이용해 생성된 주문 데이터를 Kafka로 전송하는 메인 클래스이다.
- 주요 로직:
- sendPizzaMessage(): 지정된 횟수만큼 루프를 돌며 PizzaMessage.produce_msg()를 호출하여 메시지를 생성하고 sendMessage()를 통해 전송한다. 중간에 Thread.sleep()을 두어 실시간으로 데이터가 들어오는 것처럼 시뮬레이션할 수 있다.
- sendMessage(): sync 파라미터 값에 따라 동기(Sync) 또는 비동기(Async) 전송을 선택할 수 있도록 분기 처리되어 있다.
- main(): Producer의 각종 설정을 정의하고 sendPizzaMessage()를 호출하여 시뮬레이션을 시작한다.
package com.example.kafka;
import com.github.javafaker.Faker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Properties;
import java.util.Random;
public class PizzaProducer {
public static final Logger logger = LoggerFactory.getLogger(PizzaProducer.class.getName());
public static void sendPizzaMessage(KafkaProducer<String, String> kafkaProducer, String topicName, int iterCount,
int interIntervalMillis, int intervalMillis, int intervalCount, Boolean sync) {
PizzaMessage pizzaMessage = new PizzaMessage();
int iterSeq = 0;
long seed = 2022;
Random random = new Random(seed);
Faker faker = Faker.instance(random);
while (iterSeq != iterCount) {
HashMap<String, String> pMessage = pizzaMessage.produce_msg(faker, random, iterSeq);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, pMessage.get("key"), pMessage.get("message"));
sendMessage(kafkaProducer, producerRecord, pMessage, sync);
iterSeq++;
// ... (interval 로직) ...
}
}
public static void sendMessage(KafkaProducer<String, String> kafkaProducer, ProducerRecord<String, String> producerRecord,
HashMap<String, String> pMessage, boolean sync) {
if (!sync) { // ASYNC
kafkaProducer.send(producerRecord, (metadata, exception) -> {
// ... (callback 로직) ...
});
} else { // SYNC
try {
RecordMetadata metadata = kafkaProducer.send(producerRecord).get();
// ... (sync 로직) ...
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
public static void main(String[] args) {
String topicName = "pizza-topic";
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
// -1: 무한 반복, 10ms 간격, 100건마다 100ms 휴식, 동기 방식
sendPizzaMessage(kafkaProducer, topicName, -1, 10, 100, 100, true);
kafkaProducer.close();
}
}
[3] Consumer Group을 이용한 메시지 소비 확인
이제 이 시뮬레이션 데이터를 소비(Consume)하는 방법을 알아본다. Kafka의 강력한 기능 중 하나인 컨슈머 그룹(Consumer Group)을 활용하여 여러 컨슈머가 작업을 분산 처리하도록 구성할 수 있다.
- 실행 방법:
- 새로운 터미널 창을 3개 연다.
- 각 터미널에서 아래의 동일한 명령어를 실행한다.
kafka-console-consumer --bootstrap-server localhost:9092 --group group_01 --topic pizza-topic \
--property print.key=true --property print.value=true \
--property print.partition=true
- --group group_01: 모든 컨슈머가 group_01이라는 동일한 그룹에 속하도록 지정한다.
- --property print.partition=true: 메시지가 어느 파티션에서 왔는지 함께 출력한다.
이렇게 여러 컨슈머를 동일한 그룹으로 묶어 실행하면, Kafka는 토픽의 파티션들을 각 컨슈머에게 자동으로 분배(Rebalance)한다. 결과적으로 하나의 컨슈머가 모든 메시지를 처리하는 대신, 여러 컨슈머가 작업을 나누어 처리하므로 전체 처리량을 높일 수 있다. 각 터미널에 서로 다른 파티션의 로그가 출력되는 것을 확인할 수 있다.

7. Producer의 acks 설정 관련 실습
package com.example.kafka;
import com.github.javafaker.Faker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
/**
* If you wanna make KafkaProducer, ProducerRecord to Integer,
* you have to declare serializer.class with IntegerSerializer.class
*/
// KafkaProducer Configuration Settings
public class PizzaProducer {
public static final Logger logger = LoggerFactory.getLogger(PizzaProducer.class.getName());
public static void sendPizzaMessage(KafkaProducer<String, String> kafkaProducer, String topicName, int iterCount,
int interIntervalMillis, int intervalMillis, int intervalCount, Boolean sync) {
PizzaMessage pizzaMessage = new PizzaMessage();
int iterSeq = 0;
// seed값을 고정하여 Random 객체와 Faker 객체를 생성.
long seed = 2022;
Random random = new Random(seed);
Faker faker = Faker.instance(random);
while (iterSeq != iterCount) {
HashMap<String, String> pMessage = pizzaMessage.produce_msg(faker, random, iterSeq);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, pMessage.get("key"), pMessage.get("message"));
sendMessage(kafkaProducer, producerRecord, pMessage, sync);
if ((intervalCount > 0) && (iterSeq % intervalCount == 0)) {
try {
logger.info("###### intervalCount:" + intervalCount + " intervalMillis" + intervalMillis + "######");
Thread.sleep(intervalMillis);
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
}
if (interIntervalMillis > 0) {
try {
logger.info("interIntervalMillis: " + interIntervalMillis);
Thread.sleep(interIntervalMillis);
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
}
}
}
// 동기, 비동기 분기처리
public static void sendMessage(KafkaProducer<String, String> kafkaProducer, ProducerRecord<String, String> producerRecord,
HashMap<String, String> pMessage, boolean sync) {
if (!sync) { // ASYNC (비동기)
kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception == null) {
logger.info("async message: " + pMessage.get("key") + "partition: " + metadata.partition() + "\n" +
"offset: " + metadata.offset());
} else {
logger.error("exception error from broker" + exception.getMessage());
}
});
} else { // SYNC (동기)
try {
RecordMetadata metadata = kafkaProducer.send(producerRecord).get();
logger.info("async message: " + pMessage.get("key") + "partition: " + metadata.partition() + "\n" +
"offset: " + metadata.offset());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}
public static void main(String[] args) {
String topicName = "pizza-topic";
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); // [1] bootstrap.servers | WHY server"s"? -> Cause we can use multi brokers(servers)
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [2] key.serializer.class
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [3] value.serializer.class
props.setProperty(ProducerConfig.ACKS_CONFIG, "0");
// Create KafkaProducer Object
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
sendPizzaMessage(kafkaProducer, topicName, -1, 10,
100, 100, false);
kafkaProducer.close();
}
}
8. Producer의 메시지 배치 전송 내부 실습 (파라미터 수정)
[1] batch size와 linger.ms 변경
package com.example.kafka;
import com.github.javafaker.Faker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
/**
* If you wanna make KafkaProducer, ProducerRecord to Integer,
* you have to declare serializer.class with IntegerSerializer.class
*/
// KafkaProducer Configuration Settings
public class PizzaProducer {
public static final Logger logger = LoggerFactory.getLogger(PizzaProducer.class.getName());
public static void sendPizzaMessage(KafkaProducer<String, String> kafkaProducer, String topicName, int iterCount,
int interIntervalMillis, int intervalMillis, int intervalCount, Boolean sync) {
PizzaMessage pizzaMessage = new PizzaMessage();
int iterSeq = 0;
// seed값을 고정하여 Random 객체와 Faker 객체를 생성.
long seed = 2022;
Random random = new Random(seed);
Faker faker = Faker.instance(random);
while (iterSeq != iterCount) {
HashMap<String, String> pMessage = pizzaMessage.produce_msg(faker, random, iterSeq);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, pMessage.get("key"), pMessage.get("message"));
sendMessage(kafkaProducer, producerRecord, pMessage, sync);
if ((intervalCount > 0) && (iterSeq % intervalCount == 0)) {
try {
logger.info("###### intervalCount:" + intervalCount + " intervalMillis" + intervalMillis + "######");
Thread.sleep(intervalMillis);
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
}
if (interIntervalMillis > 0) {
try {
logger.info("interIntervalMillis: " + interIntervalMillis);
Thread.sleep(interIntervalMillis);
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
}
}
}
// 동기, 비동기 분기처리
public static void sendMessage(KafkaProducer<String, String> kafkaProducer, ProducerRecord<String, String> producerRecord,
HashMap<String, String> pMessage, boolean sync) {
if (!sync) { // ASYNC (비동기)
kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception == null) {
logger.info("async message: " + pMessage.get("key") + "partition: " + metadata.partition() + "\n" +
"offset: " + metadata.offset());
} else {
logger.error("exception error from broker" + exception.getMessage());
}
});
} else { // SYNC (동기)
try {
RecordMetadata metadata = kafkaProducer.send(producerRecord).get();
logger.info("async message: " + pMessage.get("key") + "partition: " + metadata.partition() + "\n" +
"offset: " + metadata.offset());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}
public static void main(String[] args) {
String topicName = "pizza-topic";
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); // [1] bootstrap.servers | WHY server"s"? -> Cause we can use multi brokers(servers)
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [2] key.serializer.class
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [3] value.serializer.class
// props.setProperty(ProducerConfig.ACKS_CONFIG, "0"); // [4] props 설정
props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "32000"); // [5] Batch Size 설정
props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20"); // [6] linger.ms 설정
// Create KafkaProducer Object
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
sendPizzaMessage(kafkaProducer, topicName, -1, 10,
100, 100, true);
kafkaProducer.close();
}
}
[2] 결과 확인
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
...
batch.size = 32000
...
linger.ms = 20
...
9. Producer의 재전송 관련 주요 파라미터 설정 실습
[1] PizzaProducer 기본 Config가 적용하여 실행
package com.example.kafka;
import com.github.javafaker.Faker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
/**
* If you wanna make KafkaProducer, ProducerRecord to Integer,
* you have to declare serializer.class with IntegerSerializer.class
*/
// KafkaProducer Configuration Settings
public class PizzaProducer {
public static final Logger logger = LoggerFactory.getLogger(PizzaProducer.class.getName());
public static void sendPizzaMessage(KafkaProducer<String, String> kafkaProducer, String topicName, int iterCount,
int interIntervalMillis, int intervalMillis, int intervalCount, Boolean sync) {
PizzaMessage pizzaMessage = new PizzaMessage();
int iterSeq = 0;
// seed값을 고정하여 Random 객체와 Faker 객체를 생성.
long seed = 2022;
Random random = new Random(seed);
Faker faker = Faker.instance(random);
while (iterSeq != iterCount) {
HashMap<String, String> pMessage = pizzaMessage.produce_msg(faker, random, iterSeq);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, pMessage.get("key"), pMessage.get("message"));
sendMessage(kafkaProducer, producerRecord, pMessage, sync);
if ((intervalCount > 0) && (iterSeq % intervalCount == 0)) {
try {
logger.info("###### intervalCount:" + intervalCount + " intervalMillis" + intervalMillis + "######");
Thread.sleep(intervalMillis);
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
}
if (interIntervalMillis > 0) {
try {
logger.info("interIntervalMillis: " + interIntervalMillis);
Thread.sleep(interIntervalMillis);
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
}
}
}
// 동기, 비동기 분기처리
public static void sendMessage(KafkaProducer<String, String> kafkaProducer, ProducerRecord<String, String> producerRecord,
HashMap<String, String> pMessage, boolean sync) {
if (!sync) { // ASYNC (비동기)
kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception == null) {
logger.info("async message: " + pMessage.get("key") + "partition: " + metadata.partition() + "\n" +
"offset: " + metadata.offset());
} else {
logger.error("exception error from broker" + exception.getMessage());
}
});
} else { // SYNC (동기)
try {
RecordMetadata metadata = kafkaProducer.send(producerRecord).get();
logger.info("async message: " + pMessage.get("key") + "partition: " + metadata.partition() + "\n" +
"offset: " + metadata.offset());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}
public static void main(String[] args) {
String topicName = "pizza-topic";
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); // [1] bootstrap.servers | WHY server"s"? -> Cause we can use multi brokers(servers)
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [2] key.serializer.class
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [3] value.serializer.class
// props.setProperty(ProducerConfig.ACKS_CONFIG, "0"); // [4] props 설정
// props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "32000"); // [5] Batch Size 설정
// props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20"); // [6] linger.ms 설정
// Create KafkaProducer Object
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
sendPizzaMessage(kafkaProducer, topicName, -1, 10,
100, 100, false);
kafkaProducer.close();
}
}
[2] 로그 확인
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
...
delivery.timeout.ms = 120000
...
max.block.ms = 60000
...
request.timeout.ms = 30000
...
retry.backoff.ms = 100
...
retries = 2147483647
...
[3] DELIVRY_TIMEOUT_MS_CONFIG 변경
...
public static void main(String[] args) {
String topicName = "pizza-topic";
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); // [1] bootstrap.servers | WHY server"s"? -> Cause we can use multi brokers(servers)
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [2] key.serializer.class
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [3] value.serializer.class
// props.setProperty(ProducerConfig.ACKS_CONFIG, "0"); // [4] props 설정
// props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "32000"); // [5] Batch Size 설정
// props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "20"); // [6] linger.ms 설정
props.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "29000");
// Create KafkaProducer Object
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
sendPizzaMessage(kafkaProducer, topicName, -1, 10,
100, 100, false);
kafkaProducer.close();
}
위처럼 delivery.timeout.ms를 30초가 아니라 29초로 변경하면 어떻게 될까? 아래 결과를 확인해보자. delivery.timeout.ms should be equal to or larger than linger.ms + request.timeout.ms 로그를 확인할 수 있는데, delivery.time.out.ms는 linger.ms와 request.timeout.ms를 합한 값 보다 크거나 같아야 한다는 조건 때문에 오류가 발생한것을 확인할 수 있다.

10. 커스텀 파티셔너 (Custom Partitioner) 실습
Java 코드 작성: Custom Partitioner, PizzaProducerCustomPartitioner
[1] CustomPartitioner.java
package com.example.kafka;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.clients.producer.internals.StickyPartitionCache;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
// 피자 집 중에 특정 피자집은 대형 피자집이라서 따로 파티셔닝이 필요하다는 시나리오에 기반하여 작성
public class CustomPartitioner implements Partitioner {
public static final Logger logger = LoggerFactory.getLogger(CustomPartitioner.class.getName());
private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();
private String specialKeyName;
@Override
public void configure(Map<String, ?> configs) {
specialKeyName = configs.get("custom.specialKey").toString();
}
/**
* byte[] keyBytes, byte[] valueBytes
* ㄴ Serialized된 byte code
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic); // Cluster cluset: Broker들의 정보 -> partition들의 정보 추출
int numPartitions = partitionInfoList.size(); // partition 개수 추출 (5개)
int numSpecialPartitions = (int) (numPartitions * (0.5)); // special topic에 대해서는 5개 중에 2개의 partition 할당 (2개)
int partitionIndex = 0;
if (keyBytes == null) {
// return stickyPartitionCache.partition(topic, cluster);
throw new InvalidRecordException("key shoud not be null");
}
if (((String) key).equals(specialKeyName)) {
// (keyBytes -> valueBytes)
partitionIndex = Utils.toPositive(Utils.murmur2(valueBytes)) % numSpecialPartitions; // 0,1 설정
} else {
// specialKey에 대한 파티셔닝이 0,1로 설정되었기 때문에 specialKey가 아닌 Key에 대해서는 2,3,4로 설정해줘야 함
partitionIndex = Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - numSpecialPartitions) + 2;
}
logger.info("key:{} is sent to partition: {}", key.toString(), partitionIndex);
return partitionIndex;
}
@Override
public void close() {
}
}
[2] PizzaProducerCustomPartitioner.java
package com.example.kafka;
import com.github.javafaker.Faker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutionException;
/**
* If you wanna make KafkaProducer, ProducerRecord to Integer,
* you have to declare serializer.class with IntegerSerializer.class
*/
// KafkaProducer Configuration Settings
public class PizzaProducerCustomPartitioner {
public static final Logger logger = LoggerFactory.getLogger(PizzaProducerCustomPartitioner.class.getName());
public static void sendPizzaMessage(KafkaProducer<String, String> kafkaProducer, String topicName, int iterCount,
int interIntervalMillis, int intervalMillis, int intervalCount, Boolean sync) {
PizzaMessage pizzaMessage = new PizzaMessage();
int iterSeq = 0;
// seed값을 고정하여 Random 객체와 Faker 객체를 생성.
long seed = 2022;
Random random = new Random(seed);
Faker faker = Faker.instance(random);
while (iterSeq != iterCount) {
HashMap<String, String> pMessage = pizzaMessage.produce_msg(faker, random, iterSeq);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, pMessage.get("key"), pMessage.get("message"));
sendMessage(kafkaProducer, producerRecord, pMessage, sync);
if ((intervalCount > 0) && (iterSeq % intervalCount == 0)) {
try {
logger.info("###### intervalCount:" + intervalCount + " intervalMillis" + intervalMillis + "######");
Thread.sleep(intervalMillis);
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
}
if (interIntervalMillis > 0) {
try {
logger.info("interIntervalMillis: " + interIntervalMillis);
Thread.sleep(interIntervalMillis);
} catch (InterruptedException e) {
logger.error(e.getMessage());
}
}
}
}
// 동기, 비동기 분기처리
public static void sendMessage(KafkaProducer<String, String> kafkaProducer, ProducerRecord<String, String> producerRecord,
HashMap<String, String> pMessage, boolean sync) {
if (!sync) { // ASYNC (비동기)
kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception == null) {
logger.info("async message: " + pMessage.get("key") + "partition: " + metadata.partition() + "\n" +
"offset: " + metadata.offset());
} else {
logger.error("exception error from broker" + exception.getMessage());
}
});
} else { // SYNC (동기)
try {
RecordMetadata metadata = kafkaProducer.send(producerRecord).get();
logger.info("async message: " + pMessage.get("key") + "partition: " + metadata.partition() + "\n" +
"offset: " + metadata.offset());
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
}
public static void main(String[] args) {
String topicName = "pizza-topic-partitioner";
Properties props = new Properties();
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092"); // [1] bootstrap.servers | WHY server"s"? -> Cause we can use multi brokers(servers)
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [2] key.serializer.class
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // [3] value.serializer.class
// custom partitioner 등록
props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.example.kafka.CustomPartitioner");
// custom partitioner을 실행시키기 위한 특별 키 등록
props.setProperty("custom.specialKey", "P001");
// Create KafkaProducer Object
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
sendPizzaMessage(kafkaProducer, topicName, -1, 100,
0, 0, true);
kafkaProducer.close();
}
}
Kafka Client Custom Partitioner 적용
1. 5개의 파티션을 가지는 pizza-topic-partitioner 토픽 생성.
kafka-topics --bootstrap-server localhost:9092 --create --topic pizza-topic-partitioner --partitions 5
2. kafka-dump-log 명령어로 파티션별로 메시지 확인하기
kafka-dump-log --deep-iteration --files /home/ubuntu/data/kafka-logs/pizza-topic-partitioner-0/00000000000000000000.log --print-data-log
kafka-dump-log --deep-iteration --files /home/ubuntu/data/kafka-logs/pizza-topic-partitioner-1/00000000000000000000.log --print-data-log
# 1번 Partition 조회해보면 아래처럼 P001(특정 키)만 들어오는것을 알 수 있다.
:403 Carma Vista, time:2025-09-09 12:24:24
baseOffset: 62 lastOffset: 62 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 14798 CreateTime: 1757388272098 size: 241 magic: 2 compresscodec: none crc: 2587438241 isvalid: true
| offset: 62 CreateTime: 1757388272098 keySize: 4 valueSize: 167 sequence: -1 headerKeys: [] key: P001 payload: order_id:ord0, shop:P001, pizza_name:Cheese Pizza, customer_name:Russ Huel PhD, phone_number:120-673-8367 x73988, address:484 Krista Mountain, time:2025-09-09 12:24:32
baseOffset: 63 lastOffset: 63 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 15039 CreateTime: 1757388278807 size: 235 magic: 2 compresscodec: none crc: 2748402471 isvalid: true
| offset: 63 CreateTime: 1757388278807 keySize: 4 valueSize: 161 sequence: -1 headerKeys: [] key: P001 payload: order_id:ord0, shop:P001, pizza_name:Peperoni, customer_name:Saul Rippin, phone_number:1-339-197-5397 x55301, address:122 Numbers Lodge, time:2025-09-09 12:24:38
baseOffset: 64 lastOffset: 64 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 15274 CreateTime: 1757388286258 size: 239 magic: 2 compresscodec: none crc: 630731090 isvalid: true
| offset: 64 CreateTime: 1757388286258 keySize: 4 valueSize: 165 sequence: -1 headerKeys: [] key: P001 payload: order_id:ord0, shop:P001, pizza_name:Cheese Pizza, customer_name:Hans Sipes, phone_number:436-217-8007 x5900, address:626 Clarence Turnpike, time:2025-09-09 12:24:46
# 아래처럼 0번과 1번 파티션을 제외하고
# 2,3,4 파티션 중에 P001을 갖는지 "grep" 명령어로 찾아보면 아무것도 뜨지 않는다.
ubuntu@ubuntu-VirtualBox:~$ kafka-dump-log --deep-iteration --files /home/ubuntu/data/kafka-logs/pizza-topic-partitioner-2/00000000000000000000.log --print-data-log | grep P001
ubuntu@ubuntu-VirtualBox:~$
3. Consumer를 partition 별로 접속하여 확인 (--group 인자 설정 X)
kafka-console-consumer --bootstrap-server localhost:9092 --topic pizza-topic-partitioner \
--property print.key=true --property print.value=true --partition 0

'Kafka > Core' 카테고리의 다른 글
| [ADVANCED #4][실습] Consumer 완전 정복: 기본 컨슈머부터 안전한 종료, 커밋 전략까지 (0) | 2025.09.09 |
|---|---|
| [ADVANCED #3] Consumer 심층 분석: 그룹 코디네이터부터 리밸런싱 전략까지 (0) | 2025.09.09 |
| [ADVANCED #1] Producer 심층 분석: 내부 동작 원리와 고급 설정 (0) | 2025.09.07 |
| [BASIC #8] Kafka: Java 클라이언트 구현 환경 구축 (0) | 2025.09.07 |
| [BASIC #7] Config 구분 및 이해: 카프카 설정의 계층 구조 파악하기 (0) | 2025.09.07 |
