【Spring boot】spring boot+RabbitMQ+stomp方式:后端实时推送数据到前端
1 前言
项目中用到视频和人脸识别,当检测到危险时,服务端需要定向给前端推送消息,且同一账号可以登录不同的客户端,要求客户端都要接收到消息。(实现原理:服务器通过向RabbitMQ生产消息,客户端(前端)直接通过Stomp协议订阅RabbitMQ的消息)
注意:Stomp 只负责推送"有新消息"的通知,不推送具体内容,而且可能发生丢包。因此前端收到通知后,需要再调用后端的一个 HTTP 接口获取具体数据;后端负责维护通知状态,用来判断该通知是否已经被请求过。
2 RabbitMQ
安装好RabbitMQ,官网安装说明。 并且安装Stomp插件
## 启动Stomp插件
rabbitmq-plugins enable rabbitmq_stomp
3 后端代码
3.1 *.yml配置文件
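spring:
  rabbitmq:
    host: 192.168.10.19
    port: 5698
    username: admin
    password: 123456
    # 注意:3.2 的 WebSocketConfig 还通过 @Value 读取以下配置项,需要在此处一并补充:
    # stomp-url、stomp-login、stomp-passcode、virtual-host、stomp-port
# 另外,3.3 和 3.4 中的 Bean 只有在 websocket.enabled=true 时才会生效。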
3.2 WebSocketConfig
import lombok.Data; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; import org.springframework.web.socket.server.standard.ServerEndpointExporter; @Data @Configuration @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { public static final String TOPIC_WARNING = "/topic/warning"; @Value("${spring.rabbitmq.stomp-url}") private String stompUrl; @Value("${spring.rabbitmq.stomp-login}") private String stompLogin; @Value("${spring.rabbitmq.stomp-passcode}") private String stompPasscode; @Value("${spring.rabbitmq.virtual-host}") private String stompVirtualHost; @Value("${spring.rabbitmq.host}") private String rabbitmqHost; @Value("${spring.rabbitmq.stomp-port}") private int stompPort; @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws") .setAllowedOrigins("*"); // .withSockJS(); } @Override public void configureMessageBroker(MessageBrokerRegistry config){ config.enableStompBrokerRelay("/topic/") .setRelayHost(rabbitmqHost) .setRelayPort(stompPort) .setVirtualHost(stompVirtualHost) .setSystemLogin(stompLogin).setSystemPasscode(stompPasscode) .setSystemHeartbeatSendInterval(5000).setSystemHeartbeatReceiveInterval(5000); ## config.setApplicationDestinationPrefixes("/app"); ## config.setUserDestinationPrefix("/user"); } }
3.3 RabbitmqConfig
@Configuration @ConditionalOnProperty(name="websocket.enabled",havingValue = "true") public class RabbitmqConfig { final public static String EXCHANGENAME = "websocketExchange"; /** * 创建交换器 */ @Bean FanoutExchange exchange() { return new FanoutExchange(EXCHANGENAME); } @Bean public Queue queue(){ return new Queue(orderQueueName()); } @Bean Binding bindingExchangeMessage(Queue queue,FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } @Bean public SimpleMessageListenerContainer messageListenerContainer(OrderReceiver orderReceiver, @Qualifier("rabbitConnectionFactory") CachingConnectionFactory cachingConnectionFactory){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory); // 监听队列的名称 container.setQueueNames(orderQueueName()); container.setExposeListenerChannel(true); // 设置每个消费者获取的最大消息数量 container.setPrefetchCount(100); // 消费者的个数 container.setConcurrentConsumers(1); // 设置确认模式为自动确认 container.setAcknowledgeMode(AcknowledgeMode.AUTO); container.setMessageListener(orderReceiver); return container; } /** * 在这里写获取订单队列名的具体过程 * @return */ public String orderQueueName(){ return "orderChannel"; } }
3.4 监听器
@Component @Slf4j @ConditionalOnProperty(name="websocket.enabled",havingValue = "true") public class OrderReceiver implements ChannelAwareMessageListener { @Autowired private MyWebSocket myWebSocket; @Override public void onMessage(Message message, Channel channel) throws Exception { byte[] body = message.getBody(); log.info("接收到消息:" + new String(body)); try { myWebSocket.sendMessage(new String(body)); } catch (IOException e) { log.error("send rabbitmq message error", e); } } }
4 前端代码
如果前端使用 WebSocket,RabbitMQ 推荐配合 stomp-websocket 库来使用。