[ADVANED #1] MSA 고급 패턴 정리

2025. 9. 21. 16:08·MSA/MSA 기본

0. 들어가며

지금까지 우리는 BASIC 시리즈를 통해 마이크로서비스 아키텍처의 핵심 개념부터 시작하여 실제 이커머스 애플리케이션을 구축하고, 서비스 디스커버리, API 게이트웨이, 설정 관리, 서비스 간 통신, 데이터 동기화, 장애 처리, 모니터링, 그리고 Docker 기반 배포까지 폭넓게 다루었다.

하지만 실제 프로덕션 환경에서 마이크로서비스를 운영하다 보면 더 복잡한 문제들과 마주치게 된다. 특히 데이터 일관성, 확장성, 복잡한 비즈니스 로직은 마이크로서비스 아키텍처의 가장 큰 도전 과제다.

이번 글에서는 이러한 문제들을 해결하기 위한 고급 아키텍처 패턴 세 가지를 살펴본다:

  • Event Sourcing: 상태가 아닌 상태 변경 이벤트를 저장하는 패턴
  • CQRS: 명령(Command)과 조회(Query)의 책임을 분리하는 패턴
  • Saga Pattern: 분산 환경에서 데이터 일관성을 유지하기 위한 패턴

이 패턴들은 각각 독립적으로 사용될 수도 있지만, 함께 사용될 때 더욱 강력한 시너지를 발휘한다. 특히 복잡한 도메인, 높은 확장성 요구사항, 감사 추적이 중요한 시스템에서 널리 활용된다.


1. Event Sourcing 패턴

1.1. 전통적인 상태 저장 방식의 한계

전통적인 애플리케이션에서는 엔티티의 **현재 상태(Current State)**만 데이터베이스에 저장한다. 예를 들어, 주문 엔티티에는 주문 ID, 사용자 ID, 상품 ID, 수량, 상태, 생성 시간 등의 현재 값만 저장된다.

전통적인 방식의 문제점:

  1. 변경 이력 부재: "이 주문이 어떻게 현재 상태에 도달했는가?"에 대한 정보가 없다. 누가, 언제, 왜 상태를 변경했는지 추적할 수 없다.
  2. 감사(Audit)의 어려움: 규제 산업(금융, 의료)에서는 모든 변경 이력을 보관해야 하는데, 별도의 감사 테이블을 관리해야 한다.
  3. 비즈니스 인사이트 손실: 상태 변경의 패턴을 분석하여 비즈니스 인사이트를 얻을 수 없다.
  4. 디버깅의 어려움: 버그 발생 시 현재 상태만으로는 원인 파악이 어렵다.

1.2. Event Sourcing 개념

Event Sourcing은 엔티티의 현재 상태를 저장하는 대신, 상태를 변경시키는 모든 이벤트를 저장하는 패턴이다. 이벤트는 불변(immutable)이며, 순서대로 저장된다. 현재 상태는 이러한 이벤트들을 순차적으로 재생(replay)하여 언제든지 복원할 수 있다.

핵심 개념:

  • 이벤트(Event): 과거에 발생한 사실. "무엇이 발생했는가"를 나타낸다. (예: OrderCreated, OrderShipped, OrderCancelled)
  • 이벤트 저장소(Event Store): 이벤트를 순서대로 저장하는 데이터베이스
  • 이벤트 재생(Event Replay): 저장된 이벤트를 순서대로 실행하여 현재 상태를 복원하는 과정
  • 스냅샷(Snapshot): 성능 최적화를 위해 특정 시점의 상태를 저장해두는 것

1.3. Event Sourcing 예시: 주문 시스템

주문 이벤트 정의

// 이벤트 기본 인터페이스
public interface DomainEvent {
    String getEventId();
    String getAggregateId();
    LocalDateTime getOccurredAt();
}

// 주문 생성 이벤트
@Value
public class OrderCreatedEvent implements DomainEvent {
    String eventId;
    String aggregateId;
    LocalDateTime occurredAt;
    String userId;
    String productId;
    int quantity;
    int unitPrice;
    int totalPrice;
}

// 주문 상태 변경 이벤트
@Value
public class OrderStatusChangedEvent implements DomainEvent {
    String eventId;
    String aggregateId;
    LocalDateTime occurredAt;
    OrderStatus oldStatus;
    OrderStatus newStatus;
    String changedBy;
    String reason;
}

// 주문 취소 이벤트
@Value
public class OrderCancelledEvent implements DomainEvent {
    String eventId;
    String aggregateId;
    LocalDateTime occurredAt;
    String reason;
}

이벤트 저장소 인터페이스

public interface EventStore {
    // 이벤트 저장
    void saveEvents(String aggregateId, List<DomainEvent> events, int expectedVersion);

    // 특정 aggregate의 모든 이벤트 조회
    List<DomainEvent> getEvents(String aggregateId);

    // 특정 버전 이후의 이벤트 조회
    List<DomainEvent> getEventsSince(String aggregateId, int version);
}

JPA를 이용한 이벤트 저장소 구현

@Entity
@Table(name = "event_store")
@Data
public class EventEntity {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false)
    private String eventId;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String eventType;

    @Column(nullable = false, columnDefinition = "TEXT")
    private String eventData;  // JSON 형태로 저장

    @Column(nullable = false)
    private int version;

    @Column(nullable = false)
    private LocalDateTime occurredAt;

    @Column
    private LocalDateTime persistedAt;
}

@Repository
public interface EventStoreRepository extends JpaRepository<EventEntity, Long> {
    List<EventEntity> findByAggregateIdOrderByVersionAsc(String aggregateId);
    List<EventEntity> findByAggregateIdAndVersionGreaterThanOrderByVersionAsc(
            String aggregateId, int version);
}

이벤트 소싱 기반 주문 애그리게이트

