2024.11.11 - [Spring/WebSocket] - [Spring WebSocket] STOMP에서 Kafka 활용 Flow
이전 포스팅에서 본것처럼 `STOMP`에 메시지 브로커인 `Kafka`를 활용해서 `Scale-Out` 상황에 대비했다.
그런데 이렇게 되면
발행되어야 할 메시지가 발행되지 않거나, 발행되지 않아야 하는 메시지가 발행될 수 있다.
이를 간단하게 설명하자면
발행되어야 할 메시지가 발행되지 않음
`DB`에는 채팅이 잘 저장이 돼서 그걸 `Message Broker`로 잘 전달했는데 그 작업이 실패
발행되지 않아야 하는 메시지가 발행 됨
`DB`에는 채팅이 잘 저장이 안됐는데 `Message Broker`로 전달해버림
지금처럼 간단한 채팅 서비스에서는 뭐 큰 문제가 되지 않을 수 있다.
하지만, 실시간 경매 채팅 서비스라면? 채팅 서비스가 아니더라도 만약 결제와 관련된 서비스라면?
아주 난리가 날 것이다.
그래서 그런 문제를 해결하려면 적어도 한번은 전달 즉, `At Least Once Delivery`를 보장해야 한다.
방법은 여러개가 존재한다. `Transcational OutBox Pattern`, `Kafka Callback`, `RabbitMQ CallBack` 등등
나는 여기서 `Transcational OutBox Pattern`을 활용해 해당 상황에 대비했다.
관련된 자세한 내용은 이전 포스팅에서 확인할 수 있다.
2024.09.19 - [Infra/DevOps] - 분산 시스템에서 데이터를 전달하는 효율적인 방법 - 1
2024.09.20 - [Infra/DevOps] - 분산 시스템에서 데이터를 전달하는 효율적인 방법 - 2
자, 이제는 어떻게 현재 채팅 토이 프로젝트에 적용을 했느냐?
우선, 위 포스팅을 보지 않았을 수도 있으니 아래 사항을 알고 넘어가자.
`Transactional OutBox Pattern`은 `Kafka`와 같은 메시지 브로커에 메시지를 전송할 때
`COMMIT` 이전에 해당 메시지와 관련 내용을 `DB`에 저장해 놓는다.
`COMMIT` 이후에는 `DB`에 저장된 이벤트를 읽어 `Producer`를 활용해 토픽에 전송한다.
그 후, 성공적으로 전송되었다면 `SEND_SUCCESS`, 실패했다면 `SEND_FAIL`과 같이 상태값을 저장한다.
이때, 지속적으로 확인하는 `Polling Daemon` 등을 확인해 발송 실패 상황에 대비한다.
자 이점을 알았으니 코드와 함께 알아보자.
NormalMessageCommandServiceImpl
@Slf4j
@Service
@RequiredArgsConstructor
public class NormalMessageCommandServiceImpl implements NormalMessageCommandService {
private final SequenceGenerator sequenceGenerator;
private final NormalMessageRepository messageRepository;
private final NormalRepository normalRepository;
@Override
@Transactional
public void save(NormalMessageCreateRequest request) {
validateUserInNormal(request.getNormalId(), request.getUserId());
NormalMessage normalMessage = NormalMessage.from(request);
normalMessage.generateSequence(sequenceGenerator.generateSequence(NormalMessage.SEQUENCE_NAME));
NormalChatCreateEvent chatCreateEvent =
NormalChatCreateEvent.from(messageRepository.save(normalMessage), UUIDUtil.generateUUID());
Events.send(chatCreateEvent);
}
@Override
@Transactional
public void modify(NormalMessageModifyRequest request) {
NormalMessage normalMessage = messageRepository.findById(request.getMessageId())
.orElseThrow(() -> new NormalChatException(Code.NOT_FOUND, "Message Not Found"));
normalMessage.modify(request.getContent());
// System.out.println(request.getContent());
NormalChatModifyEvent chatModifyEvent =
NormalChatModifyEvent.from(messageRepository.save(normalMessage), UUIDUtil.generateUUID());
Events.send(chatModifyEvent);
}
@Override
@Transactional
public void delete(NormalMessageDeleteRequest request) {
NormalMessage normalMessage = messageRepository.findById(request.getMessageId())
.orElseThrow(() -> new NormalChatException(Code.NOT_FOUND, "Message Not Found"));
normalMessage.delete();
NormalChatDeleteEvent chatDeleteEvent =
NormalChatDeleteEvent.from(messageRepository.save(normalMessage), UUIDUtil.generateUUID());
Events.send(chatDeleteEvent);
}
private void validateUserInNormal(Long normalId, Long userId){
normalRepository.findByNormalIdAndInUserId(normalId, List.of(userId))
.orElseThrow(() -> new NormalChatException(Code.NOT_FOUND, "User Not In Normal Chat"));
}
}
위 코드를 보면 알 수 있듯이 우선
- 클라이언트가 전달한 채팅 내용을 `normalRepository`와 함께 `DB`에 저장한다.
- 각각의 작업에 대해 `NormalChatCreateEvent`, `NormalChatModifyEvent`, `NormalChatDeleteEvent`를 생성한다.
- `Events`를 활용해 이벤트를 전달한다.
여기서 `Events`는 뭘까?
Events
public class Events {
private static ApplicationEventPublisher publisher;
public static void register(ApplicationEventPublisher publisher){
Events.publisher = publisher;
}
public static void send(Object event){
if(publisher != null){
publisher.publishEvent(event);
}
}
}
`Events` 클래스는 `Spring`이 제공하는 `ApplicationEventPublisher`를 사용해 이벤트를 발행해주는 역할을 한다.
왜 `ApplicationEventPublisher`를 사용했냐면
처리해야 할 도메인 로직과 그 이후에 처리되어야 할 로직을 분리하기 위함이다.
- 도메인 로직이 완료된 이후에 이와 관련된 이벤트를 발행한다.
- 도메인 로직 완료 이후에 처리해야 할 것들을 해당 이벤트를 바라보는 `Listener`에 구현한다.
- 즉, 도메인 로직은 도메인의 요구사항에만 집중할 수 있고, 그 이외의 것들은 해당 이벤트를 바라보는 `Listener`에 추가하기만 하면 된다.
또한, 여기서 핵심이 되는 어노테이션은 `@TransactionalEventListener`이다.
`@TransactionalEventListener`를 사용하면 `BEFORE_COMMIT`과 `AFTER_COMMIT` 과정에서 일어날 작업을
정의할 수 있다.
NormalChatEvent
발행된 `Event`의 정보가 담길 `Collection` 혹은 테이블과 같은 개념이다.
@Getter
@Builder
@NoArgsConstructor(access = AccessLevel.PRIVATE)
@AllArgsConstructor(access = AccessLevel.PRIVATE)
@Document(collection = "normalChatEvent")
public class NormalChatEvent {
@Transient
public static final String SEQUENCE_NAME = "normal_chat_event_sequence";
@Id
private Long eventId;
private EventSentType eventSentType;
private String uuid;
private Long messageId;
private Long normalId;
private Long userId;
private String profileImg;
private String writer;
private String content;
private Boolean isDeleted;
private ChatType chatType;
private ActionType actionType;
private List<UploadFile> files;
private LocalDateTime createdAt;
private LocalDateTime modifiedAt;
public void generateSequence(Long eventId){
this.eventId = eventId;
}
public static NormalChatEvent from(NormalChatCreateEvent message, EventSentType eventSentType) {
return NormalChatEvent.builder()
.uuid(message.getUuid())
.messageId(message.getMessageId())
.normalId(message.getNormalId())
.userId(message.getUserId())
.profileImg(message.getProfileImg())
.writer(message.getWriter())
.content(message.getContent())
.isDeleted(message.getIsDeleted())
.actionType(message.getActionType())
.chatType(message.getChatType())
.files(message.getFiles())
.createdAt(message.getCreatedAt())
.modifiedAt(message.getModifiedAt())
.eventSentType(eventSentType)
.build();
}
public static NormalChatEvent from(NormalChatModifyEvent message, EventSentType eventSentType) {
return NormalChatEvent.builder()
.uuid(message.getUuid())
.messageId(message.getMessageId())
.normalId(message.getNormalId())
.content(message.getContent())
.chatType(message.getChatType())
.actionType(message.getActionType())
.createdAt(message.getCreatedAt())
.modifiedAt(message.getModifiedAt())
.eventSentType(eventSentType)
.build();
}
public static NormalChatEvent from(NormalChatDeleteEvent message, EventSentType eventSentType) {
return NormalChatEvent.builder()
.uuid(message.getUuid())
.messageId(message.getMessageId())
.chatType(message.getChatType())
.actionType(message.getActionType())
.eventSentType(eventSentType)
.build();
}
public void changeEventSentType(EventSentType eventSentType) {
this.eventSentType = eventSentType;
}
}
NormalChatEventHandler
@Slf4j
@Service
@RequiredArgsConstructor
public class NormalChatEventHandler {
private final SequenceGenerator sequenceGenerator;
private final ChatEventProducer chatEventProducer;
private final NormalChatEventRepository normalChatEventRepository;
@TransactionalEventListener(classes = NormalChatCreateEvent.class, phase = TransactionPhase.BEFORE_COMMIT)
public void normalChatCreateEventBeforeHandler(NormalChatCreateEvent chatCreateEvent){
NormalChatEvent normalChatEvent = createNormalChatEvent(chatCreateEvent);
normalChatEvent.generateSequence(sequenceGenerator.generateSequence(NormalChatEvent.SEQUENCE_NAME));
normalChatEventRepository.save(normalChatEvent);
}
@Async
@Retryable(retryFor = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 2000L))
@TransactionalEventListener(classes = NormalChatCreateEvent.class, phase = TransactionPhase.AFTER_COMMIT)
public void normalChatCreateEventAfterHandler(NormalChatCreateEvent chatCreateEvent){
NormalChatEvent normalChatEvent = createNormalChatEvent(chatCreateEvent);
publishNormalChatEvent(normalChatEvent);
}
@TransactionalEventListener(classes = NormalChatModifyEvent.class, phase = TransactionPhase.BEFORE_COMMIT)
public void normalChatModifyEventBeforeHandler(NormalChatModifyEvent chatModifyEvent){
NormalChatEvent normalChatEvent = createNormalChatEvent(chatModifyEvent);
normalChatEvent.generateSequence(sequenceGenerator.generateSequence(NormalChatEvent.SEQUENCE_NAME));
normalChatEventRepository.save(normalChatEvent);
}
@Async
@Retryable(retryFor = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 2000L))
@TransactionalEventListener(classes = NormalChatModifyEvent.class, phase = TransactionPhase.AFTER_COMMIT)
public void normalChatModifyEventAfterHandler(NormalChatModifyEvent chatModifyEvent){
NormalChatEvent normalChatEvent = createNormalChatEvent(chatModifyEvent);
publishNormalChatEvent(normalChatEvent);
}
@TransactionalEventListener(classes = NormalChatDeleteEvent.class, phase = TransactionPhase.BEFORE_COMMIT)
public void normalChatDeleteEventBeforeHandler(NormalChatDeleteEvent chatDeleteEvent){
NormalChatEvent normalChatEvent = createNormalChatEvent(chatDeleteEvent);
normalChatEvent.generateSequence(sequenceGenerator.generateSequence(NormalChatEvent.SEQUENCE_NAME));
normalChatEventRepository.save(normalChatEvent);
}
@Async
@Retryable(retryFor = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 2000L))
@TransactionalEventListener(classes = NormalChatDeleteEvent.class, phase = TransactionPhase.AFTER_COMMIT)
public void normalChatDeleteEventAfterHandler(NormalChatDeleteEvent chatDeleteEvent){
NormalChatEvent normalChatEvent = createNormalChatEvent(chatDeleteEvent);
publishNormalChatEvent(normalChatEvent);
}
private void publishNormalChatEvent(NormalChatEvent normalChatEvent) {
NormalChatEvent chatEvent = normalChatEventRepository.findByUuid(normalChatEvent.getUuid())
.orElseThrow(() -> new RuntimeException("Normal Chat Event Not Found"));
try{
chatEventProducer.sendToNormalChatTopic(chatEvent);
chatEvent.changeEventSentType(EventSentType.SEND_SUCCESS);
normalChatEventRepository.save(chatEvent);
} catch (Exception e){
chatEvent.changeEventSentType(EventSentType.SEND_FAIL);
normalChatEventRepository.save(chatEvent);
}
}
private NormalChatEvent createNormalChatEvent(NormalChatCreateEvent chatCreateEvent) {
return NormalChatEvent.from(chatCreateEvent, EventSentType.INIT);
}
private NormalChatEvent createNormalChatEvent(NormalChatModifyEvent chatModifyEvent) {
return NormalChatEvent.from(chatModifyEvent, EventSentType.INIT);
}
private NormalChatEvent createNormalChatEvent(NormalChatDeleteEvent chatDeleteEvent) {
return NormalChatEvent.from(chatDeleteEvent, EventSentType.INIT);
}
}
`normalChatCreateEventBeforeHandler` 메소드를 예시로 `BEFORE_COMMIT` 이벤트 발생 시
`Transactional OutBox Pattern`에 맞게 이벤트를 `DB`에 저장해놓는다.
그 후 `AFTER_COMMIT` 이벤트가 발생하면 `normalChatCreateEventAfterHandler` 메소드가 해당 이벤트를 받아
`publishNormalChatEvent` 메소드를 실행한다.
(이때 코드를 보면 알 수 있듯이 `@Async`(비동기)로 최대 3회 `@Retryable`(재전송) 한다.)
`publishNormalChatEvent` 메소드는 `Producer`를 활용해 `Kafka`의 Topic으로 이벤트를 전달하고
이벤트의 상태를 `SEND_SUCCESS`로 변경한다.
만약, 실패했다면 `SEND_FAIL`로 변경 후 저장한다.
그 후, 3번 전부 다 실패했다면 결국 `SEND_FAIL`로 저장될 것이므로,
`Polling Daemon`을 활용해 `SEND_FAIL` 채팅에 관한 재전송을 진행하면 된다.
여기서 `UUID`를 추가한 이유는 `BEFORE_COMMIT` 단계에서 `NormalChatEvent`를 생성하고 저장하지만,
이 시점에서는 트랜잭션이 아직 커밋되지 않았기 때문에, 바로 `AFTER_COMMIT` 이벤트가 발생하게 되면
`AFTER_COMMIT` 단계에서 `publishNormalChatEvent`를 호출할 때
`normalChatEvent.getEventId()`가 `NULL`인 경우가 생길 수 있다.
따라서, 저장 시`UUID`를 생성하고 발행할 이벤트 객체에 저장하도록 하였다.
NormalChatModifyEvent chatModifyEvent =
NormalChatModifyEvent.from(messageRepository.save(normalMessage), UUIDUtil.generateUUID());
이제 이렇게 복잡한 방식으로 `Producer`로 발행된 메시지는 이전에도 계속 살펴보았던 `Consumer`로 전달이 된다.
NormalChatConsumer
@Slf4j
@Component
@RequiredArgsConstructor
public class NormalChatConsumer {
private final SimpMessageSendingOperations messagingTemplate;
@KafkaListener(topics = "${spring.kafka.topic.normal-chat}", groupId = "${spring.kafka.consumer.group-id.normal-chat}", containerFactory = "normalChatListenerContainerFactory")
public void normalChatListener(NormalChatEvent chatEvent) {
Long normalId = chatEvent.getNormalId();
switch (chatEvent.getActionType()){
case SEND -> {
NormalMessageCreateResponse response = NormalMessageCreateResponse.from(chatEvent);
messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
}
case MODIFY -> {
NormalMessageModifyResponse response = NormalMessageModifyResponse.from(chatEvent);
messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
}
case DELETE -> {
NormalMessageDeleteResponse response = NormalMessageDeleteResponse.from(chatEvent);
messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
}
}
}
}
실제 DB 저장
자.. 근데 `MongoDB`를 사용하시는 분은 이것만 보고 따라하시면 안된다..
`Transactional OutBox Pattern`의 핵심은 `DB`의 트랜잭션 기능에 의거한다.
`BEFORE_COMMIT`, `AFTER_COMMIT`을 활용하여 적절하게 로직을 처리한다.
만약, `MySQL`를 사용한다면 `SpringBoot`에서 그냥 `@Transactional`를 붙이면 트랜잭션을 쉽게 사용할 수 있다.
하지만, `MongoDB`는 `@Transactional`을 사용하기 위해서는 `replicaSet` 환경을 구축해야 한다고 한다..
구축하지 않으면 아래 예외가 발생한다.
com.mongodb.MongoCommandException: Command failed with error 263
(ShardingOperationFailed): 'Transaction numbers are only allowed on a replica set
memebe or mongos' on server localhost:27017.
The full response is { "ok" : 0.0, "errmsg" : "Transaction numbers are only
allowed on a replica set member or mongos", "code" : 263,
"codeName" : "ShardingOperationFailed" }
`MongoDB`의 트랜잭션은 데이터의 일관성과 무결성을 유지하면서 원자적으로 실행해야 하는 여러 작업을
트랜잭션에 포함하는 경우가 많아서 레플리카 세트 내에서 작동하도록 설계되었다고 한다.
자세한 내용은 아래 블로그에 잘 포스팅 되어 있으니 참고하면 좋을 것 같다.
MongoDB에서 @Transcational 적용하는 방법
위의 블로그를 활용하면서 겪은 정보인데 나는 당연하게도 `Priority`가 낮으면 우선순위가 높을 것이라 생각했는데
`Priority`가 높아야 우선순위가 높게 설정된다.
만약에 우선순위 설정을 잘못하게 되면 계속 원하는 `mongoDB`가 `Primary`가 아닌 `Secondary`로 설정될 것이다.
만약 `mongo1`을 `primary`로 설정하고 싶다면 `priority`를 `mongo2`보단 높게 설정해주도록 하자.
정리
기존에 `Transactional OutBox Pattern`의 개념만 알고 있었는데 이번에 직접 사용해보는 경험을 얻게 되었다.
위 패턴이 데이터의 원자성을 보장해 메시지 발행의 신뢰성을 높일 수 있다는 점을 확인했다.
이로써 데이터의 일관성을 확보하고 시스템의 안정성을 강화할 수 있었다.
하지만, 위 방법이 만능이라고 할 순 없다.
미미하긴 하지만 `Producer`의 이벤트 발행 시 DB 조회가 계속해서 일어나기도 하고
만약 의도치 않게 서비스가 종료되었을 경우 `INIT`, `SEND_SUCCESS`, `SEND_FAIL` 상태에 문제가 생길 수 있다.
그래도 아무런 조치가 없는 것보다 훨씬 나은 방법을 경험해봤기에 아주 좋은 경험을 했다고 생각한다.
코드를 확인하고 싶다면?
https://github.com/HanYoonSoo/STOMP-Study
참고
https://an-jjin.tistory.com/48
https://www.youtube.com/watch?v=uk5fRLUsBfk
'Spring > WebSocket' 카테고리의 다른 글
[Spring WebSocket] STOMP에서 Kafka 활용 Flow (0) | 2024.11.11 |
---|---|
[Spring WebSocket] STOMP에서의 예외처리 (0) | 2024.11.11 |
[Spring WebSocket] Spring Security + STOMP (1) | 2024.11.11 |
[Spring WebSocket] MongoDB Collection 설계 With Auto-Incremented Sequence (9) | 2024.11.10 |
[Spring WebSocket] 채팅 서비스 프로젝트에 Kafka 적용 (2) | 2024.11.10 |