Browse Source

🚀 集成SSE

master
wangxiang 3 years ago
parent
commit
fe27a39dad
  1. 29
      kicc-platform/kicc-platform-api/kicc-common-api/src/main/java/com/cloud/kicc/commonbiz/api/entity/SseSignalContainer.java
  2. 5
      kicc-platform/kicc-platform-biz/kicc-common-biz/pom.xml
  3. 2
      kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/KiccCommonApplication.java
  4. 61
      kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java
  5. 56
      kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/IMapLogisticSseService.java
  6. 150
      kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java

29
kicc-platform/kicc-platform-api/kicc-common-api/src/main/java/com/cloud/kicc/commonbiz/api/entity/SseSignalContainer.java

@ -0,0 +1,29 @@ @@ -0,0 +1,29 @@
package com.cloud.kicc.commonbiz.api.entity;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
*<p>
* SSE 信号量容器
*</p>
*
* @Author: entfrm开发团队-王翔
* @since: 2022/9/15
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@AllArgsConstructor
public class SseSignalContainer {
private String userId;
private SseEmitter sseEmitter;
protected String tenantId;
}

5
kicc-platform/kicc-platform-biz/kicc-common-biz/pom.xml

@ -54,6 +54,11 @@ @@ -54,6 +54,11 @@
<groupId>com.cloud</groupId>
<artifactId>kicc-common-mock</artifactId>
</dependency>
<!--xxl-job分布式定时任务平台-->
<dependency>
<groupId>com.cloud</groupId>
<artifactId>kicc-common-job</artifactId>
</dependency>
<!--MQTT-->
<dependency>
<groupId>org.eclipse.paho</groupId>

2
kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/KiccCommonApplication.java

