Category Archives: Distributed Enterprise Architecture

Abstracting message queue creation using the factory pattern

You had to create several additional modules for an existing enterprise solution but did not know which of the various message queues to use: your options are MSMQ, ZeroMQ, RabbitMQ, Azure, etc.
You decided to postpone this decision to latter in the game but in the meantime, abstracted away functionality of the message queue behind an interface that looks like this:

  public interface IMessageQueue : IDisposable
    {
        string Address { get; }
        IDictionary<string, string> Properties { get; }
        void InitializeInboundQueue(MessageQueueCreationContext creationContext);
        void InitializeOutboundQueue(MessageQueueCreationContext creationContext);
        void Send(Message message, string messageTopic = null);
        void Listen(Action<Message> onMessageReceived, string routingKey = null);
        void Receive(Action<Message> onMessageReceived);
        IMessageQueue GetResponseQueue();
        IMessageQueue GetReplyQueue(Message message);
    }

The creation context class looks like this:

  [Serializable]
    public class MessageQueueSpecs : IEquatable<MessageQueueSpecs>
    {
        public bool Equals(MessageQueueSpecs other)
        {
            if (ReferenceEquals(null, other)) return false;
            if (ReferenceEquals(this, other)) return true;
            return 
                string.Equals(Name, other.Name) && 
                string.Equals(Address, other.Address) && 
                MessagePattern == other.MessagePattern && 
                Direction == other.Direction && 
                string.Equals(MessageTopic, other.MessageTopic) && 
                string.Equals(HostName, other.HostName);
        }

        public override int GetHashCode()
        {
            unchecked
            {
                var hashCode = (Name != null ? Name.GetHashCode() : 0);
                hashCode = (hashCode*397) ^ (Address != null ? Address.GetHashCode() : 0);
                hashCode = (hashCode*397) ^ (int) MessagePattern;
                hashCode = (hashCode*397) ^ (int) Direction;
                hashCode = (hashCode*397) ^ (MessageTopic != null ? MessageTopic.GetHashCode() : 0);
                hashCode = (hashCode*397) ^ (HostName != null ? HostName.GetHashCode() : 0);
                return hashCode;
            }
        }

        public static bool operator ==(MessageQueueSpecs left, MessageQueueSpecs right)
        {
            return Equals(left, right);
        }

        public static bool operator !=(MessageQueueSpecs left, MessageQueueSpecs right)
        {
            return !Equals(left, right);
        }

        public MessageQueueSpecs()
        {
            AdditionalProperties = new Dictionary<string, string>(5);
            Name = GetType().FullName + DateTime.UtcNow;
        }

        public string Name { get; set; }
        public string Address { get; set; }
        public MessagePattern MessagePattern { get; set; }
        public Direction Direction { get; set; }
        public ISerializer Serializer { get; set; }
        public IDictionary<string, string> AdditionalProperties { get; set; }
        public string MessageTopic { get; set; }
        public string HostName { get; set; }

        public bool IsValid()
        {
            if (Serializer == null)
                return false;

            return true;
        }

        public override bool Equals(object obj)
        {
            if (ReferenceEquals(null, obj)) return false;
            if (ReferenceEquals(this, obj)) return true;
            if (obj.GetType() != this.GetType()) return false;
            return Equals((MessageQueueSpecs) obj);
        }

        public string GetUid()
        {
            return typeof (MessageQueueSpecs) + Name + Direction + GetHashCode();
        }

To delegate the creation of message queues to an independent entity that can be passed to stakeholders via IoC, we define an IMessageQueueFactory as listed below:

 
  public interface IMessageQueueFactory
    {
        IMessageQueue CreateInboundQueue(MessageQueueSpecs specs);
        IMessageQueue CreateOutnboundQueue(MessageQueueSpecs specs);
    }

The interface can possibly be simplified using only one method but two methods gives the API clarity and also gives us the flexibility to define different parameters for inbound or outbound queues among others, possibly segregating the interface if need be.

The implementation of the IMessageQueueFactory depends on how many different types of message queues we want to support in the solution. To keep things simple, we will assume that only one type of message queue will be used in this solution and the one we choose is RabbitMQ.

The implementation of IMessageQueueFactory for RabbitMQ is as follows:

  public class RabbitMessageQueueFactory : IMessageQueueFactory
    {
        private readonly MemoryCache _cache = new MemoryCache("RabbitMessageQueueFactory");
        
        public IMessageQueue CreateInboundQueue(MessageQueueSpecs specs)
        {
            return GetOrCreateQueue(specs);
        }
        
        public IMessageQueue CreateOutnboundQueue(MessageQueueSpecs specs)
        {
            return GetOrCreateQueue(specs);
        }

        private IMessageQueue GetOrCreateQueue(MessageQueueSpecs specs)
        {
            var cacheKey = specs.GetUid();
            var queueObj = _cache.Get(cacheKey);
            if (queueObj != null)
                return (IMessageQueue) queueObj;

            var queue = new RabbitMessageQueue(specs);
            _cache.Add(cacheKey, queue, new CacheItemPolicy());

            return queue;
        }
    }

Creating inbound and outbound RabbitMQ queues follow a similar pattern based on our message queue implementation so construction is easy. To use our factory, we can register it with an IoC container such as Unity or AutoFac or create an instance and pass it to interested parties.

For the sake of completeness, RabbitMessageQueue and MessageQueueBase is implemented as follows:


   public sealed class RabbitMessageQueue : MessageQueueBase
    {
        private readonly ISerializer _serializer;
        private IConnection _connection;
        private IModel _channel;
        private QueueingBasicConsumer _consumer;

        public string ExchangeType
        {
            get { return GetProperty(ExchangeTypePropertyName, "topic"); }
        }

        public string Exchange
        {
            get { return GetProperty(ExchangePropertyName, "topic"); }
        }

        public RabbitMessageQueue(MessageQueueSpecs specs)
        {
            _serializer = specs.Serializer;
        
            CopyProperties(specs.AdditionalProperties);
            ReconcileProperties(specs);
            InitializeInboundQueue(specs);
            InitializeOutboundQueue(specs);
        }
        
        private void ReconcileProperties(MessageQueueSpecs specs)
        {
            ReconcileProperty(MessageTopicPropertyName, () => specs.MessageTopic);
            ReconcileProperty(ExchangePropertyName, () => specs.Address);
            ReconcileProperty(ExchangeTypePropertyName, () => specs.MessagePattern == MessagePattern.PublishSubscribe ? "topic" : "fanout");
        }

        private void CopyProperties(IEnumerable<KeyValuePair<string, string>> properties)
        {
            if (properties != null)
            {
                foreach (var property in properties)
                {
                    AddProperty(property.Key, property.Value);
                }
            }
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing && _channel != null)
            {
                _channel.Dispose();
                _connection.Dispose();
            }
        }

        protected override string GetAddress(string queueName)
        {
            throw new NotImplementedException();
        }

        private void CreateInboundQueue(MessageQueueSpecs specs)
        {
            var exchange = CreateChannel(specs);

            //2. create a non-durable, exclusive, autodelete queue with a generated name
            var result = _channel.QueueDeclare();
            var routingKey = specs.MessageTopic ?? string.Empty;
            //3. bind to the exclusive queue created above
            _channel.QueueBind(result.QueueName, exchange, routingKey);

            //4. now get message from queue
            _consumer = new QueueingBasicConsumer(_channel);
            _channel.BasicConsume(result.QueueName, true, _consumer);
        }

        private string CreateChannel(MessageQueueSpecs specs)
        {
            var hostName = specs.HostName ?? GetProperty(HostNamePropertyName, "localhost");
            var factory = new ConnectionFactory { HostName = hostName };
               
            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();

            _channel.ExchangeDeclare(Exchange, ExchangeType); 
            return Exchange;
        }

        private void CreateOutboundQueue(MessageQueueSpecs specs)
        {
            CreateChannel(specs);
        }

        public override void InitializeInboundQueue(MessageQueueSpecs specs)
        {
            Initialize(Direction.Inbound, specs.Address, specs.MessagePattern);
            // NOTE: for fanout, it apears as though we can use the same initializations defined in CreateInboundQueue
            CreateInboundQueue(specs);
        }

        public override void InitializeOutboundQueue(MessageQueueSpecs specs)
        {
            Initialize(Direction.Outbound, specs.Address, specs.MessagePattern);
            // NOTE: for fanout, it apears as though we can use the same initializations defined in CreateInboundQueue
            CreateOutboundQueue(specs);
        }

        public override void Send<T>(T message, string messageTopic = null)
        {
            var body = _serializer.MessageToBytes<T>(message);
            var exchange = GetProperty(ExchangePropertyName, Name);// "authentictions";
            if (string.IsNullOrEmpty(messageTopic))
                messageTopic = GetProperty(MessageTopicPropertyName, string.Empty);

            _channel.BasicPublish(exchange, messageTopic, null, body);
        }
    
        public override void Receive<T>(Action<T> onMessageReceived)
        {
            var ea = _consumer.Queue.Dequeue();
            var body = ea.Body;
            onMessageReceived.Invoke(_serializer.BytesToMessage<T>(body));
        }

        public override IMessageQueue GetResponseQueue()
        {
            throw new NotImplementedException();
        }

        public override IMessageQueue GetReplyQueue(Message message)
        {
            throw new NotImplementedException();
        }

        public override string ToString()
        {
            var properties = Properties;
            if (properties == null)
            {
                properties = new Dictionary<string, string>();
            }

            var propertyInfo = GetType().GetProperties(BindingFlags.Instance | BindingFlags.Public);
            foreach (var pinfo in propertyInfo)
            {
                if (properties.ContainsKey(pinfo.Name)) continue;

                var value = pinfo.GetValue(this);
                properties.Add(pinfo.Name, value != null ? value.ToString() : string.Empty);    
            }

            var sb = new StringBuilder(100);
            foreach (var kvp in properties)
            {
                sb.AppendLine(string.Format("\t {0}:{1}", kvp.Key, kvp.Value));
            }
            return sb.ToString();
        }
    }

