Category Archives: Application Performance

Practical guidance on improving user experience by marshalling to UI thread just-in-time

Marshalling work between the between a background and main thread is very common pattern in many UI frameworks and Windows Presentation Foundation aka WPF is one of them. When you have a long running operation to perform, this needs to be done on a background thread, freeing the main UI thread from responding to peripheral events from mouse and keyboard operations, improving the overall user experience.

In WPF, there is one main UI thread and all objects created by the main UI thread are owned by the UI thread. Access to these objects, therefore, must be done on the main UI thread. This is well documented literature. However, what is missing is architectural guidance that one should take into consideration when designing applications that require lots of marshalling between the background and main thread. In this post I attempt this using a practical example, which involves an application that plots bus stops and associated activities on a map.

Say or application on ocassion, via a background thread calls a RESTFul web-service to retrieve spatial representation of air contaminants resulting from say a nuclear disaster within 10 km of a given position. The result of such a computation would typically look like this:

This typically is a long running operation so needs to be done in on a background thread. The result of this web-service call could be a collection of spatial records, containing bus stops including their respective coordinate and some visitation information that can be plotted onto a map.

Plotting geometries represented using vendor neutral specifications such as the Open Geospatial Consortium’s Web Feature Service specification aka OGC WFS, onto a map such as ThinkGeo’s MapSuite for WPF Desktop, or even ESRI’s ArcGIS Runtime SFK for WPF requires converting the raw data into the respective vendor’s format.

Let’s use ThinkGeo’s MapSuite as case study and take a few minutes to quickly understand their one small portion of their underlying architecture.

A ThinkGeo WpfMap object is the main UI element rendered on screen. An instance of this object exposes a collection of Overlays. There are many different types of overlays but for the sake of this discussion we limit ourselves to the LayerOverlay.

A LayerOverlay contains a collection of layers. There are many different layer types but fundamentally, the layer is responsible for converting whatever geometry context it is provided into an image that can be rendered by the Overlay. Visual representation of features can be controlled on some layers, such as the InMemoryFeatureLayer by assigning the layer an appropriate style.

Converting raw data from a vendor neutral OGC format to ThinkGeo feature representationfeature, creating the appropriate layer and styles can all be done on the background thread, since

  • these objects do not derive from DispatcherObject
  • we could be dealing with a thousand of these objects

