1 前言

项目中用到视频和人脸识别,当检测到危险时,服务端需要定向给前端推送消息,且同一账号可以登录不同的客户端,要求客户端都要接收到消息。(实现原理:服务器通过向RabbitMQ生产消息,客户端(前端)直接通过Stomp协议订阅RabbitMQ的消息)

注意:stomp只是推送通知,不推送具体内容,可能会发生丢包,通过状态通知,然后前端再调用后端的一个http请求接口,后端维护状态,判断是否已经请求了该通知.

2 RabbitMQ

安装好RabbitMQ,官网安装说明。 并且安装Stomp插件

## 启动Stomp
rabbitmq-plugins enable rabbitmq_stomp

3 后端代码

3.1 *.yml配置文件

spring:
  rabbitmq:
    host: 192.168.10.19
    port: 5698
    username: admin
    password: 123456

3.2

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;


@Data
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    public static final String TOPIC_WARNING = "/topic/warning";

    @Value("${spring.rabbitmq.stomp-url}")
    private String stompUrl;

    @Value("${spring.rabbitmq.stomp-login}")
    private String stompLogin;

    @Value("${spring.rabbitmq.stomp-passcode}")
    private String stompPasscode;

    @Value("${spring.rabbitmq.virtual-host}")
    private String stompVirtualHost;

    @Value("${spring.rabbitmq.host}")
    private String rabbitmqHost;

    @Value("${spring.rabbitmq.stomp-port}")
    private int stompPort;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry  registry) {
        registry.addEndpoint("/ws")
                .setAllowedOrigins("*");
//                .withSockJS();
    }


    @Override
    public void configureMessageBroker(MessageBrokerRegistry config){
        config.enableStompBrokerRelay("/topic/")
                .setRelayHost(rabbitmqHost)
                .setRelayPort(stompPort)
                .setVirtualHost(stompVirtualHost)
                .setSystemLogin(stompLogin).setSystemPasscode(stompPasscode)
                .setSystemHeartbeatSendInterval(5000).setSystemHeartbeatReceiveInterval(5000);
        ## 
        config.setApplicationDestinationPrefixes("/app");
        ## 
        config.setUserDestinationPrefix("/user");
    }

}

3.3 RabbitmqConfig

@Configuration
@ConditionalOnProperty(name="websocket.enabled",havingValue = "true")
public class RabbitmqConfig {

    final public static String EXCHANGENAME = "websocketExchange";

    /**
     * 创建交换器
     */
    @Bean
    FanoutExchange exchange() {
        return new FanoutExchange(EXCHANGENAME);
    }

    @Bean
    public Queue queue(){
        return new Queue(orderQueueName());
    }

    @Bean
    Binding bindingExchangeMessage(Queue queue,FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer(OrderReceiver orderReceiver, @Qualifier("rabbitConnectionFactory") CachingConnectionFactory cachingConnectionFactory){
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
        // 监听队列的名称
        container.setQueueNames(orderQueueName());
        container.setExposeListenerChannel(true);
        // 设置每个消费者获取的最大消息数量
        container.setPrefetchCount(100);
        // 消费者的个数
        container.setConcurrentConsumers(1);
        // 设置确认模式为自动确认
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setMessageListener(orderReceiver);
        return container;
    }


    /**
     * 在这里写获取订单队列名的具体过程
     * @return
     */
    public String orderQueueName(){
        return "orderChannel";
    }
}

3.4 监听器

@Component
@Slf4j
@ConditionalOnProperty(name="websocket.enabled",havingValue = "true")
public class OrderReceiver implements ChannelAwareMessageListener {

    @Autowired
    private MyWebSocket myWebSocket;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        byte[] body = message.getBody();
        log.info("接收到消息:" + new String(body));
        try {
            myWebSocket.sendMessage(new String(body));
        } catch (IOException e) {
            log.error("send rabbitmq message error", e);
        }
    }
}

4 前端代码

RabbitMQ推荐前端如果使用webSocket的话使用stomp-webSocket