public abstract class MessageQueueBase : IMessageQueue
    {
        public const string ExchangePropertyName = "Exchange";
        public const string ExchangeTypePropertyName = "ExchangeType";
        public const string MessageTopicPropertyName = "MessageTopic";
        public const string HostNamePropertyName = "HostName";

        private readonly IDictionary<string, string> _properties = new Dictionary<string, string>(10);

        protected void Initialize(Direction direction, string name, MessagePattern pattern)
        {
            Name = name;
            Direction = direction;
            MessagePattern = pattern;
           
        }

        protected void ReconcileProperty(string name,Func<string> valueFactory)
        {
            if (!Properties.ContainsKey(name))
            {
                Properties.Add(name,  valueFactory.Invoke());
            }
        }

        public void AddProperty(string name, string value)
        {
            _properties.Add(name, value);
        }

        public string GetProperty(string propertyName, string defaultValue)
        {
            if (_properties.ContainsKey(propertyName))
                return _properties[propertyName];

            return defaultValue;
        }

        public string Name { get; private set; }
        public Direction Direction { get; private set; }
        public string Address { get; private set; }
        public MessagePattern MessagePattern { get; private set; }

        public IDictionary<string, string> Properties
        {
            get { return _properties; }
        }

        public abstract void InitializeInboundQueue(MessageQueueSpecs specs);

        public abstract void InitializeOutboundQueue(MessageQueueSpecs specs);

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected abstract void Dispose(bool disposing);
        protected abstract string GetAddress(string queueName);

        public void Send(Message message, string messageTopic = null)
        {
            Send<Message>(message, messageTopic);
        }

        public abstract void Send<T>(T message, string messageTopic = null) where T : class;

        public void Listen(Action<Message> onMessageReceived, string routingKey = null)
        {
            Listen<Message>(onMessageReceived, routingKey);
        }

        public virtual void Listen<T>(Action<T> onMessageReceived, string routingKey = null) where T : class
        {
            Console.WriteLine(" [*] Waiting for messages." +
                             "To exit press CTRL+C");

            while (true)
            {
                Receive(onMessageReceived);
            }
        }

        public abstract void Receive<T>(Action<T> onMessageReceived);

        public void Receive(Action<Message> onMessageReceived)
        {
            Receive<Message>(onMessageReceived);
        }
        public abstract IMessageQueue GetResponseQueue();
        public abstract IMessageQueue GetReplyQueue(Message message);
    }

