
2024.11.11 - [Spring/WebSocket] - [Spring WebSocket] STOMP에서 Kafka 활용 Flow
[Spring WebSocket] STOMP에서 Kafka 활용 Flow
2024.11.10 - [Spring/WebSocket] - [Spring WebSocket] 채팅 서비스 프로젝트에 Kafka 적용 [Spring WebSocket] 채팅 서비스 프로젝트에 Kafka 적용채팅 서비스는 일반적인 `HTTP` 통신을 사용하는 `stateless` 서비스와는
hdbstn3055.tistory.com
이전 포스팅에서 본것처럼 STOMP
에 메시지 브로커인 Kafka
를 활용해서 Scale-Out
상황에 대비했다.
그런데 이렇게 되면
발행되어야 할 메시지가 발행되지 않거나, 발행되지 않아야 하는 메시지가 발행될 수 있다.
이를 간단하게 설명하자면
발행되어야 할 메시지가 발행되지 않음
DB
에는 채팅이 잘 저장이 돼서 그걸 Message Broker
로 잘 전달했는데 그 작업이 실패
발행되지 않아야 하는 메시지가 발행 됨
DB
에는 채팅이 잘 저장이 안됐는데 Message Broker
로 전달해버림
지금처럼 간단한 채팅 서비스에서는 뭐 큰 문제가 되지 않을 수 있다.
하지만, 실시간 경매 채팅 서비스라면? 채팅 서비스가 아니더라도 만약 결제와 관련된 서비스라면?
아주 난리가 날 것이다.
그래서 그런 문제를 해결하려면 적어도 한번은 전달 즉, At Least Once Delivery
를 보장해야 한다.
방법은 여러개가 존재한다. Transcational OutBox Pattern
, Kafka Callback
, RabbitMQ CallBack
등등
나는 여기서 Transcational OutBox Pattern
을 활용해 해당 상황에 대비했다.
관련된 자세한 내용은 이전 포스팅에서 확인할 수 있다.
2024.09.19 - [Infra/DevOps] - 분산 시스템에서 데이터를 전달하는 효율적인 방법 - 1
분산 시스템에서 데이터를 전달하는 효율적인 방법 - 1
이번 포스팅에서는 NHN 유튜브의 분산 시스템에서 데이터를 전달하는 효율적인 방법 강의를 보고 정리한 내용이다. 포스팅에서 다룰 내용데이터 전달 보장 방법론RDB를 사용하는 애플리케이션
hdbstn3055.tistory.com
2024.09.20 - [Infra/DevOps] - 분산 시스템에서 데이터를 전달하는 효율적인 방법 - 2
분산 시스템에서 데이터를 전달하는 효율적인 방법 - 2
2024.09.19 - [Infra/DevOps] - 분산 시스템에서 데이터를 전달하는 효율적인 방법 - 1 분산 시스템에서 데이터를 전달하는 효율적인 방법 - 1이번 포스팅에서는 NHN 유튜브의 분산 시스템에서 데이터를
hdbstn3055.tistory.com
자, 이제는 어떻게 현재 채팅 토이 프로젝트에 적용을 했느냐?
우선, 위 포스팅을 보지 않았을 수도 있으니 아래 사항을 알고 넘어가자.
Transactional OutBox Pattern
은 Kafka
와 같은 메시지 브로커에 메시지를 전송할 때
COMMIT
이전에 해당 메시지와 관련 내용을 DB
에 저장해 놓는다.
COMMIT
이후에는 DB
에 저장된 이벤트를 읽어 Producer
를 활용해 토픽에 전송한다.
그 후, 성공적으로 전송되었다면 SEND_SUCCESS
, 실패했다면 SEND_FAIL
과 같이 상태값을 저장한다.
이때, 지속적으로 확인하는 Polling Daemon
등을 확인해 발송 실패 상황에 대비한다.
자 이점을 알았으니 코드와 함께 알아보자.
NormalMessageCommandServiceImpl
@Slf4j
@Service
@RequiredArgsConstructor
public class NormalMessageCommandServiceImpl implements NormalMessageCommandService {
private final SequenceGenerator sequenceGenerator;
private final NormalMessageRepository messageRepository;
private final NormalRepository normalRepository;
@Override
@Transactional
public void save(NormalMessageCreateRequest request) {
validateUserInNormal(request.getNormalId(), request.getUserId());
NormalMessage normalMessage = NormalMessage.from(request);
normalMessage.generateSequence(sequenceGenerator.generateSequence(NormalMessage.SEQUENCE_NAME));
NormalChatCreateEvent chatCreateEvent =
NormalChatCreateEvent.from(messageRepository.save(normalMessage), UUIDUtil.generateUUID());
Events.send(chatCreateEvent);
}
@Override
@Transactional
public void modify(NormalMessageModifyRequest request) {
NormalMessage normalMessage = messageRepository.findById(request.getMessageId())
.orElseThrow(() -> new NormalChatException(Code.NOT_FOUND, "Message Not Found"));
normalMessage.modify(request.getContent());
// System.out.println(request.getContent());
NormalChatModifyEvent chatModifyEvent =
NormalChatModifyEvent.from(messageRepository.save(normalMessage), UUIDUtil.generateUUID());
Events.send(chatModifyEvent);
}
@Override
@Transactional
public void delete(NormalMessageDeleteRequest request) {
NormalMessage normalMessage = messageRepository.findById(request.getMessageId())
.orElseThrow(() -> new NormalChatException(Code.NOT_FOUND, "Message Not Found"));
normalMessage.delete();
NormalChatDeleteEvent chatDeleteEvent =
NormalChatDeleteEvent.from(messageRepository.save(normalMessage), UUIDUtil.generateUUID());
Events.send(chatDeleteEvent);
}
private void validateUserInNormal(Long normalId, Long userId){
normalRepository.findByNormalIdAndInUserId(normalId, List.of(userId))
.orElseThrow(() -> new NormalChatException(Code.NOT_FOUND, "User Not In Normal Chat"));
}
}
위 코드를 보면 알 수 있듯이 우선
- 클라이언트가 전달한 채팅 내용을
normalRepository
와 함께DB
에 저장한다. - 각각의 작업에 대해
NormalChatCreateEvent
,NormalChatModifyEvent
,NormalChatDeleteEvent
를 생성한다. Events
를 활용해 이벤트를 전달한다.
여기서 Events
는 뭘까?
Events
public class Events {
private static ApplicationEventPublisher publisher;
public static void register(ApplicationEventPublisher publisher){
Events.publisher = publisher;
}
public static void send(Object event){
if(publisher != null){
publisher.publishEvent(event);
}
}
}
Events
클래스는 Spring
이 제공하는 ApplicationEventPublisher
를 사용해 이벤트를 발행해주는 역할을 한다.
왜 ApplicationEventPublisher
를 사용했냐면
처리해야 할 도메인 로직과 그 이후에 처리되어야 할 로직을 분리하기 위함이다.
- 도메인 로직이 완료된 이후에 이와 관련된 이벤트를 발행한다.
- 도메인 로직 완료 이후에 처리해야 할 것들을 해당 이벤트를 바라보는
Listener
에 구현한다. - 즉, 도메인 로직은 도메인의 요구사항에만 집중할 수 있고, 그 이외의 것들은 해당 이벤트를 바라보는
Listener
에 추가하기만 하면 된다.
또한, 여기서 핵심이 되는 어노테이션은 @TransactionalEventListener
이다.
@TransactionalEventListener
를 사용하면 BEFORE_COMMIT
과 AFTER_COMMIT
과정에서 일어날 작업을
정의할 수 있다.
NormalChatEvent
발행된 Event
의 정보가 담길 Collection
혹은 테이블과 같은 개념이다.
@Getter
@Builder
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@Document(collection = "normalChatEvent")
public class NormalChatEvent {
@Transient
public static final String SEQUENCE_NAME = "normal_chat_event_sequence";
@Id
private Long eventId;
private EventSentType eventSentType;
private String uuid;
private Long messageId;
private Long normalId;
private Long userId;
private String profileImg;
private String writer;
private String content;
private Boolean isDeleted;
private ChatType chatType;
private ActionType actionType;
private List<UploadFile> files;
private LocalDateTime createdAt;
private LocalDateTime modifiedAt;
public void generateSequence(Long eventId){
this.eventId = eventId;
}
public static NormalChatEvent from(NormalChatCreateEvent message, EventSentType eventSentType) {
return NormalChatEvent.builder()
.uuid(message.getUuid())
.messageId(message.getMessageId())
.normalId(message.getNormalId())
.userId(message.getUserId())
.profileImg(message.getProfileImg())
.writer(message.getWriter())
.content(message.getContent())
.isDeleted(message.getIsDeleted())
.actionType(message.getActionType())
.chatType(message.getChatType())
.files(message.getFiles())
.createdAt(message.getCreatedAt())
.modifiedAt(message.getModifiedAt())
.eventSentType(eventSentType)
.build();
}
public static NormalChatEvent from(NormalChatModifyEvent message, EventSentType eventSentType) {
return NormalChatEvent.builder()
.uuid(message.getUuid())
.messageId(message.getMessageId())
.normalId(message.getNormalId())
.content(message.getContent())
.chatType(message.getChatType())
.actionType(message.getActionType())
.createdAt(message.getCreatedAt())
.modifiedAt(message.getModifiedAt())
.eventSentType(eventSentType)
.build();
}
public static NormalChatEvent from(NormalChatDeleteEvent message, EventSentType eventSentType) {
return NormalChatEvent.builder()
.uuid(message.getUuid())
.messageId(message.getMessageId())
.chatType(message.getChatType())
.actionType(message.getActionType())
.eventSentType(eventSentType)
.build();
}
public void changeEventSentType(EventSentType eventSentType) {
this.eventSentType = eventSentType;
}
}
NormalChatEventHandler
@Slf4j
@Service
@RequiredArgsConstructor
public class NormalChatEventHandler {
private final SequenceGenerator sequenceGenerator;
private final ChatEventProducer chatEventProducer;
private final NormalChatEventRepository normalChatEventRepository;
@TransactionalEventListener(classes = NormalChatCreateEvent.class, phase = TransactionPhase.BEFORE_COMMIT)
public void normalChatCreateEventBeforeHandler(NormalChatCreateEvent chatCreateEvent){
NormalChatEvent normalChatEvent = createNormalChatEvent(chatCreateEvent);
normalChatEvent.generateSequence(sequenceGenerator.generateSequence(NormalChatEvent.SEQUENCE_NAME));
normalChatEventRepository.save(normalChatEvent);
}
@Async
@Retryable(retryFor = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 2000L))
@TransactionalEventListener(classes = NormalChatCreateEvent.class, phase = TransactionPhase.AFTER_COMMIT)
public void normalChatCreateEventAfterHandler(NormalChatCreateEvent chatCreateEvent){
NormalChatEvent normalChatEvent = createNormalChatEvent(chatCreateEvent);
publishNormalChatEvent(normalChatEvent);
}
@TransactionalEventListener(classes = NormalChatModifyEvent.class, phase = TransactionPhase.BEFORE_COMMIT)
public void normalChatModifyEventBeforeHandler(NormalChatModifyEvent chatModifyEvent){
NormalChatEvent normalChatEvent = createNormalChatEvent(chatModifyEvent);
normalChatEvent.generateSequence(sequenceGenerator.generateSequence(NormalChatEvent.SEQUENCE_NAME));
normalChatEventRepository.save(normalChatEvent);
}
@Async
@Retryable(retryFor = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 2000L))
@TransactionalEventListener(classes = NormalChatModifyEvent.class, phase = TransactionPhase.AFTER_COMMIT)
public void normalChatModifyEventAfterHandler(NormalChatModifyEvent chatModifyEvent){
NormalChatEvent normalChatEvent = createNormalChatEvent(chatModifyEvent);
publishNormalChatEvent(normalChatEvent);
}
@TransactionalEventListener(classes = NormalChatDeleteEvent.class, phase = TransactionPhase.BEFORE_COMMIT)
public void normalChatDeleteEventBeforeHandler(NormalChatDeleteEvent chatDeleteEvent){
NormalChatEvent normalChatEvent = createNormalChatEvent(chatDeleteEvent);
normalChatEvent.generateSequence(sequenceGenerator.generateSequence(NormalChatEvent.SEQUENCE_NAME));
normalChatEventRepository.save(normalChatEvent);
}
@Async
@Retryable(retryFor = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 2000L))
@TransactionalEventListener(classes = NormalChatDeleteEvent.class, phase = TransactionPhase.AFTER_COMMIT)
public void normalChatDeleteEventAfterHandler(NormalChatDeleteEvent chatDeleteEvent){
NormalChatEvent normalChatEvent = createNormalChatEvent(chatDeleteEvent);
publishNormalChatEvent(normalChatEvent);
}
private void publishNormalChatEvent(NormalChatEvent normalChatEvent) {
NormalChatEvent chatEvent = normalChatEventRepository.findByUuid(normalChatEvent.getUuid())
.orElseThrow(() -> new RuntimeException("Normal Chat Event Not Found"));
try{
chatEventProducer.sendToNormalChatTopic(chatEvent);
chatEvent.changeEventSentType(EventSentType.SEND_SUCCESS);
normalChatEventRepository.save(chatEvent);
} catch (Exception e){
chatEvent.changeEventSentType(EventSentType.SEND_FAIL);
normalChatEventRepository.save(chatEvent);
}
}
private NormalChatEvent createNormalChatEvent(NormalChatCreateEvent chatCreateEvent) {
return NormalChatEvent.from(chatCreateEvent, EventSentType.INIT);
}
private NormalChatEvent createNormalChatEvent(NormalChatModifyEvent chatModifyEvent) {
return NormalChatEvent.from(chatModifyEvent, EventSentType.INIT);
}
private NormalChatEvent createNormalChatEvent(NormalChatDeleteEvent chatDeleteEvent) {
return NormalChatEvent.from(chatDeleteEvent, EventSentType.INIT);
}
}
normalChatCreateEventBeforeHandler
메소드를 예시로 BEFORE_COMMIT
이벤트 발생 시
Transactional OutBox Pattern
에 맞게 이벤트를 DB
에 저장해놓는다.
그 후 AFTER_COMMIT
이벤트가 발생하면 normalChatCreateEventAfterHandler
메소드가 해당 이벤트를 받아
publishNormalChatEvent
메소드를 실행한다.
(이때 코드를 보면 알 수 있듯이 @Async
(비동기)로 최대 3회 @Retryable
(재전송) 한다.)
publishNormalChatEvent
메소드는 Producer
를 활용해 Kafka
의 Topic으로 이벤트를 전달하고
이벤트의 상태를 SEND_SUCCESS
로 변경한다.
만약, 실패했다면 SEND_FAIL
로 변경 후 저장한다.
그 후, 3번 전부 다 실패했다면 결국 SEND_FAIL
로 저장될 것이므로,
Polling Daemon
을 활용해 SEND_FAIL
채팅에 관한 재전송을 진행하면 된다.
여기서 UUID
를 추가한 이유는 BEFORE_COMMIT
단계에서 NormalChatEvent
를 생성하고 저장하지만,
이 시점에서는 트랜잭션이 아직 커밋되지 않았기 때문에, 바로 AFTER_COMMIT
이벤트가 발생하게 되면
AFTER_COMMIT
단계에서 publishNormalChatEvent
를 호출할 때
normalChatEvent.getEventId()
가 NULL
인 경우가 생길 수 있다.
따라서, 저장 시UUID
를 생성하고 발행할 이벤트 객체에 저장하도록 하였다.
NormalChatModifyEvent chatModifyEvent =
NormalChatModifyEvent.from(messageRepository.save(normalMessage), UUIDUtil.generateUUID());
이제 이렇게 복잡한 방식으로 Producer
로 발행된 메시지는 이전에도 계속 살펴보았던 Consumer
로 전달이 된다.
NormalChatConsumer
@Slf4j
@Component
@RequiredArgsConstructor
public class NormalChatConsumer {
private final SimpMessageSendingOperations messagingTemplate;
@KafkaListener(topics = "${spring.kafka.topic.normal-chat}", groupId = "${spring.kafka.consumer.group-id.normal-chat}", containerFactory = "normalChatListenerContainerFactory")
public void normalChatListener(NormalChatEvent chatEvent) {
Long normalId = chatEvent.getNormalId();
switch (chatEvent.getActionType()){
case SEND -> {
NormalMessageCreateResponse response = NormalMessageCreateResponse.from(chatEvent);
messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
}
case MODIFY -> {
NormalMessageModifyResponse response = NormalMessageModifyResponse.from(chatEvent);
messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
}
case DELETE -> {
NormalMessageDeleteResponse response = NormalMessageDeleteResponse.from(chatEvent);
messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
}
}
}
}
실제 DB 저장

