
A better PubSub with Redis in Erlang

By Nacho Martín on 19 January 2015

In this post we will see how to work with the publish-subscribe pattern with Redis in Erlang using the recommended client, eredis, the design restrictions it imposes, and how to overcome them. For that, I have written a layer on top of eredis: eredis_smart_sub.

The publish/subscribe pattern is a messaging paradigm in which publisher processes publish messages into channels. Other processes, called subscribers, subscribe to these channels and receive the messages.

In Redis, a client may publish messages while operating normally with other Redis commands. But once a client subscribes to a channel, it enters a special mode in which it is restricted to two operations: subscribing to more channels or unsubscribing from channels it is already subscribed to.

PubSub with eredis

Mimicking the two Redis modes (normal and subscribe), the recommended Erlang client, eredis, has two different kinds of clients: the regular eredis client for normal mode and eredis_sub for subscribe mode.

Here is how you publish using normal mode:

{ok, P} = eredis:start_link(),
eredis:q(P, ["PUBLISH", "foo", "bar"]),
eredis_client:stop(P).

This is quite normal behaviour.

And this is how you subscribe to the channel foo using eredis_sub:

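{ok, Sub} = eredis_sub:start_link(),
% The controlling process receives every message for this client
eredis_sub:controlling_process(Sub, self()),
eredis_sub:subscribe(Sub, [<<"foo">>]),
receive
    Msg ->
        io:format("received ~p~n", [Msg]),
        % Acknowledge so that eredis_sub delivers the next message
        eredis_sub:ack_message(Sub)
end.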

Of course, if you are in a gen_server you can use handle_info instead of receive.
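As a minimal sketch of the gen_server variant, the module below subscribes in init/1 and handles the notifications in handle_info/2. The module name foo_listener is hypothetical, and the message shapes ({subscribed, Channel, Sub} and {message, Channel, Payload, Sub}) are the ones I understand eredis_sub to send; check them against the eredis version you use. Other notifications are simply ignored here.

```erlang
%% foo_listener is a hypothetical module name used only for illustration.
-module(foo_listener).
-behaviour(gen_server).

-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

init([]) ->
    {ok, Sub} = eredis_sub:start_link(),
    %% init/1 runs inside the gen_server process, so self() is the server.
    eredis_sub:controlling_process(Sub, self()),
    eredis_sub:subscribe(Sub, [<<"foo">>]),
    %% The state is just the eredis_sub client pid.
    {ok, Sub}.

handle_call(_Request, _From, Sub) ->
    {reply, ok, Sub}.

handle_cast(_Msg, Sub) ->
    {noreply, Sub}.

%% Subscription confirmation: acknowledge it so delivery continues.
handle_info({subscribed, _Channel, Sub}, Sub) ->
    eredis_sub:ack_message(Sub),
    {noreply, Sub};
%% A message published on a channel we are subscribed to.
handle_info({message, Channel, Payload, Sub}, Sub) ->
    io:format("received ~p on ~p~n", [Payload, Channel]),
    eredis_sub:ack_message(Sub),
    {noreply, Sub};
%% Anything else is ignored in this sketch.
handle_info(_Other, Sub) ->
    {noreply, Sub}.
```

The gen_server still owns exactly one eredis_sub client, so the one-controlling-process restriction discussed next applies to it as well.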

The point here is that, for each eredis_sub client, you call eredis_sub:controlling_process(Sub, self()) to assign a controlling process to that client. This controlling process is the process that will receive all the messages.

However, it is very likely that in an Erlang environment you have multiple processes wanting to subscribe to different channels. In that case, since you can only have one controlling process per client, you have two options:

1- To open a Redis connection for each process that wants to subscribe to channels. You create a client for every process and make that process the controlling process of that client. That is:

+--------------------+  +--------------------+  +--------------------+
|                    |  |                    |  |                    |
| controller_process |  | controller_process |  | controller_process |
|                    |  |                    |  |                    |
+----------+---------+  +----------+---------+  +----------+---------+
           |                       |                       |          
           |                       |                       |          
    +------+-----+          +------+-----+          +------+-----+    
    |            |          |            |          |            |    
    | eredis_sub |          | eredis_sub |          | eredis_sub |    
    |            |          |            |          |            |    
    +------+-----+          +------+-----+          +------+-----+    
           |                       |                       |          
           |                  +----+----+                  |          
           |                  |         |                  |          
           +------------------+  redis  +------------------+          
                              |         |                             
                              +---------+                             

However, this approach has some issues:

Which brings us to the second solution:

2- To write a thin layer on top of eredis_sub that acts as the controlling process. It receives all the subscription and unsubscription requests from the interested processes, receives all the messages, and forwards each message to the subscribers of its channel. This also resembles the behaviour of RabbitMQ queues in Erlang, where the process that wants to consume from a particular queue receives the messages of that queue, but can share the connection with other processes that want to use RabbitMQ.

Graphically:

+--------------+  +--------------+  +--------------+
|              |  |              |  |              |
| your process |  | your process |  | your process |
|              |  |              |  |              |
+-------+------+  +-------+------+  +-------+------+
        |                 |                 |
        |                 |                 |
        |      +----------+---------+       |
        |      |                    |       |
        +------+  eredis_smart_sub  +-------+
               |                    |
               +----------+---------+
                          |
                          |
                   +------+-----+
                   |            |
                   | eredis_sub |
                   |            |
                   +------+-----+
                          |
                          |
                     +----+----+
                     |         |
                     |  redis  |
                     |         |
                     +---------+

eredis_smart_sub is a proposal for this layer.

eredis_smart_sub

eredis_smart_sub keeps a register of which processes are subscribed to which channels, and forwards the messages to them. It is smart about issuing SUBSCRIBE commands to Redis only when a process is the first one to subscribe to a channel, and issuing UNSUBSCRIBE only when the process wanting to unsubscribe is the only one left subscribed to that channel.
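To illustrate the bookkeeping just described, here is a small sketch of such a register as a pure data structure. It is not the actual eredis_smart_sub code: the module name sub_registry, the map layout, and the function names are assumptions made for this example. Each function returns the updated register together with the channels for which a SUBSCRIBE or UNSUBSCRIBE would have to be sent to Redis.

```erlang
%% sub_registry is a hypothetical module, not part of eredis_smart_sub.
-module(sub_registry).
-export([new/0, subscribe/3, unsubscribe/3]).

%% The register maps each channel to the list of subscribed pids.
new() -> #{}.

%% Returns {NewRegistry, Channels}, where Channels are those that had
%% no subscriber before and therefore need a SUBSCRIBE on Redis.
subscribe(Pid, Channels, Registry) ->
    {Reg, New} = lists:foldl(
        fun(Channel, {Reg0, Acc}) ->
            case maps:get(Channel, Reg0, []) of
                [] ->
                    {Reg0#{Channel => [Pid]}, [Channel | Acc]};
                Pids ->
                    case lists:member(Pid, Pids) of
                        true  -> {Reg0, Acc};
                        false -> {Reg0#{Channel => [Pid | Pids]}, Acc}
                    end
            end
        end, {Registry, []}, Channels),
    {Reg, lists:reverse(New)}.

%% Returns {NewRegistry, Channels}, where Channels are those whose last
%% subscriber just left and therefore need an UNSUBSCRIBE on Redis.
unsubscribe(Pid, Channels, Registry) ->
    {Reg, Gone} = lists:foldl(
        fun(Channel, {Reg0, Acc}) ->
            case maps:get(Channel, Reg0, []) of
                []    -> {Reg0, Acc};
                [Pid] -> {maps:remove(Channel, Reg0), [Channel | Acc]};
                Pids  -> {Reg0#{Channel => lists:delete(Pid, Pids)}, Acc}
            end
        end, {Registry, []}, Channels),
    {Reg, lists:reverse(Gone)}.
```

Keeping this logic free of side effects means the process that owns the eredis_sub client only has to pass the returned channel list on to Redis when that list is not empty.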

{ok, EredisSubClient} = eredis_sub:start_link(),
% Start eredis_smart_sub with the eredis_sub client
% eredis_smart_sub will be assigned as the controller
% process of the eredis_sub client
{ok, SubClient} = eredis_smart_sub:start_link(EredisSubClient).

And then, from any process that wants to subscribe to a list of channels:

% Subscribe
gen_server:cast(SubClient, {subscribe, [<<"channel1">>], self()}),

% Do something with received messages, like:
Message = receive
   {message, M} -> M
end.

eredis_smart_sub monitors the processes that are subscribed to channels, so if any of them dies it issues the corresponding UNSUBSCRIBE commands to Redis for every channel where that process was the only subscriber left. Still, it is a good idea to unsubscribe explicitly when you know that the process is done with the channel. It is cleaner and cheaper:

% Unsubscribe when finished
gen_server:cast(SubClient, {unsubscribe, [<<"channel1">>], self()}).
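The cleanup that monitoring makes possible can be sketched as follows. This is an illustration under assumptions, not the eredis_smart_sub implementation: the module name sub_cleanup, the #{Channel => [Pid]} register layout, and both function names are invented for the example. It monitors a subscriber with erlang:monitor/2 and, when the 'DOWN' message arrives, computes which channels were left without subscribers.

```erlang
%% sub_cleanup is a hypothetical module used only for illustration.
-module(sub_cleanup).
-export([remove_pid/2, demo/0]).

%% Registry is assumed to be a map #{Channel => [Pid]}.
%% Returns {NewRegistry, Orphaned}, where Orphaned lists the channels
%% that have no subscriber left and would need an UNSUBSCRIBE.
remove_pid(Pid, Registry) ->
    maps:fold(
        fun(Channel, Pids, {Reg, Orphaned}) ->
            case lists:delete(Pid, Pids) of
                []   -> {Reg, [Channel | Orphaned]};
                Rest -> {Reg#{Channel => Rest}, Orphaned}
            end
        end, {#{}, []}, Registry).

%% Spawns a subscriber, monitors it, lets it exit, and cleans up
%% once the 'DOWN' notification is received.
demo() ->
    Subscriber = spawn(fun() -> receive stop -> ok end end),
    %% Monitor before asking the process to stop, so the 'DOWN'
    %% message reports the real exit.
    Ref = erlang:monitor(process, Subscriber),
    Registry = #{<<"channel1">> => [Subscriber],
                 <<"channel2">> => [Subscriber, self()]},
    Subscriber ! stop,
    receive
        {'DOWN', Ref, process, Subscriber, _Reason} ->
            remove_pid(Subscriber, Registry)
    end.
```

In demo/0 only channel1 ends up orphaned, because channel2 still has the calling process as a subscriber.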

Written by @nacmartin
