본문 바로가기
Spring/WebSocket

[Spring WebSocket] 채팅 서비스에 Transactional Outbox Pattern 도입

by 진꿈청 2024. 11. 11.

https://asfirstalways.tistory.com/359

 

2024.11.11 - [Spring/WebSocket] - [Spring WebSocket] STOMP에서 Kafka 활용 Flow

 

[Spring WebSocket] STOMP에서 Kafka 활용 Flow

2024.11.10 - [Spring/WebSocket] - [Spring WebSocket] 채팅 서비스 프로젝트에 Kafka 적용 [Spring WebSocket] 채팅 서비스 프로젝트에 Kafka 적용채팅 서비스는 일반적인 `HTTP` 통신을 사용하는 `stateless` 서비스와는

hdbstn3055.tistory.com

 

이전 포스팅에서 본것처럼 `STOMP`에 메시지 브로커인 `Kafka`를 활용해서 `Scale-Out` 상황에 대비했다.

 

그런데 이렇게 되면

 

발행되어야 할 메시지가 발행되지 않거나, 발행되지 않아야 하는 메시지가 발행될 수 있다.

 

 

이를 간단하게 설명하자면

 


발행되어야 할 메시지가 발행되지 않음

 

`DB`에는 채팅이 잘 저장이 돼서 그걸 `Message Broker`로 잘 전달했는데 그 작업이 실패

 

 

발행되지 않아야 하는 메시지가 발행 됨

 

`DB`에는 채팅이 잘 저장이 안됐는데 `Message Broker`로 전달해버림


 

 

지금처럼 간단한 채팅 서비스에서는 뭐 큰 문제가 되지 않을 수 있다.

 

하지만, 실시간 경매 채팅 서비스라면? 채팅 서비스가 아니더라도 만약 결제와 관련된 서비스라면?

 

아주 난리가 날 것이다.

 

그래서 그런 문제를 해결하려면 적어도 한번은 전달 즉, `At Least Once Delivery`를 보장해야 한다.

 

방법은 여러개가 존재한다. `Transcational OutBox Pattern`, `Kafka Callback`, `RabbitMQ CallBack` 등등

나는 여기서 `Transcational OutBox Pattern`을 활용해 해당 상황에 대비했다.

 

 

관련된 자세한 내용은 이전 포스팅에서 확인할 수 있다.

 

2024.09.19 - [Infra/DevOps] - 분산 시스템에서 데이터를 전달하는 효율적인 방법 - 1

 

분산 시스템에서 데이터를 전달하는 효율적인 방법 - 1

이번 포스팅에서는 NHN 유튜브의 분산 시스템에서 데이터를 전달하는 효율적인 방법 강의를 보고 정리한 내용이다.  포스팅에서 다룰 내용데이터 전달 보장 방법론RDB를 사용하는 애플리케이션

hdbstn3055.tistory.com

 

2024.09.20 - [Infra/DevOps] - 분산 시스템에서 데이터를 전달하는 효율적인 방법 - 2

 

분산 시스템에서 데이터를 전달하는 효율적인 방법 - 2

2024.09.19 - [Infra/DevOps] - 분산 시스템에서 데이터를 전달하는 효율적인 방법 - 1 분산 시스템에서 데이터를 전달하는 효율적인 방법 - 1이번 포스팅에서는 NHN 유튜브의 분산 시스템에서 데이터를

hdbstn3055.tistory.com

 


 

자, 이제는 어떻게 현재 채팅 토이 프로젝트에 적용을 했느냐?

 

 

우선, 위 포스팅을 보지 않았을 수도 있으니 아래 사항을 알고 넘어가자.

 

`Transactional OutBox Pattern`은 `Kafka`와 같은 메시지 브로커에 메시지를 전송할 때

 

`COMMIT` 이전에 해당 메시지와 관련 내용을 `DB`에 저장해 놓는다.

