728x90
반응형
1️⃣ WebSocket 연결
WebSocket 연결은 기본적으로 클라이언트와 서버 간의 실시간 연결을 설정하는 작업입니다.
WebSocket을 연결하기 이해선
- STOMP 엔드포인트 설정: 클라이언트가 WebSocket을 통해 연결할 수 있도록 엔드포인트를 설정합니다.
- 메시지 브로커 설정: 클라이언트가 구독(subscribe)할 수 있는 주제(topic)를 설정하고, 서버가 클라이언트에 메시지를 보낼 수 있도록 메시지 브로커를 구성합니다.
- 핸드셰이크 인터셉터 추가: 연결 설정 시 필요한 추가 정보나 인증 절차를 처리할 수 있도록 인터셉터를 추가합니다.
- 메시지 크기 및 전송 타임아웃 설정: WebSocket 메시지의 크기나 전송에 대한 제한을 설정합니다.
@Configuration
@RequiredArgsConstructor
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
private final StompHandler stompHandler; // STOMP 처리 핸들러
private final HttpHandshakeInterceptor httpHandshakeInterceptor; // HTTP 핸드셰이크 인터셉터
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 메시지 브로커 설정
registry.enableSimpleBroker("/sub"); // /sub 경로로 메시지를 브로커가 전달
registry.setApplicationDestinationPrefixes("/pub"); // 클라이언트가 /pub 경로로 보낸 메시지를 서버에서 처리
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// STOMP 엔드포인트 설정
registry.addEndpoint("/stomp/chat") // /stomp/chat 경로로 WebSocket 연결을 허용
.setAllowedOrigins("*") // 모든 출처에서의 연결을 허용
.addInterceptors(httpHandshakeInterceptor) // 핸드셰이크 인터셉터 추가 (추가 처리 필요시)
.withSockJS(); // SockJS fallback을 지원하여 WebSocket을 사용할 수 없는 환경에서 대체 프로토콜을 사용하도록 설정
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
// 클라이언트로부터 들어오는 메시지 채널에 인터셉터 추가
registration.interceptors(stompHandler); // STOMP 핸들러를 통한 메시지 처리
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registry) {
// WebSocket 전송 설정 (메시지 크기, 전송 시간, 버퍼 크기 등)
registry.setMessageSizeLimit(512 * 1024); // 메시지 크기 제한 설정 (512KB)
registry.setSendTimeLimit(10 * 1000); // 메시지 전송 타임아웃 설정 (10초)
registry.setSendBufferSizeLimit(512 * 1024); // 전송 버퍼 크기 제한 설정 (512KB)
}
}
1) @Configuration 및 @EnableWebSocketMessageBroker
- @Configuration
- WebSocket 설정을 담당하는 클래스 지정
- @EnableWebSocketMessageBroker
- WebSocket을 기반으로 메시징 기능을 활성화
- WebSocket과 STOMP 프로토콜을 쉽게 설정 가능
2) configureMessageBroker
- 메시지 브로커 설정
- enableSimpleBroker("/sub")
- 클라이언트가 구독할 수 있는 주제 설정
- 여기선 /sub로 시작하는 주제를 구독
- ex) 클라이언트는 /sub/chat/room/{roomId}와 같은 주제를 구독할 수 있음
- setApplicationDestinationPrefixes("/pub")
- 클라이언트가 서버로 메시지를 보낼 때 사용하는 경로
- 클라이언트는 /pub/chat.sendMessage와 같은 경로로 메시지를 서버에 전송
3) registerStompEndpoints
- addEndpoint("/stomp/chat")
- 클라이언트가 WebSocket을 통해 연결할 수 있는 엔드포인트 설정
- 클라이언트는 이 경로를 통해 서버에 WebSocket 연결 시도
- setAllowedOrigins("*")
- 모든 출처에서 WebSocket 연결 허용
- 실제 환경에서는 보안을 고려하여 특정 출처만 허용하는 것이 좋음
- addInterceptors(httpHandshakeInterceptor)
- WebSocket 연결 과정에서 핸드셰이크 인터셉터를 추가
- 클라이언트가 연결을 요청할 때 추가적인 처리가 필요할 경우 사용됨 ex) 인증이나 세션 체크
- withSockJS()
- SockJS는 WebSocket을 사용할 수 없는 환경에서 대체 프로토콜 제공
- WebSocket을 지원하지 않는 브라우저에서도 WebSocket 기능을 사용할 수 있게 됩니다.
4) configureClientInboundChannel
- 클라이언트로부터 들어오는 메시지에 대해 STOMP 메시지 핸들러 설정
- registration.interceptors(stompHandler)
- STOMP 메시지를 처리하는 핸들러 설정
- 클라이언트가 보낸 메시지를 서버에서 처리할 수 있도록 함 ex) 메시지 포맷 확인/추가 처리
5) configureWebSocketTransport
- WebSocket의 전송 관련 설정 조정
- setMessageSizeLimit(512 * 1024) : 전송할 수 있는 메시지 크기 제한 설정
- setSendTimeLimit(10 * 1000) : 메시지 전송 타임아웃 설정합니다.
- setSendBufferSizeLimit(512 * 1024): 전송할 수 있는 버퍼의 크기 제한 설정
✅ WebSocket 연결 흐름
- 클라이언트 연결: 클라이언트는 /stomp/chat 엔드포인트로 WebSocket 연결 시도
- 핸드셰이크: 서버는 httpHandshakeInterceptor를 통해 클라이언트의 연결 요청을 처리
👉🏻 필요에 따라 인증이나 세션 정보 확인 가능 - 메시지 전송/수신: 클라이언트는 /pub 경로로 메시지를 보내고, 서버는 이 메시지를 처리하여 /sub 경로로 구독 중인 다른 클라이언트들에게 메시지를 전송
2️⃣ STOMP 메시징 처리 (메시지 전송 및 구독 처리)
Spring WebSocket에서 STOMP 메시지 처리 및 구독/구독 해지, 연결 및 연결 해제 등의 기능을 처리하는 클래스입니다. WebSocket 채팅 시스템에서 발생하는 다양한 이벤트를 처리하며, 특히 클라이언트가 WebSocket을 통해 채팅방에 구독하거나 구독을 취소하는 동작을 처리합니다.
@Order(Ordered.HIGHEST_PRECEDENCE + 99) // 우선순위를 높게 설정하여 다른 인터셉터보다 먼저 실행되도록 설정
@Component
@RequiredArgsConstructor // final 필드를 자동으로 주입받는 생성자 생성
@Slf4j // 로깅을 위한 어노테이션
public class StompHandler implements ChannelInterceptor {
private final ChatService chatService; // 채팅 서비스 의존성 주입
private final NotificationService notificationService; // 알림 서비스 의존성 주입
private final ChatRoomRepository chatRoomRepository; // 채팅방 레포지토리 의존성 주입
private final KafkaSender kafkaSender; // Kafka를 통한 알림 전송 의존성 주입
private static final String TOPIC_NOTIFICATION = "/sub/notification"; // 알림 구독 경로
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message); // 메시지에서 STOMP 헤더를 추출
StompCommand stompCommand = accessor.getCommand(); // STOMP 명령을 추출
// STOMP 명령에 따라 적절한 처리 함수 호출
if (stompCommand != null) {
handleStompCommand(stompCommand, accessor);
}
return ChannelInterceptor.super.preSend(message, channel); // 기본 동작을 그대로 반환
}
private void handleStompCommand(StompCommand stompCommand, StompHeaderAccessor accessor) {
switch (stompCommand) {
case CONNECT: // 연결 시 처리
log.debug("CONNECT");
break;
case SUBSCRIBE: // 구독 시 처리
handleSubscribe(accessor);
break;
case UNSUBSCRIBE: // 구독 해지 시 처리
handleUnsubscribe(accessor);
break;
case SEND: // 메시지 전송 시 처리
log.debug("SEND");
break;
case DISCONNECT: // 연결 종료 시 처리
handleDisconnect(accessor);
break;
case ERROR: // 오류 발생 시 처리
log.debug("WebSocket Error 처리 코드!!");
break;
}
}
private void handleSubscribe(StompHeaderAccessor accessor) {
// 알림 구독인 경우 처리
if (accessor.getDestination().startsWith(TOPIC_NOTIFICATION)) {
log.debug("알림 SUBSCRIBE");
return;
}
// 채팅방 구독 처리
handleChatRoomSubscription(accessor);
}
private void handleChatRoomSubscription(StompHeaderAccessor accessor) {
log.debug("채팅방 SUBSCRIBE");
// 채팅방 ID와 사용자 ID 추출
Long chatRoomId = getChatRoomId(accessor);
Long memberId = getMemberId(accessor);
// 사용자가 해당 채팅방에 참여하고 있는지 확인
validateChatRoomParticipant(chatRoomId, memberId);
// 구독 정보 갱신
updateSubscription(accessor, chatRoomId, memberId);
// 다른 참가자가 있을 경우 읽음 카운트 갱신 알림 전송
chatService.getOtherMemberIdByChatRoomId(chatRoomId)
.ifPresent(otherMemberId -> notifyReadCountUpdate(chatRoomId, otherMemberId));
// 채팅방에 해당하는 모든 알림을 삭제
notificationService.deleteAllNotificationsInChatRoomByMember(memberId, chatRoomId);
}
private void updateSubscription(StompHeaderAccessor accessor, Long chatRoomId, Long memberId) {
// 기존 구독 정보 삭제
deleteExistingSubscription(accessor);
// Redis에서 채팅방 참가자 삭제
chatService.deleteChatRoomParticipantFromRedis();
// 새 구독 정보 세션에 추가
accessor.getSessionAttributes().put(ChatUtil.SUBSCRIPTIONS, chatRoomId);
// Redis에 채팅방 참가자 정보 저장
chatService.saveChatRoomParticipantToRedis(chatRoomId);
// 읽지 않은 메시지 수 업데이트
chatService.updateUnreadMessages(chatRoomId);
}
private void deleteExistingSubscription(StompHeaderAccessor accessor) {
// 기존 구독 정보가 있을 경우 삭제
Long subscriptions = (Long) accessor.getSessionAttributes().get(ChatUtil.SUBSCRIPTIONS);
if (subscriptions != null) {
accessor.getSessionAttributes().remove(ChatUtil.SUBSCRIPTIONS);
}
}
private void notifyReadCountUpdate(Long chatRoomId, Long otherMemberId) {
log.debug("상대방에게 readCount값 갱신 알림 전송");
// 읽음 카운트 갱신 알림 생성
Notification readCountUpdateNotification = Notification.createReadCountUpdateNotification(chatRoomId, otherMemberId);
// Kafka를 통해 알림 전송
kafkaSender.sendNotification(KafkaVO.KAFKA_NOTIFICATION_TOPIC, String.valueOf(chatRoomId), readCountUpdateNotification);
}
private void handleUnsubscribe(StompHeaderAccessor accessor) {
String destination = accessor.getDestination();
// 알림 구독 해지 처리
if (destination != null && accessor.getDestination().startsWith(TOPIC_NOTIFICATION)) {
log.debug("알림 UNSUBSCRIBE");
return;
}
// 채팅방 구독 해지 처리
handleChatRoomUnsubscription(accessor);
}
private void handleChatRoomUnsubscription(StompHeaderAccessor accessor) {
log.debug("채팅방 UNSUBSCRIBE");
// 기존 구독 정보 삭제
deleteExistingSubscription(accessor);
// Redis에서 채팅방 참가자 정보 삭제
chatService.deleteChatRoomParticipantFromRedis();
}
private void handleDisconnect(StompHeaderAccessor accessor) {
log.debug("웹소켓 DISCONNECT");
// 연결 해제 시 채팅방 구독 해지 처리
handleChatRoomUnsubscription(accessor);
}
private void validateChatRoomParticipant(Long chatRoomId, Long memberId) {
// 채팅방에 해당 사용자가 참여 중인지 확인
chatRoomRepository.findByIdAndMemberId(chatRoomId, memberId)
.orElseThrow(() -> new IllegalArgumentException("채팅방에 참여하지 않은 사용자는 구독할 수 없습니다"));
}
private Long getChatRoomId(StompHeaderAccessor accessor) {
// 메시지에서 채팅방 ID 추출
return Long.valueOf(accessor.getDestination().split("/")[4]);
}
private Long getMemberId(StompHeaderAccessor accessor) {
// 세션에서 사용자 ID 추출
return (Long) accessor.getSessionAttributes().get(ChatUtil.MEMBER_ID);
}
}
1) preSend 메소드
- 메시지가 채널로 전달되기 전에 호출되는 메소드
- STOMP 명령을 처리하고 메시지를 처리할지 여부를 결정
- StompHeaderAccessor.wrap(message) : 메시지의 STOMP 헤더 추출
- stompCommand: 메시지의 STOMP 명령 확인 (CONNECT, SUBSCRIBE, SEND, DISCONNECT, 등).
2) handleStompCommand 메소드
- STOMP 명령에 따른 구체적인 처리 로직을 구현하는 메소드
- CONNECT: 클라이언트가 WebSocket 서버에 연결될 때 발생
- SUBSCRIBE: 클라이언트가 구독할 때 발생. 주로 채팅방 구독 처리
- UNSUBSCRIBE: 구독을 취소할 때 발생
- SEND: 메시지를 보낼 때 발생
- DISCONNECT: 클라이언트가 연결을 종료할 때 발생
4) 구독 및 구독 해지 처리
- 구독 처리
- 구독 대상이 TOPIC_NOTIFICATION인 경우, 알림 관련 구독을 처리합니다.
- 그 외의 경우에는 채팅방 구독을 처리합니다.
- 채팅방 구독 처리
- getChatRoomId 및 getMemberId: 구독하려는 채팅방 ID와 사용자 ID를 추출합니다.
- validateChatRoomParticipant: 해당 채팅방에 참여 중인 사용자인지 확인합니다.
- updateSubscription: 구독 정보를 갱신하고 Redis에 채팅방 참가자를 저장합니다.
- notifyReadCountUpdate: 상대방에게 읽음 카운트를 갱신하는 알림을 Kafka로 전송합니다.
- notificationService.deleteAllNotificationsInChatRoomByMember: 기존 알림을 삭제합니다.
- 구독 해지 처리
- 구독 대상이 TOPIC_NOTIFICATION인 경우 알림 구독을 해지합니다.
- 그 외의 경우에는 채팅방 구독을 해지합니다.
- 채팅방 구독 해지 처리
- deleteExistingSubscription: 기존 구독 정보를 삭제합니다.
- chatService.deleteChatRoomParticipantFromRedis: Redis에서 채팅방 참가자를 삭제합니다.
5) 연결 해제 처리
- 연결 해제 시 채팅방 구독 해지 및 관련 작업을 처리
6) 기타 유틸리티 메소드
- validateChatRoomParticipant: 사용자가 해당 채팅방에 참여하고 있는지 확인합니다.
- getChatRoomId 및 getMemberId: STOMP 메시지에서 채팅방 ID와 사용자 ID를 추출합니다.
✅ STOMPHandler 연결 흐름
- WebSocket 연결:
- 클라이언트가 WebSocket을 통해 서버에 연결하고 CONNECT 명령 발생
- 채팅방 구독 또는 알림 구독:
- 클라이언트가 채팅방을 구독하거나 알림을 구독하면 SUBSCRIBE 명령 발생
- 구독하려는 채팅방에 사용자가 참여하고 있는지 확인하고, 참여하지 않았다면 에러 발생
- 구독 정보는 세션에 저장되고, 채팅방 관련 처리 완료
- 채팅방 구독 해지:
- 클라이언트가 채팅방의 구독을 해지하면 UNSUBSCRIBE 명령 발생
- Redis에서 채팅방 참가자 정보를 삭제하고, 세션에 저장된 구독 정보도 삭제
- 연결 종료:
- 클라이언트가 연결을 종료하면 DISCONNECT 명령 발생
- 연결 종료 시, Redis에서 참가자 정보를 삭제하고, 구독 해지 처리
- 메시지 전송:
- 클라이언트가 메시지를 보내면 SEND 명령 발생
- 메시지 전송 처리 로직은 별도로 정의되지 않은 경우, 서버에서 적절하게 처리
3️⃣ HttpHandshakeInterceptor 설정(인증 및 사용자 정보 처리 흐름 분석)
@Component // 이 클래스가 Spring Bean으로 관리되도록 지정
@RequiredArgsConstructor // 필수적인 생성자 자동 생성 (Lombok)
public class HttpHandshakeInterceptor implements HandshakeInterceptor { // WebSocket 핸드셰이크 인터셉터 구현
private final JwtProvider jwtProvider; // JWT 토큰을 처리하는 서비스
private static final String ERROR_MESSAGE = "Web Socket Connection Error!"; // 연결 오류 메시지
private final MemberRepository memberRepository; // 회원 정보를 조회하는 레포지토리
// WebSocket 연결 전 처리하는 메서드
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Map<String, Object> attributes) throws Exception {
// HttpServletRequest 및 HttpServletResponse 객체 추출
HttpServletRequest req = ((ServletServerHttpRequest) request).getServletRequest();
HttpServletResponse resp = ((ServletServerHttpResponse) response).getServletResponse();
// Authorization 헤더에서 JWT 토큰을 추출
String accessToken = extractAccessToken(req);
// 토큰 검증 및 사용자 정보를 WebSocket 세션에 저장
if (!verifyTokenAndStoreMemberId(accessToken, attributes, resp)) {
// 토큰 검증 실패 시, 401 Unauthorized 상태 코드로 오류 응답
resp.sendError(HttpServletResponse.SC_UNAUTHORIZED, ERROR_MESSAGE);
return false; // 연결 거부
}
return true; // 토큰 검증 성공 시, 연결을 허용
}
// HTTP 요청에서 JWT 토큰을 추출하는 메서드
private String extractAccessToken(HttpServletRequest req) {
// Authorization 헤더에서 Bearer 토큰 추출
String authorizationHeader = req.getHeader("Authorization");
if (authorizationHeader != null && authorizationHeader.startsWith("Bearer ")) {
return authorizationHeader.substring(7); // "Bearer " 이후의 토큰만 반환
}
return null; // 토큰이 없으면 null 반환
}
// 토큰 검증 및 사용자 정보 저장 메서드
private boolean verifyTokenAndStoreMemberId(String accessToken, Map<String, Object> attributes,
HttpServletResponse resp) throws IOException {
try {
// 토큰이 없으면 false 반환
if (accessToken == null) {
return false;
}
// JWT 토큰을 통해 socialId 추출 및 검증
String socialId = jwtProvider.verify(accessToken);
// socialId로 회원 정보 조회
Member member = memberRepository.findBySocialId(socialId)
.orElseThrow(MemberNotFoundException::new); // 회원이 없으면 예외 발생
// 조회된 회원의 memberId를 WebSocket 세션에 저장
Long memberId = member.getMemberId();
attributes.put(ChatUtil.MEMBER_ID, memberId);
return true; // 검증 성공
} catch (MemberNotFoundException e) {
// 회원 조회 실패 시 404 Not Found 오류 반환
resp.sendError(HttpServletResponse.SC_NOT_FOUND, e.getMessage());
} catch (Exception e) {
// 토큰 검증 실패 시 401 Unauthorized 오류 반환
resp.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Invalid or expired token.");
}
return false; // 예외 발생 시 연결 거부
}
// WebSocket 핸드셰이크 후 처리하는 메서드 (현재는 구현되지 않음)
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler,
Exception exception) {
// 이 메서드는 핸드셰이크 후 추가 작업이 필요한 경우에 구현할 수 있음
}
}
1) beforeHandshake 메서드 - 핸드셰이크 전 처리
- beforeHandshake: WebSocket 연결을 시도하기 전, 연결의 유효성 검증
- request, response: HTTP 요청과 응답을 WebSocket에서 사용할 수 있도록 변환
- attributes: WebSocket 세션에 관련된 속성을 저장할 수 있는 맵 👉🏻 사용자의 정보를 WebSocket 세션에 저장
- extractAccessToken: HTTP 요청 헤더에서 Authorization 값을 추출하여 Bearer 토큰 분리
- verifyTokenAndStoreMemberId: 추출한 accessToken을 검증하고, 사용자 정보를 attributes에 저장
👉🏻 토큰이 유효하지 않거나 사용자가 존재하지 않으면 false를 반환하고 연결 종료 - 토큰 검증 실패 시, 응답에 SC_UNAUTHORIZED 상태 코드와 함께 에러 메시지를 반환하고, WebSocket 연결 거부
2) extractAccessToken 메서드 - 토큰 추출
- 요청 헤더에서 Authorization 값을 가져오고, Bearer 로 시작하는 토큰을 분리
- Bearer 접두어를 제외한 실제 JWT 토큰 값을 반환하며, 없을 경우 null 반환
3) verifyTokenAndStoreMemberId 메서드 - 토큰 검증 및 사용자 정보 저장
- jwtProvider.verify(accessToken): JWT 토큰을 검증하고, 유효한 경우 해당 토큰에 포함된 사용자 ID(socialId)를 반환memberRepository.findBySocialId(socialId): 토큰에서 추출한 socialId를 사용해 회원 정보를 데이터베이스에서 조회합니다.
👉🏻 사용자가 없으면 MemberNotFoundException을 던짐 - 회원 정보가 정상적으로 조회되면, 해당 사용자의 memberId를 WebSocket의 세션 속성에 저장하여 이후 메시징 기능에서 사용할 수 있도록 함
4) afterHandshake 메서드 - 핸드셰이크 후 처리
- afterHandshake: WebSocket 연결이 성공적으로 이루어진 후에 추가적인 처리가 필요할 경우 사용 가능
- 지금은 일단 구현하지 않았습니다!
✅ HttpHandshakeInterceptor 연결 흐름
- WebSocket 연결 요청:
- 클라이언트가 WebSocket을 통해 서버에 연결을 요청
- 이 때 beforeHandshake 메서드가 호출됨
- JWT 토큰 검증:
- beforeHandshake에서 클라이언트 요청 헤더에서 JWT 토큰을 추출하고 검증
- 토큰이 유효하면 사용자 정보를 가져와 세션에 저장
- 유효하지 않은 토큰 처리:
- 토큰이 없거나 유효하지 않으면, 서버는 SC_UNAUTHORIZED 상태 코드와 함께 오류 메시지를 반환하고, WebSocket 연결을 거부
- 사용자 정보 저장:
- 유효한 토큰으로 인증된 사용자의 정보를 WebSocket 세션에 저장하여, 이후 메시징 처리에 사용할 수 있도록 함.
- 연결 후 추가 처리:
- afterHandshake 메서드에서는 연결 후 추가 처리 작업을 할 수 있으며, 현재 코드에서는 구현되지 않음
다음 게시글에서는 MongoDB 설정 부분에 대해 다룰 예정입니다 !

728x90
반응형
'프로젝트 > Wedle' 카테고리의 다른 글
[Spring Boot/Kafka/Stomp] 실시간 채팅 구현 – 5. Redis와 Kafka를 활용한 실시간 채팅 시스템 구현: WebSocket 기반의 고성능 메시징 (1) | 2025.03.19 |
---|---|
[Spring Boot/Kafka/Stomp] 실시간 채팅 구현 – 4. MongoDB 설정 및 연동 (0) | 2025.03.19 |
[Spring Boot] 실시간 채팅 구현 – 2. Kafka 설치 (0) | 2025.03.10 |
[AWS] AWS S3 이용한 파일 업로드 - 프로필사진 등록 📸 (1) | 2025.03.07 |
[Spring Boot] 실시간 채팅 구현 – 1. 기술 스택 선정(WebSocket, Stomp Kafka, MongoDB) (2) | 2025.03.07 |