import json
import pika
from house_info import HouseInfo
# Connection settings for the AMQP broker used by this script.
AMQP_CONFIG = {
    'host': '127.0.0.1',
    'port': 5672,
    'username': 'guest',
    'password': 'guest',
}

# Queue this script consumes inquiry messages from.
AMQP_QUEUE_NAME = 'house-inquery'

# Queue that process_feedback() publishes result messages to.
AMQP_FEEDBACK_QUEUE_NAME = 'house-inquery-result'
def process_feedback(msg):
    """Publish *msg* to the feedback queue and release the connection.

    A dedicated blocking connection is opened for the publish, the feedback
    queue is declared durable, and the message is sent through the default
    exchange with ``delivery_mode=2`` and a ``text/plain`` / ``UTF-8``
    content declaration. The connection is always closed afterwards so that
    repeated calls do not accumulate open broker connections.

    Args:
        msg: Message body passed unchanged to ``basic_publish``.

    Returns:
        Whatever ``basic_publish`` returns for the installed pika version.
    """
    mq_credential = pika.PlainCredentials(
        username=AMQP_CONFIG['username'],
        password=AMQP_CONFIG['password'],
    )
    mq_conn = pika.BlockingConnection(pika.ConnectionParameters(
        host=AMQP_CONFIG['host'],
        # Honour the configured port instead of silently using the default.
        port=AMQP_CONFIG['port'],
        credentials=mq_credential,
    ))
    try:
        mq_channel = mq_conn.channel()
        mq_channel.queue_declare(queue=AMQP_FEEDBACK_QUEUE_NAME, durable=True)
        return mq_channel.basic_publish(
            exchange='',
            routing_key=AMQP_FEEDBACK_QUEUE_NAME,
            body=msg,
            properties=pika.BasicProperties(
                delivery_mode=2,
                content_encoding='UTF-8',
                content_type='text/plain',
            ),
        )
    finally:
        # Only close a connection that is still open, so a failure during
        # publish is not masked by a second error raised from close().
        if mq_conn.is_open:
            mq_conn.close()
# Consumer-side connection: authenticate with the configured credentials,
# open a channel, and make sure the inquiry queue exists as a durable queue.
user_pwd = pika.PlainCredentials(AMQP_CONFIG['username'], AMQP_CONFIG['password'])
s_conn = pika.BlockingConnection(pika.ConnectionParameters(
    host=AMQP_CONFIG['host'],
    # Pass the configured port explicitly; it was previously ignored, so
    # editing AMQP_CONFIG['port'] had no effect on the connection.
    port=AMQP_CONFIG['port'],
    credentials=user_pwd,
))
channel = s_conn.channel()
channel.queue_declare(queue=AMQP_QUEUE_NAME, durable=True)
def callback(ch, method, properties, body):
    """Handle one delivered inquiry message and always acknowledge it.

    The raw body is printed and parsed as JSON. Every delivery is
    acknowledged, including empty and malformed payloads: the consumer runs
    with manual acks and ``prefetch_count=1``, so a delivery left
    unacknowledged would stop the broker from sending any further messages.

    Args:
        ch: Channel the message was delivered on; used to send the ack.
        method: Delivery metadata; supplies ``delivery_tag``.
        properties: Message properties (unused).
        body: Raw message payload, expected to be a JSON document.
    """
    print(body)

    try:
        # Parsed only to validate the payload; the visible code performs no
        # further processing on the decoded data.
        json.loads(body)
    except ValueError as exc:
        # Covers json.JSONDecodeError and undecodable bytes. A malformed
        # message can never succeed on redelivery, so report it and fall
        # through to the ack to drop it instead of crashing the consumer.
        print(exc)

    try:
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        # Best effort: a failed ack is reported but must not kill the loop.
        print(e)
# Deliver at most one unacknowledged message to this consumer at a time.
channel.basic_qos(prefetch_count=1)

# Manual acknowledgement mode: callback() is responsible for acking.
# NOTE(review): the callback-first signature and ``no_ack`` look like the
# pre-1.0 pika API; confirm the pinned pika version before upgrading.
channel.basic_consume(callback, queue=AMQP_QUEUE_NAME, no_ack=False)

try:
    # Blocks, dispatching deliveries to callback() until interrupted.
    channel.start_consuming()
except KeyboardInterrupt:
    # Ctrl-C: stop consuming cleanly instead of exiting with a traceback.
    channel.stop_consuming()
finally:
    # Release the broker connection on every exit path; skip if it is
    # already closed so the original error is not masked.
    if s_conn.is_open:
        s_conn.close()