Only after the layer has been created and we are ready to add it to the Overlays collection do we need to marshall to the UI thread. To encapsulate all of this in one place, one can implement a custom ManagedInMemoryFeatureLayer deriving from InMemoryFeatureLayer with the following logic:


    public class ManagedInMemoryFeatureLayer : InMemoryFeatureLayer
    {
        public event Action<object, EventArgs> FeaturesCreated;

        private readonly StylesFactory _stylesFactory;
        private readonly MapSuiteFeatureMapper _featureMapper;
        private readonly ILoggerFacade _logger;
        private readonly object _lock = new object();

        public ManagedInMemoryFeatureLayer(InMemoryLayerContext context, StylesFactory stylesFactory, MapSuiteFeatureMapper featureMapper)
        {
            if (!context.DataManager.IsSchemaDefined())
            {
                throw new InvalidDataException(" Geo data manager schema is not defined.");
            }
            
            Guard.ThrowIfArgumentIsNull(context.RenderInfo);
            _stylesFactory = stylesFactory;
            _featureMapper = featureMapper;

            Name = context.Name;
            SetupDataManager(context);
            
            SymbolizationInfo = context.RenderInfo;
            ApplyRenderer(stylesFactory, SymbolizationInfo, true);

            // NOTE: this line is crucial especially when you have features that need to 
            // be rendered based on the attributes.  So it is important the the geospatial 
            // schema is defined, hence the initial check.
            SetupLayerDataSchema(context.DataManager);

            // A random chosen number.
            MinNumberOfFeaturesRequiredForIndex = 20;
        }

        private void SetupDataManager(InMemoryLayerContext context)
        {
            DataManager = context.DataManager;

            DataManager.GeoDatasetChanged += (o, e) => Refresh(DataManager.GeoDataset);

            context.IsPendingLayerCreation = false;
        }

        public int MinNumberOfFeaturesRequiredForIndex { get; set; }
        public Style Style { get; set; }
        public IGeoDataManager DataManager { get; private set; }
        public ISymbolizationInfo SymbolizationInfo { get; private set; }
        public ObjectState LayerState { get; set; }
        public SpatialFilter PostFilter { get; set; }

        public void ApplyRenderer(StylesFactory styleFactory, ISymbolizationInfo stylingInfo, bool refreshLayer)
        {
            if (stylingInfo == null)
            {
                throw new ArgumentNullException("stylingInfo");
            }

            Style = styleFactory.CreateStyle(stylingInfo);

            if (stylingInfo.UseDefaultSymbolInfo)
            {
                SetDefaultStyle();
            }
            else
            {
                SetValueStyles(styleFactory, stylingInfo);
            }

            if (refreshLayer)
            {
                RecreateFeatures();
            }
        }

        private void SetupLayerDataSchema(IGeoDataManager goeDataManager)
        {
            var geoDatasetSchema = goeDataManager.GeoDataset.Schema;
            if (geoDatasetSchema != null && geoDatasetSchema.Columns.Count > 0)
            {
                Open();

                foreach (var field in geoDatasetSchema.Columns)
                {
                    if (field.ValidateColumn())
                    {
                        Columns.Add(new FeatureSourceColumn(field.Name, field.TypeString, 100));
                    }
                }

                if (SymbolizationInfo.ContainsAttibutesForStyling)
                {
                    foreach (var styleField in SymbolizationInfo.GetAttributeNamesForSpecialStyles())
                    {
                        Columns.Add(new FeatureSourceColumn(styleField));
                    }
                }

                Close();
            }
        }

        protected void SetDefaultStyle()
        {
            var oldStyle = ZoomLevelSet.ZoomLevel01.CustomStyles.FirstOrDefault(s => string.CompareOrdinal(s.Name, Style.Name) == 0);
            if (oldStyle != null)
                ZoomLevelSet.ZoomLevel01.CustomStyles.Remove(oldStyle);
            ZoomLevelSet.ZoomLevel01.CustomStyles.Add(Style);
            ZoomLevelSet.ZoomLevel01.ApplyUntilZoomLevel = ApplyUntilZoomLevel.Level20;
        }

        protected void SetValueStyles(StylesFactory stylesFactory, ISymbolizationInfo renderInfo)
        {
            var attributeSymbolInfos = renderInfo.GetAttributedSymbolInfos();
            Guard.ThrowIfArgumentIsNull(attributeSymbolInfos);

            var valueStyles = stylesFactory.CreateValueStyles(attributeSymbolInfos);
            ZoomLevelSet.ZoomLevel01.CustomStyles.Clear();
            foreach (var style in valueStyles)
            {
                ZoomLevelSet.ZoomLevel01.CustomStyles.Add(style);
            }

            ZoomLevelSet.ZoomLevel01.ApplyUntilZoomLevel = ApplyUntilZoomLevel.Level20;
        }

        public void RecreateFeatures()
        {
            Refresh(DataManager.GeoDataset);
        }

        public void Refresh(IGeoCollection dataset)
        {
            Task.Factory.StartNew(() =>
            {
                try
                {
                    if (PostFilter != null)
                    {
                        PostFilter.PreProcess();
                    }

                    IList<Feature> featuresToAdd = _featureMapper.CreateFeatures(dataset.GetFeatures(PostFilter)).ToArray();
                    lock (_lock)
                    {
                        InternalFeatures.Clear();
                        InternalFeatures.AddRange(featuresToAdd);
                    }

                    if (InternalFeatures.Count > MinNumberOfFeaturesRequiredForIndex)
                    {
                        BuildIndex();
                    }

                    RequestDrawing();
                    FeaturesCreated(null, null);
                }
                catch (Exception ex)
                {
                    LayerState = ObjectState.Error;
                    _logger.Error(ex, DataManager.Id.Alias);
                }
            });
        }   
        
    }
}

Creation of this class can happen on the background thread. Feature mapping can also occur on the background thread. However, when the layer calls RequestDrawing or FeatureCreated, interested parties are supposed to ensure these events are handled on the UI thread.

