[ADVANCED #2] 분산 트랜잭션과 SAGA 패턴 완전 정리

2026. 3. 2. 14:13·MSA/MSA 아키텍처

0. 들어가며

마이크로서비스 아키텍처에서 가장 까다로운 문제 중 하나는 여러 서비스에 걸친 데이터 일관성 유지다. 모놀리식 아키텍처에서는 하나의 데이터베이스 트랜잭션으로 모든 작업을 ACID하게 처리할 수 있었다. 하지만 MSA에서는 각 서비스가 독립적인 데이터베이스를 가지므로, 전통적인 방식의 분산 트랜잭션은 현실적으로 어렵다.

 

이런 상황에서 등장한 패턴이 바로 SAGA 패턴이다. SAGA는 분산 환경에서 데이터 일관성을 유지하기 위한 패턴으로, 각 서비스의 로컬 트랜잭션을 순차적으로 실행하고, 실패 시 보상 트랜잭션(Compensating Transaction)을 실행하여 이전 상태로 되돌린다.

이번 글에서는 분산 트랜잭션의 기본 개념부터 시작하여 SAGA 패턴의 두 가지 구현 방식(코레오그래피/오케스트레이션), 그리고 실무에서 자주 마주치는 Dual Write 문제와 이를 해결하기 위한 Outbox 패턴, CDC(Change Data Capture)까지 상세히 살펴본다.


1. 분산 트랜잭션 문제

1.1. 분산 트랜잭션이란?

분산 트랜잭션은 여러 개의 독립적인 데이터베이스나 서비스에 걸쳐 수행되는 트랜잭션을 의미한다. 예를 들어, 주문 처리 프로세스는 다음과 같은 여러 단계를 거칠 수 있다.

// 하나의 비즈니스 트랜잭션이 여러 서비스에 걸쳐 있음
public void processOrder(Order order) {
    // 1. 주문 생성 (Order Service)
    orderService.createOrder(order);

    // 2. 재고 차감 (Inventory Service)
    inventoryService.deductStock(order);

    // 3. 결제 처리 (Payment Service)
    paymentService.processPayment(order);

    // 4. 배송 생성 (Shipping Service)
    shippingService.createShipment(order);

    // 이 중 하나라도 실패하면 어떻게 될까?
}

1.2. 분산 트랜잭션의 어려움

1. 네트워크 신뢰성 문제

  • 서비스 간 네트워크 호출은 항상 실패할 가능성이 있음
  • 타임아웃, 일시적 장애 등 예측 불가능한 상황 발생

2. 독립적인 데이터베이스

  • 각 서비스가 자신의 DB를 가지므로, DB 레벨의 트랜잭션으로 묶을 수 없음
  • 2PC(2 Phase Commit)를 사용할 수 있지만, 성능과 가용성에 문제

3. 부분 실패 처리

  • 일부 단계는 성공하고 일부는 실패한 경우 처리 필요
  • 이미 성공한 단계를 원상복구 할 방법이 필요

1.3. 2PC(2 Phase Commit)의 한계

전통적인 분산 트랜잭션 해결책인 2PC는 다음과 같은 단계로 동작한다.

[트랜잭션 코디네이터]
    ↓
[1단계: 준비(Prepare)] → 모든 노드에 준비 요청
    ↓
[2단계: 커밋(Commit)] → 모든 노드가 준비 완료되면 커밋

2PC의 문제점:

  1. 성능 저하: 모든 노드가 준비될 때까지 락(Lock)을 유지해야 함
  2. 가용성 감소: 코디네이터가 단일 장애점(SPOF)이 됨
  3. 확장성 제한: 노드가 많아질수록 병목 현상 심화
  4. 마이크로서비스 부적합: REST API 등 다양한 프로토콜을 사용하는 MSA 환경에 적용 어려움

2. SAGA 패턴

2.1. SAGA 패턴 개념

SAGA 패턴은 분산 환경에서 데이터 일관성을 유지하기 위한 패턴으로, 각 서비스의 로컬 트랜잭션을 순차적으로 실행하고, 실패 시 보상 트랜잭션을 실행하여 이전 상태로 되돌린다.

핵심 아이디어:

  • 긴 트랜잭션(Long-lived Transaction)을 여러 개의 짧은 로컬 트랜잭션으로 분할
  • 각 로컬 트랜잭션은 완료되면 커밋
  • 실패 시 보상 트랜잭션 실행

2.2. SAGA 패턴 예시

주문 처리 SAGA 흐름:

[주문 생성] → 성공 → [재고 차감] → 성공 → [결제 처리] → 성공 → [배송 생성] → 성공
    ↓실패          ↓실패          ↓실패          ↓실패
[보상: 없음]   [보상: 재고 복원]  [보상: 결제 취소]  [보상: 배송 취소]

2.3. 보상 트랜잭션 (Compensating Transaction)

보상 트랜잭션은 이미 완료된 트랜잭션의 효과를 되돌리는 작업이다. 각 서비스는 자신의 로컬 트랜잭션에 대한 보상 로직을 제공해야 한다.

@Service
@RequiredArgsConstructor
public class InventoryService {

    private final InventoryRepository inventoryRepository;

    // 정방향 트랜잭션
    @Transactional
    public void deductStock(String productId, int quantity) {
        // 재고 차감
        inventoryRepository.deduct(productId, quantity);
    }

    // 보상 트랜잭션
    @Transactional
    public void compensateDeductStock(String productId, int quantity) {
        // 차감했던 재고 복원
        inventoryRepository.restore(productId, quantity);
    }
}

@Service
@RequiredArgsConstructor
public class PaymentService {

    private final PaymentRepository paymentRepository;

    // 정방향 트랜잭션
    @Transactional
    public void processPayment(String orderId, Money amount) {
        // 결제 처리
        paymentRepository.save(new Payment(orderId, amount));
    }

    // 보상 트랜잭션
    @Transactional
    public void compensatePayment(String orderId) {
        // 결제 취소 (환불)
        paymentRepository.cancel(orderId);
    }
}

3. SAGA 구현 방식

SAGA 패턴은 크게 두 가지 방식으로 구현할 수 있다: **코레오그래피(Choreography)**와 오케스트레이션(Orchestration).

3.1. 코레오그래피(Choreography) 방식

코레오그래피 방식은 중앙 컨트롤러 없이 각 서비스가 이벤트를 발행하고 구독하며 트랜잭션을 진행하는 방식이다.

구조:

[주문 서비스] → OrderCreated 이벤트 발행
    ↓
[재고 서비스] → 이벤트 구독, 재고 차감 후 StockDeducted 이벤트 발행
    ↓
[결제 서비스] → 이벤트 구독, 결제 처리 후 PaymentProcessed 이벤트 발행
    ↓
[배송 서비스] → 이벤트 구독, 배송 생성

장점:

  • 구현이 단순함 (중앙 오케스트레이터 불필요)
  • 느슨한 결합 (각 서비스가 독립적)
  • 장애 격리 용이

단점:

  • 트랜잭션 흐름 파악이 어려움 (분산 추적 필요)
  • 순환 의존성 발생 가능
  • 복잡한 비즈니스 로직 처리 어려움

3.1.1. 코레오그래피 구현 예시

이벤트 정의:

// 공통 이벤트 클래스
@Data
@NoArgsConstructor
@AllArgsConstructor
public class SagaEvent {
    private String eventId;
    private String sagaId;
    private String eventType;
    private String orderId;
    private Object payload;
    private LocalDateTime timestamp;
}

// 주문 생성 이벤트
public class OrderCreatedEvent extends SagaEvent {
    public OrderCreatedEvent(String sagaId, String orderId, OrderPayload payload) {
        super(UUID.randomUUID().toString(), sagaId, "ORDER_CREATED", orderId, payload, LocalDateTime.now());
    }
}

// 재고 차감 완료 이벤트
public class StockDeductedEvent extends SagaEvent {
    public StockDeductedEvent(String sagaId, String orderId) {
        super(UUID.randomUUID().toString(), sagaId, "STOCK_DEDUCTED", orderId, null, LocalDateTime.now());
    }
}

// 재고 부족 이벤트 (실패)
public class StockFailedEvent extends SagaEvent {
    public StockFailedEvent(String sagaId, String orderId, String reason) {
        super(UUID.randomUUID().toString(), sagaId, "STOCK_FAILED", orderId, reason, LocalDateTime.now());
    }
}

주문 서비스 (프로듀서):

@Service
@Slf4j
@RequiredArgsConstructor
public class OrderSagaService {

    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, SagaEvent> kafkaTemplate;

    @Transactional
    public String createOrder(CreateOrderRequest request) {
        // 1. 주문 생성 (PENDING 상태)
        String orderId = UUID.randomUUID().toString();
        String sagaId = UUID.randomUUID().toString();

        Order order = Order.builder()
            .id(orderId)
            .userId(request.getUserId())
            .productId(request.getProductId())
            .quantity(request.getQuantity())
            .totalAmount(request.getQuantity() * request.getUnitPrice())
            .status("PENDING")
            .sagaId(sagaId)
            .build();

        orderRepository.save(order);

        // 2. SAGA 시작 이벤트 발행
        OrderPayload payload = new OrderPayload(orderId, request.getUserId(),
            request.getProductId(), request.getQuantity());
        OrderCreatedEvent event = new OrderCreatedEvent(sagaId, orderId, payload);

        kafkaTemplate.send("saga-order-created", orderId, event);
        log.info("Saga started: {} for order: {}", sagaId, orderId);

        return orderId;
    }

    // 보상 트랜잭션 처리 (실패 시)
    @KafkaListener(topics = "saga-compensation")
    public void handleCompensation(SagaEvent event) {
        log.info("Received compensation event: {}", event.getEventType());

        if ("COMPENSATE_ORDER".equals(event.getEventType())) {
            orderRepository.findBySagaId(event.getSagaId())
                .ifPresent(order -> {
                    order.setStatus("FAILED");
                    order.setFailureReason((String) event.getPayload());
                    orderRepository.save(order);
                    log.info("Order compensated: {}", order.getId());
                });
        }
    }
}

재고 서비스 (컨슈머 겸 프로듀서):

@Service
@Slf4j
@RequiredArgsConstructor
public class InventorySagaConsumer {

    private final InventoryRepository inventoryRepository;
    private final KafkaTemplate<String, SagaEvent> kafkaTemplate;

    @KafkaListener(topics = "saga-order-created")
    public void handleOrderCreated(SagaEvent event) {
        log.info("Received order created event: {}", event.getSagaId());

        try {
            OrderPayload payload = (OrderPayload) event.getPayload();

            // 재고 차감
            boolean success = inventoryRepository.deductStock(
                payload.getProductId(),
                payload.getQuantity()
            );

            if (success) {
                // 성공 이벤트 발행
                StockDeductedEvent successEvent = new StockDeductedEvent(
                    event.getSagaId(),
                    payload.getOrderId()
                );
                kafkaTemplate.send("saga-stock-deducted", payload.getOrderId(), successEvent);
                log.info("Stock deducted for saga: {}", event.getSagaId());
            } else {
                // 실패 이벤트 발행 (재고 부족)
                StockFailedEvent failedEvent = new StockFailedEvent(
                    event.getSagaId(),
                    payload.getOrderId(),
                    "Insufficient stock"
                );
                kafkaTemplate.send("saga-stock-failed", payload.getOrderId(), failedEvent);
                log.warn("Stock deduction failed for saga: {}", event.getSagaId());
            }

        } catch (Exception e) {
            log.error("Error processing inventory saga", e);
            // 실패 이벤트 발행
            StockFailedEvent failedEvent = new StockFailedEvent(
                event.getSagaId(),
                event.getOrderId(),
                e.getMessage()
            );
            kafkaTemplate.send("saga-stock-failed", event.getOrderId(), failedEvent);
        }
    }

    // 보상 트랜잭션 (재고 복원)
    @KafkaListener(topics = "saga-compensate-inventory")
    public void handleCompensation(SagaEvent event) {
        log.info("Compensating inventory for saga: {}", event.getSagaId());

        OrderPayload payload = (OrderPayload) event.getPayload();
        inventoryRepository.restoreStock(payload.getProductId(), payload.getQuantity());

        // 보상 완료 이벤트 발행
        kafkaTemplate.send("saga-inventory-compensated", event.getOrderId(), event);
    }
}

결제 서비스 (컨슈머 겸 프로듀서):

@Service
@Slf4j
@RequiredArgsConstructor
public class PaymentSagaConsumer {

    private final PaymentRepository paymentRepository;
    private final KafkaTemplate<String, SagaEvent> kafkaTemplate;

    @KafkaListener(topics = "saga-stock-deducted")
    public void handleStockDeducted(SagaEvent event) {
        log.info("Stock deducted, processing payment for saga: {}", event.getSagaId());

        try {
            OrderPayload payload = (OrderPayload) event.getPayload();

            // 결제 처리
            Payment payment = Payment.builder()
                .paymentId(UUID.randomUUID().toString())
                .orderId(payload.getOrderId())
                .amount(payload.getQuantity() * 1000) // 단가 가정
                .status("COMPLETED")
                .build();

            paymentRepository.save(payment);

            // 결제 완료 이벤트 발행
            PaymentProcessedEvent processedEvent = new PaymentProcessedEvent(
                event.getSagaId(),
                payload.getOrderId(),
                payment.getPaymentId()
            );
            kafkaTemplate.send("saga-payment-processed", payload.getOrderId(), processedEvent);
            log.info("Payment processed for saga: {}", event.getSagaId());

        } catch (Exception e) {
            log.error("Payment failed for saga: {}", event.getSagaId(), e);

            // 결제 실패 시 보상 트랜잭션 트리거 (재고 복원)
            PaymentFailedEvent failedEvent = new PaymentFailedEvent(
                event.getSagaId(),
                event.getOrderId(),
                e.getMessage()
            );
            kafkaTemplate.send("saga-payment-failed", event.getOrderId(), failedEvent);

            // 재고 복원 보상 요청
            kafkaTemplate.send("saga-compensate-inventory", event.getOrderId(), event);
        }
    }

    // 결제 취소 보상 트랜잭션
    @KafkaListener(topics = "saga-compensate-payment")
    public void handlePaymentCompensation(SagaEvent event) {
        log.info("Compensating payment for saga: {}", event.getSagaId());

        paymentRepository.cancelByOrderId(event.getOrderId());

        // 보상 완료 이벤트
        kafkaTemplate.send("saga-payment-compensated", event.getOrderId(), event);
    }
}

3.2. 오케스트레이션(Orchestration) 방식

오케스트레이션 방식은 중앙 오케스트레이터(Saga Orchestrator)가 전체 트랜잭션 흐름을 제어하는 방식이다.

구조:

                     [SAGA 오케스트레이터]
                          ↙ ↓ ↘
              [주문 서비스] [재고 서비스] [결제 서비스]

장점:

  • 트랜잭션 흐름 파악이 쉬움 (중앙 집중식)
  • 복잡한 비즈니스 로직 처리 용이
  • 순환 의존성 없음
  • 상태 관리가 용이

단점:

  • 오케스트레이터가 단일 장애점(SPOF)이 될 수 있음
  • 중앙 집중화로 인한 병목 가능성
  • 오케스트레이터 구현 복잡도

3.2.1. 오케스트레이션 구현 예시

SAGA 오케스트레이터:

@Component
@Slf4j
@RequiredArgsConstructor
public class OrderSagaOrchestrator {

    private final SagaStateRepository sagaStateRepository;
    private final OrderServiceClient orderClient;
    private final InventoryServiceClient inventoryClient;
    private final PaymentServiceClient paymentClient;
    private final ShippingServiceClient shippingClient;

    // SAGA 시작
    @Transactional
    public String startSaga(CreateOrderRequest request) {
        String sagaId = UUID.randomUUID().toString();
        String orderId = UUID.randomUUID().toString();

        // SAGA 상태 저장
        SagaState state = SagaState.builder()
            .sagaId(sagaId)
            .orderId(orderId)
            .currentStep("ORDER_CREATING")
            .status("IN_PROGRESS")
            .payload(request)
            .startedAt(LocalDateTime.now())
            .build();

        sagaStateRepository.save(state);

        // 1단계: 주문 생성
        executeStep(sagaId, "ORDER_CREATING", () -> {
            orderClient.createOrder(orderId, request);
            return "ORDER_CREATED";
        });

        return sagaId;
    }

    // 단계 실행 (컴펜세이션 처리 포함)
    private void executeStep(String sagaId, String step, Supplier<String> action) {
        try {
            String result = action.get();
            updateSagaState(sagaId, step, "COMPLETED", result);

            // 다음 단계 결정
            determineNextStep(sagaId, step, result);

        } catch (Exception e) {
            log.error("Step failed: {} for saga: {}", step, sagaId, e);

            // 실패 시 보상 트랜잭션 실행
            compensate(sagaId, step, e.getMessage());
        }
    }

    // 다음 단계 결정
    private void determineNextStep(String sagaId, String currentStep, String result) {
        SagaState state = sagaStateRepository.findById(sagaId).orElseThrow();

        switch (currentStep) {
            case "ORDER_CREATED":
                // 재고 차감 단계로
                executeStep(sagaId, "INVENTORY_DEDUCTING", () -> {
                    inventoryClient.deductStock(state.getOrderId(), state.getPayload());
                    return "INVENTORY_DEDUCTED";
                });
                break;

            case "INVENTORY_DEDUCTED":
                // 결제 처리 단계로
                executeStep(sagaId, "PAYMENT_PROCESSING", () -> {
                    paymentClient.processPayment(state.getOrderId(), state.getPayload());
                    return "PAYMENT_PROCESSED";
                });
                break;

            case "PAYMENT_PROCESSED":
                // 배송 생성 단계로
                executeStep(sagaId, "SHIPPING_CREATING", () -> {
                    shippingClient.createShipment(state.getOrderId(), state.getPayload());
                    return "SHIPPING_CREATED";
                });
                break;

            case "SHIPPING_CREATED":
                // SAGA 완료
                completeSaga(sagaId);
                break;
        }
    }

    // 보상 트랜잭션 실행
    private void compensate(String sagaId, String failedStep, String reason) {
        SagaState state = sagaStateRepository.findById(sagaId).orElseThrow();

        log.warn("Starting compensation for saga: {} failed at step: {}", sagaId, failedStep);

        // 실패한 단계에 따라 필요한 보상 트랜잭션 실행
        if (failedStep.equals("INVENTORY_DEDUCTING")) {
            // 주문 취소 (보상)
            compensateOrder(state.getOrderId(), reason);

        } else if (failedStep.equals("PAYMENT_PROCESSING")) {
            // 재고 복원 + 주문 취소
            compensateInventory(state.getOrderId(), state.getPayload());
            compensateOrder(state.getOrderId(), reason);

        } else if (failedStep.equals("SHIPPING_CREATING")) {
            // 결제 취소 + 재고 복원 + 주문 취소
            compensatePayment(state.getOrderId());
            compensateInventory(state.getOrderId(), state.getPayload());
            compensateOrder(state.getOrderId(), reason);
        }

        // SAGA 상태 업데이트
        state.setStatus("FAILED");
        state.setFailureReason(reason);
        state.setCompletedAt(LocalDateTime.now());
        sagaStateRepository.save(state);
    }

    private void compensateOrder(String orderId, String reason) {
        try {
            orderClient.cancelOrder(orderId, reason);
            log.info("Order compensated: {}", orderId);
        } catch (Exception e) {
            log.error("Failed to compensate order: {}", orderId, e);
        }
    }

    private void compensateInventory(String orderId, CreateOrderRequest payload) {
        try {
            inventoryClient.restoreStock(payload.getProductId(), payload.getQuantity());
            log.info("Inventory compensated for order: {}", orderId);
        } catch (Exception e) {
            log.error("Failed to compensate inventory: {}", orderId, e);
        }
    }

    private void compensatePayment(String orderId) {
        try {
            paymentClient.cancelPayment(orderId);
            log.info("Payment compensated for order: {}", orderId);
        } catch (Exception e) {
            log.error("Failed to compensate payment: {}", orderId, e);
        }
    }

    private void completeSaga(String sagaId) {
        SagaState state = sagaStateRepository.findById(sagaId).orElseThrow();
        state.setStatus("COMPLETED");
        state.setCompletedAt(LocalDateTime.now());
        sagaStateRepository.save(state);

        log.info("Saga completed successfully: {}", sagaId);
    }

    private void updateSagaState(String sagaId, String step, String status, String result) {
        SagaState state = sagaStateRepository.findById(sagaId).orElseThrow();
        state.setCurrentStep(step);
        state.setStepStatus(status);
        state.setStepResult(result);
        sagaStateRepository.save(state);
    }
}

SAGA 상태 저장소:

@Entity
@Table(name = "saga_state")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class SagaState {

    @Id
    private String sagaId;

    private String orderId;

    private String currentStep;

    private String stepStatus;

    @Column(length = 5000)
    private String stepResult;

    private String status;  // IN_PROGRESS, COMPLETED, FAILED

    @Column(length = 5000)
    private String failureReason;

    @Transient
    private CreateOrderRequest payload;  // 실제로는 JSON으로 직렬화하여 저장

    private LocalDateTime startedAt;

    private LocalDateTime completedAt;
}

@Repository
public interface SagaStateRepository extends JpaRepository<SagaState, String> {
    List<SagaState> findByStatus(String status);
    Optional<SagaState> findByOrderId(String orderId);
}

3.3. 두 방식의 비교

특성  코레오그래피  오케스트레이션
제어 흐름 분산, 이벤트 기반 중앙 집중식
결합도 매우 낮음 중간 (오케스트레이터와 결합)
복잡도 런타임 흐름 파악 어려움 로직은 단순, 오케스트레이터 복잡
장애 처리 분산된 보상 로직 중앙 집중식 보상 로직
추적성 분산 추적 필요 상대적 쉬움
적합한 규모 단순한 워크플로우 복잡한 비즈니스 로직
서비스 수 3-5개 이하 많음 (5개 이상)

4. Dual Write Problem

4.1. Dual Write 문제란?

Dual Write 문제는 하나의 트랜잭션에서 데이터베이스 쓰기와 메시지 발행을 동시에 수행할 때 발생하는 일관성 문제다.

// 문제 상황: DB 쓰기와 메시지 발행 중 하나만 성공할 수 있음
@Transactional
public void createOrder(Order order) {
    // 1. DB에 주문 저장
    orderRepository.save(order);  // 성공

    // 2. Kafka로 이벤트 발행
    kafkaTemplate.send("order-created", order);  // 실패하면?
    // ↑ 네트워크 문제로 실패 시, DB에는 저장되었지만 이벤트는 발행되지 않음
}

발생 가능한 불일치 상황:

  1. DB 저장 성공, 메시지 발행 실패:
    • 다른 서비스들이 이벤트를 받지 못해 데이터 불일치 발생
    • 예: 재고 서비스가 재고 차감을 하지 못함
  2. DB 저장 실패, 메시지 발행 성공:
    • 이벤트는 발행되었지만 실제 데이터는 없음
    • 컨슈머가 존재하지 않는 데이터에 대해 작업 시도

5. Outbox 패턴 (Dual Write 문제 해결방안)

5.1. Outbox 패턴 개념

Outbox 패턴은 Dual Write 문제를 해결하기 위한 패턴으로, 메시지를 별도의 Outbox 테이블에 저장하고 별도의 프로세스가 이를 발행하는 방식이다.

동작 방식:

1. 트랜잭션 시작
2. 주문 저장 (Order 테이블)
3. Outbox 테이블에 이벤트 저장 (같은 트랜잭션)
4. 트랜잭션 커밋
5. 별도 프로세스(Relay)가 Outbox 테이블 폴링
6. 미발행 이벤트를 Kafka로 발행
7. 발행 완료된 이벤트는 Outbox에서 삭제 또는 마킹

5.2. Outbox 패턴 구현

Outbox 엔티티:

@Entity
@Table(name = "outbox")
@Data
public class OutboxEntity {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false, unique = true)
    private String eventId;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String eventType;

    @Column(nullable = false, columnDefinition = "TEXT")
    private String payload;  // JSON 형식의 이벤트 데이터

    @Column(nullable = false)
    private LocalDateTime createdAt;

    @Column
    private LocalDateTime publishedAt;

    @Column(nullable = false)
    private int retryCount;

    @Enumerated(EnumType.STRING)
    private OutboxStatus status;  // PENDING, PUBLISHED, FAILED
}

