refactor: migrate WebSocket handler to manage-notify module
This commit is contained in:
@@ -26,6 +26,10 @@
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-webflux</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
<artifactId>jackson-databind</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-test</artifactId>
|
||||
|
||||
+33
@@ -0,0 +1,33 @@
|
||||
package cn.novalon.manage.notify.config;
|
||||
|
||||
import cn.novalon.manage.notify.websocket.SysWebSocketHandler;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.core.Ordered;
|
||||
import org.springframework.web.reactive.HandlerMapping;
|
||||
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
|
||||
import org.springframework.web.reactive.socket.WebSocketHandler;
|
||||
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
@Configuration
|
||||
public class WebSocketConfig {
|
||||
|
||||
@Bean
|
||||
public HandlerMapping webSocketHandlerMapping(SysWebSocketHandler webSocketHandler) {
|
||||
Map<String, WebSocketHandler> map = new HashMap<>();
|
||||
map.put("/ws", webSocketHandler);
|
||||
|
||||
SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
|
||||
handlerMapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
|
||||
handlerMapping.setUrlMap(map);
|
||||
return handlerMapping;
|
||||
}
|
||||
|
||||
@Bean
|
||||
public WebSocketHandlerAdapter webSocketHandlerAdapter() {
|
||||
return new WebSocketHandlerAdapter();
|
||||
}
|
||||
}
|
||||
+161
@@ -0,0 +1,161 @@
|
||||
package cn.novalon.manage.notify.websocket;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.scheduling.annotation.Scheduled;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.web.reactive.socket.WebSocketHandler;
|
||||
import org.springframework.web.reactive.socket.WebSocketSession;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
@Component
|
||||
public class SysWebSocketHandler implements WebSocketHandler {
|
||||
|
||||
private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>();
|
||||
private final Map<String, LocalDateTime> lastActivityTime = new ConcurrentHashMap<>();
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
@Value("${websocket.idle-timeout:300s}")
|
||||
private Duration idleTimeout;
|
||||
|
||||
@Value("${websocket.heartbeat-interval:30s}")
|
||||
private Duration heartbeatInterval;
|
||||
|
||||
@Override
|
||||
public Mono<Void> handle(WebSocketSession session) {
|
||||
String userId = extractUserId(session);
|
||||
lastActivityTime.put(userId, LocalDateTime.now());
|
||||
|
||||
return session.receive()
|
||||
.doOnNext(message -> {
|
||||
String payload = message.getPayloadAsText();
|
||||
handleIncomingMessage(session, userId, payload);
|
||||
lastActivityTime.put(userId, LocalDateTime.now());
|
||||
})
|
||||
.doOnComplete(() -> {
|
||||
sessions.remove(userId);
|
||||
lastActivityTime.remove(userId);
|
||||
System.out.println("WebSocket session closed for user: " + userId);
|
||||
})
|
||||
.doOnError(error -> {
|
||||
sessions.remove(userId);
|
||||
lastActivityTime.remove(userId);
|
||||
System.err.println("WebSocket error for user " + userId + ": " + error.getMessage());
|
||||
})
|
||||
.then();
|
||||
}
|
||||
|
||||
@Scheduled(fixedRate = 60000)
|
||||
public void cleanupIdleConnections() {
|
||||
LocalDateTime now = LocalDateTime.now();
|
||||
lastActivityTime.entrySet().removeIf(entry -> {
|
||||
LocalDateTime lastActivity = entry.getValue();
|
||||
if (Duration.between(lastActivity, now).compareTo(idleTimeout) > 0) {
|
||||
String userId = entry.getKey();
|
||||
WebSocketSession session = sessions.get(userId);
|
||||
if (session != null) {
|
||||
try {
|
||||
session.close();
|
||||
System.out.println("Closed idle WebSocket connection for user: " + userId);
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error closing idle connection for user " + userId + ": " + e.getMessage());
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
@Scheduled(fixedRate = 30000)
|
||||
public void sendHeartbeat() {
|
||||
sessions.forEach((userId, session) -> {
|
||||
try {
|
||||
if (session.isOpen()) {
|
||||
String heartbeatMessage = objectMapper.writeValueAsString(Map.of(
|
||||
"type", "heartbeat",
|
||||
"timestamp", System.currentTimeMillis()
|
||||
));
|
||||
session.send(Mono.just(session.textMessage(heartbeatMessage))).subscribe();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error sending heartbeat to user " + userId + ": " + e.getMessage());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private String extractUserId(WebSocketSession session) {
|
||||
String query = session.getHandshakeInfo().getUri().getQuery();
|
||||
if (query != null && query.contains("userId=")) {
|
||||
return query.split("userId=")[1].split("&")[0];
|
||||
}
|
||||
return session.getId();
|
||||
}
|
||||
|
||||
private void handleIncomingMessage(WebSocketSession session, String userId, String payload) {
|
||||
try {
|
||||
Map<String, Object> message = objectMapper.readValue(payload, new TypeReference<Map<String, Object>>() {
|
||||
});
|
||||
String type = (String) message.get("type");
|
||||
|
||||
switch (type) {
|
||||
case "ping":
|
||||
sendMessageToUser(userId, Map.of("type", "pong", "timestamp", System.currentTimeMillis()));
|
||||
break;
|
||||
case "pong":
|
||||
lastActivityTime.put(userId, LocalDateTime.now());
|
||||
break;
|
||||
case "subscribe":
|
||||
sessions.put(userId, session);
|
||||
lastActivityTime.put(userId, LocalDateTime.now());
|
||||
System.out.println("User " + userId + " subscribed to WebSocket");
|
||||
break;
|
||||
case "heartbeat":
|
||||
lastActivityTime.put(userId, LocalDateTime.now());
|
||||
break;
|
||||
default:
|
||||
System.out.println("Unknown message type: " + type);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error handling WebSocket message: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
public void sendMessageToUser(String userId, Object message) {
|
||||
WebSocketSession session = sessions.get(userId);
|
||||
if (session != null && session.isOpen()) {
|
||||
try {
|
||||
String json = objectMapper.writeValueAsString(message);
|
||||
session.send(Mono.just(session.textMessage(json))).subscribe();
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error sending message to user " + userId + ": " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void broadcastMessage(Object message) {
|
||||
String json;
|
||||
try {
|
||||
json = objectMapper.writeValueAsString(message);
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error serializing broadcast message: " + e.getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
sessions.forEach((userId, session) -> {
|
||||
try {
|
||||
if (session.isOpen()) {
|
||||
session.send(Mono.just(session.textMessage(json))).subscribe();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
System.err.println("Error broadcasting to user " + userId + ": " + e.getMessage());
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user