본문 바로가기
Spring/WebSocket

[Spring WebSocket] STOMP에서 Kafka 활용 Flow

by 진꿈청 2024. 11. 11.

https:/ /asfirstalways.tistory.com/359

 

 

2024.11.10 - [Spring/WebSocket] - [Spring WebSocket] 채팅 서비스 프로젝트에 Kafka 적용

 

[Spring WebSocket] 채팅 서비스 프로젝트에 Kafka 적용

채팅 서비스는 일반적인 `HTTP` 통신을 사용하는 `stateless` 서비스와는 달리,`WebSocket`을 사용해 클라이언트와 상시 연결되어 있는 `stateful`한 특성을 가진다. 만약, 단일 인스턴스라면 해당 부분은

hdbstn3055.tistory.com

 

우리는 위 포스트를 통해 `WebSocket`을 사용하는 서비스에서 `Kafka` 즉,

`Message Broker`를 사용하는 이유에 관해 알아보았다.

 

그렇다면 실제로 어떤식으로 코드를 작성해야 할까?

 

코드를 설명하기에 앞서 간단하게 요약하자면

(`Transactional OutBox Pattern`이 적용되어 있기에 참고해서 보면 좋을 것 같다.) 

 

  1. 클라이언트가 채팅 서비스에 채팅 전송, 수정, 삭제
  2. "채팅 서비스에서 바로 클라이언트로 브로드캐스트 하는 것이 아닌" `Producer`로 `Kafka` 토픽으로 전송
  3. 채팅 서비스의 `Consumer`가 해당 메시지를 전달 받음
  4. `SEND`, `UPDATE`, `DELETE` 작업에 맞춰 데이터를 받은 뒤 이제 클라이언트에 브로드캐스트

여기서 포인트는 채팅 서비스에서 바로 클라이언트로 브로드캐스트하는 것이 아니라

중간에 메시지 브로커를 통해서 클라이언트에 브로드캐스트 한다는 것이다.

 

 

자 이제 Kafka의 설정부터 코드와 함께 알아보자.

 

관련 코드

 

build.gradle

 

우선 Spring은 Kafka를 활용하기 쉽게 도와주는 `Dependency`를 제공한다.

/* Kafka */
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'

 

 

우선, 나는 `Kafka`를 활용하기 위해 `Docker`를 사용했다.

 

docker-compose.yml

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    ports:
      - "9092:9092"
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: INSIDE://localhost:9092,OUTSIDE://localhost:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
    depends_on:
      - zookeeper
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

 

 

application.yml

spring:
  ...

  kafka:
    bootstrap-servers: localhost:9093
    topic:
      normal-chat: "normalChat"
      emoji-chat: "emojiChat"
    consumer:
      group-id:
        normal-chat: "normalChatGroup"
        emoji-chat: "emojiChatGroup"

 

 

KafkaTopicConfig

@Configuration
public class KafkaTopicConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value("${spring.kafka.topic.normal-chat}")
    private String normalChatTopic;

    @Value("${spring.kafka.topic.emoji-chat}")
    private String emojiChatTopic;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configurations = new HashMap<>();
        configurations.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configurations);
    }

    @Bean
    public NewTopic normalChatTopic(){
        return TopicBuilder.name(normalChatTopic)
                .partitions(1)
                .replicas(1)
                .build();
    }

    @Bean
    public NewTopic emojiChatTopic(){
        return TopicBuilder.name(normalChatTopic)
                .partitions(1)
                .replicas(1)
                .build();
    }
}

 

 

ChatProducerConfig

 

`Producer` 역할을 수행할 `Config`이다.

`@EnableKafka`를 꼭 붙여주도록 하자. `Kafka` 환경설정과 관련해서 필요한 역할을 한다.

@EnableKafka
@Configuration
public class ChatProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;;

    @Bean
    public Map<String, Object> chatEventProducerConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return config;
    }

    @Bean
    public ProducerFactory<String, NormalChatEvent> normalChatEventProducerFactory(){
        return new DefaultKafkaProducerFactory<>(chatEventProducerConfig());
    }

    @Bean
    public KafkaTemplate<String, NormalChatEvent> normalChatKafkaTemplate(){
        return new KafkaTemplate<>(normalChatEventProducerFactory());
    }

}

 

 

NormalChatConsumerConfig

 

`Consumer` 역할을 수행할 `Config`이다.