Happy coding!

Advertisements

Getting started with NServiceBus and RabbitMQ in 15 minutes

Assumptions:

You have downloaded and installed RabbitMQ.  To confirm your RabbitMQ service is running go to  the RabbitMQ Portal and ensure the service is running.

1. Create client class Library called Infrastructure.Messages and a folder in this libary called Events.
2. Create the following class in the Events folder:

  [Serializable]
    public class AuthInitiationRequestEvent 
    {
        public string EventSource { get; set; }
        public DateTime Timestamp { get; set; }
        public string Source { get; set; }
        public string UserName { get; set; }
      
        public override string ToString()
        {
            return string.Format("Timestamp:{3}, Event Source: {0}, Org: {1}, Source: {2}, User: {3} ", 
                EventSource,
                Source, UserName,
                Timestamp);
        }
    }

2. Create a new Console project called Publisher. This Publisher project will contain a client that publishes messages to a bus.

All instructions from this point uses the Publishers project.

3. Add a reference to NServiceBus and NServiceBus RabbitMQ using NuGet.

4. Create a class called AuthenticationClient that starts and stops as follows:

 public class AuthenticationClient 
    {
        public IBus Bus { get; private set; }

        public AuthenticationClient(IBus bus)
        {
            Bus = bus;
        }

        public void Start()
        {
            var random = new Random(100);
            var processName = Process.GetCurrentProcess().ProcessName;
            var processId = Process.GetCurrentProcess().Id;
            //var exchange = "authentications";
        
            while (true)
            {
                var stopWatch = new Stopwatch();
                stopWatch.Start();

                //2. create a message, that can be anything since it is a byte array
                for (var i = 0; i < 1000; i++)
                {
                    var orgId = random.Next(1000);
                    var authEvent = new AuthInitiationRequestEvent(i, "GearHeads" + orgId, String.Format("{0}:{1}", processId, processName));
                    SendMessage(authEvent);
                    Thread.Sleep(5);
                }

                stopWatch.Stop();
                Console.ReadLine();

                Console.WriteLine("====================================================");
                Console.WriteLine("[x] done sending 1000 messages in " + stopWatch.ElapsedMilliseconds);
                Console.WriteLine("[x] Sending reset counter to consumers.");

                Console.ReadLine();
            }
        }

        private void SendMessage<T>(T message)
        {
            Bus.Publish(message);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        public void Stop()
        {
            Console.WriteLine("Authentication Service ended");
        }
    }

Now, we need to create an instance of this class, pass it a bus so that it can publish messages. First we need to create a bus and configure it.

5. Modify Program.Start to include the following lines of code:

  static void Main(string[] args)
        {
            var busConfiguration = new BusConfiguration();

            // this will be the name of this endpoint
            busConfiguration.EndpointName("Publishers.Authentications");

            // we are using JSON serialization
            busConfiguration.UseSerialization<JsonSerializer>();

            // we are using RabbitMQ as our messaging transport.  Other options are MSMQ
            busConfiguration.UseTransport<RabbitMQTransport>();

            // we are using In memory persistence for messages
            busConfiguration.UsePersistence<InMemoryPersistence>();

              // NOTE: this is important and has to be set on publisher and subscriber endpoints.
            // We are using unobstrusive mode, so our messages/events do not have to implement IMessage, IEvent, etc
            var conventionsBuilder = busConfiguration.Conventions();
            conventionsBuilder.DefiningEventsAs(t => t.Namespace != null && t.Namespace == "Infrastructure.Messages.Events");         
         
            busConfiguration.EnableInstallers();
            
            // we create the bus
            var startableBus = Bus.Create(busConfiguration);
            using (var bus = startableBus.Start())
            {
               new AuthenticationClient(bus).Start();
            }
        }

All good work so far. Now we need to establish configuration to tell RabbitMQ. When using RabbitMQ directly, you specify this when constructing the ConnectionFactory. We also need to tell NServiceBus the destination queue for each type of message. When using RabbitMQ directly, you specify this when declaring an exchange on the connection channel object.

7. To configure the location of the broker using RabbitMQ we add a connectionString element as follows:

  <connectionStrings>
    <add name="NServiceBus/Transport" connectionString="host=localhost"/>
  </connectionStrings>

8. If you have followed all these steps so far, compile and run the project and you should see the messages being sent and written to the console.

9. To define the queue or exchange where we want this messages to be sent to, add the following lines to app.Config:

  <UnicastBusConfig>
    <MessageEndpointMappings>
      <add Assembly="Infrastructure.Messages" Namespace="Infrastructure.Messages.Events" Endpoint="Publishers.Authentications" />
    </MessageEndpointMappings>
  </UnicastBusConfig>

We are basically creating an AuthInitiationRequestEvent with some properties and sending this off to the bus. We are sending these messages to an exchange called “Publishers.Authentications”. This could have been called anything. In RabbitMQ, a exchange represents a queue for a specific set of messages. Subscribers interested in these messages will have to listen to the same queue which in this case is called Publishers.Authentications.

10. Before running the project, add this configuration section to app.config:

  <configSections>

<section name="UnicastBusConfig" type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core"/>
  </configSections>

11. If you run the AuthClient project, now you should see messages being printed to the console.

All of this work has been to create the publisher of messages to the bus. Now, let us move to the subscriber. Its job will be simple. Subscribe to the messages and write the messages to the Visual Studio console.

12. Create a new Console project called Subscribers. This project will contain a service that will subscribe to a certain type of message published onto the bus and print them to the console. Not only is the subscriber interested in a certain type of message, it will listen only to messages published onto a certain queue.

The following instructions now refer to Subscribersproject.
13. Using NuGet, add references to NServiceBus and NServiceBus RabbitMQ.
14. Create an AuthenticationService class that implements IHandleMessages interface defined NServiceBus core assembly.

public class AuthenticationService : IHandleMessages<AuthInitiationRequestEvent>
    {
        public void Handle(AuthInitiationRequestEvent message)
        {
            Console.WriteLine(message);
        }
    }

How do we hook this subscriber to the bus? By default, NServiceBus uses auto subscription by scanning assemblies for IHandleMessage implementations. But we need to configure the bus and create the subscriber’s entry point

15. Add the following lines of code to Program.Start in Subscribers project.

  static void Main(string[] args)
        {
            var busConfiguration = new BusConfiguration();

            // this will be the name of this endpoint
            busConfiguration.EndpointName("AuthEventConsumer.ConsoleWriter");

            // we are using JSON serialization
            busConfiguration.UseSerialization<JsonSerializer>();

            // we are using RabbitMQ as our messaging transport.  Other options are MSMQ
            busConfiguration.UseTransport<RabbitMQTransport>();

            // we are using In memory persistence for messages
            busConfiguration.UsePersistence<InMemoryPersistence>();

   // NOTE: this is important and has to be set on publisher and subscriber endpoints.
            // We are using unobstrusive mode, so our messages/events do not have to implement IMessage, IEvent, etc
            var conventionsBuilder = busConfiguration.Conventions();
            conventionsBuilder.DefiningEventsAs(t => t.Namespace != null && t.Namespace == "Infrastructure.Messages.Events");                 

            busConfiguration.EnableInstallers();
           
            var startableBus = Bus.Create(busConfiguration);
            using (startableBus.Start())
            {
                Console.WriteLine("To exit, Ctrl + C");
                Console.ReadLine();
            }
        }

16. Since our broker is running on the local machine, open app.config for the Subscribers project and add the following lines:

  <connectionStrings>
    <add name="NServiceBus/Transport" connectionString="host=localhost"/>
  </connectionStrings>

We also need to tell the subscriber the queue or exchange from which it should retrieve its messages from. We do this by configuring a MessageEndpointMapping as follows:

  <UnicastBusConfig>
    <MessageEndpointMappings>
      <add Assembly="Infrastructure.Messages" Namespace="Infrastructure.Messages.Events" Endpoint="Publishers.Authentications" />
    </MessageEndpointMappings>
  </UnicastBusConfig>

What we are saying here is that messages in Infrastructure.Message assembly within the Infrastructure.Messages.Events namespace should be retrieve from Publishers.Authentications queue or endpoint. Please note that the subscriber endpoint must match the publisher’s endpoint.

When sending a message using pub/sub with RabbitMQ you need to specify at a minimum the name of an exchange where the publisher sends the message and where the subscriber should listen for messages. In addition, you need to specify the machine on which your RabbitMQ broker is running on. The NServiceBus endpoint name, I am guessing, serves as the RabbitMQ exchange, while the connection setting of host=localhost specifies the machine on which the broker is running.

17. Again, for the configuration to not throw an exception, you will need an NServiceBus configuration section as follows:

  <configSections>

<section name="UnicastBusConfig" type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core"/>
  </configSections>

As you’ve probably determined, the settings in app.config for the publisher and subscriber should look exactly the same as follows:

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <configSections>

<section name="MessageForwardingInCaseOfFaultConfig" type="NServiceBus.Config.MessageForwardingInCaseOfFaultConfig, NServiceBus.Core" />

<section name="UnicastBusConfig" type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core" />
  </configSections>
  <connectionStrings>
    <add name="NServiceBus/Transport" connectionString="host=localhost"/>
  </connectionStrings>
  <UnicastBusConfig>
    <MessageEndpointMappings>
      <add Assembly="Infrastructure.Messages" Endpoint="Publishers.Authentications" />
    </MessageEndpointMappings>
  </UnicastBusConfig>
  <MessageForwardingInCaseOfFaultConfig ErrorQueue="error" />
</configuration>

18. Run both client and producer projects and see what happens… 🙂

Implement pub-sub with RabbitMQ in 15 minutes

In publish/subscribe all subscribers get a copy of all published messages. A complete implementation of pub/sub using RabbitMQ follows:

Mandatory Setup

1. Download and install Erlang. RabbitMQ Server is written in Erlang.
2. Download and install the RabitMQ Service broker, which is a windows service.
3. Ensure service is running on its default port 5672.

Complete instructions for downloading and installing RabbitMQ on Windows is found here.

Create Producer and Consumer
4. Create a new Console project called RabbitMQProducer.
5. Using NuGet, add a reference to the C# RabbitMQ binding called RabbitMQ.Client.
6. Modify the content of the Program class using the following code:

  public class RabbitMqPublisher
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() {HostName = &quot;localhost&quot;};
            const string exchange = &quot;authentictions&quot;;

            //The connection abstracts the socket connection, and takes care of protocol version negotiation and authentication and so on for us.
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                     //1. declare and exchange were messages will be sent to
                    channel.ExchangeDeclare(exchange, &quot;fanout&quot;);

                    while (true)
                    {
                        var stopWatch = new Stopwatch();
                        stopWatch.Start();

                        //2. create a message, that can be anything since it is a byte array
                        for (var i = 0; i &lt; 1000; i++)
                        {
                            SendMessage(channel,
                                string.Format(&quot;{0}: msg:{1} hello world &quot;, DateTime.UtcNow.Ticks, i));
                            Thread.Sleep(10);
                        }

                        stopWatch.Stop();
                        Console.ReadLine();

                        Console.WriteLine(&quot;====================================================&quot;);
                        Console.WriteLine(&quot;[x] done sending 1000 messages in &quot; + stopWatch.ElapsedMilliseconds);
                        Console.WriteLine(&quot;[x] Sending reset counter to consumers.&quot;);

                        SendMessage(channel, &quot;reset&quot;);
                        Console.ReadLine();
                    }
                }
            }
        }

        private static void SendMessage(IModel channel, string message)
        {
            var body = Encoding.UTF8.GetBytes(message);

            //3. send the message
             channel.BasicPublish(exchange, string.Empty, null, body);

            Console.WriteLine(&quot; [x] Sent {0}&quot;, message);
        }
    }
}

