[ADVANCED #10] Spring 기반의 Kafka 사용 및 운용

2026. 2. 27. 17:36·Kafka/Core

0. 들어가며

지금까지 우리는 Kafka의 핵심 개념부터 고급 기능까지 순수 Java 기반의 클라이언트를 사용하여 학습했다. 이는 Kafka 자체의 동작 원리를 이해하는 데 매우 효과적인 방법이었다. 하지만 실제 백엔드 개발 현장에서는 대부분 Spring Framework와 Spring Kafka 모듈을 사용하여 Kafka를 활용한다. 이번 글에서는 지금까지 배운 순수 Java 코드가 Spring Kafka에서는 어떻게 간결하고 선언적으로 바뀌는지, 그리고 Spring 환경에서 Kafka를 효과적으로 사용하고 운영하는 방법에 대해 자세히 알아보겠다.


1. Spring Kafka 개요

1.1. Spring Kafka란?

Spring Kafka는 Spring Framework 기반의 애플리케이션에서 Apache Kafka와 쉽게 통합할 수 있도록 도와주는 모듈이다. 핵심적으로 다음과 같은 기능을 제공한다.

  • KafkaTemplate: 메시지 전송을 위한 고수준 템플릿
  • @KafkaListener: 메시지 소비를 위한 선언적 리스너
  • KafkaAdmin: 토픽 자동 생성 등 관리 기능
  • 컨버터: 메시지 직렬화/역직렬화 추상화
  • 트랜잭션: Kafka 트랜잭션 지원

1.2. 의존성 설정

1.2.1. Gradle (build.gradle)

dependencies {
    implementation 'org.springframework.kafka:spring-kafka'
}

1.2.2. Maven (pom.xml)

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. 순수 Java vs Spring Kafka: 코드 비교

2.1. Producer 구현 비교

2.1.1. 순수 Java Producer

public class SimpleProducer {
    public static void main(String[] args) {
        // 1. Properties 객체로 모든 설정을 직접 정의
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                          StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                          StringSerializer.class.getName());
        props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        props.setProperty(ProducerConfig.RETRIES_CONFIG, "10");

        // 2. KafkaProducer 객체 직접 생성
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        try {
            // 3. ProducerRecord 생성 및 전송
            ProducerRecord<String, String> record =
                new ProducerRecord<>("simple-topic", "key1", "Hello Kafka");

            // 4. 동기 전송 (Future.get())
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("Offset: " + metadata.offset());

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 5. 명시적 close() 호출
            producer.close();
        }
    }
}

2.1.2. Spring Kafka Producer

@Service
public class KafkaProducerService {

    // 1. KafkaTemplate 자동 주입 (Spring이 관리)
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String key, String value) {
        // 2. 단순한 send() 호출
        CompletableFuture<SendResult<String, String>> future =
            kafkaTemplate.send(topic, key, value);

        // 3. 비동기 결과 처리 (콜백)
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.println("Sent message: " + value +
                                   " with offset: " + result.getRecordMetadata().offset());
            } else {
                System.err.println("Failed to send message: " + ex.getMessage());
            }
        });
    }

    // 프로듀서 객체 생성, 설정, 종료는 Spring이 모두 자동 처리!
}

2.1.3. application.yml 설정

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 10
      properties:
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5

2.1.4. 주요 변화 포인트

항목  순수 Java  Spring Kafka
설정 Properties 객체로 직접 정의 application.yml에 선언적으로 정의
객체 생성 개발자가 직접 KafkaProducer 생성 Spring이 KafkaTemplate을 빈으로 자동 생성
리소스 관리 개발자가 close() 직접 호출 Spring 컨테이너가 자동으로 관리
전송 방식 Future.get()으로 동기 처리 또는 콜백 CompletableFuture로 비동기 처리

2.2. Consumer 구현 비교

2.2.1. 순수 Java Consumer

public class SimpleConsumer {
    public static void main(String[] args) {
        // 1. Properties 객체로 설정 정의
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                          StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                          StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // 2. KafkaConsumer 객체 직접 생성
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 3. 토픽 구독
        consumer.subscribe(List.of("simple-topic"));

        // 4. Shutdown Hook 등록 (안전한 종료를 위한 복잡한 코드)
        Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try {
                mainThread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));

        try {
            // 5. 무한 루프로 poll() 호출
            while (true) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(1000));

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                                      record.offset(), record.key(), record.value());
                }

                // 6. 수동 커밋
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            // 정상 종료
        } finally {
            // 7. 명시적 close()
            consumer.close();
        }
    }
}

