spring-integration初探
最近有幸,公司让我研究了spring-integration,对于这个spring出品的功能强大的工具,功能繁多且复杂。写此博客分享一下心得,也为记录一下最近研究这么久的知识点。理解的不够深,如果有错误的地方,希望各位朋友能批评指出。
一、what
首先,什么是spring-integration?研究之初,对这根管道有些迷惑,这是队列?这个activeMQ有啥区别?待研究了一段时间之后,才发现,spring-integration越来越像曾经做过的esb组件。那么spring-integration到底是什么呢?
官网给出的解释是,spring-integration是一个功能强大的EIP(Enterprise Integration Patterns),即企业集成模式。对,spring-integration是一个集大成者。就我自己的理解,集成了众多功能的它,是一种便捷的事件驱动消息框架用来在系统之间做消息传递的。
二、why
那么,我们为什么用它呢?spring-integration的官网上,给出了以下说法
spring-integration的目标
- 提供一个简单的模型来实现复杂的企业集成解决方案
- 为基于spring的应用添加异步的、消息驱动的行为
- 让更多的Spring用户来使用他
看这种解释,我的直观感觉是:啥玩意?不懂啊!接着看到spring-integration的原则
- 组件之间应该是松散的,模块性的易测的
- 应用框架应该强迫分离业务逻辑和集成逻辑
- 扩展节点应该有更好的抽象和可以再使用的能力
感觉,这个应该说的是解耦吧。另外看了下其他人的理解,如果你的系统处在各个系统的中间,需要JMS交互,又需要Database/Redis/MongoDB,还需要监听Tcp/UDP等,还有固定的文件转移,分析。还面对着时不时的更改需求的风险。那么,它再适合不过了。
三、how
那么,重点来了,如何使用呢?在介绍之前,先简单的介绍几个名词。
1.Message
Message是它的基础构件和核心,所有的流程都围绕着Message运转,如图所示
Message,就是所说的消息体,用来承载传输的信息用的。Message分为两部分,header和payload。header是头部信息,用来存储传输的一些特性属性参数。payload是用来装载数据的,他可以携带的任何Object对象,放什么都行,随你 。
2.MessageChannel
消息管道,生产者生产一个消息到channel,消费者从channel消费一个消息,所以channel可以对消息组件解耦,并且提供一个方便的拦截功能和监控功能。
对于MessageChannel,有以下几种
(1).PublishSubscribeChannel
发布订阅式通道形式,多用于消息广播形式,发送给所有已经订阅了的用户。在3.x版本之前,订阅者如果是0,启动会报错或者发送的时候报错。在4.x版本后,订阅者是0,则仍然会返回true。当然,可以配置最小订阅者数量(min-subscribers)
(2).QueueChannel
队列模式通道,最常用的形式。与发布订阅通道不同,此通道实现点对点式的传输方式,管道内部是队列方式,可以设置管道的容量,如果内部的消息已经达到了最大容量,则会阻塞住,直到队列有时间,或者发送的消息被超时处理。
(3).PriorityChannel
优先级队列通道,我的理解为QueueChannel的升级版,可以无视排队,根据设置的优先级直接插队。(壕无人性)
(4).RendezvousChannel
前方施工,禁止通行!这个是一个强行阻塞的通道,当消息进入通道后,通道禁止通行,直到消息在对方通道receive()后,才能继续使用。
(5).DirectChannel
最简单的点对点通道方式,一个简单的单线程通道。是spring-integration的默认通道类型
(6).ExecutorChannel
多线程通道模式,开启多线程执行点对点通道形式。这个通道博主还未研究,不敢多说........
3.Message Endpoint
消息的终点,或者我称他为消息节点,在channel你不能操作消息,只能在endpoint操作。对于常用的消息节点,有以下几种
(1).Transformer
解释者,转换者,翻译者,怎么理解都可以。作用是可以将消息转为你想要的类型。可以将xml形式转换成string类型。
<!-- Filter过滤器 --> <int:channel id="filterAChannel"/> <int:filter input-channel="filterAChannel" output-channel="filterBChannel" expression="payload.name.equals('haha')"/> <int:channel id="filterBChannel"/> <int:service-activator input-channel="filterBChannel" expression="@receiveServiceImpl.helloMoreParam(payload.name,payload.age)"/>
(2).Filter
过滤器,顾名思义,过滤用的,用来判断一个消息是否应该被传输。用我的理解看,他就是spring-integration里面的if语句。
<!-- transformer转换器 --> <int:channel id="transformerInChannel"/> <int:transformer input-channel="transformerInChannel" output-channel="transformerOutChannel" expression="payload.name.toUpperCase() + '- [' + T(java.lang.System).currentTimeMillis() + ']'"/> <int:channel id="transformerOutChannel"> <int:queue/> </int:channel> <int:outbound-channel-adapter channel="transformerOutChannel" ref="receiveServiceImpl" method="helloTransformer"> <int:poller fixed-delay="0"/> </int:outbound-channel-adapter>
(3).Router
路由器,用来管理一个消息应该被发送到哪个channel中。相当于JAVA里面的switch case语句吧。判断条件很多,可是使用header里面的参数具体值(比如header里面有个定义为testRouter的参数,数值为A,那么消息经过路由会发送到判断为A的通道内,后面使用中再详细讲解)
(4).Service Activator
我称他为服务激活器,是一个连接服务实例到消息系统的通用端点。对于服务激活器,可能是因为我理解的不够全面,我总是将他和通道适配器搞混,因为我自己测试发现,激活器和适配器都可以作为一个消息出通道的节点。
(5).Channel Adapter
通道适配器是将消息通道连接到某个其他系统或传输的端点。通道适配器可以是入站或出站。通常情况下,通道适配器将在消息与从其他系统(文件,HTTP请求,JMS消息等)接收或发送的任何对象或资源之间进行映射。
(6).Channel Bridge
通道桥梁,用来作为管道之间进行通信使用的,常用情景为:在一个输入管道,将管道的内容发送到另外N个管道输出,配置方式如下
<!-- bridge --> <int:channel id="bridgeSendChannel"/> <int:bridge input-channel="bridgeSendChannel" output-channel="bridgeReceiveAChannel"/> <int:channel id="bridgeReceiveAChannel"/> <int:bridge input-channel="bridgeReceiveAChannel" output-channel="bridgeReceiveBChannel"/> <int:channel id="bridgeReceiveBChannel"> <int:queue/> </int:channel> <int:outbound-channel-adapter channel="bridgeReceiveBChannel" expression="@receiveServiceImpl.helloBridge(payload.name,payload.age)"> <int:poller fixed-delay="0"/> </int:outbound-channel-adapter>
另外还有Splitter(分解器),Aggregator(聚合器)等。对于其他的消息节点,博主还没有做过多研究,就不再次误人子弟了。后续会将未研究到的一一补上。
4.Channel Interceptor
管道拦截器,能够以非常优雅,非常温柔的方式捕获管道传递之间的节点。对于拦截器,spring-integration给了我们六种节点
分别是发送前,邮寄后,发送成功后,接收前,接收后,接受成功后。可以分别在不同的节点进行操作。
四、use(demo地址在本文最后)
下面使用到的Test类为
import lombok.Data; /** * 普通测试dto * @author lin */ @Data public class Test { private String name; private String age; }
(1)普通方式
xml配置,这里配置了一个通道helloWorldChannel,配置了个接收激活点,即接收方的地址为helloServiceImpl里面的hello方法。(其中ref指对应接收的类名,method指类里面接收的方法)
<!-- 测试dto模式传输 --> <int:channel id="testChannel"/> <int:service-activator input-channel="testChannel" ref="receiveServiceImpl" method="hello"/>
发送方Service里面
/** * 测试传输dto */ @Override public void testDto() { System.out.println("testDto方法"); Test test = new Test(); test.setName("testDto"); test.setAge("18"); testChannel.send(MessageBuilder.withPayload(test).build()); }
接收方Service里面
@Override public void hello(Test test) { System.out.println(test.getName() + " " + test.getAge()); }
(2)普通多参数方式
xml配置,这里通过获取payload里面的具体参数来传参的形式
<!-- 测试多参数传递 --> <int:channel id="moreParamChannel"/> <int:service-activator input-channel="moreParamChannel" expression="@receiveServiceImpl.helloMoreParam(payload.name,payload.age)"/>
发送方Service里面,将所有的参数通过Map形式装载到payload里面
/** * 测试多参数传输 */ @Override public void moreParamm() { System.out.println("greetMoreParam方法"); HashMap<String, String> map = new HashMap(); map.put("name", "moreParam"); map.put("age", "18"); helloWorldMoreParamChannel.send(MessageBuilder.withPayload(map).build()); }
接收方Service里面
@Override public void helloMoreParam(String name, String age) { System.out.println(name + " " + age); }
(3)JMS方式
xml配置,这里配置了个MQ,将消息放入mq中进行传递
<!-- 测试Mq配置--> <int:channel id="topicChannel"/> <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://127.0.0.1:61616?trace=true&keepAlive=true</value> </property> <property name="useAsyncSend" value="true"/> </bean> <int-jms:outbound-channel-adapter channel="topicChannel" destination-name="topic.myTopic" pub-sub-domain="true"/> <int:channel id="listenerChannel"/> <int-jms:message-driven-channel-adapter id="messageDrivenAdapter" channel="listenerChannel" destination-name="topic.myTopic" pub-sub-domain="true"/> <int:service-activator input-channel="listenerChannel" ref="messageListenerImpl" method="processMessage"/>
发送方Service里面
/** * 使用mq进行传输发送方法 */ @Override public void send() { HashMap<String,Object> map = new HashMap<>(); map.put("name","MqService"); map.put("age","18"); topicChannel.send(MessageBuilder.withPayload(map).build()); }
接收方Service里面
public void processMessage(HashMap<String,Object> map) { System.out.println("MessageListener::::::Received message: " + map.toString()); }
(4)订阅方式
xml配置,这里配置了两个订阅者,订阅者分别是两个方法
<!-- 测试订阅发布 --> <!--min-subscribers=""参数为预期最小订阅者,如果必须有订阅者,则这里填写最少数;默认值为0--> <int:publish-subscribe-channel id="pubsubChannel"/> <int:outbound-channel-adapter channel="pubsubChannel" ref="receiveServiceImpl" method="helloReceiveOne"> </int:outbound-channel-adapter> <int:outbound-channel-adapter channel="pubsubChannel" ref="receiveServiceImpl" method="helloReceiveTwo"> </int:outbound-channel-adapter>
发送方Service里面
@Override public void pubsubSend() { Test test = new Test(); test.setName("pubsubSend"); test.setAge("18"); publishSubscribeChannel.send(MessageBuilder.withPayload(test).build()); }
接收方Service里面
@Override public void helloReceiveOne(Test test){ System.out.println("One:"+test.getName()+" "+test.getAge()); } @Override public void helloReceiveTwo(Test test){ System.out.println("Two:"+test.getName()+" "+test.getAge()); }
(5)router方式
xml配置,这里配置了一个入口通道,当消息进入入口后,通过判断header里面的'tsetHeader'参数的值,如果值为A,则进入routerAChannel通道,如果为B则进入routerBChannel通道。进入通道后分别进入两者的接收方法中。其中两种方法用了传递类,和多参数传递的形式。
<!-- 测试路由 --> <!-- 路由入口 --> <int:channel id="routingChannel"> <int:queue/> </int:channel> <!-- 路由器 --> <int:header-value-router input-channel="routingChannel" header-name="testHeader"> <int:poller fixed-delay="0"/> <int:mapping value="A" channel="routerAChannel"/> <int:mapping value="B" channel="routerBChannel"/> </int:header-value-router> <!-- 路由出口 --> <int:channel id="routerAChannel"> <int:queue/> </int:channel> <int:outbound-channel-adapter channel="routerAChannel" ref="receiveServiceImpl" method="helloRouterTest"> <int:poller fixed-delay="0"/> </int:outbound-channel-adapter> <int:channel id="routerBChannel"> <int:queue/> </int:channel> <int:outbound-channel-adapter channel="routerBChannel" expression="@receiveServiceImpl.helloRouterMap(payload.name,payload.age)"> <int:poller fixed-delay="0"/> </int:outbound-channel-adapter>
发送方Service里面
@Override public void routerA(String name, String age) { Test test = new Test(); test.setAge(age); test.setName(name); routingChannel.send(MessageBuilder.withPayload(test).setHeader("testHeader", "A").build()); } @Override public void routerB(String name, String age) { HashMap<String,String> map = new HashMap<>(); map.put("name", name); map.put("age", age); routingChannel.send(MessageBuilder.withPayload(map).setHeader("testHeader", "B").build()); }
接收方Service里面
@Override public void helloRouterTest(Test test){ System.out.println("routerA方法"); System.out.println("helloRouterTest:"+test.getName()+" "+test.getAge()); } @Override public void helloRouterMap(String name,String age){ System.out.println("routerB方法"); System.out.println("helloRouterMap:"+name+" "+age); }
(6)网关方式
xml配置,在这里面配置了一个接口类,当调用这个接口的方法时,就会进入网关配置的通道
<!-- 网关通道口模式,dto --> <int:channel id="getWayChannel"> <int:queue/> </int:channel> <int:gateway service-interface="com.lin.integration.service.interfaces.UseGetWaySender" id="helloGetWaySender" default-request-channel="getWayChannel"/> <int:outbound-channel-adapter channel="getWayChannel" ref="receiveServiceImpl" method="hello"> <int:poller fixed-delay="0"></int:poller> </int:outbound-channel-adapter> <!-- 网关通道口模式,多参数传递 --> <int:channel id="getWayMoreParamChannel"> <int:queue/> </int:channel> <int:gateway service-interface="com.lin.integration.service.interfaces.MoreParamSender" id="getWayMoreParamSender" default-request-channel="getWayMoreParamChannel"/> <int:outbound-channel-adapter channel="getWayMoreParamChannel" expression="@receiveServiceImpl.helloMoreParam(payload.name,payload.age)"> <int:poller fixed-delay="0"></int:poller> </int:outbound-channel-adapter>
网关interface里面
public interface UseGetWaySender { void sendMessage(Test test); }
public interface MoreParamSender { void sendMessage(Map map); }
发送方Service里面
/** * 测试网关dto */ @Override public void getWay() { Test test = new Test(); test.setAge("18"); test.setName("getWay"); useGetWaySender.sendMessage(test); } /** * 测试网关多参数 */ @Override public void getWayMoreParam() { HashMap<String, String> map = new HashMap(); map.put("name", "getWayMoreParam"); map.put("age", "18"); moreParamSender.sendMessage(map); }
(7)全局拦截器
拦截器中,将需要拦截的管道进行拦截,拦截之后就会对这个管道的发送端,接收端进行拦截,拦截的接口在上文已经提到过,拦截的配置如下
<!-- 全局拦截器 --> <int:channel-interceptor pattern="testInterceptorChannel" order="3" ref="countingChannelInterceptor"> </int:channel-interceptor> <int:channel id="testInterceptorChannel"/> <int:service-activator input-channel="testInterceptorChannel" ref="receiveServiceImpl" method="hello"/>
对于近期的spring-integration研究,这些只是“初探”,如此好的一个框架模式,我也将在今后进行深入研究,会将文章进行补充,希望各位对于我文章里面的不足与错误的地方进行批评指出,从而能互相交流研究,多谢。
参考文献:
https://docs.spring.io/spring-integration/docs/5.0.4.RELEASE/reference/html/
https://www.aliyun.com/jiaocheng/301276.html
https://blog.csdn.net/xiayutai1/article/details/53302652?locationNum=4&fps=1
http://www.iteye.com/topic/744524
https://blog.csdn.net/slivefox/article/details/3740541
https://my.oschina.net/zhzhenqin/blog/86586
http://www.importnew.com/16538.html
demo码云地址(10.21更新,增加了java dsl):
https://gitee.com/doubletreelin/spring-integration-mydemo.git