public class OrderAggregate {
    private String orderId;
    private String userId;
    private String productId;
    private int quantity;
    private int unitPrice;
    private int totalPrice;
    private OrderStatus status;
    private LocalDateTime createdAt;
    private int version;

    private List<DomainEvent> changes = new ArrayList<>();

    // 생성자: 새로운 주문 생성 시
    public OrderAggregate(String orderId, String userId, String productId,
                         int quantity, int unitPrice) {
        applyChange(new OrderCreatedEvent(
            UUID.randomUUID().toString(),
            orderId,
            LocalDateTime.now(),
            userId,
            productId,
            quantity,
            unitPrice,
            quantity * unitPrice
        ));
    }

    // 복원: 저장된 이벤트로부터 복원 시
    public OrderAggregate(List<DomainEvent> events) {
        events.forEach(this::applyEvent);
    }

    // 상태 변경 메서드
    public void changeStatus(OrderStatus newStatus, String changedBy, String reason) {
        if (this.status == newStatus) {
            return;  // 이미 같은 상태
        }

        // 비즈니스 규칙 검증
        if (this.status == OrderStatus.CANCELLED) {
            throw new IllegalStateException("Cancelled order cannot change status");
        }

        if (this.status == OrderStatus.COMPLETED && newStatus != OrderStatus.CANCELLED) {
            throw new IllegalStateException("Completed order can only be cancelled");
        }

        applyChange(new OrderStatusChangedEvent(
            UUID.randomUUID().toString(),
            orderId,
            LocalDateTime.now(),
            status,
            newStatus,
            changedBy,
            reason
        ));
    }

    // 주문 취소
    public void cancel(String reason) {
        if (status == OrderStatus.CANCELLED) {
            return;
        }

        if (status == OrderStatus.SHIPPED) {
            throw new IllegalStateException("Cannot cancel shipped order");
        }

        applyChange(new OrderCancelledEvent(
            UUID.randomUUID().toString(),
            orderId,
            LocalDateTime.now(),
            reason
        ));
    }

    // 이벤트 적용 (내부 상태 변경)
    private void applyEvent(DomainEvent event) {
        if (event instanceof OrderCreatedEvent) {
            apply((OrderCreatedEvent) event);
        } else if (event instanceof OrderStatusChangedEvent) {
            apply((OrderStatusChangedEvent) event);
        } else if (event instanceof OrderCancelledEvent) {
            apply((OrderCancelledEvent) event);
        }
    }

    private void apply(OrderCreatedEvent event) {
        this.orderId = event.getAggregateId();
        this.userId = event.getUserId();
        this.productId = event.getProductId();
        this.quantity = event.getQuantity();
        this.unitPrice = event.getUnitPrice();
        this.totalPrice = event.getTotalPrice();
        this.status = OrderStatus.CREATED;
        this.createdAt = event.getOccurredAt();
        this.version = 1;
    }

    private void apply(OrderStatusChangedEvent event) {
        this.status = event.getNewStatus();
        this.version++;
    }

    private void apply(OrderCancelledEvent event) {
        this.status = OrderStatus.CANCELLED;
        this.version++;
    }

    // 변경사항 적용 (이벤트 저장)
    private void applyChange(DomainEvent event) {
        applyEvent(event);
        changes.add(event);
    }

    // 변경된 이벤트 목록 반환
    public List<DomainEvent> getChanges() {
        return changes;
    }

    // 현재 상태 반환
    public OrderStatus getStatus() {
        return status;
    }

    public int getVersion() {
        return version;
    }
}

이벤트 소싱 서비스

@Service
@Slf4j
@RequiredArgsConstructor
public class OrderEventSourcingService {

    private final EventStore eventStore;
    private final EventPublisher eventPublisher;

    @Transactional
    public void createOrder(String orderId, String userId, String productId,
                           int quantity, int unitPrice) {

        // 새로운 주문 애그리게이트 생성
        OrderAggregate order = new OrderAggregate(orderId, userId, productId, quantity, unitPrice);

        // 이벤트 저장
        eventStore.saveEvents(orderId, order.getChanges(), 0);

        // 이벤트 발행 (다른 서비스에 전파)
        order.getChanges().forEach(eventPublisher::publish);

        log.info("Order created via event sourcing: {}", orderId);
    }

    @Transactional
    public void updateOrderStatus(String orderId, OrderStatus newStatus,
                                  String changedBy, String reason) {

        // 기존 이벤트 로드
        List<DomainEvent> events = eventStore.getEvents(orderId);
        OrderAggregate order = new OrderAggregate(events);

        int currentVersion = order.getVersion();

        // 상태 변경
        order.changeStatus(newStatus, changedBy, reason);

        // 이벤트 저장 (낙관적 락)
        try {
            eventStore.saveEvents(orderId, order.getChanges(), currentVersion);
        } catch (OptimisticLockingException e) {
            log.error("Concurrent modification detected for order: {}", orderId);
            throw new ConflictException("Order was modified by another transaction");
        }

        // 이벤트 발행
        order.getChanges().forEach(eventPublisher::publish);

        log.info("Order status updated: {} -> {}", orderId, newStatus);
    }

    public OrderProjection getOrder(String orderId) {
        List<DomainEvent> events = eventStore.getEvents(orderId);
        if (events.isEmpty()) {
            throw new OrderNotFoundException(orderId);
        }

        OrderAggregate order = new OrderAggregate(events);

        // 조회 모델로 변환
        return OrderProjection.builder()
                .orderId(order.getOrderId())
                .userId(order.getUserId())
                .productId(order.getProductId())
                .quantity(order.getQuantity())
                .unitPrice(order.getUnitPrice())
                .totalPrice(order.getTotalPrice())
                .status(order.getStatus())
                .version(order.getVersion())
                .build();
    }
}

1.4. Event Sourcing의 장단점