2.2.2. Spring Kafka Consumer

@Service
public class KafkaConsumerService {

    private static final Logger logger =
        LoggerFactory.getLogger(KafkaConsumerService.class);

    // 1. 단 하나의 애노테이션으로 완성!
    @KafkaListener(topics = "simple-topic", groupId = "my-group")
    public void listen(ConsumerRecord<String, String> record) {

        // 2. 비즈니스 로직에만 집중
        logger.info("Received message - offset: {}, key: {}, value: {}",
                    record.offset(), record.key(), record.value());

        // 3. 예외가 없으면 자동 커밋 (설정에 따라)
        //    예외 발생 시 다양한 재시도/에러 핸들링 전략 적용 가능
    }

    // 4. 여러 토픽을 동시에 구독
    @KafkaListener(topics = {"topic1", "topic2"}, groupId = "another-group")
    public void listenMultiple(String message) {
        logger.info("Received: {}", message);
    }

    // 5. 배치 리스닝
    @KafkaListener(topics = "batch-topic", groupId = "batch-group")
    public void listenBatch(List<ConsumerRecord<String, String>> records) {
        logger.info("Received {} messages in batch", records.size());
        for (ConsumerRecord<String, String> record : records) {
            // 배치 처리 로직
        }
    }
}

2.2.3. application.yml 설정

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false  # 수동 커밋 사용 시
    listener:
      type: single  # 또는 batch
      concurrency: 3  # 동시 실행할 리스너 스레드 수
      poll-timeout: 3000

2.2.4. 주요 변화 포인트

항목   순수 Java Spring Kafka
설정 Properties 객체로 직접 정의 application.yml에 선언적으로 정의
객체 생성 개발자가 직접 KafkaConsumer 생성 Spring이 내부적으로 KafkaListenerContainer 생성
구독 subscribe() 메서드 직접 호출 @KafkaListener 애노테이션으로 선언
메시지 폴링 개발자가 while(true) 루프로 직접 폴링 Spring이 내부에서 폴링하고 메서드 호출
오프셋 커밋 개발자가 commitSync() 직접 호출 ackMode 설정으로 자동/수동 제어
종료 처리 wakeup()과 Shutdown Hook으로 복잡하게 처리 Spring이 애플리케이션 종료 시 자동 처리

3. Spring Kafka 상세 설정 및 활용

3.1. KafkaTemplate 커스터마이징

기본 KafkaTemplate 외에 추가 설정이 필요하다면 @Bean으로 직접 정의할 수 있다. 단순한 토이 프로젝트나 초기 단계의 서비스에서는 위처럼 `application.yml`만 사용하는 경우가 많지만, 실무 환경(대규모 트래픽, 복잡한 에러 처리, 멀티 클러스터 등)으로 갈수록 아래와 같은 `@Configuration` 방식을 활용하는 것이 훨씬 흔하고 표준적이다.

@Configuration
public class KafkaConfig {

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate(
            ProducerFactory<String, Object> producerFactory) {
        KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory);

        // 기본 토픽 설정
        template.setDefaultTopic("default-topic");

        // 전송 후 결과를 기다리는 최대 시간
        template.setProducerListener(new ProducerListener<String, Object>() {
            @Override
            public void onSuccess(ProducerRecord<String, Object> producerRecord,
                                  RecordMetadata recordMetadata) {
                // 성공 시 로깅
            }

            @Override
            public void onError(ProducerRecord<String, Object> producerRecord,
                                RecordMetadata recordMetadata, Exception exception) {
                // 실패 시 처리
            }
        });

        return template;
    }

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 10);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        return new DefaultKafkaProducerFactory<>(props);
    }
}

3.2. @KafkaListener 상세 옵션

어노테이션(@KafkaListener) 내부에서 concurrency, errorHandler, properties 등을 직접 제어할 수 있다. 하지만 properties 내부에 `max.poll.records:500`과 같은 인프라 설정을 하드코딩하는 것은 환경별 튜닝을 어렵게 만들므로 주의가 필요하다. 실무에서는 어노테이션에 모든 설정을 적기보다, 미리 설정된 (3.3.) 컨테이너 팩토리를 지정하는 방식을 권장한다. 

@Service
public class AdvancedKafkaListener {

