The RabbitMQ online tutorial "Remote procedure call (RPC) – using the Java client" demonstrates how RPC can be implemented between a Java client and a Java server process, communicating over the RabbitMQ open source message broker.

But access to the RPC server process is not limited to Java applications – Delphi and Free Pascal applications using the Habari Client for RabbitMQ library can invoke the server process method too.

This article shows the code for a Delphi client which sends an RPC call message to the inbound RabbitMQ request queue and receives the response message over a temporary queue, which exists only for the duration of the connection.

A message broker based RPC architecture differs from a traditional client/server based RPC architecture in several key ways:

  • the RPC client does not have to know where the RPC server is running (RPC client and RPC server only communicate with the message broker, not with each other)
  • the RPC client can send requests even if the RPC server is not running
  • additional computing power can be provided by running more RPC server processes (load balancing is “built-in” by design)
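The three points above can be illustrated without a broker. The following is only a minimal in-memory sketch in Java, not RabbitMQ code: the class `BrokerSketch`, the `Request` and `Reply` types and the `id-N` correlation IDs are hypothetical names made up for this illustration, and a `LinkedBlockingQueue` stands in for the request and reply queues. The "client" part enqueues all requests before any worker exists, and several workers then drain the same queue, so the work is shared between them.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BrokerSketch {

    // Hypothetical request message: argument, correlation ID and reply queue.
    static final class Request {
        final int arg;
        final String correlationId;
        final BlockingQueue<Reply> replyTo;

        Request(int arg, String correlationId, BlockingQueue<Reply> replyTo) {
            this.arg = arg;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
        }
    }

    // Hypothetical reply message: echoes the correlation ID of the request.
    static final class Reply {
        final String correlationId;
        final int result;

        Reply(String correlationId, int result) {
            this.correlationId = correlationId;
            this.result = result;
        }
    }

    static int fib(int n) {
        return n < 2 ? n : fib(n - 1) + fib(n - 2);
    }

    // Queues all requests first, then starts the workers and collects the replies.
    static Map<String, Integer> run(int requests, int workers) {
        BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<>();
        BlockingQueue<Reply> replyQueue = new LinkedBlockingQueue<>();

        // The client only knows the queues; no worker is running yet.
        for (int i = 0; i < requests; i++) {
            requestQueue.add(new Request(i, "id-" + i, replyQueue));
        }

        // Competing consumers: every worker takes requests from the same queue.
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.execute(() -> {
                Request r;
                while ((r = requestQueue.poll()) != null) {
                    r.replyTo.add(new Reply(r.correlationId, fib(r.arg)));
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        // The client matches replies to requests by correlation ID.
        Map<String, Integer> results = new HashMap<>();
        Reply reply;
        while ((reply = replyQueue.poll()) != null) {
            results.put(reply.correlationId, reply.result);
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, Integer> results = run(10, 3);
        System.out.println("replies: " + results.size());
        System.out.println("fib(9) = " + results.get("id-9"));
    }
}
```

Replies may arrive in any order because the workers run concurrently, which is why each reply carries the correlation ID of its request. A real broker adds persistence, routing and network transport on top of this idea.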

Delphi source code:

program RPCClient;

{$APPTYPE CONSOLE}

uses
  BTCommAdapterIndy, BTJMSConnection, BTJMSInterfaces,
  SysUtils;

function Fib(const Arg: Integer): Integer;
var
  Conn: IConnection;
  Session: ISession;
  ReplyQueue: IDestination;
  Producer: IMessageProducer;
  Consumer: IMessageConsumer;
  Request, Response: IMessage;
begin
  Result := -1;
  Conn := TBTJMSConnection.MakeConnection;
  try
    try
      Conn.Start;

      // create the session
      Session := Conn.CreateSession(False, amAutoAcknowledge);

      // prepare the reply queue
      ReplyQueue := Session.CreateTemporaryQueue;

      // prepare and send the request
      Producer := Session.CreateProducer(Session.CreateQueue('/amq/queue/rpc_queue'));
      Request := Session.CreateTextMessage(IntToStr(Arg));
      Request.JMSReplyTo := ReplyQueue;
      Request.JMSCorrelationID := FloatToStr(Now);
      Producer.Send(Request);

      // prepare and receive the response from the temp queue
      Consumer := Session.CreateConsumer(ReplyQueue);
      Response := Consumer.Receive(10000); // timeout in milliseconds
      if Response <> nil then
      begin
        // make sure the reply belongs to this request before using it
        Assert(Request.JMSCorrelationID = Response.JMSCorrelationID);
        Result := StrToInt((Response as ITextMessage).Text);
      end else begin
        WriteLn('No reply received');
      end;

    except
      on E: Exception do
      begin
        WriteLn(E.Message);
      end;
    end;
  finally
    Conn.Close;
  end;
end;

var
  Input, Output: Integer;

begin
  ReportMemoryLeaksOnShutdown := True;

  Input := 21;
  Writeln('[.] fib(' + IntToStr(Input) + ')');

  Output := Fib(Input);
  Writeln('[.] Result: ' + IntToStr(Output));

  ReadLn;
end.

A small change is needed in the tutorial's Java server code to support STOMP text messages. The reply properties must also set the content type to text/plain:

BasicProperties replyProps = new BasicProperties.Builder()
        .correlationId(props.getCorrelationId())
        .contentType("text/plain")
        .build();

Server output:

run:
 [x] Awaiting RPC requests
 [.] fib(21)

Client output:

[.] fib(21)
[.] Result: 10946
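The result can be checked independently of the broker. The sketch below assumes that the server computes Fibonacci numbers with the usual definition fib(0) = 0, fib(1) = 1; the class name `FibCheck` is made up for this example and is not part of the tutorial.

```java
class FibCheck {

    // Naive recursive Fibonacci, assumed to match what the RPC server computes.
    static int fib(int n) {
        if (n < 2) {
            return n;
        }
        return fib(n - 1) + fib(n - 2);
    }

    public static void main(String[] args) {
        // prints "fib(21) = 10946", the value shown in the client output
        System.out.println("fib(21) = " + fib(21));
    }
}
```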
