介绍
RabbitMQ 是一个开源的消息代理和队列服务器,实现 [AMQP](http://www.rabbitmq.com/tutorials/amqp-concepts.html) (Advanced Message Queuing Protocol) 标准。本教程将引导你通过几个简单的步骤来设置并使用 RabbitMQ。
环境准备
1. 安装 RabbitMQ
- 在你的机器上安装 RabbitMQ。对于大多数 Linux 发行版,可以使用包管理器(如 `apt` 或 `yum`)进行安装。
- 对于 Windows 和 macOS,可以从官方网站下载安装包。
2. 启动 RabbitMQ 服务
- 安装完成后,启动 RabbitMQ 服务。
3. 安装客户端库
- 选择一种编程语言,并安装相应的客户端库。本教程以 Python 为例。
- 使用 pip 安装 Pika 库:
```bash
pip install pika
```
示例代码
1. 发送消息
```python
import pika
def send_message(queue_name):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明队列
channel.queue_declare(queue=queue_name)
# 发送消息
message = "Hello, RabbitMQ!"
channel.basic_publish(exchange='',
routing_key=queue_name,
body=message)
print(" [x] Sent %r" % message)
connection.close()
if __name__ == '__main__':
send_message('hello')
```
2. 接收消息
```python
import pika
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
def receive_message(queue_name):
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=queue_name)
# 设置回调函数
channel.basic_consume(queue=queue_name,
on_message_callback=callback,
auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
receive_message('hello')
```
概念解析
- Exchange: 交换机决定消息发送到哪个队列。
- Queue: 队列用来保存消息直到被消费。
- Routing Key: 路由键告诉交换机如何路由消息到队列。
- Binding: 绑定是队列和交换机之间的关联。
- Consumer: 消费者从队列中接收消息。
更进一步
使用持久化消息
```python
channel.basic_publish(exchange='',
routing_key='persistent_queue',
body='Persistent message',
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))
```
使用发布确认
```python
channel.confirm_delivery()
channel.basic_publish(exchange='',
routing_key='confirm_queue',
body='Message with confirmation')
```
使用发布确认失败的处理
```python
def on_delivery_confirmation(frame):
if frame.method.PUBLISH:
print(' [x] Publish confirmed')
elif frame.method.NACK:
print(' [!] Publish NACKed')
channel.add_on_return_callback(on_delivery_confirmation)
```
使用工作队列
```python
生产者
def send_work(queue_name):
channel.queue_declare(queue=queue_name)
channel.basic_qos(prefetch_count=1)
for i in range(10):
message = f"Task {i}"
channel.basic_publish(exchange='',
routing_key=queue_name,
body=message)
print(f" [x] Sent {message}")
消费者
def do_work(channel, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print(" [x] Done")
channel.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=do_work)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
```