    @KafkaListener(
        topics = "orders",
        groupId = "order-processor",
        containerFactory = "kafkaListenerContainerFactory",
        concurrency = "3",  // 3개의 스레드로 동시 처리
        errorHandler = "myErrorHandler",  // 커스텀 에러 핸들러
        properties = {
            "max.poll.records:500",  // 한 번에 가져올 최대 레코드 수
            "fetch.min.bytes:65536"  // 최소 64KB가 쌓이면 가져오기
        }
    )
    public void processOrder(Order order) {
        // 주문 처리 로직
    }
}

3.3. 컨슈머 팩토리와 리스너 컨테이너 커스터마이징

실무 환경에서 가장 선호되는 방식이다. 인프라 설정(Factory)과 비즈니스 로직(Listener)을 분리하여 가독성과 유지보수성을 극대화할 수 있기 때문이다.

  • 이유 1: 중앙 집중식 관리: FixedBackOff를 통한 재시도 전략이나 DLQ(Dead Letter Queue) 처리 등 복잡한 에러 핸들링 로직을 한 곳에서 관리할 수 있다.
  • 이유 2: 환경별 동적 튜닝: application.yml의 값을 주입받아 운영 환경에 따라 concurrency나 ackMode를 유연하게 변경할 수 있다.
  • 이유 3: 필터링 및 전처리: 특정 조건에 맞는 메시지만 소비하도록 하는 RecordFilterStrategy 등을 적용할 수 있다.
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object>
           kafkaListenerContainerFactory(ConsumerFactory<String, Object> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);

        // 1. 배치 리스닝 설정: 메시지를 하나씩 처리하지 않고 묶어서 수신하여 처리량 향상
        factory.setBatchListener(true); 

        // 2. 동시성 설정: 3개의 컨슈머 스레드를 실행하여 병렬 처리 최적화
        factory.setConcurrency(3); 

        // 3. 에러 핸들러 설정: 예외 발생 시 1초 간격으로 최대 3번 재시도하는 전략 (실무 필수)
        factory.setCommonErrorHandler(new DefaultErrorHandler(
            new FixedBackOff(1000L, 3) 
        ));

        // 4. 필터 설정: 특정 조건(예: "skip" 포함)에 맞는 메시지는 비즈니스 로직 도달 전 사전 차단
        factory.setRecordFilterStrategy(record ->
            record.value().toString().contains("skip") 
        );

        // 5. ACK 모드 설정: 메시지 처리가 완벽히 끝난 시점에 개발자가 수동 커밋 (데이터 정합성 보장)
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

        return factory;
    }

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "custom-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        
        // 자동 커밋을 끄고 위에서 설정한 AckMode.MANUAL을 활성화
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // JSON 역직렬화 시 신뢰할 수 있는 패키지 지정 (보안 설정)
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.*");

        return new DefaultKafkaConsumerFactory<>(props);
    }
}

4. 메시지 컨버터와 직렬화/역직렬화

4.1. JSON 직렬화/역직렬화

4.1.1. 순수 Java에서의 JSON 처리

// 순수 Java: ObjectMapper를 직접 사용
public class OrderProducer {
    private final ObjectMapper objectMapper = new ObjectMapper();

    public void sendOrder(Order order) {
        try {
            String json = objectMapper.writeValueAsString(order);
            ProducerRecord<String, String> record =
                new ProducerRecord<>("orders", order.getOrderId(), json);
            producer.send(record);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
}

4.1.2. Spring Kafka에서의 JSON 처리

// Spring Kafka: 컨버터가 자동 처리
@Service
public class OrderService {

    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;  // Order 객체 직접 사용!

    public void placeOrder(Order order) {
        // Order 객체를 그대로 전송 (JsonSerializer가 자동 변환)
        kafkaTemplate.send("orders", order.getOrderId(), order);
    }
}

4.1.3. application.yml 설정

spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
      properties:
        spring.json.trusted.packages: com.example.*  # 신뢰할 패키지
        spring.json.value.default.type: com.example.Order  # 기본 타입

4.2. Avro + Schema Registry 연동

4.2.1. 순수 Java에서의 Avro 처리

// 순수 Java: Avro 직렬화를 위한 복잡한 설정
public class AvroProducer {
    private final KafkaProducer<Object, Object> producer;
    private final SchemaRegistryClient schemaRegistry;

    public AvroProducer() {
        // Schema Registry 클라이언트 설정
        schemaRegistry = new MockSchemaRegistryClient();

        // Avro Serializer 설정
        Map<String, Object> config = new HashMap<>();
        config.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,
                   "<http://localhost:8081>");

        KafkaAvroSerializer avroSerializer = new KafkaAvroSerializer(schemaRegistry, config);

        Properties props = new Properties();
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  KafkaAvroSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  KafkaAvroSerializer.class);

        producer = new KafkaProducer<>(props);
    }
}

