0. 들어가며
마이크로서비스 아키텍처에서 각 서비스는 독립적인 데이터베이스를 가진다. 이는 서비스 간 결합도를 낮추고 독립적인 확장을 가능하게 하는 장점이 있지만, 데이터 일관성을 유지해야 하는 과제를 안겨준다. 앞서 살펴본 동기식 통신(RestTemplate, Feign Client)은 구현이 단순하지만, 서비스 간 강한 결합과 장애 전파라는 단점이 있다.
이러한 문제를 해결하기 위한 대안이 바로 이벤트 기반 아키텍처(Event-Driven Architecture)와 메시지 브로커(Message Broker)를 활용한 비동기 통신이다. 그중에서도 Apache Kafka는 높은 처리량, 확장성, 내구성을 갖춘 분산 이벤트 스트리밍 플랫폼으로, 마이크로서비스 간 데이터 동기화에 널리 사용된다.
이번 글에서는 Apache Kafka의 핵심 개념부터 시작하여, Kafka를 활용한 이커머스 애플리케이션의 데이터 동기화 구현까지 단계적으로 살펴본다. 특히 주문 생성 시 발생하는 재고 차감과 주문 내역 동기화를 Kafka를 통해 구현하는 방법에 중점을 둔다.
1. Apache Kafka 개요
1.1. Kafka란 무엇인가?
Apache Kafka는 링크드인(LinkedIn)에서 개발된 분산 이벤트 스트리밍 플랫폼이다. 전통적인 메시지 큐와 달리, Kafka는 단순한 메시지 전달을 넘어 대규모 실시간 데이터 스트림을 처리하고 저장하는 데 특화되어 있다.
Kafka의 핵심 특징:
- 높은 처리량: 초당 수백만 개의 메시지 처리 가능
- 확장성: 브로커 추가를 통한 수평적 확장
- 내구성: 디스크에 메시지를 저장하여 영속성 보장
- 가용성: 복제를 통한 고가용성 제공
- 순서 보장: 파티션 내에서 메시지 순서 보장
1.2. Kafka 핵심 개념
프로듀서(Producer): 메시지를 생성하여 Kafka 토픽으로 전송하는 애플리케이션
컨슈머(Consumer): Kafka 토픽에서 메시지를 읽어 처리하는 애플리케이션
토픽(Topic): 메시지를 구분하는 카테고리 (예: order-created, stock-deducted)
파티션(Partition): 토픽을 병렬 처리하기 위해 나누는 단위, 각 파티션은 순서가 보장됨
브로커(Broker): Kafka 서버 인스턴스
주키퍼(ZooKeeper): Kafka 클러스터의 메타데이터 관리 (Kafka 2.8부터는 주키퍼 없이도 실행 가능)
1.3. Kafka vs. 전통적인 메시지 큐
| 특징 | RabbitMQ | Apache Kafka |
| 메시지 모델 | 큐 기반 | 로그 기반 |
| 메시지 삭제 | 컨슘 후 삭제 | 설정된 보존 기간 동안 유지 |
| 처리량 | 중간 | 매우 높음 |
| 순서 보장 | 어려움 | 파티션 내에서 보장 |
| 사용 사례 | 작업 큐, RPC | 이벤트 스트리밍, 로그 수집 |
2. Apache Kafka 설치
2.1. Docker로 Kafka 설치
가장 간편한 방법은 Docker를 이용하는 것이다.
docker-compose.yml
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
실행:
docker-compose up -d
2.2. MacOS에서 Homebrew로 설치
# Kafka 설치
brew install kafka
# Zookeeper 시작
brew services start zookeeper
# Kafka 시작
brew services start kafka
2.3. 수동 설치 및 실행
- Apache Kafka 공식 사이트에서 바이너리 다운로드
- 압축 해제 후 실행
# 압축 해제
tar -xzf kafka_2.13-3.5.0.tgz
cd kafka_2.13-3.5.0
# Zookeeper 실행
bin/zookeeper-server-start.sh config/zookeeper.properties
# Kafka 실행 (다른 터미널)
bin/kafka-server-start.sh config/server.properties
3. Apache Kafka 사용 - Producer/Consumer
3.1. Spring Boot 프로젝트에 Kafka 의존성 추가
build.gradle (각 마이크로서비스)
dependencies {
implementation 'org.springframework.kafka:spring-kafka'
implementation 'com.fasterxml.jackson.core:jackson-databind'
// 기존 의존성...
}
3.2. Kafka 설정
application.yml (각 마이크로서비스)
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
spring.json.type.mapping: event:com.example.orderservice.event.OrderCreatedEvent
consumer:
group-id: ${spring.application.name}
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "*"
spring.json.type.mapping: event:com.example.orderservice.event.OrderCreatedEvent,stock:com.example.catalogservice.event.StockDeductedEvent
auto-offset-reset: earliest
3.3. 이벤트 클래스 정의
OrderCreatedEvent.java (common-library 또는 각 서비스에 정의)
package com.example.common.event;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderCreatedEvent {
private String orderId;
private String userId;
private String productId;
private Integer quantity;
private Integer unitPrice;
private LocalDateTime createdAt;
}
StockDeductedEvent.java
package com.example.common.event;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class StockDeductedEvent {
private String orderId;
private String productId;
private Integer quantity;
private Boolean success;
private String message;
}
3.4. Kafka Producer 구현 (Order Service)
OrderKafkaProducer.java
package com.example.orderservice.kafka;
import com.example.common.event.OrderCreatedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderKafkaProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
private static final String TOPIC_ORDER_CREATED = "order-created-topic";
public void sendOrderCreatedEvent(OrderCreatedEvent event) {
CompletableFuture<SendResult<String, Object>> future =
kafkaTemplate.send(TOPIC_ORDER_CREATED, event.getOrderId(), event);
future.whenComplete((result, ex) -> {
if (ex == null) {
log.info("Order created event sent successfully: {}, partition: {}, offset: {}",
event.getOrderId(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
} else {
log.error("Failed to send order created event: {}", event.getOrderId(), ex);
}
});
}
}
3.5. Kafka Consumer 구현 (Catalog Service)
StockKafkaConsumer.java
package com.example.catalogservice.kafka;
import com.example.catalogservice.service.CatalogService;
import com.example.common.event.OrderCreatedEvent;
import com.example.common.event.StockDeductedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class StockKafkaConsumer {
private final CatalogService catalogService;
private final KafkaTemplate<String, Object> kafkaTemplate;
private static final String TOPIC_STOCK_DEDUCTED = "stock-deducted-topic";
@KafkaListener(topics = "order-created-topic", groupId = "catalog-service")
public void consumeOrderCreatedEvent(OrderCreatedEvent event) {
log.info("Received order created event: {}", event);
try {
// 재고 차감 처리
boolean success = catalogService.deductStock(
event.getProductId(),
event.getQuantity()
);
// 결과 이벤트 발행
StockDeductedEvent resultEvent = new StockDeductedEvent(
event.getOrderId(),
event.getProductId(),
event.getQuantity(),
success,
success ? "Stock deducted successfully" : "Insufficient stock"
);
kafkaTemplate.send(TOPIC_STOCK_DEDUCTED, event.getOrderId(), resultEvent);
log.info("Stock deduction result sent: {}", resultEvent);
} catch (Exception e) {
log.error("Failed to process order created event", e);
// 실패 이벤트 발행
StockDeductedEvent failedEvent = new StockDeductedEvent(
event.getOrderId(),
event.getProductId(),
event.getQuantity(),
false,
"Error: " + e.getMessage()
);
kafkaTemplate.send(TOPIC_STOCK_DEDUCTED, event.getOrderId(), failedEvent);
}
}
}
3.6. Order Service에서 결과 처리
OrderKafkaConsumer.java
package com.example.orderservice.kafka;
import com.example.common.event.StockDeductedEvent;
import com.example.orderservice.entity.OrderEntity;
import com.example.orderservice.repository.OrderRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderKafkaConsumer {
private final OrderRepository orderRepository;
@KafkaListener(topics = "stock-deducted-topic", groupId = "order-service")
@Transactional
public void consumeStockDeductedEvent(StockDeductedEvent event) {
log.info("Received stock deducted event: {}", event);
OrderEntity order = orderRepository.findByOrderId(event.getOrderId())
.orElseThrow(() -> new RuntimeException("Order not found: " + event.getOrderId()));
if (event.getSuccess()) {
order.setStatus("COMPLETED");
log.info("Order completed: {}", event.getOrderId());
} else {
order.setStatus("FAILED");
order.setFailureReason(event.getMessage());
log.warn("Order failed: {}, reason: {}", event.getOrderId(), event.getMessage());
}
orderRepository.save(order);
}
}
4. Apache Kafka 사용 - Kafka Connect
4.1. Kafka Connect란?
Kafka Connect는 Kafka와 외부 시스템(데이터베이스, 파일, 클라우드 서비스 등) 간의 데이터를 스트리밍 방식으로 전송하기 위한 도구다. 별도의 코드 작성 없이 커넥터 설정만으로 데이터를 주고받을 수 있다.
Kafka Connect의 장점:
- 코드 없는 데이터 통합
- 확장성 및 재사용성
- 내결함성 및 보장된 전달
- 다양한 소스/싱크 커넥터 지원
4.2. Kafka Connect 설치
docker-compose.yml에 Kafka Connect 추가
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
kafka-connect:
image: confluentinc/cp-kafka-connect:latest
container_name: kafka-connect
depends_on:
- kafka
- zookeeper
ports:
- "8083:8083"
environment:
CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
CONNECT_REST_ADVERTISED_HOST_NAME: "kafka-connect"
CONNECT_GROUP_ID: "kafka-connect-group"
CONNECT_CONFIG_STORAGE_TOPIC: "_connect-configs"
CONNECT_OFFSET_STORAGE_TOPIC: "_connect-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "_connect-status"
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_PLUGIN_PATH: "/usr/share/java,/etc/kafka-connect/jars"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
volumes:
- ./jars:/etc/kafka-connect/jars
4.3. JDBC Connector 다운로드
MySQL, PostgreSQL, MariaDB와 같은 데이터베이스와 연동하려면 JDBC Connector 플러그인이 필요하다.
# JDBC Connector 다운로드
wget <https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.7.4/confluentinc-kafka-connect-jdbc-10.7.4.zip>
# 압축 해제 후 jars 디렉토리에 복사
unzip confluentinc-kafka-connect-jdbc-10.7.4.zip
cp -r confluentinc-kafka-connect-jdbc-10.7.4/* ./jars/
4.4. Kafka Connect REST API
Kafka Connect는 REST API를 통해 커넥터를 관리할 수 있다.
커넥터 목록 조회
curl -X GET <http://localhost:8083/connectors>
커넥터 생성
curl -X POST <http://localhost:8083/connectors> \\
-H "Content-Type: application/json" \\
-d @source-connector.json
커넥터 상태 확인
curl -X GET <http://localhost:8083/connectors/my-source-connector/status>
커넥터 삭제
curl -X DELETE <http://localhost:8083/connectors/my-source-connector>
5. Kafka Source Connect 사용
5.1. Source Connect란?
Source Connect는 외부 시스템(데이터베이스, 파일 등)의 데이터를 읽어 Kafka 토픽으로 전송하는 커넥터다.
5.2. MariaDB Source Connector 설정
source-connector.json
{
"name": "mariadb-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mariadb://localhost:3306/orderdb",
"connection.user": "root",
"connection.password": "password",
"topic.prefix": "orderdb-",
"table.whitelist": "orders",
"mode": "incrementing",
"incrementing.column.name": "id",
"poll.interval.ms": "5000",
"tasks.max": "1"
}
}
설정 설명:
- connection.url: 데이터베이스 연결 URL
- table.whitelist: 모니터링할 테이블 목록
- mode: 데이터 변경 감지 모드 (incrementing, timestamp, bulk)
- incrementing.column.name: 증가하는 컬럼 (일반적으로 PK)
- poll.interval.ms: 폴링 간격
- topic.prefix: 생성될 토픽의 접두사
5.3. Source Connector 실행
curl -X POST <http://localhost:8083/connectors> \\
-H "Content-Type: application/json" \\
-d @source-connector.json
5.4. 데이터 확인
커넥터가 정상적으로 실행되면, orderdb-orders 토픽에 데이터가 전송된다.
# Kafka 컨슈머로 데이터 확인
docker exec -it kafka kafka-console-consumer \\
--bootstrap-server localhost:9092 \\
--topic orderdb-orders \\
--from-beginning
6. Kafka Sink Connect 사용
6.1. Sink Connect란?
Sink Connect는 Kafka 토픽의 데이터를 읽어 외부 시스템(데이터베이스, 파일 등)으로 저장하는 커넥터다.
6.2. MariaDB Sink Connector 설정
sink-connector.json
{
"name": "mariadb-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mariadb://localhost:3306/analyticsdb",
"connection.user": "root",
"connection.password": "password",
"topics": "orderdb-orders,userdb-users",
"table.name.format": "${topic}",
"insert.mode": "upsert",
"pk.mode": "record_key",
"pk.fields": "id",
"auto.create": "true",
"auto.evolve": "true",
"tasks.max": "1"
}
}
설정 설명:
- topics: 구독할 토픽 목록
- insert.mode: 삽입 모드 (insert, upsert, update)
- pk.mode: 기본 키 모드
- auto.create: 테이블 자동 생성 여부
- auto.evolve: 스키마 변경 자동 반영 여부
6.3. Sink Connector 실행
curl -X POST <http://localhost:8083/connectors> \\
-H "Content-Type: application/json" \\
-d @sink-connector.json
7. Orders Microservice에서 MariaDB 연동
7.1. MariaDB 의존성 추가
build.gradle (order-service)
dependencies {
implementation 'org.mariadb.jdbc:mariadb-java-client:3.1.4'
// 기존 의존성...
}
7.2. 프로덕션 환경 설정
application-prod.yml (order-service)
spring:
datasource:
driver-class-name: org.mariadb.jdbc.Driver
url: jdbc:mariadb://localhost:3306/orderdb
username: root
password: password
hikari:
maximum-pool-size: 10
minimum-idle: 5
connection-timeout: 5000
jpa:
hibernate:
ddl-auto: update
show-sql: false
database-platform: org.hibernate.dialect.MariaDBDialect
7.3. 프로필 활성화하여 실행
# 프로덕션 프로필로 실행
java -jar -Dspring.profiles.active=prod order-service.jar
8. Orders Microservice와 Catalogs Microservice에 Kafka Topic 적용
8.1. 공통 라이브러리 생성
여러 서비스에서 공통으로 사용할 이벤트 클래스를 별도 모듈로 분리한다.
common-library/build.gradle
dependencies {
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
}
8.2. 이벤트 클래스 정의 (확장)
OrderEvent.java
package com.example.common.event;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.time.LocalDateTime;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderEvent {
private String eventId;
private String eventType; // CREATED, CANCELLED, COMPLETED
private String orderId;
private String userId;
private String productId;
private Integer quantity;
private Integer unitPrice;
private Integer totalPrice;
private LocalDateTime timestamp;
}
StockEvent.java
package com.example.common.event;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class StockEvent {
private String eventId;
private String productId;
private Integer quantity;
private Integer remainingStock;
private String operation; // DEDUCT, RESTORE
private LocalDateTime timestamp;
}
8.3. Order Service에 Kafka Topic 적용
OrderService.java (Kafka 통합)
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderService {
private final OrderRepository orderRepository;
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
@Transactional
public ResponseOrder createOrder(String userId, RequestOrder requestOrder) {
// 1. 주문 생성
String orderId = UUID.randomUUID().toString();
String eventId = UUID.randomUUID().toString();
OrderEntity orderEntity = new OrderEntity();
orderEntity.setProductId(requestOrder.getProductId());
orderEntity.setQuantity(requestOrder.getQuantity());
orderEntity.setUnitPrice(requestOrder.getUnitPrice());
orderEntity.setTotalPrice(requestOrder.getQuantity() * requestOrder.getUnitPrice());
orderEntity.setUserId(userId);
orderEntity.setOrderId(orderId);
orderEntity.setStatus("PENDING");
orderEntity.setCreatedAt(LocalDateTime.now());
orderRepository.save(orderEntity);
// 2. Kafka 이벤트 발행
OrderEvent orderEvent = new OrderEvent(
eventId,
"CREATED",
orderId,
userId,
requestOrder.getProductId(),
requestOrder.getQuantity(),
requestOrder.getUnitPrice(),
orderEntity.getTotalPrice(),
LocalDateTime.now()
);
kafkaTemplate.send("order-events-topic", orderId, orderEvent)
.whenComplete((result, ex) -> {
if (ex == null) {
log.info("Order event sent successfully: {}", orderId);
} else {
log.error("Failed to send order event: {}", orderId, ex);
// 실패 시 보상 로직 또는 재시도 큐에 저장
}
});
return mapToResponseOrder(orderEntity);
}
@KafkaListener(topics = "stock-events-topic", groupId = "order-service")
public void handleStockEvent(StockEvent event) {
log.info("Received stock event: {}", event);
if ("DEDUCTED".equals(event.getOperation())) {
// 재고 차감 완료 - 주문 상태 업데이트
OrderEntity order = orderRepository.findByOrderId(event.getEventId())
.orElseThrow(() -> new RuntimeException("Order not found"));
order.setStatus("COMPLETED");
orderRepository.save(order);
log.info("Order completed: {}", event.getEventId());
} else if ("FAILED".equals(event.getOperation())) {
// 재고 차감 실패 - 주문 실패 처리
OrderEntity order = orderRepository.findByOrderId(event.getEventId())
.orElseThrow(() -> new RuntimeException("Order not found"));
order.setStatus("FAILED");
orderRepository.save(order);
log.warn("Order failed due to stock issue: {}", event.getEventId());
}
}
}
9. Catalog Service 수정
9.1. 재고 관리 서비스 구현
CatalogService.java
@Service
@Slf4j
@RequiredArgsConstructor
public class CatalogService {
private final CatalogRepository catalogRepository;
private final KafkaTemplate<String, Object> kafkaTemplate;
@Transactional
public boolean deductStock(String productId, int quantity) {
CatalogEntity catalog = catalogRepository.findByProductId(productId)
.orElseThrow(() -> new RuntimeException("Product not found: " + productId));
if (catalog.getStock() < quantity) {
log.warn("Insufficient stock. productId: {}, requested: {}, available: {}",
productId, quantity, catalog.getStock());
return false;
}
catalog.setStock(catalog.getStock() - quantity);
catalogRepository.save(catalog);
log.info("Stock deducted. productId: {}, quantity: {}, remaining: {}",
productId, quantity, catalog.getStock());
return true;
}
@Transactional
public void restoreStock(String productId, int quantity) {
CatalogEntity catalog = catalogRepository.findByProductId(productId)
.orElseThrow(() -> new RuntimeException("Product not found: " + productId));
catalog.setStock(catalog.getStock() + quantity);
catalogRepository.save(catalog);
log.info("Stock restored. productId: {}, quantity: {}, remaining: {}",
productId, quantity, catalog.getStock());
}
}
9.2. Kafka Consumer/Producer 구현
CatalogKafkaService.java
package com.example.catalogservice.kafka;
import com.example.catalogservice.service.CatalogService;
import com.example.common.event.OrderEvent;
import com.example.common.event.StockEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class CatalogKafkaService {
private final CatalogService catalogService;
private final KafkaTemplate<String, Object> kafkaTemplate;
@KafkaListener(topics = "order-events-topic", groupId = "catalog-service")
public void handleOrderEvent(OrderEvent event) {
log.info("Received order event: {}", event);
if ("CREATED".equals(event.getEventType())) {
processOrderCreated(event);
} else if ("CANCELLED".equals(event.getEventType())) {
processOrderCancelled(event);
}
}
private void processOrderCreated(OrderEvent event) {
try {
boolean success = catalogService.deductStock(
event.getProductId(),
event.getQuantity()
);
StockEvent stockEvent = new StockEvent(
event.getOrderId(),
event.getProductId(),
event.getQuantity(),
catalogService.getStock(event.getProductId()),
success ? "DEDUCTED" : "FAILED",
LocalDateTime.now()
);
kafkaTemplate.send("stock-events-topic", event.getOrderId(), stockEvent);
} catch (Exception e) {
log.error("Failed to process order created event", e);
StockEvent failedEvent = new StockEvent(
event.getOrderId(),
event.getProductId(),
event.getQuantity(),
0,
"FAILED",
LocalDateTime.now()
);
kafkaTemplate.send("stock-events-topic", event.getOrderId(), failedEvent);
}
}
private void processOrderCancelled(OrderEvent event) {
try {
catalogService.restoreStock(event.getProductId(), event.getQuantity());
StockEvent stockEvent = new StockEvent(
event.getOrderId(),
event.getProductId(),
event.getQuantity(),
catalogService.getStock(event.getProductId()),
"RESTORED",
LocalDateTime.now()
);
kafkaTemplate.send("stock-events-topic", event.getOrderId(), stockEvent);
} catch (Exception e) {
log.error("Failed to process order cancelled event", e);
}
}
}
10. Kafka를 활용한 데이터 동기화 테스트
10.1. 테스트 시나리오
- 정상 주문 시나리오
- Order Service에서 주문 생성 이벤트 발행
- Catalog Service에서 재고 차감 및 성공 이벤트 발행
- Order Service에서 주문 상태 COMPLETED로 변경
- 재고 부족 시나리오
- Order Service에서 주문 생성 이벤트 발행
- Catalog Service에서 재고 부족으로 실패 이벤트 발행
- Order Service에서 주문 상태 FAILED로 변경
- 주문 취소 시나리오
- Order Service에서 주문 취소 이벤트 발행
- Catalog Service에서 재고 복원
10.2. 테스트 코드
OrderServiceIntegrationTest.java
package com.example.orderservice;
import com.example.common.event.OrderEvent;
import com.example.common.event.StockEvent;
import com.example.orderservice.dto.RequestOrder;
import com.example.orderservice.dto.ResponseOrder;
import com.example.orderservice.entity.OrderEntity;
import com.example.orderservice.repository.OrderRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, topics = {"order-events-topic", "stock-events-topic"})
public class OrderServiceIntegrationTest {
@Autowired
private OrderService orderService;
@Autowired
private OrderRepository orderRepository;
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
private ObjectMapper objectMapper;
private RequestOrder requestOrder;
private String userId = "test-user-id";
@BeforeEach
void setUp() {
requestOrder = new RequestOrder();
requestOrder.setProductId("CATALOG-001");
requestOrder.setQuantity(2);
requestOrder.setUnitPrice(1500);
}
@Test
void testOrderCreationWithKafka() {
// 주문 생성
ResponseOrder response = orderService.createOrder(userId, requestOrder);
assertThat(response).isNotNull();
assertThat(response.getOrderId()).isNotNull();
// Kafka 이벤트 발행 확인
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
// 주문 상태가 PENDING인지 확인
OrderEntity order = orderRepository.findByOrderId(response.getOrderId())
.orElseThrow();
assertThat(order.getStatus()).isEqualTo("PENDING");
});
// 재고 차감 이벤트 시뮬레이션
StockEvent stockEvent = new StockEvent(
response.getOrderId(),
requestOrder.getProductId(),
requestOrder.getQuantity(),
100,
"DEDUCTED",
LocalDateTime.now()
);
kafkaTemplate.send("stock-events-topic", response.getOrderId(), stockEvent);
// 주문 상태가 COMPLETED로 변경되었는지 확인
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {
OrderEntity order = orderRepository.findByOrderId(response.getOrderId())
.orElseThrow();
assertThat(order.getStatus()).isEqualTo("COMPLETED");
});
}
}
11. Multi Orders Microservice 사용에 대한 데이터 동기화 문제
11.1. 다중 인스턴스 환경에서의 문제점
여러 개의 Order Service 인스턴스가 실행되는 환경에서는 다음과 같은 문제가 발생할 수 있다.
- 중복 이벤트 처리: 동일한 이벤트를 여러 인스턴스가 동시에 처리하여 데이터 정합성이 깨질 수 있다.
- 순서 보장 문제: 동일한 주문에 대한 이벤트가 순서대로 처리되지 않을 수 있다.
- 데드 레터 큐 관리: 실패한 이벤트의 재처리 메커니즘이 필요하다.
11.2. 해결 방안
1. 파티션 키 활용
// 동일한 orderId는 항상 같은 파티션으로 전송
kafkaTemplate.send("order-events-topic", orderId, event);
2. 멱등성 처리
@Entity
@Table(uniqueConstraints = @UniqueConstraint(columnNames = {"eventId"}))
public class ProcessedEvent {
@Id
private String eventId;
private String eventType;
private LocalDateTime processedAt;
}
// 이벤트 처리 전 중복 확인
@Transactional
public void handleStockEvent(StockEvent event) {
if (processedEventRepository.existsById(event.getEventId())) {
log.info("Event already processed: {}", event.getEventId());
return;
}
// 비즈니스 로직 처리
// ...
// 처리 완료 기록
ProcessedEvent processed = new ProcessedEvent();
processed.setEventId(event.getEventId());
processed.setEventType("STOCK_EVENT");
processed.setProcessedAt(LocalDateTime.now());
processedEventRepository.save(processed);
}
3. 재시도 메커니즘
@RetryableTopic(
attempts = "5",
backoff = @Backoff(delay = 1000, multiplier = 2),
include = {RetryableException.class}
)
@KafkaListener(topics = "order-events-topic")
public void handleOrderEventWithRetry(OrderEvent event) {
// 재시도 가능한 예외 발생 시 자동 재시도
}
12. Kafka Connect를 활용한 단일 데이터베이스 사용
12.1. CDC(Change Data Capture) 패턴
CDC는 데이터베이스의 변경 사항을 캡처하여 다른 시스템으로 전파하는 패턴이다. Kafka Connect와 Debezium을 활용하면 CDC를 쉽게 구현할 수 있다.
데이터 흐름:
Order Service Database (MariaDB)
→ Debezium Source Connector
→ Kafka Topic (order-changes)
→ JDBC Sink Connector
→ Analytics Database (MariaDB)
12.2. Debezium MySQL Connector 설정
debezium-connector.json
{
"name": "debezium-mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "localhost",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "order-server",
"database.include.list": "orderdb",
"table.include.list": "orderdb.orders",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.order",
"include.schema.changes": "true",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
}
12.3. 단일 데이터베이스 구성
애널리틱스 데이터베이스에 싱크 커넥터 설정
{
"name": "analytics-sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mariadb://localhost:3307/analytics",
"connection.user": "root",
"connection.password": "password",
"topics": "order-server.orderdb.orders",
"table.name.format": "orders_analytics",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value",
"auto.create": "true",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
}
}
13. Orders Microservice - Orders Kafka Topic
13.1. 주문 상태 관리 토픽 설계
토픽 설계:
주문 이벤트 토픽: order-events (파티션 3개, 복제 3개)
주문 상태 토픽: order-states (컴팩션 정책 사용)
재고 이벤트 토픽: stock-events
데드 레터 토픽: dead-letter-orders
13.2. 토픽 생성
# 토픽 생성 명령어
docker exec -it kafka kafka-topics \\
--bootstrap-server localhost:9092 \\
--create \\
--topic order-events \\
--partitions 3 \\
--replication-factor 1
docker exec -it kafka kafka-topics \\
--bootstrap-server localhost:9092 \\
--create \\
--topic order-states \\
--partitions 3 \\
--replication-factor 1 \\
--config cleanup.policy=compact
docker exec -it kafka kafka-topics \\
--bootstrap-server localhost:9092 \\
--create \\
--topic stock-events \\
--partitions 3 \\
--replication-factor 1
docker exec -it kafka kafka-topics \\
--bootstrap-server localhost:9092 \\
--create \\
--topic dead-letter-orders \\
--partitions 1 \\
--replication-factor 1
14. Orders Microservice - Order Kafka Producer
14.1. 고급 Producer 구현
AdvancedOrderKafkaProducer.java
package com.example.orderservice.kafka;
import com.example.common.event.OrderEvent;
import com.example.orderservice.entity.OrderEntity;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@Slf4j
@Component
@RequiredArgsConstructor
public class AdvancedOrderKafkaProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
private static final String TOPIC_ORDER_EVENTS = "order-events";
private static final String TOPIC_ORDER_STATES = "order-states";
@Transactional
public CompletableFuture<SendResult<String, Object>> sendOrderCreatedEvent(OrderEntity order) {
String eventId = UUID.randomUUID().toString();
OrderEvent event = OrderEvent.builder()
.eventId(eventId)
.eventType("CREATED")
.orderId(order.getOrderId())
.userId(order.getUserId())
.productId(order.getProductId())
.quantity(order.getQuantity())
.unitPrice(order.getUnitPrice())
.totalPrice(order.getTotalPrice())
.timestamp(LocalDateTime.now())
.build();
// 이벤트 토픽에 전송
CompletableFuture<SendResult<String, Object>> future =
kafkaTemplate.send(TOPIC_ORDER_EVENTS, order.getOrderId(), event);
future.whenComplete((result, ex) -> {
if (ex == null) {
log.info("Order event sent successfully. orderId: {}, partition: {}, offset: {}",
order.getOrderId(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
// 상태 토픽에도 전송 (컴팩션 정책 적용)
kafkaTemplate.send(TOPIC_ORDER_STATES, order.getOrderId(), event);
} else {
log.error("Failed to send order event. orderId: {}", order.getOrderId(), ex);
// 실패 시 데드 레터 큐로 전송
sendToDeadLetter(order, event, ex.getMessage());
}
});
return future;
}
private void sendToDeadLetter(OrderEntity order, OrderEvent event, String errorMessage) {
// 실패한 메시지를 데드 레터 큐로 전송
kafkaTemplate.send("dead-letter-orders", order.getOrderId(),
Map.of("event", event, "error", errorMessage, "timestamp", LocalDateTime.now()));
}
@Transactional
public void sendOrderStatusUpdate(String orderId, String status, String reason) {
Map<String, Object> statusUpdate = new HashMap<>();
statusUpdate.put("orderId", orderId);
statusUpdate.put("status", status);
statusUpdate.put("reason", reason);
statusUpdate.put("timestamp", LocalDateTime.now());
kafkaTemplate.send(TOPIC_ORDER_STATES, orderId, statusUpdate);
log.info("Order status update sent. orderId: {}, status: {}", orderId, status);
}
}
14.2. 트랜잭셔널 아웃박스 패턴
데이터베이스 트랜잭션과 Kafka 메시지 전송을 원자적으로 처리하기 위한 패턴이다.
OutboxEntity.java
package com.example.orderservice.entity;
import jakarta.persistence.*;
import lombok.Data;
import java.time.LocalDateTime;
@Entity
@Table(name = "outbox")
@Data
public class OutboxEntity {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false, unique = true)
private String eventId;
@Column(nullable = false)
private String aggregateType; // Order, Payment 등
@Column(nullable = false)
private String aggregateId;
@Column(nullable = false)
private String eventType; // CREATED, CANCELLED 등
@Column(nullable = false, columnDefinition = "TEXT")
private String payload; // JSON 형태의 이벤트 데이터
@Column(nullable = false)
private LocalDateTime createdAt;
@Column
private LocalDateTime processedAt;
@Column(nullable = false)
private String status; // PENDING, PROCESSED, FAILED
}
OutboxScheduler.java
package com.example.orderservice.scheduler;
import com.example.orderservice.entity.OutboxEntity;
import com.example.orderservice.repository.OutboxRepository;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.List;
@Slf4j
@Component
@RequiredArgsConstructor
public class OutboxScheduler {
private final OutboxRepository outboxRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
@Scheduled(fixedDelay = 5000)
@Transactional
public void publishOutboxEvents() {
List<OutboxEntity> pendingEvents = outboxRepository.findByStatusAndCreatedAtBefore(
"PENDING",
LocalDateTime.now().minusSeconds(10)
);
for (OutboxEntity event : pendingEvents) {
try {
String topic = event.getAggregateType().toLowerCase() + "-events";
kafkaTemplate.send(topic, event.getAggregateId(), event.getPayload())
.whenComplete((result, ex) -> {
if (ex == null) {
event.setProcessedAt(LocalDateTime.now());
event.setStatus("PROCESSED");
outboxRepository.save(event);
log.info("Outbox event published: {}", event.getEventId());
} else {
log.error("Failed to publish outbox event: {}", event.getEventId(), ex);
event.setStatus("FAILED");
outboxRepository.save(event);
}
});
} catch (Exception e) {
log.error("Error processing outbox event: {}", event.getEventId(), e);
event.setStatus("FAILED");
outboxRepository.save(event);
}
}
}
}
15. 결론 및 모범 사례
15.1. Kafka 기반 데이터 동기화의 장점
느슨한 결합: 서비스 간 직접적인 의존성 제거
확장성: 파티션 증가를 통한 수평적 확장 용이
내구성: 디스크에 메시지 저장으로 장애 복구 가능
실시간 처리: 낮은 지연 시간으로 실시간 이벤트 처리 가능
15.2. 운영 모범 사례
1. 토픽 설계
- 비즈니스 이벤트 단위로 토픽 분리
- 파티션 수는 처리량과 컨슈머 수 고려하여 결정
- 컴팩션 정책 활용하여 최신 상태 유지
2. 메시지 설계
- 스키마 레지스트리 사용 (Avro, Protobuf)
- 이벤트 ID 포함하여 멱등성 보장
- 타임스탬프 포함하여 순서 보장
3. 오류 처리
- 데드 레터 큐(Dead Letter Queue) 운영
- 재시도 메커니즘 구현 (RetryTopic)
- 모니터링 및 알림 체계 구축
4. 모니터링
- Kafka Metrics 모니터링 (JMX, Prometheus)
- 컨슈머 랙(Consumer Lag) 모니터링
- 처리량, 지연 시간 추적
15.3. 문제 해결 팁
메시지 순서 보장:
- 동일한 키로 동일 파티션에 전송
- 파티션 내에서만 순서 보장됨을 인지
중복 메시지 처리:
- 멱등성 처리 구현
- 처리된 이벤트 ID 저장
메시지 손실 방지:
- acks=all 설정
- 복제 팩터 충분히 설정
- 트랜잭셔널 프로듀서 사용
다음 글에서는 마이크로서비스 환경에서의 장애 처리와 분산 추적에 대해 다룰 예정이다. 서킷 브레이커 패턴과 Resilience4J를 활용한 장애 격리, 그리고 Spring Cloud Sleuth와 Zipkin을 활용한 분산 추적 시스템 구축을 살펴보겠다.
'MSA > MSA 기본' 카테고리의 다른 글
| [BASIC #10] Docker 기반 MSA 배포 전략 (0) | 2025.09.21 |
|---|---|
| [BASIC #9] 장애 처리와 모니터링 (0) | 2025.09.20 |
| [BASIC #7] Microservice 간 통신 전략 (0) | 2025.09.20 |
| [BASIC #6] Configuration Service와 중앙 설정 관리 (0) | 2025.09.20 |
| [BASIC #5] 인증 처리와 JWT 기반 보안 (0) | 2025.09.20 |
