Kafka 를 사용한 채팅 서버를 구축해보고자 한다.
Kafka 를 통해서 Pub/Sub 메세지를 소화하고, 클라이언트와 서버와의 연결은 WebSocket 을 사용할 예정이다.
메세지의 기록이 필요하다면 MongoDB 를 사용해서 저장한다.
1. 환경설정
우선 Kafka 설치가 필요하다.
Kafka 는 기본적으로 브로커, 프로듀서, 컨슈머로 구성되어있다. 토픽과 파티션처럼 데이터를 토픽별 관리하고 분산 저장되는 서비스이다. 또한 Kafka 클러스터는 각 파티션에 대해 Leader 와 Follower 를 유지한다.
이러한 분산 서비스에서는 노드들을 관리하기 위해 Zookeeper 가 필수적이다.
Kafka 2.8.0 이후로는 Zookeeper 없이도 Kafka 클러스터를 관리할 수 있는 KRaft (Kafka Raft) 모드가 도입되었다. KRaft 모드에서는 Kafka 자체에서 클러스터와 메타데이터를 관리할 수 있어, 향후에는 Zookeeper 의존성을 완전히 제거하는 것이 목표라고 한다.
C:\devtools\kafka_2.13-2.8.0>.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties
일단 Zookeeper 실행 후 Kafka 를 실행해야하기에 우선 해당 명령어를 통해 Zookeeper 를 실행시켜야한다.
C:\devtools\kafka_2.13-2.8.0>.\bin\windows\kafka-server-start.bat config\server.properties
해당 명령어는 Kafka 를 실행시키는 명령어이다.
여기까지 문제없이 진행되었다면 Kafka 설치 및 실행이 완료되었다!
이제 본격적으로 Spring Boot 를 통한 채팅 서버를 구현해보자.
2. Spring Boot 환경 설정
2.1 build.gradle
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-data-mongodb'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-websocket'
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
의존성은 말했다 시피 채팅을 위한 WebSocket 과 Kafka 로 구성되어있다.
또한 채팅 내역을 저장한다면 NoSQL 에 저장할 생각이기에 MongoDB 연결 의존성 또한 추가해줬다.
2.2 application.yml
spring:
kafka:
consumer:
bootstrap-servers: localhost:9092
group-id: chat
producer:
bootstrap-servers: localhost:9092
Kafka 의 컨슈머와 프로듀서의 연결을 지정해준다.
group-id 는 컨슈머 그룹을 식별하는 고유한 ID 값이다. 해당 ID 는 메세지를 소비하는 컨슈머들이 소속된 그룹을 나타낸다!
만약 컨슈머가 여러 개 존재하는 분산 서버 환경이라면 같은 group-id 를 가진 컨슈머끼리 협력하여 메세지를 소비하기에 같은 파티션의 메세지를 중복 없이 처리할 수 있다. 즉, 메세지의 병렬 처리가 가능하다.
2.3 Producer, Consumer Service
가장 간단한 구조를 만들어보자.
@Service
@RequiredArgsConstructor
public class ChatProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
kafkaTemplate.send("chat-topic", message);
}
}
Producer 는 메세지를 KafkaTemplate 을 가져와 `chat-topic` 해당 토픽에 메세지를 발행한다.
@Service
public class ChatConsumer {
@KafkaListener(topics = "chat-topic", groupId = "chat")
public void listen(String message) {
System.out.println("Received: " + message);
}
}
Consumer 는 어노테이션을 통해 특정 토픽을 구독하고 있고, 해당 메세지를 소비한다.
여기서도 groupId 가 사용된다. yml 에서 이미 설정해주었지만 모든 리스너에는 필수적으로 명시해줘야한다.
반대로 yml 에 있는 groupId 를 삭제하더라도 문제는 없다.
2.4 Controller
@RestController
@RequiredArgsConstructor
public class ChatController {
private final ChatProducer chatProducer;
@PostMapping("/send")
public String sendMessage(@RequestParam String message) {
chatProducer.sendMessage(message);
return "Message sent: " + message;
}
}
가장 간단하게 메세지를 발행하는 API 이다.
2.5 테스트
요청을 보냈을 때 발행 이후 해당 토픽을 제대로 읽어와서 로그가 발생하는 것을 확인할 수 있다.
추가로 해당 토픽을 확인해보면 실제로 메세지들이 담겨있는 것을 확인할 수 있다.
여기까지 Kafka 연동을 확인하고 간단한 메세지 발행을 해봤다.
이후 게시글에서는 실제 WebSocket 을 사용한 클라이언트 연결까지 해보겠다.
'Programming > Spring' 카테고리의 다른 글
[Spring Boot] WebSocket, Kafka 채팅 서버 구현 (2) (8) | 2024.11.04 |
---|---|
[Spring] @ModelAttribute 사용 방법과 원리 by 생성자 개수, Setter (0) | 2024.10.31 |
[Spring Boot] SSE 를 통한 대기열 서비스 개선, 불필요한 폴링 제거 (3) | 2024.10.26 |
[Spring Boot] 트래픽 우회, 대기열 서비스 구현 (0) | 2024.09.01 |
[Spring Boot] 랭킹서비스 Redis vs DB 성능 비교 (1) | 2024.09.01 |