In our sample application, an entity called InMemoryFeatureLayerViewModel which manages a layer and its associated legend entry, a legend being the table of content representing all layers on a map, subscribes to FeaturesCreated event exposed by the layer and responds as accordingly. A snippet of a sample InMemoryFeatureLayerViewModel implementation is listed below:

 public class InMemoryFeatureLayerViewModel : FeatureLayerModel<LayerOverlay, ManagedInMemoryFeatureLayer, InMemoryLayerContext>
    {
        private readonly ManagedInMemoryFeatureLayer _layer;
        public InMemoryFeatureLayerViewModel(InMemoryLayerContext context, ManagedInMemoryFeatureLayer layer, bool isRemovable)
            : base(context)
        {
            Layer = _layer = layer;
            LayerDefinition.Layer = layer;
            LayerContext = context;

            context.RenderInfoChanged += OnRenderInfoChanged;
            _layer.FeaturesCreated += OnLayerFeaturesCreatedCreated;
        }

        private void OnRenderInfoChanged(object sender, DataEventArgs<ISymbolizationInfo> e)
        {
            // we need to execute on main UI thread...
            var dispatcher = GetMainDispatcher();

            dispatcher.BeginInvoke(new Action(() =>
            {
                // update our legend representation
                Representation = LayerContext.GetRepresentation();

                // update our layer styles and referesh layer
                _layer.ApplyRenderer(StylesFactory.Instance, e.Data, true);

            }));
        }

        private void OnLayerFeaturesCreatedCreated(object sender, EventArgs e)
        {
            _layer.FeatureSource.Open();
            var bbox = _layer.FeatureSource.GetBoundingBox();
            _layer.FeatureSource.Close();
            RefreshOverlay(bbox);
        }

   protected void RefreshOverlay(RectangleShape boundingBox)
        {
            if (Overlay == null)
                return;

            if (ParentViewModel == null)
                return;

            Dispatcher mapDispatcher = GetMainDispatcher();
            // this needs to be serviced at a high priority, so we use Normal
            mapDispatcher.BeginInvoke(new Action(() =>
            {
                //TODO: if there are performance concerns for rapidly changing layers, cache casted view.
                Overlay.PanTo(boundingBox);
                Overlay.SafelyRefresh();

                if (ParentViewModel != null)
                {
                    var map = (ExtendedWpfMap)ParentViewModel.GetMapObject();
                    if (map != null)
                    {
                        map.CurrentExtent = boundingBox;
                        map.SafelyRefresh();
                    }
                }
            }), DispatcherPriority.Normal);
        }
    }

Only when refreshing the overlay do we need to marshal to the UI thread. Otherwise, we waste UI thread cycles doing things that can be delegated to the background thread compromising application responsiveness.

One challenge when modelling your solution is reducing the number of entity definitions that rely on DispatcherObject, to just those required for rendering. This way, you can do most of the work on background threads, since these entities are thread agnostic, reserving the UI thread for rendering purposes only. This becomes especially useful, if your application comprises of multiple views rendering data grids, maps and charts. With such applications, you really want the UI thread to do nothing but render and respond to mouse and keyboard events.

Happy coding!

Implement pub-sub with RabbitMQ in 15 minutes

In publish/subscribe all subscribers get a copy of all published messages. A complete implementation of pub/sub using RabbitMQ follows:

Mandatory Setup

1. Download and install Erlang. RabbitMQ Server is written in Erlang.
2. Download and install the RabitMQ Service broker, which is a windows service.
3. Ensure service is running on its default port 5672.

Complete instructions for downloading and installing RabbitMQ on Windows is found here.

Create Producer and Consumer
4. Create a new Console project called RabbitMQProducer.
5. Using NuGet, add a reference to the C# RabbitMQ binding called RabbitMQ.Client.
6. Modify the content of the Program class using the following code:

  public class RabbitMqPublisher
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() {HostName = &quot;localhost&quot;};
            const string exchange = &quot;authentictions&quot;;

            //The connection abstracts the socket connection, and takes care of protocol version negotiation and authentication and so on for us.
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                     //1. declare and exchange were messages will be sent to
                    channel.ExchangeDeclare(exchange, &quot;fanout&quot;);

                    while (true)
                    {
                        var stopWatch = new Stopwatch();
                        stopWatch.Start();

                        //2. create a message, that can be anything since it is a byte array
                        for (var i = 0; i &lt; 1000; i++)
                        {
                            SendMessage(channel,
                                string.Format(&quot;{0}: msg:{1} hello world &quot;, DateTime.UtcNow.Ticks, i));
                            Thread.Sleep(10);
                        }

                        stopWatch.Stop();
                        Console.ReadLine();

                        Console.WriteLine(&quot;====================================================&quot;);
                        Console.WriteLine(&quot;[x] done sending 1000 messages in &quot; + stopWatch.ElapsedMilliseconds);
                        Console.WriteLine(&quot;[x] Sending reset counter to consumers.&quot;);

                        SendMessage(channel, &quot;reset&quot;);
                        Console.ReadLine();
                    }
                }
            }
        }

        private static void SendMessage(IModel channel, string message)
        {
            var body = Encoding.UTF8.GetBytes(message);

            //3. send the message
             channel.BasicPublish(exchange, string.Empty, null, body);

            Console.WriteLine(&quot; [x] Sent {0}&quot;, message);
        }
    }
}

