From 95438f33348993f57dd4d12c687f3e6d910f7bc1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=BF=94?= Date: Thu, 12 Mar 2026 07:51:59 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=9B=86=E6=88=90WebSocket=E5=88=B0Use?= =?UTF-8?q?rMessageService?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/impl/SysUserMessageService.java | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 novalon-manage-api/manage-sys/src/main/java/cn/novalon/manage/sys/core/service/impl/SysUserMessageService.java diff --git a/novalon-manage-api/manage-sys/src/main/java/cn/novalon/manage/sys/core/service/impl/SysUserMessageService.java b/novalon-manage-api/manage-sys/src/main/java/cn/novalon/manage/sys/core/service/impl/SysUserMessageService.java new file mode 100644 index 0000000..9906975 --- /dev/null +++ b/novalon-manage-api/manage-sys/src/main/java/cn/novalon/manage/sys/core/service/impl/SysUserMessageService.java @@ -0,0 +1,64 @@ +package cn.novalon.manage.sys.core.service.impl; + +import cn.novalon.manage.sys.core.domain.SysUserMessage; +import cn.novalon.manage.sys.core.service.ISysUserMessageService; +import cn.novalon.manage.sys.core.service.IWebSocketService; +import cn.novalon.manage.sys.infrastructure.db.converter.SysUserMessageConverter; +import cn.novalon.manage.sys.infrastructure.db.dao.SysUserMessageDao; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@Service +public class SysUserMessageService implements ISysUserMessageService { + + private final SysUserMessageDao dao; + private final SysUserMessageConverter converter; + private final IWebSocketService webSocketService; + + public SysUserMessageService(SysUserMessageDao dao, SysUserMessageConverter converter, + IWebSocketService webSocketService) { + this.dao = dao; + this.converter = converter; + this.webSocketService = webSocketService; + } + + @Override + public Flux findByUserId(Long userId) { + return dao.findByUserIdOrderByCreateTimeDesc(userId) + .map(converter::toDomain); + } + + @Override + public Flux findByUserIdAndIsRead(Long userId, String isRead) { + return dao.findByUserIdAndIsReadOrderByCreateTimeDesc(userId, isRead) + .map(converter::toDomain); + } + + @Override + public Mono countUnread(Long userId) { + return dao.countByUserIdAndIsRead(userId, "0"); + } + + @Override + public Mono save(SysUserMessage message) { + return dao.save(converter.toEntity(message)) + .map(converter::toDomain) + .flatMap(savedMessage -> { + return webSocketService.notifyNewMessage( + savedMessage.getUserId(), + savedMessage.getTitle(), + savedMessage.getContent()).thenReturn(savedMessage); + }); + } + + @Override + public Mono markAsRead(Long id) { + return dao.findById(id) + .flatMap(entity -> { + entity.setIsRead("1"); + return dao.save(entity); + }) + .then(); + } +}