`COMMIT` 이후에는 `DB`에 저장된 이벤트를 읽어 `Producer`를 활용해 토픽에 전송한다.

 

그 후, 성공적으로 전송되었다면 `SEND_SUCCESS`, 실패했다면 `SEND_FAIL`과 같이 상태값을 저장한다.

 

이때, 지속적으로 확인하는 `Polling Daemon` 등을 확인해 발송 실패 상황에 대비한다.

 

 

자 이점을 알았으니 코드와 함께 알아보자.

 

 

NormalMessageCommandServiceImpl

@Slf4j
@Service
@RequiredArgsConstructor
public class NormalMessageCommandServiceImpl implements NormalMessageCommandService {

    private final SequenceGenerator sequenceGenerator;
    private final NormalMessageRepository messageRepository;
    private final NormalRepository normalRepository;

    @Override
    @Transactional
    public void save(NormalMessageCreateRequest request) {
        validateUserInNormal(request.getNormalId(), request.getUserId());
        NormalMessage normalMessage = NormalMessage.from(request);

        normalMessage.generateSequence(sequenceGenerator.generateSequence(NormalMessage.SEQUENCE_NAME));

        NormalChatCreateEvent chatCreateEvent =
                NormalChatCreateEvent.from(messageRepository.save(normalMessage), UUIDUtil.generateUUID());

        Events.send(chatCreateEvent);
    }

    @Override
    @Transactional
    public void modify(NormalMessageModifyRequest request) {
        NormalMessage normalMessage = messageRepository.findById(request.getMessageId())
                .orElseThrow(() -> new NormalChatException(Code.NOT_FOUND, "Message Not Found"));

        normalMessage.modify(request.getContent());

//        System.out.println(request.getContent());

        NormalChatModifyEvent chatModifyEvent =
                NormalChatModifyEvent.from(messageRepository.save(normalMessage), UUIDUtil.generateUUID());

        Events.send(chatModifyEvent);
    }

    @Override
    @Transactional
    public void delete(NormalMessageDeleteRequest request) {
        NormalMessage normalMessage = messageRepository.findById(request.getMessageId())
                .orElseThrow(() -> new NormalChatException(Code.NOT_FOUND, "Message Not Found"));

        normalMessage.delete();

        NormalChatDeleteEvent chatDeleteEvent =
                NormalChatDeleteEvent.from(messageRepository.save(normalMessage), UUIDUtil.generateUUID());

        Events.send(chatDeleteEvent);
    }


    private void validateUserInNormal(Long normalId, Long userId){
        normalRepository.findByNormalIdAndInUserId(normalId, List.of(userId))
                .orElseThrow(() -> new NormalChatException(Code.NOT_FOUND, "User Not In Normal Chat"));
    }
}

 

위 코드를 보면 알 수 있듯이 우선

 

  1. 클라이언트가 전달한 채팅 내용을 `normalRepository`와 함께 `DB`에 저장한다.
  2. 각각의 작업에 대해 `NormalChatCreateEvent`, `NormalChatModifyEvent`, `NormalChatDeleteEvent`를 생성한다.
  3. `Events`를 활용해 이벤트를 전달한다.

여기서 `Events`는 뭘까?

 

Events

public class Events {

    private static ApplicationEventPublisher publisher;

    public static void register(ApplicationEventPublisher publisher){
        Events.publisher = publisher;
    }

    public static void send(Object event){
        if(publisher != null){
            publisher.publishEvent(event);
        }
    }
}

 

`Events` 클래스는 `Spring`이 제공하는 `ApplicationEventPublisher`를 사용해 이벤트를 발행해주는 역할을 한다.

 

왜 `ApplicationEventPublisher`를 사용했냐면

 

