[转帖]RabbitMQ学习笔记07:RPC

rabbitmq,学习,笔记,rpc · 浏览次数 : 0

小编点评

**生成内容时需要带简单的排版** **代码示例** ```python # client print(" [x] Requesting fib(30)\") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response) # server print(" [x] Awaiting RPC requests\") response = fibonacci_rpc.call(30) print(" [.] Got %r" % response) ``` **排版** ``` [x] Requesting fib(30) [.] Got 832040 [x] Awaiting RPC requests [.] Got 832040 ``` **其他** * 客户端需要判断消息的有效性,比如我们只允许接收100以内的数等等这类的边界检查机制。 * 服务端需要运行在后台,否则客户端会无法接收消息。 * 客户端是否应该针对RPC设置超时时长?如果服务器出现故障并且抛出了一个异常,那么这个异常是否应该被转发给客户端?

正文

 

 
https://www.cnblogs.com/alongdidi/p/rabbitmq_tutorial_six.html

  

参考资料:RabbitMQ tutorial - Remote procedure call (RPC) — RabbitMQ 

 

Remote Procedure Call

What this tutorial focuses on

在 RabbitMQ学习笔记03:Work Queues 中我们学会了如何使用work queue在多个worker进程中分发耗时的任务。

但是如果我们需要在远程的电脑上执行一个函数并等待其结果呢?那就是完全不同的情况了。这种模式我们称之为远程过程调用 Remote Procedure Call,简称RPC

在本博文中,我们将会使用RabbitMQ去构建一个RPC系统:一个客户端和一个可扩展的服务器。实际上由于我们并没有任何值得分发的耗时任务,因此我们将会创建一个假的RPC服务用于返回斐波那契数列。

 

Client interface

为了描述一个RPC服务是如何被使用的,我们将会创建一个简单的客户端类。它会暴露一个名叫call的方法,该方法会发送一个RPC请求并阻塞直到收到回答:

fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)

A note on RPC

尽管RPC在计算机技术中是一种非常常见的模型,但是它经常备受批评。当一个程序员没有办法知道是否函数的调用是本地的或者如果它是一个慢RPC,那么问题就会产生。这样的困惑会导致系统变得不稳定以及增加非必要的排错复杂度。滥用RPC不仅不会简化软件,反而会导致出现无法维护的面条式代码 spaghetti code

永远将以下内容记在心里:

  • 哪些函数调用是本地的,哪些函数调用是远程的,这些一定要确保是清晰的。
  • 文档要做好,确保不同的组件之间的依赖关系是清晰的。
  • 处理错误案例。当一个RPC服务器挂了很长一段时间以后,客户端会是什么样的反应?

如果不确定自己是否要使用RPC。如果可以的话,可以尝试使用异步管道,结果会被异步地推送到下一个计算阶段。

 

Callback queue

一般来说,通过RabbitMQ来实现RPC是简单的。一个客户端发送请求消息,然后一个服务器回复响应消息。为了接收响应,客户端需要在请求的时候发出callback队列的地址。

result = channel.queue_declare(queue='', exclusive=True)
callback_queue = result.method.queue

channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                            reply_to = callback_queue,
                            ),
                      body=request)

# ... and some code to read a response message from the callback_queue ...

Message properties

AMQP 0-9-1 预先定义了14种消息属性可以伴随消息一起发送出去。大多数属性很少使用到,除了以下即可:

  • delivery_mode: 标记一个消息是永久或者临时存在的。我们在消息的持久化时有使用到。
  • content_type: 用于描述编码时的MIME类型。比如常见的JSON编码application/json
  • reply_to: 一般用于命名一个callback队列。
  • correlation_id: 用于关联RPC的请求和响应。

 

Correlation id

在上面列出的方法种我们建议每次RPC请求都创建一个callback队列。这样是非常低效的,更好的办法是基于客户端来创建callback队列而不是基于每次RPC请求。

这会带来一个新的问题,在callback队列中接收到的响应消息,我们不知道对应的是哪条请求消息。因此我们需要使用到correlation_id属性,我们会针对每条消息单独设一个该属性的唯一的取值,然后当我们在回调队列中收到消息的时候,我们会关注消息中的这个属性的值,如果correlation_id的值相同就表示它们匹配的是同一条消息的请求和响应。如果我们发现correlation_id属性的值是我们所不知道的,那么它就不属于我们的请求,我们就可以安全丢弃它们。

