一、概述
前面几篇文章讲述的内容都是单向的消息传递,生产者将消息发送给消费者之后就不再管后续的业务处理了。实际业务中,有的时候我们还需要等待消费者返回结果给我们,或者是说我们需要消费者上的一个功能、一个方法或是一个接口返回给我们相应的值,而往往大型的系统软件,生产者跟消费者之间都是相互独立的两个系统,部署在两个不同的电脑上,不能通过直接对象.方法的形式获取想要的结果,这时候我们就需要用到RPC(Remote Procedure Call)远程过程调用方式。
RabbitMQ实现RPC的方式很简单,生产者发送一条带有标签(消息ID(correlation_id)+回调队列名称)的消息到发送队列,消费者(也称RPC服务端)从发送队列获取消息并处理业务,解析标签的信息将业务结果发送到指定的回调队列,生产者从回调队列中根据标签的信息获取发送消息的返回结果。
如图,客户端C发送消息,指定消息的ID=rpc_id,回调响应的队列名称为rpc_resp,消息从C发送到rpc_request队列,服务端S获取消息业务处理之后,将correlation_id附加到响应的结果发送到指定的回调队列rpc_resp中,客户端从回调队列获取消息,匹配与发送消息的correlation_id相同的值为消息应答结果。
RabbitMQ官网的示例是客户端通过RPC方式调用服务端获取斐波那契数列的值,我们举个简单的例子,客户端通过RPC调用获取服务端求平方的方法返回值。即客户端发送消息4,服务端返回4的平方16
二、几个简单的概念
2.1回调队列
前文说到,客户端发送消息到服务端之后,要接收返回结果,存放返回结果的队列叫做回调队列,客户端发送消息之后阻塞监听该队列返回的消息。我们可以使用随机队列命名,也可以指定队列的名称,同时,我们可以一个消息建立一个随机队列,但是通常考虑资源使用的情况,我们一般一个消费者建立一个指定的回调队列。
2.2消息属性
AMQP协议预定了一组14个消息属性(Message Properties),常用的有如下四种消息属性
1、deliveryMode:标记消息传递模式,为2时表示持久化消息,其它值不做持久化,在前面文章讲到消息持久化使用的PERSITNAT_TEXT_PLAIN时提到过
2、contentType:内容类型,用于描述内容编码,如json
3、replyTo:应答,指定的通用的回调队列名称
4、correlationId:关联ID,指定消息的标签,方便关联RPC的请求与响应
上述四个属性我们使用了replyTo和correlationId属性,同时因为RPC调用是具有幂等性的,所以我们可以忽视不属于我们应该获得到的correlationId。
三、示例代码
我们梳理一下文章开始提到的简单示例,我们的RPC处理流程如下
1、客户端启动,创建请求队列rpc_request和回调队列rpc_resp
2、客户端为我们的消息请求设置两个消息属性correlationId关联ID和replyTo回调队列(rpc_resp)
3、将请求发送到rpc_request队列
4、RPC服务端监听rpc_request队列中的请求,获取消息处理业务,并把带有接收消息的correlationId的返回结果消息返回到指定回调队列(rpc_resp)
5、客户端监听rpc_resp回调队列,如果有消息,匹配correlationId,如果和请求消息的相同,那么这个消息就是返回的响应结果了。
客户端代码MqRpcClient,相关说明已经写在注释中
1 | package com.cn.chenxyt.mq; |
服务端代码MqRpcServer
1 | package com.cn.chenxyt.mq; |
启动客户端、启动服务端,可以看到控制台打出的结果,符合我们的预期结果,管理台也新建了两条队列。
四、总结
综上,RabbitMQ的RPC调用方式就是形成了两条队列,两个客户端(服务端相互监听),需要注意的是,如前篇所述,如果开启手动回复,要记得在代码中手动回复ACK。