`@EnableKafka`를 꼭 붙여주도록 하자. `Kafka` 환경설정과 관련해서 필요한 역할을 한다.

@EnableKafka
@Configuration
public class NormalChatConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value("${spring.kafka.consumer.group-id.normal-chat}")
    private String groupId;

    @Bean
    public Map<String, Object> normalChatConsumerConfiguration() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return config;
    }

    @Bean
    public ConsumerFactory<String, NormalChatEvent> normalChatEventConsumerFactory(){
        return new DefaultKafkaConsumerFactory<>(
                normalChatConsumerConfiguration(),
                new StringDeserializer(),
                new JsonDeserializer<>(NormalChatEvent.class)
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, NormalChatEvent> normalChatListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, NormalChatEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(normalChatEventConsumerFactory());
        return factory;
    }
}

 

 

ChatEventProducer

 

`Producer`로 메시지 브로커로 메시지를 전달하는 역할을 한다.

@Slf4j
@Component
@RequiredArgsConstructor
public class ChatEventProducer {

    @Value("${spring.kafka.topic.normal-chat}")
    private String normalChatTopic;

    private final KafkaTemplate<String, NormalChatEvent> normalChatKafkaTemplate;

    public void sendToNormalChatTopic(NormalChatEvent message){
        normalChatKafkaTemplate.send(normalChatTopic, message);
    }
}

 

 

NormalChatConsumer

 

`Consumer`로 메시지 브로커에서 메시지를 받아온다.

@Slf4j
@Component
@RequiredArgsConstructor
public class NormalChatConsumer {

    private final SimpMessageSendingOperations messagingTemplate;

    @KafkaListener(topics = "${spring.kafka.topic.normal-chat}", groupId = "${spring.kafka.consumer.group-id.normal-chat}", containerFactory = "normalChatListenerContainerFactory")
    public void normalChatListener(NormalChatEvent chatEvent) {
        Long normalId = chatEvent.getNormalId();

        switch (chatEvent.getActionType()){
            case SEND -> {
                NormalMessageCreateResponse response = NormalMessageCreateResponse.from(chatEvent);
                messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
            }

            case MODIFY -> {
                NormalMessageModifyResponse response = NormalMessageModifyResponse.from(chatEvent);
                messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
            }

            case DELETE -> {
                NormalMessageDeleteResponse response = NormalMessageDeleteResponse.from(chatEvent);
                messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
            }
        }
    }
}

 

 

자 여기까지 기본적인 `Kafka` 설정과 `Producer`, `Consumer` 작업이 끝났다.

 

그럼 실제 작동 코드는 어떻게 될까?

 

 

NormalMessageCommandController

@Slf4j
@RestController
@RequiredArgsConstructor
public class NormalMessageCommandController {

    private final NormalMessageCommandService commandService;

    @MessageMapping("/normal/message/send")
    public void save(NormalMessageCreateRequest request){
        commandService.save(request);
    }

    @MessageMapping("/normal/message/modify")
    public void modify(NormalMessageModifyRequest request){
        commandService.modify(request);
    }

    @MessageMapping("/normal/message/delete")
    public void delete(NormalMessageDeleteRequest request){
        commandService.delete(request);
    }


}

 

`@MessageMapping`을 보면 알 수 있겠지만 해당 경로로 클라이언트는 채팅을 전송/수정/삭제를 한다.

 

그 후, `CommandService`로 해당 DTO가 전달이 된다.

 

 

NormalMessageCommandServiceImpl

@Slf4j
@Service
@RequiredArgsConstructor
public class NormalMessageCommandServiceImpl implements NormalMessageCommandService {

    private final SequenceGenerator sequenceGenerator;
    private final NormalMessageRepository messageRepository;
    private final NormalRepository normalRepository;

    @Override
    @Transactional
    public void save(NormalMessageCreateRequest request) {
        validateUserInNormal(request.getNormalId(), request.getUserId());
        NormalMessage normalMessage = NormalMessage.from(request);

        normalMessage.generateSequence(sequenceGenerator.generateSequence(NormalMessage.SEQUENCE_NAME));

        NormalChatCreateEvent chatCreateEvent =
                NormalChatCreateEvent.from(messageRepository.save(normalMessage), UUIDUtil.generateUUID());

        Events.send(chatCreateEvent);
    }

