Achieve massive parallel Dataverse(D365 CE) inserts with Azure Durable Functions. Part-2

In the previous article, we saw the core features of Azure Durable Functions and their applicability to Dataverse.

Now it's time to get hands-on and apply that information in a real-world scenario. In this article, we'll import a large volume of data from a data source (Cosmos DB) into our Dataverse instance.

Those who want to jump straight to the sample code can clone the repository from here.


Create an Azure Durable Function

Follow these steps to create a function app:

  1. Open Visual Studio and create a function app, choosing the framework as shown below. We are using .NET 8 Isolated. Note that if you want to use the Xrm.Tooling.Connector library, which uses WCF to connect to the Dataverse endpoint, you can't do that with modern .NET (unlike the code in this article), because the library depends on DLLs that are available only in .NET Framework.

Create a function app in Azure.

Create an Azure function as shown below.

  1. The point to highlight here is the App Service plan: it is Premium, and the pricing tier is Elastic Premium 3 (EP3), which gives us more horsepower for a slightly higher cost. We get 4 vCPUs per instance. If your scenario involves only moderate loads, you can go for an EP1, and you can scale down once the heavy loading is done. The Premium plan provides a maximum scale-out of 100 instances.
  2. Also remember from our previous article that orchestrator workloads are sticky: once an orchestration begins to run on a given VM instance, it sticks to that instance. Activity workloads, however, can execute on any instance, so 100 is a good scale number for us.
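Beyond the plan itself, per-instance concurrency for activities and orchestrations can be tuned in host.json. A sketch is shown below; the specific numbers are illustrative, not recommendations, and the defaults vary by plan and runtime version:

```json
{
  "version": "2.0",
  "extensions": {
    "durableTask": {
      "maxConcurrentActivityFunctions": 10,
      "maxConcurrentOrchestratorFunctions": 5
    }
  }
}
```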



What is our use case?

We are going to import data from Cosmos DB (500,000 records) into a Dataverse table ("Durable Sales"); refer to the previous article (link at the top) to download the solution.

Application Design

We are going to write an HTTP-triggered durable function. The trigger will spawn an orchestration instance and return the instance ID and a tracking URL; we'll use the tracking URL to track the completion of the instance.


As we can see below, a single HTTP trigger will instantiate a single durable orchestration, which will queue up a huge number of activities for execution. Note that, as discussed in the previous article, we'll write the orchestrator code in a way that it fans the activities out and then goes back to sleep until they all complete. Once done, the orchestrator will wake up again and perform aggregation logic to collect the state of all the completed activities.


Needless to say, you can trigger as many parallel HTTP instances as desired, and this is where the extra firepower we provisioned earlier will come to our aid to achieve higher speeds.
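Stripped of the Durable Functions plumbing, the fan-out/fan-in shape the orchestrator follows looks like this. This is a plain-Task sketch for illustration only, not the actual orchestrator code; in the real orchestrator, each task would be a CallActivityAsync call and the checkpoint/replay machinery sits behind the await:

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class FanOutFanInDemo
{
    // Simulated "activity": processes one batch and returns a record count.
    public static async Task<int> ProcessBatchAsync(int batchNumber)
    {
        await Task.Delay(10); // stand-in for real I/O work
        return batchNumber * 10;
    }

    // Fan out all batches at once, then fan in with Task.WhenAll.
    public static async Task<int> RunAsync(int batches)
    {
        var tasks = Enumerable.Range(0, batches)
                              .Select(ProcessBatchAsync)
                              .ToList();
        var results = await Task.WhenAll(tasks); // resume only when all complete
        return results.Sum();                    // aggregation step
    }
}
```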


Request Batching

The next important piece is request batching. Our source database is Cosmos DB and the target is Dataverse; both systems provide batch reading and writing capabilities.

Cosmos DB batching: Cosmos DB provides the OFFSET and LIMIT keywords to specify how many records we want to skip (OFFSET) and how many we want to read (LIMIT). More details here.

Dataverse batching: Dataverse provides batch request messages; we'll make the CreateMultiple Web API call. Refer to my previous article on how to use CreateMultiple in the Web API.
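For reference, the shape of a CreateMultiple Web API request is a single POST carrying a Targets array. The table name below (cr123_durablesales and its plural entity set name) is a placeholder; substitute your table's actual logical name and columns:

```http
POST [Organization URI]/api/data/v9.2/cr123_durablesaleses/Microsoft.Dynamics.CRM.CreateMultiple HTTP/1.1
Content-Type: application/json

{
  "Targets": [
    {
      "@odata.type": "Microsoft.Dynamics.CRM.cr123_durablesales",
      "cr123_name": "Sale 001"
    },
    {
      "@odata.type": "Microsoft.Dynamics.CRM.cr123_durablesales",
      "cr123_name": "Sale 002"
    }
  ]
}
```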

How the batching works

We'll pass 3 parameters in our HTTP request message.

NumberOfRecords: How many total records to fetch

BatchSize: How many records to fetch in each batch.

StartFrom: Where to start reading from.

So, if we pass 100, 10, 0, this is how the orchestrator will create queries for parallel reading:

2024-12-09T14:13:12Z   [Information]   select * from c OFFSET 0 LIMIT 10
2024-12-09T14:13:12Z   [Information]   select * from c OFFSET 10 LIMIT 10
2024-12-09T14:13:12Z   [Information]   select * from c OFFSET 20 LIMIT 10
2024-12-09T14:13:12Z   [Information]   select * from c OFFSET 30 LIMIT 10
2024-12-09T14:13:12Z   [Information]   select * from c OFFSET 40 LIMIT 10
2024-12-09T14:13:12Z   [Information]   select * from c OFFSET 50 LIMIT 10
2024-12-09T14:13:12Z   [Information]   select * from c OFFSET 60 LIMIT 10
2024-12-09T14:13:12Z   [Information]   select * from c OFFSET 70 LIMIT 10
2024-12-09T14:13:12Z   [Information]   select * from c OFFSET 80 LIMIT 10
2024-12-09T14:13:12Z   [Information]   select * from c OFFSET 90 LIMIT 10        
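The query generation itself is simple arithmetic over the three parameters. A minimal sketch of the logic the GetCosmoQueries activity performs could look like this (the class and method names here are illustrative, not the repository's exact code):

```csharp
using System;
using System.Collections.Generic;

public static class QueryBatcher
{
    // Splits a total record count into OFFSET/LIMIT queries of batchSize each.
    public static List<string> BuildQueries(int numberOfRecords, int batchSize, int startFrom)
    {
        var queries = new List<string>();
        for (int offset = startFrom; offset < startFrom + numberOfRecords; offset += batchSize)
        {
            // Trim the last batch so we never read past the requested total.
            int limit = Math.Min(batchSize, startFrom + numberOfRecords - offset);
            queries.Add($"select * from c OFFSET {offset} LIMIT {limit}");
        }
        return queries;
    }
}
```

Calling BuildQueries(100, 10, 0) produces the ten queries shown in the log output above.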

Once we have our queries, we want to use the full thrust of the Azure Durable Functions framework and execute each query in its own activity in a fan-out/fan-in approach. Below is the code that we use to parse the input parameters and convert them into a list of queries.

                var queries = await context.CallActivityAsync<List<string>>(nameof(GetCosmoQueries), rcbc);
        

Now, we execute all the tasks at once.

 var cosmosTasks = queries.Select(query => context.CallActivityAsync<CosmosResponse>(nameof(GetCosmoData), query)).ToList();

 await Task.WhenAll(cosmosTasks);        

Remember from our previous article: the moment the code hits await, the orchestrator goes to sleep, and at this point the framework may unload it from memory. An unloaded instance doesn't count towards the active orchestrator limits.


Error Handling

This is tricky in the fan-out/fan-in approach. Some important points below.

  1. Handle exceptions in activities; collect the errors properly and return them as part of the completed task.
  2. Activities do provide retry policies, but in my personal experience, handling errors inside the activity triggers is easier and more controllable than relying on retry handlers.
  3. Since our Cosmos DB has only 1,000 RUs allocated, we start getting 429 errors once we increase the load. For our demo scenario, you can see that we've added one more layer that retries the failed queries.
  4. We've added a durable timer of 10 minutes, so that once we get a set of failed queries, we can retry them after 10 minutes.

await Task.WhenAll(cosmosTasks);

finalResponse.CosmosReadSuccessCount = cosmosTasks.Count(task => !task.Result.IsFaulted) * int.Parse(rcbc.Item2);

List<Task<List<CrmResponse>>> crmTasks = new List<Task<List<CrmResponse>>>();
var failedQueries = cosmosTasks.Where(task => task.Result.IsFaulted).Select(task => task.Result.Query).ToList();
if (failedQueries.Any())
{
    await LogInformation(context, logger, $"now trying failed tasks after 10 mins, total failed tasks {failedQueries.Count()}");

    await context.CreateTimer(TimeSpan.FromMinutes(10), CancellationToken.None);

    var failedCosmosTasks = failedQueries.Select(query => context.CallActivityAsync<CosmosResponse>(nameof(GetCosmoData), query)).ToList();

    await Task.WhenAll(failedCosmosTasks);

    // Use a plain foreach here: List<T>.ForEach with an async lambda creates a
    // fire-and-forget async void, so its awaits would never be observed.
    foreach (var t in failedCosmosTasks)
    {
        if (t.Result.TotalRecords.Count > 0)
            crmTasks.Add(context.CallActivityAsync<List<CrmResponse>>(nameof(InsertCRM), t.Result.TotalRecords));
        else
            await LogInformation(context, logger, $"skipping CRM insert because of 0 records");
    }

    finalResponse.CosmosReadSuccessCount += failedCosmosTasks.Count(task => !task.Result.IsFaulted) * int.Parse(rcbc.Item2);
    finalResponse.CosmosReadFailureCount = failedCosmosTasks.Count(task => task.Result.IsFaulted) * int.Parse(rcbc.Item2);
}        

Once we complete the reads, we aggregate all the returned results into a list and send them to Dataverse, once again in parallel fashion, as shown in the final fan-out/fan-in image below.



Measuring the performance

It's time to test the speed of our durable function. To trigger and track the instances, we created a quick Canvas app to launch and monitor progress (the UI isn't great, I know).


25,000 records: 25,000 records were inserted in 54 seconds flat. Impressive? Let me know in the comments.



40,000 records: 40,000 records were inserted in 12 minutes. Does this look like a lot? Remember that we've added a blind 10-minute sleep timer after we get a 429 error, so the effective processing time here is only 2 minutes, which isn't bad. In real-life scenarios, however, you may have to plan your retry intervals more carefully.


 var failedQueries = cosmosTasks.Where(task => task.Result.IsFaulted).Select(task => task.Result.Query).ToList();
 if (failedQueries.Any())
 {
     await LogInformation(context, logger, $"now trying failed tasks after 10 mins, total failed tasks {failedQueries.Count()}");

     await context.CreateTimer(TimeSpan.FromMinutes(10), CancellationToken.None);
}         



Happy learning!!!
