In continuation of my previous post, where in i briefed using Azure Service Bus Queue as a Repository and build a Data Abstraction Layer on top of it. This post details on a scenario chosen, and explains why do we need to make a choice of using the Azure Service Bus Queue and what are the advantages of Brokered messaging.

Scenario: A data heavy table(TS) residing in Azure table storage to be updated and simultaneously it’s On-premise copy to reflect the changes.

Solution: A resolution to this scenario can be achieved by 3 ways.

Option 1: Windows Azure Queues

Option 2: Windows Service Bus Queues – allows asynchronous, or decoupled messaging with support for transactions, a near real time process

Option 3: Data sync or batch updates between the azure table storage and On-premise databases, non-real time process

Proposed Optimal Solution: Using the Service Bus Queues which have the following major advantages as applicable to the scenario

  1. Maximum message size 256 KB(including header and body)
  2. Unlimited concurrent connections to the queue when connecting from REST based API(includes senders and receivers)
  3. Has maximum throughput of 2000 messages per second.
  4. Authentication uses ACS claims, Identity provider federation.
  5. Can operate in PeekLock mode making it possible to support applications that cannot tolerate failed messages.
  6. Can publish message to Topics with multiple subscribers – in scenarios where the same message should be used by multiple clients.

How does it work?

Queues provide the benefit of Load levelling which enables producers and consumers to send and receive messages at different rates i.e. in instances when there are large number of updates, post commits to the table storage, the message is sent to the queue.

The consuming application-(a worker role or directly an on premise app) only has to be provisioned to be able to handle average load instead of peak load. The depth of the queue will grow and contract as the incoming load varies. This directly saves money with regard to the amount of infrastructure required to service the application load.

As the load increases, more worker processes can be added to read from the queue.  This pull-based load balancing allows for optimum use of the worker computers even if the worker computers differ with regard to processing power, as they will pull messages at their own maximum rate. This pattern is often termed the “competing consumer” pattern.

SBQ1

Queue Creation: -performed via the NamespaceManager class, which is constructed by supplying the base address of the Service Bus namespace and the user credentials.

TokenProvider credentials = TokenProvider.CreateSharedSecretTokenProvider(IssuerName, IssuerKey);

NamespaceManager namespaceClient = new NamespaceManager(ServiceBusEnvironment.CreateServiceUri(“sb”, ServiceNamespace, string.Empty), credentials);

QueueDescription myQueue;

myQueue = namespaceClient.CreateQueue(“TSQueue”);

MessagingFactory factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri(“sb”, ServiceNamespace, string.Empty), credentials);

QueueClient myQueueClient = factory.CreateQueueClient(“TSQueue”,ReceiveMode.PeekLock);

Create a Brokered Message: First the TS message instance needs to be serialized into a brokered message.

BrokeredMessage message = new BrokeredMessage(PersonTS, new ProxyDataContractBinarySerializer(typeof(RGtbPersonTS)));

Where PersonTS is the TS TableEntity object.

Send Brokered Message:

myQueueClient.Send(TSMessage);

Receive Brokered Message:

As the messages were sent in PeekLock Mode it allows to detect missing messages, after successfully processing the message, the message.Complete() needs to be called, in case of error while processing message message.Abandon needs to be called. This can be done in a try-catch block.

BrokeredMessage message;

while ((message = myQueueClient.Receive(new TimeSpan(hours: 0, minutes: 1, seconds: 5))) != null)

{

 

RGtbPersonTS personTS = message.GetBody<RGtbPersonTS>();

UpdatedTSMessageToDB(personTS.StatusID, personTS.Status, personTS.RGtbPersonTSID);

message.Complete();

Thread.Sleep(1000);

}

 

Also instead of polling for the messages in continuous loop, azure sdk 2.0 has provided an ability to listen for messages rather than polling for them.

 

Client.OnMessage((receivedMessage) =>

 

Update Status to On-premise database:

 

The Updated MessageToDB method is called to update the status in the On-premise database.

 

The worker role is required in case we want to scale up or down the message processing, else the receive side code for the ServiceBus queues can directly be part of the wcf service hosted on premise.

 

Failure on reading Queue Message:

The receiver reads the queue until the queue is empty, in case of failure the failed messages are dead-lettered. The dead-lettered messages can be received and logged.

A separate receiver application to be created to read messages from Deadletter queues and updating database/ logging appropriately.

SBQ2

 

Improve performance of Service Bus queues:

  • Reusing factories and clients: not to recreate them for every message
  • Concurrent Async operations: Send, Receive

BrokeredMessage m1 = new BrokeredMessage(body);

myQueueClient.BeginSend(TSMessage, processEndSend, myQueueClient);

void processEndSend(IAsyncResult result)

{

QueueClient qc = result.AsyncState as QueueClient;

qc.EndSend(result);

}

queueClient.BeginReceive(processEndReceive, queueClient); // Receive message 1. void processEndReceive(IAsyncResult result) {    QueueClient qc = result.AsyncState as QueueClient;    BrokeredMessage m = qc.EndReceive(result);    m.BeginComplete(processEndComplete, m);    } void processEndComplete(IAsyncResult result){    BrokeredMessage m = result.AsyncState as BrokeredMessage;    m.EndComplete(result);    }

  • Client side batching – achievable only in async operations
  • Prefetching while performing receive operation.
  • Use of multiple queues to achieve throughput beyond several thousand messages per second.

Performance Numbers from POC for SBQ:

 

S.no Name Value
1 Size of a Message in Bytes 640
2 Time taken to send a message in queue in milli secs 4177
3 Time taken to read a message from Queue in milli secs 2879
4 Time lapse between sending a message and receiving the message in milli sec <~1
5 Time taken to send and receive 10, 000 messages to queue, messages being sent synchronously, (these numbers would improve with asynchronous messaging and concurrent users) 80 minutes

 

Accommodate increased Clients (Subscribers)

Scenario: The data heavy storage table updates (message) is to be consumed by multiple parties

Solution: Service Bus topic. Topics provide a publish/subscribe pattern in which each published message is made available to one or more subscriptions registered with the topic. In contrast, with queues each message is received by a single consumer. Messages are sent to a topic in the same way as they are sent to a queue. However, messages are not received from the topic directly; they are received from subscriptions.  The receiver applications would be different with required business logic.

Sync data from On Premise database to Azure storage table

Scenario: An action happens on a On-premise application, the same needs to reflect in azure table storage.

Solution: On premise WCF Data Service to access the Table storage as it implements OData, update the table storage. The solution can be made transactional so that any failure can be communicated at the same time.

Concurrency and performance issues should be low as we are committing to table storage.

SBQ3