Implementing partial pub-sub with ZeroMQ in 15 minutes

I thoroughly enjoyed reading the ZeroMQ documentation. These few lines stuck in the back of my mind:

If you’ve done any work with threads, protocols, or networks, you’ll realize this is pretty much impossible. It’s a dream. Even connecting a few programs across a few sockets is plain nasty when you start to handle real life situations. Trillions? The cost would be unimaginable. Connecting computers is so difficult that software and services to do this is a multi-billion dollar business.

In this short tutorial, we create a simple yet incomplete publish-subscribe implementation using ZeroMQ.

The steps are as follows:

1. Create a new C# Console project.
2. Use NuGet to add a reference to the ZeroMQ C# binding (clrzmq).
3. Change your Program.cs to contain the following code:

using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using ZMQ; // namespace of the clrzmq 2.x binding; newer bindings name things differently

class ZeroMqPublisher
    {
        static void Main(string[] args)
        {
            var context = new Context();  // can specify number of threads
            var processId = Process.GetCurrentProcess().Id;

            //1. create socket
            using (var socket = context.Socket(SocketType.PUB))
            {
                //2. connect socket to the address the subscriber binds to
                socket.Connect("tcp://localhost:5555");

                while (true)
                {
                    var stopWatch = Stopwatch.StartNew();

                    for (var i = 0; i < 1000; i++)
                    {
                        // the process id prefix tells publishers apart in the subscriber's output
                        var message = string.Format("{0}-{1}: msg:{2} hello world ", processId, DateTime.UtcNow.Ticks, i);
                        SendMessage(socket, message);
                        Thread.Sleep(10);
                    }

                    stopWatch.Stop();

                    Console.WriteLine("====================================================");
                    Console.WriteLine("[x] done sending 1000 messages in " + stopWatch.ElapsedMilliseconds + " ms");

                    // pause here (press Enter to continue), e.g. to start a subscriber
                    Console.ReadLine();

                    Console.WriteLine("[x] Sending reset counter to consumers.");
                    SendMessage(socket, "reset");

                    // press Enter to send the next batch
                    Console.ReadLine();
                }
            }
        }

        private static void SendMessage(Socket socket, string message)
        {
            var body = Encoding.UTF8.GetBytes(message);

            //3. send the message
            socket.Send(body);

            Console.WriteLine(" [x] Sent {0}", message);
        }
    }

4. Create another C# Console project for the subscriber.
5. Use NuGet to add a reference to the ZeroMQ C# binding as you did above.
6. Change Program.cs in your subscriber project to contain this code:

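using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using ZMQ; // namespace of the clrzmq 2.x binding; newer bindings name things differently

class ZeroMqSubscriber
    {
        static void Main(string[] args)
        {
            long messagesReceived = 0;
            var filePath = typeof (ZeroMqSubscriber).FullName + ".txt";
            var text = new Queue<string>();
            var context = new Context();
            using (var socket = context.Socket(SocketType.SUB))
            {
                // 1. bind to an address
                socket.Bind("tcp://*:5555");

                // 2. subscribe to everything (empty prefix); a SUB socket
                //    delivers no messages until it has a subscription
                socket.Subscribe("", Encoding.UTF8);

                while (true)
                {
                    // 3. block until the next message arrives
                    var message = socket.Recv();
                    var messageStr = Encoding.UTF8.GetString(message);
                    messagesReceived += 1;
                    text.Enqueue(messageStr);
                    Console.WriteLine(" [x] {0} Received {1}", messagesReceived, messageStr);

                    // on "reset", dump everything received so far to a file and start over
                    if (string.CompareOrdinal("reset", messageStr) == 0)
                    {
                        messagesReceived = 0;
                        File.WriteAllLines(filePath, text);
                        text.Clear();
                    }
                }
            }
        }
    }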
 

7. Start the publisher and let it send its first batch of messages.
8. Start the subscriber and notice that it comes online and starts processing the messages.
9. Try different combinations of stopping and starting the publisher or subscriber.
10. Experiment with different numbers of publishers and subscribers.
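
For the last experiment, it helps to know that in the code above the subscriber is the side that binds the port, so only one subscriber can own it at a time. A minimal sketch of the opposite arrangement, where the publisher binds and every subscriber connects, might look like the following. It assumes the same clrzmq 2.x API used above (Context, Socket, Bind, Connect, Send, Recv) plus a Subscribe(string, Encoding) overload; the class name SwappedRoles and the "pub"/"sub" command-line argument are made up for illustration.

```csharp
using System;
using System.Text;
using System.Threading;
using ZMQ; // assumed: clrzmq 2.x namespace, as in the programs above

// Hypothetical single program: run with "pub" once and with "sub" as often as you like.
class SwappedRoles
{
    static void Main(string[] args)
    {
        var role = args.Length > 0 ? args[0] : "sub";
        using (var context = new Context())
        {
            if (role == "pub") RunPublisher(context);
            else RunSubscriber(context);
        }
    }

    static void RunPublisher(Context context)
    {
        using (var socket = context.Socket(SocketType.PUB))
        {
            // The publisher owns the port, so only one publisher can run per port.
            socket.Bind("tcp://*:5555");
            for (var i = 0; ; i++)
            {
                socket.Send(Encoding.UTF8.GetBytes("msg:" + i));
                Thread.Sleep(100);
            }
        }
    }

    static void RunSubscriber(Context context)
    {
        using (var socket = context.Socket(SocketType.SUB))
        {
            // Any number of subscribers can connect to the same publisher.
            socket.Connect("tcp://localhost:5555");
            // Assumed overload: an empty prefix subscribes to every message.
            socket.Subscribe("", Encoding.UTF8);
            while (true)
            {
                Console.WriteLine(Encoding.UTF8.GetString(socket.Recv()));
            }
        }
    }
}
```

With this layout the trade-off flips: several subscribers can run side by side, but a second publisher can no longer bind the same port. Subscribers that connect late simply miss whatever was published before they joined.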

Some notes about the implementation:
1. If you stop the publisher before the subscriber comes online, all messages are lost, since ZeroMQ does not support durable messaging. All messages are stored in the host process's memory.
2. You can start multiple publisher instances publishing to the same port.
3. You cannot start multiple subscriber instances listening on the same port, because the subscriber is the side that binds it. So technically, this is not a true publish-subscribe pattern.
4. If you have a single subscriber and multiple publishers, the subscriber receives each publisher's messages in the order in which that publisher sent them. Messages from different publishers are interleaved, as shown below:

12928-635584021514875798: msg:611 hello world
13084-635584021514945802: msg:814 hello world
12928-635584021514975804: msg:612 hello world
13084-635584021515045808: msg:815 hello world
12928-635584021515075810: msg:613 hello world
13084-635584021515145814: msg:816 hello world
12928-635584021515175815: msg:614 hello world
13084-635584021515245819: msg:817 hello world
12928-635584021515275821: msg:615 hello world
13084-635584021515345825: msg:818 hello world
12928-635584021515375827: msg:616 hello world
13084-635584021515445831: msg:819 hello world
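
The ordering claim in note 4 can be checked mechanically. The sketch below is not part of the original programs: OrderCheck and IsOrderedPerPublisher are hypothetical names, and the parsing assumes the exact "{process id}-{ticks}: msg:{n} hello world" format produced by the publisher above. It skips lines it cannot parse (such as "reset") and treats a counter of 0 as the start of a new batch.

```csharp
using System;
using System.Collections.Generic;

static class OrderCheck
{
    // Returns true if, for every publisher id, the msg counter only ever
    // increases (a counter of 0 is accepted as the start of a new batch).
    public static bool IsOrderedPerPublisher(IEnumerable<string> lines)
    {
        var lastSeen = new Dictionary<string, int>();
        foreach (var line in lines)
        {
            var dash = line.IndexOf('-');
            var marker = line.IndexOf(": msg:", StringComparison.Ordinal);
            if (dash <= 0 || marker < 0) continue; // not a data line, e.g. "reset"

            var publisher = line.Substring(0, dash);
            var start = marker + ": msg:".Length;
            var end = line.IndexOf(' ', start);
            if (end < 0) end = line.Length;

            int seq;
            if (!int.TryParse(line.Substring(start, end - start), out seq)) continue;

            int previous;
            if (lastSeen.TryGetValue(publisher, out previous) && seq != 0 && seq <= previous)
                return false; // this publisher's messages arrived out of order

            lastSeen[publisher] = seq;
        }
        return true;
    }
}

class Program
{
    static void Main()
    {
        // First four lines of the sample output above.
        var sample = new[]
        {
            "12928-635584021514875798: msg:611 hello world",
            "13084-635584021514945802: msg:814 hello world",
            "12928-635584021514975804: msg:612 hello world",
            "13084-635584021515045808: msg:815 hello world"
        };
        Console.WriteLine(OrderCheck.IsOrderedPerPublisher(sample)); // prints True
    }
}
```

To run it against real data, pass the lines of the text file that the subscriber writes on "reset" instead of the sample array.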