@ -2,6 +2,7 @@ package com.cloud.kicc.commonbiz; @@ -2,6 +2,7 @@ package com.cloud.kicc.commonbiz;
import com.cloud.kicc.common.core.annotation.EnableKiccJacksonAutoConvert;
import com.cloud.kicc.common.feign.annotation.EnableKiccFeignClients;
import com.cloud.kicc.common.job.annotation.EnableKiccXxlJob;
import com.cloud.kicc.common.security.annotation.EnableKiccResourceServer;
import com.cloud.kicc.common.swagger.annotation.EnableKiccSwagger2;
import org.springframework.boot.SpringApplication;
@ -20,6 +21,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -20,6 +21,7 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableKiccFeignClients
@SpringBootApplication
@EnableKiccJacksonAutoConvert
@EnableKiccXxlJob
public class KiccCommonApplication {
public static void main(String[] args) {

61
kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/controller/MapLogisticSseController.java

@ -0,0 +1,61 @@ @@ -0,0 +1,61 @@
package com.cloud.kicc.commonbiz.controller;
import com.cloud.kicc.common.core.api.R;
import com.cloud.kicc.common.core.constant.AppConstants;
import com.cloud.kicc.commonbiz.service.IMapLogisticSseService;
import io.swagger.annotations.Api;
import lombok.RequiredArgsConstructor;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
*<p>
* 物流地图SSE长连接 控制层
* 长轮询: https://developer.aliyun.com/article/933937#slide-5
*</p>
*
* @Author: entfrm开发团队-王翔
* @since: 2022/9/14
*/
@RestController
@RequestMapping(AppConstants.APP_COMMON + "/mapLogisticSse")
@RequiredArgsConstructor
@Api(tags = "地图物流SSE控制接口")
public class MapLogisticSseController {
private final IMapLogisticSseService iMapLogisticSseService;
@GetMapping(value = "/subscribe", produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
public SseEmitter subscribe() {
return iMapLogisticSseService.SseSubscribe();
}
@GetMapping("/sendMessage")
public R sendMessage(String userId, String json) {
iMapLogisticSseService.sendMessage(userId, json);
return R.ok();
}
@GetMapping("/sendTenantMessage")
public R sendTenantMessage(String json) {
iMapLogisticSseService.sendTenantMessage(json);
return R.ok();
}
@GetMapping("/remove/{userId:\\w+}")
public R remove(@PathVariable String userId) {
iMapLogisticSseService.remove(userId);
return R.ok();
}
@GetMapping("/disconnectTenant")
public R disconnectTenant() {
iMapLogisticSseService.disconnectTenant();
return R.ok();
}
}

56
kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/IMapLogisticSseService.java

@ -0,0 +1,56 @@ @@ -0,0 +1,56 @@
package com.cloud.kicc.commonbiz.service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
*<p>
* 物流地图SSE长连接 服务层
*</p>
*
* @Author: entfrm开发团队-王翔
* @since: 2022/9/14
*/
public interface IMapLogisticSseService {
/**
* 采用长轮询订阅
* @return SseEmitter
*/
SseEmitter SseSubscribe();
/**
* sse发送消息
* @param userId 用户Id
* @param json 发送数据包
* @return void
*/
void sendMessage(String userId, String json);
/**
* sse发送当前租户下所有在线客户端消息
* @param json 发送数据包
* @return void
*/
void sendTenantMessage(String json);
/**
* 断开当前用户租户下所有客户端连接
* @return void
*/
void disconnectTenant();
/**
* 断开当前用户连接
* @return void
*/
void disconnect();
/**
* 移除指定用户客户端
* @param userId 指定用户Id
* @return void
*/
void remove(String userId);
}

150
kicc-platform/kicc-platform-biz/kicc-common-biz/src/main/java/com/cloud/kicc/commonbiz/service/impl/MapLogisticSseServiceImpl.java

@ -0,0 +1,150 @@ @@ -0,0 +1,150 @@
package com.cloud.kicc.commonbiz.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ConcurrentHashSet;
import cn.hutool.core.util.StrUtil;
import com.cloud.kicc.common.core.exception.CommonException;
import com.cloud.kicc.common.data.entity.KiccUser;
import com.cloud.kicc.common.security.util.SecurityUtils;
import com.cloud.kicc.commonbiz.api.entity.SseSignalContainer;
import com.cloud.kicc.commonbiz.service.IMapLogisticSseService;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
* <p>
* 地图核心主任务表 服务实现类
* </p>
*
* @author entfrm开发团队-王翔
* @since 2022-07-22
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
/** 存储 sse 上下文容器 */
private static Set<SseSignalContainer> sseSignalContainers = new ConcurrentHashSet();
/**
* 间隔10秒发送心跳包,检查客户端是否关闭
*/
@XxlJob("doHeartbeat")
public void doHeartbeat() {
sseSignalContainers.forEach(item -> {
try {
item.getSseEmitter().send(SseEmitter.event().comment("保持心跳 " + LocalDateTime.now()).reconnectTime(1000));
} catch (IOException e) {
log.debug("当前用户Id为:{}发送心跳包失败,正在删除当前的建立通道对象", item.getUserId());
sseSignalContainers.removeIf(sseSignalContainer -> StrUtil.equals(item.getUserId(), sseSignalContainer.getUserId()) && StrUtil.equals(item.getTenantId(), sseSignalContainer.getTenantId()));
}
});
}
@Override
@SneakyThrows
public SseEmitter SseSubscribe() {
KiccUser kiccUser = getUser();
// 超时时间设置为20秒
SseEmitter sseEmitter = new SseEmitter(20000L);
SseSignalContainer sseSignalContainer =new SseSignalContainer(
kiccUser.getId(),
sseEmitter,
kiccUser.getTenantId()
);
// 设置如果网络出错前端请求的重试时间为1s
sseEmitter.send(SseEmitter.event().reconnectTime(1000).data("创建通道连接成功"));
sseSignalContainers.add(sseSignalContainer);
log.info("当前建立的用户Id为:{}", kiccUser.getId());
sseEmitter.onTimeout(() -> {
log.info("当前用户Id为:{}的SSE长轮询已经超时,正在删除当前的建立通道对象", kiccUser.getId());
sseSignalContainers.removeIf(item -> StrUtil.equals(item.getUserId(), kiccUser.getId()) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId()));
});
sseEmitter.onCompletion(() -> {
log.info("当前用户Id为:{}的SSE长轮询已经返回响应关闭,正在删除当前的建立通道对象", kiccUser.getId());
sseSignalContainers.removeIf(item -> StrUtil.equals(item.getUserId(), kiccUser.getId()) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId()));
});
sseEmitter.onError(e -> {
log.info("当前用户Id为:{}的SSE长轮询出现异常,正在删除当前的建立通道对象,错误信息{}", kiccUser.getId(), e.getLocalizedMessage());
sseSignalContainers.removeIf(item -> StrUtil.equals(item.getUserId(), kiccUser.getId()) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId()));
});
return sseEmitter;
}
@Override
@SneakyThrows
public void sendMessage(String userId, String json) {
KiccUser kiccUser = getUser();
Set<SseSignalContainer> sendSseSignalContainers = sseSignalContainers.stream()
.filter(item -> StrUtil.equals(item.getUserId(), userId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())).collect(Collectors.toSet());
if (CollectionUtil.isNotEmpty(sendSseSignalContainers)) {
SseSignalContainer sendSseSignalContainer = sendSseSignalContainers.stream().findFirst().get();
sendSseSignalContainer.getSseEmitter().send(json);
}
}
@Override
public void sendTenantMessage(String json) {
KiccUser kiccUser = getUser();
Set<SseSignalContainer> sendSseSignalContainers = sseSignalContainers.stream()
.filter(item -> StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())).collect(Collectors.toSet());
sendSseSignalContainers.forEach(item -> {
try {
item.getSseEmitter().send(json);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
@Override
public void disconnect() {
KiccUser kiccUser = getUser();
Set<SseSignalContainer> sendSseSignalContainers = sseSignalContainers.stream()
.filter(item -> StrUtil.equals(item.getUserId(), kiccUser.getId()) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())).collect(Collectors.toSet());
if (CollectionUtil.isNotEmpty(sendSseSignalContainers)) {
SseSignalContainer sendSseSignalContainer = sendSseSignalContainers.stream().findFirst().get();
sendSseSignalContainer.getSseEmitter().complete();
sseSignalContainers.removeIf(item -> StrUtil.equals(item.getUserId(), kiccUser.getId()) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId()));
}
}
@Override
public void disconnectTenant() {
KiccUser kiccUser = SecurityUtils.getUser();
Set<SseSignalContainer> sendSseSignalContainers = sseSignalContainers.stream()
.filter(item -> StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())).collect(Collectors.toSet());
sendSseSignalContainers.forEach(item -> item.getSseEmitter().complete());
sseSignalContainers.removeIf(item -> StrUtil.equals(item.getTenantId(), kiccUser.getTenantId()));
}
@Override
public void remove(String userId) {
KiccUser kiccUser = getUser();
Set<SseSignalContainer> sendSseSignalContainers = sseSignalContainers.stream()
.filter(item -> StrUtil.equals(item.getUserId(), userId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId())).collect(Collectors.toSet());
if (CollectionUtil.isNotEmpty(sendSseSignalContainers)) {
SseSignalContainer sendSseSignalContainer = sendSseSignalContainers.stream().findFirst().get();
sendSseSignalContainer.getSseEmitter().complete();
sseSignalContainers.removeIf(item -> StrUtil.equals(item.getUserId(), userId) && StrUtil.equals(item.getTenantId(), kiccUser.getTenantId()));
}
}
private KiccUser getUser() {
KiccUser kiccUser = SecurityUtils.getUser();
Optional.ofNullable(kiccUser).orElseThrow(() -> new CommonException("当前用户登录,请先登录后重试!"));
return kiccUser;
}
}
Loading…
Cancel
Save