본문 바로가기
프로젝트/FitTrip

[커뮤니티 서비스] 커뮤니티 서비스와 다른 서비스의 통신

by 진꿈청 2024. 8. 7.

커뮤니티 서비스

커뮤니티 서비스는 FitTrip 프로젝트의 유저가 상시 접속한다고 해도 과언이 아니다.

 

유저가 채팅을 치기 위해선 서버, DM에 접속해야 하며,

유저가 음성/화상 통화를 하기 위해서도 서버, DM에 접속해야 한다.

 

처음 FitTrip 서비스를 방문해도 유저에게 서버, DM 목록을 전송해주며,

유저는 다양한 채널을 생성하고 포럼 채널포럼을 통한 챌린지를 진행한다.

 

 

즉, 커뮤니티 서비스에서 일어난 변경은 항상 모든 유저에게 실시간으로 업데이트 되어야 한다.

그렇기에 관련 작업을 위해 커뮤니티 서비스는 `Kafka`로 처리한다.

 

 

커뮤니티 서비스의 카프카 이벤트 처리

앞서, 설명한 것처럼 커뮤니티 서비스는 카프카로 다양한 이벤트를 전송한다.

 

  • 서버
    • UPDATE, DELETE
  • DM
    • UPDATE
  • 채널
    • CREATE, READ, UPDATE, DELETE
  • 카테고리
    • CREATE, UDPATE, DELETE
  • 포럼
    • CREATE, UPDATE, DELETE

위의 사진중에서 `채널 READ` 부분만 `상태관리 서비스`로 전달된다고 생각하면 된다.

(이유에 관해선 아래에서 설명한다.)

 

즉, `채널 READ` 부분을 제외하고는 전부 채팅 서비스로 보낸다는 의미가 되는데,

그 이유는 채팅 서비스가 `웹소켓`으로 프론트와 연결이 되어있기 때문에

`브로드캐스트`가 가능하여 실시간으로 사용자에게 변경사항을 업데이트할 수 있기 때문이다.

 

 

그럼 커뮤니티 서비스에서는 어떻게 `Kafka`로 이벤트를 전달할까? 

 

 

Kafka 관련 설정

 

application.yml

 

application.yml에 사용할 토픽을 지정한다.

  kafka:
    bootstrap-servers: localhost:9092
    topic:
      community-server-event: "communityServerEventTopic"
      community-dm-event: "communityDmEventTopic"
      community-channel-event: "communityChannelEventTopic"
      community-category-event: "communityCategoryEventTopic"
      community-forum-event: "communityForumEventTopic"
      user-location-event: "userLocationEvent"

 

 

KafkaTopicConfig

 

카프카 토픽을 생성하기 위한 설정 클래스라고 생각하면 된다.

@Configuration
public class KafkaTopicConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value("${spring.kafka.topic.community-server-event}")
    private String communityServerEventTopic;

    @Value("${spring.kafka.topic.community-dm-event}")
    private String communityDmEventTopic;

    @Value("${spring.kafka.topic.community-channel-event}")
    private String communityChannelEventTopic;

    @Value("${spring.kafka.topic.community-category-event}")
    private String communityCategoryEventTopic;

    @Value("${spring.kafka.topic.community-forum-event}")
    private String communityForumEventTopic;

    @Value("${spring.kafka.topic.user-location-event}")
    private String userLocationEventTopic;
    @Bean
    public KafkaAdmin kafkaAdmin(){
        Map<String, Object> configurations = new HashMap<>();
        configurations.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configurations);
    }

    @Bean
    public NewTopic communityServerEventTopic() {
        return TopicBuilder.name(communityServerEventTopic)
                .partitions(1)
                .replicas(1)
                .build();
    }

    @Bean
    public NewTopic communityDmEventTopic() {
        return TopicBuilder.name(communityDmEventTopic)
                .partitions(1)
                .replicas(1)
                .build();
    }

    @Bean
    public NewTopic communityChannelEventTopic() {
        return TopicBuilder.name(communityChannelEventTopic)
                .partitions(1)
                .replicas(1)
                .build();
    }

    @Bean
    public NewTopic communityCategoryEventTopic() {
        return TopicBuilder.name(communityCategoryEventTopic)
                .partitions(1)
                .replicas(1)
                .build();
    }

    @Bean
    public NewTopic communityForumEventTopic() {
        return TopicBuilder.name(communityForumEventTopic)
                .partitions(1)
                .replicas(1)
                .build();
    }

    @Bean
    public NewTopic userLocationEventTopic() {
        return TopicBuilder.name(userLocationEventTopic)
                .partitions(1)
                .replicas(1)
                .build();
    }
}

 