처리해야 할 도메인 로직과 그 이후에 처리되어야 할 로직을 분리하기 위함이다.

 

  1. 도메인 로직이 완료된 이후에 이와 관련된 이벤트를 발행한다.
  2. 도메인 로직 완료 이후에 처리해야 할 것들을 해당 이벤트를 바라보는 `Listener`에 구현한다.
  3. 즉, 도메인 로직은 도메인의 요구사항에만 집중할 수 있고, 그 이외의 것들은 해당 이벤트를 바라보는 `Listener`에 추가하기만 하면 된다.

또한, 여기서 핵심이 되는 어노테이션은 `@TransactionalEventListener`이다.

 

`@TransactionalEventListener`를 사용하면 `BEFORE_COMMIT`과 `AFTER_COMMIT` 과정에서 일어날 작업을

정의할 수 있다.

 

 

 

NormalChatEvent

 

발행된 `Event`의 정보가 담길 `Collection` 혹은 테이블과 같은 개념이다.

@Getter
@Builder
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@Document(collection = "normalChatEvent")
public class NormalChatEvent {

    @Transient
    public static final String SEQUENCE_NAME = "normal_chat_event_sequence";

    @Id
    private Long eventId;

    private EventSentType eventSentType;

    private String uuid;

    private Long messageId;
    private Long normalId;
    private Long userId;
    private String profileImg;
    private String writer;
    private String content;
    private Boolean isDeleted;
    private ChatType chatType;
    private ActionType actionType;
    private List<UploadFile> files;
    private LocalDateTime createdAt;
    private LocalDateTime modifiedAt;

    public void generateSequence(Long eventId){
        this.eventId = eventId;
    }

    public static NormalChatEvent from(NormalChatCreateEvent message, EventSentType eventSentType) {
        return NormalChatEvent.builder()
                .uuid(message.getUuid())
                .messageId(message.getMessageId())
                .normalId(message.getNormalId())
                .userId(message.getUserId())
                .profileImg(message.getProfileImg())
                .writer(message.getWriter())
                .content(message.getContent())
                .isDeleted(message.getIsDeleted())
                .actionType(message.getActionType())
                .chatType(message.getChatType())
                .files(message.getFiles())
                .createdAt(message.getCreatedAt())
                .modifiedAt(message.getModifiedAt())
                .eventSentType(eventSentType)
                .build();
    }

    public static NormalChatEvent from(NormalChatModifyEvent message, EventSentType eventSentType) {
        return NormalChatEvent.builder()
                .uuid(message.getUuid())
                .messageId(message.getMessageId())
                .normalId(message.getNormalId())
                .content(message.getContent())
                .chatType(message.getChatType())
                .actionType(message.getActionType())
                .createdAt(message.getCreatedAt())
                .modifiedAt(message.getModifiedAt())
                .eventSentType(eventSentType)
                .build();
    }

    public static NormalChatEvent from(NormalChatDeleteEvent message, EventSentType eventSentType) {
        return NormalChatEvent.builder()
                .uuid(message.getUuid())
                .messageId(message.getMessageId())
                .chatType(message.getChatType())
                .actionType(message.getActionType())
                .eventSentType(eventSentType)
                .build();
    }

    public void changeEventSentType(EventSentType eventSentType) {
        this.eventSentType = eventSentType;
    }
}

 

 

NormalChatEventHandler

@Slf4j
@Service
@RequiredArgsConstructor
public class NormalChatEventHandler {

    private final SequenceGenerator sequenceGenerator;
    private final ChatEventProducer chatEventProducer;
    private final NormalChatEventRepository normalChatEventRepository;

    @TransactionalEventListener(classes = NormalChatCreateEvent.class, phase = TransactionPhase.BEFORE_COMMIT)
    public void normalChatCreateEventBeforeHandler(NormalChatCreateEvent chatCreateEvent){
        NormalChatEvent normalChatEvent = createNormalChatEvent(chatCreateEvent);
        normalChatEvent.generateSequence(sequenceGenerator.generateSequence(NormalChatEvent.SEQUENCE_NAME));
        normalChatEventRepository.save(normalChatEvent);
    }