자.. 근데 MongoDB
를 사용하시는 분은 이것만 보고 따라하시면 안된다..
Transactional OutBox Pattern
의 핵심은 DB
의 트랜잭션 기능에 의거한다.
BEFORE_COMMIT
, AFTER_COMMIT
을 활용하여 적절하게 로직을 처리한다.
만약, MySQL
를 사용한다면 SpringBoot
에서 그냥 @Transactional
를 붙이면 트랜잭션을 쉽게 사용할 수 있다.
하지만, MongoDB
는 @Transactional
을 사용하기 위해서는 replicaSet
환경을 구축해야 한다고 한다..
구축하지 않으면 아래 예외가 발생한다.
com.mongodb.MongoCommandException: Command failed with error 263
(ShardingOperationFailed): 'Transaction numbers are only allowed on a replica set
memebe or mongos' on server localhost:27017.
The full response is { "ok" : 0.0, "errmsg" : "Transaction numbers are only
allowed on a replica set member or mongos", "code" : 263,
"codeName" : "ShardingOperationFailed" }
MongoDB
의 트랜잭션은 데이터의 일관성과 무결성을 유지하면서 원자적으로 실행해야 하는 여러 작업을
트랜잭션에 포함하는 경우가 많아서 레플리카 세트 내에서 작동하도록 설계되었다고 한다.
자세한 내용은 아래 블로그에 잘 포스팅 되어 있으니 참고하면 좋을 것 같다.
MongoDB에서 @Transcational 적용하는 방법
회사에서 Spring Boot + mongoDB 트랜잭션 도입하기
1. 왜 mongoDB에서 트랜잭션을 도입하는거야?우선 업무에서 왜 mongoDB를 사용하고 있는지 부터 이야기 해야 할 것 같아요.일반적인 회사에서는 관계형 데이터베이스를 사용하고 있는데, 제가 머물
jh2021.tistory.com
위의 블로그를 활용하면서 겪은 정보인데 나는 당연하게도 Priority
가 낮으면 우선순위가 높을 것이라 생각했는데
Priority
가 높아야 우선순위가 높게 설정된다.
만약에 우선순위 설정을 잘못하게 되면 계속 원하는 mongoDB
가 Primary
가 아닌 Secondary
로 설정될 것이다.