public enum OutboxStatus {
    PENDING, PUBLISHED, FAILED
}

Outbox 저장소:

@Repository
public interface OutboxRepository extends JpaRepository<OutboxEntity, Long> {

    @Lock(LockModeType.PESSIMISTIC_WRITE)
    @Query("SELECT o FROM OutboxEntity o WHERE o.status = 'PENDING' ORDER BY o.createdAt ASC")
    List<OutboxEntity> findPendingEvents();

    @Lock(LockModeType.PESSIMISTIC_WRITE)
    @Query("SELECT o FROM OutboxEntity o WHERE o.status = 'PENDING' AND o.createdAt < :cutoff")
    List<OutboxEntity> findPendingEventsOlderThan(LocalDateTime cutoff);
}

서비스에서 Outbox 사용:

@Service
@RequiredArgsConstructor
public class OrderServiceWithOutbox {

    private final OrderRepository orderRepository;
    private final OutboxRepository outboxRepository;

    @Transactional
    public Order createOrder(CreateOrderRequest request) {
        // 1. 주문 저장
        String orderId = UUID.randomUUID().toString();
        Order order = Order.builder()
            .id(orderId)
            .userId(request.getUserId())
            .productId(request.getProductId())
            .quantity(request.getQuantity())
            .totalAmount(request.getQuantity() * request.getUnitPrice())
            .status("PENDING")
            .build();

        orderRepository.save(order);

        // 2. Outbox에 이벤트 저장 (같은 트랜잭션)
        OrderCreatedEvent event = new OrderCreatedEvent(
            UUID.randomUUID().toString(),
            orderId,
            request.getUserId(),
            request.getProductId(),
            request.getQuantity()
        );

        OutboxEntity outbox = new OutboxEntity();
        outbox.setEventId(event.getEventId());
        outbox.setAggregateType("Order");
        outbox.setAggregateId(orderId);
        outbox.setEventType("OrderCreated");
        outbox.setPayload(toJson(event));
        outbox.setCreatedAt(LocalDateTime.now());
        outbox.setStatus(OutboxStatus.PENDING);
        outbox.setRetryCount(0);

        outboxRepository.save(outbox);

        return order;
    }
}

