Spring Boot에 Java 21 LTS 가상 스레드에 WebSocket 사용하기
spring boot virtual thread websocket
스프링 부트 텍스트 기반 Stomp 웹소켓 예제는 많지만 바이너리 기반 웹소켓 예제는 많이 없어서 자료를 남긴다.
데이터베이스 연동 또는 레디스 연동 등으로 인한 블로킹 구간이 있을 경우 처리량을 올리기 위해 가상 스레드를 사용한다.
간단한 외부 연동 블로킹 확인을 위해 타임 서버도 구성하였다.
스프링 부트 3.2 버전부터 가상 스레드를 지원하며 3.3 버전부터 웹소켓 가상 스레드를 지원하므로
스프링 부트 3.3 이상 버전 사용을 권장한다.
Time 서버
application.yml
server:
shutdown: graceful
port: 9090
tomcat:
threads:
max: 4
min-spare: 4
ServerTimeController.java
package com.example.controller;
import lombok.Builder;
import lombok.Data;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class ServerTimeController {
@GetMapping("/")
public ServerTime index() {
return ServerTime.builder().time(System.currentTimeMillis()).build();
}
}
@Builder
@Data
class ServerTime { // Outer Static Class
private long time;
}
TimeServerApplication.java
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class TimeServerApplication {
public static void main(String[] args) {
SpringApplication.run(TimeServerApplication.class, args);
}
}
import 키워드 제외 하고 적은 코드로 타임 서버가 구성되었다.
WebSocket 클라이언트
client.html
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
<title>websocket client</title>
</head>
<body>
<p><button type="button" id="connect" onclick="connect()" value="">Connect</button></p>
<p><button type="button" id="disconnect" onclick="disconnect()" value="">Disconnect</button></p>
<p><button type="button" id="echo" onclick="echo()" value="">Echo</button></p>
</body>
<script type="text/javascript">
const WS_STATE_CONNECTING = 0; // 소켓이 생성됐으나 연결은 아직 개방되지 않았습니다.
const WS_STATE_OPEN = 1; // 연결이 개방되어 통신할 수 있습니다.
const WS_STATE_CLOSING = 2; // 연결을 닫는 중입니다.
const WS_STATE_CLOSED = 3; // 연결이 닫힘
const HEADER_SIZE = 4;
const WS_URL = "ws://127.0.0.1:2121/connect";
var ws = null;
function connect() {
if (ws !== null) {
disconnect();
}
ws = new WebSocket(WS_URL);
ws.onopen = function() {
if (ws.readyState === WS_STATE_OPEN) {
console.log("Connected to server success", ws.readyState);
} else {
console.log("Connected to server fail", ws.readyState);
}
};
ws.onmessage = function(packet) {
var data = packet.data;
var messageTypePacket = data.slice(0, HEADER_SIZE);
var messageTypeReader = new FileReader();
messageTypeReader.onload = function() {
var messageTypeDataView = new DataView(messageTypeReader.result);
var messageType = messageTypeDataView.getUint32();
console.log("messageType", messageType);
if (messageType === 1001) {
var bodyPacket = data.slice(HEADER_SIZE, data.size);
var bodyReader = new FileReader();
bodyReader.onload = function() {
console.log("body", bodyReader.result);
}
bodyReader.readAsText(bodyPacket);
}
}
messageTypeReader.readAsArrayBuffer(messageTypePacket);
};
ws.onclose = function(event) {
console.log("close");
};
ws.onerror = function(event) {
console.log(event.data);
};
}
function disconnect() {
if (ws === null) {
return;
}
ws.close();
ws = null;
console.log("Disconnected from server");
}
function echo() {
if (ws.readyState !== WS_STATE_OPEN) {
return;
}
var inputText = {timestamp: Date.now(), message: "안녕.hello"};
console.log(JSON.stringify(inputText));
var textEncoder = new TextEncoder();
var jsonBytes = textEncoder.encode(JSON.stringify(inputText)); // Uint8Array
var jsonBytesSize = jsonBytes.length;
var packet = new Uint8Array(HEADER_SIZE + jsonBytesSize);
var messageTypeDataView = new DataView(packet.buffer);
messageTypeDataView.setUint32(0, 1001); // messageType
for (var i = 0; i < jsonBytesSize; i++) {
packet[HEADER_SIZE+i] = jsonBytes[i];
}
ws.send(packet);
}
</script>
</html>
웹소켓 연결과 간단한 바이너리 패킷 전송 기능을 포함하였다.
WebSocket 서버
웹소켓 및 Http API
build.gradle
...
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-websocket'
implementation 'org.springframework.boot:spring-boot-starter-log4j2'
compileOnly 'org.projectlombok:lombok:1.18.32'
annotationProcessor 'org.projectlombok:lombok:1.18.32'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
...
application.yml
server:
shutdown: graceful
port: 2121
spring:
main:
web-application-type: servlet
threads:
virtual:
enabled: true
WebSocketConfig.java
package com.example.config;
import com.example.websocket.BinaryWebSocketHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestClient;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistration;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@RequiredArgsConstructor
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final RestClient restClient;
@Bean
public BinaryWebSocketHandler webSocketHandler() {
return new BinaryWebSocketHandler(restClient);
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry webSocketHandlerRegistry) {
WebSocketHandlerRegistration registration = webSocketHandlerRegistry.addHandler(webSocketHandler(), "/connect");
registration.setAllowedOrigins("*");
}
}
웹소켓 연결 과정을 제어하고 싶다면 HandshakeInterceptor 인터페이스를 구현후 설정에 추가한다.
import org.springframework.web.socket.server.HandshakeInterceptor;
...
public class WebsocketHandshakeInterceptor extends HandshakeInterceptor {
...
}
...
registration.addInterceptors(new WebsocketHandshakeInterceptor());
RestClientConfig.java
package com.example.config;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.JdkClientHttpRequestFactory;
import org.springframework.web.client.RestClient;
import java.net.http.HttpClient;
import java.time.Duration;
import java.util.concurrent.Executors;
@Configuration
public class RestClientConfig {
@Value("${spring.threads.virtual.enabled}")
private boolean isVirtualThreadEnabled;
@Bean
public RestClient restClient() {
return buildRestClient("http://timeserver:9090");
}
private RestClient buildRestClient(String baseUrl) {
var requestFactory = new JdkClientHttpRequestFactory(HttpClient.newBuilder()
.connectTimeout(Duration.ofMillis(3000L))
.executor(isVirtualThreadEnabled ? Executors.newVirtualThreadPerTaskExecutor() : Executors.newCachedThreadPool())
.build());
requestFactory.setReadTimeout(Duration.ofMillis(10000L));
return RestClient.builder().baseUrl(baseUrl).requestFactory(requestFactory).build();
}
}
model class, const class
// MessageType.java
public interface MessageType {
int ECHO = 1001;
}
// PacketConst.java
public interface PacketConst {
int MESSAGE_TYPE_SIZE = 4;
int HEADER_SIZE = MESSAGE_TYPE_SIZE;
}
// ServerTime.java
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class ServerTime {
private long time;
}
ApiController.java
package com.example.controller;
import com.example.model.ServerTime;
import lombok.extern.log4j.Log4j2;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@Log4j2
@RestController
public class ApiController {
@GetMapping("/")
public ServerTime index() {
return ServerTime.builder().time(0L).build();
}
}
스프링 부트 웹소켓에 추가로 Http API도 포함
BinaryWebSocketHandler.java
package com.example.websocket;
import com.example.model.MessageType;
import com.example.model.PacketConst;
import com.example.model.ServerTime;
import lombok.AllArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.springframework.web.client.RestClient;
import org.springframework.web.socket.BinaryMessage;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.PongMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.nio.ByteBuffer;
import static org.springframework.http.MediaType.APPLICATION_JSON;
@AllArgsConstructor
@Log4j2
public class BinaryWebSocketHandler extends org.springframework.web.socket.handler.BinaryWebSocketHandler {
private final RestClient restClient;
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
log.info("session connection id: {}", session.getId());
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
log.info("session closed id: {} statusCode: {}", session.getId(), status.getCode());
}
@Override
public void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
log.info("PongMessage");
}
@Override
public void handleTransportError(WebSocketSession session, Throwable ex) throws Exception {
if (ex instanceof IOException) {
log.info(ex.getMessage());
} else {
log.error(ex.getMessage(), ex);
}
}
@Override
public void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
int bodyLength = message.getPayloadLength() - PacketConst.HEADER_SIZE;
if (bodyLength < PacketConst.HEADER_SIZE) {
return;
}
ByteBuffer readPayload = message.getPayload();
byte[] readPayloadBytes = readPayload.array();
int messageType = readPayload.getInt();
log.info("messageType: {} bodyLength: {}", messageType, bodyLength);
log.info("body: {}", new String(readPayloadBytes, PacketConst.HEADER_SIZE, bodyLength));
try {
ServerTime serverTime = restClient.get() // Http GET
.uri("/").accept(APPLICATION_JSON).retrieve().body(ServerTime.class);
if (serverTime != null) {
log.info("serverTime {}", serverTime.getTime());
}
} catch (Exception ex) {
log.info(ex.getMessage());
}
switch (messageType) {
case MessageType.ECHO: {
ByteBuffer bb = ByteBuffer.allocate(PacketConst.HEADER_SIZE + bodyLength);
bb.putInt(MessageType.ECHO).put(readPayloadBytes, PacketConst.HEADER_SIZE, bodyLength).flip();
try {
session.sendMessage(new BinaryMessage(bb));
} catch (Exception ex) {
log.error(ex.getMessage());
}
break;
}
}
}
}
WebSocketServerApplication.java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class WebSocketServerApplication {
public static void main(String[] args) {
SpringApplication.run(WebSocketServerApplication.class, args);
}
}
import 키워드 제외 하고 적은 코드로 웹소켓 서버가 구성되었다.
바이너리 기반 웹소켓을 사용하면 TCP 소켓 구현에서 수신 및 송신 패킷 파편화, 암호화, 패킷 압축 등을 추가로 구현하지 않아도 된다.
WebSocketSession 클래스 sendMessage 메서드 동시성 동기화가 필요한 경우 SendingQueue 구현 또는 ReentrantLock 클래스를 사용하면 된다.
ReentrantLock reentrantLock = new ReentrantLock(true);
...
reentrantLock.lock();
... // WebSocketSession sendMessage
reentrantLock.unlock();
'java' 카테고리의 다른 글
java bucket index (0) | 2025.03.17 |
---|---|
java jsckson2 라이브러리 사용 null and default value ignore (2) | 2024.10.08 |
java time ticks (0) | 2024.07.05 |
java drawing random (1) | 2024.07.05 |
java windows 백그라운드 실행 및 종료 (0) | 2024.07.04 |