Reading Azure Service Bus Messages in Sitecore Commerce Engine

SergeyYatsenko
Sitecore Technology MVP & Sr. Director
  • Twitter
  • LinkedIn

Reading Azure Service Bus Messages in Sitecore Commerce Engine

Introduction

This post explains how to integrate Azure Service Bus message reader into Sitecore Commerce Engine. This can be helpful in a wide range of integration scenarios where Sitecore Commerce Engine needs to receive and act upon incoming messages coming from 3-rd-party services and apps in a close to real-time manner.

Why Azure Service Bus?

Microsoft Azure Service Bus is a fully managed enterprise message broker with message queues and public-subscribe topics. Service Bus is used to decouple applications and services from each other, for load balancing work across competing workers, for safely routing and transferring data and control across service and application boundaries, and for coordinating transactional work that requires a high degree of reliability. More details can be found in this doc (and all over the Internet :)).

Integrating Commerce Engine with 3-rd party data sources

Sitecore Experience Commerce 9+ (SXC) allows users to manage its catalog, pricing, inventory users, and other entities in the SXC database manually via its Business Tools interface and programmatically via Commerce SDK. This works well when SXC is the source its catalog entities, where these entities oriare created and managed, but for the companies with already established PIM (Product Information Management) systems as well as customer, pricing and inventory information and other management systems already in place, at least some degree of integration with these systems is needed. Most SXC projects I've been involved in so far included integration effort where enterprise data, used by Commerce Engine, was managed in already established 3-rd party systems. In these cases product data needed to be synced into SXC, preferably as soon as change happened in the source system(s). Introducing Azure Service Bus queue allows building reliable and close to real-time processing of such data in SXC.

The following diagram shows how product change notifications from 3rd party PIM are getting delivered to SXC, which upon receiving immediately retrieves changed entity from 3-rd party PIM and creates or updates matching entity in its Commerce Catalog. With this approach updates in 3rd party PIM are replicated in SXC catalog in a close to real-time and reliable manner.

Implementing ServiceBusConsumer in SXC

I opted to build custom service, which I called ServiceBusConsumer, which runs inside of Sitecore Commerce Engine and is responsible for receiving and processing messages from the message bus. Following are the steps I took to implement and integrate it into the Sitecore Commerce Engine.

First, the interface through which SXC's application's startup routine will register the ServiceBusConsumer service to serve as the processor of incoming Service Bus messages

public interface IServiceBusConsumer
{
  void RegisterOnMessageHandlerAndReceiveMessages();
  Task CloseQueueAsync();
}

Implementation class starts with a constructor to initialize all needed dependencies with a little help from its standard dependency injection mechanism, which will instantiate and pass needed parameters when a class instance is created, like so:

public class ServiceBusConsumer : IServiceBusConsumer
{
  private static string _connectionString;
  private static string _topicName;
  private const string _defaultCommerceEnvironment = "HabitatAuthoring";

  private readonly QueueClient _queueClient;
  private readonly ILogger _logger;
  private CommerceCommander _commerceCommander;
  private GetEnvironmentCommand _getEnvironmentCommand;
  protected internal IServiceProvider _serviceProvider { get; }
  private readonly NodeContext _nodeContext;
  CommerceEnvironment _globalEnvironment;

  public ServiceBusConsumer(
		GetEnvironmentCommand getEnvironmentCommand,
		IConfiguration configuration,
		ILogger logger,
		IServiceProvider serviceProvider,
		CommerceEnvironment globalEnvironment)
	{
		_serviceProvider = serviceProvider;
		_globalEnvironment = globalEnvironment;
		_getEnvironmentCommand = getEnvironmentCommand;
		_commerceCommander = commerceCommander;
		_logger = logger;
		_connectionString = configuration.GetConnectionString("ServiceBusConnectionString");
		topicName = configuration.GetValue("AppSettings:ServiceBusTopicName");
		_queueClient = new QueueClient(_connectionString, _topicName);
		this._nodeContext = serviceProvider.GetService();
	}
}

The following method is a standard way of registering service bus message processor in any .NET Core app, where ProcessMessagesAsync is a custom class, conaining the custom message processing logic

public void RegisterOnMessageHandlerAndReceiveMessages()
 {
   var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
   {
     MaxConcurrentCalls = 1,
     AutoComplete = false,
   };
 
   _queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);
 }

