[Spring Boot/Kafka/Stomp] 실시간 채팅 구현 – 5. Redis와 Kafka를 활용한 실시간 채팅 시스템 구현: WebSocket 기반의 고성능 메시징

2025. 3. 19. 17:38·프로젝트/Wedle
728x90
반응형

1️⃣ Kafka와 Redis를 활용한 실시간 채팅 시스템 개요

Kafka는 메시징 큐 시스템으로, 높은 처리량과 분산 시스템을 통해 메시지를 효율적으로 처리할 수 있습니다. Redis는 빠른 데이터 저장과 조회를 지원하는 인메모리 데이터베이스로, 채팅방 참가자 목록이나 세션 정보를 저장하는 데 사용됩니다.

이번 게시글에서는 Kafka와 Redis를 설정하고, 이를 사용한 WebSocket 기반의 실시간 채팅 시스템을 어떻게 구현할 수 있는지에 대해 설명하겠습니다!


2️⃣ Kafka Consumer와 Producer 설정

Kafka를 사용하여 실시간 메시지를 주고받기 위한 Consumer와 Producer를 설정해야합니다. Kafka를 사용하면 각 사용자가 송신한 메시지를 여러 채팅방에 배포하거나, 실시간으로 알림을 전달할 수 있습니다.

Kafka Consumer 설정

// Kafka 리스너 설정을 활성화하는 어노테이션
@EnableKafka
@Configuration
public class ListenerConfiguration {

    // Kafka 서버 주소를 설정값으로부터 읽어오는 어노테이션
    @Value("${kafka.server}")
    private String kafkaServer;

    // Kafka Consumer의 그룹 ID를 설정값으로부터 읽어오는 어노테이션
    @Value("${kafka.consumer.id}")
    private String kafkaConsumerId;

    @Bean
    // Kafka 리스너 컨테이너 팩토리 생성 (메시지 처리용)
    ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
        // 새로운 Kafka 리스너 컨테이너 팩토리 객체를 생성
        ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
        
        // Kafka 리스너가 사용할 Consumer Factory를 설정
        factory.setConsumerFactory(kafkaConsumerFactory());
        
        // 리스너의 병렬 처리 수를 2로 설정 (동시 처리할 컨슈머 수)
        factory.setConcurrency(2);
        