Outbox 릴레이 (메시지 발행기):

@Component
@Slf4j
@RequiredArgsConstructor
public class OutboxRelay {

    private final OutboxRepository outboxRepository;
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Scheduled(fixedDelay = 5000)  // 5초마다 실행
    @Transactional
    public void publishPendingEvents() {
        List<OutboxEntity> pendingEvents = outboxRepository.findPendingEvents();

        for (OutboxEntity event : pendingEvents) {
            try {
                // Kafka로 발행
                String topic = determineTopic(event.getEventType());
                kafkaTemplate.send(topic, event.getAggregateId(), event.getPayload())
                    .whenComplete((result, ex) -> {
                        if (ex == null) {
                            // 발행 성공 처리
                            event.setPublishedAt(LocalDateTime.now());
                            event.setStatus(OutboxStatus.PUBLISHED);
                            outboxRepository.save(event);
                            log.info("Event published: {}", event.getEventId());
                        } else {
                            // 발행 실패 처리
                            handlePublishFailure(event, ex);
                        }
                    });
            } catch (Exception e) {
                log.error("Failed to publish event: {}", event.getEventId(), e);
                handlePublishFailure(event, e);
            }
        }
    }

    private void handlePublishFailure(OutboxEntity event, Exception ex) {
        event.setRetryCount(event.getRetryCount() + 1);

        if (event.getRetryCount() >= 3) {
            event.setStatus(OutboxStatus.FAILED);
            log.error("Event permanently failed after 3 retries: {}", event.getEventId());
            // 알림 발송 등 추가 처리
        } else {
            event.setStatus(OutboxStatus.PENDING);  // 재시도
        }

        outboxRepository.save(event);
    }

