STOMP
Thursday, May 9, 2024
Over the last few days, I implemented some support for STOMP, the “Simple Text Oriented Messaging Protocol”:
STOMP provides an interoperable wire format so that STOMP clients can communicate with any STOMP message broker to provide easy and widespread messaging interoperability among many languages, platforms and brokers.
There are three versions of the protocol available:
In the interest of learning Factor, I thought I would write a bit about parsing the STOMP protocol, and then about how to implement a client library using connection-oriented networking, interacting with it using mailboxes, and then building a command-line interface using the command-loop vocabulary.
There are many STOMP servers and clients available in different languages. I tried a few and decided that Apache ActiveMQ was one of the most convenient to setup and reliable to work with, but others are available as well. On my macOS laptop, this can be accomplished by:
$ brew install activemq
$ brew services start activemq
Protocol
The STOMP protocol consists of frames that are sent and received between the STOMP client and the STOMP server. Each frame consists of a command, some headers, and a body:
TUPLE: frame command headers body ;
: <frame> ( command -- frame ) LH{ } clone f frame boa ;
: set-header ( frame header-value header-name -- frame )
pick headers>> set-at ;
An example SEND
message might look like this, with the ^@
indicating a
NUL
byte to end the message:
SEND
destination:/queue/a
hello queue a
^@
We will begin by implementing words to read each of these sections. The
command is the first line, followed by a series of name:value
headers
before a blank line, and then a body (specified either by a
content-length
header, or reading until a NUL
byte is encountered):
: read-command ( -- command )
readln ;
: read-headers ( -- headers )
[ readln dup empty? not ] [ ":" split1 2array ] produce nip ;
: read-body ( content-length/f -- body )
[ read read1 ] [ B{ 0 } read-until ] if* 0 assert= ;
And then implement a read-frame
word that uses those to build up a
frame
tuple:
: read-frame ( -- frame )
read-command
read-headers
dup "content-length" of string>number
read-body frame boa ;
We can implement a write-frame
word writing it out in the expected
structure:
: write-frame ( frame -- )
[ command>> print ]
[ headers>> [ ":" swap [ write ] tri@ nl ] assoc-each nl ]
[ body>> [ write ] when* 0 write1 ] tri flush ;
The CONNECT
frame is typically the first one sent to the server:
SYMBOL: stomp-username
SYMBOL: stomp-password
: stomp-connect ( -- frame )
"CONNECT" <frame>
stomp-username get [ "login" set-header ] when*
stomp-password get [ "passcode" set-header ] when* ;
The server will respond to CONNECT
with a CONNECTED
frame, which we
can wait for:
: wait-for-connected ( -- frame )
f [ drop read-frame dup command>> "CONNECTED" = not ] loop ;
: stomp-connect-and-wait ( -- frame )
stomp-connect write-frame wait-for-connected ;
The SEND
frame contains a body that is sent to a destination:
:: stomp-send ( destination body -- frame )
"SEND" <frame>
body >>body
destination "destination" set-header ;
The SEND
frame can also be used to send a file (using MIME
types for
automatic content encoding):
:: stomp-sendfile ( destination path -- frame )
"SEND" <frame>
destination "destination" set-header
path dup mime-type
[ mime-type-encoding file-contents >>body ]
[ "content-type" set-header ] bi ;
The destination is a message queue, which we can SUBSCRIBE
or
UNSUBSCRIBE
from:
:: stomp-subscribe ( destination -- frame )
"SUBSCRIBE" <frame>
destination "destination" set-header ;
:: stomp-unsubscribe ( destination -- frame )
"UNSUBSCRIBE" <frame>
destination "destination" set-header ;
There are also words to support transactions (BEGIN
, COMMIT
,
ABORT
), indicate message receipt (ACK
, NACK
), as well as to
DISCONNECT
from the server.
Client
Using those words, we can move on to the networking component that will enable connecting to a STOMP server and interacting with it. An inner loop takes a mailbox that a client can use to enqueue frames to be sent, and a quotation that will be called with each received frame from the server.
:: stomp-loop ( mailbox quot: ( frame -- ) -- )
stomp-connect-and-wait drop
[ mailbox mailbox-get write-frame t ]
"stomp writer" spawn-server drop
[ read-frame quot call t ] loop ; inline
The client library can use this to connect and print out any received frames, for example:
"127.0.0.1" 61613 utf8 [
<mailbox> [ [ . flush ] with-global ] stomp-loop
] with-client
Command-Line
To build a simple command-line interface, we first need to create our mailbox and store a reference to it:
CONSTANT: stomp-mailbox $[ <mailbox> ]
And then make a word to put frames into it:
: put-frame ( frame -- )
stomp-mailbox mailbox-put ;
Using the command-loop vocabulary, we can define some supported commands:
CONSTANT: COMMANDS {
T{ command
{ name "send" }
{ quot [ " " split1 stomp-send put-frame ] }
{ help "Send a message to a destination in the messaging system." } }
T{ command
{ name "sendfile" }
{ quot [ " " split1 stomp-sendfile put-frame ] }
{ help "Send a file to a destination in the messaging system." } }
T{ command
{ name "subscribe" }
{ quot [ stomp-subscribe put-frame ] }
{ help "Subscribe to a destination." } }
T{ command
{ name "unsubscribe" }
{ quot [ stomp-unsubscribe put-frame ] }
{ help "Unsubscribe from a destination." } }
}
Before we run our command-loop, we start a thread to connect to the STOMP server, and configure it to send frames queued in the mailbox and print out the frames that are received:
INITIALIZED-SYMBOL: stomp-host [ "127.0.0.1" ]
INITIALIZED-SYMBOL: stomp-port [ 61613 ]
: start-stomp-client ( -- )
[
stomp-host get stomp-port get <inet4> utf8 [
stomp-mailbox [ [ nl . flush ] with-global ] stomp-loop
] with-client
] in-thread ;
And then a simple word to start the command-loop:
: stomp-main ( -- )
"Welcome to STOMP!" "STOMP>" <command-loop>
COMMANDS [ over add-command ] each
start-stomp-client run-command-loop ;
MAIN: stomp-main
And then you can try running it!
$ ./factor -run=stomp.cli
Welcome to STOMP!
STOMP> subscribe /queue/test
STOMP> send /queue/test hello world
T{ frame
{ command "MESSAGE" }
{ headers
{
{ "expires" "0" }
{ "destination" "/queue/test" }
{ "subscription" "1" }
{ "priority" "4" }
{
"message-id"
"ID\\chostname-59660-1715273573218-3\\c3\\c-1\\c1\\c1"
}
{ "timestamp" "1715276926454" }
}
}
{ body "hello world" }
}
In addition to this, support was added for all the other client messages, all three versions of the STOMP protocol, automatic heartbeats, graceful disconnect using message receipts, words to make using transactions easier, as well as a debug mode to print sent and received frames from the network, and command-line options for configuring the hostname, port, username, and passwords.
As always, there are a few things it would be nice to add – for example:
better support for SSL/TLS connections, automatic reconnect attempts with
backoff algorithm, better display of connection status in the command-line, and
additional protocol-level support like automatic generation of ACK
and
NACK
, and testing with additional STOMP compliant message brokers.
This is available in the development version. Take a look!