4.2.2. Spring Kafka에서의 Avro 처리

spring:
  kafka:
    producer:
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    properties:
      schema.registry.url: <http://localhost:8081>
      specific.avro.reader: true  # SpecificRecord 사용 시
@Service
public class AvroOrderService {

    @Autowired
    private KafkaTemplate<String, OrderAvro> kafkaTemplate;  // Avro SpecificRecord

    public void sendOrder(OrderAvro order) {
        // Avro 객체를 그대로 전송 (✅ AvroSerializer가 자동 처리)
        kafkaTemplate.send("avro-orders", order.getOrderId().toString(), order);
    }

    @KafkaListener(topics = "avro-orders")
    public void receiveOrder(OrderAvro order) {
        // Avro 객체를 그대로 수신 (AvroDeserializer가 자동 처리)
        System.out.println("Received Avro order: " + order.getCustomerName());
    }
}

4.3. 요약

Kafka에서 Java 객체를 주고받을 때 직렬화와 역직렬화는 필수적인 과정이다. 순수 Java 환경에서는 개발자가 ObjectMapper를 직접 사용하여 객체를 JSON으로 변환하고, 예외 처리까지 수동으로 구현해야 하는 번거로움이 있다. 이는 코드의 복잡성을 증가시키고 핵심 비즈니스 로직보다 인프라 코드에 더 많은 신경을 쓰게 만든다.

 

반면 Spring Kafka는 이러한 문제를 우아하게 해결한다. KafkaTemplate과 메시지 컨버터 계층이 직렬화/역직렬화 과정을 완전히 추상화하여, 개발자는 오직 도메인 객체에만 집중할 수 있게 해준다. application.yml에 JsonSerializer만 설정하면 Spring이 내부적으로 객체 변환부터 전송까지 모든 과정을 자동으로 처리한다.

 

결과적으로 Spring Kafka를 사용하면 개발자는 복잡한 직렬화 로직에서 해방되어 비즈니스 로직 구현에만 집중할 수 있으며, 생산성과 코드 가독성 모두 향상되는 이점을 얻을 수 있다.


5. 에러 처리 및 재시도 전략

5.1. 순수 Java에서의 에러 처리

// 순수 Java: 복잡한 재시도 로직 직접 구현
public class ResilientConsumer {

