Implementing a message queue with RabbitMQ in 15 minutes

This is the first in a series of articles meant to capture the main differences between various message queue implementations, including MSMQ, RabbitMQ, ZeroMQ and ActiveMQ.

RabbitMQ implements the Advanced Message Queuing Protocol (AMQP) and is a brokered messaging system: a central entity, the broker, is responsible for storing messages and forwarding them from producers to consumers.

This article presents the 1-2-3’s of creating a simple producer/consumer example using RabbitMQ.

Mandatory Setup
1. Download and install Erlang. RabbitMQ Server is written in Erlang.
2. Download and install the RabbitMQ broker, which runs as a Windows service.
3. Ensure the service is listening on its default port, 5672. To do this, you can enter the following command at a command prompt:

netstat -ano > processes.txt

After running the command, open processes.txt in Notepad or, even better, Notepad++, and search for port 5672. If the port is listed, the service is running on your local machine.
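If you would rather check the port from code than read through the netstat output, the following is a minimal sketch of the same check in C#. It assumes nothing about RabbitMQ itself: it only tries to open a TCP connection to localhost on port 5672, so a positive result means that something is listening there, not necessarily the broker. The class and method names (PortCheck, IsPortOpen) and the one-second timeout are made up for this illustration.

```csharp
using System;
using System.Net.Sockets;

public static class PortCheck
{
    // Returns true if a TCP connection to host:port succeeds within the timeout.
    // Hypothetical helper for illustration; not part of RabbitMQ.Client.
    public static bool IsPortOpen(string host, int port, int timeoutMs = 1000)
    {
        try
        {
            using (var client = new TcpClient())
            {
                var connectTask = client.ConnectAsync(host, port);
                // Wait returns false on timeout and throws if the connection is refused.
                return connectTask.Wait(timeoutMs) && client.Connected;
            }
        }
        catch (Exception)
        {
            // Connection refused, host not found, and similar failures end up here.
            return false;
        }
    }

    public static void Main()
    {
        Console.WriteLine(IsPortOpen("localhost", 5672)
            ? "Something is listening on port 5672."
            : "Nothing answered on port 5672.");
    }
}
```

A refused connection usually means the service is stopped or is configured to use a different port.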

Create Producer and Consumer
4. Create a new Console project called RabbitMQProducer.
5. Using NuGet, add a reference to the C# RabbitMQ binding called RabbitMQ.Client.
6. Modify the content of the Program class using the following code:

using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using RabbitMQ.Client;

namespace RabbitMQProducer
{
    public class RabbitMqPublisher
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() { HostName = "localhost" };

            // The connection abstracts the socket connection and takes care of
            // protocol version negotiation, authentication and so on for us.
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    // 1. Declare the queue to send messages to. The queue is created only if it does not already exist.
                    //    Arguments: queue name, durable, exclusive, autoDelete, extra arguments.
                    channel.QueueDeclare("hello-queue", false, false, false, null);

                    while (true)
                    {
                        var stopWatch = Stopwatch.StartNew();

                        // 2. Create the messages. A message body can be anything, since it is sent as a byte array.
                        for (var i = 0; i < 1000; i++)
                        {
                            SendMessage(channel,
                                string.Format("{0}: msg:{1} hello world ", DateTime.UtcNow.Ticks, i));
                            Thread.Sleep(10); // throttle; this delay is included in the elapsed time
                        }

                        stopWatch.Stop();
                        Console.ReadLine(); // wait for ENTER before reporting and resetting the consumers

                        Console.WriteLine("====================================================");
                        Console.WriteLine("[x] done sending 1000 messages in " + stopWatch.ElapsedMilliseconds + " ms");
                        Console.WriteLine("[x] Sending reset counter to consumers.");

                        SendMessage(channel, "reset");
                        Console.ReadLine(); // wait for ENTER before sending the next batch
                    }
                }
            }
        }

        private static void SendMessage(IModel channel, string message)
        {
            var body = Encoding.UTF8.GetBytes(message);

            // 3. Send the message. The empty exchange name selects the default exchange,
            //    which routes the message to the queue named by the routing key.
            channel.BasicPublish("", "hello-queue", null, body);

            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}

Compile and run the program; you should see each message written to the console as it is sent.
7. Create a new C# Console project called RabbitMQReceiver and modify the Program class as follows:

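using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace RabbitMQReceiver
{
    public class RabbitMqSubscriber
    {
        static void Main(string[] args)
        {
            int messagesReceived = 0;

            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    // 1. Declare the queue; it is created only if it does not already exist.
                    //    The arguments must match the declaration made by the producer.
                    channel.QueueDeclare("hello-queue", false, false, false, null);

                    // 2. Subscribe to the queue. The second argument (true) turns on automatic
                    //    acknowledgement, so a message counts as consumed as soon as it is delivered.
                    //    Note: QueueingBasicConsumer belongs to older RabbitMQ.Client releases.
                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume("hello-queue", true, consumer);

                    Console.WriteLine(" [*] Waiting for messages. " +
                                        "To exit press CTRL+C");
                    while (true)
                    {
                        // Blocks until the next message arrives.
                        var ea = consumer.Queue.Dequeue();

                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        messagesReceived += 1;

                        Console.WriteLine("[x] {0} Received {1}", messagesReceived, message);

                        // The producer sends "reset" after each batch so the counter starts over.
                        if (string.CompareOrdinal("reset", message) == 0)
                        {
                            messagesReceived = 0;
                        }
                    }
                }
            }
        }
    }
}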

8. Using NuGet, add a reference to the C# RabbitMQ client binding, RabbitMQ.Client, to resolve the compilation errors.

9. Compile and run the program. If the producer is already running, you may have to right-click the consumer project and select Debug | Start new instance.
10. Run an instance of the publisher and of the subscriber, and watch the messages pass from the former to the latter.
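
The receiver above relies on QueueingBasicConsumer, which was deprecated and later removed from RabbitMQ.Client. If your NuGet package no longer has it, the sketch below shows roughly how the same receiver might look with the event-based EventingBasicConsumer. It assumes RabbitMQ.Client 6.x, where the message body is a ReadOnlyMemory<byte> (in 5.x it is already a byte array and the ToArray() call can be dropped). The 7.x releases moved to an async API, so this would not compile there as written. The class name EventingSubscriber is made up for the example.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class EventingSubscriber
{
    public static void Main()
    {
        var messagesReceived = 0;

        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Same declaration as in the producer: non-durable, non-exclusive, no auto-delete.
            channel.QueueDeclare("hello-queue", false, false, false, null);

            // The Received event fires once per delivered message,
            // replacing the blocking Dequeue() loop.
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (sender, ea) =>
            {
                // Assumes 6.x: Body is ReadOnlyMemory<byte>, so copy it into an array first.
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                messagesReceived += 1;

                Console.WriteLine("[x] {0} Received {1}", messagesReceived, message);
                if (message == "reset")
                {
                    messagesReceived = 0;
                }
            };

            // true = automatic acknowledgement, as in the original receiver.
            channel.BasicConsume("hello-queue", true, consumer);

            Console.WriteLine(" [*] Waiting for messages. Press ENTER to exit.");
            Console.ReadLine();
        }
    }
}
```

The main difference is that the client library calls the handler for each delivery, so the main thread only has to stay alive until you press ENTER.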

Some notes about the implementation:
1. You can fire up more than one publisher and they will all send messages to the same queue.
2. You can fire up one or more listeners and they will all retrieve messages from the queue.
3. Messages remain in the queue until they have been consumed by a subscriber. Because the queue is declared as non-durable, it does not survive a broker restart.
4. Messages are shared among subscribers: each message is delivered to only one of them, so if one subscriber gets a certain message, the others do not.
5. If you have more than one subscriber, the broker hands out messages round-robin by default, so on average each subscriber receives the same number.
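
To make notes 4 and 5 concrete, here is a small stand-alone simulation of round-robin distribution. It does not talk to RabbitMQ and is not how the broker is implemented; it is only a toy model, with made-up names (RoundRobinDemo, Distribute, subscriber-A and so on), showing that each message goes to exactly one subscriber and that the load evens out.

```csharp
using System;
using System.Collections.Generic;

public static class RoundRobinDemo
{
    // Assigns each message number to exactly one subscriber,
    // cycling through the subscribers in order.
    public static Dictionary<string, List<int>> Distribute(IList<string> subscribers, int messageCount)
    {
        var assigned = new Dictionary<string, List<int>>();
        foreach (var subscriber in subscribers)
        {
            assigned[subscriber] = new List<int>();
        }

        for (var i = 0; i < messageCount; i++)
        {
            var target = subscribers[i % subscribers.Count];
            assigned[target].Add(i);
        }

        return assigned;
    }

    public static void Main()
    {
        var subscribers = new[] { "subscriber-A", "subscriber-B", "subscriber-C" };
        var result = Distribute(subscribers, 9);

        foreach (var subscriber in subscribers)
        {
            Console.WriteLine("{0}: {1}", subscriber, string.Join(", ", result[subscriber]));
        }
        // Prints:
        // subscriber-A: 0, 3, 6
        // subscriber-B: 1, 4, 7
        // subscriber-C: 2, 5, 8
    }
}
```

With a real broker the split is only even on average, since subscribers can join or leave while messages are flowing.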

Use case
This fits a scenario where field data arrives continuously and several processing stations work through it in parallel: each station runs a subscriber, and the queue spreads the messages across them.