Now, to the message processor to read the message body, which in my case includes a few custom properties, appended by Sitecore Content Hub, where those messages are coming from. The target_id and target_definition and holding the ID and the kind of updated entity, I use these properties to locate and read an entity data from the source and then process as needed. The following code is just an example, very specific to my use case, it'll be something very different in other integration scenarios, but the overall pattern might still be the same.

private async Task ProcessMessagesAsync(Message message, CancellationToken token)
{
	var messageString = Encoding.UTF8.GetString(message.Body);
	_logger.LogInformation($"Received Message: {messageString}");
 
	if (message != null
		&& message.UserProperties.ContainsKey("target_id")
		&& message.UserProperties.ContainsKey("target_definition"))
	{
		var targetId = (string)message.UserProperties["target_id"];
		var targetDefinition = (string)message.UserProperties["target_definition"];
		var instanceName = (string)message.UserProperties["instance_name"];
		if (!string.IsNullOrEmpty(targetId) && !string.IsNullOrEmpty(targetDefinition) && !string.IsNullOrEmpty(instanceName))
		{
			var context = GetCommerceContext();
			var environment = await _getEnvironmentCommand.Process(context, "HabitatAuthoring").ConfigureAwait(false);
			context.PipelineContextOptions.CommerceContext.Environment = environment;
 
			var mappingPolicy = context.GetPolicy();
			var mappingConfiguration = mappingPolicy?.MappingConfigurations?.FirstOrDefault(c => c.EntityType.Equals(targetDefinition, StringComparison.OrdinalIgnoreCase)
                                                   && c.SourceName.Equals(instanceName, StringComparison.OrdinalIgnoreCase));
 
			ImportCatalogEntityArgument result = null;
			if (mappingConfiguration != null)
			{
				result = await TryProcessSellableItem(targetId, targetDefinition, mappingConfiguration, context).ConfigureAwait(false);
			}
			else
			{
				var categoryMappingPolicy = context.GetPolicy();
         mappingConfiguration = categoryMappingPolicy?.MappingConfigurations?.FirstOrDefault(c => c.EntityType.Equals(targetDefinition, StringComparison.OrdinalIgnoreCase)
                                                   && c.SourceName.Equals(instanceName, StringComparison.OrdinalIgnoreCase));
				result = await TryProcessCategory(targetId, targetDefinition, mappingConfiguration, context).ConfigureAwait(false);
			}
			
            if (result == null)
			{
				_logger.LogError($"Cannot process Service Bus message. Mapping configuration not found for EntityType=={targetDefinition} and SourceName=={instanceName}");
			}
		}
	}
	else
	{
		_logger.LogError($"Cannot process Service Bus message. UserProperties: {string.Join(Environment.NewLine, message.UserProperties)}. Message: {messageString}");
	}

	await _queueClient.CompleteAsync(message.SystemProperties.LockToken);
}

The next step is to register the newly created class as a singleton service in ConfigureSitecore class, so it will be known and can be instantiated by the dependency injection mechanism.

public class ConfigureSitecore : IConfigureSitecore
{
	public void ConfigureServices(IServiceCollection services)
	{
        //...	
		services.AddSingleton();
		//...
    }
}

And finally, register in Startup.Configure( ) method in Sitecore Commerce Engine's Startup class like so (note that I introduced EnableContentHubSync variable in AppSettings section of config.json file, so queue processing can be enabled/disabled in config):

var enableContentHubSync = Configuration.GetSection("AppSettings:EnableContentHubSync").Value;
if (enableContentHubSync != null && enableContentHubSync.Equals("true", StringComparison.OrdinalIgnoreCase))
{
	var bus = app.ApplicationServices.GetService();
	bus.RegisterOnMessageHandlerAndReceiveMessages();
}

That's it... With this code in place the Sitecore Commerce Engine will connect to Azure Service Bus upon startup and process incoming messages from Azure Service Bus queue and will continue processing incoming messages in close to real-time manner. Error handling, poison messages, retires, and other aspects of message processing each deserve separate a separate topic or few. This post only covers the specifics of integrating Sitecore Commerce Engine with Azure Service Bus, for more details on Service Bus, please refer to Microsoft documentation.

Related Blogs

Latest Blogs