1. 개요
모의 투자 서비스를 개발하면서 시세 데이터만 1초에 4~5개가 들어오곤 한다. 결국 프론트엔드의 모든 API 엔드포인트를 로컬 서버로 옮겨오기 위해서는 실시간 데이터를 가공해서 분봉 데이터 형태로 나타낼 수 있어야한다. 목표는 두 가지이다. 웹소켓을 사용해서 실시간 캔들 데이터를 제공하는 것과 HTTP API 로 과거 캔들 데이터를 조회할 수 있는 것이다.
실시간 데이터를 위해서는 당연히 웹소켓 을 사용하고 있으며 토스 증권을 참고하고 있기에 STOMP 프로토콜을 추가적으로 사용하겠다. 클라이언트에게 시세 데이터를 뿌려주기 위해서는 pub/sub 에서도 broadcast 가 필요하며 성능적으로 우수한 Redis 의 pub/sub 구조를 사용하고 있다. 이 역시 실제 토스 증권의 시세 플랫폼에서 카프카나 UDP 가 아닌 Redis 를 사용하고 있음을 근거에 두었다.
이후 실시간 데이터를 가공 해서 분봉, 시간봉 데이터를 만들어 NoSQL 에 저장하며, 너무 많은 데이터이기에 압축 의 필요성을 느끼고 있다. 이후에는 성능을 위해서 Redis 에 분봉 데이터를 쌓아나가며 대략적으로 1시간에 한 번씩 데이터베이스에 저장할 생각이다.
데이터 만료 기한?
혹시 분봉 데이터 자체에 대한 만료 기한을 따로 두었나 싶어서 bybit 의 API 를 통해 어디까지 제공하는 지 확인해보았다.
https://api.bybit.com/v5/market/kline?category=linear&symbol=BTCUSDT&interval=1&end=1590426800000&limit=9
위 URL 은 1분봉 데이터를 가져오는 API 이다. bybit 에서 직접 확인할 수 있듯이 1분봉, 15분봉, 1시간봉 가릴 것 없이 모두 21년 7월까지의 모든 데이터를 제공한다.
토스 증권의 삼성전자 데이터도 한 번 확인해보자면
https://wts-info-api.tossinvest.com/api/v2/stock-prices/A005930/period-candles/month:1?count=100&to=1979-10-30T15:00:00Z
위처럼 1970년대 데이터들도 모두 토스의 API 를 통해서 제공하고 있다. 그 말인즉슨 데이터 만료 기한 없이 모든 데이터를 저장하고 있어야함을 뜻한다.
데이터 크기
{"dt":"1979-10-30T15:00:00Z","startDate":"1979-10-30T15:00:00Z","endDate":"1979-10-01T15:00:00Z","open":156.0,"high":157.0,"low":105.0,"close":105.0,"volume":44552076,"base":151.0}
토스 증권에서 월봉 데이터 중 하나를 가져왔다. 날짜가 3개, 숫자가 6개로 구성되어있으며 GPT 의 도움을 받았을 때 필드명, 오버헤드까지 모두 합쳐 대략 128 byte 의 크기를 예상한다. 저장 방식, 인덱스 등의 오차를 포함해 최종적으로 150 ~ 200 byte 크기를 예상하고 있다.
만약 1틱을 모두 저장한다면 하루만 저장하더라도 (86,400 * 4) * 200 byte = 69 MB 이다. 이외에 1분봉, 15분봉, 60분봉 데이터를 모두 저장하면 말도 안되는 데이터 양이 쌓이게 되는 것이다. 실제로 사용하는 나무 증권에서도 1틱 데이터를 제공하기는 하지만 최대 5분까지의 데이터만을 보여준다. 아마 이러한 이유로 토스 증권에서도 1분봉을 최소 단위로 제공한다고 생각한다.
1분봉부터 제공한다고 가정하고 다시 하루치 데이터를 계산해보자.
- 1분봉: 24 * 60 * 200 byte = 288,000 byte = 288 KB
- 3분봉: 24 * 20 * 200 byte = 96,000 byte = 96 KB
- 5분봉: 24 * 12 * 200 byte = 57,600 byte = 57.6 KB
- 15분봉: 24 * 4 * 200 byte = 19,200 byte = 19.2 KB
- 30분봉: 24 * 2 * 200 byte = 9,600 byte = 9.6 KB
- 1시간봉: 24 * 1 * 200 byte = 4,800 byte = 4.8 KB
- 2시간봉: 12 * 1 * 200 byte = 2,400 byte = 2.4 KB
- 4시간봉: 6 * 1 * 200 byte = 1,200 byte = 1.2 KB
- 6시간봉: 4 * 1 * 200 byte = 800 byte = 0.8 KB
- 12시간봉: 2 * 1 * 200 byte = 400 byte = 0.4 KB
- 1일봉: 1 * 1 * 200 byte = 200 byte = 0.2 KB
하루에 480 KB 가 생성된다.
24시간 코인 종목 수가 100개라면 하루에 48MB 인 것이다! 주식같은 경우 24시간 개장하지는 않지만 우리나라만해도 상장 기업수가 2000개가 넘는다. 그 만큼 대용량 데이터인 것이다. 즉 데이터 압축 도 필수적으로 보인다.
2. Redis PUB/SUB
STOMP 프로토콜을 Redis pub/sub 을 사용해서 구현해보자.
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Autowired
private StompHandler stompHandler;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry
.addEndpoint("/ws")
.setAllowedOriginPatterns("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic");
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(stompHandler);
}
}
@Component
@RequiredArgsConstructor
public class RedisMessageListener {
private final SimpMessagingTemplate simpMessagingTemplate;
private final RedissonClient redissonClient;
@PostConstruct
public void init() {
RTopic topic = redissonClient.getTopic("stock/BTCUSDT");
topic.addListener(StockTickDto.class, (channel, tickDto) -> {
// Redis에서 받은 메시지를 STOMP 클라이언트들에게 전달
simpMessagingTemplate.convertAndSend("/topic/stock/BTCUSDT", tickDto);
});
}
}
@Component
@Getter
@RequiredArgsConstructor
@Slf4j
public class StockInfoHolder {
private final MongoTemplate mongoTemplate;
private final ConcurrentHashMap<String, BigDecimal> currentPrices = new ConcurrentHashMap<>();
private final RedissonClient redissonClient;
public void updatePrice(JsonNode data) {
...
StockTick stockTick = StockTick.fromJson(data);
RTopic topic = redissonClient.getTopic("stock/" + symbol);
log.info(topic.toString());
topic.publish(StockTickDto.fromEntity(stockTick));
log.info("Published to Redis - Symbol: {}, Price: {}", symbol, price);
...
}
}
우선 시세 데이터는 외부에서 가져오고 있다. 현재는 bybit 의 ws 을 사용해서 데이터를 실시간으로 받아오고 있으며 이 실시간 데이터를 받아올 때 Redis 에 메시지를 발행한다. RedisMessageListener 는 BTCUSDT 토픽을 구독하고 있어서 데이터가 들어온다면 SimpMessageTemplate 을 통해서 구독 중인 클라이언트들에게 메시지를 전달해준다.
2024.10.28 - [Programming/Coding] - [WebSocket] STOMP 테스트를 위한 블로그 사이트
이전에 만든 STOMP 테스터를 사용하였고 실시간 데이터가 잘 넘어오는 것을 확인할 수 있었다.
메시지 처리 순서를 정리하자면 아래와 같다.
- 메시지를 실시간으로 받아와 Redis 에 Publish 한다.
- RedisMessageListener 는 해당 메시지를 Subscribe 하고 있다.
- RedisMessageListener 로 들어온 메시지를 SimpleBroker 에 Publish 한다.
- SimpleBrocker 는 Subscribe 하는 클라이언트들에게 메시지를 전달한다.
뭔가 이상하지 않은가? SimpleBrocker 의 단점을 해소하기 위해서 Redis Pub/Sub 을 사용하고자 하였지만 실제로는 SimpleBrocker 를 그대로 사용 중이다.
외부 Message Brocker?
외부 Message Brocker 를 사용하는 이유를 다시 생각해보자
우선 양방향 통신이며, 다중 서버일 때 필요하다. 서버가 3개이며 각 서버에 1번 토픽을 구독하고 있는 클라이언트가 있다고 가정해보자. 발행자는 1번 토픽에 메세지를 보낼 것이고 LB 를 통해서 1개의 서버에 이 메시지가 전달될 것이다. 만약 외부 MessageQueue 가 없다면 1/3 구독자만 메시지를 받게 될 것이기에 클라이언트에게 메시지 동기화를 위해 필요하다. MQ 와 클라이언트를 직접 연결할 수 없기에 서버의 마지막 단에서 이루어지는 특별한 아키텍처라 생각한다. 결국 MQ 와 서버, 각 서버와 클라이언트로 2번의 PUB/SUB 이 구성되어있는 것이다.
보편적으로 MQ 가 사용되는 아키텍처는 위와 같다고 생각한다. 이벤트 기반 아키텍처에서 볼 수 있는 구성이며 가장 큰 특징은 메시지 발행 주체와 소비 주체가 전혀 다르다는 것이다. 시세 데이터 같은 경우에는 결국 외부에서 주입받아야하기에 위 그림이 더 적합하다고 생각한다. 클라이언트가 서버에 접속해서 메시지를 발행하는 것이 아니라 메시지를 발행하는 주체가 별도로 존재하기 때문이다. 마지막 클라이언트 연결까지 고려해본다면 다음과 같은 그림일 것이다.
그럼 위에서 Redis 를 사용한 것과 가장 큰 차이점은 무엇일까? 가장 첫 단계인 Redis 에 Publish 하는 과정이 있으면 안된다는 것이다. 그림에서도 볼 수 있듯이 MQ 와 시세 서버는 단방향으로 이루어져있다. 아마 이 부분이 시스템을 구성하면서 가장 이질감을 느꼈던 부분이라 생각한다. 물론 웹소켓을 사용해 주문도 처리해야하지만 그건 여기서 주목하는 시세와 관련 없다는 점을 확실히 하자.
그래서 현재 서버를 구성할 때에는 시세 제공 API 를 직접 연결해 바로 사용할 예정이며, 분산 서버가 되더라도 시세 제공 API 를 MQ 에 담아주는 처리부를 따로 두어 현재 서버는 MQ 에서 메시지를 구독해서 사용할 것이다. 결론은 Redis 에 Publish 하는 과정이 필요없다는 것을 말하고 싶었다.
3. 데이터 가공 및 저장
분산 서버 환경이라고 한다면 누가 시세 데이터를 가공하고 저장해야할까? 나라면 두 가지 방법을 제시할 것이다.
첫 번째는 시세 플랫폼에서 바로 데이터를 저장하는 것이고,
두 번째는 MQ 에 담은 데이터를 통해 데이터를 저장하고 가공하는 서버를 따로 두는 것이다.
시세는 성능을 매우 중요시 하기 때문에 아마 두 번째 방법이 옳다고 느껴진다. 물론 지금은 단일 서버 환경이므로 한 서버 내에서 시세를 publish 하기 전에 데이터를 가공하고 저장하도록 하자. 이 부분도 비동기적으로 구현할까 했었는데, 결국에 캔들 데이터도 업데이트를 한 후에 실시간으로 함께 전달해야하기에 동기적으로 구현했다.
@Service
@RequiredArgsConstructor
@Slf4j
public class StockMarketDataHandler {
...
private void updateTickDataAtRedis(StockTick stockTick, int interval, char unit) {
String key = formatKey(stockTick.getSymbol(), interval, unit);
RDeque<String> deque = redissonClient.getDeque(key);
try {
String candleJson = deque.peekFirst();
// 캔들 데이터가 없는 경우 처리
if (candleJson == null) {
CandleStick candle = CandleStick.createCandle(stockTick);
deque.add(objectMapper.writeValueAsString(candle));
simpMessagingTemplate.convertAndSend("/topic/candle/" + key, candle);
} else {
CandleStick candle = objectMapper.readValue(candleJson, CandleStick.class);
// 기존 캔들 데이터를 사용할 지, 새로 만들어야할 지 판단
if(isSameInterval(candle, stockTick, interval, unit)) {
candle.updateCandle(stockTick);
deque.removeFirst();
deque.addFirst(objectMapper.writeValueAsString(candle));
simpMessagingTemplate.convertAndSend("/topic/candle/" + key, candle);
} else {
CandleStick newCandle = CandleStick.createCandle(stockTick);
deque.addFirst(objectMapper.writeValueAsString(newCandle));
simpMessagingTemplate.convertAndSend("/topic/candle/" + key, newCandle);
}
}
} catch (JsonProcessingException e) {
log.error("Error parsing candle data for symbol: {}", e);
}
}
...
}
Tick 데이터를 처리하고 가공하는 StockMarketDataHandler 클래스를 생성하였다. 실시간 캔들 데이터는 일정 시간 동안 시가, 종가, 최고가, 최저가, 거래량을 필요로한다. 여기서 실시간 거래량은 현재 사용하고 있는 bybit WS API 에서 제공하지 않기 때문에 일단 제외하였다. 그럼 나머지 요소들을 살펴보자. 일단 모든 요소들은 캔들을 생성할 때의 시세를 기준으로 초기화한다. 이후 시가는 변동되지 않는다. 종가는 매번 데이터가 들어올 때마다 시세로 업데이트해줘야한다. 캔들 API 에서 실시간 시세와 종가는 동일해야하기 때문이다. 그리고 최고가와 최저가는 단위 시간동안 MAX, MIN 값을 지속적으로 저장해줘야한다.
1분봉을 만들어냈으면 1분봉 3개를 합쳐내면 3분봉 하나를 만들어낼 수 있다. 시가는 첫번째 1분봉 시가, 종가는 세번째 1분봉 종가, 최고가는 MAX(1분봉의 최고가들), 최저가는 MIN(1분봉의 최저가들) 로 계산할 수 있다. 하지만 클라이언트가 3분봉 데이터를 받을 때에도 실시간 데이터를 받아야하기 때문에 1분봉과 동일하게 지속적인 업데이트가 필요하다. 결국 모든 캔들 유형에 대해 실시간으로 종가를 업데이트해줘야한다는 뜻이다.
이를 위해 Redis 에 "종목:캔들간격" 을 Key 로 가지는 List 형태에 데이터를 쌓아가고자 했다. updateTickDataAtRedis() 메서드에서는 다음과 같은 로직으로 동작한다.
- 해당 Key 값의 List 에 값이 없으면 현재 시세로 요소들을 초기화한 캔들 데이터를 생성한다.
- 만약 List 에 값이 있으면 기존 캔들에 업데이트할 것인지, 새로운 캔들을 생성할 것인지 결정한다.
- 업데이트 시 종가와 최대, 최소를 비교해 업데이트하고 저장한다.
- 새로운 캔들을 만든다면 시세로 초기화한 캔들 데이터를 생성하고 저장한다.
@Service
@RequiredArgsConstructor
@Slf4j
public class StockMarketDataHandler {
...
private final List<Integer> intervalMinutes = Arrays.asList(1, 3, 5, 15, 30);
private final List<Integer> intervalHours = Arrays.asList(1, 4, 12);
private final List<Integer> intervalDays = Arrays.asList(1, 7);
private final List<Integer> intervalMonths = List.of(1);
private final List<Integer> intervalYears = List.of(1);
private final List<List<Integer>> intervals = List.of(intervalMinutes, intervalHours, intervalDays, intervalMonths, intervalYears);
private final List<Character> units = List.of('m', 'h', 'd', 'M', 'Y');
...
public void saveStockTickData(TickerMessage tickerMessage) {
StockTick stockTick = StockTick.fromTickerMessage(tickerMessage);
for (int i = 0; i < intervals.size(); i++) {
for (int interval : intervals.get(i)) {
updateTickDataAtRedis(stockTick, interval, units.get(i));
}
}
}
...
private String formatKey(String symbol, int interval, char unit) {
return String.format("%s:%d%c", symbol, interval, unit);
}
private boolean isSameInterval(CandleStick candle, StockTick stockTick, int interval, char unit) {
if (unit == 'm'){
return (stockTick.getMinute() - candle.getMinute()) < interval;
} else if (unit == 'h') {
return (stockTick.getHour() - candle.getHour()) < interval;
} else if (unit == 'd') {
return ChronoUnit.DAYS.between(candle.getDate(), stockTick.getDate()) < interval;
} else if (unit == 'M') {
return ChronoUnit.MONTHS.between(candle.getDate(), stockTick.getDate()) < interval;
} else if (unit == 'Y') {
return ChronoUnit.YEARS.between(candle.getDate(), stockTick.getDate()) < interval;
}
return false;
}
}
만들고자 하는 데이터는 분봉, 시간봉, 일봉, 월봉, 년봉 데이터이며, 분봉에서는 1, 3, 5, 15, 30분, 시간봉에서는 1, 4, 12시간, 일봉에서는 1, 7일봉 데이터를 생성하고자 하며 이를 List 에 담아놓았다. 이를 위해 간격(1, 3, 5 ...)과 단위(분, 시, 일 ...) 을 나타내는 파라미터를 두 개 두었다.
formatKey() 메서드에서는 파라미터를 통해 종목, 간격, 단위에 유일한 Key 값을 생성해낸다.
isSameInterval() 메서드에서는 기존 캔들을 사용할 지, 새로운 캔들을 생성할 지 판단한다. 우선 단위 별 사용하는 인자가 다르다. 분봉 데이터라면 현재 데이터의 분과, 마지막으로 업데이트된 캔들의 생성 시점의 분을 비교해서 간격보다 적으면 기존 캔들을 사용해야한다고 판단한다. 즉 3분봉을 만들어낼 때 최신 3분봉 캔들이 45분 0초에 만들어졌다면, 45분 0초부터 47분 59초까지의 데이터는 해당 캔들에 업데이트 되는 것이다. 48분이 되어 간격이 3 이상이 된다면 해당 시세로 초기화되며 48분이 생성 시점으로 초기화된 캔들 데이터를 새로 생성한다. 이를 위해 캔들 데이터에 TimeStamp 가 존재하기는 하지만 개발의 가시성을 위해 minute, hour, date 정보를 추가해 사용 중이다.
위 같은 작업을 모두 실시간 최신화가 필요한 이유로써 생성하고자하는 모든 데이터에 대해 작업을 해주어야한다.
/topic/candle/BTCUSDT:1m 을 통해서 BTCUSDT 의 1분봉 실시간 데이터를 구독하면 위와 같이 실시간 데이터가 전달되는 것을 확인할 수 있다.
(1) 스로틀링
bybit 에서 들어오는 실시간 데이터는 과도하게 많다. 대체적으로 적당해보일 수 있지만 너무 빠르게 변경되기도하고 실제 증권쪽과 비교했을 때에도 과도하게 빠르게 변했다.
파이썬 스크립트를 작성해서 웹소켓 데이터가 얼마나 들어오는지 파악해봤다. 대략 초당 3.5회의 데이터가 들어오고 매번 Redis 를 조회하고 업데이트하기에는 과하다 생각했다. 그래서 수신되는 웹소켓 데이터에 대해서 스로틀링을 도입하고자 했다.
@Override
public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
try {
// 스로틀링
long currentTime = System.currentTimeMillis();
if (currentTime - lastUpdateTime < THROTTLE_INTERVAL) {
return;
}
...
// 가격 데이터 처리
if (jsonNode.has("data")) {
TickerMessage tickerMessage = objectMapper.readValue(payload, TickerMessage.class);
JsonNode data = jsonNode.get("data");
stockInfoHolder.updatePrice(data);
stockMarketDataHandler.saveStockTickData(tickerMessage);
lastUpdateTime = currentTime;
}
} catch (Exception e) {
log.error("Error handling message", e);
}
}
위처럼 THROTTLE_INTERVAL 을 놓고 0.5초마다 가격을 업데이트 하도록 변경하였다. 초당 최대 4.5 번 이상 들어와서 매번 실행하는 로직을 초당 2번으로 확정시킬 수 있었다.
문제 1. 데이터 손실
하지만 문제는 놓치는 데이터가 존재한다는 것이다.
- 0초에 100원
- 0.3초에 150원
- 0.5초에 90원
이처럼 시세가 들어오고 이후 계속 100원의 시세를 유지한다면 캔들 데이터의 최대값은 150원이 되어야하지만 스로틀링되어 기록되지 않는다. 금융권이기에 이런 정합성 문제는 철저하게 다루어야한다고 생각한다.
해결1. TimeBucket
이를 해결하고자 TimeBucket 을 생성하였다.
// 가격 데이터 처리
if (jsonNode.has("data")) {
TickerMessage tickerMessage = objectMapper.readValue(payload, TickerMessage.class);
JsonNode data = jsonNode.get("data");
// 스로틀링
long currentTime = System.currentTimeMillis();
if (currentTime - lastUpdateTime < THROTTLE_INTERVAL) {
if (tickTimeBucket == null) {
tickTimeBucket = new TickTimeBucket(tickerMessage);
} else {
tickTimeBucket.addTick(tickerMessage);
}
return;
}
stockInfoHolder.updatePrice(data);
stockMarketDataHandler.saveStockTickData(tickerMessage, tickTimeBucket);
lastUpdateTime = currentTime;
tickTimeBucket = null;
}
위와 같이 스로틀링을 진행하면서 만약 마지막 업데이트 시간으로부터 스로틀링 시간만큼 지나지 않았다면 데이터를 축적하는 것이다. 0.5 초간 시가, 종가, 최고가, 최저가를 계산해서 저장해놓음으로써 놓치는 데이터를 없앴다.
문제 2. 캔들 데이터 오염
하지만 분봉 데이터에서 1분 59.9초 의 데이터와 2분 1.1초의 데이터는 차이가 0.2초 뿐이므로 하나의 TimeBucket 에 축적될 수 있다는 문제가 있다. 결국 다른 분봉에 저장되어야하지만 데이터가 섞여버리는 문제이다.
해결 2. BucketKey
public boolean isThrottled(long currentTime) {
long key = (currentTime / THROTTLE_INTERVAL);
return this.bucketKey != key;
}
public void setBucketKeyForThrottled(long ts) {
this.bucketKey = (ts / THROTTLE_INTERVAL);
}
이는 BucketKey 를 통해 해결하였다. TimeStamp 로 관리되는 시간을 500ms 로 나누는 것이다. 그러면 500ms 라는 간격에 존재하는 모든 시간값들은 동일한 BucketKey 값을 가지게 된다.
이를통해 0.5초 단위로 시간 간격을 지정함으로써 1분 59.9초 의 데이터와 2분 1.1초의 데이터는 구분되어 사용될 수 있다.
문제 3. 실시간성 훼손
하지만 이런 방법은 결국 0초부터 0.499초 데이터를 모은 이후 0.5초 이후에 데이터가 들어와야만 사용자에게 이전 데이터를 보여준다. 극단적으로 말하자면 0.1초에 데이터가 들어오고, 3초에 두 번째 데이터가 들어온다면 사용자는 0.1초에 들어온 데이터를 3초에 받게 된다는 뜻이다. 극단적이 아니더라도 최대 0.5초의 지연시간을 발생시킨다.
이러한 문제는 데이터를 가공하고 사용자에게 보내는 입장에서 해결 불가능한 문제라고 생각한다. 결국 이러한 부분은 서비스의 정책에 따라 결정된다고 생각한다.
- 초당 3.5개의 데이터는 기업에서 문제를 일으킬만한 수준은 아니라고 생각한다.
- 실제 Tick 데이터는 퀀텀 투자 등에서는 매우 중요하겠지만 일반 사용자들에게는 심각한 사안은 아니며 실제 증권에서도 초당 3.5번의 업데이트는 하지 않는다.
결론은 실시간성과 성능에 대한 오버헤드를 잘 고려해야한다는 것이다.
4. 배치 프로세스
이제 Redis 에 담긴 캔들 데이터를 MongoDB 에 저장하는 과정이 필요하다.
@Component
@RequiredArgsConstructor
public class CandleBatchScheduler {
private final RedissonClient redissonClient;
private final MongoTemplate mongoTemplate;
private final String CANDLE_KEY = "candle";
private final List<String> STOCKS = List.of("BTCUSDT");
@Scheduled(cron = "0 * * * * *") // 매분 실행
public void batchCandleData() {
for (String stock : STOCKS) {
RDeque<Object> deque = redissonClient.getDeque(CANDLE_KEY + ":" + stock + ":1m");
// 마지막 데이터 하나 남기고 모두 처리
while (deque.size() > 1) { // 큐 크기가 1보다 클 때만 처리
Object candle = deque.pollFirst(); // 가장 오래된 데이터부터 처리
if (candle != null) {
Document candleDoc = Document.parse(candle.toString());
mongoTemplate.save(candleDoc, CANDLE_KEY + ":" + stock + ":1m");
}
}
}
}
}
이는 Spring Boot 의 스케쥴러를 사용하였다. 일단 매분마다 Redis 에 List 형태로 저장되어있는 분봉 데이터를 MongoDB 로 옮겨주는 작업을 진행하였다.
MongoDB 와 Redis 에 작업이 잘 되는 것을 확인할 수 있다.
Bulk Insert
분봉 데이터 같은 경우 1시간 단위로 집어넣게 되면 60개의 데이터를 한 번에 삽입해야한다. 만약 배치 프로세스의 간격이 길다면 더 많은 양의 데이터를 넣어줘야한다. 이를 위해서 Bulk Insert 가 가능하도록 변경했다.
@Scheduled(cron = "0 */15 * * * *") // 매시 0, 15, 30, 45분에 실행
public void batchCandleData() {
for (String stock : STOCKS) {
RDeque<Object> deque = redissonClient.getDeque(CANDLE_KEY + ":" + stock + ":1m");
int dequeSize = deque.size();
if (dequeSize > 1) { // 데이터가 2개 이상일 때만 처리
List<Object> candles = new ArrayList<>();
// size-1 만큼만 데이터 가져오기
for (int i = 0; i < dequeSize - 1; i++) {
Object candle = deque.pollLast();
if (candle != null) {
candles.add(candle);
}
}
// 벌크 저장
List<Document> documents = candles.stream()
.map(candle -> Document.parse(candle.toString()))
.collect(Collectors.toList());
mongoTemplate.insert(documents, CANDLE_KEY + ":" + stock + ":1m");
}
}
}
데이터가 잘 삽입되고 Redis 에도 데이터가 순서대로 잘 정리되고 있는 모습을 확인할 수 있다.
문제1. isSameInterval()
Redis 에 기존 분봉에 업데이트를 진행할 지, 새로운 캔들을 생성할 지 판단하는 메서드에 문제가 있는 것을 확인했다.
6시 59분 분봉 데이터 이후에 생성되는 7시 0분 분봉 데이터는 59 < 0 으로 판단되어 새로운 캔들이 생성되지 않는 문제이다.
private boolean isSameInterval(CandleStick candle, StockTick stockTick, int interval, char unit) {
if (unit == 'm'){
long minute1 = stockTick.getTimestamp() / 60000;
long minute2 = candle.getStartTime() / 60000;
return minute1 - minute2 < interval;
}
...
} else if (unit == 'M') {
LocalDateTime time1 = LocalDateTime.ofInstant(
Instant.ofEpochMilli(stockTick.getTimestamp()),
ZoneId.systemDefault());
LocalDateTime time2 = LocalDateTime.ofInstant(
Instant.ofEpochMilli(candle.getStartTime()),
ZoneId.systemDefault());
long monthsDiff = ChronoUnit.MONTHS.between(time2, time1);
return monthsDiff < interval;
}
...
return false;
}
기존에 분만 확인했었던 메서드를 TimeStamp 를 통해서 실제 차이를 계산해내어 진행하였다. 월봉 같은 경우도 LocalDateTime 으로 변환시켜 월차를 계산했다.
이렇게 하고나니 Redis, MongoDB 에 minute, hour, date 필드가 없어져 저장 공간을 줄일 수 있었다.
저장 공간 및 압축
캔들 데이터가 담긴 컬렉션의 정보를 열어보면 예상과 유사하게 한 doc 당 194 byte 크기를 차지하고 있으며 47개의 doc 로 8.9 KB 를 차지한다.
- zstd : 압축률과 속도 사이에서 균형을 이루는 새로운 알고리즘입니다. 일반적으로 50-70%의 공간 절약을 기대할 수 있다.
- 장점: Snappy와 Zlib의 중간 성능, 균형 잡힌 압축률과 속도.
- 단점: 중간 수준의 CPU 사용률.
- 사용 사례: 균형 잡힌 성능과 효율성이 필요한 경우 적합.
평균 doc 크기도 151 byte 로 25% 가량 줄어든 것을 확인할 수 있다. 사실 압축같은 경우에는 실제 효과는 있지만 지금 저장하고 있는 데이터가 너무 적어 압축을 위한 헤더만 추가될 수 있다고 한다. 하지만 아래 관련 글을 살펴보면 압축 효과는 확실히 있는 것을 확인할 수 있으니 미리 설정해두도록 하겠다. 이런 데이터에서는 필드명 또한 데이터 양에 큰 영향을 미치는데, 실제 두나무에서는 u, s, t 등으로, 토스에서는 low, high .. 이런식으로 필드를 표시하는 것을 확인할 수 있었다.
https://www.percona.com/blog/compression-methods-in-mongodb-snappy-vs-zstd/
5. 캔들 조회
마지막 목표인 과거 캔들 데이터를 조회해보자. 1분봉을 기준으로 설명할 예정이며 MongoDB, Redis 에 실시간 데이터들이 적재되어있다는 가정하에 진행한다.
@GetMapping("/kline")
public KlineDto getKlineData(
@RequestParam(defaultValue = "BTCUSDT") String symbol,
@RequestParam(defaultValue = "1") Integer interval,
@RequestParam(defaultValue = "m") Character unit,
@RequestParam(defaultValue = "1735689600000") long endTime,
@RequestParam(defaultValue = "30") int limit) {
return stockService.getKlineData(symbol, interval, unit, endTime, limit);
}
API 스펙은 bybit 를 참고하였다.
'http://localhost:8080/api/v1/stock/kline?symbol=BTCUSDT&interval=1&unit=m&endTime=1735689600000&limit=30' \
public KlineDto getKlineData(String symbol, int interval, Character unit, long endTime, int limit) {
RDeque<String> deque = redissonClient.getDeque("candle:" + symbol + ":" + interval + unit);
List<CandleStick> redisCandles = deque.stream()
.map(this::parseCandleData)
.filter(candle -> candle.getStartTime() <= endTime)
.collect(Collectors.toList());
if (redisCandles.size() >= limit) {
return KlineDto.builder()
.symbol(symbol)
.list(redisCandles.subList(0, limit))
.build();
}
List<CandleStick> mongoCandles = candleRepository.findCandlesBeforeEndTime(symbol, interval, unit, endTime, limit - redisCandles.size());
List<CandleStick> candles = List.of(redisCandles, mongoCandles).stream()
.flatMap(List::stream)
.sorted((c1, c2) -> Long.compare(c2.getStartTime(), c1.getStartTime()))
.limit(limit)
.collect(Collectors.toList());
return KlineDto.builder()
.symbol(symbol)
.list(candles)
.build();
}
서비스단에서는 Redis 에 존재하는 데이터와, mongoDB 에 존재하는 데이터를 합쳐서 보내줘야한다.
Redis 에 실시간 분봉 데이터들을 조회하고 limit 이상 데이터가 존재한다면 limit 만큼의 데이터만 잘라서 return 한다.
만약부족하다면 mongoDB 에 접근하여 limit 의 나머지만큼을 더 조회하여 합쳐 반환하는 것이다.
public List<CandleStick> findCandlesBeforeEndTime(String symbol, int interval, Character unit, long endTime, int limit) {
String collectionName = buildCollectionName(
symbol,
interval,
unit
);
Query query = new Query(
Criteria.where("startTime").lte(endTime)
);
query.with(Sort.by(Sort.Direction.DESC, "startTime"))
.limit(limit);
return mongoTemplate.find(query, Document.class, collectionName).stream()
.map(this::parseCandleData)
.toList();
}
실제 조회에서는 startTime 을 가지고 조회를 한다. 이를 보면 startTime 에 인덱스를 걸어놓는 것이 필요해보인다. 그리고 최신순 데이터가 압도적으로 많이 조회되기에 내림차순으로 정렬해놓았다.
Swagger 를 통해서 조회를 해보았을 때, Redis 에 있는 데이터와, MongoDB 에 있는 데이터가 적절하게 합쳐져서 반환되는 모습을 확인할 수 있었다.
'Programming > Spring' 카테고리의 다른 글
[Spring Boot][WebSocket + STOMP] 웹소켓 JWT 인증 및 파싱 (1) | 2024.11.26 |
---|---|
[Spring Boot] 동시성 제어 with 비관적 락, Redis 그리고 @Transactional 사용 시 동시성 문제점 (5) | 2024.11.19 |
[Spring Boot] WebSocket, Kafka 채팅 서버 및 크롬 확장자 구현(3) (0) | 2024.11.04 |
[Spring Boot] WebSocket, Kafka 채팅 서버 구현 (2) (8) | 2024.11.04 |
[Spring] @ModelAttribute 사용 방법과 원리 by 생성자 개수, Setter (0) | 2024.10.31 |