RabbitMQ中RPC的实现:客户端发送请求消息,服务端回复响应消息,为了接受响应response,客户端需要发送一个回调队列的地址来接受响应,每条消息在发送的时候会带上一个唯一的correlation_id,相应的服务端处理计算后会将结果返回到对应的correlation_id。
RPC调用流程:
当生产者启动时,它会创建一个匿名的独占回调队列,对于一个RPC请求,生产者发送一条具有两个属性的消息:reply_to(回调队列),correlation_id(每个请求的唯一值),请求被发送到rpc_queue队列,消费者等待该队列上的请求。当一个请求出现时,它会执行该任务,将带有结果的消息发送回生产者。生产者等待回调队列上的数据,当消息出现时,它检查相关ID属性,如果它与请求中的值匹配,则返回对应用程序的响应。
RabbitMQ斐波拉契计算的RPC,消费者实现:
"""基于RabbitMQ实现RPC通信机制 --> 服务端"""import pikaimport uuidfrom functools import lru_cacheclass RabbitServer(object): def __init__(self): self.conn = pika.BlockingConnection( pika.ConnectionParameters(host='localhost', port=5672) ) self.channel = self.conn.channel() # 声明一个队列,并进行持久化,exclusive设置为false self.channel.queue_declare( exclusive=False, durable=True, queue='task_queue' ) # 声明一个exhange交换机,类型为topic self.channel.exchange_declare( exchange='logs_rpc', exchange_type='topic', durable=True ) # 将队列与交换机进行绑定 routing_keys = ['#'] # 接受所有的消息 for routing_key in routing_keys: self.channel.queue_bind( exchange='logs_rpc', queue='task_queue', routing_key=routing_key ) @lru_cache() def fib(self, n): """ 斐波那契数列.===>程序的处理逻辑 使用lru_cache 优化递归 :param n: :return: """ if n == 0: return 0 elif n == 1: return 1 else: return self.fib(n - 1) + self.fib(n - 2) def call_back(self, channel, method, properties, body): print('------------------------------------------') print('接收到的消息为(斐波那契数列的入参项为):{}'.format(str(body))) print('消息的相关属性为:') print(properties) value = self.fib(int(body)) print('斐波那契数列的运行结果为:{}'.format(str(value))) # 交换机将消息发送到队列 self.channel.basic_publish( exchange='', routing_key=properties.reply_to, body=str(value), properties=pika.BasicProperties( delivery_mode=2, correlation_id=properties.correlation_id, )) # 消费者对消息进行确认 self.channel.basic_ack(delivery_tag=method.delivery_tag) def receive_msg(self): print('开始接受消息...') self.channel.basic_qos(prefetch_count=1) self.channel.basic_consume( consumer_callback=self.call_back, queue='task_queue', no_ack=False, # 消费者对消息进行确认 consumer_tag=str(uuid.uuid4()) ) def consume(self): self.receive_msg() self.channel.start_consuming()if __name__ == '__main__': rabbit_consumer = RabbitServer() rabbit_consumer.consume()
生产者实现:
"""基于RabbitMQ实现RPC通信机制 --> 客户端"""import pikaimport uuidimport timeclass RabbitClient(object): def __init__(self): # 与RabbitMq服务器建立连接 self.conn = pika.BlockingConnection( pika.ConnectionParameters(host='localhost', port=5672) ) self.channel = self.conn.channel() # 声明一个exchange交换机,交换机的类型为topic self.channel.exchange_declare( exchange='logs_rpc', exchange_type='topic', durable=True ) # 声明一个回调队列,用于接受RPC回调结果的运行结果 result = self.channel.queue_declare(durable=True, exclusive=False) self.call_queue = result.method.queue # 从回调队列当中获取运行结果. self.channel.basic_consume( consumer_callback=self.on_response, queue=self.call_queue, no_ack=False ) def on_response(self, channel, method, properties, body): """ 对收到的消息进行确认 找到correlation_id与服务端的消息标识匹配的消息结果 :param channel: :param method: :param properties: :param body: :return: """ if self.corr_id == properties.correlation_id: self.response = body print('斐波那契数列的RPC返回结果是:{}'.format(body)) print('相关属性信息:') print(properties) self.channel.basic_ack(delivery_tag=method.delivery_tag) def send_msg(self, routing_key, message): """ exchange交换机将根据消息的路由键将消息路由到对应的queue当中 :param routing_key: 消息的路由键 :param message: 生成者发送的消息 :return: """ self.response = None self.corr_id = str(uuid.uuid4()) self.channel.basic_publish( exchange='logs_rpc', routing_key=routing_key, body=message, properties=pika.BasicProperties( delivery_mode=2, correlation_id=self.corr_id, reply_to=self.call_queue, )) while self.response is None: print('等待远程服务端的返回结果...') self.conn.process_data_events() # 非阻塞式的不断获取消息. return self.response def close(self): self.conn.close()if __name__ == "__main__": rabbit_producer = RabbitClient() routing_key = 'hello every one' start_time = int(time.time()) for item in range(2000): num = str(item) print('生产者发送的消息为:{}'.format(num)) rabbit_producer.send_msg(routing_key, num) end_time = int(time.time()) print("耗时{}s".format(str(end_time - start_time)))
计算2000以内的斐波拉契数列,执行结果如下: