Browse Source

🚀 集成SSE

master
wangxiang 3 years ago
parent
commit
4c1e326d98
  1. 3
      kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java
  2. 50
      kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java

3
kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java

@ -6,6 +6,7 @@ import com.cloud.kicc.common.security.annotation.Inner;
import com.cloud.kicc.commonbiz.service.IMapLogisticSseService; import com.cloud.kicc.commonbiz.service.IMapLogisticSseService;
import io.swagger.annotations.Api; import io.swagger.annotations.Api;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.springframework.http.MediaType;
import org.springframework.security.core.context.SecurityContext; import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.security.oauth2.client.OAuth2ClientContext; import org.springframework.security.oauth2.client.OAuth2ClientContext;
@ -39,7 +40,7 @@ public class MapLogisticSseController {
private final OAuth2ClientContext oAuth2ClientContext; private final OAuth2ClientContext oAuth2ClientContext;
@Inner(false) @Inner(false)
@GetMapping(value = "/subscribe") @GetMapping(value = "/subscribe", produces = { MediaType.TEXT_EVENT_STREAM_VALUE })
public SseEmitter subscribe(String accessToken, String clientId) { public SseEmitter subscribe(String accessToken, String clientId) {
OAuth2Authentication oAuth2Authentication = tokenStore.readAuthentication(accessToken); OAuth2Authentication oAuth2Authentication = tokenStore.readAuthentication(accessToken);
SecurityContext context = SecurityContextHolder.createEmptyContext(); SecurityContext context = SecurityContextHolder.createEmptyContext();

50
kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java

@ -49,35 +49,31 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
public SseEmitter SseSubscribe(String clientId) { public SseEmitter SseSubscribe(String clientId) {
KiccUser kiccUser = getUser(); KiccUser kiccUser = getUser();
Optional.ofNullable(clientId).orElseThrow(() -> new CommonException("当前客户端Id为空,请检查后重试!")); Optional.ofNullable(clientId).orElseThrow(() -> new CommonException("当前客户端Id为空,请检查后重试!"));
sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId()));
// 设置超时时间为1小时 // 设置超时时间为1小时
SseEmitter sseEmitter = new SseEmitter(3600_000L); SseEmitter sseEmitter = new SseEmitter(3600_000L);
doMaintenance(); SseSignalContainer sseSignalContainer =new SseSignalContainer(
if(sseSignalContainers.stream() clientId,
.filter(item -> StrUtil.equals(item.getClientId(),clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())) kiccUser.getId(),
.collect(Collectors.toSet()).isEmpty()) { sseEmitter,
SseSignalContainer sseSignalContainer =new SseSignalContainer( kiccUser.getTenantId()
clientId, );
kiccUser.getId(), // 设置如果网络出错前端请求的重试时间为1s
sseEmitter, sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("创建通道连接成功"));
kiccUser.getTenantId() sseSignalContainers.add(sseSignalContainer);
); log.info("clientId:{},建立的用户Id为:{}", clientId, kiccUser.getId());
// 设置如果网络出错前端请求的重试时间为1s sseEmitter.onTimeout(() -> {
sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("创建通道连接成功")); log.info("clientId:{},用户Id为:{},的SSE长轮询已经超时,正在删除当前的建立通道对象", clientId, kiccUser.getId());
sseSignalContainers.add(sseSignalContainer); sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId()));
log.info("clientId:{},建立的用户Id为:{}", clientId, kiccUser.getId()); });
sseEmitter.onTimeout(() -> { sseEmitter.onCompletion(() -> {
log.info("clientId:{},用户Id为:{},的SSE长轮询已经超时,正在删除当前的建立通道对象", clientId, kiccUser.getId()); log.info("clientId:{},用户Id为:{}的SSE长轮询已经返回响应关闭,正在删除当前的建立通道对象", clientId, kiccUser.getId());
sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId()));
}); });
sseEmitter.onCompletion(() -> { sseEmitter.onError(e -> {
log.info("clientId:{},用户Id为:{}的SSE长轮询已经返回响应关闭,正在删除当前的建立通道对象", clientId, kiccUser.getId()); log.info("clientId:{},当前用户Id为:{}的SSE长轮询出现异常,正在删除当前的建立通道对象,错误信息{}", clientId, kiccUser.getId(), e.getLocalizedMessage());
sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())); sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId()));
}); });
sseEmitter.onError(e -> {
log.info("clientId:{},当前用户Id为:{}的SSE长轮询出现异常,正在删除当前的建立通道对象,错误信息{}", clientId, kiccUser.getId(), e.getLocalizedMessage());
sseSignalContainers.removeIf(item -> StrUtil.equals(item.getClientId(), clientId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId()));
});
} else throw new CommonException("客户端id重复,请重新设置确保唯一");
return sseEmitter; return sseEmitter;
} }

Loading…
Cancel
Save