diff --git a/kicc-platform/kicc-platform-api/kicc-common-api/src/main/java/com/cloud/kicc/commonbiz/api/entity/SseSignalContainer.java b/kicc-platform/kicc-platform-api/kicc-common-api/src/main/java/com/cloud/kicc/commonbiz/api/entity/SseSignalContainer.java new file mode 100644 index 00000000..ea3213f1 --- /dev/null +++ b/kicc-platform/kicc-platform-api/kicc-common-api/src/main/java/com/cloud/kicc/commonbiz/api/entity/SseSignalContainer.java @@ -0,0 +1,29 @@ +package com.cloud.kicc.commonbiz.api.entity; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.experimental.Accessors; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +/** + *

+ * SSE 信号量容器 + *

+ * + * @Author: entfrm开发团队-王翔 + * @since: 2022/9/15 + */ +@Data +@EqualsAndHashCode(callSuper = false) +@Accessors(chain = true) +@AllArgsConstructor +public class SseSignalContainer { + + private String userId; + + private SseEmitter sseEmitter; + + protected String tenantId; + +} diff --git a/kicc-platform/kicc-platform-biz/kicc-common-biz/pom.xml b/kicc-platform/kicc-platform-biz/kicc-common-biz/pom.xml index ccb3c476..b709f8c7 100644 --- a/kicc-platform/kicc-platform-biz/kicc-common-biz/pom.xml +++ b/kicc-platform/kicc-platform-biz/kicc-common-biz/pom.xml @@ -54,6 +54,11 @@ com.cloud kicc-common-mock + + + com.cloud + kicc-common-job + org.eclipse.paho diff --git a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/KiccCommonApplication.java b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/KiccCommonApplication.java index eb1fd76d..e7c58031 100644 --- a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/KiccCommonApplication.java +++ b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/KiccCommonApplication.java @@ -2,6 +2,7 @@ package com.cloud.kicc.commonbiz; import com.cloud.kicc.common.core.annotation.EnableKiccJacksonAutoConvert; import com.cloud.kicc.common.feign.annotation.EnableKiccFeignClients; +import com.cloud.kicc.common.job.annotation.EnableKiccXxlJob; import com.cloud.kicc.common.security.annotation.EnableKiccResourceServer; import com.cloud.kicc.common.swagger.annotation.EnableKiccSwagger2; import org.springframework.boot.SpringApplication; @@ -20,6 +21,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; @EnableKiccFeignClients @SpringBootApplication @EnableKiccJacksonAutoConvert +@EnableKiccXxlJob public class KiccCommonApplication { public static void main(String[] args) { diff --git a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java new file mode 100644 index 00000000..8e473ec3 --- /dev/null +++ b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java @@ -0,0 +1,61 @@ +package com.cloud.kicc.commonbiz.controller; + +import com.cloud.kicc.common.core.api.R; +import com.cloud.kicc.common.core.constant.AppConstants; +import com.cloud.kicc.commonbiz.service.IMapLogisticSseService; +import io.swagger.annotations.Api; +import lombok.RequiredArgsConstructor; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +/** + *

+ * 物流地图SSE长连接 控制层 + * 长轮询: https://developer.aliyun.com/article/933937#slide-5 + *

+ * + * @Author: entfrm开发团队-王翔 + * @since: 2022/9/14 + */ +@RestController +@RequestMapping(AppConstants.APP_COMMON + "/mapLogisticSse") +@RequiredArgsConstructor +@Api(tags = "地图物流SSE控制接口") +public class MapLogisticSseController { + + private final IMapLogisticSseService iMapLogisticSseService; + + @GetMapping(value = "/subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE}) + public SseEmitter subscribe() { + return iMapLogisticSseService.SseSubscribe(); + } + + @GetMapping("/sendMessage") + public R sendMessage(String userId, String json) { + iMapLogisticSseService.sendMessage(userId, json); + return R.ok(); + } + + @GetMapping("/sendTenantMessage") + public R sendTenantMessage(String json) { + iMapLogisticSseService.sendTenantMessage(json); + return R.ok(); + } + + @GetMapping("/remove/{userId:\\w+}") + public R remove(@PathVariable String userId) { + iMapLogisticSseService.remove(userId); + return R.ok(); + } + + @GetMapping("/disconnectTenant") + public R disconnectTenant() { + iMapLogisticSseService.disconnectTenant(); + return R.ok(); + } + +} diff --git a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/IMapLogisticSseService.java b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/IMapLogisticSseService.java new file mode 100644 index 00000000..fa08a802 --- /dev/null +++ b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/IMapLogisticSseService.java @@ -0,0 +1,56 @@ +package com.cloud.kicc.commonbiz.service; + + +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +/** + *

+ * 物流地图SSE长连接 服务层 + *

+ * + * @Author: entfrm开发团队-王翔 + * @since: 2022/9/14 + */ +public interface IMapLogisticSseService { + + /** + * 采用长轮询订阅 + * @return SseEmitter + */ + SseEmitter SseSubscribe(); + + /** + * sse发送消息 + * @param userId 用户Id + * @param json 发送数据包 + * @return void + */ + void sendMessage(String userId, String json); + + /** + * sse发送当前租户下所有在线客户端消息 + * @param json 发送数据包 + * @return void + */ + void sendTenantMessage(String json); + + /** + * 断开当前用户租户下所有客户端连接 + * @return void + */ + void disconnectTenant(); + + /** + * 断开当前用户连接 + * @return void + */ + void disconnect(); + + /** + * 移除指定用户客户端 + * @param userId 指定用户Id + * @return void + */ + void remove(String userId); + +} diff --git a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java new file mode 100644 index 00000000..62988082 --- /dev/null +++ b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java @@ -0,0 +1,150 @@ +package com.cloud.kicc.commonbiz.service.impl; + +import cn.hutool.core.collection.CollectionUtil; +import cn.hutool.core.collection.ConcurrentHashSet; +import cn.hutool.core.util.StrUtil; +import com.cloud.kicc.common.core.exception.CommonException; +import com.cloud.kicc.common.data.entity.KiccUser; +import com.cloud.kicc.common.security.util.SecurityUtils; +import com.cloud.kicc.commonbiz.api.entity.SseSignalContainer; +import com.cloud.kicc.commonbiz.service.IMapLogisticSseService; +import com.xxl.job.core.handler.annotation.XxlJob; +import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + *

+ * 地图核心主任务表 服务实现类 + *

+ * + * @author entfrm开发团队-王翔 + * @since 2022-07-22 + */ +@Slf4j +@Service +@RequiredArgsConstructor +public class MapLogisticSseServiceImpl implements IMapLogisticSseService { + + /** 存储 sse 上下文容器 */ + private static Set sseSignalContainers = new ConcurrentHashSet(); + + /** + * 间隔10秒发送心跳包,检查客户端是否关闭 + */ + @XxlJob("doHeartbeat") + public void doHeartbeat() { + sseSignalContainers.forEach(item -> { + try { + item.getSseEmitter().send(SseEmitter.event().comment("保持心跳 " + LocalDateTime.now()).reconnectTime(1000)); + } catch (IOException e) { + log.debug("当前用户Id为:{}发送心跳包失败,正在删除当前的建立通道对象", item.getUserId()); + sseSignalContainers.removeIf(sseSignalContainer -> StrUtil.equals(item.getUserId(), sseSignalContainer.getUserId()) && StrUtil.equals(item.getTenantId(), sseSignalContainer.getTenantId())); + } + }); + } + + @Override + @SneakyThrows + public SseEmitter SseSubscribe() { + KiccUser kiccUser = getUser(); + // 超时时间设置为20秒 + SseEmitter sseEmitter = new SseEmitter(20000L); + SseSignalContainer sseSignalContainer =new SseSignalContainer( + kiccUser.getId(), + sseEmitter, + kiccUser.getTenantId() + ); + // 设置如果网络出错前端请求的重试时间为1s + sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("创建通道连接成功")); + sseSignalContainers.add(sseSignalContainer); + log.info("当前建立的用户Id为:{}", kiccUser.getId()); + sseEmitter.onTimeout(() -> { + log.info("当前用户Id为:{}的SSE长轮询已经超时,正在删除当前的建立通道对象", kiccUser.getId()); + sseSignalContainers.removeIf(item -> StrUtil.equals(item.getUserId(), kiccUser.getId()) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); + }); + sseEmitter.onCompletion(() -> { + log.info("当前用户Id为:{}的SSE长轮询已经返回响应关闭,正在删除当前的建立通道对象", kiccUser.getId()); + sseSignalContainers.removeIf(item -> StrUtil.equals(item.getUserId(), kiccUser.getId()) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); + }); + sseEmitter.onError(e -> { + log.info("当前用户Id为:{}的SSE长轮询出现异常,正在删除当前的建立通道对象,错误信息{}", kiccUser.getId(), e.getLocalizedMessage()); + sseSignalContainers.removeIf(item -> StrUtil.equals(item.getUserId(), kiccUser.getId()) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); + }); + return sseEmitter; + } + + @Override + @SneakyThrows + public void sendMessage(String userId, String json) { + KiccUser kiccUser = getUser(); + Set sendSseSignalContainers = sseSignalContainers.stream() + .filter(item -> StrUtil.equals(item.getUserId(), userId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())).collect(Collectors.toSet()); + if (CollectionUtil.isNotEmpty(sendSseSignalContainers)) { + SseSignalContainer sendSseSignalContainer = sendSseSignalContainers.stream().findFirst().get(); + sendSseSignalContainer.getSseEmitter().send(json); + } + } + + @Override + public void sendTenantMessage(String json) { + KiccUser kiccUser = getUser(); + Set sendSseSignalContainers = sseSignalContainers.stream() + .filter(item -> StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())).collect(Collectors.toSet()); + sendSseSignalContainers.forEach(item -> { + try { + item.getSseEmitter().send(json); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + @Override + public void disconnect() { + KiccUser kiccUser = getUser(); + Set sendSseSignalContainers = sseSignalContainers.stream() + .filter(item -> StrUtil.equals(item.getUserId(), kiccUser.getId()) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())).collect(Collectors.toSet()); + if (CollectionUtil.isNotEmpty(sendSseSignalContainers)) { + SseSignalContainer sendSseSignalContainer = sendSseSignalContainers.stream().findFirst().get(); + sendSseSignalContainer.getSseEmitter().complete(); + sseSignalContainers.removeIf(item -> StrUtil.equals(item.getUserId(), kiccUser.getId()) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); + } + } + + @Override + public void disconnectTenant() { + KiccUser kiccUser = SecurityUtils.getUser(); + Set sendSseSignalContainers = sseSignalContainers.stream() + .filter(item -> StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())).collect(Collectors.toSet()); + sendSseSignalContainers.forEach(item -> item.getSseEmitter().complete()); + sseSignalContainers.removeIf(item -> StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); + } + + @Override + public void remove(String userId) { + KiccUser kiccUser = getUser(); + Set sendSseSignalContainers = sseSignalContainers.stream() + .filter(item -> StrUtil.equals(item.getUserId(), userId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())).collect(Collectors.toSet()); + if (CollectionUtil.isNotEmpty(sendSseSignalContainers)) { + SseSignalContainer sendSseSignalContainer = sendSseSignalContainers.stream().findFirst().get(); + sendSseSignalContainer.getSseEmitter().complete(); + sseSignalContainers.removeIf(item -> StrUtil.equals(item.getUserId(), userId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); + } + } + + private KiccUser getUser() { + KiccUser kiccUser = SecurityUtils.getUser(); + Optional.ofNullable(kiccUser).orElseThrow(() -> new CommonException("当前用户登录,请先登录后重试!")); + return kiccUser; + } + +}