你可能会问,为什么我们要丢弃回调队列中未知的消息而不是报错?这是因为在服务器端可能会出现系统错乱的情况(race condition)。尽管可能性很小,存在一种可能在RPC服务器发送给我们回答之后至在发送请求的确认消息之前这段时间,RPC刚好宕机了。这种情况下,当RPC服务器重启之后,它会重新处理一次请求。这就是为什么在客户端我们必须优雅地处理重复的响应,而且RPC理想情况下应该是幂等的。

 

Summary

我们的RPC的工作流程:

  1. 当客户端启动的时候,它会创建一个匿名的独有的回调队列。
  2. 对于一次RPC请求,客户端会发送一条消息并伴随2个消息属性。reply_to用于指向回调队列的地址,correlation_id用于标志每次请求消息。
  3. 请求会被发往rpc_queue队列。
  4. RPC的服务器端会作为消费者在该队列上等待请求(消息)。一旦收到请求就会对其处理,然后将返回的消息发往reply_to所指向的回调对类中。返回的消息会包含correlation_id,其值和请求时保持一致。
  5. 客户端将会在回调队列上等待数据。如果有新的数据抵达,并且correlation_id和之前请求时发出的消息中的correlation_id值一致的话,就会处理这条响应消息;否则若不一致,则丢弃消息。

有意思的一点,在RPC的C/S架构中,客户端和服务器端均作为消息队列的生产者和消费者。

 

 

Putting it all together

rpc_server.py

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
    pika.ConnectionParameters(host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id = \
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)

print(" [x] Awaiting RPC requests")
channel.start_consuming()

服务器端的代码很直接:

  1. 像之前一样,我们建立connection、channel,并创建名为rpc_queue的队列。
  2. 我们声明了一个斐波那契数列函数,它只能接受正整数。我们不要传递太大的数,否则程序会变得很慢。
  3. 我们定义了回调函数on_requestbasic_consume使用,这是RPC服务器的核心部分。当rpc_queue队列中有消息的时候它就会被执行,用于发送响应给客户端。
  4. 在服务器端我们可能会想要运行多个rpc_server.py进程,为了让空闲的进程可以立即处理请求,我们使用了prefetch_count

rpc_client.py

#!/usr/bin/env python
import pika
import uuid


class FibonacciRpcClient(object):

    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True)

        self.response = None
        self.corr_id = None

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange='',
            routing_key='rpc_queue',
            properties=pika.BasicProperties(
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))
        self.connection.process_data_events(time_limit=None)
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)

客户端的代码则会稍微有点复杂:

  1. 建立connection、channel,并创建名为随机独有的回调队列,将该队列的名字保存到callback_queue
  2. 我们订阅callback_queue,这样我们就可以接收RPC响应。
  3. 每次有响应消息回来的时候,on_response回调函数就会被执行,完成一个很简单的工作,对于每条响应的消息它会检查是否correlation_id是我们在找的那个。如果是的话,它会将响应保存在self.response并且离开消费循环。
  4. 接下来我们定义我们的主方法call——它来执行实际的RPC请求。
  5. call方法中我们生成了correlation_id并保存到self.corr_id中。on_response回调函数会基于这个值获取合适的响应。
  6. 同样在call方法中发布消息的时候,我们包含了2个属性reply_tocorrelation_id
  7. 最后我们等待合适的响应到达,然后将该消息返回给用户。

接下来就可以执行代码了,打开第一个终端运行python rpc_server.py,打开第二个终端运行python rpc_client.py

两个终端最终结果如下

# 第一个终端
[root@rabbitmq-01 code]# python rpc_server.py 
 [x] Awaiting RPC requests
 [.] fib(30)

# 第二个终端
[root@rabbitmq-01 code]# python rpc_client.py 
 [x] Requesting fib(30)
 [.] Got 832040

代码中所呈现出来的设计不是唯一可能的RPC服务的实现,但是它有一些重要的优点:

  • 如果RPC服务太慢的话我们可以通过横向扩展的方式,在新的console窗口中再运行一个rpc_server.py进程。
  • 在客户端方面,RPC规定仅发送和接收1条消息。没有要求像queue_declare这样的异步调用。这样的结果对于一次RPC请求,RPC客户端只需要一次网络来回(network round trip)。

