728x90
반응형
🍒 채팅 전송 및 조회
- 채팅 메시지 전송: 사용자가 메시지를 작성하고 전송하면, 해당 메시지가 서버로 전송되고 Redis에 저장됩니다. 서버는 메시지의 처리와 동시에 해당 메시지를 Kafka를 통해 실시간으로 참여자에게 전달합니다.
- 채팅 메시지 조회: 사용자가 채팅방에 입장하면, Redis에 저장된 이전 메시지들을 조회할 수 있습니다. 이때 Redis는 빠른 데이터 조회를 가능하게 하여, 사용자에게 즉시 메시지를 보여줍니다.
- 참여자 관리 (Redis): Redis를 사용하여 현재 채팅방에 참여 중인 사용자들을 관리합니다. 사용자가 채팅방에 입장하거나 퇴장할 때마다 Redis에 참여자 목록을 갱신하여 실시간으로 참여자 현황을 유지합니다.
- 실시간 메시지 전달 (Kafka): 채팅 메시지가 발생하면 Kafka를 통해 실시간으로 모든 참여자에게 메시지를 전달합니다. Kafka는 메시지 큐를 활용하여 빠르고 안정적인 메시지 전송을 보장합니다.
1. 채팅 메시지 전송
채팅 메시지 전송 시, 메시지 내용과 함께 채팅방에 해당하는 모든 참여자에게 알림을 보내는 과정입니다.
👉🏻 sendMessage 메서드를 통해 메시지를 저장하고, Kafka를 통해 실시간으로 메시지를 전달
@Service
public class ChatService {
// Redis 템플릿을 사용하여 Redis와 통신
@Autowired
private StringRedisTemplate redisTemplate;
// Kafka 템플릿을 사용하여 실시간 메시지를 전송
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
// 채팅 메시지를 Redis에 저장하고 Kafka를 통해 실시간으로 메시지 전달
public void sendMessage(String roomId, String userId, String message) {
// 메시지를 Redis에 저장 (방 ID를 키로, 사용자 ID와 메시지를 값으로 저장)
String chatMessage = userId + ": " + message;
redisTemplate.opsForList().rightPush(roomId, chatMessage); // 메시지를 채팅방에 추가
// 메시지를 Kafka로 전송 (실시간으로 참여자들에게 전달)
kafkaTemplate.send("chat-topic", roomId, chatMessage); // 'chat-topic'은 Kafka의 토픽 이름
}
// 특정 채팅방의 모든 메시지를 Redis에서 조회
public List<String> getMessages(String roomId) {
// Redis에서 해당 채팅방에 저장된 모든 메시지 조회
return redisTemplate.opsForList().range(roomId, 0, -1); // 해당 방에 저장된 메시지 목록 반환
}
// 채팅방에 입장한 사용자를 Redis에서 관리
public void joinRoom(String roomId, String userId) {
// 사용자가 채팅방에 입장하면 Redis에 참여자 목록에 추가
redisTemplate.opsForSet().add(roomId + ":participants", userId); // 사용자 ID를 채팅방 참여자 목록에 추가
}
// 채팅방에서 나간 사용자를 Redis에서 관리
public void leaveRoom(String roomId, String userId) {
// 사용자가 채팅방에서 나가면 Redis에서 참여자 목록에서 삭제
redisTemplate.opsForSet().remove(roomId + ":participants", userId); // 사용자 ID를 참여자 목록에서 제거
}
// 채팅방의 현재 참여자 목록을 Redis에서 조회
public Set<String> getParticipants(String roomId) {
// Redis에서 해당 채팅방의 현재 참여자 목록을 조회
return redisTemplate.opsForSet().members(roomId + ":participants"); // 참여자 목록 반환
}
}
- 채팅 메시지 전송 (sendMessage):
- 사용자가 채팅 메시지를 입력하면, sendMessage 메서드가 호출되어 Redis에 메시지를 저장하고, Kafka를 통해 실시간으로 다른 사용자들에게 메시지를 전송함
- Redis에 메시지를 저장할 때는 rightPush를 사용하여 메시지를 리스트에 추가
- Kafka에서는 chat-topic이라는 토픽에 메시지를 전송
- 채팅 메시지 조회 (getMessages):
- 사용자가 채팅방에 들어오면, getMessages 메서드를 통해 해당 채팅방의 메시지를 Redis에서 조회하여 반환
- Redis 리스트에서 메시지를 조회할 때는 range를 사용하여 모든 메시지를 가져옴
- 채팅방 참여자 관리:
- 입장 (joinRoom): 사용자가 채팅방에 입장하면, 해당 채팅방의 참여자 목록에 사용자의 ID를 추가합니다. Redis의 opsForSet().add를 사용하여 참여자 목록에 추가
- 퇴장 (leaveRoom): 사용자가 채팅방을 나가면, 해당 사용자의 ID를 참여자 목록에서 제거합니다. Redis의 opsForSet().remove를 사용하여 참여자 목록에서 제거
- 참여자 조회 (getParticipants): 현재 채팅방에 참여 중인 사용자들을 조회할 수 있습니다. Redis의 opsForSet().members를 사용하여 참여자 목록 반
2. 채팅 메시지 조회
채팅 메시지를 조회할 때는 페이징 처리를 사용하여 효율적으로 메시지를 로딩합니다.
MongoDB에서 Slice를 이용해 데이터를 가져오고, 필요한 메시지를 ChatMessageResponse로 변환하여 응답합니다.
👉🏻 주어진 chatRoomId에 대해 마지막 메시지부터 pageable 기준으로 데이터를조회
👉🏻 메시지 응답을 생성하여 읽지 않은 메시지 수와 최신 메시지 시간을 처리
@Override
public ChatMessageListResponse getChatMessages(Long chatRoomId, String lastChatMessageId, Pageable pageable) {
// 현재 로그인한 사용자의 소셜 ID를 얻어옴
String socialId = getCurrentUserId();
// 해당 소셜 ID로 Member 객체를 조회, 없으면 예외 발생
Member member = memberRepository.findBySocialId(socialId).orElseThrow(MemberNotFoundException::new);
// 해당 회원의 ID를 얻어옴
Long senderId = member.getMemberId();
// 사용자가 해당 채팅방에 접근할 수 있는지 검증
validateChatRoom(senderId, chatRoomId);
// 마지막 메시지 ID와 페이지 정보를 이용하여 채팅 메시지 슬라이스를 조회
Slice<ChatMessage> chatMessageSlice = getChatMessageSlice(chatRoomId, lastChatMessageId, pageable);
// 채팅 메시지 목록을 채팅 메시지 응답 DTO 리스트로 변환
List<ChatMessageResponse> chatMessageRespDtos = convertToChatMessageRespDto(chatMessageSlice.getContent(), senderId);
// 변환된 메시지 응답 DTO 리스트를 새로운 Slice로 감싸서 반환 (다음 페이지 여부 포함)
Slice<ChatMessageResponse> chatMessageRespDtoSlice = new SliceImpl<>(chatMessageRespDtos, pageable, chatMessageSlice.hasNext());
// 채팅 메시지 응답 객체를 반환
return new ChatMessageListResponse(senderId, chatRoomId, chatMessageRespDtoSlice);
}
- 채팅방 접근 권한 검증 (validateChatRoom):
- 사용자가 해당 채팅방에 접근할 수 있는지 확인
- 채팅 메시지 슬라이스 조회 (getChatMessageSlice):
- chatRoomId, lastChatMessageId, 그리고 pageable을 사용하여 해당 채팅방의 메시지 조회
- Slice는 페이지네이션 기능을 제공하는 Spring Data의 클래스
- 채팅 메시지 DTO 변환 (convertToChatMessageRespDto):
- 조회된 ChatMessage 리스트를 클라이언트에 반환할 수 있는 ChatMessageResponse DTO로 변환
- 변환 과정에서 senderId를 포함시켜 응답 객체를 만듬
- 변환된 메시지 DTO를 Slice로 감싸기 (SliceImpl):
- 변환된 메시지 응답 DTO 리스트를 SliceImpl을 이용해 슬라이스로 감쌉니다. hasNext()는 다음 페이지가 있는지 여부를 확인하는 값
- 최종 응답 객체 반환 (ChatMessageListResponse):
- senderId, chatRoomId, 그리고 변환된 채팅 메시지 응답 DTO 슬라이스를 포함한 ChatMessageListResponse 객체를 반환
3. Redis와 Kafka를 통한 실시간 메시지 전송
실시간 메시지를 전송하기 위해 Kafka를 사용하여 메시지와 알림을 전송
👉🏻 채팅 메시지와 알림 메시지는 각각 Kafka topic을 통해 전달됨
✔️ KafkaReceiver
KafkaReceiver는 Kafka에서 받은 메시지를 WebSocket을 통해 실시간으로 전달
👉🏻 메시지는 채팅방마다 구독된 클라이언트에게 "/sub/chat/room/{chatRoomId}" 형식으로 전송됨
@Slf4j // Lombok 어노테이션: 로깅 기능을 위한 log 객체 자동 생성
@RequiredArgsConstructor // Lombok 어노테이션: final 필드를 생성자 인자로 받아 초기화하는 생성자 자동 생성
public class KafkaReceiver {
// SimpMessageSendingOperations: 메시지를 WebSocket 클라이언트로 전송하는 데 사용되는 Spring의 인터페이스
private final SimpMessageSendingOperations template;
// Kafka의 채팅 메시지를 수신하는 리스너 메서드
@KafkaListener(topics = KafkaVO.KAFKA_CHAT_TOPIC, containerFactory = "kafkaChatContainerFactory")
public void receiveChatMessage(Message message) {
// 채팅 메시지가 전송될 WebSocket 주소를 로그로 출력
log.debug("채팅 메시지 전송 위치 = /sub/chat/room/" + message.getChatRoomId());
// 수신된 채팅 메시지 자체를 로그로 출력
log.debug("채팅 방으로 메시지 전송 = {}", message);
// WebSocket을 통해 해당 채팅방으로 메시지 전송
template.convertAndSend("/sub/chat/room/" + message.getChatRoomId(), message);
}
// Kafka의 알림 메시지를 수신하는 리스너 메서드
@KafkaListener(topics = KafkaVO.KAFKA_NOTIFICATION_TOPIC, containerFactory = "kafkaNotificationContainerFactory")
public void receiveNotificationMessage(Notification notification) {
// 알림 메시지가 전송될 WebSocket 주소를 로그로 출력
log.debug("알림 메시지 전송 위치 = /sub/notification/" + notification.getRecipientId());
// 수신된 알림 메시지 자체를 로그로 출력
log.debug("알림 메시지 전송 = {}", notification);
// WebSocket을 통해 해당 사용자에게 알림 메시지 전송
template.convertAndSend("/sub/notification/" + notification.getRecipientId(), notification);
}
}
- SimpMessageSendingOperations:
- WebSocket 메시지를 전송하는 데 사용됨
- 이 클래스는 WebSocket을 통해 클라이언트에 메시지를 전송하는 기능을 제공함
- @KafkaListener:
- Kafka 메시지를 수신하는 메서드 정의
- 각 리스너 메서드는 특정 Kafka 토픽에서 메시지를 수신하고 이를 처리함
- topics 속성에는 수신할 Kafka 토픽 이름을 지정하며, containerFactory 속성에는 해당 토픽에 대한 Kafka 리스너 컨테이너 팩토리 이름을 지정함
- 이 팩토리는 Kafka 메시지를 어떻게 처리할지 정의.
- receiveChatMessage:
- KafkaVO.KAFKA_CHAT_TOPIC 토픽에서 메시지를 수신함
- 메시지를 수신한 후, 해당 채팅방으로 WebSocket 메시지를 전송
- 수신된 메시지와 메시지가 전송될 위치(/sub/chat/room/{chatRoomId})를 로그로 출력
- receiveNotificationMessage:
- KafkaVO.KAFKA_NOTIFICATION_TOPIC 토픽에서 알림 메시지를 수신함
- 수신된 알림을 해당 수신자에게 WebSocket으로 전송
- 수신된 알림 메시지와 알림이 전송될 위치(/sub/notification/{recipientId})를 로그로 출력합니다.
✔️ KafkaSender
KafkaSender는 메시지를 Kafka topic에 전송하는 역할을 담당하며, 메시지가 전송될 때마다 적절한 topic에 맞는 Message나 Notification을 전송함
@Slf4j // Lombok 어노테이션: 로깅 기능을 위한 log 객체 자동 생성
@Service // Spring의 서비스 계층을 나타내는 어노테이션
@RequiredArgsConstructor // Lombok 어노테이션: final 필드를 생성자 인자로 받아 초기화하는 생성자 자동 생성
public class KafkaSender {
// KafkaTemplate을 이용하여 채팅 메시지를 전송하는 템플릿 객체
private final KafkaTemplate<String, Message> kafkaChatTemplate;
// KafkaTemplate을 이용하여 알림 메시지를 전송하는 템플릿 객체
private final KafkaTemplate<String, Notification> kafkaNotificationTemplate;
// 채팅 메시지를 Kafka 토픽으로 전송하는 메서드
public void sendMessage(String topic, String chatRoomId, Message message) {
// Kafka에 메시지 전송
kafkaChatTemplate.send(topic, chatRoomId, message);
// 전송된 메시지 로깅
log.debug("채팅 메시지 전송: topic = {}, chatRoomId = {}, message = {}", topic, chatRoomId, message);
}
// 알림 메시지를 Kafka 토픽으로 전송하는 메서드
public void sendNotification(String topic, String chatRoomId, Notification notification) {
// Kafka에 알림 메시지 전송
kafkaNotificationTemplate.send(topic, chatRoomId, notification);
// 전송된 알림 메시지 로깅
log.debug("알림 메시지 전송: topic = {}, chatRoomId = {}, notification = {}", topic, chatRoomId, notification);
}
}
- kafkaChatTemplate & kafkaNotificationTemplate:
- KafkaTemplate은 Kafka 메시지를 전송하기 위한 템플릿 객체
- 각각 채팅 메시지와 알림 메시지를 Kafka에 전송하는 데 사용됨
- KafkaTemplate<String, Message>는 String 키와 Message 객체를 전송
- KafkaTemplate<String, Notification>는 String 키와 Notification 객체를 전송
- sendMessage:
- 채팅 메시지를 Kafka로 전송하는 기능 제공
- 메시지를 지정된 topic과 chatRoomId로 전송하며, 그 전송 내용을 로그로 기록
- sendNotification:
- 알림 메시지를 Kafka로 전송하는 기능 제공
- 알림 메시지를 지정된 topic과 chatRoomId로 전송하며, 그 전송 내용을 로그로 기록
다음 게시글에서는 읽음 처리 부분에 대해 다룰 예정입니다 !

728x90
반응형
'프로젝트 > Wedle' 카테고리의 다른 글
[Spring Boot/Kafka/Stomp] 실시간 채팅 구현 - 6. 채팅방 생성 (0) | 2025.03.19 |
---|---|
[Spring Boot/Kafka/Stomp] 실시간 채팅 구현 – 5. Redis와 Kafka를 활용한 실시간 채팅 시스템 구현: WebSocket 기반의 고성능 메시징 (1) | 2025.03.19 |
[Spring Boot/Kafka/Stomp] 실시간 채팅 구현 – 4. MongoDB 설정 및 연동 (0) | 2025.03.19 |
[Spring Boot/Kafka/Stomp] 실시간 채팅 구현 – 3. WebSocket 연결 및 핸들러 구현 (0) | 2025.03.19 |
[Spring Boot] 실시간 채팅 구현 – 2. Kafka 설치 (0) | 2025.03.10 |