`KafkaAdmin`을 생성하고 해당 `KafkaAdmin`을 사용해 `NewTopic`(토픽)을 생성한다.

이때, 파티션, 레플리카의 개수등 여러 설정이 가능하다.

 

KafkaProducerConfig

package capstone.communityservice.global.config;

import capstone.communityservice.global.common.dto.kafka.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;

@EnableKafka
@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);

        return config;
    }

    @Bean
    public ProducerFactory<String, CommunityServerEventDto> communityServerEventProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, CommunityServerEventDto> communityServerEventKafkaTemplate() {
        return new KafkaTemplate<>(communityServerEventProducerFactory());
    }

    @Bean
    public ProducerFactory<String, CommunityDmEventDto> communityDmEventProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, CommunityDmEventDto> communityDmEventKafkaTemplate() {
        return new KafkaTemplate<>(communityDmEventProducerFactory());
    }

    @Bean
    public ProducerFactory<String, CommunityChannelEventDto> communityChannelEventProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, CommunityChannelEventDto> communityChannelEventKafkaTemplate() {
        return new KafkaTemplate<>(communityChannelEventProducerFactory());
    }

    @Bean
    public ProducerFactory<String, CommunityCategoryEventDto> communityCategoryEventProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, CommunityCategoryEventDto> communityCategoryEventKafkaTemplate() {
        return new KafkaTemplate<>(communityCategoryEventProducerFactory());
    }

    @Bean
    public ProducerFactory<String, CommunityForumEventDto> communityForumEventProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, CommunityForumEventDto> communityForumEventKafkaTemplate() {
        return new KafkaTemplate<>(communityForumEventProducerFactory());
    }

    @Bean
    public ProducerFactory<String, UserLocationEventDto> userLocationEventProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, UserLocationEventDto> userLocationEventKafkaTemplate() {
        return new KafkaTemplate<>(userLocationEventProducerFactory());
    }
}

 

`ProducerFactory`를 사용하여 카프카 프로듀서를 생성하고 해당 프로듀서를 인자로 받는

`KafkaTemplate`을 생성한다. 우리는 이 `KafkaTemplate`을 활용하여 이벤트를 전달할 수 있다.

 

 

KafkaEventPublisher

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaEventPublisher {

    @Value("${spring.kafka.topic.community-server-event}")
    private String communityServerEventTopic;

    @Value("${spring.kafka.topic.community-dm-event}")
    private String communityDmEventTopic;

    @Value("${spring.kafka.topic.community-channel-event}")
    private String communityChannelEventTopic;

    @Value("${spring.kafka.topic.community-category-event}")
    private String communityCategoryEventTopic;

    @Value("${spring.kafka.topic.community-forum-event}")
    private String communityForumEventTopic;

    @Value("${spring.kafka.topic.user-location-event}")
    private String userLocationEventTopic;

    private final KafkaTemplate<String, CommunityServerEventDto> communityServerEventKafkaTemplate;
    private final KafkaTemplate<String, CommunityDmEventDto> communityDmEventKafkaTemplate;
    private final KafkaTemplate<String, CommunityChannelEventDto> communityChannelEventKafkaTemplate;
    private final KafkaTemplate<String, CommunityCategoryEventDto> communityCategoryEventKafkaTemplate;
    private final KafkaTemplate<String, CommunityForumEventDto> communityForumEventKafkaTemplate;
    private final KafkaTemplate<String, UserLocationEventDto> userLocationEventKafkaTemplate;

    public void sendToServerEventTopic(CommunityServerEventDto serverEventDto) {
        communityServerEventKafkaTemplate.send(communityServerEventTopic, serverEventDto);
    }

    public void sendToDmEventTopic(CommunityDmEventDto dmEventDto) {
        communityDmEventKafkaTemplate.send(communityDmEventTopic, dmEventDto);
    }

    public void sendToChannelEventTopic(CommunityChannelEventDto channelEventDto) {
        communityChannelEventKafkaTemplate.send(communityChannelEventTopic, channelEventDto);
    }

    public void sendToCategoryEventTopic(CommunityCategoryEventDto categoryEventDto) {
        communityCategoryEventKafkaTemplate.send(communityCategoryEventTopic, categoryEventDto);
    }

    public void sendToForumEventTopic(CommunityForumEventDto forumEventDto) {
        communityForumEventKafkaTemplate.send(communityForumEventTopic, forumEventDto);
    }

    public void sendToUserLocEventTopic(UserLocationEventDto userLocationEventDto){
        userLocationEventKafkaTemplate.send(userLocationEventTopic, userLocationEventDto);
    }
}

 