    private String determineTopic(String eventType) {
        switch (eventType) {
            case "OrderCreated":
                return "order-events";
            case "PaymentProcessed":
                return "payment-events";
            default:
                return "default-events";
        }
    }
}

6. CDC (Change Data Capture)

6.1. CDC 개념

CDC는 데이터베이스의 변경 사항을 캡처하여 다른 시스템에 전파하는 기술이다. Outbox 패턴과 유사하지만, 애플리케이션 레벨이 아닌 데이터베이스 레벨에서 변경을 감지한다.

동작 방식:

[데이터베이스] → (변경 감지) → [CDC 커넥터] → [Kafka] → [컨슈머]

6.2. Debezium을 활용한 CDC 구현

Debezium 설정 (MySQL 예시):

{
  "name": "order-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "order-server",
    "database.include.list": "orderdb",
    "table.include.list": "orderdb.orders,orderdb.outbox",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.order",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
  }
}

Spring Boot에서 CDC 이벤트 소비:

@Component
@Slf4j
@RequiredArgsConstructor
public class CDCConsumer {

    private final OrderRepository orderRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    @KafkaListener(topics = "order-server.orderdb.orders")
    public void handleOrderChange(ChangeEvent event) {
        log.info("Received order change: {}", event);

        if ("c".equals(event.getOperation())) {  // Create
            handleOrderCreated(event);
        } else if ("u".equals(event.getOperation())) {  // Update
            handleOrderUpdated(event);
        }
    }

