From 45d8cb483a23a9bef68ebb822696ef1c77f94f8c Mon Sep 17 00:00:00 2001 From: wangxiang <1827945911@qq.com> Date: Fri, 16 Sep 2022 00:00:32 +0800 Subject: [PATCH] =?UTF-8?q?:rocket:=20=E9=9B=86=E6=88=90SSE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../api/entity/SseSignalContainer.java | 2 + .../controller/MapLogisticSseController.java | 32 ++++- .../service/IMapLogisticSseService.java | 13 +- .../impl/MapLogisticSseServiceImpl.java | 125 +++++++++--------- 4 files changed, 95 insertions(+), 77 deletions(-) diff --git a/kicc-platform/kicc-platform-api/kicc-common-api/src/main/java/com/cloud/kicc/commonbiz/api/entity/SseSignalContainer.java b/kicc-platform/kicc-platform-api/kicc-common-api/src/main/java/com/cloud/kicc/commonbiz/api/entity/SseSignalContainer.java index ea3213f1..76dfe3cf 100644 --- a/kicc-platform/kicc-platform-api/kicc-common-api/src/main/java/com/cloud/kicc/commonbiz/api/entity/SseSignalContainer.java +++ b/kicc-platform/kicc-platform-api/kicc-common-api/src/main/java/com/cloud/kicc/commonbiz/api/entity/SseSignalContainer.java @@ -20,6 +20,8 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; @AllArgsConstructor public class SseSignalContainer { + private String clientId; + private String userId; private SseEmitter sseEmitter; diff --git a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java index 8e473ec3..f599cf01 100644 --- a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java +++ b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java @@ -1,11 +1,20 @@ package com.cloud.kicc.commonbiz.controller; +import cn.hutool.core.codec.Base64; +import cn.hutool.core.util.StrUtil; import com.cloud.kicc.common.core.api.R; import com.cloud.kicc.common.core.constant.AppConstants; +import com.cloud.kicc.common.security.annotation.Inner; import com.cloud.kicc.commonbiz.service.IMapLogisticSseService; import io.swagger.annotations.Api; import lombok.RequiredArgsConstructor; import org.springframework.http.MediaType; +import org.springframework.security.core.context.SecurityContext; +import org.springframework.security.core.context.SecurityContextHolder; +import org.springframework.security.oauth2.client.OAuth2ClientContext; +import org.springframework.security.oauth2.common.DefaultOAuth2AccessToken; +import org.springframework.security.oauth2.provider.OAuth2Authentication; +import org.springframework.security.oauth2.provider.token.TokenStore; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; @@ -29,9 +38,20 @@ public class MapLogisticSseController { private final IMapLogisticSseService iMapLogisticSseService; - @GetMapping(value = "/subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE}) - public SseEmitter subscribe() { - return iMapLogisticSseService.SseSubscribe(); + private final TokenStore tokenStore; + + private final OAuth2ClientContext oAuth2ClientContext; + + @Inner(false) + @GetMapping(value = "/subscribe") + public SseEmitter subscribe(String accessToken, String clientId) { + OAuth2Authentication oAuth2Authentication = tokenStore.readAuthentication(accessToken); + SecurityContext context = SecurityContextHolder.createEmptyContext(); + context.setAuthentication(oAuth2Authentication); + DefaultOAuth2AccessToken defaultOAuth2AccessToken = new DefaultOAuth2AccessToken(accessToken); + oAuth2ClientContext.setAccessToken(defaultOAuth2AccessToken); + SecurityContextHolder.setContext(context); + return iMapLogisticSseService.SseSubscribe(clientId); } @GetMapping("/sendMessage") @@ -46,9 +66,9 @@ public class MapLogisticSseController { return R.ok(); } - @GetMapping("/remove/{userId:\\w+}") - public R remove(@PathVariable String userId) { - iMapLogisticSseService.remove(userId); + @GetMapping("/disconnect") + public R disconnect(String clientId) { + iMapLogisticSseService.disconnect(clientId); return R.ok(); } diff --git a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/IMapLogisticSseService.java b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/IMapLogisticSseService.java index fa08a802..d66e7d27 100644 --- a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/IMapLogisticSseService.java +++ b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/IMapLogisticSseService.java @@ -15,9 +15,10 @@ public interface IMapLogisticSseService { /** * 采用长轮询订阅 + * @param clientId 客户端唯一Id * @return SseEmitter */ - SseEmitter SseSubscribe(); + SseEmitter SseSubscribe(String clientId); /** * sse发送消息 @@ -42,15 +43,9 @@ public interface IMapLogisticSseService { /** * 断开当前用户连接 + * @param clientId 客户端唯一Id * @return void */ - void disconnect(); - - /** - * 移除指定用户客户端 - * @param userId 指定用户Id - * @return void - */ - void remove(String userId); + void disconnect(String clientId); } diff --git a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java index 62988082..af0b3a91 100644 --- a/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java +++ b/kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java @@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.ConcurrentHashSet; import cn.hutool.core.util.StrUtil; import com.cloud.kicc.common.core.exception.CommonException; +import com.cloud.kicc.common.core.util.BaseUtil; import com.cloud.kicc.common.data.entity.KiccUser; import com.cloud.kicc.common.security.util.SecurityUtils; import com.cloud.kicc.commonbiz.api.entity.SseSignalContainer; @@ -17,6 +18,7 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.time.LocalDateTime; +import java.util.Iterator; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -42,43 +44,41 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService { */ @XxlJob("doHeartbeat") public void doHeartbeat() { - sseSignalContainers.forEach(item -> { - try { - item.getSseEmitter().send(SseEmitter.event().comment("保持心跳 " + LocalDateTime.now()).reconnectTime(1000)); - } catch (IOException e) { - log.debug("当前用户Id为:{}发送心跳包失败,正在删除当前的建立通道对象", item.getUserId()); - sseSignalContainers.removeIf(sseSignalContainer -> StrUtil.equals(item.getUserId(), sseSignalContainer.getUserId()) && StrUtil.equals(item.getTenantId(), sseSignalContainer.getTenantId())); - } - }); + doMaintenance(); } @Override @SneakyThrows - public SseEmitter SseSubscribe() { + public SseEmitter SseSubscribe(String clientId) { KiccUser kiccUser = getUser(); - // 超时时间设置为20秒 - SseEmitter sseEmitter = new SseEmitter(20000L); - SseSignalContainer sseSignalContainer =new SseSignalContainer( - kiccUser.getId(), - sseEmitter, - kiccUser.getTenantId() - ); - // 设置如果网络出错前端请求的重试时间为1s - sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("创建通道连接成功")); - sseSignalContainers.add(sseSignalContainer); - log.info("当前建立的用户Id为:{}", kiccUser.getId()); - sseEmitter.onTimeout(() -> { - log.info("当前用户Id为:{}的SSE长轮询已经超时,正在删除当前的建立通道对象", kiccUser.getId()); - sseSignalContainers.removeIf(item -> StrUtil.equals(item.getUserId(), kiccUser.getId()) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); - }); - sseEmitter.onCompletion(() -> { - log.info("当前用户Id为:{}的SSE长轮询已经返回响应关闭,正在删除当前的建立通道对象", kiccUser.getId()); - sseSignalContainers.removeIf(item -> StrUtil.equals(item.getUserId(), kiccUser.getId()) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); - }); - sseEmitter.onError(e -> { - log.info("当前用户Id为:{}的SSE长轮询出现异常,正在删除当前的建立通道对象,错误信息{}", kiccUser.getId(), e.getLocalizedMessage()); - sseSignalContainers.removeIf(item -> StrUtil.equals(item.getUserId(), kiccUser.getId()) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); - }); + // 设置超时时间为1小时 + SseEmitter sseEmitter = new SseEmitter(3600_000L); + if(sseSignalContainers.stream() + .filter(item -> StrUtil.equals(item.getClientId(),clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())) + .collect(Collectors.toSet()).isEmpty()) { + SseSignalContainer sseSignalContainer =new SseSignalContainer( + clientId, + kiccUser.getId(), + sseEmitter, + kiccUser.getTenantId() + ); + // 设置如果网络出错前端请求的重试时间为1s + sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("创建通道连接成功")); + sseSignalContainers.add(sseSignalContainer); + log.info("clientId:{},建立的用户Id为:{}", clientId, kiccUser.getId()); + sseEmitter.onTimeout(() -> { + log.info("clientId:{},用户Id为:{},的SSE长轮询已经超时,正在删除当前的建立通道对象", clientId, kiccUser.getId()); + sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); + }); + sseEmitter.onCompletion(() -> { + log.info("clientId:{},用户Id为:{}的SSE长轮询已经返回响应关闭,正在删除当前的建立通道对象", clientId, kiccUser.getId()); + sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); + }); + sseEmitter.onError(e -> { + log.info("clientId:{},当前用户Id为:{}的SSE长轮询出现异常,正在删除当前的建立通道对象,错误信息{}", clientId, kiccUser.getId(), e.getLocalizedMessage()); + sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); + }); + } else new CommonException("客户端id重复,请重新设置确保唯一"); return sseEmitter; } @@ -87,57 +87,58 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService { public void sendMessage(String userId, String json) { KiccUser kiccUser = getUser(); Set sendSseSignalContainers = sseSignalContainers.stream() - .filter(item -> StrUtil.equals(item.getUserId(), userId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())).collect(Collectors.toSet()); - if (CollectionUtil.isNotEmpty(sendSseSignalContainers)) { - SseSignalContainer sendSseSignalContainer = sendSseSignalContainers.stream().findFirst().get(); - sendSseSignalContainer.getSseEmitter().send(json); + .filter(item -> StrUtil.equals(item.getUserId(), userId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())) + .collect(Collectors.toSet()); + Iterator it = sendSseSignalContainers.iterator(); + while (it.hasNext()) { + SseSignalContainer item = it.next(); + item.getSseEmitter().send(json); } } @Override + @SneakyThrows public void sendTenantMessage(String json) { KiccUser kiccUser = getUser(); Set sendSseSignalContainers = sseSignalContainers.stream() - .filter(item -> StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())).collect(Collectors.toSet()); - sendSseSignalContainers.forEach(item -> { - try { - item.getSseEmitter().send(json); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + .filter(item -> StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())) + .collect(Collectors.toSet()); + Iterator it = sendSseSignalContainers.iterator(); + while (it.hasNext()) { + SseSignalContainer item = it.next(); + item.getSseEmitter().send(json); + } } @Override - public void disconnect() { + public void disconnect(String clientId) { KiccUser kiccUser = getUser(); Set sendSseSignalContainers = sseSignalContainers.stream() - .filter(item -> StrUtil.equals(item.getUserId(), kiccUser.getId()) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())).collect(Collectors.toSet()); - if (CollectionUtil.isNotEmpty(sendSseSignalContainers)) { - SseSignalContainer sendSseSignalContainer = sendSseSignalContainers.stream().findFirst().get(); - sendSseSignalContainer.getSseEmitter().complete(); - sseSignalContainers.removeIf(item -> StrUtil.equals(item.getUserId(), kiccUser.getId()) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); - } + .filter(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())) + .collect(Collectors.toSet()); + sendSseSignalContainers.forEach(item -> item.getSseEmitter().complete()); + sseSignalContainers.removeIf(sseSignalContainer -> StrUtil.equals(sseSignalContainer.getClientId(), clientId) && StrUtil.equals(sseSignalContainer.getTenantId(), kiccUser.getTenantId())); } @Override public void disconnectTenant() { - KiccUser kiccUser = SecurityUtils.getUser(); + KiccUser kiccUser = getUser(); Set sendSseSignalContainers = sseSignalContainers.stream() - .filter(item -> StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())).collect(Collectors.toSet()); + .filter(item -> StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())) + .collect(Collectors.toSet()); sendSseSignalContainers.forEach(item -> item.getSseEmitter().complete()); sseSignalContainers.removeIf(item -> StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); } - @Override - public void remove(String userId) { - KiccUser kiccUser = getUser(); - Set sendSseSignalContainers = sseSignalContainers.stream() - .filter(item -> StrUtil.equals(item.getUserId(), userId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())).collect(Collectors.toSet()); - if (CollectionUtil.isNotEmpty(sendSseSignalContainers)) { - SseSignalContainer sendSseSignalContainer = sendSseSignalContainers.stream().findFirst().get(); - sendSseSignalContainer.getSseEmitter().complete(); - sseSignalContainers.removeIf(item -> StrUtil.equals(item.getUserId(), userId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); + /** + * 执行心跳维护,避免 sse 膨胀容量问题 + */ + @SneakyThrows + private void doMaintenance() { + Iterator it = sseSignalContainers.iterator(); + while (it.hasNext()) { + SseSignalContainer item = it.next(); + item.getSseEmitter().send(SseEmitter.event().comment("保持心跳" + LocalDateTime.now()).reconnectTime(1000)); } }