카테고리 없음

[Spring] Buffer로 채팅 기록 저장 조회, Kafka 분기하기

경상도상남자 2025. 4. 4. 12:39

우리 팀은 현재 친환경 제품이나 캠페인을 펀딩할 수 있는 앱을 만들려고 하는데 기본적으로 msa 아키텍처로 구현을 하는 중이었다.

처음에는 chat 컴포넌트에서 클라이언트와 웹소켓을 연결하고 채팅 기능을 구현할려고 했는데 우리가 분기한 컴포넌트 중에 notification이라는 소켓을 이용하여 알림 기능을 담당하는 컴포넌트가 있었다. 

구현을 하고 보니 notification에서도 소켓을 연결하고 chat에서도 소켓을 연결하는게 비효율적이라는 생각이 들었다...

 

 

그래서 역할을 다시 분리하기로 했다.

역할 구분

notification-service : WebSocket 처리, Kafka Producer, Kafka Consumer (실시간 전송, 메시지 누적 감지)

chat-service : 메시지 영속 저장 (MongoDB) + 채팅 내역 조회 API 제공

 

이렇게 역할을 분리하면서 추가적으로 해결해야 하는 과제가 생겼다. 

 

1. 얼마에 한번씩 DB에 메시지를 저장할 것인가?

2. 저장하고 난 후 Kafka에 저장된 메시지는 어떻게 할 것인가?

3. 단체 채팅방에서 중간에 추가로 들어온 사람은 Kafka에 저장된 데이터를 먼저 확인하고 db에 저장된 채팅 메시지를 봐야한다. 

 

1번 문제에 대한 나의 생각

일단 현재 Kafka에서는 TTL 1시간으로 정해놨고 50건에 한번씩 저장을 하는 것으로 정했는데 Kafka에 1시간동안 몇 건의 메시지가 쌓일지 모르기 때문에 어디까지 저장을 했는지 기록을 해야한다. 그리고 50건이 다 안채워졌더라도 삭제되기 전에 쌓인만큼 저장을 해야한다.  

  • 저장 로직에 저장 추적 및 예외가 발생해서 저장이 안 됐다면, 다시 컨슈밍할 수 있도록 offset 처리를 활용해 구현해보자

 

2번 문제에 대한 생각

topic을 유지하면서 메시지를 지울수 있는 방법이 있다고는 하는데 나 같은 경우, TTL을 길게 하면 문제가 될거 같지만 1시간이라 굳이 억지로 지울 필요가 있을까 라는 생각이 든다. 지우지 말자

 

3번 문제에 대한 생각

3번 문제에 대한 해답을 정하기 위해서는 먼저 데이터를 어떻게 저장을 해야할지 부터 정해야 한다. 나는 버퍼(buffer)를 활용해서 50개씩 chat-service로 메시지를 보내주려고 한다. 이때 만약 1시간이 지나더라도 50개가 안채워졌을 경우도 메시지가 저장이 되어야 한다.

또한 구독시 버퍼에서 아직 MongoDB에 저장되지 않은 메시지도 같이 보내줘서 메시지 정합성을 지키도록 코드를 구현하고자 한다.

 

📦 Kafka 기반 채팅  코드 분석하기


 

🛒 KafkaConsumer

- Kafka와 WebSocket 기반 채팅 시스템에서
메시지를 버퍼링하고, 일정 조건에 따라 DB에 저장하는 핵심 Kafka Consumer

import com.notification.buffer.ChatMessageBuffer;
import com.notification.client.ChatClient;
import com.notification.dto.ChatMessageDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
@RequiredArgsConstructor
public class ChatKafkaConsumer {

    private final ObjectMapper objectMapper;
    private final SimpMessagingTemplate template;
    private final ChatMessageBuffer buffer;
    private final ChatClient chatClient;