        return factory;
    }

    // Kafka Consumer 설정을 위한 Consumer Factory 생성
    @Bean
    public ConsumerFactory<String, Message> kafkaConsumerFactory() {
        // Message 객체를 처리할 JsonDeserializer 생성
        JsonDeserializer<Message> deserializer = new JsonDeserializer<>();
        
        // 모든 패키지를 신뢰하는 설정 (주로 Jackson을 사용해 JSON을 Java 객체로 변환)
        deserializer.addTrustedPackages("*");

        // Kafka Consumer 설정을 Map으로 정의
        Map<String, Object> consumerConfigurations = ImmutableMap.<String, Object>builder()
                .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer)  // Kafka 서버 주소
                .put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerId)      // Kafka Consumer의 그룹 ID
                .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)  // 메시지의 키 디시리얼라이저 설정
                .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)  // 메시지의 값 디시리얼라이저 설정
                .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")  // 컨슈머가 읽을 오프셋 설정 (최신부터 읽음)
                .put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "10000")  // 한 번에 가져올 최소 바이트 수 설정
                .put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "200")  // 데이터를 가져오기 전 최대 대기 시간 설정
                .build();
        
        // 설정된 Consumer 설정을 사용하여 Consumer Factory 생성
        return new DefaultKafkaConsumerFactory<>(consumerConfigurations, new StringDeserializer(), deserializer);
    }

    // 알림(Notification) 메시지를 처리할 Kafka 리스너 설정
    @Bean
    // Kafka 리스너 컨테이너 팩토리 생성 (알림 처리용)
    ConcurrentKafkaListenerContainerFactory<String, Notification> kafkaListenerContainerFactory2() {
        // 새로운 Kafka 리스너 컨테이너 팩토리 객체를 생성
        ConcurrentKafkaListenerContainerFactory<String, Notification> factory = new ConcurrentKafkaListenerContainerFactory<>();
        
        // Kafka 리스너가 사용할 Consumer Factory를 설정 (알림용)
        factory.setConsumerFactory(kafkaNotificationConsumer());
        
        // 리스너의 병렬 처리 수를 2로 설정 (동시 처리할 컨슈머 수)
        factory.setConcurrency(2);
        
        return factory;
    }

    // Kafka Consumer 설정을 위한 Consumer Factory 생성 (알림용)
    @Bean
    public ConsumerFactory<String, Notification> kafkaNotificationConsumer() {
        // Notification 객체를 처리할 JsonDeserializer 생성
        JsonDeserializer<Notification> deserializer = new JsonDeserializer<>();
        
        // 모든 패키지를 신뢰하는 설정 (주로 Jackson을 사용해 JSON을 Java 객체로 변환)
        deserializer.addTrustedPackages("*");

        // Kafka Consumer 설정을 Map으로 정의
        Map<String, Object> consumerConfigurations =
                ImmutableMap.<String, Object>builder()
                        .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer)  // Kafka 서버 주소
                        .put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerId)      // Kafka Consumer의 그룹 ID
                        .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)  // 메시지의 키 디시리얼라이저 설정
                        .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)  // 메시지의 값 디시리얼라이저 설정
                        .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")  // 컨슈머가 읽을 오프셋 설정 (최신부터 읽음)
                        .put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "10000")  // 한 번에 가져올 최소 바이트 수 설정
                        .put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "200")  // 데이터를 가져오기 전 최대 대기 시간 설정
                        .build();
        
        // 설정된 Consumer 설정을 사용하여 Consumer Factory 생성 (알림용)
        return new DefaultKafkaConsumerFactory<>(consumerConfigurations, new StringDeserializer(), deserializer);
    }
}

 

  • ListenerConfiguration
    • Kafka를 사용하는 리스너 설정
    • 두 가지 타입의 메시지(Message, Notification)를 처리하는 리스너를 구성
    • 각 리스너는 Kafka의 소비자(Consumer)로부터 메시지를 받아오는 역할을 하며, 병렬 처리(concurrency) 수를 설정
  • kafkaListenerContainerFactory
    • Message 객체를 처리할 Kafka 리스너 설정
    • 리스너는 kafkaConsumerFactory() 메서드에서 생성된 소비자(Consumer) 객체를 사용하여 Kafka 서버로부터 메시지를 받아옴
  • kafkaConsumerFactory
    • Kafka 소비자(Consumer) 설정을 위한 ConsumerFactory 객체 생성
    • 이 설정은 Kafka 서버와 연결하고, 메시지의 직렬화/역직렬화 방식(JsonDeserializer)을 설정
  • kafkaListenerContainerFactory2
    • Notification 객체를 처리할 Kafka 리스너를 설정하는 메서드
    • Message 처리와 동일한 방식으로 구성됨
  • kafkaNotificationConsumer
    • 알림(Notification)을 처리할 Kafka 소비자(Consumer)를 설정하는 메서드
    • Message 처리와 동일한 방식으로 처리

Kafka Producer 설정

@EnableKafka  // Kafka의 기능을 사용하기 위한 어노테이션
@Configuration
public class ProducerConfiguration {

    @Value("${kafka.server}")  // Kafka 서버의 주소를 애플리케이션 설정 파일에서 가져옴
    private String kafkaServer;

    // 메시지 Producer 설정
    @Bean
    public ProducerFactory<String, Message> producerFactory() {
        // ProducerFactory를 생성하여 메시지를 생산할 때 사용할 Kafka 설정을 반환
        return new DefaultKafkaProducerFactory<>(chatProducerConfigurations());
    }

