Getting started with NServiceBus and RabbitMQ in 15 minutes

Assumptions:

You have downloaded and installed RabbitMQ. To confirm that the RabbitMQ service is running, open the RabbitMQ management portal and check that the broker is up.

1. Create a class library called Infrastructure.Messages and a folder in this library called Events.
2. Create the following class in the Events folder:

    // Keep this class in the Infrastructure.Messages.Events namespace
    // (the default for the Events folder); the bus conventions match on it.
    [Serializable]
    public class AuthInitiationRequestEvent
    {
        public string EventSource { get; set; }
        public DateTime Timestamp { get; set; }
        public string Source { get; set; }
        public string UserName { get; set; }

        public override string ToString()
        {
            return string.Format("Timestamp: {0}, Event Source: {1}, Source: {2}, User: {3}",
                Timestamp,
                EventSource,
                Source,
                UserName);
        }
    }

2. Create a new Console project called Publisher. This Publisher project will contain a client that publishes messages to a bus.

All instructions from this point on refer to the Publisher project.

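3. Using NuGet, add references to the NServiceBus and NServiceBus.RabbitMQ packages. Also add a project reference to Infrastructure.Messages so the event class is available.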

4. Create a class called AuthenticationClient that starts and stops as follows:

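    using System;
    using System.Diagnostics;
    using System.Threading;
    using Infrastructure.Messages.Events;
    using NServiceBus;

    public class AuthenticationClient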
    {
        public IBus Bus { get; private set; }

        public AuthenticationClient(IBus bus)
        {
            Bus = bus;
        }

        public void Start()
        {
            var random = new Random(100);
            var processName = Process.GetCurrentProcess().ProcessName;
            var processId = Process.GetCurrentProcess().Id;
            //var exchange = "authentications";
        
            while (true)
            {
                var stopWatch = new Stopwatch();
                stopWatch.Start();

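                // Build and publish 1000 events
                for (var i = 0; i < 1000; i++)
                {
                    var orgId = random.Next(1000);
                    // The event has no constructor taking arguments, so set its properties
                    // directly; the values chosen here are illustrative.
                    var authEvent = new AuthInitiationRequestEvent
                    {
                        EventSource = String.Format("{0}:{1}", processId, processName),
                        Timestamp = DateTime.UtcNow,
                        Source = "GearHeads" + orgId,
                        UserName = "user" + i
                    };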
                    SendMessage(authEvent);
                    Thread.Sleep(5);
                }

                stopWatch.Stop();

                Console.WriteLine("====================================================");
                Console.WriteLine("[x] done sending 1000 messages in " + stopWatch.ElapsedMilliseconds + " ms");
                Console.WriteLine("[x] Press Enter to send another batch.");

                Console.ReadLine();
            }
        }

        private void SendMessage<T>(T message)
        {
            Bus.Publish(message);
            Console.WriteLine(" [x] Sent {0}", message);
        }

        public void Stop()
        {
            Console.WriteLine("Authentication Service ended");
        }
    }

Now we need to create an instance of this class and pass it a bus so that it can publish messages. First we need to create a bus and configure it.

5. Modify Program.Main to include the following lines of code:

  static void Main(string[] args)
        {
            var busConfiguration = new BusConfiguration();

            // this will be the name of this endpoint
            busConfiguration.EndpointName("Publishers.Authentications");

            // we are using JSON serialization
            busConfiguration.UseSerialization<JsonSerializer>();

            // we are using RabbitMQ as our messaging transport; MSMQ is another option
            busConfiguration.UseTransport<RabbitMQTransport>();

            // we are using in-memory persistence (used for things like timeouts and sagas);
            // nothing stored there survives a restart
            busConfiguration.UsePersistence<InMemoryPersistence>();

            // NOTE: this is important and has to be set on publisher and subscriber endpoints.
            // We are using unobtrusive mode, so our messages/events do not have to implement IMessage, IEvent, etc.
            var conventionsBuilder = busConfiguration.Conventions();
            conventionsBuilder.DefiningEventsAs(t => t.Namespace != null && t.Namespace == "Infrastructure.Messages.Events");         
         
            busConfiguration.EnableInstallers();
            
            // we create the bus
            var startableBus = Bus.Create(busConfiguration);
            using (var bus = startableBus.Start())
            {
               new AuthenticationClient(bus).Start();
            }
        }
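
The DefiningEventsAs convention above is just a predicate over types, which is why the namespace of the event class matters. Here is a minimal standalone sketch of that predicate, assuming a hypothetical SampleEvent class and a hypothetical ConventionCheck helper (neither is part of the projects in this post):

```csharp
using System;

namespace Infrastructure.Messages.Events
{
    // Hypothetical event type, present only to exercise the predicate
    public class SampleEvent { }
}

public static class ConventionCheck
{
    // Same test as the lambda passed to DefiningEventsAs
    public static bool IsEvent(Type t)
    {
        return t.Namespace != null && t.Namespace == "Infrastructure.Messages.Events";
    }

    public static void Main()
    {
        // A type in the Events namespace is treated as an event
        Console.WriteLine(IsEvent(typeof(Infrastructure.Messages.Events.SampleEvent))); // True

        // Anything else (here System.String) is not
        Console.WriteLine(IsEvent(typeof(string))); // False
    }
}
```

If the event class ends up in a different namespace, the predicate returns false and the bus will not treat the class as an event.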

All good work so far. Now we need to tell NServiceBus where the RabbitMQ broker is located. When using RabbitMQ directly, you specify this when constructing the ConnectionFactory. We also need to tell NServiceBus the destination queue for each type of message. When using RabbitMQ directly, you specify this when declaring an exchange on the connection channel object.

7. To configure the location of the RabbitMQ broker, add a connectionStrings section to app.config as follows:

  <connectionStrings>
    <add name="NServiceBus/Transport" connectionString="host=localhost"/>
  </connectionStrings>
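
For comparison, here is a rough sketch of what the same two pieces of information (broker location and exchange) look like with the plain RabbitMQ .NET client. It assumes the RabbitMQ.Client NuGet package in a version before 7.0, which still has the synchronous CreateConnection and CreateModel calls. The exchange name "authentications" and the RawRabbitPublisher class are made up for illustration, and NServiceBus does not necessarily declare its exchanges this way.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

public static class RawRabbitPublisher
{
    public static void Main()
    {
        // Broker location: the counterpart of host=localhost in the connection string
        var factory = new ConnectionFactory { HostName = "localhost" };

        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Hypothetical exchange name, declared on the channel
            channel.ExchangeDeclare("authentications", ExchangeType.Fanout);

            // With the raw client the message body is just a byte array
            var body = Encoding.UTF8.GetBytes("hello");
            channel.BasicPublish("authentications", "", null, body);

            Console.WriteLine(" [x] Sent hello");
        }
    }
}
```

With NServiceBus, none of this plumbing appears in our code; the transport takes care of it based on the configuration.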

8. If you have followed all these steps so far, compile and run the project and you should see the messages being sent and written to the console.

9. To define the queue or exchange where we want these messages to be sent, add the following lines to app.config:

  <UnicastBusConfig>
    <MessageEndpointMappings>
      <add Assembly="Infrastructure.Messages" Namespace="Infrastructure.Messages.Events" Endpoint="Publishers.Authentications" />
    </MessageEndpointMappings>
  </UnicastBusConfig>

We are basically creating an AuthInitiationRequestEvent with some properties and sending it off to the bus. We are sending these messages to an endpoint called “Publishers.Authentications”. This could have been called anything. In RabbitMQ, an exchange receives published messages and routes them to the queues bound to it. Subscribers interested in these messages will have to map to the same endpoint, which in this case is called Publishers.Authentications.

10. Before running the project, add this configuration section to app.config. The configSections element must be the first child of the configuration element:

  <configSections>
    <section name="UnicastBusConfig" type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core"/>
  </configSections>

11. If you run the Publisher project now, you should see messages being printed to the console.

All of this work has been to create the publisher of messages to the bus. Now, let us move to the subscriber. Its job will be simple: subscribe to the messages and write them to the console.

12. Create a new Console project called Subscribers. This project will contain a service that will subscribe to a certain type of message published onto the bus and print them to the console. Not only is the subscriber interested in a certain type of message, it will listen only to messages published onto a certain queue.

The following instructions now refer to the Subscribers project.
13. Using NuGet, add references to the NServiceBus and NServiceBus.RabbitMQ packages. Also add a project reference to Infrastructure.Messages.
14. Create an AuthenticationService class that implements the IHandleMessages interface defined in the NServiceBus core assembly.

    using System;
    using Infrastructure.Messages.Events;
    using NServiceBus;

    public class AuthenticationService : IHandleMessages<AuthInitiationRequestEvent>
    {
        public void Handle(AuthInitiationRequestEvent message)
        {
            Console.WriteLine(message);
        }
    }

How do we hook this subscriber to the bus? By default, NServiceBus uses auto subscription by scanning assemblies for IHandleMessages implementations. But we still need to configure the bus and create the subscriber’s entry point.

15. Add the following lines of code to Program.Main in the Subscribers project.

  static void Main(string[] args)
        {
            var busConfiguration = new BusConfiguration();

            // this will be the name of this endpoint
            busConfiguration.EndpointName("AuthEventConsumer.ConsoleWriter");

            // we are using JSON serialization
            busConfiguration.UseSerialization<JsonSerializer>();

            // we are using RabbitMQ as our messaging transport; MSMQ is another option
            busConfiguration.UseTransport<RabbitMQTransport>();

            // we are using in-memory persistence (used for things like timeouts and sagas);
            // nothing stored there survives a restart
            busConfiguration.UsePersistence<InMemoryPersistence>();

            // NOTE: this is important and has to be set on publisher and subscriber endpoints.
            // We are using unobtrusive mode, so our messages/events do not have to implement IMessage, IEvent, etc.
            var conventionsBuilder = busConfiguration.Conventions();
            conventionsBuilder.DefiningEventsAs(t => t.Namespace != null && t.Namespace == "Infrastructure.Messages.Events");                 

            busConfiguration.EnableInstallers();
           
            var startableBus = Bus.Create(busConfiguration);
            using (startableBus.Start())
            {
                Console.WriteLine("Press Enter to exit");
                Console.ReadLine();
            }
        }
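
The automatic subscription mentioned earlier can also be replaced with an explicit one. Here is a sketch of that alternative, assuming the same NServiceBus version used in the rest of this post (the BusConfiguration API). The ExplicitSubscription class and its Run method are hypothetical names, and you would still need the rest of the bus configuration shown above.

```csharp
using System;
using Infrastructure.Messages.Events;
using NServiceBus;
using NServiceBus.Features;

public static class ExplicitSubscription
{
    // Expects a busConfiguration already set up as in Main above
    public static void Run(BusConfiguration busConfiguration)
    {
        // Turn off the automatic subscription based on handler scanning
        busConfiguration.DisableFeature<AutoSubscribe>();

        using (var bus = Bus.Create(busConfiguration).Start())
        {
            // Subscribe by hand to the one event we care about
            bus.Subscribe<AuthInitiationRequestEvent>();

            Console.WriteLine("Press Enter to exit");
            Console.ReadLine();

            // Remove the subscription before shutting down
            bus.Unsubscribe<AuthInitiationRequestEvent>();
        }
    }
}
```

For this walkthrough the default auto subscription is enough; the explicit form is mainly useful when you want control over when an endpoint starts or stops receiving an event.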

16. Since our broker is running on the local machine, open app.config for the Subscribers project and add the following lines:

  <connectionStrings>
    <add name="NServiceBus/Transport" connectionString="host=localhost"/>
  </connectionStrings>

We also need to tell the subscriber the queue or exchange from which it should retrieve its messages. We do this by configuring a MessageEndpointMapping as follows:

  <UnicastBusConfig>
    <MessageEndpointMappings>
      <add Assembly="Infrastructure.Messages" Namespace="Infrastructure.Messages.Events" Endpoint="Publishers.Authentications" />
    </MessageEndpointMappings>
  </UnicastBusConfig>

What we are saying here is that messages in the Infrastructure.Messages assembly, within the Infrastructure.Messages.Events namespace, should be retrieved from the Publishers.Authentications queue or endpoint. Please note that the subscriber endpoint must match the publisher’s endpoint.

When sending a message using pub/sub with RabbitMQ, you need to specify at a minimum the name of an exchange where the publisher sends the message and where the subscriber should listen for messages. In addition, you need to specify the machine on which your RabbitMQ broker is running. The NServiceBus endpoint name, I am guessing, serves as the RabbitMQ exchange, while the connection setting of host=localhost specifies the machine on which the broker is running.

17. Again, for the configuration to not throw an exception, you will need an NServiceBus configuration section as follows:

  <configSections>
    <section name="UnicastBusConfig" type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core"/>
  </configSections>

As you’ve probably determined, the settings in app.config for the publisher and subscriber should look exactly the same, as follows:

<?xml version="1.0" encoding="utf-8"?>
<configuration>
  <configSections>
    <section name="MessageForwardingInCaseOfFaultConfig" type="NServiceBus.Config.MessageForwardingInCaseOfFaultConfig, NServiceBus.Core" />
    <section name="UnicastBusConfig" type="NServiceBus.Config.UnicastBusConfig, NServiceBus.Core" />
  </configSections>
  <connectionStrings>
    <add name="NServiceBus/Transport" connectionString="host=localhost"/>
  </connectionStrings>
  <UnicastBusConfig>
    <MessageEndpointMappings>
      <add Assembly="Infrastructure.Messages" Endpoint="Publishers.Authentications" />
    </MessageEndpointMappings>
  </UnicastBusConfig>
  <MessageForwardingInCaseOfFaultConfig ErrorQueue="error" />
</configuration>

18. Run both the publisher and subscriber projects and see what happens… 🙂
