本文提供了 AMQP 0-9-1 的 RabbitMQ 实现指南。作为对 AMQP 规范所定义的类和方法的有力补充,RabbitMQ 还提供了一系列协议扩展,同样在此列出。原始以及扩展规范可以在协议页面中进行下载。
为方便大家使用,相关章节内提供了 Java 和 .Net 客户端的API指南的链接。每个方法及其参数的完整细节可以从完整的AMQP 0-9-1 参考中查看。
basic.ack(delivery-tag delivery-tag, bit multiple)
支持: 完整
对一条或多条消息进行确认。
当确认回执(acknowledgement)由客户端发送时,此方法用来确认单条或多条消息已经由 Deliver
或 Get-Ok
方法成功发送。当确认回执由服务器端发送时,此方用来确认单条或多条消息已经由 Publish
方法通过 confirm
模式下的信道(channel)成功发布。确认回执可用于单条消息甚至于一个消息集合,并且可以包含一条特定信息。
basic.cancel(consumer-tag consumer-tag, no-wait no-wait) ➔ cancel-ok
支持: 完整
结束队列消费者
此方法用来清除消费者。它不会影响到已经成功投递的消息,但是会使得服务器不再将新的消息投送给此消费者。客户端会在发送cancel
方法和收到cancel-ok
回复的过程中收到任意数量的消息。当消费者端发生不可预估的错误时,此方法也有可能由服务器发送给客户端(也就是说结束行为不是由客户端发送给服务器的basic.cancel方法所触发)。这种情况下客户端可以接收到由于队列被删除等原因引起的消费者丢失通知。需要注意的是,客户端从服务器接收basic.cancel
方法并不是必须实现的,它通过消息代理可以辨识以协商手段接受basic.cancel
的客户端的特性来正常工作。
basic.consume(short reserved-1, queue-name queue, consumer-tag consumer-tag, no-local no-local,no-ack no-ack, bit exclusive, no-wait no-wait, table arguments) ➔ consume-ok
支持:部分
启动队列消费者
此方法告知服务器开启一个“消费者”,此消费者实质是一个针对特定队列消息的持久化请求。消费者在声明过的信道(channel)中会一直存在,直到客户端清除他们为止。
basic.deliver(consumer-tag consumer-tag, delivery-tag delivery-tag, redelivered redelivered, exchange-name exchange, shortstr routing-key)
支持:完整
将消费者消息通知给客户端
此方法将一条消息通过消费者投递给客户端。在异步消息投递模式中,客户端通过Consume
方法启动消费者,然后服务器使用Deliver
方法将消息送达。
basic.get(short reserved-1, queue-name queue, no-ack no-ack) ➔ get-ok | get-empty
支持:完整
直接访问队列
此方法提供了通过同步通讯的方式直接访问队列内消息的途径。它针对的是一些有特殊需求的应用,例如对应用来说同步的功能性意义远大于应用性能。
basic.nack(delivery-tag delivery-tag, bit multiple, bit requeue)
此方法为RabbitMQ特有的AMQP扩展
拒绝单条或多条输入消息。
此方法允许客户端拒绝单条或多条输入消息。它可以用来打断或清除大体积消息的输入,或者将无法处理的消息返回给消息的原始队列。这个方法也可以在确认模式(confirm mode)下被服务器用来通知信道(channel)上的消息发布者有未被处理的消息存在。
RabbitMQ Documentation
javadoc | dotnetdoc | amqpdoc
basic.publish(short reserved-1, exchange-name exchange, shortstr routing-key, bit mandatory, bitimmediate)
支持:完整
发布单条消息
此方法用来发布单条消息到指定的交换机(exchange)。消息将会通过配置好的交换机根据既定规则路由给队列(queues),之后,如果存在事务处理(transaction),并且事务已经被提交,就会分发给活跃的消费者。
basic.qos(long prefetch-size, short prefetch-count, bit global) ➔ qos-ok
支持:部分
指定服务质量
此方法指定服务的服务质量。QoS可以分配给当前的信道(channel)或者链接内的所有信道。qos方法的特定属性和语义依赖于内容类的语义。虽然qos方法原则上可以用于服务端及客户端,但在这里此方法仅适用于服务器端。
basic.recover(bit requeue)
支持:部分
重新投递未被确认的消息。
此方法会要求服务器重新投递特定信道内所有未确认的消息。零条或多条消息会被重新投递。此方法用于替代异步恢复(asynchronous Recover)。
basic.recover-async(bit requeue)
重新投递未确认的消息。
此方法会要求服务器重新投递特定信道内所有未确认的消息。零条或多条消息会被重新投递。此方法已经弃用,取而代之的是同步 Recover/Recover-Ok
方法。
basic.reject(delivery-tag delivery-tag, bit requeue)
支持:部分
拒绝单条输入消息。
此方法允许客户端拒绝单条或多条输入消息。它可以用来打断或清除大体积消息的输入,或者将无法处理的消息返回给消息的原始队列。
RabbitMQ blog post
javadoc | dotnetdoc | amqpdoc
basic.return(reply-code reply-code, reply-text reply-text, exchange-name exchange, shortstr routing-key)
支持: 部分
返回单条处理失败的消息
此方法将发布时打有"immediate"
标签的无法投递的,或发布时打有"mandatory"
标签的无法正确路由的单条消息返回。应答代码或文字中会注明失败原因。
channel.close(reply-code reply-code, reply-text reply-text, class-id class-id, method-id method-id) ➔close-ok
支持:完整
请求关闭信道。
此方法表明发送者希望关闭信道。这通常是由于内部条件(如强制关闭)或者由于处理特定方法引起的错误(也就是Exception)触发。当关闭行为是由 exception 触发时,发送者需提供引起 exception 的方法的 class id 和 method id。
[javadoc] | [dotnetdoc] | [amqpdoc]
channel.flow(bit active) ➔ flow-ok
支持:部分
启用/禁用对端流
此方法要求对端暂停或者重启消费者发送的内容数据流。这是一个简单的流控制机制,用来避免信道的队列溢出或者发现信道接收的消息是否超出了其处理能力。需要注意的是,此方法目的不在于控制窗口。它不会影响到Basic.Get-Ok
方法返回的内容。
[amqpdoc]
channel.open(shortstr reserved-1) ➔ open-ok
支持:完整
打开一个信道使用。
此方法会打开一个信道用于与服务器通讯。
[amqpdoc]
此类为RabbitMQ特有的AMQP扩展
confirm.select(bit nowait) ➔ select-ok
.
此方法用来设置信道以便使用发布者确认回执(acknowledgements)。客户端仅可将此方法用于非事务性信道。
RabbitMQ Documentation
[javadoc] | [dotnetdoc] | [amqpdoc]
exchange.bind(short reserved-1, exchange-name destination, exchange-name source, shortstr routing-key, no-wait no-wait, table arguments) ➔ bind-ok
此方法为RabbitMQ特有的AMQP扩展
将两个交换机进行绑定。
此方法将一个交换机绑定到另一个交换机上。
RabbitMQ Documentation
RabbitMQ blog post
[javadoc] | [dotnetdoc] | [amqpdoc]
exchange.declare(short reserved-1, exchange-name exchange, shortstr type, bit passive, bit durable, bitauto-delete*, bit internal*, no-wait no-wait, table arguments) ➔ declare-ok
RabbitMQ针对AMQP协议的扩展
支持:完整
验证交换机是否存在,如有需要创建之。
如果指定交换机不存在,此方法会新建之。如果交换机已经存在,会验证其类型是否正确。
RabbitMQ针对AMQP规范实现了一个扩展,此扩展允许将无法正确路由的消息投递到一个替代交换机中(AE)。替代交换机的特性可以帮助判断客户端何时发布了无法路由的消息,并且能够提供 "or else"
路由语义去对某些消息做特殊处理,其他的消息则由通用方法进行处理。
AE documention
[javadoc] | [dotnetdoc] | [amqpdoc]
exchange.delete(short reserved-1, exchange-name exchange, bit if-unused, no-wait no-wait) ➔ delete-ok
支持:部分
删除交换机
此方法用于删除交换机。当一个交换机被删除后,与其绑定的所有队列都会被清除。
[javadoc] | [dotnetdoc] | [amqpdoc]
exchange.unbind(short reserved-1, exchange-name destination, exchange-name source, shortstrrouting-key, no-wait no-wait, table arguments) ➔ unbind-ok
此方法为RabbitMQ特有的AMQP扩展
解除两个交换机之间的绑定关系
此方法用于解除两个交换机之间的绑定关系。
[javadoc] | [dotnetdoc] | [amqpdoc]
queue.bind(short reserved-1, queue-name queue, exchange-name exchange, shortstr routing-key, no-wait no-wait, table arguments) ➔ bind-ok
支持:完整
将队列绑定到交换机
此方法用于绑定队列到交换机。队列绑定到交换机之前不会接收到任何消息。在经典消息模型中,存储转发队列绑定到直连交换机,订阅队列绑定到主题交换机。
[javadoc] | [dotnetdoc] | [amqpdoc]
queue.declare(short reserved-1, queue-name queue, bit passive, bit durable, bit exclusive, bit auto-delete, no-wait no-wait, table arguments) ➔ declare-ok
支持:完整
声明队列,如果队列不存在创建之。
此方法用于创建或检查队列。当新建一个队列时,客户端可以指定一系列属性用于控制队列的持久性及其内容,还有队列的分享等级。
RabbitMQ为AMQP规范实现了一些扩展,允许队列创建者控制队列各个方面的行为。
每个队列的消息生命周期
这个扩展决定了一条消息从发布到被服务器丢弃的生存时间。此方法中设置生存时间的参数为 x-message-ttl
。
队列的过期时间
队列可以在声明时指定租约时限。租约时限指的是如果队列一直未被使用,多久之后服务器会将其自动删除。租约时限由此方法的x-expires
参数指定。
x-message-ttl documentation
x-expires documentation [javadoc] | [dotnetdoc] | [amqpdoc]
queue.delete(short reserved-1, queue-name queue, bit if-unused, bit if-empty, no-wait no-wait) ➔ delete-ok
支持:部分
删除队列。
此方法用于删除一个队列。如果服务器设置了死信队列(dead-letter queue),当队列被某个删除时,任何依存于此队列的消息都会被发送到死信队列中,队列上的所有消费者都会被清除掉。
[javadoc] | [dotnetdoc] | [amqpdoc]
queue.purge(short reserved-1, queue-name queue, no-wait no-wait) ➔ purge-ok
支持:完整
清空队列。
此方法会将队列中的所有不处于等待 确认回执(acknowledgment)状态的消息全部移除。
[javadoc] | [dotnetdoc] | [amqpdoc]
queue.unbind(short reserved-1, queue-name queue, exchange-name exchange, shortstr routing-key,table arguments) ➔ unbind-ok
支持:部分
解除队列与交换机的绑定。
此方法用于解除队列与交换机的绑定关系。
[javadoc] | [dotnetdoc] | [amqpdoc]
tx.commit() ➔ commit-ok
支持:完整
提交当前事务。
此方法用于提交当前事务中所有的消息发布以及确(acknowledgments)认执行动作。
[javadoc] | [dotnetdoc] | [amqpdoc]
tx.rollback() ➔ rollback-ok
支持:完整
终止当前事务。
此方法用于终止当前事务中的所有消息发布以及确认提交操作。回滚动作完成后,一个新的事务随即开始。如果有必要,应该发布一个明确的恢复操作。
[javadoc] | [dotnetdoc] | [amqpdoc]
tx.select() ➔ select-ok
支持:完整
选择标准事务模式。
此方法设置信道使用标准事务模式。客户端在使用提交(Commit)或者(回滚)方法之前,需要至少在信道上使用一次此方法。
[javadoc] | [dotnetdoc] | [amqpdoc]