Push and Pub/Sub in Ruby

Building real-time features in Rails has become much easier with libraries like Action Cable. In this episode of AppSignal Academy, we'll dive into real-time updates and toy around with building a minimal WebSocket server to see how it works under the hood.

We'll be building an application that pushes data and uses Pub/Sub over a WebSocket. Before we start on the code, let's spend a little time covering what those three concepts mean:

  • Push refers to pushing data to a receiver instead of having the receiver poll for that data. It is a must-have for real-time updates such as stock prices, chat applications, or operations consoles.

  • Pub/Sub, or publish and subscribe, is an interaction model for pushing data made popular by TIBCO on Wall Street in the 1990s. A receiver subscribes to a subject and waits for a publisher to push data to that subject. It is common to use wildcard pattern matching to match a published message to a listener, although some simpler implementations only use named channels instead of wildcards in the subject. I got started at TIBCO in those early days, so I like the flexibility of wildcard pattern matching.

  • WebSocket is a protocol for exchanging data, usually between a web browser and an application. An HTTP connection is upgraded to a WebSocket connection, and then data can be sent both ways between the two endpoints. WebSockets can push data from an application to the browser. They also provide a mechanism other than a POST or PUT for sending data from your JavaScript code in the browser back to the application. Pretty nice, huh?
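
To make the Pub/Sub idea concrete, here is a minimal in-process sketch of subject matching with wildcards. It is not how Agoo or any TIBCO product is implemented. The class name TinyBroker and the matching rules ('*' matches exactly one dot-separated segment, '>' matches one or more trailing segments) are assumptions chosen for this example.

```ruby
# A tiny in-process broker (hypothetical, for illustration only).
# Subjects are dot-separated. In this sketch '*' matches exactly one
# segment and '>' matches one or more trailing segments.
class TinyBroker
  def initialize
    @subscriptions = [] # pairs of [pattern, callback]
  end

  # Register a callback for every subject that matches the pattern.
  def subscribe(pattern, &callback)
    @subscriptions << [pattern, callback]
  end

  # Push a message to every subscriber whose pattern matches the subject.
  def publish(subject, message)
    @subscriptions.each do |pattern, callback|
      callback.call(subject, message) if match?(pattern, subject)
    end
  end

  private

  def match?(pattern, subject)
    pat = pattern.split('.')
    sub = subject.split('.')
    pat.each_with_index do |token, i|
      return sub.length > i if token == '>'
      return false if i >= sub.length
      return false unless token == '*' || token == sub[i]
    end
    pat.length == sub.length
  end
end

broker = TinyBroker.new
received = []
broker.subscribe('stocks.*.price') { |subject, msg| received << [subject, msg] }
broker.subscribe('time') { |subject, msg| received << [subject, msg] }

broker.publish('stocks.acme.price', '12.50') # matches the wildcard subscription
broker.publish('stocks.acme.volume', '900')  # matches nothing
broker.publish('time', '19:50:12')           # matches the named channel
```

The receivers never poll: they register a callback once and the broker pushes each matching message to them. A named-channel-only implementation would simply compare the subject strings for equality.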

Under the Hood

Let's look at how a WebSocket server could work. From a browser, the client uses JavaScript to open a WebSocket connection to the server.

var sock = new WebSocket("ws://" + document.URL.split('/')[2] + '/upgrade');

The server receives an HTTP request with an indicator that an upgrade was requested. Generally, the server lets the application decide whether to upgrade or not. How it does that depends on the API provided to the app. A server that supports Rack provides an option to hijack the socket and let the developer handle all the protocol details or, according to a proposed PR, a response to upgrade is enough.

The upgrade is a set of exchanges between the server and the client. All browsers and some server gems hide these details. Once the connection is established, messages can be exchanged following the WebSocket protocol.
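
One step of that exchange can be shown in a few lines. In the opening handshake defined by RFC 6455, the browser sends a random Sec-WebSocket-Key header and the server replies with a Sec-WebSocket-Accept header derived from it. The sketch below uses only the Ruby standard library; the helper name websocket_accept is made up for this example, and a real server does this for you.

```ruby
require 'digest'

# Fixed GUID defined by RFC 6455 for the opening handshake.
WEBSOCKET_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'

# Hypothetical helper: derive the Sec-WebSocket-Accept value from the
# Sec-WebSocket-Key sent by the client. The key is concatenated with the
# GUID, hashed with SHA-1, and the raw digest is Base64 encoded.
def websocket_accept(key)
  [Digest::SHA1.digest(key + WEBSOCKET_GUID)].pack('m0')
end

# Sample key taken from RFC 6455.
websocket_accept('dGhlIHNhbXBsZSBub25jZQ==')
# => "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="
```

The hash is not there for secrecy. It only proves to the client that the server understood the WebSocket request instead of echoing back a cached or unrelated HTTP response.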

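The magic under the hood handles encoding, decoding, and the message exchange protocol. Messages travel as binary frames: a short, variable-length header followed by the payload. The protocol itself does not encrypt frames. SHA-1 is used only during the opening handshake, frames sent by the browser are masked with a 4-byte XOR key, and encryption comes from running the connection over TLS (wss://). The WebSocket protocol includes several message types and exchanges, like ping/pong heartbeats and opening and closing message exchanges. That's the magic a server performs for you when you don't use the connection-hijack approach.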
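
To give a feel for the encoding work a server takes off your hands, here is a rough sketch of the simplest frames described in RFC 6455: a short text frame sent by a server and a short masked frame sent by a browser. The helper names are invented for this example, and the sketch deliberately ignores fragmentation, control frames, and payloads of 126 bytes or more.

```ruby
# Hypothetical helper: encode a short, unmasked text frame the way a
# server sends it. Byte 1 is 0x81 (FIN bit plus opcode 0x1 for text),
# byte 2 is the payload length with the mask bit clear.
def encode_text_frame(text)
  payload = text.b
  raise ArgumentError, 'sketch only handles payloads up to 125 bytes' if payload.bytesize > 125

  [0x81, payload.bytesize].pack('CC') + payload
end

# Hypothetical helper: decode a short, masked frame the way a server
# receives it from a browser. Returns [opcode, text].
def decode_masked_frame(frame)
  bytes = frame.bytes
  opcode = bytes[0] & 0x0f
  masked = (bytes[1] & 0x80) != 0
  length = bytes[1] & 0x7f
  raise ArgumentError, 'sketch only handles short masked frames' unless masked && length <= 125

  key = bytes[2, 4] # 4-byte masking key chosen by the client
  payload = bytes[6, length].each_with_index.map { |byte, i| byte ^ key[i % 4] }
  [opcode, payload.pack('C*').force_encoding('UTF-8')]
end

# Both byte sequences below are the "Hello" examples from RFC 6455.
encode_text_frame('Hello').bytes
# => [129, 5, 72, 101, 108, 108, 111]

masked = [0x81, 0x85, 0x37, 0xfa, 0x21, 0x3d, 0x7f, 0x9f, 0x4d, 0x51, 0x58].pack('C*')
decode_masked_frame(masked)
# => [1, "Hello"]
```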

Diving In

We'll use the example of a clock thread that is started to publish the current time to all listening clients. We'll use Agoo to build our server because it's fast and keeps the complexity to a minimum.

We will start out with some JavaScript as a client that shows the current time on an HTML page. After creating a new WebSocket, an onopen callback is set that changes the status HTML element. The onmessage callback updates the message HTML element. Callbacks are a common design pattern when working with asynchronous calls such as publish and subscribe exchanges.

<!-- websocket.html -->
<html>
  <body>
    <p id="status"> ... </p>
    <p id="message"> ... waiting ... </p>

    <script type="text/javascript">
      var sock = new WebSocket("ws://" + document.URL.split('/')[2] + '/upgrade');
      sock.onopen = function() {
          document.getElementById("status").textContent = "connected";
      }
      sock.onmessage = function(msg) {
          document.getElementById("message").textContent = msg.data;
      }
    </script>
  </body>
</html>

With our client done, let's implement the server, which is a Ruby application using the Rack API. The Clock class itself will be a handler for all HTTP requests on the /upgrade path. If the request is for an upgrade, we simply return success with an HTTP status code of 200; otherwise, we return a 404 for Page Not Found. The only other step in the #call method is the assignment of the WebSocket handler.

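class Clock
  # Rack entry point for every request on the /upgrade path.
  def self.call(env)
    if env['rack.upgrade?'].nil?
      # A plain HTTP request, not an upgrade request.
      [ 404, { }, [ ] ]
    else
      # Register Clock as the handler for the connection callbacks.
      env['rack.upgrade'] = Clock
      [ 200, { }, [ ] ]
    end
  end
end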

The API is based on callbacks. The only callback we care about for our server is the #on_open callback, which is called when a WebSocket connection is established and lets us create a subscription to the "time" subject. Messages are exchanged over channels identified by subjects or topics.

class Clock
  # ...

  def self.on_open(client)
    client.subscribe('time')
  end
end
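
To see how the two callbacks fit together without starting a server, the flow can be simulated in plain Ruby. This is only a rough approximation of what a server does for each request: FakeClient and simulate_request are hypothetical stand-ins written for this sketch, and the handler is a compact copy of Clock so that the example runs on its own.

```ruby
# Compact copy of the handler so the sketch is self-contained.
class Clock
  def self.call(env)
    return [404, {}, []] if env['rack.upgrade?'].nil?

    env['rack.upgrade'] = Clock
    [200, {}, []]
  end

  def self.on_open(client)
    client.subscribe('time')
  end
end

# Hypothetical stand-in for the client object a server passes to callbacks.
class FakeClient
  attr_reader :subjects

  def initialize
    @subjects = []
  end

  def subscribe(subject)
    @subjects << subject
  end
end

# Hypothetical, heavily simplified version of the server side: call the
# app, and if it accepted the upgrade, invoke on_open on the handler it
# stored in the env.
def simulate_request(app, env)
  status, _headers, _body = app.call(env)
  handler = env['rack.upgrade']
  return [status, nil] if handler.nil? || status != 200

  client = FakeClient.new
  handler.on_open(client) if handler.respond_to?(:on_open)
  [status, client]
end

status, client = simulate_request(Clock, { 'rack.upgrade?' => :websocket })
plain_status, plain_client = simulate_request(Clock, {})
```

After the first call, status is 200 and client.subjects contains "time". The second call, which carries no upgrade indicator, gets a 404 and no client.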

Now, let's start publishing with a thread that publishes the time once every second. The call to Agoo.publish sends a message on the "time" subject, and all subscribers then receive it. The server keeps track of the subscriptions and connections and delivers the message to the JavaScript client, which updates the HTML element.

require 'agoo'

Thread.new {
  loop do
    now = Time.now
    Agoo.publish('time', "%02d:%02d:%02d" % [now.hour, now.min, now.sec])
    sleep(1)
  end
}
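
As a side note on the message itself, the format string used in the loop produces zero-padded hours, minutes, and seconds. Time#strftime is a common alternative that yields the same text. The fixed timestamp below is an arbitrary value chosen for this example.

```ruby
# Arbitrary fixed time so the result is predictable.
tick = Time.new(2018, 8, 14, 19, 50, 12)

# The format used in the publishing loop.
manual = "%02d:%02d:%02d" % [tick.hour, tick.min, tick.sec]

# The strftime equivalent.
short = tick.strftime('%H:%M:%S')

manual == short
# => true
```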

The only other code needed is the code that initializes and starts the server. The call to Agoo::Server.handle(:GET, '/upgrade', Clock) tells the server to listen for HTTP GET requests on the /upgrade URL path and to pass those requests to the Clock class. This allows the routing to occur outside Ruby for improved performance and flexibility.

Agoo::Server.init(6464, '.', thread_count: 0)
Agoo::Server.handle(:GET, '/upgrade', Clock)
Agoo::Server.start

We are almost there. With the Ruby code saved as pubsub.rb in the same directory as websocket.html, run the server with this command.

$ ruby pubsub.rb

A log entry should appear showing something like the following, indicating that the server is running and listening on port 6464.

I 2018/08/14 19:49:45.170618000 INFO: Agoo 2.5.0 with pid 40366 is listening on http://:6464.

Time To See If It's Working

Let's open http://localhost:6464/websocket.html. After an initial flicker as a connection is established, the connection status and the time should be displayed. The time will increment every second as the clock ticks.

connected

19:50:12

Congratulations on making a publish and subscribe web application ;-)

In today's episode, we looked at using WebSocket. Server-Sent Events (SSE) offer another way of doing the same, and we have included SSE in the full source code example. If you want to find out more, take a look at the Agoo server we used or the Iodine WebSocket Server.

We'd love to know what you thought of this article, or if you have any questions. We're always on the lookout for topics to investigate and explain, so if there's anything magical in Ruby you'd like to read about, don't hesitate to leave a comment.

This post is written by guest author Peter Ohler. Peter creates quite a bit of high-performance code, and writes about it too, every now and then. He made the Agoo gem, which is a pretty cool high-performance HTTP server.