6. Compile and run the program and you should see each message being set written to the console.
7. Create a new C# Console project, called RabbitMQSubscriber and modify the Program class as
follows:

public class RabbitMqSubscriber
    {
        static void Main(string[] args)
        {
            int messagesReceived = 0;

            var factory = new ConnectionFactory() { HostName = &quot;localhost&quot; };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                     //1. declare and exchange were messages will be sent to
                    channel.ExchangeDeclare(exchange, &quot;fanout&quot;);

                    //2. create a non-durable, exclusive, autodelete queue with a generated name
                    var result = channel.QueueDeclare(); 

                    //3. bind to the exclusive queue created above
                    channel.QueueBind(result.QueueName, exchange, string.Empty);
                    
                    //4. now get message from queue
                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(result.QueueName, true, consumer);

                    Console.WriteLine(&quot; [*] Waiting for messages.&quot; +
                                        &quot;To exit press CTRL+C&quot;);
                    while (true)
                    {
                        var ea = consumer.Queue.Dequeue();

                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        messagesReceived += 1;

                        Console.WriteLine(&quot;[x] {0} Received {1}&quot;, messagesReceived, message);
                        if (string.CompareOrdinal(&quot;reset&quot;, message) == 0)
                        {
                            messagesReceived = 0;
                        }
                    }

                }
            }
        }
    }

8. Using NuGet, add a reference to the C# RabbitMQ client binding, called RabbitMQ.Client to resolve compilation problems.

9. Compile and run the program. If you have the Producer already running, you may have to
right click on the consumer project and select Debug | Start new instance.
10. Run an instance of the publisher and multiple subscribers and notice that all subscribers get a copy of the messages being produced.

Some notes about the implementation:
1. You can fire up more than one publisher and they will all send messages to the exchange.
2. You can fire up one or more listeners and they will all retrieve messages from the exchange.
3. In this exercise, messages are not durable, in the sense that if there are no subscribers, they are lost.
4. Every subscriber gets a copy of every message.
5. RabbitMQ just works as advertised. I am very impressed with the framework.

Use case
This can be used in a scenario where you have field data and have a bunch of processing stations. Each processing station gets a copy of field data so it can perform a different set of business rules.

For example, authentication events could be shared among multiple subscribers. One of such could log these messages to a database. Another could take the events and do some heuristics including machine intelligence.

Implementing pub-sub using MSMQ in 15 minutes

Requirements:

1. Install MSMQ. By default the service is not enabled. To do this on Windows 7:

a. Launch the Control Panel and click on Programs | Turn Windows features on or off.
b. Select Microsoft Message Queue (MSMQ) Server.
c. Under this option, select MSMQ HTTP Support and Multicasting Support.
HTTP Support provides the ability to send messages across the internet.
Multicasting provides the ability for publish subscribe.

2. Click OK.
3. Open Computer Manager to confirm that you have a new listing under Services and Applications called Message Queuing with Outgoing, Private and System Queue folders.

Implementing Pub/Sub using MSMQ
1. Create a new C# Console Project.
2. Add reference to System.Messaging dll.
3. Modify Program.cs to look like follows:

    class Publisher
    {
        static void Main(string[] args)
        {
            //1. establish the queue
            using (var helloQueue = new MessageQueue("FormatName:MULTICAST=234.1.1.1:8001"))
            {
                while (true)
                {
                    var stopWatch = new Stopwatch();
                    stopWatch.Start();

                    //2. create a message, that can be anything since it is a byte array
                    for (var i = 0; i < 1000; i++)
                    {
                        SendMessage(helloQueue,
                            string.Format("{0}: msg:{1} hello world ", DateTime.UtcNow.Ticks, i));
                        Thread.Sleep(10);
                    }

                    stopWatch.Stop();
                    Console.ReadLine();

                    Console.WriteLine("====================================================");
                    Console.WriteLine("[MSMQ] done sending 1000 messages in " + stopWatch.ElapsedMilliseconds);
                    Console.WriteLine("[MSMQ] Sending reset counter to consumers.");

                    SendMessage(helloQueue, "reset");
                    Console.ReadLine();
                }
            }

        }

        private static void SendMessage(MessageQueue queue, string content)
        {
            //3. send the message
            queue.Send(content);
            Console.WriteLine(" [MSMQ] Sent {0}", content);
        }
    }