    @Async
    @Retryable(retryFor = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 2000L))
    @TransactionalEventListener(classes = NormalChatCreateEvent.class, phase = TransactionPhase.AFTER_COMMIT)
    public void normalChatCreateEventAfterHandler(NormalChatCreateEvent chatCreateEvent){
        NormalChatEvent normalChatEvent = createNormalChatEvent(chatCreateEvent);
        publishNormalChatEvent(normalChatEvent);
    }

    @TransactionalEventListener(classes = NormalChatModifyEvent.class, phase = TransactionPhase.BEFORE_COMMIT)
    public void normalChatModifyEventBeforeHandler(NormalChatModifyEvent chatModifyEvent){
        NormalChatEvent normalChatEvent = createNormalChatEvent(chatModifyEvent);
        normalChatEvent.generateSequence(sequenceGenerator.generateSequence(NormalChatEvent.SEQUENCE_NAME));
        normalChatEventRepository.save(normalChatEvent);
    }

    @Async
    @Retryable(retryFor = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 2000L))
    @TransactionalEventListener(classes = NormalChatModifyEvent.class, phase = TransactionPhase.AFTER_COMMIT)
    public void normalChatModifyEventAfterHandler(NormalChatModifyEvent chatModifyEvent){
        NormalChatEvent normalChatEvent = createNormalChatEvent(chatModifyEvent);
        publishNormalChatEvent(normalChatEvent);
    }

    @TransactionalEventListener(classes = NormalChatDeleteEvent.class, phase = TransactionPhase.BEFORE_COMMIT)
    public void normalChatDeleteEventBeforeHandler(NormalChatDeleteEvent chatDeleteEvent){
        NormalChatEvent normalChatEvent = createNormalChatEvent(chatDeleteEvent);
        normalChatEvent.generateSequence(sequenceGenerator.generateSequence(NormalChatEvent.SEQUENCE_NAME));
        normalChatEventRepository.save(normalChatEvent);
    }

    @Async
    @Retryable(retryFor = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 2000L))
    @TransactionalEventListener(classes = NormalChatDeleteEvent.class, phase = TransactionPhase.AFTER_COMMIT)
    public void normalChatDeleteEventAfterHandler(NormalChatDeleteEvent chatDeleteEvent){
        NormalChatEvent normalChatEvent = createNormalChatEvent(chatDeleteEvent);
        publishNormalChatEvent(normalChatEvent);
    }

    private void publishNormalChatEvent(NormalChatEvent normalChatEvent) {
        NormalChatEvent chatEvent = normalChatEventRepository.findByUuid(normalChatEvent.getUuid())
                .orElseThrow(() -> new RuntimeException("Normal Chat Event Not Found"));

        try{
            chatEventProducer.sendToNormalChatTopic(chatEvent);
            chatEvent.changeEventSentType(EventSentType.SEND_SUCCESS);
            normalChatEventRepository.save(chatEvent);
        } catch (Exception e){
            chatEvent.changeEventSentType(EventSentType.SEND_FAIL);
            normalChatEventRepository.save(chatEvent);
        }
    }

    private NormalChatEvent createNormalChatEvent(NormalChatCreateEvent chatCreateEvent) {
        return NormalChatEvent.from(chatCreateEvent, EventSentType.INIT);
    }

    private NormalChatEvent createNormalChatEvent(NormalChatModifyEvent chatModifyEvent) {
        return NormalChatEvent.from(chatModifyEvent, EventSentType.INIT);
    }

    private NormalChatEvent createNormalChatEvent(NormalChatDeleteEvent chatDeleteEvent) {
        return NormalChatEvent.from(chatDeleteEvent, EventSentType.INIT);
    }

}

 

`normalChatCreateEventBeforeHandler` 메소드를 예시로 `BEFORE_COMMIT` 이벤트 발생 시  