    @KafkaListener(topicPattern = "chat-room.*", groupId = "chat-group", containerFactory = "kafkaListenerContainerFactory")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) throws JsonProcessingException {
        try {
            String topic = record.topic();

            String fundingId = topic.split("\\.")[1];

            int intFundingId = Integer.parseInt(fundingId);
            ChatMessageDto dto = objectMapper.readValue(record.value(), ChatMessageDto.class);

            // 메시지 버퍼에 추가
            buffer.addMessage(intFundingId, dto);

            //WebSocket 브로드 캐스트
            template.convertAndSend("/sub/chat/" + fundingId, dto);

            // 50개 도달하면 chat-service로 저장
            if (buffer.isReadyToFlush(intFundingId)) {
                List<ChatMessageDto> messagesToStore = buffer.getAndClearBuffer(intFundingId);

                try {
                    chatClient.storeMessages(intFundingId, messagesToStore); // 저장 요청

                } catch (Exception e) {
                    log.error(e.getMessage(), e);

                    for (ChatMessageDto msg : messagesToStore) {
                        buffer.addMessage(intFundingId, msg); //다시 버퍼에 넣어서 복원
                    }
                }

            }
            ack.acknowledge(); // 저장까지 성공한 후에 커밋

        } catch (Exception e) {
            log.error("❌ Kafka 메시지 처리 중 오류: {}", e.getMessage(), e);

        }
    }
}

 

🧩  주요 코드 분석

 

🧵 Kafka 메시지 처리

@KafkaListener(topicPattern = "chat-room.*", groupId = "chat-group", containerFactory = "kafkaListenerContainerFactory")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack)

 

 

  • @KafkaListener : chat-room.{fundingId} 형식의 Kafka 토픽을 실시간 수신
  • Acknowledgment : 수동 커밋 모드 (처리 성공 시에만 offset 커밋)

⚙️ 처리 순서 상세

1️⃣ 토픽 파싱 및 메시지 역직렬화

java
복사편집
String topic = record.topic(); // e.g., "chat-room.123" 
String fundingId = topic.split("\\.")[1]; // "123" 
int intFundingId = Integer.parseInt(fundingId); // 123 → int 
ChatMessageDto dto = objectMapper.readValue(record.value(), ChatMessageDto.class);
  • Kafka 메시지(JSON)를 ChatMessageDto 객체로 역직렬화

 

2️⃣ 메시지 버퍼 저장 + 실시간 전송

buffer.addMessage(intFundingId, dto);                                     // 버퍼에 저장
template.convertAndSend("/sub/chat/" + fundingId, dto);                   // WebSocket으로 모든 유저에게 전송

 

 

3️⃣ 버퍼 크기 체크 (50개 이상이면 저장 시도)

if (buffer.isReadyToFlush(intFundingId)) {
    List<ChatMessageDto> messagesToStore = buffer.getAndClearBuffer(intFundingId);

 

  • ChatMessageBuffer의 내부 Map에 메시지가 50개 이상 쌓였는지 확인
  • 조건 만족 시 buffer에서 메시지 꺼내고 제거

 

4️⃣ chat-service로 저장 요청 + 실패 시 rollback

try {
    chatClient.storeMessages(intFundingId, messagesToStore); // 저장 요청
} catch (Exception e) {
    log.error(e.getMessage(), e);

    for (ChatMessageDto msg : messagesToStore) {
        buffer.addMessage(intFundingId, msg); // 실패 시 rollback → 다시 버퍼에 복구
    }
}

 

  • 저장 실패 시 → 데이터 유실 방지를 위해 메시지 다시 버퍼에 복원

5️⃣ Kafka offset 커밋

ack.acknowledge(); // 메시지 정상 처리 후 커밋

 

📌 포인트

  •  실시간성과 데이터 영속성을 모두 보장하는 구조
  •  수동 커밋 + rollback 처리로 메시지 유실 없이 안정적 운영 가능
  • ✅ ChatMessageBuffer와 타이머 기반 flush와 함께 사용하면 완벽한 보강 가능

 

🔧 KafkaConsumerConfig 분석

package com.notification.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); // 수동 ack
        return factory;
    }
}

 

  • ConcurrentKafkaListenerContainerFactory: Kafka 리스너 컨테이너를 생성하는 팩토리
  • <String, String>: key-value 타입 지정 (기본은 문자열)
  • ConsumerFactory는 Kafka @KafkaListener가 사용할 실제 Consumer 객체를 생성하는 컴포넌트

옵션설명

AckMode.MANUAL_IMMEDIATE 수동 커밋: ack.acknowledge() 호출 시 즉시 커밋
MANUAL 수동 커밋이지만, batch로 처리됨 (ack 후 커밋은 batch 종료 시점)
RECORD 각 레코드마다 자동 커밋
BATCH 배치 단위 자동 커밋
TIME 일정 주기마다 자동 커밋
COUNT 일정 개수마다 자동 커밋

 

 


 