4. Create a new C# console project and add a reference to the System.Messaging dll.
5. Modify Program.cs to look like this:

  class Subscriber
    {
        static void Main(string[] args)
        {
            int messagesReceived = 0;
            var messages = new Queue<string>(5000);
            var filePath = typeof (Subscriber).FullName + ".txt";
            var path = @".\private$\hello-queue";

            using (var helloQueue = new MessageQueue(path))
            {
                helloQueue.MulticastAddress = "234.1.1.1:8001";
                while (true)
                {
                    var message = helloQueue.Receive();
                    if (message == null)
                        return;

                    var reader = new StreamReader(message.BodyStream);
                    var body = reader.ReadToEnd();

                    messagesReceived += 1;

                    messages.Enqueue(body);
                    Console.WriteLine(" [MSMQ] {0} Received {1}", messagesReceived, body);

                    if (string.CompareOrdinal("reset", body) == 0)
                    {
                        messagesReceived = 0;
                        File.WriteAllText(filePath, body);
                        messages.Clear();
                    }
                }
            }
        }
    }

Some implementation notes

1. Publish/subscribe requires a different queue path format. For fire and forget, the queue format is .\private$\hello-queue if sending to a private queue. For pub/sub, we need a multicast address in the form FormatName:MULTICAST=234.1.1.1:8001.
2. Messages received are not exactly same as sent. By default MSMQ uses an XML serializer so our plain string is now wrapped in an XML tag.
3. A multicast subscriber queue is just a regular queue with a multicast address. So you can create many different queues with different names with the same multicast address to receive the same message from the publisher.
4. You can have many publishers and many subscribers. All the subscribers receive all of the messages from each publisher.

Implementing partial pub-sub with ZeroMQ in 15 minutes

I thoroughly enjoyed reading their documentation. This couple of lines stuck at the back of my brains:

If you’ve done any work with threads, protocols, or networks, you’ll realize this is pretty much impossible. It’s a dream. Even connecting a few programs across a few sockets is plain nasty when you start to handle real life situations. Trillions? The cost would be unimaginable. Connecting computers is so difficult that software and services to do this is a multi-billion dollar business.

In this short tutorial, we create a simple, yet incomplete publish-subscribe implementation using ZeroMQ.

The code is as follows:

1. Create a new C# Console project.
2. Use NuGet to add a reference to ZeroMQ C# binding called (clrzmq).
3. Change your Program.cs to contain the following code:

class ZeroMqPublisher
    {
        static void Main(string[] args)
        {
            var context = new Context();  // can specify number of threads
            var processName = Process.GetCurrentProcess().Id;
            
            //1. create socket
            using (var socket = context.Socket(SocketType.PUB))
            {
                //2. connect socket to port
                socket.Connect("tcp://localhost:5555");

                while (true)
                {
                    var stopWatch = new Stopwatch();
                    stopWatch.Start();


                    for (var i = 0; i < 1000; i++)
                    {
                        var message = string.Format("{0}-{1}: msg:{2} hello world ", processName, DateTime.UtcNow.Ticks, i);
                        SendMessage(socket, message);
                        Thread.Sleep(10);
                    }

                    stopWatch.Stop();
                    Console.ReadLine();

                    Console.WriteLine("====================================================");
                    Console.WriteLine("[x] done sending 1000 messages in " + stopWatch.ElapsedMilliseconds);
                    Console.WriteLine("[x] Sending reset counter to consumers.");

                    SendMessage(socket, "reset");

                    Console.ReadLine();
                }    
            }
        }

        private static void SendMessage(Socket socket, string message)
        {
            var body = Encoding.UTF8.GetBytes(message);

            //3. send the message
            socket.Send(body);

            Console.WriteLine(" [x] Sent {0}", message);
        }
    }