모든 카프카 이벤트 발생시 서비스마다 사용되는 `KafkaTemplate`을 찾아서 사용하는 것은 번거롭다.

따라서, 카프카 이벤트를 공통으로 처리해주는 클래스를 하나 생성하였다.

 

 

실제 사용 예시

 

@Slf4j
@Service
@Transactional
@RequiredArgsConstructor
public class ChannelCommandService {
    private final ServerQueryService serverQueryService;
    private final FileUploadService fileUploadService;

    private final ChannelRepository channelRepository;
    private final ServerRepository serverRepository;
    private final CategoryRepository categoryRepository;
    private final ForumRepository forumRepository;
    private final FileRepository fileRepository;

    private final KafkaEventPublisher kafkaEventPublisher;
	
    ...
    
    public ChannelResponse create(ChannelCreateRequest request) {
        Server findServer = validateManagerInChannel(request.getServerId(), request.getUserId());

        validateCategory(request.getCategoryId());

        Channel newChannel = channelRepository.save(createChannel(findServer, request));

        kafkaEventPublisher.sendToChannelEventTopic(
                CommunityChannelEventDto.of(
                        "channel-create",
                        newChannel,
                        findServer.getId()
                )
        );

        printKafkaLog("create");
        return ChannelResponse.of(newChannel);
    }
    
    ...
}

 

kafkaEventPublisher을 활용하여 이벤트를 전달한다. 이때, `EventDto`들의 부모 클래스를 두고 작성하는 방법도 있으나,

백엔드간의 회의에서 따로 사용하는 것으로 하였지만, 추후 리팩토링 과정에서 적용해봐도 좋을 것 같다.

 

채널 READ의 경우

 

 유저의 최근 채널 위치는 상태관리 서버가 담당을 한다. 굳이 채널 위치를 상태관리에서 저장하는 이유는

 

"커뮤니티 서비스는 채팅을 관리하지 않기 때문이다."

 

하지만, 커뮤니티 서비스는 유저 처음 접속시 접속한 채널의 초기 채팅 목록을 전달해준다.

즉, 특정 서비스에서 유저의 최근 채널 위치를 지속적으로 업데이트 해주어야한다.

 

 

관련 모식도

 

상태관리로 유저의 최근 채널 위치 업데이트 과정 

 

커뮤니티 서비스가 유저에게 채널 위치에 따른 데이터를 전달하는 과정

 

 

그러나, 위의 모식도로만 동작한다면 아래의 경우에 문제가 생긴다.

  • 서버를 처음 생성한 경우 유저의 최근 채널 위치를 알 수가 없다.
  • 서버에 처음 참가한 경우 유저의 최근 채널 위치를 알 수가 없다.

 

따라서, 나는 해당 경우 서버의 첫번째 채팅 채널상태관리 서버로 전달해 유저의 최근 채널 위치를 등록시켰다.

 

 

자, 이제 관련 코드를 쭉 살펴보자.

 

 

채널 READ 관련 작업

 

ServerQueryService

 

서버와 연관된 조회 작업을 처리하는 `Service`이다. `read` 메소드를 중점적으로 확인하면 된다.

@Slf4j
@Service
@Transactional(readOnly = true)
@RequiredArgsConstructor
public class ServerQueryService {

     private final StateServiceClient stateServiceClient;
    private final ChatServiceClient chatServiceClient;