💾 ChatMessageBuffer

- 채팅 메시지를 임시 버퍼에 저장하고,
일정 수량 또는 시간 경과 후에 DB로 flush하는 버퍼 클래스의 핵심 역할을 분석합니다.

package com.notification.buffer;

import com.notification.dto.ChatMessageDto;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class ChatMessageBuffer {

    // 채팅방별 메시지 버퍼
    private final Map<Integer, List<ChatMessageDto>> buffer = new ConcurrentHashMap<>();

    // 채팅방별 마지막 메시지 추가 시간 (즉시 flush 여부 판단용)
    private final Map<Integer, LocalDateTime> lastUpdateTime = new ConcurrentHashMap<>();

    // 채팅방별 최초 메시지 추가 시간 (지속적으로 오더라도 flush 조건 판단용)
    private final Map<Integer, LocalDateTime> bufferStartTime = new ConcurrentHashMap<>();

    /**
     * 메시지를 버퍼에 추가하고, 최초/최종 시간을 갱신한다.
     */
    public void addMessage(int fundingId, ChatMessageDto message) {
        buffer.computeIfAbsent(fundingId, k -> new ArrayList<>()).add(message);

        // 최초 메시지 시간은 한 번만 기록
        bufferStartTime.putIfAbsent(fundingId, LocalDateTime.now());

        // 마지막 메시지 시간은 항상 갱신
        lastUpdateTime.put(fundingId, LocalDateTime.now());
    }

    /**
     * 50개 이상이면 즉시 flush 조건 만족
     */
    public boolean isReadyToFlush(int fundingId) {
        return buffer.getOrDefault(fundingId, List.of()).size() >= 50;
    }

    /**
     * flush 시 버퍼에서 메시지를 꺼내고 정리
     */
    public List<ChatMessageDto> getAndClearBuffer(int fundingId) {
        List<ChatMessageDto> messages = new ArrayList<>(buffer.getOrDefault(fundingId, List.of()));
        buffer.remove(fundingId);
        lastUpdateTime.remove(fundingId);
        bufferStartTime.remove(fundingId);
        return messages;
    }

    /**
     * 아직 저장되지 않은 메시지 목록 반환 (WebSocket 입장 시 전송용)
     */
    public List<ChatMessageDto> getBufferedMessages(int fundingId) {
        return new ArrayList<>(buffer.getOrDefault(fundingId, List.of()));
    }

    /**
     * 채팅방별 마지막 메시지 시간 반환 (flush delay 기준용)
     */
    public Map<Integer, LocalDateTime> getLastUpdateTimes() {
        return new HashMap<>(lastUpdateTime);
    }

    /**
     * 채팅방별 버퍼 시작 시간 반환 (flush timeout 기준용)
     */
    public Map<Integer, LocalDateTime> getBufferStartTimes() {
        return new HashMap<>(bufferStartTime);
    }

    /**
     * 특정 채팅방의 버퍼 시작 시간 반환
     */
    public LocalDateTime getBufferStartTime(int fundingId) {
        return bufferStartTime.get(fundingId);
    }
}

 

🔐 내부 필드 설명

private final Map<Integer, List<ChatMessageDto>> buffer = new ConcurrentHashMap<>();
private final Map<Integer, LocalDateTime> lastUpdateTime = new ConcurrentHashMap<>();
private final Map<Integer, LocalDateTime> bufferStartTime = new ConcurrentHashMap<>();

 

- buffer : 채팅방(fundingId) 별로 수신한 메시지를 리스트에 저장

- lastUpdateTime : 해당 채팅방에 마지막으로 메시지가 추가된 시간 (타임아웃 기준)

- bufferStartTime : 해당 채팅방 현재 버퍼에 최초로 추가된 시간( 타임아웃 기준)

 

 

🧩 주요 메서드 분석

✅ addMessage(int fundingId, ChatMessageDto message)

public void addMessage(int fundingId, ChatMessageDto message) {
    buffer.computeIfAbsent(fundingId, k -> new ArrayList<>()).add(message);
    
    // 최초 메시지 시간은 한 번만 등록
    bufferStartTime.putIfAbsent(fundingId, LocalDateTime.now());
    lastUpdateTime.put(fundingId, LocalDateTime.now());
}

 

- 채팅방별 리스트에 메시지 추가

- 메시지 추가 시각을 lastUpdateTime에 기록 ( 타임아웃 flush)

 

 