    public void consume() {
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    try {
                        processRecord(record);
                        consumer.commitSync();
                    } catch (Exception e) {
                        // 재시도 로직 직접 구현
                        int retries = 0;
                        while (retries < 3) {
                            try {
                                Thread.sleep(1000 * retries);
                                processRecord(record);
                                consumer.commitSync();
                                break;
                            } catch (Exception ex) {
                                retries++;
                                if (retries == 3) {
                                    // DLQ로 직접 전송
                                    sendToDlq(record);
                                }
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

5.2. Spring Kafka에서의 에러 처리

@Configuration
public class ErrorHandlingConfig {

    @Bean
    public DefaultErrorHandler kafkaErrorHandler() {
        // 재시도 정책: 1초, 2초, 4초 간격으로 최대 3회 재시도
        BackOff backOff = new ExponentialBackOff(1000L, 2.0);
        ((ExponentialBackOff) backOff).setMaxAttempts(3);

        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
            (record, exception) -> {
                // 모든 재시도 실패 후 호출되는 복구 로직
                System.err.println("Failed to process record: " + record.value());
                // 실패한 메시지를 DLQ로 전송
                sendToDlq(record);
            },
            backOff
        );

        // 재시도하지 않을 예외 지정
        errorHandler.addNotRetryableExceptions(IllegalArgumentException.class);

        // 재시도할 예외 지정
        errorHandler.addRetryableExceptions(NetworkException.class);

        return errorHandler;
    }

    private void sendToDlq(ConsumerRecord<?, ?> record) {
        // DLQ로 메시지 전송 로직
    }
}

5.3. Dead Letter Topic(DLT) 자동 설정

Spring Kafka 2.7+ 버전부터는 DLT를 자동으로 생성하고 전송하는 기능을 제공한다.

@Service
public class DltExampleService {

    @KafkaListener(topics = "orders", groupId = "order-group")
    public void processOrder(Order order) {
        try {
            // 비즈니스 로직
            if (order.getAmount() < 0) {
                throw new IllegalArgumentException("Amount cannot be negative");
            }
        } catch (Exception e) {
            // 예외를 던지면 DLT로 자동 전송 (설정된 경우)
            throw e;
        }
    }

    // DLT 리스너
    @DltHandler
    public void handleDlt(Order order, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        System.err.println("Message from topic " + topic + " moved to DLT: " + order);
        // DLT 메시지 처리 로직 (로깅, 알림 등)
    }
}

6. 트랜잭션과 멱등성

6.1. Kafka 트랜잭션 설정

spring:
  kafka:
    producer:
      transaction-id-prefix: tx-  # 트랜잭션 ID 접두사 설정
      properties:
        enable.idempotence: true   # 멱등성 활성화

6.2. @Transactional과 함께 사용

@Service
public class OrderProcessingService {

    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;

    @Autowired
    private OrderRepository orderRepository;

    @Transactional  // DB 트랜잭션
    public void createOrder(Order order) {
        // 1. DB에 주문 저장
        orderRepository.save(order);

        // 2. Kafka로 이벤트 전송 (같은 트랜잭션 내에서)
        kafkaTemplate.send("order-events", order.getId().toString(), order);

        // 3. 두 작업이 모두 성공하거나 모두 실패 (원자성 보장)
    }
}

6.3. ChainedKafkaTransactionManager (DB + Kafka 동시 트랜잭션)

@Configuration
public class ChainedTransactionConfig {

    @Bean
    public KafkaTransactionManager<String, String> kafkaTransactionManager(
            ProducerFactory<String, String> producerFactory) {
        return new KafkaTransactionManager<>(producerFactory);
    }

    @Bean
    public JpaTransactionManager jpaTransactionManager(EntityManagerFactory emf) {
        return new JpaTransactionManager(emf);
    }

    @Bean
    public ChainedKafkaTransactionManager<Object, Object> chainedTransactionManager(
            KafkaTransactionManager<String, String> kafkaTM,
            JpaTransactionManager jpaTM) {
        return new ChainedKafkaTransactionManager<>(kafkaTM, jpaTM);
    }
}

 

6.4. 요약 (한계와 실전 패턴)

Kafka의 트랜잭션 기능은 분산 환경에서 데이터 일관성을 보장하기 위한 중요한 도구다. `transaction-id-prefix` 설정과 `enable.idempotence: true`를 통해 프로듀서의 멱등성과 트랜잭션 기능을 활성화할 수 있으며, @Transactional과 결합하면 데이터베이스 저장과 Kafka 메시지 전송을 하나의 논리적 단위로 묶을 수 있다. 여기에 `ChainedKafkaTransactionManager`를 활용하면 DB 트랜잭션과 Kafka 트랜잭션을 물리적으로도 연결하여 더 강력한 원자성을 보장한다.

 

하지만 이것으로 모든 문제가 해결되는 것은 아니다. 위 방식은 동일한 애플리케이션 내에서 DB 작업과 Kafka 전송이 함께 일어날 때 유효하지만, 진정한 분산 트랜잭션의 복잡성을 완전히 해결하지는 못한다. 특히 장애 상황에서 메시지 중복이나 유실 가능성은 여전히 존재한다.

 

실무에서는 이러한 한계를 보완하기 위해 아웃박스 패턴(Outbox Pattern)이나 CDC(Change Data Capture) 같은 아키텍처 패턴을 도입한다. 아웃박스 패턴은 DB에 'outbox' 테이블을 두고 메시지를 먼저 저장한 후 별도 프로세스가 이를 읽어 Kafka로 발행하는 방식이고, CDC는 Debezium 같은 도구를 사용해 DB 로그를 직접 추적하여 이벤트를 발행한다. 이러한 패턴들은 DB 트랜잭션과 메시지 발행 간의 원자성을 더 확실하게 보장하며, 결과적 일관성 기반의 안정적인 이벤트 기반 아키텍처를 구축하는 데 핵심적인 역할을 한다.


7. 멀티 모듈 프로젝트에서의 Kafka 활용

실무에서는 보통 Producer와 Consumer가 분리된 멀티 모듈 구조로 개발한다.

7.1. 공통 모듈 (common)

// common 모듈: 공통 DTO, 상수 등
package com.example.common.dto;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderEvent {
    private String orderId;
    private String customerId;
    private Long amount;
    private String status;
    private LocalDateTime timestamp;
}

7.2. Producer 모듈 (order-service)

// order-service 모듈
@Service
@Slf4j
public class OrderProducer {

    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @Value("${kafka.topics.order-created}")
    private String orderCreatedTopic;

    public void sendOrderCreatedEvent(OrderEvent event) {
        CompletableFuture<SendResult<String, OrderEvent>> future =
            kafkaTemplate.send(orderCreatedTopic, event.getOrderId(), event);

        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("Order event sent successfully: offset={}, partition={}",
                        result.getRecordMetadata().offset(),
                        result.getRecordMetadata().partition());
            } else {
                log.error("Failed to send order event: {}", ex.getMessage());
            }
        });
    }
}

7.3. Consumer 모듈 (notification-service)

// notification-service 모듈
@Service
@Slf4j
public class NotificationConsumer {