    private final ServerRepository serverRepository;
    private final ServerUserRepository serverUserRepository;
    private final ChannelRepository channelRepository;
    private final CategoryRepository categoryRepository;

    public ServerReadResponse read(Long serverId, Long userId) {
        Server findServer = findServerWithServerUser(serverId);

        validateServerUser(serverId, userId);

        ServerUsersStateResponse usersState = getUsersState(serverId);

        // 유저 최근 채널 위치 불러오는 로직
        Page<ServerMessageDto> messages = getMessages(findServer.getId(), userId);

        return createServerReadResponseDto
                (
                    serverId,
                    findServer,
                    usersState,
                    messages
                );
    }

    private ServerUsersStateResponse getUsersState(Long serverId) {
        List<Long> userIds = serverUserRepository.findUserIdsByServerId(serverId);

        return stateServiceClient.getServerUsersState(serverId, userIds);
    }

    private Page<ServerMessageDto> getMessages(Long serverId, Long userId) {
        UserLocationDto userLocation = stateServiceClient.getUserLocation(serverId, userId);

        System.out.println("userLocation: " + userLocation.getChannelId());

        return validateChatChannel(userLocation.getChannelId()) ? chatServiceClient.getServerMessages(
                userLocation.getChannelId(),
                0,
                30
        ) : null;
    }

    public List<OpenServerQueryResponse> search() {
        return serverRepository.findTopOpenServer()
                .stream()
                .map(OpenServerQueryResponse::of)
                .collect(Collectors.toList());
    }

    public PageResponseDto searchCondition(String name, int pageNo) {
        int page = pageNo == 0 ? 0 : pageNo - 1;
        int pageLimit = 10;

        Pageable pageable = PageRequest.of(page, pageLimit);

        Page<Server> servers = serverRepository.findServerWithPaging(name, pageable);

        return PageResponseDto.of(servers, ServerWithCountResponse::of);
    }

    private Server findServerWithServerUser(Long serverId) {
        return serverRepository.findServerWithUserById(serverId)
                .orElseThrow(() ->
                        new ServerException(Code.NOT_FOUND, "Server Not Found")
                );
    }

    private ServerReadResponse createServerReadResponseDto(
            Long serverId,
            Server findServer,
            ServerUsersStateResponse usersState,
            Page<ServerMessageDto> messages
    ) {
        List<ChannelResponse> channels = channelRepository.findByServerId(serverId)
                .stream()
                .map(ChannelResponse::of)
                .toList();

        List<CategoryResponse> categories = categoryRepository.findByServerId(serverId)
                .stream()
                .map(CategoryResponse::of)
                .toList();

        List<ServerUserInfo> serverUserInfos = findServer
                .getServerUsers()
                .stream()
                .map(ServerUserInfo::of)
                .toList();

        return ServerReadResponse.of(
                ServerResponse.of(findServer),
                serverUserInfos,
                categories,
                channels,
                usersState,
                messages
        );
    }

    private boolean validateChatChannel(Long channelId) {
        Channel findChannel = channelRepository.findById(channelId)
                .orElseThrow(
                        () -> new ChannelException(
                                Code.NOT_FOUND, "Not Found Channel")
                );

        if(!findChannel.getChannelType()
                .equals(ChannelType.CHAT))
        {
            return false;
        }

        return true;
    }

    public void validateManager(Long managerId, Long userId){
        if(!managerId.equals(userId)){
            throw new ServerException(Code.UNAUTHORIZED, "Not Manager");
        }
    }

    public Server validateExistServer(Long serverId){
        return serverRepository.findById(serverId)
                .orElseThrow(() ->
                        new ServerException(Code.NOT_FOUND, "Server Not Found")
                );
    }

    private void validateServerUser(Long serverId, Long userId) {
        serverUserRepository.findByServerIdAndUserId(serverId, userId)
                .orElseThrow(() -> new ServerException(
                        Code.NOT_FOUND, "Not Found ServerUser")
                );
    }
}

 

getUsersState()

  • 유저의 온/오프라인 상태정보를 상태관리 서비스로부터 받아온다.
  • 이때, `OpenFeign`을 활용한다.