✅ isReadyToFlush(int fundingId)

lastUpdateTime.put(fundingId, LocalDateTime.now());

 

- 저장 조건 판단 : 버퍼에 쌓인 메시지가 50개 이상인지 여부 판단

- 사용 위치 : KafkaConsumer에서 즉시 저장 조건 체크에 활용

 

✅ getAndClearBuffer(int fundingId)

public List<ChatMessageDto> getAndClearBuffer(int fundingId) {
    List<ChatMessageDto> messages = buffer.getOrDefault(fundingId, new ArrayList<>());
    List<ChatMessageDto> snapshot = new ArrayList<>(messages);

    buffer.remove(fundingId);          // 메모리 제거
    lastUpdateTime.remove(fundingId);  // 타이머 제거
    bufferStartTime.remove(fundingId); // ⬅ 추가

    return snapshot;
}

 

- 메시지 가져오기 : 버퍼에서 메시지를 복사해서 반환

- 메시지, 타임스탬프 모두 제거 (flush 후 메모리 관리)

- 불변성 확보 : snapshot으로 복사본 반환 → 외부 수정 방지

- 저장 실패 시에는 반환된 snapshot을 다시 addMessage()로 복구 가능함 (rollback 지원)

 

✅ getBufferedMessages(int fundingId)

public List<ChatMessageDto> getBufferedMessages(int fundingId) {
    return new ArrayList<>(buffer.getOrDefault(fundingId, List.of()));
}

 

- 현재 buffer 메시지 조회 : MongoDB에 저장되기 전, Kafka 버퍼에만 있는 메시지를 클라이언트에게 보여주기 위함

- 사용 예시 : WebSocket 구독 시 입장 유저에게 아직 저장되지 않은 메시지를 보내는 용도

 

✅ getLastUpdateTimes()

public Map<Integer, LocalDateTime> getLastUpdateTimes() {
    return new HashMap<>(lastUpdateTime);
}

- 모든 채팅방의 마지막 메시지 시간 반환 : 타임아웃 감지용 ( 30분 이상 무활동 시 강제 저장)

 

✅ getBufferStartTimes() , getBufferStartTime 

 /**
     * 채팅방별 버퍼 시작 시간 반환 (flush timeout 기준용)
     */
    public Map<Integer, LocalDateTime> getBufferStartTimes() {
        return new HashMap<>(bufferStartTime);
    }

    /**
     * 특정 채팅방의 버퍼 시작 시간 반환
     */
    public LocalDateTime getBufferStartTime(int fundingId) {
        return bufferStartTime.get(fundingId);
    }

 

  • 처음에는 채팅방의 마지막 메시지의 시간을 보고 flush를 하려고 했는데 9분에 한 번씩 메시지가 오는 극단적인 경우 메시지가 소실되는 경우가 생김
  • 따라서 모든 채팅방의 마지막 메시지 시간 반환하는 메서드를 활용해서 강제 푸쉬하도록 함 : 타임아웃 감지용 ( 30분 이상 무활동 시 강제 저장)
  • 사용 위치 : @Scheduled 스케줄러에서 반복 조회에 활용

 

 


 

 

⏱  ChatBufferTimeoutFlusher 코드 분석

- 메시지가 적게 오거나 간헐적으로만 오는 방에서도 Kafka retention 전에 반드시 flush되도록
버퍼 최초 메시지 시간(bufferStartTime) 기반으로 강제 저장하는 구조입니다.

 

package com.notification.kafka.flusher;

import com.notification.buffer.ChatMessageBuffer;
import com.notification.client.ChatClient;
import com.notification.dto.ChatMessageDto;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;

@Slf4j
@Component
@RequiredArgsConstructor
public class ChatBufferTimeoutFlusher {

    private final ChatMessageBuffer buffer;
    private final ChatClient chatClient;

