feat: 添加WebSocket配置和处理器

This commit is contained in:
张翔
2026-03-12 07:49:57 +08:00
parent 8048e8bded
commit a24406653b
2 changed files with 118 additions and 12 deletions
@@ -1,23 +1,32 @@
package cn.novalon.manage.sys.config;
import cn.novalon.manage.sys.websocket.SysWebSocketHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
public class WebSocketConfig {
private final SystemWebSocketHandler webSocketHandler;
@Bean
public HandlerMapping webSocketHandlerMapping(SysWebSocketHandler webSocketHandler) {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/ws", webSocketHandler);
public WebSocketConfig(SystemWebSocketHandler webSocketHandler) {
this.webSocketHandler = webSocketHandler;
SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
handlerMapping.setOrder(1);
handlerMapping.setUrlMap(map);
return handlerMapping;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(webSocketHandler, "/ws")
.setAllowedOrigins("*");
@Bean
public WebSocketHandlerAdapter webSocketHandlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
@@ -0,0 +1,97 @@
package cn.novalon.manage.sys.websocket;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class SysWebSocketHandler implements WebSocketHandler {
private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public Mono<Void> handle(WebSocketSession session) {
String userId = extractUserId(session);
return session.receive()
.doOnNext(message -> {
String payload = message.getPayloadAsText();
handleIncomingMessage(session, userId, payload);
})
.doOnComplete(() -> {
sessions.remove(userId);
System.out.println("WebSocket session closed for user: " + userId);
})
.doOnError(error -> {
sessions.remove(userId);
System.err.println("WebSocket error for user " + userId + ": " + error.getMessage());
})
.then();
}
private String extractUserId(WebSocketSession session) {
String query = session.getHandshakeInfo().getUri().getQuery();
if (query != null && query.contains("userId=")) {
return query.split("userId=")[1].split("&")[0];
}
return session.getId();
}
private void handleIncomingMessage(WebSocketSession session, String userId, String payload) {
try {
Map<String, Object> message = objectMapper.readValue(payload, Map.class);
String type = (String) message.get("type");
switch (type) {
case "ping":
sendMessageToUser(userId, Map.of("type", "pong", "timestamp", System.currentTimeMillis()));
break;
case "subscribe":
sessions.put(userId, session);
System.out.println("User " + userId + " subscribed to WebSocket");
break;
default:
System.out.println("Unknown message type: " + type);
}
} catch (Exception e) {
System.err.println("Error handling WebSocket message: " + e.getMessage());
}
}
public void sendMessageToUser(String userId, Object message) {
WebSocketSession session = sessions.get(userId);
if (session != null) {
try {
String json = objectMapper.writeValueAsString(message);
session.send(Mono.just(session.textMessage(json))).subscribe();
} catch (Exception e) {
System.err.println("Error sending message to user " + userId + ": " + e.getMessage());
}
}
}
public void broadcastMessage(Object message) {
String json;
try {
json = objectMapper.writeValueAsString(message);
} catch (Exception e) {
System.err.println("Error serializing broadcast message: " + e.getMessage());
return;
}
sessions.forEach((userId, session) -> {
try {
session.send(Mono.just(session.textMessage(json))).subscribe();
} catch (Exception e) {
System.err.println("Error broadcasting to user " + userId + ": " + e.getMessage());
}
});
}
}