728x90
반응형
1️⃣ Kafka와 Redis를 활용한 실시간 채팅 시스템 개요
Kafka는 메시징 큐 시스템으로, 높은 처리량과 분산 시스템을 통해 메시지를 효율적으로 처리할 수 있습니다. Redis는 빠른 데이터 저장과 조회를 지원하는 인메모리 데이터베이스로, 채팅방 참가자 목록이나 세션 정보를 저장하는 데 사용됩니다.
이번 게시글에서는 Kafka와 Redis를 설정하고, 이를 사용한 WebSocket 기반의 실시간 채팅 시스템을 어떻게 구현할 수 있는지에 대해 설명하겠습니다!
2️⃣ Kafka Consumer와 Producer 설정
Kafka를 사용하여 실시간 메시지를 주고받기 위한 Consumer와 Producer를 설정해야합니다. Kafka를 사용하면 각 사용자가 송신한 메시지를 여러 채팅방에 배포하거나, 실시간으로 알림을 전달할 수 있습니다.
Kafka Consumer 설정
// Kafka 리스너 설정을 활성화하는 어노테이션
@EnableKafka
@Configuration
public class ListenerConfiguration {
// Kafka 서버 주소를 설정값으로부터 읽어오는 어노테이션
@Value("${kafka.server}")
private String kafkaServer;
// Kafka Consumer의 그룹 ID를 설정값으로부터 읽어오는 어노테이션
@Value("${kafka.consumer.id}")
private String kafkaConsumerId;
@Bean
// Kafka 리스너 컨테이너 팩토리 생성 (메시지 처리용)
ConcurrentKafkaListenerContainerFactory<String, Message> kafkaListenerContainerFactory() {
// 새로운 Kafka 리스너 컨테이너 팩토리 객체를 생성
ConcurrentKafkaListenerContainerFactory<String, Message> factory = new ConcurrentKafkaListenerContainerFactory<>();
// Kafka 리스너가 사용할 Consumer Factory를 설정
factory.setConsumerFactory(kafkaConsumerFactory());
// 리스너의 병렬 처리 수를 2로 설정 (동시 처리할 컨슈머 수)
factory.setConcurrency(2);
return factory;
}
// Kafka Consumer 설정을 위한 Consumer Factory 생성
@Bean
public ConsumerFactory<String, Message> kafkaConsumerFactory() {
// Message 객체를 처리할 JsonDeserializer 생성
JsonDeserializer<Message> deserializer = new JsonDeserializer<>();
// 모든 패키지를 신뢰하는 설정 (주로 Jackson을 사용해 JSON을 Java 객체로 변환)
deserializer.addTrustedPackages("*");
// Kafka Consumer 설정을 Map으로 정의
Map<String, Object> consumerConfigurations = ImmutableMap.<String, Object>builder()
.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer) // Kafka 서버 주소
.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerId) // Kafka Consumer의 그룹 ID
.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) // 메시지의 키 디시리얼라이저 설정
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer) // 메시지의 값 디시리얼라이저 설정
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") // 컨슈머가 읽을 오프셋 설정 (최신부터 읽음)
.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "10000") // 한 번에 가져올 최소 바이트 수 설정
.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "200") // 데이터를 가져오기 전 최대 대기 시간 설정
.build();
// 설정된 Consumer 설정을 사용하여 Consumer Factory 생성
return new DefaultKafkaConsumerFactory<>(consumerConfigurations, new StringDeserializer(), deserializer);
}
// 알림(Notification) 메시지를 처리할 Kafka 리스너 설정
@Bean
// Kafka 리스너 컨테이너 팩토리 생성 (알림 처리용)
ConcurrentKafkaListenerContainerFactory<String, Notification> kafkaListenerContainerFactory2() {
// 새로운 Kafka 리스너 컨테이너 팩토리 객체를 생성
ConcurrentKafkaListenerContainerFactory<String, Notification> factory = new ConcurrentKafkaListenerContainerFactory<>();
// Kafka 리스너가 사용할 Consumer Factory를 설정 (알림용)
factory.setConsumerFactory(kafkaNotificationConsumer());
// 리스너의 병렬 처리 수를 2로 설정 (동시 처리할 컨슈머 수)
factory.setConcurrency(2);
return factory;
}
// Kafka Consumer 설정을 위한 Consumer Factory 생성 (알림용)
@Bean
public ConsumerFactory<String, Notification> kafkaNotificationConsumer() {
// Notification 객체를 처리할 JsonDeserializer 생성
JsonDeserializer<Notification> deserializer = new JsonDeserializer<>();
// 모든 패키지를 신뢰하는 설정 (주로 Jackson을 사용해 JSON을 Java 객체로 변환)
deserializer.addTrustedPackages("*");
// Kafka Consumer 설정을 Map으로 정의
Map<String, Object> consumerConfigurations =
ImmutableMap.<String, Object>builder()
.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer) // Kafka 서버 주소
.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerId) // Kafka Consumer의 그룹 ID
.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) // 메시지의 키 디시리얼라이저 설정
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer) // 메시지의 값 디시리얼라이저 설정
.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") // 컨슈머가 읽을 오프셋 설정 (최신부터 읽음)
.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "10000") // 한 번에 가져올 최소 바이트 수 설정
.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "200") // 데이터를 가져오기 전 최대 대기 시간 설정
.build();
// 설정된 Consumer 설정을 사용하여 Consumer Factory 생성 (알림용)
return new DefaultKafkaConsumerFactory<>(consumerConfigurations, new StringDeserializer(), deserializer);
}
}
- ListenerConfiguration
- Kafka를 사용하는 리스너 설정
- 두 가지 타입의 메시지(Message, Notification)를 처리하는 리스너를 구성
- 각 리스너는 Kafka의 소비자(Consumer)로부터 메시지를 받아오는 역할을 하며, 병렬 처리(concurrency) 수를 설정
- kafkaListenerContainerFactory
- Message 객체를 처리할 Kafka 리스너 설정
- 리스너는 kafkaConsumerFactory() 메서드에서 생성된 소비자(Consumer) 객체를 사용하여 Kafka 서버로부터 메시지를 받아옴
- kafkaConsumerFactory
- Kafka 소비자(Consumer) 설정을 위한 ConsumerFactory 객체 생성
- 이 설정은 Kafka 서버와 연결하고, 메시지의 직렬화/역직렬화 방식(JsonDeserializer)을 설정
- kafkaListenerContainerFactory2
- Notification 객체를 처리할 Kafka 리스너를 설정하는 메서드
- Message 처리와 동일한 방식으로 구성됨
- kafkaNotificationConsumer
- 알림(Notification)을 처리할 Kafka 소비자(Consumer)를 설정하는 메서드
- Message 처리와 동일한 방식으로 처리
Kafka Producer 설정
@EnableKafka // Kafka의 기능을 사용하기 위한 어노테이션
@Configuration
public class ProducerConfiguration {
@Value("${kafka.server}") // Kafka 서버의 주소를 애플리케이션 설정 파일에서 가져옴
private String kafkaServer;
// 메시지 Producer 설정
@Bean
public ProducerFactory<String, Message> producerFactory() {
// ProducerFactory를 생성하여 메시지를 생산할 때 사용할 Kafka 설정을 반환
return new DefaultKafkaProducerFactory<>(chatProducerConfigurations());
}
// 메시지 Producer Configurations
@Bean
public Map<String, Object> chatProducerConfigurations() {
// 메시지 전송을 위한 Kafka Producer의 설정 정보를 반환
return ImmutableMap.<String, Object>builder()
.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer) // Kafka 서버 주소 설정
.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) // 메시지 키를 직렬화하는 클래스 설정
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class) // 메시지 값을 JSON 형식으로 직렬화하는 클래스 설정
.build();
}
// Kafka Template (메시지 전송용)
@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
// KafkaTemplate을 사용하여 메시지를 전송할 수 있게 설정
return new KafkaTemplate<>(producerFactory());
}
// 알림 Producer 설정
@Bean
public ProducerFactory<String, Notification> notificationProducerFactory() {
// Notification 메시지를 전송하기 위한 Kafka Producer의 팩토리 설정
return new DefaultKafkaProducerFactory<>(notificationProducerConfigurations());
}
// 알림 Producer Configurations
@Bean
public Map<String, Object> notificationProducerConfigurations() {
// 알림 전송을 위한 Kafka Producer의 설정 정보를 반환
return ImmutableMap.<String, Object>builder()
.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServer) // Kafka 서버 주소 설정
.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) // 알림의 키를 직렬화하는 클래스 설정
.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class) // 알림의 값을 JSON 형식으로 직렬화하는 클래스 설정
.build();
}
// 알림 Kafka Template
@Bean
public KafkaTemplate<String, Notification> notificationKafkaTemplate() {
// KafkaTemplate을 사용하여 알림 메시지를 전송할 수 있게 설정
return new KafkaTemplate<>(notificationProducerFactory());
}
}
- ProducerFactory
- Kafka 메시지 생산을 위한 팩토리 객체 생성
- ProducerFactory는 Kafka에 메시지를 보낼 수 있는 객체를 생성하는 데 사용됨
- chatProducerConfigurations()
- 메시지를 Kafka로 보낼 때 필요한 설정 제공
- 설정에는 Kafka 서버 주소, 키와 값의 직렬화 클래스 등이 포함됨
- KafkaTemplate
- Kafka에 메시지를 보내는 역할
- KafkaTemplate을 생성할 때 ProducerFactory를 전달하여 메시지를 생산할 수 있도록 함
- notificationProducerFactory()
- 알림 메시지를 Kafka로 보내기 위한 ProducerFactory를 생성
- notificationProducerConfigurations()에서 정의된 설정을 사용하여 알림 메시지를 전송하는 데 필요한 Kafka 설정을 제공
- notificationProducerConfigurations()
- 알림 메시지 전송에 필요한 설정 제공
- chatProducerConfigurations()와 비슷하지만, 알림 메시지에 맞는 직렬화 클래스를 설정
- notificationKafkaTemplate()
- 알림 메시지를 전송하는 KafkaTemplate을 생성
Kafka 설정 파일
spring:
kafka:
producer:
bootstrap-servers: ${kafka.server}
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
bootstrap-servers: ${kafka.server}
group-id: ${kafka.consumer.id}
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
auto-offset-reset: latest
fetch-min-bytes: 10000
fetch-max-wait-ms: 200
zipkin:
sender:
type: kafka
kafka:
server: localhost:9092
consumer:
id: teenspot-consumer
✔️ Producer
- producer.bootstrap-servers: Kafka 서버 주소
- producer.key-serializer
- Kafka에 보내는 메시지의 key를 직렬화할 방법
- 여기서는 StringSerializer를 사용하여 문자열로 직렬화했음
- producer.value-serializer
- Kafka에 보내는 메시지의 value를 직렬화할 방법
- JsonSerializer를 사용하여 객체를 JSON 형식으로 직렬화
👉🏻 객체를 Kafka로 전송할 때 JSON 포맷으로 변환하여 전송하도록 합니다.
✔️ Consumer
- consumer.bootstrap-servers : Kafka 서버 주소
- consumer.group-id
- Kafka Consumer Group의 ID 설정
- 여러 개의 소비자가 있을 때, 이 ID는 Kafka가 메시지를 분배하는 방식을 결정하는 데 사용됨
- consumer.key-deserializer: Kafka에서 받은 메시지의 key를 역직렬화하는 방법
- consumer.value-deserializer:
- Kafka에서 받은 메시지의 value를 역직렬화하는 방법
- JsonDeserializer를 사용하여 JSON 형식의 메시지를 Java 객체로 역직렬화
👉🏻 Kafka에서 수신한 메시지를 Java 객체로 변환할 수 있습니다.
- consumer.auto-offset-reset
- Kafka Consumer가 메시지의 오프셋을 찾을 수 없을 때, Kafka가 어떻게 처리할지를 결정
- latest로 설정되어 있으면, Kafka는 가장 최신 메시지부터 소비하기 시작
- consumer.fetch-min-bytes
- Kafka Consumer가 데이터를 소비할 때, 최소한의 바이트 수 설정
- 최소한 이 크기만큼의 메시지를 받아올 때까지 기다림
- consumer.fetch-max-wait-ms
- Consumer가 데이터를 받아올 때 최대 대기 시간 설정
- 이 값은 fetch-min-bytes와 결합되어 설정된 크기의 데이터를 수신할 때까지 기다리는 최대 시간
✔️ Zipkin 설정 (빈 설정)
- zipkin.sender.type
- Zipkin은 분산 추적 시스템
- 요청이 여러 서비스로 전달될 때 각 서비스 간의 호출을 추적하는 데 사용됨
- type: kafka로 설정되어 있으면, Zipkin 추적 정보가 Kafka를 통해 전송되도록 설정됨
✔️ Kafka 서버 주소 및 Consumer ID
- kafka.server : Kafka 서버의 주소 설정
- kafka.consumer.id
3️⃣ Redis 설정
Redis는 실시간 채팅 시스템에서 빠르게 데이터를 조회하고 저장하는 데 사용됩니다. 여기서는 채팅방 참가자 정보를 Redis에 저장하고 관리하는 역할입니다.
Redis 설정 파일
spring:
redis:
host: localhost
port: 6379
- spring.redis.host : Redis 서버의 호스트 이름 설정
- spring.redis.port :
- Redis 서버의 포트 설정
- 기본적으로 Redis는 6379 포트 사용
✨ ChatRoomParticipant 클래스
@Builder // 빌더 패턴을 활성화하여 객체 생성 시 코드 가독성을 높임
@Getter // 모든 필드에 대한 getter 메서드를 자동으로 생성
@AllArgsConstructor // 모든 필드를 인자로 받는 생성자 자동 생성
@NoArgsConstructor // 기본 생성자 자동 생성
@RedisHash(value = "chatRoomParticipant") // Redis에서 이 객체를 "chatRoomParticipant"라는 이름으로 저장
public class ChatRoomParticipant {
@Id // Redis에서 고유 키로 사용할 필드에 대해 지정
private String id; // 채팅방 참가자 ID (Redis의 Key 역할을 함)
@Indexed // Redis에서 memberId에 대해 인덱스를 생성하여 빠르게 검색할 수 있도록 함
private Long memberId; // 사용자의 ID (채팅방에 참여한 사용자를 식별)
@Indexed // chatRoomId에 대해서도 인덱스를 생성하여 빠르게 검색할 수 있도록 함
private Long chatRoomId; // 채팅방 ID (어떤 채팅방에 참여했는지를 식별)
private LocalDateTime joinedAt; // 사용자가 채팅방에 참가한 시간
public ChatRoomParticipant(Long memberId, Long chatRoomId) {
this.memberId = memberId; // 생성자에서 memberId 필드 초기화
this.chatRoomId = chatRoomId; // 생성자에서 chatRoomId 필드 초기화
this.joinedAt = LocalDateTime.now(); // joinedAt 필드는 현재 시간을 자동으로 설정
}
}
- @RedisHash(value = "chatRoomParticipant")
- Redis에서 chatRoomParticipant라는 이름의 해시(Hash)로 저장될 객체임을 지정하는 어노테이션
- 이 클래스를 Redis의 해시 형태로 저장하겠다는 의미
- @Id
- 이 필드는 Redis에서 객체의 고유 ID로 사용될 식별자 필드
- Redis에서 데이터를 저장할 때 각 객체는 고유한 ID로 구별됨
- 이 id는 Redis의 key 역할
- @Indexed
- memberId와 chatRoomId 필드에 인덱스를 생성하여 검색 성능을 높임
- Redis에서 이 필드를 기준으로 데이터를 빠르게 검색할 수 있도록 도움
- joinedAt
- ChatRoomParticipant가 채팅방에 참가한 시간을 기록하는 필드
- LocalDateTime을 사용하여 현재 시간을 기록
✨ ChatRoomParticipantRedisRepository 인터페이스
public interface ChatRoomParticipantRedisRepository extends CrudRepository<ChatRoomParticipant, String> {
List<ChatRoomParticipant> findByMemberId(Long memberId); // memberId로 참여자 목록을 조회하는 메서드
List<ChatRoomParticipant> findByChatRoomId(Long chatRoomId); // chatRoomId로 참여자 목록을 조회하는 메서드
}
- ChatRoomParticipantRedisRepository
- Redis에 저장된 ChatRoomParticipant 객체에 대해 CRUD 작업을 수행하는 Repository
- CrudRepository를 상속받음으로써 기본적인 CRUD 작업을 자동으로 제공
- findByMemberId : memberId로 ChatRoomParticipant 리스트를 검색하는 메서드
- findByChatRoomId : chatRoomId로 ChatRoomParticipant 리스트를 검색하는 메서드
다음 게시글에서는 실시간 채팅 구현 로직에 대해 다룰 예정입니다 !

728x90
반응형
'프로젝트 > Wedle' 카테고리의 다른 글
[Spring Boot/Kafka/Stomp] 실시간 채팅 구현 – 7. 채팅 전송 및 조회 (0) | 2025.03.19 |
---|---|
[Spring Boot/Kafka/Stomp] 실시간 채팅 구현 - 6. 채팅방 생성 (0) | 2025.03.19 |
[Spring Boot/Kafka/Stomp] 실시간 채팅 구현 – 4. MongoDB 설정 및 연동 (0) | 2025.03.19 |
[Spring Boot/Kafka/Stomp] 실시간 채팅 구현 – 3. WebSocket 연결 및 핸들러 구현 (0) | 2025.03.19 |
[Spring Boot] 실시간 채팅 구현 – 2. Kafka 설치 (0) | 2025.03.10 |