    @KafkaListener(topics = "${kafka.topics.order-created}",
                   groupId = "${kafka.consumer.group-id}")
    public void handleOrderCreated(OrderEvent event) {
        log.info("Received order event: {}", event);

        // 알림 발송 로직
        sendNotification(event);
    }

    private void sendNotification(OrderEvent event) {
        // 이메일, SMS, 푸시 알림 등 발송
    }
}

8. 테스트 코드 작성

8.1. 순수 Java 테스트의 어려움

// 순수 Java: MockKafkaConsumer 등을 직접 만들어야 함
public class ConsumerTest {
    @Test
    public void testConsume() {
        // MockConsumer를 직접 생성하고 설정
        MockConsumer<String, String> mockConsumer =
            new MockConsumer<>(OffsetResetStrategy.EARLIEST);

        // 파티션 할당, 오프셋 설정 등을 직접 해줘야 함
        mockConsumer.schedulePollTask(() -> {
            mockConsumer.addRecord(new ConsumerRecord<>("topic", 0, 0, "key", "value"));
        });

        // 테스트 대상 Consumer에 주입
        MyConsumer consumer = new MyConsumer(mockConsumer);
        consumer.consume();

        // 복잡한 검증 로직
    }
}

8.2. Spring Kafka 테스트

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"test-topic"})
class KafkaConsumerTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @SpyBean
    private TestConsumer testConsumer;  // 실제 Consumer 빈

    @Test
    void testConsumeMessage() throws InterruptedException {
        // Given
        String message = "test-message";

        // When
        kafkaTemplate.send("test-topic", message);

        // Then
        await().atMost(10, TimeUnit.SECONDS)
               .untilAsserted(() ->
                   verify(testConsumer, times(1)).listen(message)
               );
    }

    @Test
    void testConsumeWithError() {
        // 에러 상황 테스트
        doThrow(new RuntimeException("Test error"))
            .when(testConsumer).listen(any());

        kafkaTemplate.send("test-topic", "error-message");

        // 에러 핸들링 검증 (DLT 전송 등)
    }
}

// 테스트용 Consumer
@Component
class TestConsumer {

    @KafkaListener(topics = "test-topic")
    public void listen(String message) {
        // 실제 비즈니스 로직
    }
}

9. 모니터링 및 운영

9.1. Micrometer를 통한 메트릭 수집

management:
  endpoints:
    web:
      exposure:
        include: health,info,metrics,prometheus
  metrics:
    export:
      prometheus:
        enabled: true
    tags:
      application: my-kafka-app
@Configuration
public class KafkaMetricsConfig {

    @Bean
    public KafkaMetrics kafkaMetrics(KafkaTemplate<?, ?> kafkaTemplate) {
        return new KafkaMetrics(kafkaTemplate.metrics());
    }
}

9.2. 컨슈머 Lag 모니터링

@Component
@Slf4j
public class KafkaLagMonitor {

    @Autowired
    private KafkaAdmin kafkaAdmin;