6. Compile and run the program and you should see each message being set written to the console.
7. Create a new C# Console project, called RabbitMQSubscriber and modify the Program class as
follows:

public class RabbitMqSubscriber
    {
        static void Main(string[] args)
        {
            int messagesReceived = 0;

            var factory = new ConnectionFactory() { HostName = &quot;localhost&quot; };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                     //1. declare and exchange were messages will be sent to
                    channel.ExchangeDeclare(exchange, &quot;fanout&quot;);

                    //2. create a non-durable, exclusive, autodelete queue with a generated name
                    var result = channel.QueueDeclare(); 

                    //3. bind to the exclusive queue created above
                    channel.QueueBind(result.QueueName, exchange, string.Empty);
                    
                    //4. now get message from queue
                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(result.QueueName, true, consumer);

                    Console.WriteLine(&quot; [*] Waiting for messages.&quot; +
                                        &quot;To exit press CTRL+C&quot;);
                    while (true)
                    {
                        var ea = consumer.Queue.Dequeue();

                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        messagesReceived += 1;

                        Console.WriteLine(&quot;[x] {0} Received {1}&quot;, messagesReceived, message);
                        if (string.CompareOrdinal(&quot;reset&quot;, message) == 0)
                        {
                            messagesReceived = 0;
                        }
                    }

                }
            }
        }
    }