    private void handleOrderCreated(ChangeEvent event) {
        OrderPayload payload = event.getPayload();

        // 필요한 처리 수행
        OrderSyncEvent syncEvent = new OrderSyncEvent(
            payload.getId(),
            payload.getUserId(),
            payload.getStatus()
        );

        // 다른 서비스에 동기화
        kafkaTemplate.send("order-sync", payload.getId(), syncEvent);
    }
}

7. [실습 요약] 보상 트랜잭션 (SAGA) 처리

7.1. 실습 15: SAGA 코레오그래피 구현

  • Kafka 토픽 설정
  • 이벤트 기반 SAGA 흐름 구현
  • 보상 트랜잭션 처리

7.2. 실습 16: SAGA 오케스트레이션 구현

  • 오케스트레이터 서비스 구현
  • 상태 관리
  • 보상 로직 중앙 집중화

7.3. 실습 17: Outbox 패턴 적용

  • Outbox 테이블 설계
  • 트랜잭셔널 아웃박스 구현
  • 릴레이 서비스 구현

8. 정리

8.1. 분산 트랜잭션 패턴 선택 가이드

상황  추천 패턴
단순한 워크플로우, 소규모 코레오그래피 SAGA
복잡한 비즈니스 로직, 대규모 오케스트레이션 SAGA
이벤트 발행 일관성 필요 Outbox 패턴
기존 데이터베이스 변경 감지 필요 CDC
강한 일관성이 필수 2PC 고려 (가능한 피할 것)