getMessages()

  • getUserLocation()
    • 상태관리 서비스로부터 유저의 최근 채널 위치를 가져온다.
    • 이때, 채팅 채널인지 확인하는 작업도 진행된다.
  • 유저의 최근 채널 위치를 토대로 채팅 서버로부터 최근 채팅 목록을 갖고와서 반환한다.
    • 마찬가지로, `OpenFeign`을 활용한다.

 

 

 

StateServiceClient

 

`OpenFeign`을 사용하여 상태관리 서비스와 통신하는 Client 인터페이스이다.

@FeignClient("state-service")
public interface StateServiceClient {

    @GetMapping("/feign/server/user/state")
    ServerUsersStateResponse getServerUsersState(
            @RequestParam Long serverId,
            @RequestParam List<Long> userIds
    );

    @GetMapping("/feign/user/connection/state")
    UserConnectionStateResponse getUsersConnectionState(@RequestParam List<Long> userIds);

    @GetMapping("/feign/{serverId}/{userId}")
    UserLocationDto getUserLocation(@PathVariable("serverId") Long serverId, @PathVariable("userId") Long userId);

}

 

 

ChatServiceClient

 

`OpenFeign`을 사용하여 채팅 서비스와 통신하는 Client 인터페이스이다.

@FeignClient("chat-service")
public interface ChatServiceClient {

    @GetMapping("/feign/server/messages/channel")
    Page<ServerMessageDto> getServerMessages(
            @RequestParam(value = "channelId") Long channelId,
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "30") int size
    );

    @GetMapping("/feign/direct/messages/room")
    Page<DmMessageDto> getDmMessages(
            @RequestParam(value = "roomId") Long roomId,
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "30") int size
    );

    @GetMapping("/feign/forum/messages/forum")
    Page<ForumMessageDto> getForumMessages(
            @RequestParam(value = "forumId") Long forumId,
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "30") int size
    );

    @GetMapping("/feign/forum/messages/count")
    ForumChannelResponseDto getForumsMessageCount(
            @RequestParam(value = "forumIds") List<Long> forumIds
    );

}

 

 

유저 서버 처음 생성/접속시

 

ServerCommandService

 

서버 생성/변경/삭제 작업을 하는 Service이다.

@Slf4j
@Service
@Transactional
@RequiredArgsConstructor
public class ServerCommandService {
    private static final String INVITE_LINK_PREFIX = "serverId=%d";

    private final FileUploadService fileUploadService;
    private final RedisService redisService;

    private final UserQueryService userQueryService;
    private final ServerUserCommandService serverUserCommandService;
    private final ServerQueryService serverQueryService;
    private final CategoryCommandService categoryCommandService;
    private final ChannelCommandService channelCommandService;

    private final ServerRepository serverRepository;
    private final ServerUserRepository serverUserRepository;
    private final ChannelRepository channelRepository;
    private final CategoryRepository categoryRepository;

    private final KafkaEventPublisher kafkaEventPublisher;

    public ServerResponse create(ServerCreateRequest request, MultipartFile file) {
         String profileUrl = file != null ? uploadProfile(file) : null; // <- S3 등록 후
        // String profileUrl = null;
        System.out.println(profileUrl);

        User user = userQueryService.findUserByOriginalId(request.getUserId());

        Server server = serverRepository.save(
                Server.of(
                        request.getName(),
                        profileUrl,
                        user.getId()
                )
        );

        serverUserCommandService.save(
                ServerUser.of(server, user)
        );

        categoryAndChannelInit(server, user.getId());

        /**
         * 서버 Read
         * 유저 상태 정보(온라인/오프라인) 상태관리 서버로부터 받아오는 로직 필요.
         * 첫 접속시 보여줄 채팅 메시지를 가져오기 위한 채팅 서비스 OpenFeign 작업 필요.
         */
        return ServerResponse.of(server);
    }

    public ServerResponse join(ServerJoinRequest request) {
        Server findServer = validateServerUser(request);
        validateServerJoin(findServer, request);

        User findUser = userQueryService.findUserByOriginalId(request.getUserId());

        verifyInvitationCode(findServer.getId(), request);

        serverUserCommandService.save(ServerUser.of(findServer, findUser));

        Channel defaultChannel = findServer.getChannels().get(0);

        channelCommandService.sendUserLocEvent(
                findUser.getId(),
                findServer.getId(),
                defaultChannel.getId()
        );

        return ServerResponse.of(findServer);
    }

