@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollectionUtil;
@@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ConcurrentHashSet ;
import cn.hutool.core.util.StrUtil ;
import com.cloud.kicc.common.core.exception.CommonException ;
import com.cloud.kicc.common.core.util.BaseUtil ;
import com.cloud.kicc.common.data.entity.KiccUser ;
import com.cloud.kicc.common.security.util.SecurityUtils ;
import com.cloud.kicc.commonbiz.api.entity.SseSignalContainer ;
@ -17,6 +18,7 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@@ -17,6 +18,7 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException ;
import java.time.LocalDateTime ;
import java.util.Iterator ;
import java.util.Optional ;
import java.util.Set ;
import java.util.stream.Collectors ;
@ -42,43 +44,41 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
@@ -42,43 +44,41 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
* /
@XxlJob ( "doHeartbeat" )
public void doHeartbeat ( ) {
sseSignalContainers . forEach ( item - > {
try {
item . getSseEmitter ( ) . send ( SseEmitter . event ( ) . comment ( "保持心跳 " + LocalDateTime . now ( ) ) . reconnectTime ( 1000 ) ) ;
} catch ( IOException e ) {
log . debug ( "当前用户Id为:{}发送心跳包失败,正在删除当前的建立通道对象" , item . getUserId ( ) ) ;
sseSignalContainers . removeIf ( sseSignalContainer - > StrUtil . equals ( item . getUserId ( ) , sseSignalContainer . getUserId ( ) ) & & StrUtil . equals ( item . getTenantId ( ) , sseSignalContainer . getTenantId ( ) ) ) ;
}
} ) ;
doMaintenance ( ) ;
}
@Override
@SneakyThrows
public SseEmitter SseSubscribe ( ) {
public SseEmitter SseSubscribe ( String clientId ) {
KiccUser kiccUser = getUser ( ) ;
// 超时时间设置为20秒
SseEmitter sseEmitter = new SseEmitter ( 20000L ) ;
SseSignalContainer sseSignalContainer = new SseSignalContainer (
kiccUser . getId ( ) ,
sseEmitter ,
kiccUser . getTenantId ( )
) ;
// 设置如果网络出错前端请求的重试时间为1s
sseEmitter . send ( SseEmitter . event ( ) . reconnectTime ( 1000 ) . data ( "创建通道连接成功" ) ) ;
sseSignalContainers . add ( sseSignalContainer ) ;
log . info ( "当前建立的用户Id为:{}" , kiccUser . getId ( ) ) ;
sseEmitter . onTimeout ( ( ) - > {
log . info ( "当前用户Id为:{}的SSE长轮询已经超时,正在删除当前的建立通道对象" , kiccUser . getId ( ) ) ;
sseSignalContainers . removeIf ( item - > StrUtil . equals ( item . getUserId ( ) , kiccUser . getId ( ) ) & & StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) ;
} ) ;
sseEmitter . onCompletion ( ( ) - > {
log . info ( "当前用户Id为:{}的SSE长轮询已经返回响应关闭,正在删除当前的建立通道对象" , kiccUser . getId ( ) ) ;
sseSignalContainers . removeIf ( item - > StrUtil . equals ( item . getUserId ( ) , kiccUser . getId ( ) ) & & StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) ;
} ) ;
sseEmitter . onError ( e - > {
log . info ( "当前用户Id为:{}的SSE长轮询出现异常,正在删除当前的建立通道对象,错误信息{}" , kiccUser . getId ( ) , e . getLocalizedMessage ( ) ) ;
sseSignalContainers . removeIf ( item - > StrUtil . equals ( item . getUserId ( ) , kiccUser . getId ( ) ) & & StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) ;
} ) ;
// 设置超时时间为1小时
SseEmitter sseEmitter = new SseEmitter ( 3600_000L ) ;
if ( sseSignalContainers . stream ( )
. filter ( item - > StrUtil . equals ( item . getClientId ( ) , clientId ) & & StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) )
. collect ( Collectors . toSet ( ) ) . isEmpty ( ) ) {
SseSignalContainer sseSignalContainer = new SseSignalContainer (
clientId ,
kiccUser . getId ( ) ,
sseEmitter ,
kiccUser . getTenantId ( )
) ;
// 设置如果网络出错前端请求的重试时间为1s
sseEmitter . send ( SseEmitter . event ( ) . reconnectTime ( 1000 ) . data ( "创建通道连接成功" ) ) ;
sseSignalContainers . add ( sseSignalContainer ) ;
log . info ( "clientId:{},建立的用户Id为:{}" , clientId , kiccUser . getId ( ) ) ;
sseEmitter . onTimeout ( ( ) - > {
log . info ( "clientId:{},用户Id为:{},的SSE长轮询已经超时,正在删除当前的建立通道对象" , clientId , kiccUser . getId ( ) ) ;
sseSignalContainers . removeIf ( item - > StrUtil . equals ( item . getClientId ( ) , clientId ) & & StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) ;
} ) ;
sseEmitter . onCompletion ( ( ) - > {
log . info ( "clientId:{},用户Id为:{}的SSE长轮询已经返回响应关闭,正在删除当前的建立通道对象" , clientId , kiccUser . getId ( ) ) ;
sseSignalContainers . removeIf ( item - > StrUtil . equals ( item . getClientId ( ) , clientId ) & & StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) ;
} ) ;
sseEmitter . onError ( e - > {
log . info ( "clientId:{},当前用户Id为:{}的SSE长轮询出现异常,正在删除当前的建立通道对象,错误信息{}" , clientId , kiccUser . getId ( ) , e . getLocalizedMessage ( ) ) ;
sseSignalContainers . removeIf ( item - > StrUtil . equals ( item . getClientId ( ) , clientId ) & & StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) ;
} ) ;
} else new CommonException ( "客户端id重复,请重新设置确保唯一" ) ;
return sseEmitter ;
}
@ -87,57 +87,58 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
@@ -87,57 +87,58 @@ public class MapLogisticSseServiceImpl implements IMapLogisticSseService {
public void sendMessage ( String userId , String json ) {
KiccUser kiccUser = getUser ( ) ;
Set < SseSignalContainer > sendSseSignalContainers = sseSignalContainers . stream ( )
. filter ( item - > StrUtil . equals ( item . getUserId ( ) , userId ) & & StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) . collect ( Collectors . toSet ( ) ) ;
if ( CollectionUtil . isNotEmpty ( sendSseSignalContainers ) ) {
SseSignalContainer sendSseSignalContainer = sendSseSignalContainers . stream ( ) . findFirst ( ) . get ( ) ;
sendSseSignalContainer . getSseEmitter ( ) . send ( json ) ;
. filter ( item - > StrUtil . equals ( item . getUserId ( ) , userId ) & & StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) )
. collect ( Collectors . toSet ( ) ) ;
Iterator < SseSignalContainer > it = sendSseSignalContainers . iterator ( ) ;
while ( it . hasNext ( ) ) {
SseSignalContainer item = it . next ( ) ;
item . getSseEmitter ( ) . send ( json ) ;
}
}
@Override
@SneakyThrows
public void sendTenantMessage ( String json ) {
KiccUser kiccUser = getUser ( ) ;
Set < SseSignalContainer > sendSseSignalContainers = sseSignalContainers . stream ( )
. filter ( item - > StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) . collect ( Collectors . toSet ( ) ) ;
sendSseSignalContainers . forEach ( item - > {
try {
item . getSseEmitter ( ) . send ( json ) ;
} catch ( IOException e ) {
throw new RuntimeException ( e ) ;
}
} ) ;
. filter ( item - > StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) )
. collect ( Collectors . toSet ( ) ) ;
Iterator < SseSignalContainer > it = sendSseSignalContainers . iterator ( ) ;
while ( it . hasNext ( ) ) {
SseSignalContainer item = it . next ( ) ;
item . getSseEmitter ( ) . send ( json ) ;
}
}
@Override
public void disconnect ( ) {
public void disconnect ( String clientId ) {
KiccUser kiccUser = getUser ( ) ;
Set < SseSignalContainer > sendSseSignalContainers = sseSignalContainers . stream ( )
. filter ( item - > StrUtil . equals ( item . getUserId ( ) , kiccUser . getId ( ) ) & & StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) . collect ( Collectors . toSet ( ) ) ;
if ( CollectionUtil . isNotEmpty ( sendSseSignalContainers ) ) {
SseSignalContainer sendSseSignalContainer = sendSseSignalContainers . stream ( ) . findFirst ( ) . get ( ) ;
sendSseSignalContainer . getSseEmitter ( ) . complete ( ) ;
sseSignalContainers . removeIf ( item - > StrUtil . equals ( item . getUserId ( ) , kiccUser . getId ( ) ) & & StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) ;
}
. filter ( item - > StrUtil . equals ( item . getClientId ( ) , clientId ) & & StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) )
. collect ( Collectors . toSet ( ) ) ;
sendSseSignalContainers . forEach ( item - > item . getSseEmitter ( ) . complete ( ) ) ;
sseSignalContainers . removeIf ( sseSignalContainer - > StrUtil . equals ( sseSignalContainer . getClientId ( ) , clientId ) & & StrUtil . equals ( sseSignalContainer . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) ;
}
@Override
public void disconnectTenant ( ) {
KiccUser kiccUser = SecurityUtils . getUser ( ) ;
KiccUser kiccUser = getUser ( ) ;
Set < SseSignalContainer > sendSseSignalContainers = sseSignalContainers . stream ( )
. filter ( item - > StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) . collect ( Collectors . toSet ( ) ) ;
. filter ( item - > StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) )
. collect ( Collectors . toSet ( ) ) ;
sendSseSignalContainers . forEach ( item - > item . getSseEmitter ( ) . complete ( ) ) ;
sseSignalContainers . removeIf ( item - > StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) ;
}
@Override
public void remove ( String userId ) {
KiccUser kiccUser = getUser ( ) ;
Set < SseSignalContainer > sendSseSignalContainers = sseSignalContainers . stream ( )
. filter ( item - > StrUtil . equals ( item . getUserId ( ) , userId ) & & StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) . collect ( Collectors . toSet ( ) ) ;
if ( CollectionUtil . isNotEmpty ( sendSseSignalContainers ) ) {
SseSignalContainer sendSseSignalContainer = sendSseSignalContainers . stream ( ) . findFirst ( ) . get ( ) ;
sendSseSignalContainer . getSseEmitter ( ) . complete ( ) ;
sseSignalContainers . removeIf ( item - > StrUtil . equals ( item . getUserId ( ) , userId ) & & StrUtil . equals ( item . getTenantId ( ) , kiccUser . getTenantId ( ) ) ) ;
/ * *
* 执行心跳维护 , 避免 sse 膨胀容量问题
* /
@SneakyThrows
private void doMaintenance ( ) {
Iterator < SseSignalContainer > it = sseSignalContainers . iterator ( ) ;
while ( it . hasNext ( ) ) {
SseSignalContainer item = it . next ( ) ;
item . getSseEmitter ( ) . send ( SseEmitter . event ( ) . comment ( "保持心跳" + LocalDateTime . now ( ) ) . reconnectTime ( 1000 ) ) ;
}
}