    /**
     * 1분마다 실행되며, 버퍼가 일정 시간 이상 유지된 경우 강제 저장 수행
     * - Kafka retention.ms (1시간) 도달 전에 flush 보장 목적
     */
    @Scheduled(fixedDelay = 60000) // 1분 간격
    public void flushIfStale() {
        Duration flushTimeout = Duration.ofMinutes(50); // Kafka TTL보다 여유 있게 설정

        LocalDateTime now = LocalDateTime.now();

        for (Map.Entry<Integer, LocalDateTime> entry : buffer.getBufferStartTimes().entrySet()) {
            int fundingId = entry.getKey();
            LocalDateTime startedAt = entry.getValue();

            // 버퍼 생성된 지 50분 이상 경과 시 강제 저장
            if (Duration.between(startedAt, now).compareTo(flushTimeout) >= 0) {
                List<ChatMessageDto> toFlush = buffer.getAndClearBuffer(fundingId);

                if (!toFlush.isEmpty()) {
                    try {
                        chatClient.storeMessages(fundingId, toFlush);
                        log.info("⏱ [버퍼 TTL 초과 저장] fundingId={}, count={}", fundingId, toFlush.size());
                    } catch (Exception e) {
                        log.error("❌ [버퍼 저장 실패] fundingId={}, error={}", fundingId, e.getMessage());

                        // rollback: 메시지 다시 버퍼에 복원
                        for (ChatMessageDto msg : toFlush) {
                            buffer.addMessage(fundingId, msg);
                        }
                    }
                }
            }
        }
    }
}

 

🔐 주요 필드

private final ChatMessageBuffer buffer;
private final ChatClient chatClient;

 

- buffer 메시지를 fundingId 별로 보관하고 있는 메모리 버퍼

- chat-service로 메시지를 저장 요청하는 client (ex. FeginClient)

 

 

🔍 메서드 분석 – flushIfStale()

 