8. Using NuGet, add a reference to the C# RabbitMQ client binding, called RabbitMQ.Client to resolve compilation problems.

9. Compile and run the program. If you have the Producer already running, you may have to
right click on the consumer project and select Debug | Start new instance.
10. Run an instance of the publisher and multiple subscribers and notice that all subscribers get a copy of the messages being produced.

Some notes about the implementation:
1. You can fire up more than one publisher and they will all send messages to the exchange.
2. You can fire up one or more listeners and they will all retrieve messages from the exchange.
3. In this exercise, messages are not durable, in the sense that if there are no subscribers, they are lost.
4. Every subscriber gets a copy of every message.
5. RabbitMQ just works as advertised. I am very impressed with the framework.

Use case
This can be used in a scenario where you have field data and have a bunch of processing stations. Each processing station gets a copy of field data so it can perform a different set of business rules.

For example, authentication events could be shared among multiple subscribers. One of such could log these messages to a database. Another could take the events and do some heuristics including machine intelligence.

Implementing partial pub-sub with ZeroMQ in 15 minutes

I thoroughly enjoyed reading their documentation. This couple of lines stuck at the back of my brains:

If you’ve done any work with threads, protocols, or networks, you’ll realize this is pretty much impossible. It’s a dream. Even connecting a few programs across a few sockets is plain nasty when you start to handle real life situations. Trillions? The cost would be unimaginable. Connecting computers is so difficult that software and services to do this is a multi-billion dollar business.