    public ServerResponse update(ServerUpdateRequest request, MultipartFile file) {
        Server findServer = serverQueryService.validateExistServer(request.getServerId());

        validateManager(findServer.getManagerId(), request.getUserId());

        String profileUrl = determineProfileUrl(file, findServer, request.getProfile());

        findServer.setServer(
                request.getName(),
                profileUrl,
                request.isOpen(),
                request.getDescription()
        );

        kafkaEventPublisher.sendToServerEventTopic(
                CommunityServerEventDto.of("server-update", findServer)
        );

        printKafkaLog("update");

        return ServerResponse.of(findServer);
    }

    public void delete(ServerDeleteRequest request) {
        Server findServer = serverQueryService.validateExistServer(request.getServerId());

        validateManager(findServer.getManagerId(), request.getUserId());

        validateServerProfileDelete(findServer);

        kafkaEventPublisher.sendToServerEventTopic(
                CommunityServerEventDto.of("server-delete", findServer)
        );

        serverDeleteBatch(findServer);

        printKafkaLog("delete");
        serverRepository.delete(findServer);
    }

    public ServerResponse deleteProfile(ServerProfileDeleteRequest request) {
        Server findServer = serverQueryService.validateExistServer(
                request.getServerId()
        );

        validateManager(findServer.getManagerId(), request.getUserId());

        if(findServer.getProfile() != null){
            fileUploadService.delete(findServer.getProfile());
            findServer.setProfile(null);
        }

        kafkaEventPublisher.sendToServerEventTopic(
                CommunityServerEventDto.of("server-delete", findServer)
        );

        printKafkaLog("update");

        return ServerResponse.of(findServer);
    }

    public ServerInviteCodeResponse generatedServerInviteCode(Long serverId) {
        validateExistServer(serverId);

        String key = INVITE_LINK_PREFIX.formatted(serverId);

        String value = redisService.getValues(key);

        if(value.equals("false")){
            final String randomCode = RandomUtil.generateRandomCode();
            redisService.setValues(key, randomCode, RedisService.toTomorrow());
            return ServerInviteCodeResponse.of(randomCode);
        }

        return ServerInviteCodeResponse.of(value);
    }

    /**
     * Server내 자체 Category Repository 사용할지 고민
     */
    private void categoryAndChannelInit(Server server, Long userId){
        CategoryResponse initChatCategory
                = categoryCommandService.save(Category.of(server, "채팅 채널"));
        CategoryResponse initVoiceCategory
                = categoryCommandService.save(Category.of(server, "음성 채널"));

        Channel newChannel = channelRepository.save(
                Channel.of(
                        server,
                        initChatCategory.getCategoryId(),
                        ChannelType.CHAT,
                        "일반")
        );


        channelRepository.save(
                Channel.of(
                        server,
                        initVoiceCategory.getCategoryId(),
                        ChannelType.VOICE,
                        "일반"
                )
        );

        channelCommandService.sendUserLocEvent(
                userId,
                server.getId(),
                newChannel.getId()
        );
    }

    private void validateServerJoin(Server server, ServerJoinRequest request) {
        boolean isClosedWithoutCode = !server.isOpen() && request.getInvitationCode() == null;
        if (isClosedWithoutCode) {
            throw new ServerException(Code.VALIDATION_ERROR, "Not open server. Require invitationCode");
        }
    }

    private void verifyInvitationCode(Long serverId, ServerJoinRequest request) {
        if (request.getInvitationCode() != null) {
            String storedInvitationCode = redisService.getValues(INVITE_LINK_PREFIX.formatted(serverId));
            validateMatchInvitationCode(storedInvitationCode, request.getInvitationCode());
        }
    }

    private Server validateServerUser(ServerJoinRequest request) {
        Optional<Server> findServer = serverUserRepository.validateServerUser(
                request.getServerId(),
                request.getUserId()
        );

        if(findServer.isPresent()){
            throw new ServerException(Code.VALIDATION_ERROR, "Already Exist User");
        } else{
            return serverQueryService.validateExistServer(request.getServerId());
        }
    }

