From 032229c31d28bb23604dc8fd2268058adb9156fd Mon Sep 17 00:00:00 2001
From: wangxiang <1827945911@qq.com>
Date: Sat, 17 Sep 2022 11:23:58 +0800
Subject: [PATCH] =?UTF-8?q?:rocket:=20=E9=9B=86=E6=88=90SSE,=E8=A7=A3?=
=?UTF-8?q?=E5=86=B3=E4=B9=B1=E7=A0=81=E5=BC=82=E5=B8=B8?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../controller/MapLogisticSseController.java | 5 +-
.../service/IMapLogisticSseService.java | 19 ++++--
.../impl/MapLogisticSseServiceImpl.java | 60 ++++++++++++-------
3 files changed, 56 insertions(+), 28 deletions(-)
diff --git a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java
index fda13d64..af6ac85e 100644
--- a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java
+++ b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java
@@ -55,14 +55,15 @@ public class MapLogisticSseController {
}
@GetMapping("/sendMessage")
+
public R sendMessage(String userId, String json) {
- iMapLogisticSseService.sendMessage(userId, json);
+ iMapLogisticSseService.sendMessage(userId, sseSignalContainer -> sseSignalContainer.getSseEmitter().send(json));
return R.ok();
}
@GetMapping("/sendTenantMessage")
public R sendTenantMessage(String json) {
- iMapLogisticSseService.sendMessage(json);
+ iMapLogisticSseService.sendMessage(sseSignalContainer -> sseSignalContainer.getSseEmitter().send(json));
return R.ok();
}
diff --git a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/IMapLogisticSseService.java b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/IMapLogisticSseService.java
index 72a50455..8d9455e2 100644
--- a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/IMapLogisticSseService.java
+++ b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/IMapLogisticSseService.java
@@ -1,8 +1,11 @@
package com.cloud.kicc.commonbiz.service;
+import com.cloud.kicc.commonbiz.api.entity.SseSignalContainer;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+import java.io.IOException;
+
/**
*
* 物流地图SSE长连接 服务层
@@ -23,17 +26,17 @@ public interface IMapLogisticSseService {
/**
* sse发送消息
* @param userId 用户Id
- * @param json 发送数据包
+ * @param consumer 消费者发送函数
* @return void
*/
- void sendMessage(String userId, String json);
+ void sendMessage(String userId, SseEmitterConsumer consumer);
/**
* sse发送当前租户下所有在线客户端消息
- * @param json 发送数据包
+ * @param consumer 消费者发送函数
* @return void
*/
- void sendMessage(String json);
+ void sendMessage(SseEmitterConsumer consumer);
/**
* 断开当前用户租户下所有客户端连接
@@ -48,4 +51,12 @@ public interface IMapLogisticSseService {
*/
void disconnect(String clientId);
+ /**
+ * SseEmitter消费函数
+ */
+ @FunctionalInterface
+ interface SseEmitterConsumer {
+ void accept(T t) throws IOException;
+ }
+
}
diff --git a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java
index 1d97d6bb..d778c5b7 100644
--- a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java
+++ b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java
@@ -50,7 +50,6 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
public SseEmitter SseSubscribe(String clientId) {
KiccUser kiccUser = getUser();
Optional.ofNullable(clientId).orElseThrow(() -> new CommonException("当前客户端Id为空,请检查后重试!"));
- sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId()));
// 设置超时时间为1小时
SseEmitter sseEmitter = new SseEmitter(3600_000L);
SseSignalContainer sseSignalContainer =new SseSignalContainer(
@@ -60,44 +59,38 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
kiccUser.getTenantId()
);
// 设置如果网络出错前端请求的重试时间为1s
- sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("创建通道连接成功"));
+ sseEmitter.send(SseEmitter.event().data("创建通道连接成功").reconnectTime(1000));
sseSignalContainers.add(sseSignalContainer);
log.info("clientId:{},建立的用户Id为:{}", clientId, kiccUser.getId());
sseEmitter.onTimeout(() -> {
log.info("clientId:{},用户Id为:{},的SSE长轮询已经超时,正在删除当前的建立通道对象", clientId, kiccUser.getId());
- sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId()));
+ sseEmitter.complete();
+ sseSignalContainers.remove(sseSignalContainer);
+ });
+ sseEmitter.onCompletion(() -> {
+ log.info("clientId:{},用户Id为:{}的SSE长轮询已经返回响应关闭,正在删除当前的建立通道对象", clientId, kiccUser.getId());
+ sseSignalContainers.remove(sseSignalContainer);
});
- sseEmitter.onCompletion(() -> log.info("clientId:{},用户Id为:{}的SSE长轮询已经返回响应关闭,正在删除当前的建立通道对象", clientId, kiccUser.getId()));
sseEmitter.onError(e -> log.info("clientId:{},当前用户Id为:{}的SSE长轮询出现异常,正在删除当前的建立通道对象,错误信息{}", clientId, kiccUser.getId(), e.getLocalizedMessage()));
return sseEmitter;
}
@Override
- @SneakyThrows
- public void sendMessage(String userId, String json) {
+ public void sendMessage(String userId, SseEmitterConsumer consumer) {
KiccUser kiccUser = getUser();
Set sendSseSignalContainers = sseSignalContainers.stream()
.filter(item -> StrUtil.equals(item.getUserId(), userId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId()))
.collect(Collectors.toSet());
- Iterator it = sendSseSignalContainers.iterator();
- while (it.hasNext()) {
- SseSignalContainer item = it.next();
- item.getSseEmitter().send(json);
- }
+ send(sendSseSignalContainers, consumer);
}
@Override
- @SneakyThrows
- public void sendMessage(String json) {
+ public void sendMessage(SseEmitterConsumer consumer) {
KiccUser kiccUser = getUser();
Set sendSseSignalContainers = sseSignalContainers.stream()
.filter(item -> StrUtil.equals(item.getTenantId(), kiccUser.getTenantId()))
.collect(Collectors.toSet());
- Iterator it = sendSseSignalContainers.iterator();
- while (it.hasNext()) {
- SseSignalContainer item = it.next();
- item.getSseEmitter().send(json);
- }
+ send(sendSseSignalContainers, consumer);
}
@Override
@@ -108,7 +101,7 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
.filter(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId()))
.collect(Collectors.toSet());
sendSseSignalContainers.forEach(item -> item.getSseEmitter().complete());
- sseSignalContainers.removeIf(sseSignalContainer -> StrUtil.equals(sseSignalContainer.getClientId(), clientId) && StrUtil.equals(sseSignalContainer.getTenantId(), kiccUser.getTenantId()));
+ sseSignalContainers.removeAll(sendSseSignalContainers);
}
@Override
@@ -118,7 +111,7 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
.filter(item -> StrUtil.equals(item.getTenantId(), kiccUser.getTenantId()))
.collect(Collectors.toSet());
sendSseSignalContainers.forEach(item -> item.getSseEmitter().complete());
- sseSignalContainers.removeIf(item -> StrUtil.equals(item.getTenantId(), kiccUser.getTenantId()));
+ sseSignalContainers.removeAll(sendSseSignalContainers);
}
/**
@@ -131,13 +124,36 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
try {
item.getSseEmitter().send(SseEmitter.event().comment("保持心跳" + LocalDateTime.now()).reconnectTime(1000));
} catch (IOException e) {
+ item.getSseEmitter().completeWithError(e);
+ sseSignalContainers.remove(item);
log.debug("clientId:{},用户Id为:{}发送心跳包失败", item.getClientId(), item.getUserId());
- sseSignalContainers.removeIf(sseSignalContainer ->
- StrUtil.equals(sseSignalContainer.getClientId(), item.getClientId()) && StrUtil.equals(sseSignalContainer.getTenantId(), item.getTenantId()));
}
}
}
+ /**
+ * 发送消息
+ * @param sendSseSignalContainers SseEmitter发送集合
+ * @param consumer 消费者发送函数
+ * @return Set 失败集合字段
+ */
+ private Set send(Set sendSseSignalContainers, SseEmitterConsumer consumer) {
+ Set failedEmitters = new ConcurrentHashSet();
+ Iterator it = sendSseSignalContainers.iterator();
+ while (it.hasNext()) {
+ SseSignalContainer item = it.next();
+ try {
+ consumer.accept(item);
+ } catch (Exception e) {
+ item.getSseEmitter().completeWithError(e);
+ failedEmitters.add(item);
+ log.error("Emitter failed: {}", item.getSseEmitter(), e);
+ }
+ }
+ sseSignalContainers.removeAll(failedEmitters);
+ return failedEmitters;
+ }
+
private KiccUser getUser() {
KiccUser kiccUser = SecurityUtils.getUser();
Optional.ofNullable(kiccUser).orElseThrow(() -> new CommonException("当前用户登录,请先登录后重试!"));