Implement pub-sub with RabbitMQ in 15 minutes

In publish/subscribe all subscribers get a copy of all published messages. A complete implementation of pub/sub using RabbitMQ follows:

Mandatory Setup

1. Download and install Erlang. RabbitMQ Server is written in Erlang.
2. Download and install the RabitMQ Service broker, which is a windows service.
3. Ensure service is running on its default port 5672.

Complete instructions for downloading and installing RabbitMQ on Windows is found here.

Create Producer and Consumer
4. Create a new Console project called RabbitMQProducer.
5. Using NuGet, add a reference to the C# RabbitMQ binding called RabbitMQ.Client.
6. Modify the content of the Program class using the following code:

  public class RabbitMqPublisher
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() {HostName = "localhost"};
            const string exchange = "authentictions";

            //The connection abstracts the socket connection, and takes care of protocol version negotiation and authentication and so on for us.
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                     //1. declare and exchange were messages will be sent to
                    channel.ExchangeDeclare(exchange, "fanout");

                    while (true)
                    {
                        var stopWatch = new Stopwatch();
                        stopWatch.Start();

                        //2. create a message, that can be anything since it is a byte array
                        for (var i = 0; i < 1000; i++)
                        {
                            SendMessage(channel,
                                string.Format("{0}: msg:{1} hello world ", DateTime.UtcNow.Ticks, i));
                            Thread.Sleep(10);
                        }

                        stopWatch.Stop();
                        Console.ReadLine();

                        Console.WriteLine("====================================================");
                        Console.WriteLine("[x] done sending 1000 messages in " + stopWatch.ElapsedMilliseconds);
                        Console.WriteLine("[x] Sending reset counter to consumers.");

                        SendMessage(channel, "reset");
                        Console.ReadLine();
                    }
                }
            }
        }

        private static void SendMessage(IModel channel, string message)
        {
            var body = Encoding.UTF8.GetBytes(message);

            //3. send the message
             channel.BasicPublish(exchange, string.Empty, null, body);

            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}

6. Compile and run the program and you should see each message being set written to the console.
7. Create a new C# Console project, called RabbitMQSubscriber and modify the Program class as
follows:

public class RabbitMqSubscriber
    {
        static void Main(string[] args)
        {
            int messagesReceived = 0;

            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                     //1. declare and exchange were messages will be sent to
                    channel.ExchangeDeclare(exchange, "fanout");

                    //2. create a non-durable, exclusive, autodelete queue with a generated name
                    var result = channel.QueueDeclare(); 

                    //3. bind to the exclusive queue created above
                    channel.QueueBind(result.QueueName, exchange, string.Empty);
                    
                    //4. now get message from queue
                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(result.QueueName, true, consumer);

                    Console.WriteLine(" [*] Waiting for messages." +
                                        "To exit press CTRL+C");
                    while (true)
                    {
                        var ea = consumer.Queue.Dequeue();

                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        messagesReceived += 1;

                        Console.WriteLine("[x] {0} Received {1}", messagesReceived, message);
                        if (string.CompareOrdinal("reset", message) == 0)
                        {
                            messagesReceived = 0;
                        }
                    }

                }
            }
        }
    }

8. Using NuGet, add a reference to the C# RabbitMQ client binding, called RabbitMQ.Client to resolve compilation problems.

9. Compile and run the program. If you have the Producer already running, you may have to
right click on the consumer project and select Debug | Start new instance.
10. Run an instance of the publisher and multiple subscribers and notice that all subscribers get a copy of the messages being produced.

Some notes about the implementation:
1. You can fire up more than one publisher and they will all send messages to the exchange.
2. You can fire up one or more listeners and they will all retrieve messages from the exchange.
3. In this exercise, messages are not durable, in the sense that if there are no subscribers, they are lost.
4. Every subscriber gets a copy of every message.
5. RabbitMQ just works as advertised. I am very impressed with the framework.

Use case
This can be used in a scenario where you have field data and have a bunch of processing stations. Each processing station gets a copy of field data so it can perform a different set of business rules.

For example, authentication events could be shared among multiple subscribers. One of such could log these messages to a database. Another could take the events and do some heuristics including machine intelligence.

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s