    // 메시지 Producer Configurations
    @Bean
    public Map<String, Object> chatProducerConfigurations() {
        // 메시지 전송을 위한 Kafka Producer의 설정 정보를 반환
        return ImmutableMap.<String, Object>builder()
                .put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer)  // Kafka 서버 주소 설정
                .put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)  // 메시지 키를 직렬화하는 클래스 설정
                .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class)  // 메시지 값을 JSON 형식으로 직렬화하는 클래스 설정
                .build();
    }

    // Kafka Template (메시지 전송용)
    @Bean
    public KafkaTemplate<String, Message> kafkaTemplate() {
        // KafkaTemplate을 사용하여 메시지를 전송할 수 있게 설정
        return new KafkaTemplate<>(producerFactory());
    }

    // 알림 Producer 설정
    @Bean
    public ProducerFactory<String, Notification> notificationProducerFactory() {
        // Notification 메시지를 전송하기 위한 Kafka Producer의 팩토리 설정
        return new DefaultKafkaProducerFactory<>(notificationProducerConfigurations());
    }

    // 알림 Producer Configurations
    @Bean
    public Map<String, Object> notificationProducerConfigurations() {
        // 알림 전송을 위한 Kafka Producer의 설정 정보를 반환
        return ImmutableMap.<String, Object>builder()
                .put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer)  // Kafka 서버 주소 설정
                .put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)  // 알림의 키를 직렬화하는 클래스 설정
                .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class)  // 알림의 값을 JSON 형식으로 직렬화하는 클래스 설정
                .build();
    }

    // 알림 Kafka Template
    @Bean
    public KafkaTemplate<String, Notification> notificationKafkaTemplate() {
        // KafkaTemplate을 사용하여 알림 메시지를 전송할 수 있게 설정
        return new KafkaTemplate<>(notificationProducerFactory());
    }
}
  • ProducerFactory
    • Kafka 메시지 생산을 위한 팩토리 객체 생성
    • ProducerFactory는 Kafka에 메시지를 보낼 수 있는 객체를 생성하는 데 사용됨
  • chatProducerConfigurations()
    • 메시지를 Kafka로 보낼 때 필요한 설정 제공
    • 설정에는 Kafka 서버 주소, 키와 값의 직렬화 클래스 등이 포함됨
  • KafkaTemplate
    • Kafka에 메시지를 보내는 역할
    • KafkaTemplate을 생성할 때 ProducerFactory를 전달하여 메시지를 생산할 수 있도록 함
  • notificationProducerFactory()
    • 알림 메시지를 Kafka로 보내기 위한 ProducerFactory를 생성
    • notificationProducerConfigurations()에서 정의된 설정을 사용하여 알림 메시지를 전송하는 데 필요한 Kafka 설정을 제공
  • notificationProducerConfigurations()
    • 알림 메시지 전송에 필요한 설정 제공
    • chatProducerConfigurations()와 비슷하지만, 알림 메시지에 맞는 직렬화 클래스를 설정
  • notificationKafkaTemplate()
    • 알림 메시지를 전송하는 KafkaTemplate을 생성

Kafka 설정 파일

spring:
  kafka:
    producer:
      bootstrap-servers: ${kafka.server}
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    consumer:
      bootstrap-servers: ${kafka.server}
      group-id: ${kafka.consumer.id}
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      auto-offset-reset: latest
      fetch-min-bytes: 10000
      fetch-max-wait-ms: 200
    zipkin:
    sender:
      type: kafka

kafka:
  server: localhost:9092
  consumer:
    id: teenspot-consumer

 

✔️ Producer

  • producer.bootstrap-servers: Kafka 서버 주소
  • producer.key-serializer
    • Kafka에 보내는 메시지의 key를 직렬화할 방법
    • 여기서는 StringSerializer를 사용하여 문자열로 직렬화했음
  • producer.value-serializer
    • Kafka에 보내는 메시지의 value를 직렬화할 방법
    • JsonSerializer를 사용하여 객체를 JSON 형식으로 직렬화
      👉🏻 객체를 Kafka로 전송할 때 JSON 포맷으로 변환하여 전송하도록 합니다.

 

✔️ Consumer

 

  • consumer.bootstrap-servers : Kafka 서버 주소
  • consumer.group-id
    • Kafka Consumer Group의 ID 설정
    • 여러 개의 소비자가 있을 때, 이 ID는 Kafka가 메시지를 분배하는 방식을 결정하는 데 사용됨 
  • consumer.key-deserializer: Kafka에서 받은 메시지의 key를 역직렬화하는 방법
  • consumer.value-deserializer:
    • Kafka에서 받은 메시지의 value를 역직렬화하는 방법
    • JsonDeserializer를 사용하여 JSON 형식의 메시지를 Java 객체로 역직렬화
      👉🏻 Kafka에서 수신한 메시지를 Java 객체로 변환할 수 있습니다.
  • consumer.auto-offset-reset
    • Kafka Consumer가 메시지의 오프셋을 찾을 수 없을 때, Kafka가 어떻게 처리할지를 결정
    • latest로 설정되어 있으면, Kafka는 가장 최신 메시지부터 소비하기 시작
  • consumer.fetch-min-bytes
    • Kafka Consumer가 데이터를 소비할 때, 최소한의 바이트 수 설정
    • 최소한 이 크기만큼의 메시지를 받아올 때까지 기다림
  • consumer.fetch-max-wait-ms
    • Consumer가 데이터를 받아올 때 최대 대기 시간 설정
    • 이 값은 fetch-min-bytes와 결합되어 설정된 크기의 데이터를 수신할 때까지 기다리는 최대 시간

 