장점:

  • 완전한 감사 추적(Audit Trail) 제공
  • 비즈니스 인사이트 분석 가능 (이벤트 패턴 분석)
  • 디버깅 용이 (과거 상태로 복원하여 문제 분석)
  • 재해 복구 시 특정 시점으로 복원 가능

단점:

  • 학습 곡선이 높음
  • 이벤트 스키마 진화 관리가 복잡
  • 조회 쿼리가 복잡해짐 (모든 이벤트를 재생해야 함)
  • 데이터 저장량 증가

2. CQRS 패턴

2.1. CQRS 개념

CQRS는 Command and Query Responsibility Segregation의 약자로, **명령(Command)**과 **조회(Query)**의 책임을 분리하는 패턴이다.

전통적인 CRUD 방식에서는 읽기와 쓰기가 동일한 데이터 모델을 사용한다. 하지만 복잡한 도메인에서는 읽기와 쓰기의 요구사항이 크게 다를 수 있다.

  • 명령(Command): 시스템의 상태를 변경하는 작업 (create, update, delete). 비즈니스 규칙과 유효성 검사가 중요하다.
  • 조회(Query): 시스템의 상태를 읽는 작업. 성능과 유연성이 중요하다.

2.2. CQRS가 필요한 상황

  1. 읽기와 쓰기의 성능 요구사항이 다른 경우
    • 쓰기는 일관성이 중요하고, 읽기는 응답 속도가 중요
  2. 복잡한 비즈니스 로직
    • 쓰기 모델은 복잡한 규칙을, 읽기 모델은 단순한 조회를 담당
  3. 팀 분리
    • 명령 측면과 조회 측면을 담당하는 팀을 분리할 수 있음
  4. 다양한 조회 요구사항
    • 동일한 데이터를 다양한 방식으로 조회해야 하는 경우

2.3. Event Sourcing + CQRS 아키텍처

Event Sourcing과 CQRS는 자연스럽게 함께 사용된다. 이벤트 저장소는 명령 모델(쓰기)을 담당하고, 별도의 읽기 전용 데이터베이스를 구축하여 조회 성능을 최적화할 수 있다.

아키텍처 흐름:

[명령(Command)] → [Command Handler] → [Event Store]
                                        ↓
                                  [Event Processor]
                                        ↓
                              [읽기 모델 업데이트] → [Query Database]
                                                        ↓
                                                 [Query Handler] ← [조회(Query)]

2.4. CQRS 구현 예시

명령(Command) 모델

// 주문 생성 명령
@Value
public class CreateOrderCommand {
    String orderId;
    String userId;
    String productId;
    int quantity;
    int unitPrice;
}

// 주문 취소 명령
@Value
public class CancelOrderCommand {
    String orderId;
    String reason;
}

// Command Handler
@Component
@RequiredArgsConstructor
public class OrderCommandHandler {

    private final EventStore eventStore;
    private final EventPublisher eventPublisher;

    @Transactional
    public void handle(CreateOrderCommand command) {
        OrderAggregate order = new OrderAggregate(
            command.getOrderId(),
            command.getUserId(),
            command.getProductId(),
            command.getQuantity(),
            command.getUnitPrice()
        );

        eventStore.saveEvents(command.getOrderId(), order.getChanges(), 0);
        order.getChanges().forEach(eventPublisher::publish);
    }

    @Transactional
    public void handle(CancelOrderCommand command) {
        List<DomainEvent> events = eventStore.getEvents(command.getOrderId());
        OrderAggregate order = new OrderAggregate(events);

        int currentVersion = order.getVersion();
        order.cancel(command.getReason());

        eventStore.saveEvents(command.getOrderId(), order.getChanges(), currentVersion);
        order.getChanges().forEach(eventPublisher::publish);
    }
}

조회(Query) 모델 - 읽기 전용 데이터베이스

@Entity
@Table(name = "order_view")
@Data
public class OrderView {
    @Id
    private String orderId;
    private String userId;
    private String userName;      // User Service에서 조인된 정보
    private String productId;
    private String productName;   // Catalog Service에서 조인된 정보
    private int quantity;
    private int unitPrice;
    private int totalPrice;
    private String status;
    private LocalDateTime createdAt;
    private LocalDateTime updatedAt;
    private int version;
}

// 조회 리포지토리
@Repository
public interface OrderViewRepository extends JpaRepository<OrderView, String> {
    List<OrderView> findByUserId(String userId);
    List<OrderView> findByStatus(String status);
    List<OrderView> findByCreatedAtBetween(LocalDateTime start, LocalDateTime end);

    @Query("SELECT o FROM OrderView o WHERE o.userId = :userId AND o.createdAt >= :since")
    List<OrderView> findRecentOrdersByUser(@Param("userId") String userId,
                                           @Param("since") LocalDateTime since);
}

// Query Handler
@Service
@RequiredArgsConstructor
public class OrderQueryHandler {

    private final OrderViewRepository orderViewRepository;

    public OrderView getOrder(String orderId) {
        return orderViewRepository.findById(orderId)
                .orElseThrow(() -> new OrderNotFoundException(orderId));
    }

    public List<OrderView> getOrdersByUser(String userId) {
        return orderViewRepository.findByUserId(userId);
    }

    public Page<OrderView> searchOrders(OrderSearchCriteria criteria, Pageable pageable) {
        // 복잡한 검색 조건 처리
        Specification<OrderView> spec = Specification.where(null);

        if (criteria.getUserId() != null) {
            spec = spec.and((root, query, cb) ->
                cb.equal(root.get("userId"), criteria.getUserId()));
        }

        if (criteria.getStatus() != null) {
            spec = spec.and((root, query, cb) ->
                cb.equal(root.get("status"), criteria.getStatus()));
        }

        if (criteria.getStartDate() != null && criteria.getEndDate() != null) {
            spec = spec.and((root, query, cb) ->
                cb.between(root.get("createdAt"), criteria.getStartDate(), criteria.getEndDate()));
        }

        return orderViewRepository.findAll(spec, pageable);
    }
}