8.2. SAGA 패턴 도입 시 주의사항

[1] 멱등성(Idempotency) 보장

  • 같은 메시지가 여러 번 처리되어도 결과가 동일해야 함
  • 멱등성 키를 사용하여 중복 처리 방지
@Entity
@Table(uniqueConstraints = @UniqueConstraint(columnNames = {"eventId"}))
public class ProcessedEvent {
    @Id
    private String eventId;
    private LocalDateTime processedAt;
}

[2] 데드 레터 큐(Dead Letter Queue) 관리

  • 처리 실패한 메시지를 별도 큐에 저장
  • 재처리 메커니즘 구현

[3] 모니터링과 알림

  • SAGA 상태 추적
  • 장기간 미완료 SAGA 감지

[4] 트랜잭션 경계 최소화

  • 가능한 한 SAGA의 범위를 작게 유지
  • 꼭 필요한 경우에만 분산 트랜잭션 사용

8.3. 다음 글 예고

다음 글에서는 MSA의 회복성(Resilience)과 관측성(Observability)에 대해 다룰 예정이다. Retry, Circuit Breaker, Bulkhead, Timeout 등 장애 처리 패턴과 OpenTelemetry, Zipkin, Fluentd를 활용한 분산 추적 및 모니터링 구축 방법을 살펴보겠다.

