diff --git a/novalon-manage-api/manage-sys/src/main/java/cn/novalon/manage/sys/config/WebSocketConfig.java b/novalon-manage-api/manage-sys/src/main/java/cn/novalon/manage/sys/config/WebSocketConfig.java index e6bb529..9c8caca 100644 --- a/novalon-manage-api/manage-sys/src/main/java/cn/novalon/manage/sys/config/WebSocketConfig.java +++ b/novalon-manage-api/manage-sys/src/main/java/cn/novalon/manage/sys/config/WebSocketConfig.java @@ -1,23 +1,32 @@ package cn.novalon.manage.sys.config; +import cn.novalon.manage.sys.websocket.SysWebSocketHandler; +import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.web.socket.config.annotation.EnableWebSocket; -import org.springframework.web.socket.config.annotation.WebSocketConfigurer; -import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry; +import org.springframework.web.reactive.HandlerMapping; +import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; + +import java.util.HashMap; +import java.util.Map; @Configuration -@EnableWebSocket -public class WebSocketConfig implements WebSocketConfigurer { +public class WebSocketConfig { - private final SystemWebSocketHandler webSocketHandler; + @Bean + public HandlerMapping webSocketHandlerMapping(SysWebSocketHandler webSocketHandler) { + Map map = new HashMap<>(); + map.put("/ws", webSocketHandler); - public WebSocketConfig(SystemWebSocketHandler webSocketHandler) { - this.webSocketHandler = webSocketHandler; + SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping(); + handlerMapping.setOrder(1); + handlerMapping.setUrlMap(map); + return handlerMapping; } - @Override - public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { - registry.addHandler(webSocketHandler, "/ws") - .setAllowedOrigins("*"); + @Bean + public WebSocketHandlerAdapter webSocketHandlerAdapter() { + return new WebSocketHandlerAdapter(); } } diff --git a/novalon-manage-api/manage-sys/src/main/java/cn/novalon/manage/sys/websocket/SysWebSocketHandler.java b/novalon-manage-api/manage-sys/src/main/java/cn/novalon/manage/sys/websocket/SysWebSocketHandler.java new file mode 100644 index 0000000..290a7a4 --- /dev/null +++ b/novalon-manage-api/manage-sys/src/main/java/cn/novalon/manage/sys/websocket/SysWebSocketHandler.java @@ -0,0 +1,97 @@ +package cn.novalon.manage.sys.websocket; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.WebSocketMessage; +import org.springframework.web.reactive.socket.WebSocketSession; +import reactor.core.publisher.Mono; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +@Component +public class SysWebSocketHandler implements WebSocketHandler { + + private final Map sessions = new ConcurrentHashMap<>(); + private final ObjectMapper objectMapper = new ObjectMapper(); + + @Override + public Mono handle(WebSocketSession session) { + String userId = extractUserId(session); + + return session.receive() + .doOnNext(message -> { + String payload = message.getPayloadAsText(); + handleIncomingMessage(session, userId, payload); + }) + .doOnComplete(() -> { + sessions.remove(userId); + System.out.println("WebSocket session closed for user: " + userId); + }) + .doOnError(error -> { + sessions.remove(userId); + System.err.println("WebSocket error for user " + userId + ": " + error.getMessage()); + }) + .then(); + } + + private String extractUserId(WebSocketSession session) { + String query = session.getHandshakeInfo().getUri().getQuery(); + if (query != null && query.contains("userId=")) { + return query.split("userId=")[1].split("&")[0]; + } + return session.getId(); + } + + private void handleIncomingMessage(WebSocketSession session, String userId, String payload) { + try { + Map message = objectMapper.readValue(payload, Map.class); + String type = (String) message.get("type"); + + switch (type) { + case "ping": + sendMessageToUser(userId, Map.of("type", "pong", "timestamp", System.currentTimeMillis())); + break; + case "subscribe": + sessions.put(userId, session); + System.out.println("User " + userId + " subscribed to WebSocket"); + break; + default: + System.out.println("Unknown message type: " + type); + } + } catch (Exception e) { + System.err.println("Error handling WebSocket message: " + e.getMessage()); + } + } + + public void sendMessageToUser(String userId, Object message) { + WebSocketSession session = sessions.get(userId); + if (session != null) { + try { + String json = objectMapper.writeValueAsString(message); + session.send(Mono.just(session.textMessage(json))).subscribe(); + } catch (Exception e) { + System.err.println("Error sending message to user " + userId + ": " + e.getMessage()); + } + } + } + + public void broadcastMessage(Object message) { + String json; + try { + json = objectMapper.writeValueAsString(message); + } catch (Exception e) { + System.err.println("Error serializing broadcast message: " + e.getMessage()); + return; + } + + sessions.forEach((userId, session) -> { + try { + session.send(Mono.just(session.textMessage(json))).subscribe(); + } catch (Exception e) { + System.err.println("Error broadcasting to user " + userId + ": " + e.getMessage()); + } + }); + } +}