이벤트 프로세서 - 읽기 모델 업데이트

@Component
@Slf4j
@RequiredArgsConstructor
public class OrderViewUpdater {

    private final OrderViewRepository orderViewRepository;
    private final UserServiceClient userServiceClient;
    private final CatalogServiceClient catalogServiceClient;

    @EventListener
    @Transactional
    public void handleOrderCreated(OrderCreatedEvent event) {
        OrderView orderView = new OrderView();
        orderView.setOrderId(event.getAggregateId());
        orderView.setUserId(event.getUserId());
        orderView.setProductId(event.getProductId());
        orderView.setQuantity(event.getQuantity());
        orderView.setUnitPrice(event.getUnitPrice());
        orderView.setTotalPrice(event.getTotalPrice());
        orderView.setStatus("CREATED");
        orderView.setCreatedAt(event.getOccurredAt());
        orderView.setUpdatedAt(event.getOccurredAt());
        orderView.setVersion(1);

        // 추가 정보 조회 (다른 서비스에서)
        try {
            UserDto user = userServiceClient.getUser(event.getUserId());
            orderView.setUserName(user.getName());
        } catch (Exception e) {
            log.warn("Failed to fetch user info for view: {}", event.getUserId());
        }

        try {
            ProductDto product = catalogServiceClient.getProduct(event.getProductId());
            orderView.setProductName(product.getProductName());
        } catch (Exception e) {
            log.warn("Failed to fetch product info for view: {}", event.getProductId());
        }

        orderViewRepository.save(orderView);
        log.info("Order view updated for order: {}", event.getAggregateId());
    }

    @EventListener
    @Transactional
    public void handleOrderStatusChanged(OrderStatusChangedEvent event) {
        OrderView orderView = orderViewRepository.findById(event.getAggregateId())
                .orElseThrow(() -> new IllegalStateException("Order view not found"));

        orderView.setStatus(event.getNewStatus().name());
        orderView.setUpdatedAt(event.getOccurredAt());
        orderView.setVersion(orderView.getVersion() + 1);

        orderViewRepository.save(orderView);
        log.info("Order view status updated: {}", event.getAggregateId());
    }

    @EventListener
    @Transactional
    public void handleOrderCancelled(OrderCancelledEvent event) {
        OrderView orderView = orderViewRepository.findById(event.getAggregateId())
                .orElseThrow(() -> new IllegalStateException("Order view not found"));

        orderView.setStatus("CANCELLED");
        orderView.setUpdatedAt(event.getOccurredAt());
        orderView.setVersion(orderView.getVersion() + 1);

        orderViewRepository.save(orderView);
        log.info("Order view cancelled: {}", event.getAggregateId());
    }
}

2.5. CQRS의 장단점

장점:

  • 읽기/쓰기 모델 최적화 가능
  • 확장성 향상 (각 모델을 독립적으로 확장)
  • 보안 강화 (읽기/쓰기 권한 분리)
  • 복잡한 도메인을 단순화

단점:

  • 구현 복잡성 증가
  • 최종 일관성(Eventual Consistency)만 보장
  • 추가 인프라 필요
  • 학습 곡선이 높음

3. Saga Pattern

3.1. Saga 패턴 개념

Saga 패턴은 분산 환경에서 여러 마이크로서비스에 걸친 트랜잭션의 데이터 일관성을 유지하기 위한 패턴이다. 전통적인 분산 트랜잭션(2PC)은 성능 저하와 가용성 문제로 마이크로서비스 환경에 적합하지 않다. Saga는 각 로컬 트랜잭션을 순차적으로 실행하고, 실패 시 보상 트랜잭션(Compensating Transaction)을 실행하여 이전 상태로 되돌린다.

3.2. Saga 패턴의 두 가지 구현 방식

1. 코레오그래피(Choreography) 방식

  • 각 서비스가 이벤트를 발행하고, 다른 서비스가 이벤트를 구독하여 트랜잭션을 진행
  • 중앙 조정자 없이 서비스 간 이벤트 기반 통신
  • 구현이 단순하지만 트랜잭션 흐름 파악이 어려움

2. 오케스트레이션(Orchestration) 방식

  • 중앙 오케스트레이터가 트랜잭션 흐름을 제어
  • 각 서비스에 명령을 내리고 결과를 받아 다음 단계 결정
  • 트랜잭션 흐름 파악이 쉽지만 오케스트레이터가 단일 장애점

3.3. Saga 패턴 예시: 주문 처리 프로세스

주문 처리 Saga 흐름:

  1. Order Service: 주문 생성 (PENDING 상태)
  2. Payment Service: 결제 처리
  3. Inventory Service: 재고 차감
  4. Shipping Service: 배송 생성
  5. Order Service: 주문 상태 COMPLETED로 변경

실패 시 보상 트랜잭션:

  • 결제 실패: 주문 취소 (보상 트랜잭션)
  • 재고 부족: 결제 취소 → 주문 취소
  • 배송 생성 실패: 재고 복원 → 결제 취소 → 주문 취소

3.4. 코레오그래피 방식 구현 (Kafka 기반)

주문 생성 이벤트 발행

@Service
@Slf4j
@RequiredArgsConstructor
public class OrderSagaService {

    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @Transactional
    public OrderEntity createOrder(CreateOrderCommand command) {
        // 주문 생성 (PENDING 상태)
        OrderEntity order = new OrderEntity();
        order.setOrderId(command.getOrderId());
        order.setUserId(command.getUserId());
        order.setProductId(command.getProductId());
        order.setQuantity(command.getQuantity());
        order.setUnitPrice(command.getUnitPrice());
        order.setTotalPrice(command.getQuantity() * command.getUnitPrice());
        order.setStatus("PENDING");
        order.setCreatedAt(LocalDateTime.now());

        orderRepository.save(order);

        // 주문 생성 이벤트 발행
        OrderCreatedEvent event = new OrderCreatedEvent(
            UUID.randomUUID().toString(),
            order.getOrderId(),
            order.getUserId(),
            order.getProductId(),
            order.getQuantity(),
            order.getUnitPrice(),
            order.getTotalPrice(),
            LocalDateTime.now()
        );

        kafkaTemplate.send("order-created", order.getOrderId(), event);
        log.info("Order created event published: {}", order.getOrderId());

        return order;
    }