3. Create another C# Console project for the subscriber.
4. Use NuGet to add reference to the ZeroMQ C# binding as you did above.
5. Change Program.cs in your subscriber project to contain this code:

 class ZeroMqSubscriber
    {
        static void Main(string[] args)
        {
            long messagesReceived = 0;
            var filePath = typeof (ZeroMqSubscriber).FullName + ".txt";
            var text = new Queue<string>();
            var context = new Context();
            using (var socket = context.Socket(SocketType.SUB))
            {
                // 1. bind to an address
                socket.Bind("tcp://*:5555");

                while (true)
                {
                    var message = socket.Recv();
                    var messageStr = Encoding.UTF8.GetString(message);
                    messagesReceived += 1;
                    text.Enqueue(messageStr);
                    Console.WriteLine(" [x] {0} Received {1}", messagesReceived, messageStr);

                    if (string.CompareOrdinal("reset", messageStr) == 0)
                    {
                        messagesReceived = 0;
                        File.WriteAllLines(filePath, text);
                        text.Clear();
                    }
                }    
            }
        }
   }
 

5. Start the publisher and let is send its first batch of messages.
6. Start the subscriber and notice that it comes online and starts processing the messages.
7. Try with different combinations of stopping and starting the publisher or subscriber.
8. Experiment with different numbers of publishers and subscribers.

Some notes about the implementation
1. If you stop the publisher before the subscriber comes online, all messages are lost since ZeroMQ does not support durable messaging. All messages are stored in the host processes’ memory.
2. You can start multiple publisher instances publishing to the same port.
3. You cannot start multiple subscriber instances listening to the same port so technically, this is not a true publish-subscribe pattern.
4. If you have a single subscriber and multiple publishers, the former gets all of the messages in the order in which they were sent from each publisher. Also messages between publishers are interleaved as noted below:

12928-635584021514875798: msg:611 hello world
13084-635584021514945802: msg:814 hello world
12928-635584021514975804: msg:612 hello world
13084-635584021515045808: msg:815 hello world
12928-635584021515075810: msg:613 hello world
13084-635584021515145814: msg:816 hello world
12928-635584021515175815: msg:614 hello world
13084-635584021515245819: msg:817 hello world
12928-635584021515275821: msg:615 hello world
13084-635584021515345825: msg:818 hello world
12928-635584021515375827: msg:616 hello world
13084-635584021515445831: msg:819 hello world

Implementing messaging queue with RabbitMQ in 15 minutes

This is the beginning of a series or articles meant to capture the main differences between various message queue implementations, including MSMQ, RabbitMQ, ZeroMQ and ActiveMQ.

RabbitMQ uses Advanced Message Queuing Protocol (AMQP) and is a brokered messaging system. Brokered in the sense there is a central entity responsible for storing and forwarding of the messages from producers to consumers.

This article presents the 1-2-3’s of creating a simple producer/consumer example using RabbitMQ.

Mandatory Setup
1. Download and install Erlang. RabbitMQ Server is written in Erlang.
2. Download and install the RabitMQ Service broker, which is a windows service.
3. Ensure service is running on its default port 5672. To do this, you can enter the following command on a DOS prompt:

netstat -ano > processes.txt

After running command, open text file using Notepad or even better, Notepad++ do a search for port 5672. If port is found, this means that the service is running on your local machine.

Create Producer and Consumer
4. Create a new Console project called RabbitMQProducer.
5. Using NuGet, add a reference to the C# RabbitMQ binding called RabbitMQ.Client.
6. Modify the content of the Program class using the following code:

  public class RabbitMqPublisher
    {
        public static void Main()
        {
            var factory = new ConnectionFactory() {HostName = "localhost"};

            //The connection abstracts the socket connection, and takes care of protocol version negotiation and authentication and so on for us.
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //1. create a queue to send message. Queue is created only if it does not already exist.
                    channel.QueueDeclare("hello-queue", false, false, false, null);


                    while (true)
                    {
                        var stopWatch = new Stopwatch();
                        stopWatch.Start();

                        //2. create a message, that can be anything since it is a byte array
                        for (var i = 0; i < 1000; i++)
                        {
                            SendMessage(channel,
                                string.Format("{0}: msg:{1} hello world ", DateTime.UtcNow.Ticks, i));
                            Thread.Sleep(10);
                        }

                        stopWatch.Stop();
                        Console.ReadLine();

                        Console.WriteLine("====================================================");
                        Console.WriteLine("[x] done sending 1000 messages in " + stopWatch.ElapsedMilliseconds);
                        Console.WriteLine("[x] Sending reset counter to consumers.");

                        SendMessage(channel, "reset");
                        Console.ReadLine();
                    }
                }
            }
        }

        private static void SendMessage(IModel channel, string message)
        {
            var body = Encoding.UTF8.GetBytes(message);

            //3. send the message
            channel.BasicPublish("", "hello-queue", null, body);

            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}

