Re: Factor

Factor: the language, the theory, and the practice.

STOMP

Thursday, May 9, 2024

#command-line #networking #parsing

Over the last few days, I implemented some support for STOMP, the “Simple Text Oriented Messaging Protocol”:

STOMP provides an interoperable wire format so that STOMP clients can communicate with any STOMP message broker to provide easy and widespread messaging interoperability among many languages, platforms and brokers.

There are three versions of the protocol available: 1.0, 1.1, and 1.2.

In the interest of learning Factor, I thought I would write a bit about parsing the STOMP protocol, then implementing a client library using connection-oriented networking, interacting with it using mailboxes, and finally building a command-line interface using the command-loop vocabulary.

There are many STOMP servers and clients available in different languages. I tried a few and decided that Apache ActiveMQ was one of the most convenient to set up and reliable to work with, but others are available as well. On my macOS laptop, it can be installed and started using Homebrew:

$ brew install activemq

$ brew services start activemq

Protocol

The STOMP protocol consists of frames that are sent and received between the STOMP client and the STOMP server. Each frame consists of a command, some headers, and a body:

TUPLE: frame command headers body ;

: <frame> ( command -- frame ) LH{ } clone f frame boa ;

: set-header ( frame header-value header-name -- frame )
    pick headers>> set-at ;

An example SEND message might look like this, with the ^@ indicating a NUL byte to end the message:

SEND
destination:/queue/a

hello queue a
^@

We will begin by implementing words to read each of these sections. The command is the first line, followed by a series of name:value headers terminated by a blank line, and then a body whose extent is either given by a content-length header or found by reading until a NUL byte is encountered:

: read-command ( -- command )
    readln ;

: read-headers ( -- headers )
    [ readln dup empty? not ] [ ":" split1 2array ] produce nip ;

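: read-body ( content-length/f -- body )
    ! With a content-length, read that much and then the terminator;
    ! otherwise read up to the terminator. Either way it must be NUL.
    [ read read1 ] [ B{ 0 } read-until ] if* 0 assert= ;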

And then implement a read-frame word that uses those to build up a frame tuple:

: read-frame ( -- frame )
    read-command
    read-headers
    dup "content-length" of string>number
    read-body frame boa ;
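
As a quick check of the reading words, a frame can be parsed from an in-memory string rather than a socket. This is only a sketch: it assumes the definitions above are loaded, it uses with-string-reader from io.streams.string, and the frame contents are made up for illustration.

```factor
USING: io.streams.string prettyprint ;

! NUL-terminated body: there is no content-length header, so
! read-body reads up to the "\0" that stands in for ^@.
"SEND\ndestination:/queue/a\n\nhello queue a\0"
[ read-frame ] with-string-reader .

! Explicit length: read-body reads 5 elements, then the NUL.
"SEND\ndestination:/queue/a\ncontent-length:5\n\nhello\0"
[ read-frame ] with-string-reader .
```

Both calls should print a frame tuple whose command is "SEND", with the headers as an array of name/value pairs.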

We can implement a write-frame word that writes a frame out in the same structure, ending with a NUL byte:

: write-frame ( frame -- )
    [ command>> print ]
    [ headers>> [ ":" swap [ write ] tri@ nl ] assoc-each nl ]
    [ body>> [ write ] when* 0 write1 ] tri flush ;
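
To see the bytes that write-frame produces without opening a connection, the output can be captured in a string. This is a sketch assuming the words above are loaded; with-string-writer comes from io.streams.string, and the destination and body are illustrative values.

```factor
USING: accessors io.streams.string prettyprint ;

! Build a SEND frame by hand and capture what write-frame emits.
"SEND" <frame>
    "hello queue a" >>body
    "/queue/a" "destination" set-header
[ write-frame ] with-string-writer .
```

The captured string has the command line, one name:value line per header, a blank line, the body, and a trailing NUL.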

The CONNECT frame is typically the first one sent to the server:

SYMBOL: stomp-username
SYMBOL: stomp-password

: stomp-connect ( -- frame )
    "CONNECT" <frame>
        stomp-username get [ "login" set-header ] when*
        stomp-password get [ "passcode" set-header ] when* ;
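
Because the credentials are read from dynamic variables, they can be bound for the duration of a quotation. A small sketch, assuming the words above are loaded; "guest" and "secret" are placeholder values, not the defaults of any particular broker.

```factor
USING: accessors namespaces prettyprint ;

! Bind both symbols, then inspect the headers of the CONNECT frame.
H{
    { stomp-username "guest" }
    { stomp-password "secret" }
} [ stomp-connect headers>> . ] with-variables
```

With neither variable set, the frame is built with no login or passcode header at all.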

The server will respond to CONNECT with a CONNECTED frame, which we can wait for:

: wait-for-connected ( -- frame )
    f [ drop read-frame dup command>> "CONNECTED" = not ] loop ;

: stomp-connect-and-wait ( -- frame )
    stomp-connect write-frame wait-for-connected ;

The SEND frame contains a body that is sent to a destination:

:: stomp-send ( destination body -- frame )
    "SEND" <frame>
        body >>body
        destination "destination" set-header ;

The SEND frame can also be used to send a file (using MIME types for automatic content encoding):

:: stomp-sendfile ( destination path -- frame )
    "SEND" <frame>
        destination "destination" set-header
        path dup mime-type
        [ mime-type-encoding file-contents >>body ]
        [ "content-type" set-header ] bi ;

The destination is a message queue, which we can SUBSCRIBE to or UNSUBSCRIBE from:

:: stomp-subscribe ( destination -- frame )
    "SUBSCRIBE" <frame>
        destination "destination" set-header ;

:: stomp-unsubscribe ( destination -- frame )
    "UNSUBSCRIBE" <frame>
        destination "destination" set-header ;

There are also words to support transactions (BEGIN, COMMIT, ABORT), indicate message receipt (ACK, NACK), as well as to DISCONNECT from the server.
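
Those words are not shown in this post, but a few of them could plausibly follow the same pattern. The sketch below is a guess, not the vocabulary's actual code: the word names are hypothetical, and the only protocol fact relied on is that BEGIN, COMMIT, and ABORT frames carry a transaction header.

```factor
USING: kernel ;

! Hypothetical names; the real vocabulary may define these differently.
: stomp-begin ( transaction -- frame )
    "BEGIN" <frame> swap "transaction" set-header ;

: stomp-commit ( transaction -- frame )
    "COMMIT" <frame> swap "transaction" set-header ;

: stomp-abort ( transaction -- frame )
    "ABORT" <frame> swap "transaction" set-header ;

! DISCONNECT needs no headers in its simplest form.
: stomp-disconnect ( -- frame )
    "DISCONNECT" <frame> ;
```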

Client

Using those words, we can move on to the networking component that will enable connecting to a STOMP server and interacting with it. An inner loop takes a mailbox that a client can use to enqueue frames to be sent, and a quotation that will be called with each received frame from the server.

:: stomp-loop ( mailbox quot: ( frame -- ) -- )
    stomp-connect-and-wait drop

    [ mailbox mailbox-get write-frame t ]
    "stomp writer" spawn-server drop

    [ read-frame quot call t ] loop ; inline

The client library can use this to connect and print out any received frames, for example:

"127.0.0.1" 61613 <inet4> utf8 [
    <mailbox> [ [ . flush ] with-global ] stomp-loop
] with-client
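
Since the writer thread only starts draining the mailbox after the CONNECTED frame arrives, frames can be queued before the connection is made. A sketch under a few assumptions: the words above are loaded, a broker is listening on 127.0.0.1 port 61613, and /queue/test is an illustrative destination. Note that stomp-loop does not return, so this blocks until the connection fails.

```factor
USING: concurrency.mailboxes io io.encodings.utf8 io.sockets
kernel namespaces prettyprint ;

! Queue a SUBSCRIBE and a SEND, then hand the mailbox to stomp-loop.
<mailbox>
"/queue/test" stomp-subscribe over mailbox-put
"/queue/test" "hello" stomp-send over mailbox-put
"127.0.0.1" 61613 <inet4> utf8 [
    [ [ . flush ] with-global ] stomp-loop
] with-client
```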

Command-Line

To build a simple command-line interface, we first need to create our mailbox and store a reference to it:

CONSTANT: stomp-mailbox $[ <mailbox> ]

And then make a word to put frames into it:

: put-frame ( frame -- )
    stomp-mailbox mailbox-put ;

Using the command-loop vocabulary, we can define some supported commands:

CONSTANT: COMMANDS {
    T{ command
        { name "send" }
        { quot [ " " split1 stomp-send put-frame ] }
        { help "Send a message to a destination in the messaging system." } }
    T{ command
        { name "sendfile" }
        { quot [ " " split1 stomp-sendfile put-frame ] }
        { help "Send a file to a destination in the messaging system." } }
    T{ command
        { name "subscribe" }
        { quot [ stomp-subscribe put-frame ] }
        { help "Subscribe to a destination." } }
    T{ command
        { name "unsubscribe" }
        { quot [ stomp-unsubscribe put-frame ] }
        { help "Unsubscribe from a destination." } }
}

Before we run our command-loop, we start a thread to connect to the STOMP server, and configure it to send frames queued in the mailbox and print out the frames that are received:

INITIALIZED-SYMBOL: stomp-host [ "127.0.0.1" ]
INITIALIZED-SYMBOL: stomp-port [ 61613 ]

: start-stomp-client ( -- )
    [
        stomp-host get stomp-port get <inet4> utf8 [
            stomp-mailbox [ [ nl . flush ] with-global ] stomp-loop
        ] with-client
    ] in-thread ;

And then a simple word to start the command-loop:

: stomp-main ( -- )
    "Welcome to STOMP!" "STOMP>" <command-loop>
    COMMANDS [ over add-command ] each
    start-stomp-client run-command-loop ;

MAIN: stomp-main

And then you can try running it!

$ ./factor -run=stomp.cli
Welcome to STOMP!
STOMP> subscribe /queue/test
STOMP> send /queue/test hello world
T{ frame
    { command "MESSAGE" }
    { headers
        {
            { "expires" "0" }
            { "destination" "/queue/test" }
            { "subscription" "1" }
            { "priority" "4" }
            {
                "message-id"
                "ID\\chostname-59660-1715273573218-3\\c3\\c-1\\c1\\c1"
            }
            { "timestamp" "1715276926454" }
        }
    }
    { body "hello world" }
}

In addition to this, support was added for all the other client frames, all three versions of the STOMP protocol, automatic heartbeats, graceful disconnect using message receipts, and words to make using transactions easier. There is also a debug mode that prints the frames sent to and received from the network, and command-line options for configuring the hostname, port, username, and password.

As always, there are a few things it would be nice to add, for example: better support for SSL/TLS connections, automatic reconnect attempts with a backoff algorithm, better display of connection status in the command-line, additional protocol-level support like automatic generation of ACK and NACK, and testing with additional STOMP-compliant message brokers.

This is available in the development version. Take a look!