    // 결제 완료 이벤트 처리
    @KafkaListener(topics = "payment-completed")
    @Transactional
    public void handlePaymentCompleted(PaymentCompletedEvent event) {
        OrderEntity order = orderRepository.findByOrderId(event.getOrderId())
                .orElseThrow(() -> new OrderNotFoundException(event.getOrderId()));

        order.setPaymentId(event.getPaymentId());
        order.setStatus("PAYMENT_COMPLETED");
        orderRepository.save(order);

        // 재고 차감 이벤트 발행
        DeductInventoryCommand command = new DeductInventoryCommand(
            order.getOrderId(),
            order.getProductId(),
            order.getQuantity()
        );
        kafkaTemplate.send("inventory-deduct", order.getOrderId(), command);
        log.info("Inventory deduct command sent: {}", order.getOrderId());
    }

    // 결제 실패 이벤트 처리 (보상 트랜잭션)
    @KafkaListener(topics = "payment-failed")
    @Transactional
    public void handlePaymentFailed(PaymentFailedEvent event) {
        OrderEntity order = orderRepository.findByOrderId(event.getOrderId())
                .orElseThrow(() -> new OrderNotFoundException(event.getOrderId()));

        order.setStatus("FAILED");
        order.setFailureReason("Payment failed: " + event.getReason());
        orderRepository.save(order);

        log.info("Order failed due to payment issue: {}", event.getOrderId());
    }

    // 재고 차감 완료 이벤트 처리
    @KafkaListener(topics = "inventory-deducted")
    @Transactional
    public void handleInventoryDeducted(InventoryDeductedEvent event) {
        OrderEntity order = orderRepository.findByOrderId(event.getOrderId())
                .orElseThrow(() -> new OrderNotFoundException(event.getOrderId()));

        order.setStatus("INVENTORY_DEDUCTED");
        orderRepository.save(order);

        // 배송 생성 이벤트 발행
        CreateShippingCommand command = new CreateShippingCommand(
            order.getOrderId(),
            order.getUserId(),
            order.getProductId(),
            order.getQuantity()
        );
        kafkaTemplate.send("shipping-create", order.getOrderId(), command);
        log.info("Shipping create command sent: {}", order.getOrderId());
    }

    // 재고 부족 이벤트 처리 (보상 트랜잭션)
    @KafkaListener(topics = "inventory-insufficient")
    @Transactional
    public void handleInventoryInsufficient(InventoryInsufficientEvent event) {
        OrderEntity order = orderRepository.findByOrderId(event.getOrderId())
                .orElseThrow(() -> new OrderNotFoundException(event.getOrderId()));

        order.setStatus("FAILED");
        order.setFailureReason("Insufficient inventory");
        orderRepository.save(order);

        // 결제 취소 이벤트 발행 (보상 트랜잭션)
        CancelPaymentCommand command = new CancelPaymentCommand(
            order.getOrderId(),
            order.getPaymentId(),
            "Insufficient inventory"
        );
        kafkaTemplate.send("payment-cancel", order.getOrderId(), command);
        log.info("Payment cancel command sent: {}", order.getOrderId());
    }

    // 배송 생성 완료 이벤트 처리
    @KafkaListener(topics = "shipping-created")
    @Transactional
    public void handleShippingCreated(ShippingCreatedEvent event) {
        OrderEntity order = orderRepository.findByOrderId(event.getOrderId())
                .orElseThrow(() -> new OrderNotFoundException(event.getOrderId()));

        order.setShippingId(event.getShippingId());
        order.setStatus("COMPLETED");
        order.setCompletedAt(LocalDateTime.now());
        orderRepository.save(order);

        log.info("Order completed: {}", event.getOrderId());
    }

    // 배송 생성 실패 이벤트 처리 (보상 트랜잭션)
    @KafkaListener(topics = "shipping-failed")
    @Transactional
    public void handleShippingFailed(ShippingFailedEvent event) {
        OrderEntity order = orderRepository.findByOrderId(event.getOrderId())
                .orElseThrow(() -> new OrderNotFoundException(event.getOrderId()));

        order.setStatus("FAILED");
        order.setFailureReason("Shipping failed: " + event.getReason());
        orderRepository.save(order);

        // 재고 복원 이벤트 발행 (보상 트랜잭션)
        RestoreInventoryCommand command = new RestoreInventoryCommand(
            order.getOrderId(),
            order.getProductId(),
            order.getQuantity()
        );
        kafkaTemplate.send("inventory-restore", order.getOrderId(), command);
        log.info("Inventory restore command sent: {}", order.getOrderId());

        // 결제 취소 이벤트 발행 (보상 트랜잭션)
        CancelPaymentCommand paymentCommand = new CancelPaymentCommand(
            order.getOrderId(),
            order.getPaymentId(),
            "Shipping failed"
        );
        kafkaTemplate.send("payment-cancel", order.getOrderId(), paymentCommand);
        log.info("Payment cancel command sent: {}", order.getOrderId());
    }
}

3.5. 오케스트레이션 방식 구현

Saga 오케스트레이터

@Component
@Slf4j
public class OrderSagaOrchestrator {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final SagaStateRepository sagaStateRepository;

