From 4c1e326d987970ce3f87ea7d26e8a98be9162125 Mon Sep 17 00:00:00 2001 From: wangxiang <1827945911@qq.com> Date: Fri, 16 Sep 2022 00:40:04 +0800 Subject: [PATCH] =?UTF-8?q?:rocket:=20=E9=9B=86=E6=88=90SSE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../controller/MapLogisticSseController.java | 3 +- .../impl/MapLogisticSseServiceImpl.java | 50 +++++++++---------- 2 files changed, 25 insertions(+), 28 deletions(-) diff --git a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java index 0159dab4..83c63ea5 100644 --- a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java +++ b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java @@ -6,6 +6,7 @@ import com.cloud.kicc.common.security.annotation.Inner; import com.cloud.kicc.commonbiz.service.IMapLogisticSseService; import io.swagger.annotations.Api; import lombok.RequiredArgsConstructor; +import org.springframework.http.MediaType; import org.springframework.security.core.context.SecurityContext; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.security.oauth2.client.OAuth2ClientContext; @@ -39,7 +40,7 @@ public class MapLogisticSseController { private final OAuth2ClientContext oAuth2ClientContext; @Inner(false) - @GetMapping(value = "/subscribe") + @GetMapping(value = "/subscribe", produces = { MediaType.TEXT_EVENT_STREAM_VALUE }) public SseEmitter subscribe(String accessToken, String clientId) { OAuth2Authentication oAuth2Authentication = tokenStore.readAuthentication(accessToken); SecurityContext context = SecurityContextHolder.createEmptyContext(); diff --git a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java index bc80343d..4b1245d1 100644 --- a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java +++ b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java @@ -49,35 +49,31 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService { public SseEmitter SseSubscribe(String clientId) { KiccUser kiccUser = getUser(); Optional.ofNullable(clientId).orElseThrow(() -> new CommonException("当前客户端Id为空,请检查后重试!")); + sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); // 设置超时时间为1小时 SseEmitter sseEmitter = new SseEmitter(3600_000L); - doMaintenance(); - if(sseSignalContainers.stream() - .filter(item -> StrUtil.equals(item.getClientId(),clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())) - .collect(Collectors.toSet()).isEmpty()) { - SseSignalContainer sseSignalContainer =new SseSignalContainer( - clientId, - kiccUser.getId(), - sseEmitter, - kiccUser.getTenantId() - ); - // 设置如果网络出错前端请求的重试时间为1s - sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("创建通道连接成功")); - sseSignalContainers.add(sseSignalContainer); - log.info("clientId:{},建立的用户Id为:{}", clientId, kiccUser.getId()); - sseEmitter.onTimeout(() -> { - log.info("clientId:{},用户Id为:{},的SSE长轮询已经超时,正在删除当前的建立通道对象", clientId, kiccUser.getId()); - sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); - }); - sseEmitter.onCompletion(() -> { - log.info("clientId:{},用户Id为:{}的SSE长轮询已经返回响应关闭,正在删除当前的建立通道对象", clientId, kiccUser.getId()); - sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); - }); - sseEmitter.onError(e -> { - log.info("clientId:{},当前用户Id为:{}的SSE长轮询出现异常,正在删除当前的建立通道对象,错误信息{}", clientId, kiccUser.getId(), e.getLocalizedMessage()); - sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); - }); - } else throw new CommonException("客户端id重复,请重新设置确保唯一"); + SseSignalContainer sseSignalContainer =new SseSignalContainer( + clientId, + kiccUser.getId(), + sseEmitter, + kiccUser.getTenantId() + ); + // 设置如果网络出错前端请求的重试时间为1s + sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("创建通道连接成功")); + sseSignalContainers.add(sseSignalContainer); + log.info("clientId:{},建立的用户Id为:{}", clientId, kiccUser.getId()); + sseEmitter.onTimeout(() -> { + log.info("clientId:{},用户Id为:{},的SSE长轮询已经超时,正在删除当前的建立通道对象", clientId, kiccUser.getId()); + sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); + }); + sseEmitter.onCompletion(() -> { + log.info("clientId:{},用户Id为:{}的SSE长轮询已经返回响应关闭,正在删除当前的建立通道对象", clientId, kiccUser.getId()); + sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); + }); + sseEmitter.onError(e -> { + log.info("clientId:{},当前用户Id为:{}的SSE长轮询出现异常,正在删除当前的建立通道对象,错误信息{}", clientId, kiccUser.getId(), e.getLocalizedMessage()); + sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); + }); return sseEmitter; }