允许多个并发用户处理在同一个通讯通道接收的消息。这种模式使系统能够同时处理多个邮件,以优化吞吐量,提高可扩展性和可用性,以及平衡工作负载。
在云中运行的应用程序,可以预计,以处理大量的请求。而不是过程的每个请求同步地,一个常用的方法是通过一个消息传送系统到该异步地处理它们的另一服务(消费者服务),以通过他们的应用程序。这种策略有助于确保在应用程序的业务逻辑没有被阻塞,而正在处理的请求。
请求的数量可以随着时间的原因有很多显著变化。突然一阵在用户活动或聚集的请求,来自多个租户未来可能会导致不可预测的工作负载。在高峰时间的系统可能需要处理许多每秒数百个请求,而在其他时间的数量可能是非常小的。
此外,该工作的性质进行的处理这些请求可能是高度可变的。使用消费者服务的单个实例,可能会导致该实例成为充斥请求或消息传送系统可通过消息从应用程序来的流入被重载。
为了处理这种波动的负载,该系统可以运行消费者服务的多个实例。然而这些消费者必须协调,以确保每个消息只传送给一个单个消费者。工作量也需要跨消费者被负载平衡,以防止一个实例成为瓶颈。
使用消息队列来实现应用和消费者服务的实例之间的通信信道。在消息队列中的形式应用帖请求,以及消费者的服务实例从队列中接收消息并对其进行处理。这种方法使消费者的服务实例的同一池中从应用程序的任何实例处理消息。
该解决方案具有以下优点:
在决定如何实现这个模式时,请考虑以下几点:
注意
微软 Azure 服务总线队列可以通过使用消息会先入先出消息的顺序工具保证。欲了解更多信息,请参阅消息传递模式 MSDN 上使用会话。
注意
如果您正在使用 Azure 的工作进程可能能够通过使用专用的邮件回复队列回传结果的应用程序逻辑。应用逻辑必须能够将这些结果与原来的消息关联起来。这种情况下进行了更详细的异步消息的引物进行说明。
使用这种模式时:
这种模式可能不适合时:
有些邮件系统支持会话,使生产者对消息进行分组在一起,并确保它们都被同一个接收者处理。这个机制可以与优先消息使用(如果它们支持)来实现消息排序的一种形式,在顺序从生产者传送消息到单个消费者。
Azure 提供存储队列和服务总线队列,可作为一个合适的机制来实现这种模式。应用逻辑可以发布消息到一个队列,而消费者实现为在一个或多个角色的任务可以检索从这个队列中的消息并进行处理。对于弹性,一个服务总线队列使得消费者使用 PeekLock 模式,当它从队列检索消息。这种模式实际上不是删除消息,而只是从其他消费者隐藏它。当处理完它原来的用户可以删除该邮件。如果消费者要失败,偷看锁将超时,消息将再次变得可见,让消费者又找回它。
有关使用 Azure 的服务总线队列的详细信息,请参阅服务总线队列,主题和 MSDN 上的订阅。有关使用 Azure 存储队列的信息,请参阅如何 MSDN 上使用队列存储服务。
从可供下载的例子 CompetingConsumers 解决方案的 QueueManager 类下面的代码显示了本指南说明了如何通过在网络或辅助角色开始的事件处理程序使用 QueueClient 实例中创建一个队列。
private string queueName = ...; |
private string connectionString = ...; |
... |
public async Task Start() |
{ |
// Check if the queue already exists. |
var manager = NamespaceManager.CreateFromConnectionString(this.connectionString); |
if (!manager.QueueExists(this.queueName)) |
{ |
var queueDescription = new QueueDescription(this.queueName); |
// Set the maximum delivery count for messages in the queue. A message |
// is automatically dead-lettered after this number of deliveries. The |
// default value for dead letter count is 10. |
queueDescription.MaxDeliveryCount = 3; |
await manager.CreateQueueAsync(queueDescription); |
} |
... |
// Create the queue client. By default the PeekLock method is used. |
this.client = QueueClient.CreateFromConnectionString( |
this.connectionString, this.queueName); |
} |
public async Task SendMessagesAsync() |
{ |
// Simulate sending a batch of messages to the queue. |
var messages = new List<BrokeredMessage>(); |
for (int i = 0; i < 10; i++) |
{ |
var message = new BrokeredMessage() { MessageId = Guid.NewGuid().ToString() }; |
messages.Add(message); |
} |
await this.client.SendBatchAsync(messages); |
} |
private ManualResetEvent pauseProcessingEvent; |
... |
public void ReceiveMessages(Func<BrokeredMessage, Task> processMessageTask) |
{ |
// Set up the options for the message pump. |
var options = new OnMessageOptions(); |
// When AutoComplete is disabled it is necessary to manually |
// complete or abandon the messages and handle any errors. |
options.AutoComplete = false; |
options.MaxConcurrentCalls = 10; |
options.ExceptionReceived += this.OptionsOnExceptionReceived; |
// Use of the Service Bus OnMessage message pump. |
// The OnMessage method must be called once, otherwise an exception will occur. |
this.client.OnMessageAsync( |
async (msg) => |
{ |
// Will block the current thread if Stop is called. |
this.pauseProcessingEvent.WaitOne(); |
// Execute processing task here. |
await processMessageTask(msg); |
}, |
options); |
} |
... |
private void OptionsOnExceptionReceived(object sender, |
ExceptionReceivedEventArgs exceptionReceivedEventArgs) |
{ |
... |
} |