SpringBoot集成websocket能力(stomp)
序
之前有分享过springBoot集成Websocket推送信息。今天主要是来继续分享升级版,这次是采用STOMP协议。用这个的好处有很多,比如可以屏蔽浏览器之间的差异,更方便对接消息中间件等。
一、协议理解
HTTP、WebSocket 等应用层协议,都是基于 TCP 协议来传输数据的。
HTTP不足在于它与服务器的全双工通信依靠轮询实现,对于需要从服务器主动发送数据的情境,会给服务器资源造成很大的浪费,WebSocket是针对HTTP在这种情况下的补充。
对于 WebSocket 来说,它必须依赖 HTTP 协议进行一次握手 ,握手成功后,数据就直接从 TCP 通道传输,与 HTTP 无关了。
WebSocket是一个完整的应用层协议,包含一套标准的 API 。
STOMP即Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议,它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互。 STOMP协议可以建立在WebSocket之上,也可以建立在其他应用层协议之上。
二、依赖包
<!-- websocket支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <!-- 有说需要依赖这个的,我这里实际没有引入 <dependency> <groupId>org.springframework</groupId> <artifactId>spring-messaging</artifactId> </dependency> -->
三、能力集成核心代码
websocket用户类
import java.security.Principal; /** * @author zhengwen **/ public class WebSocketUser implements Principal { /** * 用户信息 */ private final String name; public WebSocketUser(String name) { this.name = name; } @Override public String getName() { return name; } }
消息对象类
WebSocketMsgVo
import lombok.Data; import java.time.LocalDateTime; /** * websocket信息vo对象 * * @author zhengwen **/ @Data public class WebSocketMsgVo<T> { /** * 发送方 */ private String from; /** * 接收方 */ private String to; /** * 时间 */ private LocalDateTime time = LocalDateTime.now(); /** * 平台来源 */ private String platform; /** * 主题通道 */ private String topicChannel; /** * 信息业务对象 */ private T data; }
这里data定义为抽象类,由业务系统自行定义。因为我们这是提供能力,所以尽量不要固定死。
配置类
import com.easylinkin.bm.handler.MyHandshakeHandler; import com.easylinkin.bm.interceptor.MyHandshakeInterceptor; import com.easylinkin.bm.interceptor.WebSocketUserInterceptor; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.ChannelRegistration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration; /** * websocket stomp协议配置类 * * @author zhengwen **/ @Configuration @EnableWebSocketMessageBroker public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer { /** * 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务, * 也就是我们配置websocket的服务地址,并且可以指定是否使用socketjs * * @param registry */ @Override public void registerStompEndpoints(StompEndpointRegistry registry) { /* * 1. 将 /stomp/websocketJs路径注册为STOMP的端点, * 用户连接了这个端点后就可以进行websocket通讯,支持socketJs * 2. setAllowedOriginPatterns("*")表示可以跨域 * 3. withSockJS()表示支持socktJS访问 * 4. addInterceptors 添加自定义拦截器,这个拦截器是上一个demo自己定义的获取httpsession的拦截器 * 5. addInterceptors 添加拦截处理,这里MyPrincipalHandshakeHandler 封装的认证用户信息 */ //配置客户端连接地址 registry.addEndpoint("/stomp/websocketJS").setAllowedOriginPatterns("*").addInterceptors(new MyHandshakeInterceptor()).setHandshakeHandler(new MyHandshakeHandler()).withSockJS(); /* * 添加多个端点 * 它的实现类是WebMvcStompEndpointRegistry , * addEndpoint是添加到WebMvcStompWebSocketEndpointRegistration的集合中, * 所以可以添加多个端点 */ registry.addEndpoint("/stomp/websocket"); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { // 自定义调度器,用于控制心跳线程 ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); // 线程池线程数,心跳连接开线程 taskScheduler.setPoolSize(1); // 线程名前缀 taskScheduler.setThreadNamePrefix("websocket-heartbeat-thread-"); // 初始化 taskScheduler.initialize(); // 设置广播节点 registry.enableSimpleBroker("/ad", "/device", "/pay", "/data", "/warn", "/alone").setHeartbeatValue(new long[]{ 10000, 10000}) .setTaskScheduler(taskScheduler); // 客户端向服务端发送消息需有/app 前缀 registry.setApplicationDestinationPrefixes("/app"); // 指定用户发送(一对一)的前缀 /user/ registry.setUserDestinationPrefix("/user"); } /** * 配置发送与接收的消息参数,可以指定消息字节大小,缓存大小,发送超时时间 * * @param registration */ @Override public void configureWebSocketTransport(WebSocketTransportRegistration registration) { /* * 1. setMessageSizeLimit 设置消息缓存的字节数大小 字节 * 2. setSendBufferSizeLimit 设置websocket会话时,缓存的大小 字节 * 3. setSendTimeLimit 设置消息发送会话超时时间,毫秒 */ registration.setMessageSizeLimit(10240) .setSendBufferSizeLimit(10240) .setSendTimeLimit(10000); } /** * 配置客户端入站通道拦截器 * 设置输入消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间 * * @param registration */ @Override public void configureClientInboundChannel(ChannelRegistration registration) { /* * 配置消息线程池 * 1. corePoolSize 配置核心线程池,当线程数小于此配置时,不管线程中有无空闲的线程,都会产生新线程处理任务 * 2. maxPoolSize 配置线程池最大数,当线程池数等于此配置时,不会产生新线程 * 3. keepAliveSeconds 线程池维护线程所允许的空闲时间,单位秒 */ registration.taskExecutor().corePoolSize(10) .maxPoolSize(20) .keepAliveSeconds(60); registration.interceptors(new WebSocketUserInterceptor()); } /** * 设置输出消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间 * * @param registration */ @Override public void configureClientOutboundChannel(ChannelRegistration registration) { registration.taskExecutor().corePoolSize(10) .maxPoolSize(20) .keepAliveSeconds(60); //registration.interceptors(new WebSocketUserInterceptor()); } }
sebsocket的http握手拦截器
MyHandshakeInterceptor
import com.easylinkin.bm.vo.websocket.WebSocketUser; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.http.server.ServletServerHttpRequest; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.HandshakeInterceptor; import java.util.Map; /** * @author zhengwen **/ @Slf4j public class MyHandshakeInterceptor implements HandshakeInterceptor { /** * websocket握手之前 */ @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { log.info("--websocket的http连接握手之前--"); ServletServerHttpRequest req = (ServletServerHttpRequest) request; WebSocketUser user = null; //获取token认证 String token = req.getServletRequest().getParameter("token"); //解析token获取用户信息 //鉴权,我的方法是,前端把token传过来,解析token,判断正确与否,return true表示通过,false请求不通过。 //TODO 鉴权设置用户 if (StringUtils.isNotBlank(token)) { user = new WebSocketUser(token); } //如果token认证失败user为null,返回false拒绝握手 if (user == null) { return false; } //保存认证用户 attributes.put("user", user); return true; } @Override public void afterHandshake(ServerHttpRequest serverHttpRequest, ServerHttpResponse serverHttpResponse, WebSocketHandler webSocketHandler, Exception e) { } }
websocket的握手之后拦截器
import lombok.extern.slf4j.Slf4j; import org.springframework.http.server.ServerHttpRequest; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.support.DefaultHandshakeHandler; import java.security.Principal; import java.util.Map; /** * @author zhengwen **/ @Slf4j public class MyHandshakeHandler extends DefaultHandshakeHandler { @Override protected Principal determineUser(ServerHttpRequest request, WebSocketHandler wsHandler, Map<String, Object> attributes) { log.info("--websocket的http连接握手之后--"); //设置认证用户 return (Principal) attributes.get("user"); } }
websocket设置自定义的连接通道拦截器
WebSocketUserInterceptor
import com.easylinkin.bm.vo.websocket.WebSocketUser; import lombok.extern.slf4j.Slf4j; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.stomp.StompCommand; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; import org.springframework.messaging.support.ChannelInterceptor; import org.springframework.messaging.support.MessageHeaderAccessor; import java.util.Map; /** * @author zhengwen **/ @Slf4j public class WebSocketUserInterceptor implements ChannelInterceptor { @Override public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) { log.info("--websocket信息发送后--"); ChannelInterceptor.super.afterSendCompletion(message, channel, sent, ex); } /** * 获取包含在stomp中的用户信息 */ @SuppressWarnings("rawtypes") @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { log.info("--websocket信息发送前--"); StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class); if (accessor != null) { if (StompCommand.CONNECT.equals(accessor.getCommand())) { Object raw = message.getHeaders().get(SimpMessageHeaderAccessor.NATIVE_HEADERS); if (raw instanceof Map) { Object nameObj = ((Map) raw).get("name"); if (nameObj != null) { // 设置当前访问器的认证用户,或者做其他业务 WebSocketUser webSocketUser = new WebSocketUser(String.valueOf(nameObj)); accessor.setUser(webSocketUser); } } } } return message; } }
WebSocketStompController
import com.easylinkin.bm.core.Result; import com.easylinkin.bm.service.WebSocketService; import com.easylinkin.bm.vo.websocket.WebSocketMsgVo; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * websocket stomp协议controller * * @author zhengwen **/ @Slf4j @RestController @RequestMapping("/web/socket/stomp") public class WebSocketStompController { @Autowired private WebSocketService webSocketService; /** * 发送信息 stomp * * @param webSocketMsgVo 信息对象vo * @return 统一出参 */ @PostMapping("/sendStompMsg") @MessageMapping("/sendStompMsg") public Result<?> sendStompMsg(@RequestBody WebSocketMsgVo webSocketMsgVo) { log.info("--发送信息--"); return webSocketService.sendStompMsg(webSocketMsgVo); } }
WebSocketService实现类
import com.alibaba.fastjson.JSON; import com.easylinkin.bm.core.Result; import com.easylinkin.bm.core.ResultGenerator; import com.easylinkin.bm.service.WebSocketService; import com.easylinkin.bm.vo.websocket.WebSocketMsgVo; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; /** * @author zhengwen **/ @Slf4j @Service @Transactional(rollbackFor = Exception.class) public class WebSocketServiceImpl implements WebSocketService { @Autowired private SimpMessagingTemplate simpMessagingTemplate; @Override public Result<?> sendStompMsg(WebSocketMsgVo webSocketMsgVo) { String topicChannel = webSocketMsgVo.getTopicChannel(); if (StringUtils.isNotBlank(topicChannel)) { topicChannel = "/" + topicChannel; } String message = JSON.toJSONString(webSocketMsgVo); String to = webSocketMsgVo.getTo(); try { if (StringUtils.isNotBlank(to)) { //MD 不明原因用convertAndSendToUser不能收到,确认订阅没有问题 //simpMessagingTemplate.convertAndSendToUser(to, topicChannel, message); simpMessagingTemplate.convertAndSend(topicChannel + "/" + to, message); } else { simpMessagingTemplate.convertAndSend(topicChannel, message); } return ResultGenerator.genSuccessResult(); } catch (Exception e) { return ResultGenerator.genFailResult("发送失败"); } } }
这里一对一发送按道理应该用template的convertAndSendToUser方法,但是死活没效果。这里先用这种方式实现的。
四、测试html
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>Insert title here</title> <link rel="stylesheet" href="http://cdn.static.runoob.com/libs/bootstrap/3.3.7/css/bootstrap.min.css"> <script src="http://cdn.static.runoob.com/libs/jquery/2.1.1/jquery.min.js"></script> <script src="http://cdn.static.runoob.com/libs/bootstrap/3.3.7/js/bootstrap.min.js"></script> <script src="https://cdn.jsdelivr.net/npm/sockjs-client@1/dist/sockjs.min.js"></script> <script src="https://cdnjs.cloudflare.com/ajax/libs/stomp.js/2.3.3/stomp.min.js"></script> <script type="text/javascript"> var userName = "zs"; var sendTopic = "ad"; //var subsc = '/ad';//广播 //var subsc = '/user/' + userName + '/' + sendTopic;//一对一 /user/zs/ad //var subsc = "/user/zs/ad"; var subsc = "/ad/zs"; // 建立连接对象(还未发起连接) var socket = new SockJS("http://localhost:8099/stomp/websocketJS?token=zw"); // 获取 STOMP 子协议的客户端对象 var stompClient = Stomp.over(socket); stompClient.debug = function(str) { console.log("DEBUG---->" + str); }; // 向服务器发起websocket连接并发送CONNECT帧 stompClient.connect({ name:userName,token:userName}, function connectCallback(frame) { // 连接成功时(服务器响应 CONNECTED 帧)的回调方法 setMessageInnerHTML("连接成功"); console.log("---订阅:" + subsc); stompClient.subscribe(subsc, function (res) { console.log("----res:"+res); re = JSON.parse(res.body); console.log(re); setMessageInnerHTML("") setMessageInnerHTML("你接收到的消息为:" + re.data); }); }, function errorCallBack(error) { // 连接失败时(服务器响应 ERROR 帧)的回调方法 setMessageInnerHTML("连接失败"); } ); //发送消息 function send() { var message = $("#content").val(); var msg = { "data":message, "topicChannel":sendTopic, "to":"zs" }; var messageJson = JSON.stringify(msg); stompClient.send("/app/sendStompMsg", { }, messageJson); sendMessageInnerHTML("/app/sendStompMsg 你发送的消息:" + message); } //将消息显示在网页上 function setMessageInnerHTML(innerHTML) { $("#in").html(innerHTML + '<br/>'); } function sendMessageInnerHTML(innerHTML) { $("#out").append(innerHTML + '<br/>'); } $(function(){ $("#btn").click(function(){ send(); }); }) </script> </head> <body> <input id="content" class="form-control"> <button id="btn" class="btn btn-info">发送</button> <div id="in"></div> <div id="out"></div> </body> </html>
上面的代码我就不多解释,我写了一些注释,上面也是找的博友的改改就开测了。
五、看效果
一对一发送
接口发送
六、总结
1、同事有springBoot直接集成websocket的,页面用的定时器做心跳重连,但是还是会出现被nginx断掉。这里用stomp是可以设置心跳的,用到项目上生产环境,让运维设置nginx放行ws协议的超时 + 设置指定url的超时限制,目前看是ok的。
2、controller上的注解标签很有迷惑性,postman请求的url很特别,可以自己去探索下。
3、stomp、webscoket的集成网上很多博友都有写,各有特色。我觉得写的不错的,可以看博友老郑来了的分享,还是我本家哦。
希望可以帮到到家。