@Scheduled(fixedDelay = 60000) // 1분 간격
public void flushIfStale() {
  Duration flushTimeout = Duration.ofMinutes(50); // Kafka TTL보다 여유 있게 설정
  LocalDateTime now = LocalDateTime.now();

 

  • Spring의 스케줄링 기능
  • fixedDelay = 60000 → 이전 실행이 끝난 후 60초 뒤에 다시 실행
  • 즉, 1분마다 반복 실행되는 타임아웃 체크 로직
  • 내가 설정한 Kafka 메시지 TTL은 1시  (retention.ms = 3600000)
  • 50분 경과한 버퍼는 Kafka에서 삭제되기 전에 저장 시도하도록 설정
  • 현재 시간 기준으로 버퍼 유지 시간을 계산

 

 

for (Map.Entry<Integer, LocalDateTime> entry : buffer.getBufferStartTimes().entrySet()) {
    int fundingId = entry.getKey();
    LocalDateTime startedAt = entry.getValue();

 

  • ChatMessageBuffer에서 버퍼가 처음 생성된 시간(bufferStartTime)을 반복하며 확인
  • fundingId는 채팅방 ID 역할

 

if (Duration.between(startedAt, now).compareTo(flushTimeout) >= 0) {

 

  • 현재 시간이 버퍼 생성 시간보다 50분 이상 지났는가 확인
  • ✅ 이 조건을 만족하면 → 강제 저장 조건 충족

 

 

List<ChatMessageDto> toFlush = buffer.getAndClearBuffer(fundingId);
  • 해당 채팅방 버퍼에서 메시지를 꺼내고,
    buffer, lastUpdateTime, bufferStartTime 전부 제거

 

if (!toFlush.isEmpty()) {
                    try {
                        chatClient.storeMessages(fundingId, toFlush);
                        log.info("⏱ [버퍼 TTL 초과 저장] fundingId={}, count={}", fundingId, toFlush.size());
                    } catch (Exception e) {
                        log.error("❌ [버퍼 저장 실패] fundingId={}, error={}", fundingId, e.getMessage());

                        // rollback: 메시지 다시 버퍼에 복원
                        for (ChatMessageDto msg : toFlush) {
                            buffer.addMessage(fundingId, msg);
                        }
                    }
                }

 

  • 메시지가 존재할 때만 저장 요청을 시도 (불필요한 호출 방지)
  • chat-service로 저장 요청 (ex. FeignClient 사용)
  • 로그로 저장된 메시지 개수 확인

실패한 경

  • 저장 도중 실패할 경우 예외 기록
  • 장애나 네트워크 오류, chat-service 다운 등의 상황 대비
  • rollback 처리
  • 메시지를 다시 ChatMessageBuffer에 복원 → 다음 flush 타이밍에 재시도 가능

 

@Slf4j
@Component
@RequiredArgsConstructor
public class StompSubscribeListener {

    private final ChatRoomService chatRoomService;
    private final SimpMessagingTemplate simpMessagingTemplate;
    private final ChatMessageBuffer chatMessageBuffer;

    @EventListener
    public void handleSubscribeEvent(SessionSubscribeEvent event) {

        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
        String destination = accessor.getDestination(); // 예시) sub/chat/1

        if (destination != null && destination.startsWith("/sub/chat/")) {
            try {
                int fundingId = Integer.parseInt(destination.substring("/sub/chat/".length()));

                // 1. Kafka 토픽 생성
                chatRoomService.createChatRoomIfNotExists(fundingId);

                // 2. 버퍼에서 아직 저장되지 않은 메시지 조회 + 전송
                List<ChatMessageDto> bufferedMessages = chatMessageBuffer.getBufferedMessages(fundingId);
                if (!bufferedMessages.isEmpty()) {
                    for (ChatMessageDto message : bufferedMessages) {
                        simpMessagingTemplate.convertAndSend(destination, message);
                    }
                    log.info("📨 구독자에게 버퍼 메시지 전송: fundingId={}, count={}", fundingId, bufferedMessages.size());
                }

            } catch (NumberFormatException e) {
                log.warn("❌ 잘못된 채팅 destination 형식: {}", destination);
            }
        }
    }
}

 

 

 

🧩 StompSubscribeListener.handleSubscribeEvent()

- WebSocket 채팅방에 사용자가 입장(구독)할 때, Kafka 토픽이 없으면 생성하고, 아직 저장되지 않은 버퍼 메시지를 즉시 전달하는 기능

 

📦 주요 의존성

private final ChatRoomService chatRoomService;
private final SimpMessagingTemplate simpMessagingTemplate;
private final ChatMessageBuffer chatMessageBuffer;

 

- chatRoomService : Kafka 토픽 존재 여부 확인 및 생성

- simpMessagingTemplate : WebSocket 메시지 수동 전송 도구

- chatMessageBuffer : 아직 DB에 저장되지  않는 메시지 관리하는 버퍼

 

 

🔁 메서드 핵심 처리 흐름

@EventListener
public void handleSubscribeEvent(SessionSubscribeEvent event) {

 

 

  • 클라이언트가 채팅방을 구독하면 STOMP 구독 이벤트 발생
  • 이 메서드는 @EventListener로 해당 이벤트를 수신

 

 

1️⃣ 구독한 채널 파싱

String destination = accessor.getDestination(); // 예: "/sub/chat/1"

 

2️⃣ destination 형식 확인 및 fundingId 추출

if (destination != null && destination.equals("/sub/chat/")) {
    int fundingId = Integer.parseInt(destination.substring("/sub/chat/".length()));

3️⃣ Kafka 토픽 생성

chatRoomService.createChatRoomIfNotExists(fundingId);
  • 채팅방용 Kafka 토픽이 없으면 새로 생성
  • 이벤트 발생 시점에서만 생성 시도 → 효율적

4️⃣ 버퍼에 남은 메시지 조회 + WebSocket 전송

List<ChatMessageDto> bufferedMessages = chatMessageBuffer.getBufferedMessages(fundingId);

 

  • 아직 DB로 flush되지 않은 메시지가 버퍼에 있으면 → 구독자에게 전송
for (ChatMessageDto message : bufferedMessages) {
    simpMessagingTemplate.convertAndSend(destination, message);
}
  • simpMessagingTemplate 사용하여 수동 브로드캐스트

 

 

그외 코드들

ChatProducer, createChatRoomIfNotExists, KafkaAdminConfig 등은 아래 게시글에 있으니 생략하겠슴다~

 

https://gyeongsangman.tistory.com/manage/newpost/144?type=post&returnURL=https%3A%2F%2Fgyeongsangman.tistory.com%2Fmanage%2Fposts

 

티스토리

좀 아는 블로거들의 유용한 이야기, 티스토리. 블로그, 포트폴리오, 웹사이트까지 티스토리에서 나를 표현해 보세요.

www.tistory.com

 

 

 

✅ 전체 흐름 요약

 
Kafka 메시지 수신
   ↓
ChatMessageBuffer.addMessage()
   ↓
[조건1] 50개 도달 → getAndClearBuffer() → 저장 요청
[조건2] 30분 이상 무활동 → 스케줄러 → getAndClearBuffer() → 저장
   ↓
저장 성공 → 버퍼 비움
저장 실패 → rollback (다시 addMessage)