activeMQ 常用对象及 消息选择器 select使用方式 activemq
以下介绍对象的顺序 按照初始化的代码顺序描述
1 创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.0.135:61616");
注意: 当一个程序执行完成后,必须关闭之前创建的Connection,否则ActiveMQ不能释放资源,关闭一个Connection同样也关闭了Session,MessageProducer和MessageConsumer
Connection connection = connectionFactory.createConnection();
Connection.start();
2 创建 Session
Session是一个发送或接收消息的线程,可以使用Session创建MessageProducer,MessageConsumer和Message。
Session是否使用事物,只需设置一个Boolean值,可以通过向Connection上的适当创建方法传递一个布尔参数对此进行设置。
Session createSession(boolean transacted, int acknowledgeMode);
其中transacted为使用事务标识,acknowledgeMode为签收模式【下面介绍】。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
3 Destination
Destination是一个客户端用来指定生产消息目标和消费消息来源的对象。
在PTP【点对点】模式中,Destination被称作Queue即队列;
在Pub/Sub(主题)模式,Destination被称作Topic即主题。
在程序中可以使用多个Queue和Topic。
Destination destination = session.createQueue("testqueue");
Destination destination = session.createTopic("testtopic");
4 MessageProducer(消息生产者)
MessageProducer是一个由Session创建的对象,用来向Destination发送消息。
MessageProducer producer = session.createProducer(destination);
发送消息
void send(Message message);
void send(Destination destination, Message message);
void send(Destination destination, Message message, int deliveryMode, int priority,
long timeToLive);
void send(Message message, int deliveryMode, int priority, long timeToLive);
说明:deliveryMode为传送模式;priority为消息优先级;timeToLive为消息过期时间。
5 MessageConsumer(消息消费者)
MessageConsumer是一个由Session创建的对象,用来从Destination接收消息。
MessageConsumer createConsumer(Destination destination);
MessageConsumer createConsumer(Destination destination, String messageSelector);
MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal);
TopicSubscriber createDurableSubscriber(Topic topic, String name);
TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal);
messageSelector为消息选择器;
noLocal(只适用于主题)标志默认为false,当设置为true时限制消费者只能接收和自己相同的连接(Connection)所发布的消息;
name标识订阅主题所对应的订阅名称,持久订阅时需要设置此参数。
MessageConsumer consumer = session.createConsumer(destination);
消息的接收
同步接收:客户端主动去接收消息,客户端可以采用MessageConsumer 的receive方法去接收消息。
ActiveMQMessageConsumer方法:
Message receive() (用的少,一般都采用接口通知的方式来接收消息)
Message receive(long timeout)
Message receiveNoWait()
timeout为等待时间,单位为毫秒。
Message message = consumer.receive();
异步接收:当消息到达时,ActiveMQ主动通知客户端。客户端可以通过注册一个实现MessageListener 接口的对象到MessageConsumer。MessageListener只有一个必须实现的方法 —— onMessage,它只接收一个参数,即Message。在为每个发送到Destination的消息实现onMessage时,将调用该方法。
实现MessageListener接口,每当消息到达时,ActiveMQ会调用MessageListener中的onMessage 函数。
消息选择器
JMS提供了一种机制对消息进行过滤过滤。这样consumer(接收者)可以避免接收不符合条件的消息。
消息选择器是用于MessageConsumer的过滤器,可以用来过滤传入消息的属性和消息头部分(但不过滤消息体),并确定是否将实际消费该消息。按照JMS文档的说法,消息选择器是一些字符串,它们基于某种语法,而这种语法是SQL-92的子集。
选择器的设置是成对出现的,解释如下:
消费者在接收信息的时候会检查生产者的属性,所以 我们需要对 生产者(producer)、消费者同时设置 ,才可以起到过滤的效果。
如下代码:
Producer: 生成者 设置一些属性
message.setIntProperty("inputsize", line.length());
message.setStringProperty("p1", "p1");
producer.send(message);
consumer2: 消费者2
consumer = session.createConsumer(destination,"inputsize=2 and p1='p1'");
说明: 消费者2 只消费字符长度为2 且 生成者是 P1的消息;
consumer22: 消费者22
consumer = session.createConsumer(destination,"inputsize=2");
说明: 消费者22 ,只消费字符长度为 2 的消息。
consumer3: 消费者3:
consumer = session.createConsumer(destination,"inputsize=3 and p1='p1'");
说明:消费者3 只消费字符长度为3的消息
特别说明:不包含其他生产者的情况下,消费者2 与 消费者22 的select的条件是 部分匹配的, 此时消费者2 与 消费者3 都会消费 长度为2的消息,概率一般是均等。
如有疑问 请留言 欢迎提供建议
评论已有 0 条