6. Compile and run the program and you should see each message being set written to the console.
7. Create a new C# Console project, called RabbitMQReceiver and modify the Program class as
follows:

public class RabbitMqSubscriber
    {
        static void Main(string[] args)
        {
            int messagesReceived = 0;

            var factory = new ConnectionFactory() { HostName = "localhost" };
            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    //1. create queue if it does not exist.
                    channel.QueueDeclare("hello-queue", false, false, false, null);

                    //2. now get message from queue
                    var consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume("hello-queue", true, consumer);

                    Console.WriteLine(" [*] Waiting for messages." +
                                        "To exit press CTRL+C");
                    while (true)
                    {
                        var ea = consumer.Queue.Dequeue();

                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        messagesReceived += 1;

                        Console.WriteLine("[x] {0} Received {1}", messagesReceived, message);
                        if (string.CompareOrdinal("reset", message) == 0)
                        {
                            messagesReceived = 0;
                        }
                    }

                }
            }
        }
    }

8. Using NuGet, add a reference to the C# RabbitMQ client binding, called RabbitMQ.Client to resolve compilation problems.

9. Compile and run the program. If you have the Producer already running, you may have to
right click on the consumer project and select Debug | Start new instance.
10. Run an instance of the publisher and subscriber and notice messages being passed from former to latter.

Some notes about the implementation:
1. You can fire up more than one publisher and the will all send messages to the queue.
2. You can fire up one or more listeners and they will all retrieve messages from the queue.
3. Messages remain in the queue until they have been consumed by subscribers.
4. Messages are shared among subscribers. So if one of them gets a certain message, the others do not.
5. If you have more than one subscriber, messages are equally distributed among them.

Use case
This can be used in a scenario where you have field data and have a bunch of processing stations to process the field data in parallel.

Simple C# Interlocked use case

Original code in a ViewModel for a Windows Phone application looked like this:

 private readonly object _lock = new object();
 private int _secondsToNextRefresh;
 public int SecondsToNextRefresh
 {
 get { return _secondsToNextRefresh; }
 set { _secondsToNextRefresh = value; RaisePropertyChanged(() => SecondsToNextRefresh); }
 }

 protected void ResetRefreshTimer()
 {
 if (_refreshTimer == null)
 {
  _refreshTimer = new Timer(arg =>
  {
    if (SecondsToNextRefresh == 0)
    {     
     GetData();
     SecondsToNextRefresh = GetSecondsToNextRefresh();
    }
    else
    { 
     ModifySecondsToNextRefresh(SecondsToNextRefresh - RefreshPeriodInSec);
    }
    },null, 0, RefreshPeriodInSec * Constants.MilliSecsPerSec);
  } 
 }

protected int ModifySecondsToNextRefresh(int value)
 { 
   lock (_lock)
  {
   SecondsToNextRefresh = value; 
  }
  return SecondsToNextRefresh;
}

Upon further inspection of the code, it appeared to me that I can take advantage of the higher performance C# Interlocked methods.

Changing the above code to:


_refreshTimer = new Timer(arg =>
{
 if (_secondsToNextRefresh == 0)
 {   
   secsToNextRefresh = GetSecondsToNextRefresh();
   GetData(); 
  }
  ModifySecondsToNextRefresh(secsToNextRefresh) 

},null, 0, RefreshPeriodInSec * Constants.MilliSecsPerSec);

protected int ModifySecondsToNextRefresh(int value)
{ 
  var valueToReturn = Interlocked.Exchange(ref _secondsToNextRefresh, value);
  RaisePropertyChanged(() => SecondsToNextRefresh);
  return valueToReturn;
}

Based on advise from Writing High Performance .NET Code, surely is the preferred locking mechanism in this scenario.