1. CQRS란 무엇인가?
마이크로서비스 아키텍처(MSA)와 같이 복잡한 시스템을 설계할 때 데이터 관리 전략은 매우 중요하다. 특히, k8s 처럼 서비스별로 데이터베이스를 분리하는 'Database-per-Service' 패턴을 따를 경우, 데이터 변경 작업과 조회 작업의 요구사항이 달라 충돌하는 경우가 많다. 이러한 문제를 해결하기 위한 패턴 중 하나가 CQRS (Command Query Responsibility Segregation)이다.
CQRS는 시스템에서 데이터의 상태를 변경하는 책임(Command)과 데이터를 조회하는 책임(Query)을 명확하게 분리하는 디자인 패턴이다. 전통적인 CRUD 모델에서는 단일 모델과 데이터 저장소를 사용하여 데이터의 생성, 읽기, 수정, 삭제를 모두 처리하지만, CQRS는 이 책임을 분리하여 각 작업에 최적화된 접근 방식을 사용할 수 있도록 한 것이다.
- Command: 시스템의 상태를 변경하는 작업(생성, 수정, 삭제)이다. 보통 성공/실패 여부 외에는 데이터를 반환하지 않는다.
- Query: 시스템의 상태를 변경하지 않고 데이터를 읽어오는 작업이다. 조회 결과를 반환한다.
2. CQRS 구현 수준 (Levels)
CQRS는 시스템의 요구사항과 복잡성에 따라 여러 수준으로 구현될 수 있다.
- 1단계: 코드 레벨 분리
- 애플리케이션 코드 내에서 명령(Command)을 처리하는 로직(Controller, Service)과 조회(Query)를 처리하는 로직을 분리하는 것이다.
- 데이터베이스 스키마나 물리적 DB는 분리하지 않는다.
- 장점: 구현이 비교적 간단하며, 코드의 명확성과 유지보수성이 향상된다. 향후 확장을 위한 기반을 마련할 수 있다.
- 단점: DB 레벨에서의 성능 최적화나 독립적인 확장은 어렵다.
- 2단계: 스키마(데이터 모델) 분리
- 명령 처리에 사용되는 Write 모델과 조회 처리에 사용되는 Read 모델의 데이터베이스 스키마(테이블 구조)를 분리하는 것이다.
- Write 모델은 데이터 무결성을 위해 정규화된 구조를 유지하고, Read 모델은 조회 성능을 위해 비정규화된 구조(예: 구체화된 뷰)를 가질 수 있다.
- 동일한 물리적 DB 인스턴스 내에서 테이블을 분리하여 구현할 수 있다.
- 장점: 조회 성능을 크게 향상시킬 수 있다.
- 단점: Write 모델과 Read 모델 간의 데이터 동기화 메커니즘이 필요하며, 이로 인해 구현 복잡성이 증가한다.
- 3단계: 물리적 DB 분리
- Write 모델과 Read 모델을 위한 물리적인 데이터베이스 저장소 자체를 분리하는 것이다.
- 예를 들어, Write는 RDB를 사용하고 Read는 NoSQL이나 검색 엔진(Elasticsearch)을 사용할 수 있다.
- 장점: 쓰기와 읽기 작업을 독립적으로 확장하고 최적화할 수 있어 최고의 성능과 확장성을 제공한다.
- 단점: 구현 및 관리 복잡성이 가장 높고, 인프라 비용이 증가한다. 데이터 동기화 메커니즘이 필수적이다.
📌 실무에서는 보통 2단계(스키마 분리) 까지만 구현하는 경우가 많다.
3. 데이터 동기화 메커니즘 (CQRS 2, 3단계)
[1] 데이터베이스의 분리
CQRS 2단계는 논리적으로 Write DB와 Read DB로 분리되며, 3단계는 물리적으로 Write DB와 Read DB가 분리된다. 즉, 2단계 또는 3단계에서는 Write 모델에서 발생한 변경 사항을 Read 모델에 반영하는 동기화 과정이 필요하다.
이 과정에서 최종적 일관성(Eventual Consistency)이 발생하는 경우가 많다. 즉, 쓰기 작업 후 읽기 작업 시 약간의 시간 지연 후에 변경된 데이터가 반영될 수 있다. 가장 일반적인 동기화 방식은 이벤트 기반 아키텍처(EDA)와 메시지 브로커(예: Kafka)를 활용하는 것이다.
- 이벤트 발행: Write 모델 서비스는 데이터 변경(Command 처리)이 성공적으로 완료된 후, 변경 내용을 담은 이벤트를 Kafka와 같은 메시지 브로커로 발행한다.
- 이벤트 구독: Read 모델을 업데이트하는 별도의 Consumer(또는 Read 모델 서비스 자체)가 이 이벤트를 구독한다.
- Read 모델 업데이트: Consumer는 수신한 이벤트를 기반으로 Read 모델 데이터베이스의 내용을 업데이트한다.
하지만 이 과정에서 DB 저장(Write 모델)과 Kafka 발행이라는 두 가지 작업이 별도의 트랜잭션으로 처리될 경우, 데이터 불일치 문제(이중 쓰기 문제)가 발생할 수 있어서 Transactional Outbox 패턴을 이용하는것이 좋다.
📌 CQRS는 데이터베이스의 동기화를 위해 Kafka나 EDA를 사용하는데, 여기서 데이터 불일치 문제 때문에 Transactional Outbox 패턴을 사용한다고 생각하면 된다.
[2] Outbox 패턴이란? (2 Phase commit 혹은 Dual Write Problem 해결책)
Outbox 패턴은 마이크로서비스 아키텍처에서 데이터베이스 변경과 이벤트/메시지 발행이라는 두 가지 작업을 하나의 원자적인 트랜잭션으로 묶어 데이터 일관성을 보장하기 위한 디자인 패턴이다.
일반적으로 서비스를 업데이트할 때 데이터베이스에 상태를 저장하고(예: 주문 생성), 이 변경 사항을 다른 서비스에 알리기 위해 메시지 브로커(예: Kafka)로 이벤트를 발행해야 한다. 하지만 이 두 작업(DB 저장, 메시지 발행)을 별도의 트랜잭션으로 처리하면, 둘 중 하나만 성공하고 다른 하나는 실패하는 경우 데이터 불일치(Inconsistency)가 발생할 수 있다 (이를 이중 쓰기 문제라고 한다). Outbox 패턴은 이 문제를 다음과 같이 해결한다.
- 애플리케이션은 하나의 DB 트랜잭션 내에서 다음 두 가지를 모두 수행한다.
- 비즈니스 데이터를 해당 테이블에 저장/수정/삭제한다.
- 발행해야 할 이벤트/메시지의 내용을 동일한 데이터베이스 내의 특별한 테이블(outbox 테이블)에 기록한다.
- 이 트랜잭션이 성공적으로 커밋되면, 비즈니스 데이터 변경과 이벤트 기록이 원자적으로 보장된다. 👍
- 별도의 프로세스(CDC 도구 또는 릴레이 프로세스)가 outbox 테이블을 모니터링하다가 새로운 이벤트 기록을 발견하면, 이를 읽어서 실제 메시지 브로커(Kafka 등)로 안전하게 전달한다.
[3] Outbox 구성 요소
- Outbox 테이블: Write 모델 데이터베이스 내에 발행할 이벤트를 임시로 저장하는 outbox_events 테이블을 생성한다.
- 단일 트랜잭션: Write 모델 서비스는 하나의 DB 트랜잭션 내에서 비즈니스 데이터 변경과 outbox_events 테이블에 이벤트 기록을 동시에 처리한다.
- 이벤트 발행: 별도의 메커니즘을 통해 outbox_events 테이블에 기록된 이벤트를 안전하게 읽어 Kafka로 발행한다.
- CDC (Change Data Capture) 방식 (권장): Debezium과 같은 CDC 도구가 DB의 트랜잭션 로그(예: MySQL binlog)를 읽어 outbox_events 테이블의 변경 사항을 감지하고 자동으로 Kafka에 이벤트를 발행한다. 애플리케이션 코드는 Kafka 발행 로직에서 완전히 분리된다.
- 릴레이(Polling Publisher) 방식: 별도의 스케줄러가 주기적으로 outbox_events 테이블을 폴링하여 미발행 이벤트를 Kafka로 발행하고 처리 상태를 업데이트한다. CDC보다 구현이 간단할 수 있으나 폴링 부하와 지연이 발생할 수 있다.
📌 Outbox 패턴 != CDC
📌 CQRS 2단계에서 데이터 동기화를 위해 Kafka를 사용할 때 Outbox + CDC 조합을 쓰면 “DB 저장”만으로 Kafka 이벤트 발행이 자동으로 일어나고, DB와 Kafka 간의 정합성 문제는 완전히 사라진다. 즉, 이 구조에서는 Kafka publish 코드 자체가 필요 없어지므로 Read DB에 저장하는 코드가 사라진다.
3. 이벤트 소싱 (Event Sourcing)과의 관계
이벤트 소싱은 데이터의 현재 상태 대신 상태를 변경시킨 모든 이벤트를 순서대로 저장하는 패턴이다. 이벤트 저장소(Event Store) 자체가 Write 모델의 역할을 한다.
- CQRS와 이벤트 소싱은 함께 사용될 때 강력한 시너지를 낸다. 이벤트 소싱을 CQRS의 Write 측 구현 방식으로 사용하는 것이다.
- Read 모델은 이벤트 저장소의 이벤트 스트림을 구독하여 필요한 데이터를 구축한다. joneconsulting/practice-msa-comms 레포지토리의 cqrs2 브랜치가 이 방식을 보여준다.
- 이벤트 소싱은 데이터 변경 이력 추적, 특정 시점 복구 등 강력한 장점이 있지만, 구현 복잡성이 높아 모든 상황에 적합한 것은 아니며 실무 사용 빈도가 일반 CRUD 방식보다 낮다.
결론
CQRS는 MSA와 같이 복잡한 시스템에서 명령과 조회의 서로 다른 요구사항을 만족시키기 위한 유용한 패턴이다. 물리적인 DB 분리 없이 코드 레벨에서 적용하는 것만으로도 코드 구조 개선과 향후 확장성 확보에 도움이 된다. DB 스키마 분리(2단계)나 물리적 DB 분리(3단계)를 적용할 경우, Kafka와 Transactional Outbox 패턴(CDC 또는 Polling 활용)을 이용한 안정적인 데이터 동기화 메커니즘 구축이 중요하다. 이벤트 소싱은 CQRS의 Write 측 구현 방식으로 고려될 수 있으나, 그 복잡성으로 인해 신중한 도입이 필요하다.
CQRS 실습: 주문 서비스에 CQRS 2단계 적용하기
이제 CQRS 패턴, 특히 2단계(스키마 분리)를 실제 코드에 적용하는 실습 과정을 살펴보자. 여기서는 heojunhyoung/k8s-msa 프로젝트의 order-service를 예시로, Kafka와 Transactional Outbox 패턴(CDC 또는 Polling 방식)을 사용하여 Write 모델과 Read 모델을 분리하고 동기화하는 과정을 보여줄 것이다.
목표 🎯
- Write 모델: 기존 OrderEntity 및 OrderRepository를 사용하여 주문 생성/수정 요청 처리 (정규화된 구조).
- Read 모델: 조회 성능 향상을 위해 비정규화된 OrderSummary 테이블 및 관련 컴포넌트 신설.
- 데이터 동기화: Write 모델 변경 시 Kafka 이벤트를 발행하고, 이를 구독하여 Read 모델을 업데이트 (Transactional Outbox 패턴 사용).
실습 과정 🛠️
1단계: DB 준비 (Write DB 스키마 수정)
Write 모델 DB (현재 order-service가 사용하는 MySQL DB) 내에 두 개의 테이블을 추가 생성해야 한다. 이 SQL 구문들은 DB 초기화 스크립트(예: init-db-configmap.yaml 내 init.sql)에 추가되어야 한다.
- Outbox 테이블 생성: 발행할 이벤트를 임시 저장한다.
-
SQL
CREATE TABLE IF NOT EXISTS outbox_events ( event_id BINARY(16) PRIMARY KEY DEFAULT (UUID_TO_BIN(UUID())), aggregate_type VARCHAR(255) NOT NULL, aggregate_id VARCHAR(255) NOT NULL, event_type VARCHAR(255) NOT NULL, payload JSON NOT NULL, timestamp DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ); - Read 모델 테이블 생성: 조회에 최적화된 비정규화 테이블이다.
-
SQL
CREATE TABLE IF NOT EXISTS order_summary ( order_id BIGINT PRIMARY KEY, -- OrderEntity의 ID와 매핑 user_id BIGINT NOT NULL, product_id BIGINT NOT NULL, product_name VARCHAR(255), -- 예시: 비정규화된 상품명 quantity INT NOT NULL, total_price INT NOT NULL, order_date DATETIME(6) -- 예시: 주문 일자 -- 필요한 다른 조회용 컬럼 추가 );
2단계: Write 측 코드 수정 (order-service)
명령(Command)을 처리하고 Outbox 이벤트를 기록하는 로직을 수정한다.
- Outbox 관련 Entity/Repository 추가: outbox_events 테이블용 JPA Entity(OutboxEvent.java)와 Repository(OutboxEventRepository.java)를 생성한다. (이전 답변 코드 예시 참고)
- OrderService.java 수정: @Transactional로 묶어 DB 저장과 Outbox 저장을 원자적으로 처리하고, Kafka 직접 발행 코드를 제거한다.
-
Java
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import jakarta.transaction.Transactional; // 중요! import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; // ... other imports for OrderEntity, OrderRepository, OutboxEvent, OutboxEventRepository, OrderRequest @Service @Slf4j @RequiredArgsConstructor // 생성자 주입 public class OrderCommandService { // 이름 변경 고려 (Command 책임 명시) private final OrderRepository orderRepository; private final OutboxEventRepository outboxEventRepository; // Outbox 리포지토리 주입 private final ObjectMapper objectMapper; // JSON 변환용 (Bean 등록 필요) @Transactional // 트랜잭션 보장! public void createOrder(Long userId, OrderRequest request) { // 1. 비즈니스 데이터 저장 (Write Model) OrderEntity orderEntity = OrderEntity.of(userId, request.getProductId(), request.getQuantity(), request.getUnitPrice()); OrderEntity savedOrder = orderRepository.save(orderEntity); log.info("Order entity saved: {}", savedOrder.getId()); // 2. Outbox 이벤트 생성 및 저장 try { // 이벤트 페이로드 DTO 생성 OrderCreatedEventPayload payload = new OrderCreatedEventPayload( savedOrder.getId(), userId, request.getProductId(), request.getQuantity(), request.getUnitPrice() // 필요시 savedOrder.getCreatedAt() 등 추가 ); String jsonPayload = objectMapper.writeValueAsString(payload); OutboxEvent outboxEvent = new OutboxEvent( "Order", String.valueOf(savedOrder.getId()), "OrderCreated", jsonPayload ); outboxEventRepository.save(outboxEvent); log.info("Outbox event saved for orderId: {}", savedOrder.getId()); } catch (JsonProcessingException e) { log.error("Failed to serialize order created event payload for orderId: {}", savedOrder.getId(), e); throw new RuntimeException("Failed to create outbox event", e); // 롤백 유도 } // 3. KafkaTemplate.send() 호출 제거! } // 주문 수정/삭제 시 유사 로직 구현 (OrderUpdated/OrderDeleted 이벤트 저장) } // OrderCreatedEventPayload DTO 정의 필요 - OrderController.java 수정: 조회 관련 getOrdersApi 메서드를 제거하고, 명령 관련 API만 남겨 OrderCommandController로 이름을 변경하는 것을 고려한다.
3단계: Read 측 코드 신설 (order-service)
조회(Query) 요청을 처리하는 컴포넌트들을 새로 만든다.
- Read 모델 Entity/Repository 추가: order_summary 테이블용 JPA Entity(OrderSummary.java)와 Repository(OrderSummaryRepository.java) 인터페이스를 생성한다.
-
Java
// OrderSummary.java @Entity @Table(name = "order_summary") @Getter @Setter @NoArgsConstructor public class OrderSummary { @Id private Long orderId; // Write Model의 ID와 동일하게 사용 private Long userId; private Long productId; private String productName; private Integer quantity; private Integer totalPrice; private LocalDateTime orderDate; // DATETIME(6) -> LocalDateTime 매핑 } // OrderSummaryRepository.java public interface OrderSummaryRepository extends JpaRepository<OrderSummary, Long> { List<OrderSummary> findByUserId(Long userId); } - Read 모델 DTO 추가: 컨트롤러가 반환할 DTO(OrderSummaryDto.java)를 정의한다. (Entity와 유사하게 정의)
- Query Service 신설: OrderSummaryRepository를 사용하여 조회 로직을 수행하는 OrderQueryService.java를 생성한다.
-
Java
@Service @RequiredArgsConstructor public class OrderQueryService { private final OrderSummaryRepository orderSummaryRepository; private final ModelMapper modelMapper = new ModelMapper(); // DTO 변환 public List<OrderSummaryDto> getOrdersByUserId(Long userId) { List<OrderSummary> summaries = orderSummaryRepository.findByUserId(userId); return summaries.stream() .map(summary -> modelMapper.map(summary, OrderSummaryDto.class)) .collect(Collectors.toList()); } } - Query Controller 신설: 조회 API 엔드포인트(GET /orders/{userId} 등)를 제공하는 OrderQueryController.java를 생성한다.
-
Java
@RestController @RequestMapping("/orders") // 또는 /query/orders @RequiredArgsConstructor @Slf4j public class OrderQueryController { private final OrderQueryService orderQueryService; @GetMapping("/{userId}") public ResponseEntity<List<OrderSummaryDto>> getOrdersByUserId(@PathVariable("userId") Long userId) { log.info("Retrieving order summary for userId: {}", userId); List<OrderSummaryDto> orders = orderQueryService.getOrdersByUserId(userId); return ResponseEntity.ok(orders); } }
4단계: 데이터 동기화 (Kafka Consumer 신설)
Kafka 이벤트를 구독하여 Read 모델 테이블을 업데이트하는 Consumer를 만든다.
- 이벤트 페이로드 DTO 정의: Kafka 메시지를 역직렬화할 DTO(OrderCreatedEventPayload.java 등)를 정의한다. (Write 측에서 사용한 것과 동일해야 함)
- Kafka Consumer 구현 (OrderReadModelUpdater.java):
-
Java
import com.fasterxml.jackson.databind.ObjectMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; // ... OrderSummary, OrderSummaryRepository, OrderCreatedEventPayload 등 import @Component @RequiredArgsConstructor @Slf4j public class OrderReadModelUpdater { private final OrderSummaryRepository orderSummaryRepository; private final ObjectMapper objectMapper; // Bean 등록 필요 // 필요시 다른 서비스 Client 주입 (예: CatalogServiceClient) // Debezium 사용 시 토픽 이름은 EventRouter 설정에 따라 달라짐 (예: "order-events") @KafkaListener(topics = "order-events", groupId = "order-summary-updater-group") public void handleOrderEvent(ConsumerRecord<String, String> record) { log.debug("Received event from Kafka: key={}, value={}", record.key(), record.value()); try { String payload = record.value(); // 실제로는 이벤트 타입을 구분하는 로직 필요 (헤더 또는 페이로드 내 필드 활용) OrderCreatedEventPayload event = objectMapper.readValue(payload, OrderCreatedEventPayload.class); OrderSummary summary = new OrderSummary(); // 이벤트 데이터로 OrderSummary 채우기 summary.setOrderId(event.getOrderId()); summary.setUserId(event.getUserId()); summary.setProductId(event.getProductId()); summary.setQuantity(event.getQuantity()); summary.setTotalPrice(event.getQuantity() * event.getUnitPrice()); // totalPrice 계산 또는 이벤트에서 가져오기 // summary.setOrderDate(event.getOrderTimestamp()); // 이벤트에 타임스탬프 포함 필요 // (선택) 다른 서비스 호출하여 비정규화 데이터 채우기 // CatalogResponse catalogInfo = catalogServiceClient.getCatalog(event.getProductId()); // summary.setProductName(catalogInfo.getProductName()); orderSummaryRepository.save(summary); log.info("Order summary created/updated for orderId: {}", event.getOrderId()); } catch (Exception e) { log.error("Error processing order event: {}", record.value(), e); // DLQ 전송 등 에러 처리 } } // OrderUpdated, OrderDeleted 이벤트 처리 로직 추가 }
5단계: Outbox 이벤트 발행 메커니즘 선택 및 구현
Outbox 테이블의 이벤트를 Kafka로 발행하는 방식을 선택하고 구현한다.
- 5-A: CDC (Debezium) 방식 (권장): ✨
- 개념: Debezium이 DB 로그를 읽어 outbox_events 테이블의 변경을 감지하고, 해당 데이터를 Kafka로 자동 발행한다.
- 필요 작업:
- MySQL Binlog 활성화 및 Debezium용 사용자/권한 설정 (1단계에서 완료).
- Kubernetes에 Kafka Connect 클러스터 배포 (Strimzi Operator 권장).
- Kafka Connect에 Debezium MySQL Connector 플러그인 설치.
- Debezium Connector 인스턴스 생성 및 설정 (YAML 또는 REST API 사용): DB 접속 정보, 모니터링할 outbox_events 테이블 지정, Outbox 패턴 처리를 위한 SMT(io.debezium.transforms.outbox.EventRouter) 설정 (Aggregate Type별 토픽 라우팅, Aggregate ID를 메시지 Key로 사용 등).
- 장점: 애플리케이션 코드가 Kafka 발행과 완전히 분리되어 안정적이다.
- 5-B: Polling Publisher 방식 (대안): 🕵️♂️
- 개념: 별도의 스케줄러가 주기적으로 outbox_events 테이블을 조회하여 미발행 이벤트를 Kafka로 직접 발행한다.
- 필요 작업:
- OutboxEvent Entity에 발행 상태 필드(예: boolean published) 추가 및 테이블 변경.
- Spring @Scheduled 어노테이션을 사용하는 스케줄러 컴포넌트(OutboxPollingScheduler.java) 구현.
- 스케줄러는 주기적으로 outboxEventRepository.findByPublishedFalse() 등을 호출하여 미발행 이벤트 조회.
- 조회된 이벤트를 KafkaTemplate으로 발행하고, 성공 시 event.setPublished(true) 후 outboxEventRepository.save(event) 또는 이벤트 삭제. 발행 실패 시 재시도 로직 구현 필요.
- 장점: CDC 설정 없이 애플리케이션 코드만으로 구현 가능하다.
- 단점: DB 폴링 부하, 발행 지연, 중복 발행 방지 등 고려 사항이 많다.
결론
위 단계를 통해 CQRS 2단계(스키마 분리)를 적용할 수 있다. Write 측은 DB와 Outbox 테이블에 원자적으로 저장하고, Read 측은 별도의 조회용 테이블과 API를 가지며, 둘 사이의 데이터는 Kafka와 Outbox 패턴(CDC 또는 Polling)을 통해 비동기적으로 동기화된다. 이를 통해 시스템의 조회 성능을 향상시키고 코드 구조를 개선할 수 있다.
'MSA > MSA 아키텍처' 카테고리의 다른 글
| [BASIC #1] 마이크로서비스 아키텍처의 이해: Monolithic에서 MSA까지 (0) | 2026.03.02 |
|---|---|
| [5] EDA (0) | 2025.10.20 |
| [4] SAGA 패턴 (0) | 2025.10.20 |
| [2] gRPC (0) | 2025.10.20 |
| [1] OpenFeign (0) | 2025.10.20 |