`Transactional OutBox Pattern`에 맞게 이벤트를 `DB`에 저장해놓는다.

 

그 후 `AFTER_COMMIT` 이벤트가 발생하면 `normalChatCreateEventAfterHandler` 메소드가 해당 이벤트를 받아

`publishNormalChatEvent` 메소드를 실행한다.

(이때 코드를 보면 알 수 있듯이 `@Async`(비동기)로 최대 3회 `@Retryable`(재전송) 한다.)

 

`publishNormalChatEvent` 메소드는 `Producer`를 활용해 `Kafka`의 Topic으로 이벤트를 전달하고

이벤트의 상태를 `SEND_SUCCESS`로 변경한다.

만약, 실패했다면 `SEND_FAIL`로 변경 후 저장한다.

 

그 후, 3번 전부 다 실패했다면 결국 `SEND_FAIL`로 저장될 것이므로,

`Polling Daemon`을 활용해 `SEND_FAIL` 채팅에 관한 재전송을 진행하면 된다.

 

 

여기서 `UUID`를 추가한 이유는 `BEFORE_COMMIT` 단계에서 `NormalChatEvent`를 생성하고 저장하지만,

 

이 시점에서는 트랜잭션이 아직 커밋되지 않았기 때문에, 바로 `AFTER_COMMIT` 이벤트가 발생하게 되면

`AFTER_COMMIT` 단계에서 `publishNormalChatEvent`를 호출할 때

`normalChatEvent.getEventId()`가 `NULL`인 경우가 생길 수 있다.

 

따라서, 저장 시`UUID`를 생성하고 발행할 이벤트 객체에 저장하도록 하였다.

 

NormalChatModifyEvent chatModifyEvent =
        NormalChatModifyEvent.from(messageRepository.save(normalMessage), UUIDUtil.generateUUID());

 

 

이제 이렇게 복잡한 방식으로 `Producer`로 발행된 메시지는 이전에도 계속 살펴보았던 `Consumer`로 전달이 된다.

 

 

NormalChatConsumer

@Slf4j
@Component
@RequiredArgsConstructor
public class NormalChatConsumer {

    private final SimpMessageSendingOperations messagingTemplate;

    @KafkaListener(topics = "${spring.kafka.topic.normal-chat}", groupId = "${spring.kafka.consumer.group-id.normal-chat}", containerFactory = "normalChatListenerContainerFactory")
    public void normalChatListener(NormalChatEvent chatEvent) {
        Long normalId = chatEvent.getNormalId();

        switch (chatEvent.getActionType()){
            case SEND -> {
                NormalMessageCreateResponse response = NormalMessageCreateResponse.from(chatEvent);
                messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
            }

            case MODIFY -> {
                NormalMessageModifyResponse response = NormalMessageModifyResponse.from(chatEvent);
                messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
            }

            case DELETE -> {
                NormalMessageDeleteResponse response = NormalMessageDeleteResponse.from(chatEvent);
                messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
            }
        }
    }
}

 

 

실제 DB 저장

 

 


 

자.. 근데 `MongoDB`를 사용하시는 분은 이것만 보고 따라하시면 안된다..

 

`Transactional OutBox Pattern`의 핵심은 `DB`의 트랜잭션 기능에 의거한다.

`BEFORE_COMMIT`, `AFTER_COMMIT`을 활용하여 적절하게 로직을 처리한다.

 

만약, `MySQL`를 사용한다면 `SpringBoot`에서 그냥 `@Transactional`를 붙이면 트랜잭션을 쉽게 사용할 수 있다.

 

 

하지만, `MongoDB`는  `@Transactional`을 사용하기 위해서는 `replicaSet` 환경을 구축해야 한다고 한다..

 

구축하지 않으면 아래 예외가 발생한다.