만약 mongo1
을 primary
로 설정하고 싶다면 priority
를 mongo2
보단 높게 설정해주도록 하자.
정리
기존에 Transactional OutBox Pattern
의 개념만 알고 있었는데 이번에 직접 사용해보는 경험을 얻게 되었다.
위 패턴이 데이터의 원자성을 보장해 메시지 발행의 신뢰성을 높일 수 있다는 점을 확인했다.
이로써 데이터의 일관성을 확보하고 시스템의 안정성을 강화할 수 있었다.
하지만, 위 방법이 만능이라고 할 순 없다.
미미하긴 하지만 Producer
의 이벤트 발행 시 DB 조회가 계속해서 일어나기도 하고
만약 의도치 않게 서비스가 종료되었을 경우 INIT
, SEND_SUCCESS
, SEND_FAIL
상태에 문제가 생길 수 있다.
그래도 아무런 조치가 없는 것보다 훨씬 나은 방법을 경험해봤기에 아주 좋은 경험을 했다고 생각한다.
코드를 확인하고 싶다면?
https://github.com/HanYoonSoo/STOMP-Study
GitHub - HanYoonSoo/STOMP-Study
Contribute to HanYoonSoo/STOMP-Study development by creating an account on GitHub.
github.com
참고
회사에서 Spring Boot + mongoDB 트랜잭션 도입하기
1. 왜 mongoDB에서 트랜잭션을 도입하는거야?우선 업무에서 왜 mongoDB를 사용하고 있는지 부터 이야기 해야 할 것 같아요.일반적인 회사에서는 관계형 데이터베이스를 사용하고 있는데, 제가 머물
jh2021.tistory.com
https://an-jjin.tistory.com/48
개발 기록 - FitTrip에 트랜잭셔널 아웃박스 패턴 적용하기
FitTrip 프로젝트는 MSA 구조에서 EDA를 적용한 프로젝트입니다.즉 서비스가 여러 개로 나눠진 분산 시스템 구조입니다.이러한 분산 시스템 구조에서는 데이터의 일관성을 확보하기가 상대적으로
an-jjin.tistory.com
https://www.youtube.com/watch?v=uk5fRLUsBfk
'Spring > WebSocket' 카테고리의 다른 글
[Spring WebSocket] STOMP에서 Kafka 활용 Flow (0) | 2024.11.11 |
---|---|
[Spring WebSocket] STOMP에서의 예외처리 (0) | 2024.11.11 |
[Spring WebSocket] Spring Security + STOMP (0) | 2024.11.11 |
[Spring WebSocket] MongoDB Collection 설계 With Auto-Incremented Sequence (0) | 2024.11.10 |
[Spring WebSocket] 채팅 서비스 프로젝트에 Kafka 적용 (1) | 2024.11.10 |