    @Scheduled(fixedDelay = 60000)  // 1분마다 실행
    public void monitorLag() {
        try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {

            // 컨슈머 그룹 목록 조회
            ListConsumerGroupsResult groupsResult = adminClient.listConsumerGroups();
            Collection<ConsumerGroupListing> groups = groupsResult.all().get();

            for (ConsumerGroupListing group : groups) {
                // 각 그룹의 오프셋 정보 조회
                ListConsumerGroupOffsetsResult offsetsResult =
                    adminClient.listConsumerGroupOffsets(group.groupId());

                Map<TopicPartition, OffsetAndMetadata> offsets =
                    offsetsResult.partitionsToOffsetAndMetadata().get();

                // 각 파티션의 Lag 계산
                for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
                    TopicPartition tp = entry.getKey();
                    long consumerOffset = entry.getValue().offset();

                    // Log End Offset 조회
                    Map<TopicPartition, OffsetSpec> request = Map.of(tp, OffsetSpec.latest());
                    ListOffsetsResult listOffsetsResult = adminClient.listOffsets(request);
                    long endOffset = listOffsetsResult.partitionResult(tp).get().offset();

                    long lag = endOffset - consumerOffset;

                    if (lag > 1000) {
                        log.warn("High lag detected - group: {}, topic: {}, partition: {}, lag: {}",
                                group.groupId(), tp.topic(), tp.partition(), lag);
                    }
                }
            }
        } catch (Exception e) {
            log.error("Failed to monitor Kafka lag", e);
        }
    }
}

9.3. 헬스 체크

spring:
  kafka:
    producer:
      health:
        enabled: true
    consumer:
      health:
        enabled: true
@Component
public class KafkaHealthIndicator implements HealthIndicator {

    @Autowired
    private KafkaTemplate<?, ?> kafkaTemplate;

    @Override
    public Health health() {
        try {
            // 각 브로커에 대한 메트릭 확인
            Map<MetricName, ? extends Metric> metrics = kafkaTemplate.metrics();

            boolean allBrokersConnected = metrics.entrySet().stream()
                .filter(entry -> entry.getKey().name().contains("connection-count"))
                .allMatch(entry -> ((Double) entry.getValue().metricValue()) > 0);

            if (allBrokersConnected) {
                return Health.up()
                    .withDetail("brokers", "connected")
                    .withDetail("metrics", metrics.size())
                    .build();
            } else {
                return Health.down()
                    .withDetail("error", "No brokers available")
                    .build();
            }
        } catch (Exception e) {
            return Health.down(e).build();
        }
    }
}

10. 성능 최적화 팁

10.1. Producer 최적화

spring:
  kafka:
    producer:
      # 배치 크기 증가 (기본 16KB)
      batch-size: 32768

      # 배치 전송 대기 시간 (기본 0ms)
      properties:
        linger.ms: 20

      # 압축 활성화
      compression-type: snappy

      # 버퍼 메모리 크기
      buffer-memory: 33554432  # 32MB

      # 멱등성 프로듀서 활성화
      properties:
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5

10.2. Consumer 최적화

spring:
  kafka:
    consumer:
      # 한 번에 가져올 최대 레코드 수
      max-poll-records: 500

      # poll 타임아웃
      properties:
        fetch.min.bytes: 65536  # 64KB
        fetch.max.wait.ms: 500
        max.partition.fetch.bytes: 1048576  # 1MB

    listener:
      # 동시 실행할 리스너 스레드 수
      concurrency: 3

      # 배치 리스닝 활성화
      type: batch

10.3. 비동기 처리와 스레드 풀 활용

@Configuration
@EnableAsync
public class AsyncKafkaConfig {

    @Bean("kafkaTaskExecutor")
    public Executor kafkaTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(50);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("kafka-async-");
        executor.initialize();
        return executor;
    }
}

@Service
public class AsyncKafkaService {

    @Async("kafkaTaskExecutor")
    @KafkaListener(topics = "async-topic")
    public void processAsync(String message) {
        // 오래 걸리는 작업을 비동기로 처리
        heavyProcessing(message);
    }

    private void heavyProcessing(String message) {
        // 시간이 오래 걸리는 작업
    }
}

11. 정리: 순수 Java vs Spring Kafka 비교표

영역  순수 Java  Spring Kafka
생산성 모든 코드를 직접 작성 선언적 애노테이션으로 간결하게 개발
설정 관리 Properties 객체로 산발적 관리 application.yml로 중앙 집중식 관리
리소스 관리 개발자가 직접 close() 호출 Spring 컨테이너가 자동 관리
에러 처리 try-catch와 재시도 로직 직접 구현 ErrorHandler로 일관된 에러 처리
트랜잭션 직접 구현 복잡 @Transactional로 간단하게 적용
테스트 Mock 객체 직접 생성 @EmbeddedKafka로 통합 테스트 용이
모니터링 JMX 직접 연동 Micrometer로 메트릭 자동 수집
확장성 모든 기능 직접 구현 다양한 확장 포인트 제공

