2024.10.26 - [Programming/Spring] - [Spring Boot] WebSocket, Kafka 채팅 서버 구현 (1)
이제 Kafka 를 사용해서 채팅 서버를 구현해보고자 한다.
추가 환경 설정
하지만 이전에 해당 글을 통해 Kafka 서버 실행을 Docker-compose 를 통해 한 번에 실행하며, 추가적으로 Kafka-UI 까지 사용할 수 있었다.
2024.10.31 - [Programming/Coding] - Docker 를 통한 일관된 개발 환경 배포 방법
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 2181:2181
networks:
- kafka-net
kafka:
image: confluentinc/cp-kafka:latest
container_name: kafka
depends_on:
- zookeeper
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 # 포트 변경
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
- kafka-net
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
depends_on:
- kafka
ports:
- 9091:8080
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
networks:
- kafka-net
networks:
kafka-net:
driver: bridge
STOMP 테스터
추가적으로 다른 글로 작성했듯이 테스트를 위한 툴이 부족해서 직접 만들어 사용하게 되었다.
2024.10.28 - [Programming/Coding] - [WebSocket] STOMP 테스트를 위한 블로그 사이트
https://min9805.github.io/stomp/
[STOMP Tester]
min9805.github.io
Kafka-UI
Docker-compose 를 통해서 Kafka-UI 까지 구동할 수 있다. Topic 탭에서 메세지들을 들여다볼 수 있고 제대로 메세지가 잘 가는 것을 확인할 수 있다.
Json 데이터 보내기
우선 추가적인 작업을 위해 Object 메세지를 발행하고자 했다.
public ChatMessage sendMessage(ChatMessage message) {
String urlTopic = urlUtils.sanitizeUrl(message.getUrl());
kafkaTemplate.send(urlTopic, message.toString());
return message;
}
위와 같은 방법으로 우선 보내봤다.
발행된 메세지를 살펴보면 객체 형태같지만 Json 형태가 아니기에 Json 형태로 맞춰주는 것이 좋아보인다.
1. Object Mapper 사용
@Service
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final UrlUtils urlUtils;
public ChatMessage sendMessage(ChatMessage message) {
ObjectMapper mapper = new ObjectMapper();
String jsonString = "";
try{
jsonString = mapper.writeValueAsString(message);
} catch (Exception e) {
e.printStackTrace();
}
String urlTopic = urlUtils.sanitizeUrl(message.getUrl());
kafkaTemplate.send(urlTopic, jsonString);
return message;
}
}
첫 번째 방법은 kafkaTemplate 을 <String, String> 으로 받아서 직접 변환해서 사용하는 방법이다. 한 블로그에서는 직렬화 설정을 해두고 이 방법을 사용해서 의문이 있었다. 위 방법에서는 따로 config 설정 등이 없어도 제대로 동작하기 때문이다.
2. Kafka Config
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(properties, new StringSerializer(), new JsonSerializer());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
@Service
@RequiredArgsConstructor
public class KafkaProducer {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final UrlUtils urlUtils;
public ChatMessage sendMessage(ChatMessage message) {
String urlTopic = urlUtils.sanitizeUrl(message.getUrl());
kafkaTemplate.send(urlTopic, message);
return message;
}
}
두 번째로 Config 를 사용하는 방법이다. yml 을 사용해서 설정값을 주입할 수도 있을 것 같다.
Config 를 생성하고 Key, Value 에 대해 직렬화 방법을 설정한다. 기본적인 설정을 그대로 사용하더라도 문제는 없다.
메세지 자체의 범용성을 위해 Value 값으로 Object 타입을 받는다. Object 타입을 받는 것이 여러 문제가 있을 수 있다는 것을 유념하자.
위처럼 실제 메세지에서 json 및 객체의 타입정보까지 한 번에 확인할 수 있다.
1번 방법은 Headers 에 타입 정보가 없기에 2 번 방법을 애용하도록 하자. 훨씬 간편하기도 하다.
Json 데이터 받기
Producer 에서 Json 데이터를 생성했으면 이제 Consumer 에서 이를 받아야한다.
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, Object> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "chat-group");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class);
properties.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return new DefaultKafkaConsumerFactory<>(properties, new StringDeserializer(), new JsonDeserializer<>());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, ChatMessage> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, ChatMessage> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 패턴 매칭 활성화
factory.getContainerProperties().setMissingTopicsFatal(false);
return factory;
}
}
이를 위한 설정은 위와 같다. Cosumer 도 설정을 집어넣어 생성해준다. 여기서 주의할점은 역직렬화이므로 Deserializer() 를 사용해야한다.
@KafkaListener(topicPattern = ".*", groupId = "chat-group")
public void listenAll(ChatMessage message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
kafkaConsumer.sendMessage(topic, message);
}
현재 모든 토픽의 메세지를 받아와야하기에 패턴을 사용하였다. 이렇게 작성하면 모든 토픽의 메시지를 받아올 수 있고 물론 사용에 주의해야한다.
@KafkaListener(topicPattern = ".*", groupId = "chat-group")
public void listenAll(ChatMessage message,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
kafkaConsumer.sendMessage(topic, message);
}
<<< MESSAGE
destination:/sub/1HHCdeNBAS7RHJLT4qS6RR539Flez21ZOUuK8
content-type:application/json
subscription:sub-0
message-id:d61d225e-54ae-0a9e-3745-76c8a9558f99-0
content-length:94
{"message":"hello2","sender":"행복한띠용이","url":"https://happyzodiac.tistory.com/100"}
실제 메세지는 위와 같이 전달된다. 헤더에서 Topic 을 뽑아낼 수 있는 것을 확인할 수 있다.
@Service
@RequiredArgsConstructor
public class KafkaConsumer {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final SimpMessagingTemplate messagingTemplate;
public ChatMessage sendMessage(String topic, ChatMessage message) {
messagingTemplate.convertAndSend("/sub/" + topic, message);
return message;
}
}
이를 현재 웹소켓으로 Pub/Sub 되어있는 노드에 메세지를 보내주기만 하면 된다.
실제로 메세지가 정상적으로 도착하는 것까지 확인할 수 있다!
이제 해당 서버를 활용해보자
'Programming > Spring' 카테고리의 다른 글
[Spring Boot] 동시성 제어 with 비관적 락, Redis 그리고 @Transactional 사용 시 동시성 문제점 (5) | 2024.11.19 |
---|---|
[Spring Boot] WebSocket, Kafka 채팅 서버 및 크롬 확장자 구현(3) (1) | 2024.11.04 |
[Spring] @ModelAttribute 사용 방법과 원리 by 생성자 개수, Setter (0) | 2024.10.31 |
[Spring Boot] WebSocket, Kafka 채팅 서버 구현 (1) (1) | 2024.10.26 |
[Spring Boot] SSE 를 통한 대기열 서비스 개선, 불필요한 폴링 제거 (4) | 2024.10.26 |