请通过浏览器功能收藏网页

activemq 代码实例:采用receive模式接收消息 activemq

发布时间:2020-01-08 11:45:37  作者:本站编辑  来源:本站原创  浏览次数:
www.javainfo.com.cn 上干货 欢迎收藏

总结: 多个consumer 接收一个producer的消息的时候,消息一般会被均分到每个消费者。

如果我们希望某个consumer来获取某些消息 ,我们应该为接收者添加过滤器;

采用receive 模式接收消息

消费者代码:

/**
 * 消息消费者
 * @author Administrator
 *
 */
public class JMSConsumer {

	private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
	private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
	private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
	
	public static void main(String[] args) {
		ConnectionFactory connectionFactory; // 连接工厂
		Connection connection = null; // 连接
		Session session; // 会话 接受或者发送消息的线程
		Destination destination; // 消息的目的地
		MessageConsumer messageConsumer; // 消息的消费者
		
		// 实例化连接工厂
		connectionFactory=new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);
				
		try {
			connection=connectionFactory.createConnection();  // 通过连接工厂获取连接
			connection.start(); // 启动连接
			session=connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 创建Session
			destination=session.createQueue("FirstQueue1");  // 创建连接的消息队列
			messageConsumer=session.createConsumer(destination); // 创建消息消费者
			while(true){
				TextMessage textMessage=(TextMessage)messageConsumer.receive(100000);
				if(textMessage!=null){
					System.out.println("收到的消息:"+textMessage.getText());
				}else{
					break;
				}
			}
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} 
	}
}

消息生产者:

public class JMSProducer {

	private static final String USERNAME=ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
	private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
	private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
	private static final int SENDNUM=10; // 发送的消息数量
	
	public static void main(String[] args) {
		ConnectionFactory connectionFactory; // 连接工厂
		Connection connection = null; // 连接对象
		Session session; // 会话 接受或者发送消息的线程
		Destination destination; // 消息的目的地
		MessageProducer messageProducer; // 消息生产者
		
		// 实例化连接工厂
		connectionFactory=new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);
		try {
			connection=connectionFactory.createConnection(); // 通过连接工厂获取连接
			connection.start(); // 启动连接
			session=connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Session
			destination=session.createQueue("FirstQueue1"); // 创建消息队列
			messageProducer=session.createProducer(destination); // 创建消息生产者
			for(int i=0;i<JMSProducer.SENDNUM;i++){
				TextMessage message=session.createTextMessage("ActiveMQ 发送的消息"+i);
				System.out.println("发送消息:"+"ActiveMQ 发送的消息"+i);
				messageProducer.send(message);
			}
			session.commit();
		} catch (Exception e) {
			e.printStackTrace();
		} finally{
			if(connection!=null){
				try {
					connection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
	
}



如有疑问 请留言 欢迎提供建议

评论已有 0