2024.09.19 - [Infra/DevOps] - 분산 시스템에서 데이터를 전달하는 효율적인 방법 - 1
이전 글과 이어지는 내용이다.
우리는 앞서 RDB를 사용하는 애플리케이션에서 전달 방법을 알아보았다.
이번 포스팅에서는 RabbitMQ를 사용한 전달 방법과 Kafka를 사용한 전달 방법에 관해 알아본다.
RabbitMQ를 사용한 전달 방법
RabbitMQ
- AMQP(Advanced Message Queuing Protocol)을 구현한 메시지 브로커
- Publish/Subscribe 방식 지원
- ACK(Acknowledgement) 메시지 응답처리 매커니즘
- Producer Confirm(메시지를 잘 받음)
- Consumer ACK(메시지를 잘 받고 잘 처리했음)
RabbitMQ 메시지 전달 과정
Queue는 여러개 일 수 있다. 라우팅 키 설정에 의해 잘 분배가 될 것이다.
그런데 Routing이 실패하는 경우가 있다.
RabbitMQ를 확장할 때 Cluster로 구성을 할 수가 있는데 라우팅이 실패할 수 있다.(네트워크는 신뢰할 수 없기 때문에)
(이 점도 유의해야 한다)
Producer Confirm 과정은 아래와 같다.
Producer Confirm
- `org.springframework.amqp.rabbit.connection.CorrelationData.java`
- Spring에서 제공하는 클래스이다.
- Producer Confirm을 확인할 수 있는 기본 클래스
- RabbitTemplate과 사용(Spring에서 제공하는 클래스이므로)
- RabbitTemplate을 활용하여 발행할 때 이 CorrelationData를 넘길 수가 있다.
- 그리고 잘 처리됐는지 안됐는지를 발행시점에 만든 CorrelationData로 확인할 수 있다.
CorrelationData를 사용한 코드
`rabbitTemplate`에 `send()`라는 메소드가 존재한다.
잘 보면 인자로 `CorrelationData`를 받는 메소드들이 쭉 오버로딩 되어있다.
그때, 메시지를 발행하는 시점에 CorrelationData 객체를 생성해서 UUID 및 UUID와 대응하는 값들을 넣어주면
나중에 이것을 확인할 수 있다.
그러면 어떻게 콜백을 받을까?
RabbitTemplate의 ConfirmCallback을 사용한 코드
public void setConfirmCallback(ConfirmCallback confirmCallback) {
this.confirmCallback = confirmCallback;
}
@FunctionalInterface
public interface ConfirmCallback {
void confirm(CorrelationData correlationData, boolean ack, String cause);
}
RabbitTemplate의 `setConfirmCallback`이라는 메서드에 `ConfirmCallback` 구현체를 넣어주면 된다.
이때, `correlationData`, `ack`, `cause` 인자를 활용해 Publisher Confirm을 확인할 수 있다.
- correlationData: 메시지를 보낼 때 생성한 객체
- ack: boolean으로 성공/실패 확인이 가능
- cause: 만약, 실패했을 때 실패원인 확인
Callback이다보니 즉각적으로 대응하는 것은 어렵다. 하지만, 적어도 어떤 메시지가 실패했는지는 알 수 있다.
그러면 이렇게만 코딩하면 될까?
SpringBoot의 경우 `#1`처럼 `application.properties`에 등록을 해줘야 한다.
아래는 바닐라 스프링(?)의 경우
Consumer Ack
- com.rabbitmq.client.Channel.java
- 채널이라는 객체는 MessageQueue와 애플리케이션의 커넥션을 한번 추상화한 클래스이다.
- RabbitMQ와 애플리케이션 사이의 가상의 커넥션
- Queue에서 데이터를 발행(Publishing), 소비(Consuming)하는 기능을 제공
- ACK, NACK
아래는 코드이다.
Listener를 구현할 때 일단 위와 같은 방법으로 구현할 것이다.
하지만, 보통 인자 중 Message만을 사용하고 Channel은 사용하지 않을 것이다.
그런데 Channel이라는 인자를 선언을 해놓으면 SpringFramework가 타입을 보고 적절한 객체를 메시지와 Channel에 주입을 해준다.
Channel을 받지 않더라도 내부 비즈니스 로직에서 예외가 발생하면 NACK를 보내준다.
하지만, `basicAck`와 `basicNack`라는 메소드를 사용하면 수동으로 보내줄 수 있다.
그러나, 이렇게 완벽하게 ACK, NACK 통신이 이뤄진다고 해도 우리의 코드는 완벽하지 않고 계속 버그가 생긴다.
Consumer에서는 계속 `NACK`를 보내고 Producer에는 계속 데이터를 보내 `Queue`에 쌓인다면?
문제가 될 것이다.
그래서 아래와 같은 방식도 존재한다고 한다.
`Dead Letter`라는 것을 활용하는 방법이 있다고 한다.
Queue에서 정상적으로 처리하지 못한 Message를 `Dead Letter Exchange`라는 곳으로 옮기고
`Dead Letter Exchange`는 다시 라우팅 키를 활용해서 `Dead Letter Queue`라는 곳으로 보내는 것이다.
RabbitMQ에서 Dead Letter 처리하는 경우는 크게 3가지가 존재한다.
- 리퀘스트가 False이며, basicAck/basicNack가 메시지를 처리하는 경우
- 큐에 메시지가 오래된 경우
- 큐에 메시지가 가득찬 경우
이 경우 우리는 관련된 설정을 할 수 있다.
Retry 후 DeadLetter로 이동
최대 시도 횟수는 3이며,
시도할 때마다 간격은 1초이며, 간격의 배수는 2이고, 최대 2초로 설정한다.
이때, `RejectAndDontRequeueRecoverer()`를 사용해야 한다.
즉, 원래 있던 큐에 리큐를 하면 안되고 Dead Letter로 빠져야 하기 때문에 위 설정을 넣어줘야 한다.
DeadLetter Queue Listener 코드
DeadLetter 빠진 메시지를 처리하는 코드는 위와 같다.
일반적으로 사용하는 `@RabbitListener`를 하나 달아주면 된다.
그리고 실패한 것에 관해서는 `onDeadLetterMessage`로 받으면 된다.
이때, Alert 처리나 fallback처리를 해주면 된다.
로그로 남겨두면 수동으로라도 보낼 수 있다.
Kafka를 사용한 전달 방법
Producer Confirm
여기서 `kafkaTemplate`이라는 객체의 `send()`라는 메소드를 사용하면 Kafka에 메시지를 발행할 수 있다.
이때, 응답값은 `ListenableFuture`이다. 그럼 이 `ListenableFuture`는 뭘 받을까?
위의 코드처럼 future에 Callback을 넣어줄 수가 있다.
2개의 콜백
첫번째는 Success Callback
두번째는 Failure Callback
Consumer ACK - AcknowledgingMessageListener
이때, 우리는 두번째 인자인 Acknowledgement를 활용하여 Consumer ACK를 구현할 수 있다.
나름대로 작성한 구현 클래스
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.support.Acknowledgment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyKafkaMessageListener implements AcknowledgingMessageListener<String, String> {
private static final Logger logger = LoggerFactory.getLogger(MyKafkaMessageListener.class);
@Override
@KafkaListener(
topics = "your-topic-name",
containerFactory = "kafkaListenerContainerFactory"
)
public void onMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment acknowledgment) {
try {
// 메시지 처리 로직
String key = consumerRecord.key();
String value = consumerRecord.value();
log.info("Processing message with key: {}, value: {}", key, value);
// 메시지 정상 처리 시 ACK
acknowledgment.acknowledge();
log.info("Message acknowledged successfully.");
} catch (Exception e) {
log.error("Error while processing message: {}", consumerRecord, e);
// 실패 시 NACK 처리
// 메시지를 다시 처리하게 하고, 5초 후에 재시도
acknowledgment.nack(5000L);
log.info("Message NACKed, will be retried after 5 seconds.");
}
}
}
위처럼 `acknowledgment.acknowledge()`를 활용하면 ACK/NACK를 날리는 것이 가능하다.
하지만, 아래처럼 ConsumerFactory 설정을 해줘야 한다.
`ENABLE_AUTO_COMMIT_CONFIG`라는 설정값에 `false`를 줘야한다.
AUTO COMMIT을 사용하지 않고 메뉴얼로 ACK나 NACK를 날리기 위해서이다.
마무리
우리는 메시지 전달 방법론에 관해 알아보았고,
일반적으로 SpringBoot에서 세팅된 값을 가지고는 Producer Confirm과 Consumer Ack을 사용할 수 없다.
그렇기 때문에 이제껏 설명한 방법을 통해 At Least Once를 구현하여 코드를 좀 더 안정성 있게 만들 수 있다.
하지만, 성능상 문제가 있거나 혹은 처리해야 하는 방법이 정해져 있어야 하기 때문에
어떻게 처리해야 할지 고민하여 Prodcuer Confirm이라던가 Consumer ACK를 고려해서 처리해야 한다.
분산 시스템에서 데이터를 전달하는 효율적인 방법
- Event driven Architecture의 기본은 데이터 전달
- 최소 At Least Once(최소 한번) 설정
- Producer Confirm, Consumer Ack 고려
'Infra > DevOps' 카테고리의 다른 글
분산 시스템에서 데이터를 전달하는 효율적인 방법 - 1 (0) | 2024.09.19 |
---|---|
[Ansible] Ansible 설치 가이드 (0) | 2024.07.29 |
[Docker] Docker 설치 가이드 (0) | 2024.07.29 |
[Jenkins] Jenkins 설치 가이드 (0) | 2024.07.29 |
[DevOps] Configuration Drift (0) | 2024.07.21 |