✔️ Zipkin 설정 (빈 설정)

  • zipkin.sender.type
    • Zipkin은 분산 추적 시스템
    • 요청이 여러 서비스로 전달될 때 각 서비스 간의 호출을 추적하는 데 사용됨
    • type: kafka로 설정되어 있으면, Zipkin 추적 정보가 Kafka를 통해 전송되도록 설정됨

✔️ Kafka 서버 주소 및 Consumer ID

  • kafka.server : Kafka 서버의 주소 설정
  • kafka.consumer.id

3️⃣ Redis 설정

Redis는 실시간 채팅 시스템에서 빠르게 데이터를 조회하고 저장하는 데 사용됩니다. 여기서는 채팅방 참가자 정보를 Redis에 저장하고 관리하는 역할입니다.

Redis 설정 파일

spring:
    redis:
      host: localhost
      port: 6379

 

 

  • spring.redis.host : Redis 서버의 호스트 이름 설정
  • spring.redis.port :
    • Redis 서버의 포트 설정
    • 기본적으로 Redis는 6379 포트 사용

✨ ChatRoomParticipant 클래스

@Builder  // 빌더 패턴을 활성화하여 객체 생성 시 코드 가독성을 높임
@Getter  // 모든 필드에 대한 getter 메서드를 자동으로 생성
@AllArgsConstructor  // 모든 필드를 인자로 받는 생성자 자동 생성
@NoArgsConstructor  // 기본 생성자 자동 생성
@RedisHash(value = "chatRoomParticipant")  // Redis에서 이 객체를 "chatRoomParticipant"라는 이름으로 저장
public class ChatRoomParticipant {

    @Id  // Redis에서 고유 키로 사용할 필드에 대해 지정
    private String id;  // 채팅방 참가자 ID (Redis의 Key 역할을 함)
    
    @Indexed  // Redis에서 memberId에 대해 인덱스를 생성하여 빠르게 검색할 수 있도록 함
    private Long memberId;  // 사용자의 ID (채팅방에 참여한 사용자를 식별)
    
    @Indexed  // chatRoomId에 대해서도 인덱스를 생성하여 빠르게 검색할 수 있도록 함
    private Long chatRoomId;  // 채팅방 ID (어떤 채팅방에 참여했는지를 식별)
    
    private LocalDateTime joinedAt;  // 사용자가 채팅방에 참가한 시간
    
    public ChatRoomParticipant(Long memberId, Long chatRoomId) {
        this.memberId = memberId;  // 생성자에서 memberId 필드 초기화
        this.chatRoomId = chatRoomId;  // 생성자에서 chatRoomId 필드 초기화
        this.joinedAt = LocalDateTime.now();  // joinedAt 필드는 현재 시간을 자동으로 설정
    }
}
  • @RedisHash(value = "chatRoomParticipant")
    • Redis에서 chatRoomParticipant라는 이름의 해시(Hash)로 저장될 객체임을 지정하는 어노테이션
    • 이 클래스를 Redis의 해시 형태로 저장하겠다는 의미
  • @Id
    • 이 필드는 Redis에서 객체의 고유 ID로 사용될 식별자 필드
    • Redis에서 데이터를 저장할 때 각 객체는 고유한 ID로 구별됨
    • 이 id는 Redis의 key 역할
  • @Indexed
    • memberId와 chatRoomId 필드에 인덱스를 생성하여 검색 성능을 높임
    • Redis에서 이 필드를 기준으로 데이터를 빠르게 검색할 수 있도록 도움
  • joinedAt
    • ChatRoomParticipant가 채팅방에 참가한 시간을 기록하는 필드
    • LocalDateTime을 사용하여 현재 시간을 기록

✨ ChatRoomParticipantRedisRepository 인터페이스

