@Slf4j
@RestController
@RequestMapping("sse")
public class SseController {

    @Autowired
    private SseService sseService;

    /**
     * Handles requests to the root path of this controller.
     *
     * <p>NOTE(review): the class is a {@code @RestController}, so the returned
     * string is written as the response body rather than resolved as a view
     * name. If a view called "sse" was intended, confirm and adjust.
     *
     * @return the literal string {@code "sse"}
     */
    @RequestMapping("/")
    public String index() {
        return "sse";
    }

    /**
     * Opens a new SSE connection.
     *
     * <p>A random UUID is generated for the connection, logged, and handed to
     * {@link SseService#connect(String)}.
     *
     * @return the emitter returned by the service for this connection
     */
    @RequestMapping(path = "/connect", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public SseEmitter sse() {
        final String connectionId = UUID.randomUUID().toString();
        log.info("新用户连接:{}", connectionId);
        final SseEmitter emitter = sseService.connect(connectionId);
        return emitter;
    }

    /**
     * Broadcasts a message by passing the request body to
     * {@link SseService#sendMessage(SseMessage)}.
     *
     * @param message the message read from the request body
     */
    @RequestMapping("/sendMessage")
    @ResponseBody
    public void sendMessage(@RequestBody SseMessage message) {
        sseService.sendMessage(message);
    }
}
/**
 * Service contract for opening SSE connections and sending messages.
 */
public interface SseService {
/**
 * Creates an SSE connection.
 *
 * @param uuid identifier for the connection (presumably unique per client;
 *             confirm against the implementation)
 * @return the {@code SseEmitter} for the new connection
 */
SseEmitter connect(String uuid);
/**
 * Sends a message.
 *
 * @param message the message to send
 */
void sendMessage(SseMessage message);
}
/**
 * {@link SseService} implementation that keeps every open emitter in a
 * static {@link ConcurrentHashMap} and sends each message to all of them.
 */
@Slf4j
@Service
public class SseServiceImpl implements SseService {

    /**
     * Open emitters keyed by the id passed to {@link #connect(String)}.
     * The map is static, so it is shared by every instance of this class.
     */
    private static final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    /**
     * Creates an SSE connection and registers it under the given id.
     *
     * <p>If an emitter is already registered under {@code uuid}, it is
     * replaced and completed so it does not linger unreachable.
     *
     * @param uuid identifier to register the new emitter under
     * @return the newly created emitter
     */
    @Override
    public SseEmitter connect(String uuid) {
        SseEmitter sseEmitter = new SseEmitter();

        // Send something right away; otherwise the request stays in a
        // pending state on the client side.
        try {
            sseEmitter.send(SseEmitter.event().comment("connect success"));
        } catch (IOException e) {
            log.warn("Failed to send initial SSE event to {}", uuid, e);
        }

        // Remove only this emitter's own mapping: if the id has since been
        // re-registered, a late callback from this emitter must not evict
        // the newer one.
        Runnable unregister = () -> sseEmitterMap.remove(uuid, sseEmitter);
        sseEmitter.onCompletion(unregister); // connection closed
        sseEmitter.onTimeout(unregister);    // connection timed out
        sseEmitter.onError(throwable -> unregister.run()); // connection failed

        SseEmitter previous = sseEmitterMap.put(uuid, sseEmitter);
        if (previous != null) {
            // The replaced emitter can no longer be reached through the map.
            previous.complete();
        }
        return sseEmitter;
    }

    /**
     * Sends the message to every registered emitter as JSON.
     *
     * <p>Before sending, the message's total is set to the number of
     * registered emitters. An emitter whose send fails is dropped from the
     * map; the remaining emitters still receive the message.
     *
     * @param message the message to send; its total field is overwritten
     */
    @Override
    public void sendMessage(SseMessage message) {
        message.setTotal(sseEmitterMap.size());
        sseEmitterMap.forEach((uuid, sseEmitter) -> {
            try {
                sseEmitter.send(message, MediaType.APPLICATION_JSON);
            } catch (IOException | IllegalStateException e) {
                // IllegalStateException is raised when the emitter has
                // already completed; without catching it here, one dead
                // connection would abort the send to everyone after it.
                log.warn("Failed to send SSE message to {}, dropping connection", uuid, e);
                sseEmitterMap.remove(uuid, sseEmitter);
            }
        });
    }
}
/**
 * Scheduled job that pushes a sample status message through the
 * {@link SseService} once per second.
 *
 * <p>NOTE(review): {@code @Scheduled} only fires when scheduling is enabled
 * in the application; that configuration is not visible here.
 */
@Configuration
public class SendMessageTask {

    /**
     * Layout of the timestamp stored in the message's data field.
     * {@link DateTimeFormatter} is immutable and thread-safe, so a single
     * shared instance replaces a new one per tick.
     */
    private static final DateTimeFormatter TIMESTAMP_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private SseService sseService;

    /**
     * Builds a message stamped with the current local time and fixed sample
     * status values, then hands it to {@link SseService#sendMessage}.
     *
     * <p>Cron fields are: second, minute, hour, day of month, month, day of
     * week. The expression below fires every second.
     */
    @Scheduled(cron = "*/1 * * * * *")
    public void sendMessageTask() {
        SseMessage message = new SseMessage();
        message.setData(LocalDateTime.now().format(TIMESTAMP_FORMAT));
        message.setDtuStatus(1);
        // Plain decimal literal; a leading zero would make it an octal literal.
        message.setPlcId(1);
        message.setInletValve(0);
        message.setSampleValve(1);
        message.setVentingValve(0);
        message.setSamplerStatus("01");
        sseService.sendMessage(message);
    }
}
Spring Boot 集成 SSE 详细教程: https://bilibili.com/video/BV1pC4y1P7cp