In this short tutorial, we create a simple, yet incomplete publish-subscribe implementation using ZeroMQ.

The code is as follows:

1. Create a new C# Console project.
2. Use NuGet to add a reference to ZeroMQ C# binding called (clrzmq).
3. Change your Program.cs to contain the following code:

class ZeroMqPublisher
    {
        static void Main(string[] args)
        {
            var context = new Context();  // can specify number of threads
            var processName = Process.GetCurrentProcess().Id;
            
            //1. create socket
            using (var socket = context.Socket(SocketType.PUB))
            {
                //2. connect socket to port
                socket.Connect("tcp://localhost:5555");

                while (true)
                {
                    var stopWatch = new Stopwatch();
                    stopWatch.Start();


                    for (var i = 0; i < 1000; i++)
                    {
                        var message = string.Format("{0}-{1}: msg:{2} hello world ", processName, DateTime.UtcNow.Ticks, i);
                        SendMessage(socket, message);
                        Thread.Sleep(10);
                    }

                    stopWatch.Stop();
                    Console.ReadLine();

                    Console.WriteLine("====================================================");
                    Console.WriteLine("[x] done sending 1000 messages in " + stopWatch.ElapsedMilliseconds);
                    Console.WriteLine("[x] Sending reset counter to consumers.");

                    SendMessage(socket, "reset");

                    Console.ReadLine();
                }    
            }
        }

        private static void SendMessage(Socket socket, string message)
        {
            var body = Encoding.UTF8.GetBytes(message);

            //3. send the message
            socket.Send(body);

            Console.WriteLine(" [x] Sent {0}", message);
        }
    }