12. 결론

Spring Kafka는 Kafka 클라이언트의 복잡성을 추상화하여 개발자가 비즈니스 로직에 집중할 수 있도록 도와준다.

Spring Kafka 도입 시 장점:

  • 개발 생산성 향상: 반복적인 코드 제거
  • 설정의 일관성: YAML 기반 중앙 관리
  • 테스트 용이성: 내장 Kafka로 통합 테스트
  • 운영 편의성: 모니터링, 헬스 체크, 메트릭 자동 지원
  • 확장성: 다양한 커스터마이징 포인트

도입 시 고려사항:

  • Spring 프레임워크에 대한 의존성 증가
  • 추상화로 인한 내부 동작 이해의 어려움
  • 버전 호환성 관리 필요

지금까지 배운 Kafka Core의 개념이 탄탄하다면, Spring Kafka는 단지 "더 편리하게 사용하는 도구"일 뿐이다. Core 개념을 잘 이해한 상태에서 Spring Kafka를 사용하면, 문제 발생 시 원인을 정확히 파악하고 효과적으로 대응할 수 있는 실력을 갖출 수 있을 것이다.

'Kafka > Core' 카테고리의 다른 글

[ADVANCED #9] 카프카의 데이터 저장 메커니즘: 파티션, 세그먼트, 인덱스 그리고 로그 정리 정책  (0) 2025.09.13
[ADVANCED #8] 커스텀 객체 직렬화: Order 객체로 배우는 Serializer/Deserializer 구현  (0) 2025.09.13
[ADVANCED #7][실습] 멀티 브로커 클러스터 구축: 직접 띄우고 확인하는 리플리케이션과 장애 복구  (0) 2025.09.12
[ADVANCED #6] 클러스터 운영의 핵심: 리플리케이션, ISR, 그리고 리더 선출의 모든 것  (0) 2025.09.11
[ADVANCED #5][실습] Producer & Consumer 연동 프로젝트: 파일 기반 주문 데이터를 DB로 저장하기  (0) 2025.09.11
'Kafka/Core' 카테고리의 다른 글
  • [ADVANCED #9] 카프카의 데이터 저장 메커니즘: 파티션, 세그먼트, 인덱스 그리고 로그 정리 정책
  • [ADVANCED #8] 커스텀 객체 직렬화: Order 객체로 배우는 Serializer/Deserializer 구현
  • [ADVANCED #7][실습] 멀티 브로커 클러스터 구축: 직접 띄우고 확인하는 리플리케이션과 장애 복구
  • [ADVANCED #6] 클러스터 운영의 핵심: 리플리케이션, ISR, 그리고 리더 선출의 모든 것
h6bro
h6bro
백엔드 개발자의 기술 블로그
  • h6bro
    Jun's Tech Blog
    h6bro
  • 전체
    오늘
    어제
    • 분류 전체보기 (250) N
      • Java (18)
        • Core (9)
        • Design Pattern (9)
      • Spring (80)
        • Core (24)
        • MVC (6)
        • DB (10)
        • JPA (26)
        • Monitoring (3)
        • Security (11)
        • WebSocket (0)
      • Database (33)
        • Redis (15)
        • MySQL (18)
      • MSA (25) N
        • MSA 기본 (11)
        • MSA 아키텍처 (14) N
      • Kafka (30)
        • Core (18)
        • Connect (12)
      • ElasticSearch (11)
        • Search (11)
        • Logging (0)
      • Test (4)
        • k6 (4)
      • Docker (9)
      • CI&CD (10)
        • GitHub Actions (6)
        • ArgoCD (4)
      • Kubernetes (18)
        • Core (12)
        • Ops (6)
      • Cloud Engineering (4)
        • AWS Infrastructure (3)
        • AWS EKS (1)
        • Terraform (0)
      • Project (8)
        • LinkFolio (1)
        • Secondhand Market (7)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

    • Cloud Engineering 포스팅 정리
  • 인기 글

  • 태그

    ㅈ
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.5
h6bro
[ADVANCED #10] Spring 기반의 Kafka 사용 및 운용
상단으로

티스토리툴바