    // Saga 시작
    @Transactional
    public void startSaga(CreateOrderCommand command) {
        String sagaId = UUID.randomUUID().toString();

        SagaState state = new SagaState();
        state.setSagaId(sagaId);
        state.setOrderId(command.getOrderId());
        state.setCurrentStep("ORDER_CREATED");
        state.setStatus("IN_PROGRESS");
        state.setPayload(convertToJson(command));
        state.setStartedAt(LocalDateTime.now());

        sagaStateRepository.save(state);

        // 1단계: 주문 생성
        kafkaTemplate.send("create-order", command.getOrderId(), command);
        log.info("Saga started: {}, orderId: {}", sagaId, command.getOrderId());
    }

    // 주문 생성 완료 처리
    @KafkaListener(topics = "order-created-response")
    @Transactional
    public void handleOrderCreated(OrderCreatedResponse response) {
        SagaState state = sagaStateRepository.findByOrderId(response.getOrderId())
                .orElseThrow(() -> new SagaNotFoundException(response.getOrderId()));

        if (!response.isSuccess()) {
            failSaga(state, "Order creation failed");
            return;
        }

        state.setCurrentStep("PAYMENT_PENDING");
        sagaStateRepository.save(state);

        // 2단계: 결제 처리
        PaymentCommand command = new PaymentCommand(
            response.getOrderId(),
            response.getUserId(),
            response.getTotalPrice()
        );
        kafkaTemplate.send("process-payment", response.getOrderId(), command);
        log.info("Moving to payment step: {}", response.getOrderId());
    }

    // 결제 완료 처리
    @KafkaListener(topics = "payment-processed")
    @Transactional
    public void handlePaymentProcessed(PaymentProcessedResponse response) {
        SagaState state = sagaStateRepository.findByOrderId(response.getOrderId())
                .orElseThrow(() -> new SagaNotFoundException(response.getOrderId()));

        if (!response.isSuccess()) {
            // 보상 트랜잭션: 주문 취소
            compensateOrder(state, "Payment failed: " + response.getFailureReason());
            return;
        }

        state.setCurrentStep("INVENTORY_PENDING");
        sagaStateRepository.save(state);

        // 3단계: 재고 차감
        InventoryCommand command = new InventoryCommand(
            response.getOrderId(),
            response.getProductId(),
            response.getQuantity()
        );
        kafkaTemplate.send("deduct-inventory", response.getOrderId(), command);
        log.info("Moving to inventory step: {}", response.getOrderId());
    }

    // 재고 차감 완료 처리
    @KafkaListener(topics = "inventory-processed")
    @Transactional
    public void handleInventoryProcessed(InventoryProcessedResponse response) {
        SagaState state = sagaStateRepository.findByOrderId(response.getOrderId())
                .orElseThrow(() -> new SagaNotFoundException(response.getOrderId()));

        if (!response.isSuccess()) {
            // 보상 트랜잭션: 결제 취소 → 주문 취소
            compensatePaymentAndOrder(state, "Insufficient inventory");
            return;
        }

        state.setCurrentStep("SHIPPING_PENDING");
        sagaStateRepository.save(state);

        // 4단계: 배송 생성
        ShippingCommand command = new ShippingCommand(
            response.getOrderId(),
            response.getUserId(),
            response.getProductId(),
            response.getQuantity()
        );
        kafkaTemplate.send("create-shipping", response.getOrderId(), command);
        log.info("Moving to shipping step: {}", response.getOrderId());
    }

    // 배송 생성 완료 처리
    @KafkaListener(topics = "shipping-processed")
    @Transactional
    public void handleShippingProcessed(ShippingProcessedResponse response) {
        SagaState state = sagaStateRepository.findByOrderId(response.getOrderId())
                .orElseThrow(() -> new SagaNotFoundException(response.getOrderId()));

        if (!response.isSuccess()) {
            // 보상 트랜잭션: 재고 복원 → 결제 취소 → 주문 취소
            compensateAll(state, "Shipping failed: " + response.getFailureReason());
            return;
        }

        // Saga 성공 완료
        state.setCurrentStep("COMPLETED");
        state.setStatus("SUCCESS");
        state.setCompletedAt(LocalDateTime.now());
        sagaStateRepository.save(state);

        // 주문 완료 이벤트 발행
        kafkaTemplate.send("order-completed", response.getOrderId(),
            new OrderCompletedEvent(response.getOrderId()));

        log.info("Saga completed successfully: {}", response.getOrderId());
    }

    // 보상 트랜잭션: 주문 취소
    private void compensateOrder(SagaState state, String reason) {
        log.warn("Compensating order: {}, reason: {}", state.getOrderId(), reason);

        state.setStatus("FAILED");
        state.setFailureReason(reason);
        state.setCompletedAt(LocalDateTime.now());
        sagaStateRepository.save(state);

        CancelOrderCommand command = new CancelOrderCommand(state.getOrderId(), reason);
        kafkaTemplate.send("cancel-order", state.getOrderId(), command);
    }

    // 보상 트랜잭션: 결제 취소 + 주문 취소
    private void compensatePaymentAndOrder(SagaState state, String reason) {
        log.warn("Compensating payment and order: {}, reason: {}", state.getOrderId(), reason);

        state.setStatus("COMPENSATING");
        sagaStateRepository.save(state);

        // 결제 취소
        CancelPaymentCommand paymentCommand = new CancelPaymentCommand(
            state.getOrderId(),
            reason
        );
        kafkaTemplate.send("cancel-payment", state.getOrderId(), paymentCommand);

        // 주문 취소 (결제 취소 완료 후 처리)
        CancelOrderCommand orderCommand = new CancelOrderCommand(state.getOrderId(), reason);
        kafkaTemplate.send("cancel-order", state.getOrderId(), orderCommand);
    }