3. Create another C# Console project for the subscriber.
4. Use NuGet to add reference to the ZeroMQ C# binding as you did above.
5. Change Program.cs in your subscriber project to contain this code:

 class ZeroMqSubscriber
    {
        static void Main(string[] args)
        {
            long messagesReceived = 0;
            var filePath = typeof (ZeroMqSubscriber).FullName + ".txt";
            var text = new Queue<string>();
            var context = new Context();
            using (var socket = context.Socket(SocketType.SUB))
            {
                // 1. bind to an address
                socket.Bind("tcp://*:5555");

                while (true)
                {
                    var message = socket.Recv();
                    var messageStr = Encoding.UTF8.GetString(message);
                    messagesReceived += 1;
                    text.Enqueue(messageStr);
                    Console.WriteLine(" [x] {0} Received {1}", messagesReceived, messageStr);

                    if (string.CompareOrdinal("reset", messageStr) == 0)
                    {
                        messagesReceived = 0;
                        File.WriteAllLines(filePath, text);
                        text.Clear();
                    }
                }    
            }
        }
   }
 

5. Start the publisher and let is send its first batch of messages.
6. Start the subscriber and notice that it comes online and starts processing the messages.
7. Try with different combinations of stopping and starting the publisher or subscriber.
8. Experiment with different numbers of publishers and subscribers.

Some notes about the implementation
1. If you stop the publisher before the subscriber comes online, all messages are lost since ZeroMQ does not support durable messaging. All messages are stored in the host processes’ memory.
2. You can start multiple publisher instances publishing to the same port.
3. You cannot start multiple subscriber instances listening to the same port so technically, this is not a true publish-subscribe pattern.
4. If you have a single subscriber and multiple publishers, the former gets all of the messages in the order in which they were sent from each publisher. Also messages between publishers are interleaved as noted below:

