@ -50,7 +50,6 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
@@ -50,7 +50,6 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
public SseEmitter SseSubscribe ( String clientId ) {
KiccUser kiccUser = getUser ( ) ;
Optional . ofNullable ( clientId ) . orElseThrow ( ( ) - > new CommonException ( "当前客户端Id为空,请检查后重试!" ) ) ;
sseSignalContainers . removeIf ( item - > StrUtil . equals ( item . getClientId ( ) , clientId ) & & StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) ;
// 设置超时时间为1小时
SseEmitter sseEmitter = new SseEmitter ( 3600_000L ) ;
SseSignalContainer sseSignalContainer = new SseSignalContainer (
@ -60,44 +59,38 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
@@ -60,44 +59,38 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
kiccUser . getTenantId ( )
) ;
// 设置如果网络出错前端请求的重试时间为1s
sseEmitter . send ( SseEmitter . event ( ) . reconnectTime ( 1000 ) . data ( "创建通道连接成功" ) ) ;
sseEmitter . send ( SseEmitter . event ( ) . data ( "创建通道连接成功" ) . reconnectTime ( 1000 ) ) ;
sseSignalContainers . add ( sseSignalContainer ) ;
log . info ( "clientId:{},建立的用户Id为:{}" , clientId , kiccUser . getId ( ) ) ;
sseEmitter . onTimeout ( ( ) - > {
log . info ( "clientId:{},用户Id为:{},的SSE长轮询已经超时,正在删除当前的建立通道对象" , clientId , kiccUser . getId ( ) ) ;
sseSignalContainers . removeIf ( item - > StrUtil . equals ( item . getClientId ( ) , clientId ) & & StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) ;
sseEmitter . complete ( ) ;
sseSignalContainers . remove ( sseSignalContainer ) ;
} ) ;
sseEmitter . onCompletion ( ( ) - > {
log . info ( "clientId:{},用户Id为:{}的SSE长轮询已经返回响应关闭,正在删除当前的建立通道对象" , clientId , kiccUser . getId ( ) ) ;
sseSignalContainers . remove ( sseSignalContainer ) ;
} ) ;
sseEmitter . onCompletion ( ( ) - > log . info ( "clientId:{},用户Id为:{}的SSE长轮询已经返回响应关闭,正在删除当前的建立通道对象" , clientId , kiccUser . getId ( ) ) ) ;
sseEmitter . onError ( e - > log . info ( "clientId:{},当前用户Id为:{}的SSE长轮询出现异常,正在删除当前的建立通道对象,错误信息{}" , clientId , kiccUser . getId ( ) , e . getLocalizedMessage ( ) ) ) ;
return sseEmitter ;
}
@Override
@SneakyThrows
public void sendMessage ( String userId , String json ) {
public void sendMessage ( String userId , SseEmitterConsumer < SseSignalContainer > consumer ) {
KiccUser kiccUser = getUser ( ) ;
Set < SseSignalContainer > sendSseSignalContainers = sseSignalContainers . stream ( )
. filter ( item - > StrUtil . equals ( item . getUserId ( ) , userId ) & & StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) )
. collect ( Collectors . toSet ( ) ) ;
Iterator < SseSignalContainer > it = sendSseSignalContainers . iterator ( ) ;
while ( it . hasNext ( ) ) {
SseSignalContainer item = it . next ( ) ;
item . getSseEmitter ( ) . send ( json ) ;
}
send ( sendSseSignalContainers , consumer ) ;
}
@Override
@SneakyThrows
public void sendMessage ( String json ) {
public void sendMessage ( SseEmitterConsumer < SseSignalContainer > consumer ) {
KiccUser kiccUser = getUser ( ) ;
Set < SseSignalContainer > sendSseSignalContainers = sseSignalContainers . stream ( )
. filter ( item - > StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) )
. collect ( Collectors . toSet ( ) ) ;
Iterator < SseSignalContainer > it = sendSseSignalContainers . iterator ( ) ;
while ( it . hasNext ( ) ) {
SseSignalContainer item = it . next ( ) ;
item . getSseEmitter ( ) . send ( json ) ;
}
send ( sendSseSignalContainers , consumer ) ;
}
@Override
@ -108,7 +101,7 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
@@ -108,7 +101,7 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
. filter ( item - > StrUtil . equals ( item . getClientId ( ) , clientId ) & & StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) )
. collect ( Collectors . toSet ( ) ) ;
sendSseSignalContainers . forEach ( item - > item . getSseEmitter ( ) . complete ( ) ) ;
sseSignalContainers . removeIf ( sseSignalContainer - > StrUtil . equals ( sseSignalContainer . getClientId ( ) , clientId ) & & StrUtil . equals ( sseSignalContainer . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) ;
sseSignalContainers . removeAll ( sendSseSignalContainers ) ;
}
@Override
@ -118,7 +111,7 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
@@ -118,7 +111,7 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
. filter ( item - > StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) )
. collect ( Collectors . toSet ( ) ) ;
sendSseSignalContainers . forEach ( item - > item . getSseEmitter ( ) . complete ( ) ) ;
sseSignalContainers . removeIf ( item - > StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) ;
sseSignalContainers . removeAll ( sendSseSignalContainers ) ;
}
/ * *
@ -131,13 +124,36 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
@@ -131,13 +124,36 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
try {
item . getSseEmitter ( ) . send ( SseEmitter . event ( ) . comment ( "保持心跳" + LocalDateTime . now ( ) ) . reconnectTime ( 1000 ) ) ;
} catch ( IOException e ) {
item . getSseEmitter ( ) . completeWithError ( e ) ;
sseSignalContainers . remove ( item ) ;
log . debug ( "clientId:{},用户Id为:{}发送心跳包失败" , item . getClientId ( ) , item . getUserId ( ) ) ;
sseSignalContainers . removeIf ( sseSignalContainer - >
StrUtil . equals ( sseSignalContainer . getClientId ( ) , item . getClientId ( ) ) & & StrUtil . equals ( sseSignalContainer . getTenantId ( ) , item . getTenantId ( ) ) ) ;
}
}
}
/ * *
* 发送消息
* @param sendSseSignalContainers SseEmitter发送集合
* @param consumer 消费者发送函数
* @return Set < SseSignalContainer > 失败集合字段
* /
private Set < SseSignalContainer > send ( Set < SseSignalContainer > sendSseSignalContainers , SseEmitterConsumer < SseSignalContainer > consumer ) {
Set < SseSignalContainer > failedEmitters = new ConcurrentHashSet ( ) ;
Iterator < SseSignalContainer > it = sendSseSignalContainers . iterator ( ) ;
while ( it . hasNext ( ) ) {
SseSignalContainer item = it . next ( ) ;
try {
consumer . accept ( item ) ;
} catch ( Exception e ) {
item . getSseEmitter ( ) . completeWithError ( e ) ;
failedEmitters . add ( item ) ;
log . error ( "Emitter failed: {}" , item . getSseEmitter ( ) , e ) ;
}
}
sseSignalContainers . removeAll ( failedEmitters ) ;
return failedEmitters ;
}
private KiccUser getUser ( ) {
KiccUser kiccUser = SecurityUtils . getUser ( ) ;
Optional . ofNullable ( kiccUser ) . orElseThrow ( ( ) - > new CommonException ( "当前用户登录,请先登录后重试!" ) ) ;