    // 모든 보상 트랜잭션 실행
    private void compensateAll(SagaState state, String reason) {
        log.warn("Compensating all: {}, reason: {}", state.getOrderId(), reason);

        state.setStatus("COMPENSATING");
        sagaStateRepository.save(state);

        // 재고 복원
        RestoreInventoryCommand inventoryCommand = new RestoreInventoryCommand(
            state.getOrderId(),
            state.getProductId(),
            state.getQuantity()
        );
        kafkaTemplate.send("restore-inventory", state.getOrderId(), inventoryCommand);

        // 결제 취소
        CancelPaymentCommand paymentCommand = new CancelPaymentCommand(
            state.getOrderId(),
            reason
        );
        kafkaTemplate.send("cancel-payment", state.getOrderId(), paymentCommand);

        // 주문 취소
        CancelOrderCommand orderCommand = new CancelOrderCommand(state.getOrderId(), reason);
        kafkaTemplate.send("cancel-order", state.getOrderId(), orderCommand);
    }

    private void failSaga(SagaState state, String reason) {
        state.setStatus("FAILED");
        state.setFailureReason(reason);
        state.setCompletedAt(LocalDateTime.now());
        sagaStateRepository.save(state);
        log.error("Saga failed: {}, reason: {}", state.getOrderId(), reason);
    }
}

3.6. Saga 패턴의 장단점

장점:

  • 분산 환경에서 데이터 일관성 유지 가능
  • 높은 가용성과 확장성
  • 각 서비스의 독립성 유지
  • 2PC(분산 트랜잭션)보다 성능 우수

단점:

  • 최종 일관성만 보장
  • 보상 트랜잭션 구현 복잡
  • 중복 메시지 처리 필요
  • 데드 레터 관리 필요

4. 세 가지 패턴의 통합 아키텍처

4.1. 완전한 이커머스 시스템 아키텍처

세 가지 패턴을 통합하면 다음과 같은 완전한 아키텍처를 구축할 수 있다.

[클라이언트]
    ↓
[API Gateway]
    ↓
┌─────────────────────────────────────────────────────────────┐
│                      Command Side (CQRS)                     │
│  ┌──────────────────────────────────────────────────────┐   │
│  │  Saga Orchestrator                                    │   │
│  │  - 주문 처리 프로세스 관리                              │   │
│  │  - 보상 트랜잭션 실행                                   │   │
│  └──────────────────────────────────────────────────────┘   │
│            ↓                   ↓                   ↓         │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐
│  │ Order Service   │  │ Payment Service │  │Inventory Service│
│  │ (Event Sourcing)│  │ (Event Sourcing)│  │ (Event Sourcing)│
│  └─────────────────┘  └─────────────────┘  └─────────────────┘
│            ↓                   ↓                   ↓         │
│  ┌──────────────────────────────────────────────────────┐   │
│  │                  Event Store (Kafka)                  │   │
│  │              - 모든 이벤트가 저장됨                     │   │
│  └──────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘
                            ↓
┌─────────────────────────────────────────────────────────────┐
│                      Query Side (CQRS)                        │
│  ┌──────────────────────────────────────────────────────┐   │
│  │           Event Processors / View Updaters            │   │
│  │   - Order View Updater                                │   │
│  │   - Payment View Updater                              │   │
│  │   - Inventory View Updater                            │   │
│  └──────────────────────────────────────────────────────┘   │
│            ↓                   ↓                   ↓         │
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐
│  │ Order View DB   │  │ Payment View DB │  │Inventory View DB│
│  │ (Read-Only)     │  │ (Read-Only)     │  │ (Read-Only)     │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘
│            ↓                   ↓                   ↓         │
│  ┌──────────────────────────────────────────────────────┐   │
│  │              Query Handlers / APIs                    │   │
│  │   - 주문 조회, 결제 내역 조회, 재고 현황 조회 등        │   │
│  └──────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘
                            ↓
                       [클라이언트 응답]

4.2. 통합 아키텍처의 이점

데이터 일관성: Saga 패턴으로 분산 트랜잭션 관리

완전한 감사 추적: Event Sourcing으로 모든 변경 이력 저장

뛰어난 조회 성능: CQRS로 최적화된 읽기 모델 제공

확장성: 각 컴포넌트를 독립적으로 확장 가능

탄력성: 장애 발생 시 보상 트랜잭션으로 복구

4.3. 실제 구현 시 고려사항

1. 이벤트 버전 관리

// 이벤트에 버전 정보 포함
public interface DomainEvent {
    String getEventId();
    String getAggregateId();
    int getVersion();  // 이벤트 스키마 버전
    LocalDateTime getOccurredAt();
}

// 버전별 이벤트 변환 로직
@Component
public class EventUpcaster {

    public DomainEvent upcast(DomainEvent oldEvent) {
        if (oldEvent.getVersion() == 1) {
            // 버전 1 → 2 변환
            return convertV1ToV2(oldEvent);
        }
        return oldEvent;
    }
}

2. 멱등성 처리

@Component
public class IdempotencyHandler {

    private final Set<String> processedEvents = ConcurrentHashMap.newKeySet();

    public boolean isProcessed(String eventId) {
        return processedEvents.contains(eventId);
    }

    @Transactional
    public void markProcessed(String eventId) {
        processedEvents.add(eventId);
        // 데이터베이스에도 저장
        idempotencyRepository.save(new IdempotencyRecord(eventId));
    }

    public <T> T processIdempotently(String eventId, Supplier<T> action) {
        if (isProcessed(eventId)) {
            log.info("Event already processed: {}", eventId);
            return null;
        }

        T result = action.get();
        markProcessed(eventId);
        return result;
    }
}

3. 데드 레터 큐 관리

@Component
@Slf4j
public class DeadLetterQueueHandler {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final DeadLetterRepository deadLetterRepository;

    @KafkaListener(topics = "dead-letter-queue")
    public void handleDeadLetter(DeadLetterMessage message) {
        log.error("Processing dead letter: {}", message);

        // 재처리 시도
        boolean reprocessed = tryReprocess(message);

        if (!reprocessed) {
            // 수동 개입 필요 알림
            notifyAdmin(message);

            // 영구 저장
            deadLetterRepository.save(new DeadLetterEntity(message));
        }
    }