public interface ChatRoomParticipantRedisRepository extends CrudRepository<ChatRoomParticipant, String> {
    List<ChatRoomParticipant> findByMemberId(Long memberId);  // memberId로 참여자 목록을 조회하는 메서드
    List<ChatRoomParticipant> findByChatRoomId(Long chatRoomId);  // chatRoomId로 참여자 목록을 조회하는 메서드
}
  • ChatRoomParticipantRedisRepository
    • Redis에 저장된 ChatRoomParticipant 객체에 대해 CRUD 작업을 수행하는 Repository
    • CrudRepository를 상속받음으로써 기본적인 CRUD 작업을 자동으로 제공
  • findByMemberId : memberId로 ChatRoomParticipant 리스트를 검색하는 메서드
  • findByChatRoomId : chatRoomId로 ChatRoomParticipant 리스트를 검색하는 메서드

다음 게시글에서는 실시간 채팅 구현 로직에 대해 다룰 예정입니다 !

 

728x90
반응형
저작자표시 비영리 변경금지 (새창열림)

'프로젝트 > Wedle' 카테고리의 다른 글

[Spring Boot/Kafka/Stomp] 실시간 채팅 구현 – 7. 채팅 전송 및 조회  (0) 2025.03.19
[Spring Boot/Kafka/Stomp] 실시간 채팅 구현 - 6. 채팅방 생성  (0) 2025.03.19
[Spring Boot/Kafka/Stomp] 실시간 채팅 구현 – 4. MongoDB 설정 및 연동  (0) 2025.03.19
[Spring Boot/Kafka/Stomp] 실시간 채팅 구현 – 3. WebSocket 연결 및 핸들러 구현  (0) 2025.03.19
[Spring Boot] 실시간 채팅 구현 – 2. Kafka 설치  (0) 2025.03.10
'프로젝트/Wedle' 카테고리의 다른 글
  • [Spring Boot/Kafka/Stomp] 실시간 채팅 구현 – 7. 채팅 전송 및 조회
  • [Spring Boot/Kafka/Stomp] 실시간 채팅 구현 - 6. 채팅방 생성
  • [Spring Boot/Kafka/Stomp] 실시간 채팅 구현 – 4. MongoDB 설정 및 연동
  • [Spring Boot/Kafka/Stomp] 실시간 채팅 구현 – 3. WebSocket 연결 및 핸들러 구현
예롱메롱
예롱메롱
  • 예롱메롱
    예롱이의 개발 블로그
    예롱메롱
  • 전체
    오늘
    어제
    • 전체보기 (274)
      • 프로젝트 (35)
        • Wedle (12)
        • 인스타그램 클론 코딩 (13)
        • 스프링 부트와 AWS로 혼자 구현하는 웹 서비스 (10)
      • 인프런 Spring 강의 정리 (79)
        • 스프링 입문 - 코드로 배우는 스프링 부트, 웹 .. (7)
        • Spring 핵심 원리 - 기본편 (9)
        • 모든 개발자를 위한 HTTP 웹 기본 지식 (8)
        • 자바 ORM 표준 JPA 프로그래밍 - 기본편 (11)
        • 실전! 스프링 부트와 JPA 활용1 - 웹 애플리.. (6)
        • 실전! 스프링 부트와 JPA 활용2 - API 개.. (5)
        • 실전! 스프링 데이터 JPA (7)
        • 스프링 MVC 1편 - 백엔드 웹 개발 핵심 기술 (7)
        • 스프링 MVC 2편 - 백엔드 웹 개발 활용 기술 (11)
        • 실전! Querydsl (8)
      • Cloud (3)
      • Spring (6)
        • spring boot (5)
        • 소셜로그인 (1)
      • Docker (2)
      • DevOps (0)
      • Coding Test (114)
        • Programmers (37)
        • Baekjoon (76)
      • KB It's Your Life 6기 (1)
      • CS (18)
        • 알고리즘 (13)
        • 컴퓨터 구조 (1)
        • Operating System (0)
        • Network (0)
        • Database (4)
      • git (1)
      • Language (15)
        • Java (5)
        • C++ (6)
        • Python (4)
    • GITHUB GITHUB
    • INSTAGRAM INSTAGRAM
  • hELLO· Designed By정상우.v4.10.3
예롱메롱
[Spring Boot/Kafka/Stomp] 실시간 채팅 구현 – 5. Redis와 Kafka를 활용한 실시간 채팅 시스템 구현: WebSocket 기반의 고성능 메시징
상단으로

티스토리툴바