[ADVANCED #5][Hands-on] Producer & Consumer Integration Project: Saving File-Based Order Data to a DB

2025. 9. 11. 12:55
This walkthrough does not hand you complete code from the start.
It is built step by step; whenever a piece of theory is needed along the way,
that theory is explained first and then the implementation continues.

0. Environment Setup

[1] Add a module (practice)

 

[2] Edit build.gradle

plugins {
    id 'java'
}

group = 'com.example'
version = '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    // https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
    implementation 'org.apache.kafka:kafka-clients:3.1.0'
    // https://mvnrepository.com/artifact/org.slf4j/slf4j-api
    implementation 'org.slf4j:slf4j-api:2.0.17'
    // https://mvnrepository.com/artifact/org.slf4j/slf4j-simple
    implementation 'org.slf4j:slf4j-simple:2.0.17'
    // https://mvnrepository.com/artifact/com.github.javafaker/javafaker
    implementation 'com.github.javafaker:javafaker:1.0.2'
}

test {
    useJUnitPlatform()
}

1. Producer

[1] Sample data

practice/src/main/resources/pizza_sample.txt

Attachment: pizza_sample.txt (0.10 MB)

 

Make sure to press Enter after line 1000 so that the file ends with an empty line 1001.
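
For reference, each line is a comma-separated record whose first token becomes the message key and whose remaining seven fields (order id, shop id, menu name, customer name, phone number, address, order time) become the value. A hypothetical example line (values invented for illustration):

P001,ord1, P001, Cheese Pizza, John Doe, 030-759-xxxx, 110 Page St, 2022-09-11 12:00:00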

 

[2] Writing the Producer code

 

practice/src/main/java/com/practice/kafka/producer/FileProducer.java

package com.practice.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

public class FileProducer {

    public static final Logger logger = LoggerFactory.getLogger(FileProducer.class.getName());

    public static void main(String[] args) {

        String topicName = "file-topic"; // topic name
        String filePath = "C:\\0. inflearn\\kafka-proj1\\practice\\src\\main\\resources\\pizza_sample.txt"; // file path

        Properties props = new Properties();

        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create the KafkaProducer -> build a ProducerRecord -> send asynchronously via kafkaProducer.send() -> kafkaProducer.close()
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
        sendFileMessages(kafkaProducer, topicName, filePath);

        kafkaProducer.close();

    }

    private static void sendFileMessages(KafkaProducer<String, String> kafkaProducer, String topicName, String filePath) {

        String line;
        final String delimiter = ",";

        // try-with-resources ensures both readers are closed even if an exception is thrown
        try (FileReader fileReader = new FileReader(filePath);
             BufferedReader bufferedReader = new BufferedReader(fileReader)) {

            // iterate over all 1,000 lines
            while ((line = bufferedReader.readLine()) != null) {
                String[] tokens = line.split(delimiter); // split on ","
                String key = tokens[0]; // extract the key (e.g. P001)
                StringBuilder value = new StringBuilder();

                for (int i = 1; i < tokens.length; i++) {
                    if (i != (tokens.length - 1)) { // avoid appending a trailing ","
                        value.append(tokens[i]).append(",");
                    } else {
                        value.append(tokens[i]);
                    }
                }
                // the value serializer is StringSerializer, so convert the StringBuilder to a String with toString()
                sendMessages(kafkaProducer, topicName, key, value.toString());
            }
        } catch (IOException e) {
            logger.error(e.getMessage());
        }
    }

    private static void sendMessages(KafkaProducer<String, String> kafkaProducer, String topicName, String key, String value) {

        // build a ProducerRecord from the key and value
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, key, value);
        logger.info("key={}, value={}", key, value);

        // asynchronous (callback-based) send
        kafkaProducer.send(producerRecord, (metadata, exception) -> {
            if (exception == null) {
                logger.info("\n ###### record metadata received ##### \n" +
                        "partition: " + metadata.partition() + "\n" +
                        "offset: " + metadata.offset() + "\n" +
                        "timestamp: " + metadata.timestamp());
            } else {
                logger.error("exception error from broker: " + exception.getMessage());
            }
        });
    }

}

 

[3] Overview: sending a Producer message whenever text is appended to the file

 

We add a thread that monitors the file (a monitoring thread). Whenever content is appended to the file (pizza_append.txt), that is a kind of event, so we create an EventHandler that can detect the event of content being appended.

 

From here on, whenever the EventHandler detects an event it calls the onMessage() method, and onMessage() internally calls KafkaProducer.send().

The EventHandler abstraction was introduced because the appended data might arrive as a file or as a stream; the interface lets us handle either case generically.

 

 

In more detail, the flow to implement is as follows (diagram omitted):

1) Add a FileEventSource. It takes a FileEventHandler as an argument, keeps monitoring the file, and invokes the FileEventHandler whenever content is appended.

2) More precisely, when the handler detects a file-append event, onMessage() is called; since FileEventHandler implements the EventHandler interface, the call dispatches to FileEventHandler's onMessage().

3) onMessage() then internally calls KafkaProducer.send().

 

1) practice/src/main/java/com/practice/kafka/producer/FileAppendProducer.java

package com.practice.kafka.producer;

import com.practice.kafka.event.EventHandler;
import com.practice.kafka.event.FileEventHandler;
import com.practice.kafka.event.FileEventSource;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.Properties;

public class FileAppendProducer {
    public static final Logger logger = LoggerFactory.getLogger(FileAppendProducer.class.getName());
    public static void main(String[] args) {
        String topicName = "file-topic";

        //KafkaProducer configuration setting

        Properties props  = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //KafkaProducer object creation
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        boolean sync = false;

        File file = new File("C:\\0. inflearn\\kafka-proj1\\practice\\src\\main\\resources\\pizza_append.txt");
        EventHandler eventHandler = new FileEventHandler(kafkaProducer, topicName, sync);
        FileEventSource fileEventSource = new FileEventSource(100, file, eventHandler);
        Thread fileEventSourceThread = new Thread(fileEventSource);
        fileEventSourceThread.start();

        try {
            fileEventSourceThread.join();
        }catch(InterruptedException e) {
            logger.error(e.getMessage());
        }finally {
            kafkaProducer.close();
        }

    }
}
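
Note: the comment inside FileEventSource (shown below) expects the main program to flip keepRunning to false when the main thread exits, but the main method above never does so. A hedged sketch of one way to wire that up, using only the fields the classes below expose (this addition is not in the original code):

// hypothetical addition to FileAppendProducer.main(), placed before fileEventSourceThread.join():
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    fileEventSource.keepRunning = false;  // ask the monitoring loop to stop
    fileEventSourceThread.interrupt();    // wake the thread from Thread.sleep()
}));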

 

2) practice/src/main/java/com/practice/kafka/event/EventHandler.java

package com.practice.kafka.event;

import java.util.concurrent.ExecutionException;

public interface EventHandler {
    void onMessage(MessageEvent messageEvent) throws InterruptedException, ExecutionException;
}

 

3) practice/src/main/java/com/practice/kafka/event/FileEventHandler.java

package com.practice.kafka.event;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class FileEventHandler implements EventHandler{

    public static final Logger logger = LoggerFactory.getLogger(FileEventHandler.class.getName());

    private KafkaProducer<String, String> kafkaProducer;
    private String topicName;
    private boolean sync;

    public FileEventHandler(KafkaProducer<String, String> kafkaProducer, String topicName, boolean sync) {
        this.kafkaProducer = kafkaProducer;
        this.topicName = topicName;
        this.sync = sync;
    }

    @Override
    public void onMessage(MessageEvent messageEvent) throws InterruptedException, ExecutionException {

        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(this.topicName, messageEvent.getKey(), messageEvent.getValue());

        if (this.sync) { // [1] synchronous send: block on the Future returned by send()
            RecordMetadata recordMetadata = this.kafkaProducer.send(producerRecord).get();
            logger.info("\n ###### record metadata received ##### \n" +
                    "partition: " + recordMetadata.partition() + "\n" +
                    "offset: " + recordMetadata.offset() + "\n" +
                    "timestamp: " + recordMetadata.timestamp());
        } else { // [2] asynchronous send with a callback
            kafkaProducer.send(producerRecord, (metadata, exception) -> {
                if (exception == null) {
                    logger.info("\n ###### record metadata received ##### \n" +
                            "partition: " + metadata.partition() + "\n" +
                            "offset: " + metadata.offset() + "\n" +
                            "timestamp: " + metadata.timestamp());
                } else {
                    logger.error("exception error from broker: " + exception.getMessage());
                }
            });
        }
    }

    //Run directly to verify that FileEventHandler was wired up correctly.
    public static void main(String[] args) throws Exception {
        String topicName = "file-topic";

        Properties props  = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
        boolean sync = true;

        FileEventHandler fileEventHandler = new FileEventHandler(kafkaProducer, topicName, sync);
        MessageEvent messageEvent = new MessageEvent("key00001", "this is test message");
        fileEventHandler.onMessage(messageEvent);
        kafkaProducer.close(); // release producer resources after the test send
    }

}

 

 

4) practice/src/main/java/com/practice/kafka/event/MessageEvent.java

package com.practice.kafka.event;

public class MessageEvent {

    public String key;

    public String value;

    public MessageEvent(String key, String value) {
        this.key = key;
        this.value = value;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}


5) practice/src/main/java/com/practice/kafka/event/FileEventSource.java

package com.practice.kafka.event;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ExecutionException;

// Monitors the file on a dedicated thread; whenever content is appended, invokes the EventHandler to send the new lines through the Producer.
// Implemented as Runnable because it runs on its own thread.
public class FileEventSource implements Runnable {
    public static final Logger logger = LoggerFactory.getLogger(FileEventSource.class.getName());

    //how often to poll the file, in ms.
    private long updateInterval;

    //the File object to monitor
    private File file;

    //EventHandler that sends a message via the Producer when the file changes
    private EventHandler eventHandler;

    private long filePointer = 0;
    public boolean keepRunning = true;

    public FileEventSource(long updateInterval, File file, EventHandler eventHandler) {
        this.updateInterval = updateInterval;
        this.file = file;
        this.eventHandler = eventHandler;
    }

    //Loop indefinitely, sleeping for updateInterval between checks; send messages whenever content has been appended.
    @Override
    public void run() {
        try {
            //the main program is expected to set this.keepRunning to false when the main thread exits.
            while(this.keepRunning) {
                //sleep for the updateInterval (ms) passed to the constructor
                Thread.sleep(this.updateInterval);
                //current size of the file
                long len = this.file.length();
                if (len < this.filePointer) {
                    //the file is now shorter than our last pointer, so it was reset (truncated)
                    logger.info("log file was reset as filePointer is longer than file length");
                    //move filePointer back to the current file length
                    filePointer = len;
                } else if (len > this.filePointer) {
                    //the file grew, i.e. content was appended:
                    //read line by line from the last filePointer to the end and send each line via the Producer
                    readAppendAndSend();
                } else {
                    //len == filePointer: the file is unchanged
                    continue;
                }
            }
        } catch(InterruptedException e) {
            logger.error(e.getMessage());
        } catch(ExecutionException e) {
            logger.error(e.getMessage());
        } catch(Exception e) {
            logger.error(e.getMessage());
        }

    }

    //Read line by line from the last filePointer to the end of the file and send each line with sendMessage().
    public void readAppendAndSend() throws IOException, ExecutionException, InterruptedException {
        // try-with-resources ensures the file handle is closed after reading
        try (RandomAccessFile raf = new RandomAccessFile(this.file, "r")) {
            raf.seek(this.filePointer);
            String line = null;

            while ((line = raf.readLine()) != null) {
                sendMessage(line);
            }
            //the file has grown, so move filePointer to the current end of the file
            this.filePointer = raf.getFilePointer();
        }
    }

    //Parse one line into a key and a value, build a MessageEvent, and hand it to the EventHandler for Producer transmission.
    private void sendMessage(String line) throws ExecutionException, InterruptedException {
        String[] tokens = line.split(",");
        String key = tokens[0];
        StringBuilder value = new StringBuilder();

        for (int i = 1; i < tokens.length; i++) {
            if (i != (tokens.length - 1)) {
                value.append(tokens[i]).append(",");
            } else {
                value.append(tokens[i]);
            }
        }
        MessageEvent messageEvent = new MessageEvent(key, value.toString());
        eventHandler.onMessage(messageEvent);
    }

}
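
One caveat worth noting: RandomAccessFile.readLine() does not decode UTF-8 (it maps each byte to a char), so multi-byte text such as the commented-out Korean pizza names in FileUtilAppend would come through garbled; the ASCII-only sample data used here is unaffected.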

 

6) practice/src/main/java/com/practice/kafka/producer/FileUtilAppend.java

package com.practice.kafka.producer;

import com.github.javafaker.Faker;

import java.io.*;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Random;

public class FileUtilAppend {
    // Pizza menu names; getRandomValueFromList() picks a random pizza name from this list.
    private static final List<String> pizzaNames = List.of("Potato Pizza", "Cheese Pizza",
            "Cheese Garlic Pizza", "Super Supreme", "Peperoni");
//    private static final List<String> pizzaNames = List.of("고구마 피자", "치즈 피자",
//            "치즈 갈릭 피자", "슈퍼 슈프림", "페페로니 피자");

    // Pizza shop IDs; getRandomValueFromList() picks a random shop ID from this list.
    private static final List<String> pizzaShop = List.of("A001", "B001", "C001",
            "D001", "E001", "F001", "G001", "H001", "I001", "J001", "K001", "L001", "M001", "N001",
            "O001", "P001", "Q001");

    private static int orderSeq = 5000;
    public FileUtilAppend() {}

    //Given a list of pizza names or shop IDs plus a Random object, return a random element from the list.
    private String getRandomValueFromList(List<String> list, Random random) {
        int size = list.size();
        int index = random.nextInt(size);

        return list.get(index);
    }

    //Generate a random pizza order message and return a HashMap with the shop ID as the key and the rest of the info as the value.
    public HashMap<String, String> produce_msg(Faker faker, Random random, int id) {

        String shopId = getRandomValueFromList(pizzaShop, random);
        String pizzaName = getRandomValueFromList(pizzaNames, random);

        String ordId = "ord"+id;
        String customerName = faker.name().fullName();
        String phoneNumber = faker.phoneNumber().phoneNumber();
        String address = faker.address().streetAddress();
        LocalDateTime now = LocalDateTime.now();
        String message = String.format("%s, %s, %s, %s, %s, %s, %s"
                , ordId, shopId, pizzaName, customerName, phoneNumber, address
                , now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss", Locale.KOREAN)));
        //System.out.println(message);
        HashMap<String, String> messageMap = new HashMap<>();
        messageMap.put("key", shopId);
        messageMap.put("message", message);

        return messageMap;
    }

    public void writeMessage(String filePath, Faker faker, Random random) {
        try {
            File file = new File(filePath);
            if (!file.exists()) {
                file.createNewFile();
            }

            // append-mode FileWriter, buffered, wrapped in a PrintWriter for line-oriented output;
            // try-with-resources closes (and flushes) the writer even if an exception occurs
            try (PrintWriter printWriter = new PrintWriter(new BufferedWriter(new FileWriter(file, true)))) {
                for (int i = 0; i < 50; i++) {
                    HashMap<String, String> message = produce_msg(faker, random, orderSeq++);
                    printWriter.println(message.get("key") + "," + message.get("message"));
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        FileUtilAppend fileUtilAppend = new FileUtilAppend();
        // fix the seed so the Random and Faker objects produce reproducible output.
        long seed = 2022;
        Random random = new Random(seed);
        Faker faker = Faker.instance(random);
        //change this to an absolute path on your machine.
        String filePath = "C:\\0. inflearn\\kafka-proj1\\practice\\src\\main\\resources\\pizza_append.txt";
        // repeat 1,000 times, appending 50 rows per iteration (50,000 rows in total).
        for(int i=0; i<1000; i++) {
            //write 50 order lines
            fileUtilAppend.writeMessage(filePath, faker, random);
            System.out.println("###### iteration:"+i+" file write is done");
            try {
                //sleep for the given interval (20 seconds)
                Thread.sleep(20000);
            }catch(InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

 

[4] Running it

1) Run the kafka console consumer

kafka-console-consumer --bootstrap-server localhost:9092 --group group-file --topic file-topic --property print.key=true --property print.value=true --from-beginning

 

2) Run FileAppendProducer and check the consumer console

 

3) Run FileUtilAppend and check the consumer console


2. Basic Consumer

[1] Writing the Java code

1) practice/src/main/java/com/practice/kafka/consumer/BaseConsumer.java

package com.practice.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class BaseConsumer<K extends Serializable, V extends Serializable> {
    public static final Logger logger = LoggerFactory.getLogger(BaseConsumer.class.getName());

    private KafkaConsumer<K, V> kafkaConsumer;
    private List<String> topics;

    public BaseConsumer(Properties consumerProps, List<String> topics) {
        this.kafkaConsumer = new KafkaConsumer<K, V>(consumerProps);
        this.topics = topics;
    }

    public void initConsumer() {
        this.kafkaConsumer.subscribe(this.topics);
        shutdownHookToRuntime(this.kafkaConsumer);
    }

    private void shutdownHookToRuntime(KafkaConsumer<K, V> kafkaConsumer) {
        //main thread
        Thread mainThread = Thread.currentThread();

        //on shutdown, have a separate thread call KafkaConsumer.wakeup() so the poll loop can exit.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                logger.info(" main program starts to exit by calling wakeup");
                kafkaConsumer.wakeup();

                try {
                    mainThread.join();
                } catch(InterruptedException e) { e.printStackTrace();}
            }
        });

    }

    private void processRecord(ConsumerRecord<K, V> record) {
        logger.info("record key:{},  partition:{}, record offset:{} record value:{}",
                record.key(), record.partition(), record.offset(), record.value());
    }

    private void processRecords(ConsumerRecords<K, V> records) {
        records.forEach(record -> processRecord(record));
    }


    public void pollConsumes(long durationMillis, String commitMode) {
        try {
            while (true) {
                if (commitMode.equals("sync")) {
                    pollCommitSync(durationMillis);
                } else {
                    pollCommitAsync(durationMillis);
                }
            }
        }catch(WakeupException e) {
            logger.error("wakeup exception has been called");
        }catch(Exception e) {
            logger.error(e.getMessage());
        }finally {
            logger.info("##### commit sync before closing");
            kafkaConsumer.commitSync();
            logger.info("finally consumer is closing");
            closeConsumer();
        }
    }

    private void pollCommitAsync(long durationMillis) throws WakeupException, Exception {
        ConsumerRecords<K, V> consumerRecords = this.kafkaConsumer.poll(Duration.ofMillis(durationMillis));
        processRecords(consumerRecords);
        this.kafkaConsumer.commitAsync( (offsets, exception) -> {
            if(exception != null) {
                logger.error("offsets {} is not completed, error:{}", offsets, exception.getMessage());
            }

        });

    }
    private void pollCommitSync(long durationMillis) throws WakeupException, Exception {
        ConsumerRecords<K, V> consumerRecords = this.kafkaConsumer.poll(Duration.ofMillis(durationMillis));
        processRecords(consumerRecords);
        try {
            if(consumerRecords.count() > 0 ) {
                this.kafkaConsumer.commitSync();
                logger.info("commit sync has been called");
            }
        } catch(CommitFailedException e) {
            logger.error(e.getMessage());
        }
    }

    public void closeConsumer() {
        this.kafkaConsumer.close();
    }

    public static void main(String[] args) {
        String topicName = "file-topic";

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "file-group");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        BaseConsumer<String, String> baseConsumer = new BaseConsumer<String, String>(props, List.of(topicName));
        baseConsumer.initConsumer();
        String commitMode = "async";

        baseConsumer.pollConsumes(100, commitMode);
        baseConsumer.closeConsumer(); // safety net: pollConsumes() already closes the consumer in its finally block

    }


}

 

[2] Running it

1) Run FileAppendProducer

2) Run FileUtilAppend

3) Run BaseConsumer


3. FileToDBConsumer

In the consumer above, processRecord and processRecords only printed logs, but in real applications the records are usually persisted to a database. So this consumer goes beyond logging and also stores the data in a DB.

[1] Installing and configuring PostgreSQL on Ubuntu

1) Install PostgreSQL

sudo apt install postgresql postgresql-client

2) Other useful commands

# check PostgreSQL status
sudo systemctl status postgresql
# stop PostgreSQL
sudo systemctl stop postgresql
# start PostgreSQL
sudo systemctl start postgresql
# restart PostgreSQL
sudo systemctl restart postgresql

3) Set up credentials

ubuntu@ubuntu-VirtualBox:~$ sudo su - postgres
postgres@ubuntu-VirtualBox:~$ psql -c "alter user postgres with password 'postgres'"
postgres@ubuntu-VirtualBox:~$ exit

4) Edit the conf files

ubuntu@ubuntu-VirtualBox:~$ cd /etc/postgresql/12/main
postgres@ubuntu-VirtualBox:/etc/postgresql/12/main$ vi postgresql.conf
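
The original post shows this edit only as a screenshot. A typical change, assuming the DB must accept remote connections from the host machine (hypothetical, adapt to your network):

listen_addresses = '*'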

 

postgres@ubuntu-VirtualBox:/etc/postgresql/12/main$ vi pg_hba.conf
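
Again shown only as a screenshot in the original. A typical pg_hba.conf entry, assuming clients on the 192.168.56.0/24 host-only network (hypothetical, adapt to your setup):

host    all    all    192.168.56.0/24    md5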

 

postgres@ubuntu-VirtualBox:/etc/postgresql/12/main$ exit
ubuntu@ubuntu-VirtualBox:/etc/postgresql/12/main$ sudo systemctl restart postgresql
ubuntu@ubuntu-VirtualBox:/etc/postgresql/12/main$ sudo su - postgres
postgres@ubuntu-VirtualBox:~$ psql
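
The post does not show the DDL for the orders table that the consumer below writes to. A minimal sketch inferred from the INSERT statement in OrderDBHandler (column types are assumptions):

CREATE TABLE public.orders (
    ord_id       varchar(100),
    shop_id      varchar(100),
    menu_name    varchar(100),
    user_name    varchar(100),
    phone_number varchar(100),
    address      varchar(200),
    order_time   timestamp
);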

[2] Verifying Java / PostgreSQL connectivity

1) Add the PostgreSQL driver to build.gradle

// feel free to use a newer version.
implementation 'org.postgresql:postgresql:42.4.0'

 

2) Write and run a test Java class

package com.practice.kafka.consumer;

import java.sql.*;

public class JDBCTester {
    public static void main(String[] args) {

        String url = "jdbc:postgresql://192.168.56.101:5432/postgres";
        String user = "postgres";
        String password = "postgres";

        // try-with-resources closes the ResultSet, Statement, and Connection in reverse order,
        // and avoids the NullPointerException the manual finally-block cleanup could throw
        // when getConnection() fails
        try (Connection conn = DriverManager.getConnection(url, user, password);
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT 'postgresql is connected' ")) {

            if (rs.next()) {
                System.out.println(rs.getString(1));
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

Connection confirmed.

[3] Writing the Java code

1) practice/src/main/java/com/practice/kafka/consumer/FileToDBConsumer.java

package com.practice.kafka.consumer;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class FileToDBConsumer<K extends Serializable, V extends Serializable> {
    public static final Logger logger = LoggerFactory.getLogger(FileToDBConsumer.class.getName());
    protected KafkaConsumer<K, V> kafkaConsumer;
    protected List<String> topics;

    private OrderDBHandler orderDBHandler;
    public FileToDBConsumer(Properties consumerProps, List<String> topics,
                            OrderDBHandler orderDBHandler) {
        this.kafkaConsumer = new KafkaConsumer<K, V>(consumerProps);
        this.topics = topics;
        this.orderDBHandler = orderDBHandler;
    }
    public void initConsumer() {
        this.kafkaConsumer.subscribe(this.topics);
        shutdownHookToRuntime(this.kafkaConsumer);
    }

    private void shutdownHookToRuntime(KafkaConsumer<K, V> kafkaConsumer) {
        //main thread
        Thread mainThread = Thread.currentThread();

        //on shutdown, have a separate thread call KafkaConsumer.wakeup() so the poll loop can exit.
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                logger.info(" main program starts to exit by calling wakeup");
                kafkaConsumer.wakeup();

                try {
                    mainThread.join();
                } catch(InterruptedException e) { e.printStackTrace();}
            }
        });

    }

    private void processRecord(ConsumerRecord<K, V> record) throws Exception {
        OrderDTO orderDTO = makeOrderDTO(record);
        orderDBHandler.insertOrder(orderDTO);
    }

    private OrderDTO makeOrderDTO(ConsumerRecord<K,V> record) throws Exception {
        String messageValue = (String)record.value();
        logger.info("###### messageValue:" + messageValue);
        String[] tokens = messageValue.split(",");
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        OrderDTO orderDTO = new OrderDTO(tokens[0], tokens[1], tokens[2], tokens[3],
                tokens[4], tokens[5], LocalDateTime.parse(tokens[6].trim(), formatter));

        return orderDTO;
    }


    private void processRecords(ConsumerRecords<K, V> records) throws Exception{
        List<OrderDTO> orders = makeOrders(records);
        orderDBHandler.insertOrders(orders);
    }

    private List<OrderDTO> makeOrders(ConsumerRecords<K,V> records) throws Exception {
        List<OrderDTO> orders = new ArrayList<>();
        //records.forEach(record -> orders.add(makeOrderDTO(record)));
        for(ConsumerRecord<K, V> record : records) {
            OrderDTO orderDTO = makeOrderDTO(record);
            orders.add(orderDTO);
        }
        return orders;
    }


    public void pollConsumes(long durationMillis, String commitMode) {
        if (commitMode.equals("sync")) {
            pollCommitSync(durationMillis);
        } else {
            pollCommitAsync(durationMillis);
        }
    }
    private void pollCommitAsync(long durationMillis) {
        try {
            while (true) {
                ConsumerRecords<K, V> consumerRecords = this.kafkaConsumer.poll(Duration.ofMillis(durationMillis));
                logger.info("consumerRecords count:" + consumerRecords.count());
                if(consumerRecords.count() > 0) {
                    try {
                        processRecords(consumerRecords);
                    } catch(Exception e) {
                        logger.error(e.getMessage());
                    }
                }
//                if(consumerRecords.count() > 0) {
//                    for (ConsumerRecord<K, V> consumerRecord : consumerRecords) {
//                        processRecord(consumerRecord);
//                    }
//                }
                //commitAsync with the OffsetCommitCallback written as a lambda
                this.kafkaConsumer.commitAsync((offsets, exception) -> {
                    if (exception != null) {
                        logger.error("offsets {} is not completed, error:{}", offsets, exception.getMessage());
                    }
                });
            }
        }catch(WakeupException e) {
            logger.error("wakeup exception has been called");
        }catch(Exception e) {
            logger.error(e.getMessage());
        }finally {
            logger.info("##### commit sync before closing");
            kafkaConsumer.commitSync();
            logger.info("finally consumer is closing");
            close();
        }
    }

    protected void pollCommitSync(long durationMillis) {
        try {
            while (true) {
                ConsumerRecords<K, V> consumerRecords = this.kafkaConsumer.poll(Duration.ofMillis(durationMillis));
                processRecords(consumerRecords);
                try {
                    if (consumerRecords.count() > 0) {
                        this.kafkaConsumer.commitSync();
                        logger.info("commit sync has been called");
                    }
                } catch (CommitFailedException e) {
                    logger.error(e.getMessage());
                }
            }
        }catch(WakeupException e) {
            logger.error("wakeup exception has been called");
        }catch(Exception e) {
            logger.error(e.getMessage());
        }finally {
            logger.info("##### commit sync before closing");
            kafkaConsumer.commitSync();
            logger.info("finally consumer is closing");
            close();
        }
    }
    protected void close() {
        this.kafkaConsumer.close();
        this.orderDBHandler.close();
    }

    public static void main(String[] args) {
        String topicName = "file-topic";

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.101:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "file-group");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        String url = "jdbc:postgresql://192.168.56.101:5432/postgres";
        String user = "postgres";
        String password = "postgres";
        OrderDBHandler orderDBHandler = new OrderDBHandler(url, user, password);

        FileToDBConsumer<String, String> fileToDBConsumer = new
                FileToDBConsumer<String, String>(props, List.of(topicName), orderDBHandler);
        fileToDBConsumer.initConsumer();
        String commitMode = "async";

        fileToDBConsumer.pollConsumes(1000, commitMode);

    }

}

 

2) practice/src/main/java/com/practice/kafka/consumer/OrderDBHandler.java

package com.practice.kafka.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.time.LocalDateTime;
import java.util.List;

public class OrderDBHandler {
    public static final Logger logger = LoggerFactory.getLogger(OrderDBHandler.class.getName());
    private Connection connection = null;
    private PreparedStatement insertPrepared = null;
    private static final String INSERT_ORDER_SQL = "INSERT INTO public.orders " +
            "(ord_id, shop_id, menu_name, user_name, phone_number, address, order_time) "+
            "values (?, ?, ?, ?, ?, ?, ?)";

    public OrderDBHandler(String url, String user, String password) {
        try {
            this.connection = DriverManager.getConnection(url, user, password);
            this.insertPrepared = this.connection.prepareStatement(INSERT_ORDER_SQL);
        } catch(SQLException e) {
            logger.error(e.getMessage());
        }

    }

    public void insertOrder(OrderDTO orderDTO)  {
        // try-with-resources closes the PreparedStatement after the insert
        try (PreparedStatement pstmt = this.connection.prepareStatement(INSERT_ORDER_SQL)) {
            pstmt.setString(1, orderDTO.orderId);
            pstmt.setString(2, orderDTO.shopId);
            pstmt.setString(3, orderDTO.menuName);
            pstmt.setString(4, orderDTO.userName);
            pstmt.setString(5, orderDTO.phoneNumber);
            pstmt.setString(6, orderDTO.address);
            pstmt.setTimestamp(7, Timestamp.valueOf(orderDTO.orderTime));

            pstmt.executeUpdate();
        } catch(SQLException e) {
            logger.error(e.getMessage());
        }

    }

    public void insertOrders(List<OrderDTO> orders) {
        // try-with-resources closes the PreparedStatement after the batch insert
        try (PreparedStatement pstmt = this.connection.prepareStatement(INSERT_ORDER_SQL)) {
            for(OrderDTO orderDTO : orders) {
                pstmt.setString(1, orderDTO.orderId);
                pstmt.setString(2, orderDTO.shopId);
                pstmt.setString(3, orderDTO.menuName);
                pstmt.setString(4, orderDTO.userName);
                pstmt.setString(5, orderDTO.phoneNumber);
                pstmt.setString(6, orderDTO.address);
                pstmt.setTimestamp(7, Timestamp.valueOf(orderDTO.orderTime));

                pstmt.addBatch();
            }
            pstmt.executeBatch(); // executeBatch(), not executeUpdate(), runs every statement queued with addBatch()

        } catch(SQLException e) {
            logger.error(e.getMessage());
        }

    }

    public void close()
    {
        try {
            logger.info("###### OrderDBHandler is closing");
            this.insertPrepared.close();
            this.connection.close();
        }catch(SQLException e) {
            logger.error(e.getMessage());
        }
    }

    public static void main(String[] args) {
        String url = "jdbc:postgresql://192.168.56.101:5432/postgres";
        String user = "postgres";
        String password = "postgres";
        OrderDBHandler orderDBHandler = new OrderDBHandler(url, user, password);

        LocalDateTime now = LocalDateTime.now();
        OrderDTO orderDTO = new OrderDTO("ord001", "test_shop", "test_menu",
                "test_user", "test_phone", "test_address",
                now);

        orderDBHandler.insertOrder(orderDTO);
        orderDBHandler.close();
    }


}

 

3) practice/src/main/java/com/practice/kafka/consumer/OrderDTO.java

package com.practice.kafka.consumer;

import java.time.LocalDateTime;

public class OrderDTO {
    public String orderId;
    public String shopId;
    public String menuName;
    public String userName;
    public String phoneNumber;
    public String address;
    public LocalDateTime orderTime;

    public OrderDTO(String orderId, String shopId, String menuName, String userName,
                    String phoneNumber, String address, LocalDateTime orderTime) {
        this.orderId = orderId;
        this.shopId = shopId;
        this.menuName = menuName;
        this.userName = userName;
        this.phoneNumber = phoneNumber;
        this.address = address;
        this.orderTime = orderTime;
    }
}

 

[4] Verifying the results

1) Run OrderDBHandler's test main and check that a single row is stored in the DB

 

 

2) Run FileUtilAppend (confirm data keeps accumulating in the file)

 

3) Run FileAppendProducer (confirm the appended data is being delivered to the broker)

 

4) Run FileToDBConsumer (confirm records are consumed, then check how many rows are stored in postgres)
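
To check the stored row count from psql, a simple count query (table name as used by OrderDBHandler):

SELECT count(*) FROM public.orders;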