    private void validateMatchInvitationCode(String value, String invitationCode) {
        if(!value.equals(invitationCode)){
            throw new ServerException(Code.VALIDATION_ERROR, "Not Match InvitationCode");
        }
    }

    private void validateExistServer(Long serverId){
        serverRepository.findById(serverId)
                .orElseThrow(() ->
                        new ServerException(Code.NOT_FOUND, "Server Not Found")
                );
    }

    private String determineProfileUrl(MultipartFile file, Server server, String serverProfile) {
        if (file != null) {
            return serverProfile.equals("null") ? uploadProfile(file) : updateProfile(file, serverProfile, server);
        }
        return serverProfile;
    }

    private String uploadProfile(MultipartFile file) {
         return fileUploadService.save(file); // <- S3 등록 후
//        return "http://image.png"; // 예시 URL
    }

    private String updateProfile(MultipartFile file, String serverProfile, Server server) {
        if (validateProfileWithFile(server, serverProfile)) {
             return fileUploadService.update(file, serverProfile); // <- S3 등록 후
//            return "http://image2.png"; // 예시 URL
        }
        return serverProfile;
    }

    private boolean validateProfileWithFile(Server server, String profileUrl) {
        String profile = server.getProfile();

        return profile == null || profile.equals(profileUrl);
    }

    private void validateManager(Long managerId, Long userId){
        if(!managerId.equals(userId)){
            throw new ServerException(Code.UNAUTHORIZED, "Not Manager");
        }
    }
    private void validateServerProfileDelete(Server server) {
        if(server.getProfile() != null && !server.getProfile().equals("null"))
            fileUploadService.delete(server.getProfile());
    }

    private void serverDeleteBatch(Server server) {
        channelRepository.deleteAllByServerId(server.getId());
        categoryRepository.deleteAllByServerid(server.getId());
        serverUserRepository.deleteAllByServerId(server.getId());
    }

    private void printKafkaLog(String type) {
        log.info("Kafka event send about Server {}", type);
    }
}
  • create()
    • categoryAndChannelInit()
      • 서버 생성시 기본적으로 생성되는 카테고리와 채널을 생성하는 메소드이다.
      • 해당 작업에서 서버를 생성한 유저의 최근 채널 위치를 지정하기 위해 `ChannelCommandService`를 활용한다.
  • join()
    • 서버에 유저가 참여할 때 사용되는 메소드로 성공적으로 참여됐다면,
    • 서버에 처음 접속한 유저의 최근 채널 위치를 저장하기 위해 `ChannelCommandService`를 활용한다.

 

 

ChannelCommandService

 

채널 생성/변경/삭제를 담당하는 서비스이다.

@Slf4j
@Service
@Transactional
@RequiredArgsConstructor
public class ChannelCommandService {
    private final ServerQueryService serverQueryService;
    private final FileUploadService fileUploadService;

    private final ChannelRepository channelRepository;
    private final ServerRepository serverRepository;
    private final CategoryRepository categoryRepository;
    private final ForumRepository forumRepository;
    private final FileRepository fileRepository;

    private final KafkaEventPublisher kafkaEventPublisher;

    ...

    public void sendUserLocEvent(Long userId, Long serverId, Long channelId) {
        kafkaEventPublisher.sendToUserLocEventTopic(
                UserLocationEventDto.of(
                        userId,
                        serverId,
                        channelId
                )
        );

        printKafkaLog("User Location Send");
    }

    ...
}

 

sendUserLocEvent()

  • 해당 메소드를 활용하여 유저의 최근 채널 위치상태관리 서비스로 전달한다.

 


 

정리

여기까지 커뮤니티 서비스에서 다른 서비스와의 통신에 관해 알아보았다.

 

전부 다 표현하진 않았지만,

  • 커뮤니티가 다른 서비스로 카프카 이벤트를 보내는 경우
  • 커뮤니티가 다른 서비스에게 `OpenFeign`으로 값을 가져오는 경우

 

등 많은 경우가 더 존재한다.

 

해당 작업의 주요 키워드는 `Kafka`와 `OpenFeign`이다.

 

그렇기에 아래 작업이 가능해졌다.