2024.11.10 - [Spring/WebSocket] - [Spring WebSocket] 채팅 서비스 프로젝트에 Kafka 적용
우리는 위 포스트를 통해 `WebSocket`을 사용하는 서비스에서 `Kafka` 즉,
`Message Broker`를 사용하는 이유에 관해 알아보았다.
그렇다면 실제로 어떤식으로 코드를 작성해야 할까?
코드를 설명하기에 앞서 간단하게 요약하자면
(`Transactional OutBox Pattern`이 적용되어 있기에 참고해서 보면 좋을 것 같다.)
- 클라이언트가 채팅 서비스에 채팅 전송, 수정, 삭제
- "채팅 서비스에서 바로 클라이언트로 브로드캐스트 하는 것이 아닌" `Producer`로 `Kafka` 토픽으로 전송
- 채팅 서비스의 `Consumer`가 해당 메시지를 전달 받음
- `SEND`, `UPDATE`, `DELETE` 작업에 맞춰 데이터를 받은 뒤 이제 클라이언트에 브로드캐스트
여기서 포인트는 채팅 서비스에서 바로 클라이언트로 브로드캐스트하는 것이 아니라
중간에 메시지 브로커를 통해서 클라이언트에 브로드캐스트 한다는 것이다.
자 이제 Kafka의 설정부터 코드와 함께 알아보자.
관련 코드
build.gradle
우선 Spring은 Kafka를 활용하기 쉽게 도와주는 `Dependency`를 제공한다.
/* Kafka */
implementation 'org.springframework.kafka:spring-kafka'
testImplementation 'org.springframework.kafka:spring-kafka-test'
우선, 나는 `Kafka`를 활용하기 위해 `Docker`를 사용했다.
docker-compose.yml
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
ports:
- "9092:9092"
- "9093:9093"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: INSIDE://localhost:9092,OUTSIDE://localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:9093
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
depends_on:
- zookeeper
volumes:
- /var/run/docker.sock:/var/run/docker.sock
application.yml
spring:
...
kafka:
bootstrap-servers: localhost:9093
topic:
normal-chat: "normalChat"
emoji-chat: "emojiChat"
consumer:
group-id:
normal-chat: "normalChatGroup"
emoji-chat: "emojiChatGroup"
KafkaTopicConfig
@Configuration
public class KafkaTopicConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value("${spring.kafka.topic.normal-chat}")
private String normalChatTopic;
@Value("${spring.kafka.topic.emoji-chat}")
private String emojiChatTopic;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configurations = new HashMap<>();
configurations.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configurations);
}
@Bean
public NewTopic normalChatTopic(){
return TopicBuilder.name(normalChatTopic)
.partitions(1)
.replicas(1)
.build();
}
@Bean
public NewTopic emojiChatTopic(){
return TopicBuilder.name(normalChatTopic)
.partitions(1)
.replicas(1)
.build();
}
}
ChatProducerConfig
`Producer` 역할을 수행할 `Config`이다.
`@EnableKafka`를 꼭 붙여주도록 하자. `Kafka` 환경설정과 관련해서 필요한 역할을 한다.
@EnableKafka
@Configuration
public class ChatProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;;
@Bean
public Map<String, Object> chatEventProducerConfig() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return config;
}
@Bean
public ProducerFactory<String, NormalChatEvent> normalChatEventProducerFactory(){
return new DefaultKafkaProducerFactory<>(chatEventProducerConfig());
}
@Bean
public KafkaTemplate<String, NormalChatEvent> normalChatKafkaTemplate(){
return new KafkaTemplate<>(normalChatEventProducerFactory());
}
}
NormalChatConsumerConfig
`Consumer` 역할을 수행할 `Config`이다.
`@EnableKafka`를 꼭 붙여주도록 하자. `Kafka` 환경설정과 관련해서 필요한 역할을 한다.
@EnableKafka
@Configuration
public class NormalChatConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value("${spring.kafka.consumer.group-id.normal-chat}")
private String groupId;
@Bean
public Map<String, Object> normalChatConsumerConfiguration() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
return config;
}
@Bean
public ConsumerFactory<String, NormalChatEvent> normalChatEventConsumerFactory(){
return new DefaultKafkaConsumerFactory<>(
normalChatConsumerConfiguration(),
new StringDeserializer(),
new JsonDeserializer<>(NormalChatEvent.class)
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, NormalChatEvent> normalChatListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, NormalChatEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(normalChatEventConsumerFactory());
return factory;
}
}
ChatEventProducer
`Producer`로 메시지 브로커로 메시지를 전달하는 역할을 한다.
@Slf4j
@Component
@RequiredArgsConstructor
public class ChatEventProducer {
@Value("${spring.kafka.topic.normal-chat}")
private String normalChatTopic;
private final KafkaTemplate<String, NormalChatEvent> normalChatKafkaTemplate;
public void sendToNormalChatTopic(NormalChatEvent message){
normalChatKafkaTemplate.send(normalChatTopic, message);
}
}
NormalChatConsumer
`Consumer`로 메시지 브로커에서 메시지를 받아온다.
@Slf4j
@Component
@RequiredArgsConstructor
public class NormalChatConsumer {
private final SimpMessageSendingOperations messagingTemplate;
@KafkaListener(topics = "${spring.kafka.topic.normal-chat}", groupId = "${spring.kafka.consumer.group-id.normal-chat}", containerFactory = "normalChatListenerContainerFactory")
public void normalChatListener(NormalChatEvent chatEvent) {
Long normalId = chatEvent.getNormalId();
switch (chatEvent.getActionType()){
case SEND -> {
NormalMessageCreateResponse response = NormalMessageCreateResponse.from(chatEvent);
messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
}
case MODIFY -> {
NormalMessageModifyResponse response = NormalMessageModifyResponse.from(chatEvent);
messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
}
case DELETE -> {
NormalMessageDeleteResponse response = NormalMessageDeleteResponse.from(chatEvent);
messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
}
}
}
}
자 여기까지 기본적인 `Kafka` 설정과 `Producer`, `Consumer` 작업이 끝났다.
그럼 실제 작동 코드는 어떻게 될까?
NormalMessageCommandController
@Slf4j
@RestController
@RequiredArgsConstructor
public class NormalMessageCommandController {
private final NormalMessageCommandService commandService;
@MessageMapping("/normal/message/send")
public void save(NormalMessageCreateRequest request){
commandService.save(request);
}
@MessageMapping("/normal/message/modify")
public void modify(NormalMessageModifyRequest request){
commandService.modify(request);
}
@MessageMapping("/normal/message/delete")
public void delete(NormalMessageDeleteRequest request){
commandService.delete(request);
}
}
`@MessageMapping`을 보면 알 수 있겠지만 해당 경로로 클라이언트는 채팅을 전송/수정/삭제를 한다.
그 후, `CommandService`로 해당 DTO가 전달이 된다.
NormalMessageCommandServiceImpl
@Slf4j
@Service
@RequiredArgsConstructor
public class NormalMessageCommandServiceImpl implements NormalMessageCommandService {
private final SequenceGenerator sequenceGenerator;
private final NormalMessageRepository messageRepository;
private final NormalRepository normalRepository;
@Override
@Transactional
public void save(NormalMessageCreateRequest request) {
validateUserInNormal(request.getNormalId(), request.getUserId());
NormalMessage normalMessage = NormalMessage.from(request);
normalMessage.generateSequence(sequenceGenerator.generateSequence(NormalMessage.SEQUENCE_NAME));
NormalChatCreateEvent chatCreateEvent =
NormalChatCreateEvent.from(messageRepository.save(normalMessage), UUIDUtil.generateUUID());
Events.send(chatCreateEvent);
}
@Override
@Transactional
public void modify(NormalMessageModifyRequest request) {
NormalMessage normalMessage = messageRepository.findById(request.getMessageId())
.orElseThrow(() -> new NormalChatException(Code.NOT_FOUND, "Message Not Found"));
normalMessage.modify(request.getContent());
// System.out.println(request.getContent());
NormalChatModifyEvent chatModifyEvent =
NormalChatModifyEvent.from(messageRepository.save(normalMessage), UUIDUtil.generateUUID());
Events.send(chatModifyEvent);
}
@Override
@Transactional
public void delete(NormalMessageDeleteRequest request) {
NormalMessage normalMessage = messageRepository.findById(request.getMessageId())
.orElseThrow(() -> new NormalChatException(Code.NOT_FOUND, "Message Not Found"));
normalMessage.delete();
NormalChatDeleteEvent chatDeleteEvent =
NormalChatDeleteEvent.from(messageRepository.save(normalMessage), UUIDUtil.generateUUID());
Events.send(chatDeleteEvent);
}
private void validateUserInNormal(Long normalId, Long userId){
normalRepository.findByNormalIdAndInUserId(normalId, List.of(userId))
.orElseThrow(() -> new NormalChatException(Code.NOT_FOUND, "User Not In Normal Chat"));
}
}
지금 현재는 `Transactional OutBox Pattern`이 적용되어 있어서 확인하기 힘들지만,
그래도 코드를 살펴보면 알 수 있듯이 생성/수정/삭제 이후 `Event`를 생성하여 `send()`한다.
즉, `Producer` -> `Broker` -> `Consumer`로 전달이 되고
그렇게 전송된 메시지는 아래 `Consumer` 코드에 의해
@Slf4j
@Component
@RequiredArgsConstructor
public class NormalChatConsumer {
private final SimpMessageSendingOperations messagingTemplate;
@KafkaListener(topics = "${spring.kafka.topic.normal-chat}", groupId = "${spring.kafka.consumer.group-id.normal-chat}", containerFactory = "normalChatListenerContainerFactory")
public void normalChatListener(NormalChatEvent chatEvent) {
Long normalId = chatEvent.getNormalId();
switch (chatEvent.getActionType()){
case SEND -> {
NormalMessageCreateResponse response = NormalMessageCreateResponse.from(chatEvent);
messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
}
case MODIFY -> {
NormalMessageModifyResponse response = NormalMessageModifyResponse.from(chatEvent);
messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
}
case DELETE -> {
NormalMessageDeleteResponse response = NormalMessageDeleteResponse.from(chatEvent);
messagingTemplate.convertAndSend("/topic/normal/" + normalId, DataResponseDto.from(response));
}
}
}
}
특정 `normalId`를 구독중인 클라이언트들에게 `BroadCast`가 된다.
여기서 `convertAndSend()` 메소드는 해당 경로를 구독중인 사람들에게
두 번째 인자값에 있는(위에서는 `DataResponseDto.from(...)`)) 데이터를 전달하는 것이다.
작동 영상
실제로 `Producer`와 `Consumer`를 통해 잘 동작하는 것을 확인할 수 있다.
정리
우리는 `Kafka`라는 `Message Broker`를 사용하므로, 채팅 서비스가 갖는 `stateful` 특성에 의해 겪을 수 있는
`Scale-Out`의 어려움에 성공적으로 대처를 했다.
하지만, 위와 같이 클라이언트에게 바로 반환하는 것이 아닌 `Message Broker`를 구성하므로 발생하는 추가적인 문제가 있다.
만약 모종의 문제가 생겨 `Kafka`에 발행되어야 하는 메시지가 발생되지 않는다면?
메시지가 클라이언트에게 전달이 되지 않는 상황이 발생할 수 있다.
즉, 더 큰 문제가 생길 수도 있다.
그렇기에 우리는 `Kafka`와 같이 외부 인프라를 활용해 데이터를 전달하는 과정에서 `At-Least-Once-Delivery`를 보장해야 한다.
따라서, 다음 포스팅에서는 해당 부분의 해결을 위한 `Transactional OutBox Pattern`을 적용한 과정에 관해 설명하려 한다.
코드를 확인하고 싶다면?
https://github.com/HanYoonSoo/STOMP-Study
'Spring > WebSocket' 카테고리의 다른 글
[Spring WebSocket] 채팅 서비스에 Transactional Outbox Pattern 도입 (2) | 2024.11.11 |
---|---|
[Spring WebSocket] STOMP에서의 예외처리 (0) | 2024.11.11 |
[Spring WebSocket] Spring Security + STOMP (1) | 2024.11.11 |
[Spring WebSocket] MongoDB Collection 설계 With Auto-Incremented Sequence (9) | 2024.11.10 |
[Spring WebSocket] 채팅 서비스 프로젝트에 Kafka 적용 (2) | 2024.11.10 |