Abstracting message queue creation using the factory pattern

You had to create several additional modules for an existing enterprise solution but did not know which of the various message queues to use: your options are MSMQ, ZeroMQ, RabbitMQ, Azure, etc.
You decided to postpone this decision to latter in the game but in the meantime, abstracted away functionality of the message queue behind an interface that looks like this:

  public interface IMessageQueue : IDisposable
    {
        string Address { get; }
        IDictionary<string, string> Properties { get; }
        void InitializeInboundQueue(MessageQueueCreationContext creationContext);
        void InitializeOutboundQueue(MessageQueueCreationContext creationContext);
        void Send(Message message, string messageTopic = null);
        void Listen(Action<Message> onMessageReceived, string routingKey = null);
        void Receive(Action<Message> onMessageReceived);
        IMessageQueue GetResponseQueue();
        IMessageQueue GetReplyQueue(Message message);
    }

The creation context class looks like this:

  [Serializable]
    public class MessageQueueSpecs : IEquatable<MessageQueueSpecs>
    {
        public bool Equals(MessageQueueSpecs other)
        {
            if (ReferenceEquals(null, other)) return false;
            if (ReferenceEquals(this, other)) return true;
            return 
                string.Equals(Name, other.Name) && 
                string.Equals(Address, other.Address) && 
                MessagePattern == other.MessagePattern && 
                Direction == other.Direction && 
                string.Equals(MessageTopic, other.MessageTopic) && 
                string.Equals(HostName, other.HostName);
        }

        public override int GetHashCode()
        {
            unchecked
            {
                var hashCode = (Name != null ? Name.GetHashCode() : 0);
                hashCode = (hashCode*397) ^ (Address != null ? Address.GetHashCode() : 0);
                hashCode = (hashCode*397) ^ (int) MessagePattern;
                hashCode = (hashCode*397) ^ (int) Direction;
                hashCode = (hashCode*397) ^ (MessageTopic != null ? MessageTopic.GetHashCode() : 0);
                hashCode = (hashCode*397) ^ (HostName != null ? HostName.GetHashCode() : 0);
                return hashCode;
            }
        }

        public static bool operator ==(MessageQueueSpecs left, MessageQueueSpecs right)
        {
            return Equals(left, right);
        }

        public static bool operator !=(MessageQueueSpecs left, MessageQueueSpecs right)
        {
            return !Equals(left, right);
        }

        public MessageQueueSpecs()
        {
            AdditionalProperties = new Dictionary<string, string>(5);
            Name = GetType().FullName + DateTime.UtcNow;
        }

        public string Name { get; set; }
        public string Address { get; set; }
        public MessagePattern MessagePattern { get; set; }
        public Direction Direction { get; set; }
        public ISerializer Serializer { get; set; }
        public IDictionary<string, string> AdditionalProperties { get; set; }
        public string MessageTopic { get; set; }
        public string HostName { get; set; }

        public bool IsValid()
        {
            if (Serializer == null)
                return false;

            return true;
        }

        public override bool Equals(object obj)
        {
            if (ReferenceEquals(null, obj)) return false;
            if (ReferenceEquals(this, obj)) return true;
            if (obj.GetType() != this.GetType()) return false;
            return Equals((MessageQueueSpecs) obj);
        }

        public string GetUid()
        {
            return typeof (MessageQueueSpecs) + Name + Direction + GetHashCode();
        }

To delegate the creation of message queues to an independent entity that can be passed to stakeholders via IoC, we define an IMessageQueueFactory as listed below:

 
  public interface IMessageQueueFactory
    {
        IMessageQueue CreateInboundQueue(MessageQueueSpecs specs);
        IMessageQueue CreateOutnboundQueue(MessageQueueSpecs specs);
    }

The interface can possibly be simplified using only one method but two methods gives the API clarity and also gives us the flexibility to define different parameters for inbound or outbound queues among others, possibly segregating the interface if need be.

The implementation of the IMessageQueueFactory depends on how many different types of message queues we want to support in the solution. To keep things simple, we will assume that only one type of message queue will be used in this solution and the one we choose is RabbitMQ.

The implementation of IMessageQueueFactory for RabbitMQ is as follows:

  public class RabbitMessageQueueFactory : IMessageQueueFactory
    {
        private readonly MemoryCache _cache = new MemoryCache("RabbitMessageQueueFactory");
        
        public IMessageQueue CreateInboundQueue(MessageQueueSpecs specs)
        {
            return GetOrCreateQueue(specs);
        }
        
        public IMessageQueue CreateOutnboundQueue(MessageQueueSpecs specs)
        {
            return GetOrCreateQueue(specs);
        }

        private IMessageQueue GetOrCreateQueue(MessageQueueSpecs specs)
        {
            var cacheKey = specs.GetUid();
            var queueObj = _cache.Get(cacheKey);
            if (queueObj != null)
                return (IMessageQueue) queueObj;

            var queue = new RabbitMessageQueue(specs);
            _cache.Add(cacheKey, queue, new CacheItemPolicy());

            return queue;
        }
    }

Creating inbound and outbound RabbitMQ queues follow a similar pattern based on our message queue implementation so construction is easy. To use our factory, we can register it with an IoC container such as Unity or AutoFac or create an instance and pass it to interested parties.

For the sake of completeness, RabbitMessageQueue and MessageQueueBase is implemented as follows:


   public sealed class RabbitMessageQueue : MessageQueueBase
    {
        private readonly ISerializer _serializer;
        private IConnection _connection;
        private IModel _channel;
        private QueueingBasicConsumer _consumer;

        public string ExchangeType
        {
            get { return GetProperty(ExchangeTypePropertyName, "topic"); }
        }

        public string Exchange
        {
            get { return GetProperty(ExchangePropertyName, "topic"); }
        }

        public RabbitMessageQueue(MessageQueueSpecs specs)
        {
            _serializer = specs.Serializer;
        
            CopyProperties(specs.AdditionalProperties);
            ReconcileProperties(specs);
            InitializeInboundQueue(specs);
            InitializeOutboundQueue(specs);
        }
        
        private void ReconcileProperties(MessageQueueSpecs specs)
        {
            ReconcileProperty(MessageTopicPropertyName, () => specs.MessageTopic);
            ReconcileProperty(ExchangePropertyName, () => specs.Address);
            ReconcileProperty(ExchangeTypePropertyName, () => specs.MessagePattern == MessagePattern.PublishSubscribe ? "topic" : "fanout");
        }

        private void CopyProperties(IEnumerable<KeyValuePair<string, string>> properties)
        {
            if (properties != null)
            {
                foreach (var property in properties)
                {
                    AddProperty(property.Key, property.Value);
                }
            }
        }

        protected override void Dispose(bool disposing)
        {
            if (disposing && _channel != null)
            {
                _channel.Dispose();
                _connection.Dispose();
            }
        }

        protected override string GetAddress(string queueName)
        {
            throw new NotImplementedException();
        }

        private void CreateInboundQueue(MessageQueueSpecs specs)
        {
            var exchange = CreateChannel(specs);

            //2. create a non-durable, exclusive, autodelete queue with a generated name
            var result = _channel.QueueDeclare();
            var routingKey = specs.MessageTopic ?? string.Empty;
            //3. bind to the exclusive queue created above
            _channel.QueueBind(result.QueueName, exchange, routingKey);

            //4. now get message from queue
            _consumer = new QueueingBasicConsumer(_channel);
            _channel.BasicConsume(result.QueueName, true, _consumer);
        }

        private string CreateChannel(MessageQueueSpecs specs)
        {
            var hostName = specs.HostName ?? GetProperty(HostNamePropertyName, "localhost");
            var factory = new ConnectionFactory { HostName = hostName };
               
            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();

            _channel.ExchangeDeclare(Exchange, ExchangeType); 
            return Exchange;
        }

        private void CreateOutboundQueue(MessageQueueSpecs specs)
        {
            CreateChannel(specs);
        }

        public override void InitializeInboundQueue(MessageQueueSpecs specs)
        {
            Initialize(Direction.Inbound, specs.Address, specs.MessagePattern);
            // NOTE: for fanout, it apears as though we can use the same initializations defined in CreateInboundQueue
            CreateInboundQueue(specs);
        }

        public override void InitializeOutboundQueue(MessageQueueSpecs specs)
        {
            Initialize(Direction.Outbound, specs.Address, specs.MessagePattern);
            // NOTE: for fanout, it apears as though we can use the same initializations defined in CreateInboundQueue
            CreateOutboundQueue(specs);
        }

        public override void Send<T>(T message, string messageTopic = null)
        {
            var body = _serializer.MessageToBytes<T>(message);
            var exchange = GetProperty(ExchangePropertyName, Name);// "authentictions";
            if (string.IsNullOrEmpty(messageTopic))
                messageTopic = GetProperty(MessageTopicPropertyName, string.Empty);

            _channel.BasicPublish(exchange, messageTopic, null, body);
        }
    
        public override void Receive<T>(Action<T> onMessageReceived)
        {
            var ea = _consumer.Queue.Dequeue();
            var body = ea.Body;
            onMessageReceived.Invoke(_serializer.BytesToMessage<T>(body));
        }

        public override IMessageQueue GetResponseQueue()
        {
            throw new NotImplementedException();
        }

        public override IMessageQueue GetReplyQueue(Message message)
        {
            throw new NotImplementedException();
        }

        public override string ToString()
        {
            var properties = Properties;
            if (properties == null)
            {
                properties = new Dictionary<string, string>();
            }

            var propertyInfo = GetType().GetProperties(BindingFlags.Instance | BindingFlags.Public);
            foreach (var pinfo in propertyInfo)
            {
                if (properties.ContainsKey(pinfo.Name)) continue;

                var value = pinfo.GetValue(this);
                properties.Add(pinfo.Name, value != null ? value.ToString() : string.Empty);    
            }

            var sb = new StringBuilder(100);
            foreach (var kvp in properties)
            {
                sb.AppendLine(string.Format("\t {0}:{1}", kvp.Key, kvp.Value));
            }
            return sb.ToString();
        }
    }

public abstract class MessageQueueBase : IMessageQueue
    {
        public const string ExchangePropertyName = "Exchange";
        public const string ExchangeTypePropertyName = "ExchangeType";
        public const string MessageTopicPropertyName = "MessageTopic";
        public const string HostNamePropertyName = "HostName";

        private readonly IDictionary<string, string> _properties = new Dictionary<string, string>(10);

        protected void Initialize(Direction direction, string name, MessagePattern pattern)
        {
            Name = name;
            Direction = direction;
            MessagePattern = pattern;
           
        }

        protected void ReconcileProperty(string name,Func<string> valueFactory)
        {
            if (!Properties.ContainsKey(name))
            {
                Properties.Add(name,  valueFactory.Invoke());
            }
        }

        public void AddProperty(string name, string value)
        {
            _properties.Add(name, value);
        }

        public string GetProperty(string propertyName, string defaultValue)
        {
            if (_properties.ContainsKey(propertyName))
                return _properties[propertyName];

            return defaultValue;
        }

        public string Name { get; private set; }
        public Direction Direction { get; private set; }
        public string Address { get; private set; }
        public MessagePattern MessagePattern { get; private set; }

        public IDictionary<string, string> Properties
        {
            get { return _properties; }
        }

        public abstract void InitializeInboundQueue(MessageQueueSpecs specs);

        public abstract void InitializeOutboundQueue(MessageQueueSpecs specs);

        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        protected abstract void Dispose(bool disposing);
        protected abstract string GetAddress(string queueName);

        public void Send(Message message, string messageTopic = null)
        {
            Send<Message>(message, messageTopic);
        }

        public abstract void Send<T>(T message, string messageTopic = null) where T : class;

        public void Listen(Action<Message> onMessageReceived, string routingKey = null)
        {
            Listen<Message>(onMessageReceived, routingKey);
        }

        public virtual void Listen<T>(Action<T> onMessageReceived, string routingKey = null) where T : class
        {
            Console.WriteLine(" [*] Waiting for messages." +
                             "To exit press CTRL+C");

            while (true)
            {
                Receive(onMessageReceived);
            }
        }

        public abstract void Receive<T>(Action<T> onMessageReceived);

        public void Receive(Action<Message> onMessageReceived)
        {
            Receive<Message>(onMessageReceived);
        }
        public abstract IMessageQueue GetResponseQueue();
        public abstract IMessageQueue GetReplyQueue(Message message);
    }

Happy coding!

Advertisements

3 thoughts on “Abstracting message queue creation using the factory pattern

  1. Kreso Jurisic

    Great article, spent hours to abstract Ayure queue and haven’t came even close to your model.Is there maybe some extra code for this example? I am wondering what is behind of property of MessagePattern type. Direction is, is I supose, enum for Inbound and outbound.

    Regards,

    Kreso

    Reply
    1. Klaus NJi Post author

      Glad you found the article helpful. MessagePattern is an enum with options PublishScribe, RequestResponse and FireForget. I have updated the article including the full implementation of RabbitMessageQueue and MessageQueue base classes. I suspect one can easily us this model to create an implementation for Azure.

      Reply
      1. Kreso Jurisic

        Thank you, I abstracted Azure Queue successfully with your approach. Feels good when you know that MQ implementation may be easily replaced, if needed.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s