请通过浏览器功能收藏网页

activemq 高级特性:异步发送消息 消费者特色 消费者优先级 再次传送策略 目标特色 消息预取 activemq

发布时间:2020-01-08 11:36:46  作者:本站编辑  来源:本站原创  浏览次数:
www.javainfo.com.cn 上干货 欢迎收藏

1异步发送消息

ActiveMQ支持生产者以同步或异步模式发送消息。使用不同的模式对send方法的反应时间有巨大的影响,反映时间是衡量ActiveMQ吞吐量的重要因素,使用异步发送可以提高系统的性能。

在默认大多数情况下,AcitveMQ是以异步模式发送消息。例外的情况:在没有使用事务的情况下,生产者以PERSISTENT传送模式发送消息。在这种情况下,send方法都是同步的,并且一直阻塞直到ActiveMQ发回确认消息:消息已经存储在持久性数据存储中。这种确认机制保证消息不会丢失,但会造成生产者阻塞从而影响反应时间。

高性能的程序一般都能容忍在故障情况下丢失少量数据。如果编写这样的程序,可以通过使用异步发送来提高吞吐量(甚至在使用PERSISTENT传送模式的情况下)。

使用Connection URI配置异步发送:

cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

ConnectionFactory层面配置异步发送:

((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

Connection层面配置异步发送,此层面的设置将覆盖ConnectionFactory层面的设置:

((ActiveMQConnection)connection).setUseAsyncSend(true);

消费者特色

消费者异步分派
ActiveMQ4中,支持ActiveMQ以同步或异步模式向消费者分派消息。这样的意义:可以以异步模式向处理消息慢的消费者分配消息;以同步模式向处理消息快的消费者分配消息。

ActiveMQ默认以同步模式分派消息,这样的设置可以提高性能。但是对于处理消息慢的消费者,需要以异步模式分派。

ConnectionFactory层面配置同步分派:

((ActiveMQConnectionFactory)connectionFactory).setDispatchAsync(false);

Connection层面配置同步分派,此层面的设置将覆盖ConnectionFactory层面的设置:

((ActiveMQConnection)connection).setDispatchAsync(false);

在消费者层面以Destination URI配置同步分派,此层面的设置将覆盖ConnectionFactoryConnection层面的设置:

queue = new ActiveMQQueue("TEMP.QUEUE?consumer.dispatchAsync=false");

consumer = session.createConsumer(queue);

消费者优先级

ActveMQ分布式环境中,在有消费者存在的情况下,如果更希望ActveMQ发送消息给消费者而不是其他的ActveMQActveMQ的传送,可以如下设置:

queue = new ActiveMQQueue("TEMP.QUEUE?consumer.prority=10");

consumer = session.createConsumer(queue);

4.4.2.3 独占的消费者
ActiveMQ维护队列消息的顺序并顺序把消息分派给消费者。但是如果建立了多个SessionMessageConsumer,那么同一时刻多个线程同时从一个队列中接收消息时就并不能保证处理时有序。

有时候有序处理消息是非常重要的ActiveMQ4支持独占的消费。ActiveMQ挑选一个MessageConsumer,并把一个队列中所有消息按顺序分派给它。如果消费者发生故障,那么ActiveMQ将自动故障转移并选择另一个消费者。可以如下设置:

queue = new ActiveMQQueue("TEMP.QUEUE?consumer.exclusive=true");

consumer = session.createConsumer(queue);

再次传送策略
在以下三种情况中,消息会被再次传送给消费者:

1.在使用事务的Session中,调用rollback()方法;

2.在使用事务的Session中,调用commit()方法之前就关闭了Session;

3.在Session中使用CLIENT_ACKNOWLEDGE签收模式,并且调用了recover()方法。

可以通过设置ActiveMQConnectionFactoryActiveMQConnection来定制想要的再次传送策略。

属性

默认值

描述

collisionAvoidanceFactor

0.15

The percentage of range of collision avoidance if enabled

maximumRedeliveries

6

Sets the maximum number of times a message will be redelivered before it is considered a poisoned pill and returned to the broker so it can go to a Dead Letter Queue

initialRedeliveryDelay

1000L

The initial redelivery delay in milliseconds

useCollisionAvoidance

false

Should the redelivery policy use collision avoidance

useExponentialBackOff

false

Should exponential back-off be used (i.e. to exponentially increase the timeout)

backOffMultiplier

5

The back-off multiplier

 

5  目标特色

1.1版本之后,ActiveMQ支持混合目标技术。它允许在一个JMS目标中使用一组JMS目标。

例如可以利用混合目标在同一操作中用向12个队列发送同一条消息或者在同一操作中向一个主题和一个队列发送同一条消息。

在混合目标中,通过“,”来分隔不同的目标。

 

// send to 3 queues as one logical operation

Queue queue = new ActiveMQQueue("FOO.A,FOO.B,FOO.C");

producer.send(queue, someMessage);

如果在一个目标中混合不同类别的目标,可以通过使用“queue://”“topic://”前缀来识别不

// send to queues and topic one logical operation

Queue queue = new ActiveMQQueue("FOO.A,topic://NOTIFY.FOO.A");

producer.send(queue, someMessage);

 

 目标选项

属性

默认值

描述

consumer.prefetchSize

variable

The number of message the consumer will prefetch.

consumer.maximumPendingMessageLimit

0

Use to control if messages are dropped if a slow consumersituation exists.

consumer.noLocal

false

Same as the noLocal flag on a Topic consumer. Exposed here so that it can be used with a queue.

consumer.dispatchAsync

false

Should the broker dispatch messages asynchronously to the consumer.

consumer.retroactive

false

Is this a Retroactive Consumer.

consumer.selector

null

JMS Selector used with the consumer.

consumer.exclusive

false

Is this an Exclusive Consumer.

consumer.priority

0

Allows you to configure aConsumer Priority.

 

queue = new ActiveMQQueue("TEMP.QUEUE?consumer.dispatchAsync=false&consumer.prefetchSize=10");

consumer = session.createConsumer(queue);

消息预取

ActiveMQ的目标之一就是高性能的数据传送,所以ActiveMQ使用预取限制来控制有多少消息能及时的传送给任何地方的消费者。

一旦预取数量达到限制,那么就不会有消息被分派给这个消费者直到它发回签收消息(用来标识所有的消息已经被处理)。

可以为每个消费者指定消息预取。如果有大量的消息并且希望更高的性能,那么可以为这个消费者增大预取值。如果有少量的消息并且每条消息的处理都要花费很长的时间,那么可以设置预取值为1,这样同一时间,ActiveMQ只会为这个消费者分派一条消息。

Java客户端:

ConnectionFactory层面为所有消费者配置预取值:

tcp://localhost:61616?jms.prefetchPolicy.all=50

ConnectionFactory层面为队列消费者配置预取值:

tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1

使用目标选项为一个消费者配置预取值:

queue = new ActiveMQQueue("TEMP.QUEUE?consumer.prefetchSize=10");

consumer = session.createConsumer(queue);

配置连接URL(这块需要单独说明)

ActiveMQ支持通过Configuration URI明确的配置连接属性。

例如:当要设置异步发送时,可以通过在Configuration URI中使用jms.$PROPERTY来设置。

tcp://localhost:61616?jms.useAsyncSend=true

以下的选项在URI必须以“jms.”为前缀。

属性

默认值

描述

alwaysSessionAsync

true

If this flag is set then a seperate thread is not used for dispatching messages for each Session in the Connection. However, a separate thread is always used if there is more than one session, or the session isn't in auto acknowledge or dups ok mode

clientID

null

Sets the JMS clientID to use for the connection

closeTimeout

15000 (milliseconds)

Sets the timeout before a close is considered complete. Normally a close() on a connection waits for confirmation from the broker; this allows that operation to timeout to save the client hanging if there is no broker.

copyMessageOnSend

true

Should a JMS message be copied to a new JMS Message object as part of the send() method in JMS. This is enabled by default to be compliant with the JMS specification. You can disable it if you do not mutate JMS messages after they are sent for a performance boost.

disableTimeStampsByDefault

false

Sets whether or not timestamps on messages should be disabled or not. If you disable them it adds a small performance boost.

dispatchAsync

false

Should the broker dispatch messages asynchronously to the consumer.

nestedMapAndListEnabled

true

Enables/disables whether or not Structured Message Properties and MapMessagesare supported so that Message properties and MapMessage entries can contain nested Map and List objects. Available since version 4.1 onwards

objectMessageSerializationDefered

false

When an object is set on an ObjectMessage, the JMS spec requires the object to be serialized by that set method. Enabling this flag causes the object to not get serialized. The object may subsequently get serialized if the message needs to be sent over a socket or stored to disk.

optimizeAcknowledge

false

Enables an optimised acknowledgement mode where messages are acknowledged in batches rather than individually. Alternatively, you could use Session.DUPS_OK_ACKNOWLEDGE acknowledgement mode for the consumers which can often be faster. WARNINGenabling this issue could cause some issues with auto-acknowledgement on reconnection

optimizedMessageDispatch

true

If this flag is set then an larger prefetch limit is used - only applicable for durable topic subscribers 

useAsyncSend

false

Forces the use of Async Sends which adds a massive performance boost; but means that the send() method will return immediately whether the message has been sent or not which could lead to message loss.

useCompression

false

Enables the use of compression of the message bodies

useRetroactiveConsumer

false

Sets whether or not retroactive consumers are enabled. Retroactive consumers allow non-durable topic subscribers to receive old messages that were published before the non-durable subscriber started.



如有疑问 请留言 欢迎提供建议

评论已有 0