'MSA > MSA 아키텍처' 카테고리의 다른 글

[ADVANCED #4] MSA 보안과 테스트 전략  (0) 2026.03.02
[ADVANCED #3] MSA 회복성, 관측성, 모니터링 전략  (0) 2026.03.02
[ADVANCED #1] MSA 데이터 관리 전략 (DB per Service, CQRS, Sharding)  (0) 2026.03.02
[BASIC #4] 비동기 통신과 Event-Driven Architecture  (0) 2026.03.02
[BASIC #3] MSA 동기 통신 전략과 API Gateway 패턴  (0) 2026.03.02
'MSA/MSA 아키텍처' 카테고리의 다른 글
  • [ADVANCED #4] MSA 보안과 테스트 전략
  • [ADVANCED #3] MSA 회복성, 관측성, 모니터링 전략
  • [ADVANCED #1] MSA 데이터 관리 전략 (DB per Service, CQRS, Sharding)
  • [BASIC #4] 비동기 통신과 Event-Driven Architecture
h6bro
h6bro
백엔드 개발자의 기술 블로그
  • h6bro
    Jun's Tech Blog
    h6bro
  • 전체
    오늘
    어제
    • 분류 전체보기 (250) N
      • Java (18)
        • Core (9)
        • Design Pattern (9)
      • Spring (80)
        • Core (24)
        • MVC (6)
        • DB (10)
        • JPA (26)
        • Monitoring (3)
        • Security (11)
        • WebSocket (0)
      • Database (33)
        • Redis (15)
        • MySQL (18)
      • MSA (25) N
        • MSA 기본 (11)
        • MSA 아키텍처 (14) N
      • Kafka (30)
        • Core (18)
        • Connect (12)
      • ElasticSearch (11)
        • Search (11)
        • Logging (0)
      • Test (4)
        • k6 (4)
      • Docker (9)
      • CI&CD (10)
        • GitHub Actions (6)
        • ArgoCD (4)
      • Kubernetes (18)
        • Core (12)
        • Ops (6)
      • Cloud Engineering (4)
        • AWS Infrastructure (3)
        • AWS EKS (1)
        • Terraform (0)
      • Project (8)
        • LinkFolio (1)
        • Secondhand Market (7)
  • 블로그 메뉴

    • 홈
    • 태그
    • 방명록
  • 링크

  • 공지사항

    • Cloud Engineering 포스팅 정리
  • 인기 글

  • 태그

    ㅈ
  • 최근 댓글

  • 최근 글

  • hELLO· Designed By정상우.v4.10.5
h6bro
[ADVANCED #2] 분산 트랜잭션과 SAGA 패턴 완전 정리
상단으로

티스토리툴바