12928-635584021514875798: msg:611 hello world
13084-635584021514945802: msg:814 hello world
12928-635584021514975804: msg:612 hello world
13084-635584021515045808: msg:815 hello world
12928-635584021515075810: msg:613 hello world
13084-635584021515145814: msg:816 hello world
12928-635584021515175815: msg:614 hello world
13084-635584021515245819: msg:817 hello world
12928-635584021515275821: msg:615 hello world
13084-635584021515345825: msg:818 hello world
12928-635584021515375827: msg:616 hello world
13084-635584021515445831: msg:819 hello world

Implementing messaging queue with RabbitMQ in 15 minutes

This is the beginning of a series or articles meant to capture the main differences between various message queue implementations, including MSMQ, RabbitMQ, ZeroMQ and ActiveMQ.

RabbitMQ uses Advanced Message Queuing Protocol (AMQP) and is a brokered messaging system. Brokered in the sense there is a central entity responsible for storing and forwarding of the messages from producers to consumers.

This article presents the 1-2-3’s of creating a simple producer/consumer example using RabbitMQ.

Mandatory Setup
1. Download and install Erlang. RabbitMQ Server is written in Erlang.
2. Download and install the RabitMQ Service broker, which is a windows service.
3. Ensure service is running on its default port 5672. To do this, you can enter the following command on a DOS prompt:

netstat -ano > processes.txt

After running command, open text file using Notepad or even better, Notepad++ do a search for port 5672. If port is found, this means that the service is running on your local machine.

Create Producer and Consumer
4. Create a new Console project called RabbitMQProducer.
5. Using NuGet, add a reference to the C# RabbitMQ binding called RabbitMQ.Client.
6. Modify the content of the Program class using the following code:

  public class RabbitMqPublisher
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() {HostName = "localhost"};

            //The connection abstracts the socket connection, and takes care of protocol version negotiation and authentication and so on for us.
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //1. create a queue to send message. Queue is created only if it does not already exist.
                    channel.QueueDeclare("hello-queue", false, false, false, null);


                    while (true)
                    {
                        var stopWatch = new Stopwatch();
                        stopWatch.Start();

                        //2. create a message, that can be anything since it is a byte array
                        for (var i = 0; i < 1000; i++)
                        {
                            SendMessage(channel,
                                string.Format("{0}: msg:{1} hello world ", DateTime.UtcNow.Ticks, i));
                            Thread.Sleep(10);
                        }

                        stopWatch.Stop();
                        Console.ReadLine();

                        Console.WriteLine("====================================================");
                        Console.WriteLine("[x] done sending 1000 messages in " + stopWatch.ElapsedMilliseconds);
                        Console.WriteLine("[x] Sending reset counter to consumers.");

                        SendMessage(channel, "reset");
                        Console.ReadLine();
                    }
                }
            }
        }

        private static void SendMessage(IModel channel, string message)
        {
            var body = Encoding.UTF8.GetBytes(message);

            //3. send the message
            channel.BasicPublish("", "hello-queue", null, body);

            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}

6. Compile and run the program and you should see each message being set written to the console.
7. Create a new C# Console project, called RabbitMQReceiver and modify the Program class as
follows:

public class RabbitMqSubscriber
    {
        static void Main(string[] args)
        {
            int messagesReceived = 0;

            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //1. create queue if it does not exist.
                    channel.QueueDeclare("hello-queue", false, false, false, null);

                    //2. now get message from queue
                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume("hello-queue", true, consumer);

                    Console.WriteLine(" [*] Waiting for messages." +
                                        "To exit press CTRL+C");
                    while (true)
                    {
                        var ea = consumer.Queue.Dequeue();

                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        messagesReceived += 1;

                        Console.WriteLine("[x] {0} Received {1}", messagesReceived, message);
                        if (string.CompareOrdinal("reset", message) == 0)
                        {
                            messagesReceived = 0;
                        }
                    }

                }
            }
        }
    }

8. Using NuGet, add a reference to the C# RabbitMQ client binding, called RabbitMQ.Client to resolve compilation problems.

9. Compile and run the program. If you have the Producer already running, you may have to
right click on the consumer project and select Debug | Start new instance.
10. Run an instance of the publisher and subscriber and notice messages being passed from former to latter.

Some notes about the implementation:
1. You can fire up more than one publisher and the will all send messages to the queue.
2. You can fire up one or more listeners and they will all retrieve messages from the queue.
3. Messages remain in the queue until they have been consumed by subscribers.
4. Messages are shared among subscribers. So if one of them gets a certain message, the others do not.
5. If you have more than one subscriber, messages are equally distributed among them.

Use case
This can be used in a scenario where you have field data and have a bunch of processing stations to process the field data in parallel.