com.mongodb.MongoCommandException: Command failed with error 263
(ShardingOperationFailed): 'Transaction numbers are only allowed on a replica set
memebe or mongos' on server localhost:27017.
The full response is { "ok" : 0.0, "errmsg" : "Transaction numbers are only 
allowed on a replica set member or mongos", "code" : 263, 
"codeName" : "ShardingOperationFailed" }

 

`MongoDB`의 트랜잭션은 데이터의 일관성과 무결성을 유지하면서 원자적으로 실행해야 하는 여러 작업을

트랜잭션에 포함하는 경우가 많아서 레플리카 세트 내에서 작동하도록 설계되었다고 한다.

 

자세한 내용은 아래 블로그에 잘 포스팅 되어 있으니 참고하면 좋을 것 같다.

MongoDB에서 @Transcational 적용하는 방법

 

회사에서 Spring Boot + mongoDB 트랜잭션 도입하기

1. 왜 mongoDB에서 트랜잭션을 도입하는거야?우선 업무에서 왜 mongoDB를 사용하고 있는지 부터 이야기 해야 할 것 같아요.일반적인 회사에서는 관계형 데이터베이스를 사용하고 있는데, 제가 머물

jh2021.tistory.com

 

위의 블로그를 활용하면서 겪은 정보인데 나는 당연하게도 `Priority`가 낮으면 우선순위가 높을 것이라 생각했는데

`Priority`가 높아야 우선순위가 높게 설정된다.

 

만약에 우선순위 설정을 잘못하게 되면 계속 원하는 `mongoDB`가 `Primary`가 아닌 `Secondary`로 설정될 것이다.

 

 

만약 `mongo1`을 `primary`로 설정하고 싶다면 `priority`를 `mongo2`보단 높게 설정해주도록 하자.

 

 

정리

기존에 `Transactional OutBox Pattern`의 개념만 알고 있었는데 이번에 직접 사용해보는 경험을 얻게 되었다.

 

위 패턴이 데이터의 원자성을 보장해 메시지 발행의 신뢰성을 높일 수 있다는 점을 확인했다.

 

이로써 데이터의 일관성을 확보하고 시스템의 안정성을 강화할 수 있었다.

 

하지만, 위 방법이 만능이라고 할 순 없다.

 

미미하긴 하지만 `Producer`의 이벤트 발행 시 DB 조회가 계속해서 일어나기도 하고

만약 의도치 않게 서비스가 종료되었을 경우 `INIT`, `SEND_SUCCESS`, `SEND_FAIL` 상태에 문제가 생길 수 있다.

 

그래도 아무런 조치가 없는 것보다 훨씬 나은 방법을 경험해봤기에 아주 좋은 경험을 했다고 생각한다.

 

 

코드를 확인하고 싶다면?

https://github.com/HanYoonSoo/STOMP-Study

 

GitHub - HanYoonSoo/STOMP-Study

Contribute to HanYoonSoo/STOMP-Study development by creating an account on GitHub.

github.com

 

 

 

 

 

참고

https://jh2021.tistory.com/24

 

회사에서 Spring Boot + mongoDB 트랜잭션 도입하기

1. 왜 mongoDB에서 트랜잭션을 도입하는거야?우선 업무에서 왜 mongoDB를 사용하고 있는지 부터 이야기 해야 할 것 같아요.일반적인 회사에서는 관계형 데이터베이스를 사용하고 있는데, 제가 머물

jh2021.tistory.com

https://an-jjin.tistory.com/48

 

개발 기록 - FitTrip에 트랜잭셔널 아웃박스 패턴 적용하기

FitTrip 프로젝트는 MSA 구조에서 EDA를 적용한 프로젝트입니다.즉 서비스가 여러 개로 나눠진 분산 시스템 구조입니다.이러한 분산 시스템 구조에서는 데이터의 일관성을 확보하기가 상대적으로

an-jjin.tistory.com

https://www.youtube.com/watch?v=uk5fRLUsBfk