请通过浏览器功能收藏网页

activeMQ 常用对象及 消息选择器 select使用方式 activemq

发布时间:2020-01-08 11:16:12  作者:本站编辑  来源:本站原创  浏览次数:
www.javainfo.com.cn 上干货 欢迎收藏

以下介绍对象的顺序 按照初始化的代码顺序描述

1 创建连接工厂

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.0.135:61616");

 

注意:  当一个程序执行完成后,必须关闭之前创建的Connection,否则ActiveMQ不能释放资源,关闭一个Connection同样也关闭了SessionMessageProducerMessageConsumer

Connection connection = connectionFactory.createConnection();

Connection.start();

 

2 创建 Session

Session是一个发送或接收消息的线程,可以使用Session创建MessageProducerMessageConsumerMessage

Session是否使用事物,只需设置一个Boolean值,可以通过向Connection上的适当创建方法传递一个布尔参数对此进行设置。

Session createSession(boolean transacted, int acknowledgeMode);

其中transacted为使用事务标识,acknowledgeMode为签收模式【下面介绍】。

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

3  Destination

Destination是一个客户端用来指定生产消息目标和消费消息来源的对象。

PTP【点对点】模式中,Destination被称作Queue即队列;

Pub/Sub(主题)模式,Destination被称作Topic即主题。

在程序中可以使用多个QueueTopic

 

Destination destination = session.createQueue("testqueue");

Destination destination = session.createTopic("testtopic");

 

4 MessageProducer(消息生产者)

MessageProducer是一个由Session创建的对象,用来向Destination发送消息。

MessageProducer producer = session.createProducer(destination);

 发送消息

void send(Message message);

void send(Destination destination, Message message);

void send(Destination destination, Message message, int deliveryMode, int priority,

long timeToLive);

void send(Message message, int deliveryMode, int priority, long timeToLive);

说明:deliveryMode为传送模式;priority为消息优先级;timeToLive为消息过期时间。

 

5 MessageConsumer(消息消费者)

MessageConsumer是一个由Session创建的对象,用来从Destination接收消息。

MessageConsumer createConsumer(Destination destination);

MessageConsumer createConsumer(Destination destination, String messageSelector); 

MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal);

TopicSubscriber createDurableSubscriber(Topic topic, String name);

TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal);

messageSelector为消息选择器;

noLocal(只适用于主题)标志默认为false,当设置为true时限制消费者只能接收和自己相同的连接(Connection)所发布的消息;

name标识订阅主题所对应的订阅名称,持久订阅时需要设置此参数。

 

MessageConsumer consumer = session.createConsumer(destination);

 

消息的接收

同步接收:客户端主动去接收消息,客户端可以采用MessageConsumer receive方法去接收消息。

ActiveMQMessageConsumer方法:

Message receive()   (用的少,一般都采用接口通知的方式来接收消息)

Message receive(long timeout)

Message receiveNoWait()

timeout为等待时间,单位为毫秒。

 

Message message = consumer.receive();

 

异步接收:当消息到达时,ActiveMQ主动通知客户端。客户端可以通过注册一个实现MessageListener 接口的对象到MessageConsumerMessageListener只有一个必须实现的方法 —— onMessage,它只接收一个参数,即Message。在为每个发送到Destination的消息实现onMessage时,将调用该方法。

 

实现MessageListener接口,每当消息到达时,ActiveMQ会调用MessageListener中的onMessage 函数。

 

消息选择器

JMS提供了一种机制对消息进行过滤过滤。这样consumer(接收者)可以避免接收不符合条件的消息。

消息选择器是用于MessageConsumer的过滤器,可以用来过滤传入消息的属性和消息头部分(但不过滤消息体),并确定是否将实际消费该消息。按照JMS文档的说法,消息选择器是一些字符串,它们基于某种语法,而这种语法是SQL-92的子集。

 

选择器的设置是成对出现的,解释如下:

消费者在接收信息的时候会检查生产者的属性,所以 我们需要对 生产者(producer)、消费者同时设置 ,才可以起到过滤的效果。

如下代码:

Producer: 生成者 设置一些属性

   message.setIntProperty("inputsize", line.length());

                message.setStringProperty("p1", "p1");

                producer.send(message);

 

consumer2:  消费者2

      consumer = session.createConsumer(destination,"inputsize=2 and p1='p1'");

说明: 消费者2 只消费字符长度为2 生成者是 P1的消息;

 

consumer22: 消费者22

consumer = session.createConsumer(destination,"inputsize=2");

说明: 消费者22 ,只消费字符长度为 2 的消息。

 

consumer3: 消费者3

consumer = session.createConsumer(destination,"inputsize=3 and p1='p1'");

说明:消费者3 只消费字符长度为3的消息

 

特别说明:不包含其他生产者的情况下,消费者2 消费者22 select的条件是 部分匹配的, 此时消费者2 消费者3 都会消费 长度为2的消息,概率一般是均等。



如有疑问 请留言 欢迎提供建议

评论已有 0