The RabbitMQ online tutorial "Remote procedure call (RPC) – using the Java client" demonstrates how RPC can be implemented between a Java client and a Java server process that communicate over the RabbitMQ open source message broker.
Access to the RPC server process is not limited to Java applications, however: Delphi and Free Pascal applications using the Habari Client for RabbitMQ library can invoke the server's method too.
This article shows the code for a Delphi client which sends an RPC request message to the inbound RabbitMQ request queue and receives the response message over a temporary queue, which exists only for the duration of the connection.
A message broker based RPC architecture differs from a traditional client/server based RPC architecture in several key ways:
- the RPC client does not have to know where the RPC server is running (RPC client and RPC server only communicate with the message broker, not with each other)
- the RPC client can send requests even if the RPC server is not running
- additional computing power can be provided by running more RPC server processes (load balancing is “built-in” by design)
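The three points above can be illustrated without a broker at all. The following Java sketch is only an in-memory analogy, not RabbitMQ or Habari code: plain `BlockingQueue` instances stand in for the request queue and the temporary reply queue, and the names `BrokerRpcSketch`, `send`, `receive` and `startWorker` are made up for this illustration. It queues a request before any worker exists, then starts two workers that share the same request queue.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// In-memory analogy only: plain Java queues stand in for the message broker.
class BrokerRpcSketch {

    // A request carries the argument, a correlation ID and the queue to reply to.
    static final class Request {
        final int arg;
        final String correlationId;
        final BlockingQueue<Reply> replyTo;

        Request(int arg, String correlationId, BlockingQueue<Reply> replyTo) {
            this.arg = arg;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    static final class Reply {
        final String correlationId;
        final int result;

        Reply(String correlationId, int result) {
            this.correlationId = correlationId;
            this.result = result;
        }
    }

    // Shared request queue, playing the role of "rpc_queue".
    static final BlockingQueue<Request> RPC_QUEUE = new LinkedBlockingQueue<>();

    // Iterative Fibonacci with fib(0) = 0 and fib(1) = 1.
    static int fib(int n) {
        int a = 0, b = 1;
        for (int i = 0; i < n; i++) {
            int next = a + b;
            a = b;
            b = next;
        }
        return a;
    }

    // Starts one worker; any number of workers may take from the same queue.
    static Thread startWorker() {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Request request = RPC_QUEUE.take();
                    request.replyTo.add(new Reply(request.correlationId, fib(request.arg)));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop when interrupted
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive
        worker.start();
        return worker;
    }

    // Queues a request and returns its correlation ID; no worker needs to be running.
    static String send(int arg, BlockingQueue<Reply> replyQueue) {
        String correlationId = UUID.randomUUID().toString();
        RPC_QUEUE.add(new Request(arg, correlationId, replyQueue));
        return correlationId;
    }

    // Waits for the reply; returns -1 on timeout, like the Delphi client in this article.
    static int receive(BlockingQueue<Reply> replyQueue, String correlationId, long timeoutMillis) {
        try {
            Reply reply = replyQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (reply == null) {
                return -1;
            }
            if (!correlationId.equals(reply.correlationId)) {
                throw new IllegalStateException("Unexpected correlation ID");
            }
            return reply.result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Reply> replyQueue = new LinkedBlockingQueue<>(); // temporary reply queue
        String id = send(21, replyQueue); // sent before any worker exists
        startWorker();
        startWorker(); // a second worker shares the same request queue
        System.out.println("[.] Result: " + receive(replyQueue, id, 10000));
    }
}
```

The client never holds a reference to a worker, only to the two queues. That is the property the broker provides across process and machine boundaries.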
Delphi source code:
program RPCClient;

{$APPTYPE CONSOLE}

uses
  BTCommAdapterIndy, BTJMSConnection, BTJMSInterfaces,
  SysUtils;

function Fib(const Arg: Integer): Integer;
var
  Conn: IConnection;
  Session: ISession;
  ReplyQueue: IDestination;
  Producer: IMessageProducer;
  Consumer: IMessageConsumer;
  Request, Response: IMessage;
begin
  Result := -1;
  Conn := TBTJMSConnection.MakeConnection;
  try
    try
      Conn.Start;

      // create the session
      Session := Conn.CreateSession(False, amAutoAcknowledge);

      // prepare the reply queue
      ReplyQueue := Session.CreateTemporaryQueue;

      // prepare and send the request
      Producer := Session.CreateProducer(Session.CreateQueue('/amq/queue/rpc_queue'));
      Request := Session.CreateTextMessage(IntToStr(Arg));
      Request.JMSReplyTo := ReplyQueue;
      Request.JMSCorrelationID := FloatToStr(Now);
      Producer.Send(Request);

      // prepare and receive the response from the temp queue
      Consumer := Session.CreateConsumer(ReplyQueue);
      Response := Consumer.Receive(10000);
      if Response <> nil then
      begin
        Result := StrToInt((Response as ITextMessage).Text);
        Assert(Request.JMSCorrelationID = Response.JMSCorrelationID);
      end else begin
        WriteLn('No reply received');
      end;
    except
      on E: Exception do
      begin
        WriteLn(E.Message);
      end;
    end;
  finally
    Conn.Close;
  end;
end;

var
  Input, Output: Integer;
begin
  ReportMemoryLeaksOnShutdown := True;
  Input := 21;
  Writeln('[.] fib(' + IntToStr(Input) + ')');
  Output := Fib(Input);
  Writeln('[.] Result: ' + IntToStr(Output));
  ReadLn;
end.
A small change is needed in the tutorial's Java server code to support STOMP text messages. The reply properties must also declare the content type text/plain:
BasicProperties replyProps = new BasicProperties.Builder()
        .correlationId(props.getCorrelationId())
        .contentType("text/plain")
        .build();
Server output:
run:
[x] Awaiting RPC requests
[.] fib(21)
Client output:
[.] fib(21)
[.] Result: 10946
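
The client output can be cross-checked locally without a broker. The sketch below assumes the server computes the standard Fibonacci sequence with fib(0) = 0 and fib(1) = 1, written here as a naive recursive function. The class name `FibCheck` is illustrative only and is not part of the tutorial or of the Habari library.

```java
// Local sanity check; assumes fib(0) = 0 and fib(1) = 1.
class FibCheck {

    // Naive recursive definition, adequate for small non-negative arguments such as 21.
    static int fib(int n) {
        if (n == 0) return 0;
        if (n == 1) return 1;
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] args) {
        int input = 21;
        System.out.println("[.] fib(" + input + ")");
        System.out.println("[.] Result: " + fib(input)); // prints "[.] Result: 10946"
    }
}
```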