【Spring boot】spring boot+RabbitMQ+stomp方式:后端实时推送数据到前端
1 前言
项目中用到视频和人脸识别,当检测到危险时,服务端需要定向给前端推送消息,且同一账号可以登录不同的客户端,要求客户端都要接收到消息。(实现原理:服务器通过向RabbitMQ生产消息,客户端(前端)直接通过Stomp协议订阅RabbitMQ的消息)
注意:Stomp只用于推送通知,不推送具体内容。由于推送过程中可能发生丢包,前端收到通知后需要再调用后端的一个HTTP接口获取具体内容;后端负责维护通知状态,并据此判断该通知是否已经被请求过。
2 RabbitMQ
安装好RabbitMQ(参见官网安装说明),并且启用Stomp插件:
## 启用Stomp插件
rabbitmq-plugins enable rabbitmq_stomp
3 后端代码
3.1 *.yml配置文件
# Connection settings for the RabbitMQ broker.
# NOTE(review): WebSocketConfig additionally reads spring.rabbitmq.stomp-url,
# stomp-login, stomp-passcode, stomp-port and virtual-host through @Value.
# Those keys are not shown in this snippet and need to be defined as well.
spring:
  rabbitmq:
    host: 192.168.10.19
    port: 5698
    username: admin
    password: 123456
3.2 WebSocketConfig
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Data
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
public static final String TOPIC_WARNING = "/topic/warning";
@Value("${spring.rabbitmq.stomp-url}")
private String stompUrl;
@Value("${spring.rabbitmq.stomp-login}")
private String stompLogin;
@Value("${spring.rabbitmq.stomp-passcode}")
private String stompPasscode;
@Value("${spring.rabbitmq.virtual-host}")
private String stompVirtualHost;
@Value("${spring.rabbitmq.host}")
private String rabbitmqHost;
@Value("${spring.rabbitmq.stomp-port}")
private int stompPort;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOrigins("*");
// .withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry config){
config.enableStompBrokerRelay("/topic/")
.setRelayHost(rabbitmqHost)
.setRelayPort(stompPort)
.setVirtualHost(stompVirtualHost)
.setSystemLogin(stompLogin).setSystemPasscode(stompPasscode)
.setSystemHeartbeatSendInterval(5000).setSystemHeartbeatReceiveInterval(5000);
##
config.setApplicationDestinationPrefixes("/app");
##
config.setUserDestinationPrefix("/user");
}
}
3.3 RabbitmqConfig
/**
 * Declares the RabbitMQ exchange, queue, binding and listener container used to
 * receive messages destined for WebSocket delivery.
 *
 * <p>Only active when the {@code websocket.enabled} property equals {@code true}.
 */
@Configuration
@ConditionalOnProperty(name = "websocket.enabled", havingValue = "true")
public class RabbitmqConfig {

    /** Name of the fanout exchange declared by {@link #exchange()}. */
    public static final String EXCHANGENAME = "websocketExchange";

    /**
     * Creates the fanout exchange.
     *
     * @return a fanout exchange named {@link #EXCHANGENAME}
     */
    @Bean
    FanoutExchange exchange() {
        FanoutExchange websocketExchange = new FanoutExchange(EXCHANGENAME);
        return websocketExchange;
    }

    /**
     * Creates the queue whose name is supplied by {@link #orderQueueName()}.
     *
     * @return the queue
     */
    @Bean
    public Queue queue() {
        String queueName = orderQueueName();
        return new Queue(queueName);
    }

    /**
     * Binds the queue to the fanout exchange.
     *
     * @param queue    queue to bind
     * @param exchange fanout exchange the queue is bound to
     * @return the resulting binding
     */
    @Bean
    Binding bindingExchangeMessage(Queue queue, FanoutExchange exchange) {
        return BindingBuilder
                .bind(queue)
                .to(exchange);
    }

    /**
     * Builds the listener container that hands messages from the order queue to
     * the given receiver.
     *
     * @param orderReceiver            listener invoked for each message
     * @param cachingConnectionFactory connection factory the container uses
     * @return the configured listener container
     */
    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(
            OrderReceiver orderReceiver,
            @Qualifier("rabbitConnectionFactory") CachingConnectionFactory cachingConnectionFactory) {
        SimpleMessageListenerContainer listenerContainer =
                new SimpleMessageListenerContainer(cachingConnectionFactory);

        // Listener that processes each delivered message.
        listenerContainer.setMessageListener(orderReceiver);
        // Name of the queue to listen on.
        listenerContainer.setQueueNames(orderQueueName());
        // Number of concurrent consumers.
        listenerContainer.setConcurrentConsumers(1);
        // Maximum number of messages each consumer fetches at a time.
        listenerContainer.setPrefetchCount(100);
        // Use the AUTO acknowledge mode.
        listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
        listenerContainer.setExposeListenerChannel(true);

        return listenerContainer;
    }

    /**
     * Supplies the name of the order queue; put the actual lookup logic here.
     *
     * @return the order queue name
     */
    public String orderQueueName() {
        return "orderChannel";
    }
}
3.4 监听器
/**
 * RabbitMQ listener that forwards each received message body, as text, to
 * {@code MyWebSocket}.
 *
 * <p>Only active when the {@code websocket.enabled} property equals {@code true}.
 */
@Component
@Slf4j
@ConditionalOnProperty(name = "websocket.enabled", havingValue = "true")
public class OrderReceiver implements ChannelAwareMessageListener {

    @Autowired
    private MyWebSocket myWebSocket;

    /**
     * Decodes the message body and pushes it through the WebSocket.
     *
     * @param message the received AMQP message
     * @param channel the channel the message arrived on (not used here)
     * @throws Exception declared by the listener interface
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // Decode once with an explicit charset instead of relying on the platform default.
        // NOTE(review): assumes producers publish UTF-8 text — confirm against the sender.
        String payload = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("接收到消息:{}", payload);
        try {
            myWebSocket.sendMessage(payload);
        } catch (IOException e) {
            // Best effort: a failed push is logged and not rethrown.
            log.error("send rabbitmq message error", e);
        }
    }
}
4 前端代码
如果前端使用WebSocket,RabbitMQ推荐配合stomp-websocket库使用。