    private boolean tryReprocess(DeadLetterMessage message) {
        int maxAttempts = 3;
        for (int i = 0; i < maxAttempts; i++) {
            try {
                // 원본 토픽으로 재전송
                kafkaTemplate.send(message.getOriginalTopic(),
                                   message.getKey(),
                                   message.getPayload());
                log.info("Dead letter reprocessed: {}", message.getMessageId());
                return true;
            } catch (Exception e) {
                log.warn("Reprocess attempt {} failed: {}", i + 1, message.getMessageId());
                sleep(1000 * (i + 1));  // 점진적 백오프
            }
        }
        return false;
    }
}

5. 결론

5.1. 패턴 선택 가이드

Event Sourcing을 선택해야 하는 경우:

  • 완전한 감사 추적이 필요한 경우 (금융, 의료, 규제 산업)
  • 비즈니스 인사이트를 위해 상태 변경 이력 분석이 필요한 경우
  • 복잡한 도메인 이벤트를 기반으로 다양한 파생 뷰가 필요한 경우

CQRS를 선택해야 하는 경우:

  • 읽기와 쓰기의 성능 요구사항이 크게 다른 경우
  • 복잡한 비즈니스 로직과 다양한 조회 요구사항이 있는 경우
  • 팀을 명령 측과 조회 측으로 분리하고 싶은 경우

Saga 패턴을 선택해야 하는 경우:

  • 여러 마이크로서비스에 걸친 트랜잭션이 필요한 경우
  • 2PC의 성능 저하를 피해야 하는 경우
  • 최종 일관성이 비즈니스적으로 허용되는 경우

5.2. 마이크로서비스 여정의 완성

지금까지 BASIC 시리즈와 ADVANCED 패턴을 통해 마이크로서비스 아키텍처의 거의 모든 측면을 다루었다.

  • BASIC #1: 마이크로서비스와 Spring Cloud 소개
  • BASIC #2: 서비스 디스커버리 (Eureka)
  • BASIC #3: API 게이트웨이 (Spring Cloud Gateway)
  • BASIC #4: 이커머스 MSA 프로젝트 구조 설계
  • BASIC #5: 인증 처리와 JWT 기반 보안
  • BASIC #6: Configuration Service와 중앙 설정 관리
  • BASIC #7: 마이크로서비스 간 통신 전략
  • BASIC #8: Kafka 기반 데이터 동기화
  • BASIC #9: 장애 처리와 모니터링
  • BASIC #10: Docker 기반 MSA 배포 전략
  • ADVANCED #1: Event Sourcing, CQRS, Saga Pattern

이 여정이 마이크로서비스 아키텍처를 이해하고 실제 시스템에 적용하는 데 도움이 되었기를 바란다. 마이크로서비스는 만능 해결책이 아니며, 복잡성이라는 대가를 치르는 대신 확장성과 민첩성을 얻는 트레이드오프다.

중요한 것은 특정 기술이나 패턴에 집착하기보다, 팀의 상황, 비즈니스 요구사항, 시스템의 특성에 맞는 적절한 아키텍처를 선택하는 것이다. 때로는 모놀리식이 더 나은 선택일 수 있고, 때로는 이 글에서 다룬 고급 패턴들이 필요할 것이다.

마이크로서비스의 세계는 계속 진화하고 있다. Kubernetes, Service Mesh, Serverless, WebAssembly 등 새로운 기술들이 등장하고 있으며, 이에 대한 지속적인 학습이 필요하다. 이 글이 그 여정의 든든한 이정표가 되기를 바란다.

'MSA > MSA 기본' 카테고리의 다른 글

[BASIC #10] Docker 기반 MSA 배포 전략  (0) 2025.09.21
[BASIC #9] 장애 처리와 모니터링  (0) 2025.09.20
[BASIC #8] Kafka 기반 데이터 동기화  (0) 2025.09.20
[BASIC #7] Microservice 간 통신 전략  (0) 2025.09.20
[BASIC #6] Configuration Service와 중앙 설정 관리  (0) 2025.09.20
'MSA/MSA 기본' 카테고리의 다른 글
  • [BASIC #10] Docker 기반 MSA 배포 전략
  • [BASIC #9] 장애 처리와 모니터링
  • [BASIC #8] Kafka 기반 데이터 동기화
  • [BASIC #7] Microservice 간 통신 전략
h6bro
h6bro
백엔드 개발자의 기술 블로그
  • h6bro
    Jun's Tech Blog
    h6bro
  • 전체
    오늘
    어제
    • 분류 전체보기 (250) N
      • Java (18)
        • Core (9)
        • Design Pattern (9)
      • Spring (80)
        • Core (24)
        • MVC (6)
        • DB (10)
        • JPA (26)
        • Monitoring (3)
        • Security (11)
        • WebSocket (0)
      • Database (33)
        • Redis (15)
        • MySQL (18)
      • MSA (25) N
        • MSA 기본 (11)
        • MSA 아키텍처 (14) N
      • Kafka (30)
        • Core (18)
        • Connect (12)
      • ElasticSearch (11)
        • Search (11)
        • Logging (0)
      • Test (4)
        • k6 (4)
      • Docker (9)
      • CI&CD (10)
        • GitHub Actions (6)
        • ArgoCD (4)
      • Kubernetes (18)
        • Core (12)
        • Ops (6)
      • Cloud Engineering (4)
        • AWS Infrastructure (3)
        • AWS EKS (1)
        • Terraform (0)
      • Project (8)
        • LinkFolio (1)
        • Secondhand Market (7)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

    • Cloud Engineering 포스팅 정리
  • 인기 글

  • 태그

    ㅈ
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.5
h6bro
[ADVANED #1] MSA 고급 패턴 정리
상단으로

티스토리툴바