Python 使用 pika 连接 RabbitMQ

下面是一个使用 pika 连接 RabbitMQ 的 Python 示例。

# coding: utf-8
import json

import pika
# from selenium.webdriver.common.by import By

from house_info import HouseInfo

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Broker address and login used for the AMQP connections in this script.
AMQP_CONFIG = dict(
    host='127.0.0.1',
    port=5672,
    username='guest',
    password='guest',
)
# Queue that incoming requests are consumed from.
AMQP_QUEUE_NAME = 'house-inquery'
# Queue that results are published back to.
AMQP_FEEDBACK_QUEUE_NAME = 'house-inquery-result'


def process_feedback(msg):
    """Publish a result message to the feedback queue.

    Feedback has to be delivered to a different queue than the one being
    consumed, so a separate, short-lived connection is opened for each
    publish and closed again before returning.

    Args:
        msg: Message body to publish (sent as persistent UTF-8 text/plain).

    Returns:
        Whatever ``basic_publish`` returns for the installed pika version.
    """
    mq_credential = pika.PlainCredentials(
        username=AMQP_CONFIG['username'],
        password=AMQP_CONFIG['password']
    )
    mq_conn = pika.BlockingConnection(pika.ConnectionParameters(
        host=AMQP_CONFIG['host'],
        # Honour the configured port instead of silently using pika's default.
        port=AMQP_CONFIG['port'],
        credentials=mq_credential
    ))
    try:
        mq_channel = mq_conn.channel()
        # Declare the queue as durable so it is recreated after a broker restart.
        mq_channel.queue_declare(queue=AMQP_FEEDBACK_QUEUE_NAME, durable=True)
        return mq_channel.basic_publish(
            exchange='',
            routing_key=AMQP_FEEDBACK_QUEUE_NAME,
            body=msg,
            properties=pika.BasicProperties(
                delivery_mode=2,  # 2 = persistent message
                content_encoding='UTF-8',
                content_type='text/plain'
            )
        )
    finally:
        # Always release the connection; otherwise every call leaks one.
        if mq_conn.is_open:
            mq_conn.close()


# Module-level consumer connection: log in with the configured credentials,
# open a channel and make sure the request queue exists.
user_pwd = pika.PlainCredentials(AMQP_CONFIG['username'], AMQP_CONFIG['password'])
s_conn = pika.BlockingConnection(pika.ConnectionParameters(
    host=AMQP_CONFIG['host'],
    # Pass the configured port explicitly; without it AMQP_CONFIG['port'] is
    # ignored and pika falls back to its default.
    port=AMQP_CONFIG['port'],
    credentials=user_pwd,
))
channel = s_conn.channel()
# Declare the queue as durable so it is recreated after a broker restart.
channel.queue_declare(queue=AMQP_QUEUE_NAME, durable=True)


def callback(ch, method, properties, body):
    """Handle one message delivered from the request queue.

    Every delivery is settled exactly once (ack or reject). The consumer is
    limited to a single unacknowledged message, so leaving a delivery
    unsettled would stop the broker from sending anything further.

    Args:
        ch: Channel the message was delivered on; used to ack/reject.
        method: Delivery metadata; ``method.delivery_tag`` identifies the message.
        properties: Message properties (unused).
        body: Raw message payload, expected to be a JSON document.
    """
    print(body)
    try:
        query_data = json.loads(body)
    except (TypeError, ValueError) as e:
        # Malformed payload can never be processed: drop it (or dead-letter it,
        # if the queue is configured for that) instead of crashing the consumer.
        print(e)
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
        return
    if not query_data:
        # Nothing to do, but the message must still be acknowledged so the
        # broker removes it and keeps delivering.
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return
    try:
        # Business logic goes here.
        pass
    except Exception as e:
        # Processing failed: hand the message back to the broker so it is not
        # lost. NOTE(review): a message that fails every time will be
        # redelivered repeatedly; consider a retry limit or dead-letter queue.
        print(e)
        ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
    else:
        # Acknowledge only after the message was processed successfully.
        ch.basic_ack(delivery_tag=method.delivery_tag)

# Fetch only one message at a time: the broker will not deliver the next
# message until the current one has been acknowledged.
channel.basic_qos(prefetch_count=1)
# Use manual acknowledgements so a message is only removed from the queue
# after it has been processed, which protects against loss on failure.
# pika 1.0 changed the basic_consume signature (queue first, the callback is
# `on_message_callback`, and `no_ack` became `auto_ack`), so pick the call
# that matches the installed version.
if int(pika.__version__.split('.')[0]) >= 1:
    channel.basic_consume(queue=AMQP_QUEUE_NAME,
                          on_message_callback=callback,
                          auto_ack=False)
else:
    channel.basic_consume(callback, queue=AMQP_QUEUE_NAME, no_ack=False)
try:
    # Blocks and dispatches deliveries to `callback` until interrupted.
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl+C: stop consuming cleanly instead of dying with a traceback.
    channel.stop_consuming()
finally:
    # Release the broker connection on every exit path.
    if s_conn.is_open:
        s_conn.close()

关注公众号 尹安灿