    @Override
    @Transactional
    public void modify(NormalMessageModifyRequest request) {
        NormalMessage normalMessage = messageRepository.findById(request.getMessageId())
                .orElseThrow(() -> new NormalChatException(Code.NOT_FOUND, "Message Not Found"));

        normalMessage.modify(request.getContent());

//        System.out.println(request.getContent());

        NormalChatModifyEvent chatModifyEvent =
                NormalChatModifyEvent.from(messageRepository.save(normalMessage), UUIDUtil.generateUUID());

        Events.send(chatModifyEvent);
    }

    @Override
    @Transactional
    public void delete(NormalMessageDeleteRequest request) {
        NormalMessage normalMessage = messageRepository.findById(request.getMessageId())
                .orElseThrow(() -> new NormalChatException(Code.NOT_FOUND, "Message Not Found"));

        normalMessage.delete();

        NormalChatDeleteEvent chatDeleteEvent =
                NormalChatDeleteEvent.from(messageRepository.save(normalMessage), UUIDUtil.generateUUID());

        Events.send(chatDeleteEvent);
    }


    private void validateUserInNormal(Long normalId, Long userId){
        normalRepository.findByNormalIdAndInUserId(normalId, List.of(userId))
                .orElseThrow(() -> new NormalChatException(Code.NOT_FOUND, "User Not In Normal Chat"));
    }
}

 

지금 현재는 `Transactional OutBox Pattern`이 적용되어 있어서 확인하기 힘들지만,

그래도 코드를 살펴보면 알 수 있듯이 생성/수정/삭제 이후 `Event`를 생성하여 `send()`한다.

 

즉, `Producer` -> `Broker` -> `Consumer`로 전달이 되고

 

 

그렇게 전송된 메시지는 아래 `Consumer` 코드에 의해

@Slf4j
@Component
@RequiredArgsConstructor
public class NormalChatConsumer {

    private final SimpMessageSendingOperations messagingTemplate;

    @KafkaListener(topics = "${spring.kafka.topic.normal-chat}", groupId = "${spring.kafka.consumer.group-id.normal-chat}", containerFactory = "normalChatListenerContainerFactory")
    public void normalChatListener(NormalChatEvent chatEvent) {
        Long normalId = chatEvent.getNormalId();

        switch (chatEvent.getActionType()){
            case SEND -> {
                NormalMessageCreateResponse response = NormalMessageCreateResponse.from(chatEvent);
                messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
            }

            case MODIFY -> {
                NormalMessageModifyResponse response = NormalMessageModifyResponse.from(chatEvent);
                messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
            }

            case DELETE -> {
                NormalMessageDeleteResponse response = NormalMessageDeleteResponse.from(chatEvent);
                messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
            }
        }
    }
}

 

특정 `normalId`를 구독중인 클라이언트들에게 `BroadCast`가 된다.

 

여기서 `convertAndSend()` 메소드는 해당 경로를 구독중인 사람들에게

두 번째 인자값에 있는(위에서는 `DataResponseDto.from(...)`)) 데이터를 전달하는 것이다.

 

 

작동 영상

 

실제로 `Producer`와 `Consumer`를 통해 잘 동작하는 것을 확인할 수 있다.

 

 

 

정리

우리는 `Kafka`라는 `Message Broker`를 사용하므로, 채팅 서비스가 갖는 `stateful` 특성에 의해 겪을 수 있는

`Scale-Out`의 어려움에 성공적으로 대처를 했다.

 

하지만, 위와 같이 클라이언트에게 바로 반환하는 것이 아닌 `Message Broker`를 구성하므로 발생하는 추가적인 문제가 있다.

 

만약 모종의 문제가 생겨 `Kafka`에 발행되어야 하는 메시지가 발생되지 않는다면?

 

메시지가 클라이언트에게 전달이 되지 않는 상황이 발생할 수 있다.

 

즉, 더 큰 문제가 생길 수도 있다.

 

그렇기에 우리는 `Kafka`와 같이 외부 인프라를 활용해 데이터를 전달하는 과정에서 `At-Least-Once-Delivery`를 보장해야 한다.

 

 

따라서, 다음 포스팅에서는 해당 부분의 해결을 위한 `Transactional OutBox Pattern`을 적용한 과정에 관해 설명하려 한다.

 

 

코드를 확인하고 싶다면?

https://github.com/HanYoonSoo/STOMP-Study

 

GitHub - HanYoonSoo/STOMP-Study

Contribute to HanYoonSoo/STOMP-Study development by creating an account on GitHub.

github.com