这里的代码只是示例代码而已,过度简化了,有一些复杂且重要的问题没有解决,比如说:

  • 如果没有server端运行的话,那么客户端会是什么反应?
  • 客户端是否应该针对RPC设置超时时长?
  • 如果服务器出现故障并且抛出了一个异常,那么这个异常是否应该被转发给客户端?
  • 处理数据前是否判断消息的有效性,比如我们只允许接收100以内的数等等这类的边界检查机制。

 

最后,tutorial在这里提到了 management UI ,这是一个以Web(GUI)形式展示RabbitMQ数据以及提供一些操作的插件,蛮适合不会或者不经常写代码操作RabbitMQ的人员,比如运维工程师。但是像生产者和消费者这种,还是得通过实际的代码来实现的。由此也可以看出,很多开源软件的应用,想要入门并且学好、用好的话,其实运维和开发相关的知识是都需要掌握的。

与[转帖]RabbitMQ学习笔记07:RPC相似的内容:

[转帖]RabbitMQ学习笔记07:RPC

https://www.cnblogs.com/alongdidi/p/rabbitmq_tutorial_six.html 参考资料:RabbitMQ tutorial - Remote procedure call (RPC) — RabbitMQ Remote Procedure Call W

[转帖]RabbitMQ学习笔记01:初识与安装

https://www.cnblogs.com/alongdidi/p/rabbitmq_overview.html 原作者写的真好. 前言 本人是一名运维工程师,在此公司接触到 RabbitMQ ,平时针对此软件的工作内容就是集群的安装以及配置监控等,对其的理解也仅仅是知道其是一种消息队列的服务,

[转帖]RabbitMQ学习笔记02:Hello World!

参考资料:RabbitMQ tutorial - "Hello world!" — RabbitMQ 前言 RabbitMQ是一个中间人,它接受和转发消息。我们可以把它想象成一个邮局:当你把邮件投入邮箱的时候,你可以确信它最终会被投递到收件人的手中。RabbitMQ就是那个邮箱、邮局和邮差。区别就在

[转帖]RabbitMQ学习笔记03:Work Queues

参考资料:RabbitMQ tutorial - Work Queues — RabbitMQ 前言 这篇文章我们会创建一个Work Queue,它会在多个worker(即消费者 consumer)中分发耗时的任务。Work Queue也叫做Task Queue是为了避免当处理一个占用资源的任务时必

[转帖]RabbitMQ学习笔记04:Publish/Subscribe

参考资料:RabbitMQ tutorial - Publish/Subscribe — RabbitMQ 前言 在 RabbitMQ学习笔记03:Work Queues 中,每个进入队列中的消息只会被投递给一个消费者进程。而在这篇文章中,我们将会把一条消息同时投递给多个消费者进程。这种模式也叫做p

[转帖]RabbitMQ学习笔记05:Routing

参考资料:RabbitMQ tutorial - Routing — RabbitMQ 前言 在之前的文章中我们构建了一个简单的日志系统,它可以广播消息到多个消费者中。 在这篇文章中,我们打算实现仅订阅消息的子集(即不是所有的消息,仅仅只是一部分消息。注意,这里不是说一条消息的一部分)。例如我们只会

[转帖]RabbitMQ学习笔记06:Topics

https://www.cnblogs.com/alongdidi/p/rabbitmq_tutorial_five.html 前言 在上一篇博文中我们使用direct类型的exchange改善了我们的日志系统,但是它仍然有一定的限制,它没有办法基于多个条件路由消息。 我们可能不仅仅希望基于日志级别

[转帖]RabbitMQ:Exchange的Fanout类型的介绍和使用

https://blog.csdn.net/weixin_45492007/article/details/106095591 1.声明 当前的内容用于本人学习和使用Fanout类型的Exchange,主要理解其主要作用 2.Fanout Exchange的官方介绍 扇出交换机将消息路由到与其绑定的

[转帖]RabbitMQ服务优化,修改最大连接数

https://www.cnblogs.com/hoyeong/p/16242202.html RabbitMQ的优化RabbitMQ的连接数是压垮消息队列的一个重要的指标。所以在平时使用OpenStack平台的过程中,如果大量的用户同时创建虚拟机,会导致云平台创建报错,其实就是消息队列服务的崩溃。

[转帖]RabbitMQ性能优化

https://www.cnblogs.com/zhengchunyuan/p/9253728.html 修改rabbitmq.config文件 rabbitmq.config文件时rabbitmq的配置文件,他遵守Erlang配置文件定义。 rabbitmq.config文件位置: Unix $R