Achieve massively parallel Dataverse (D365 CE) inserts with Azure Durable Functions: Part 2
Shashank Bhide
Principal Architect at Kerv Digital, ex-Microsoft. Power Platform, Azure and bodybuilding architect. Strength coach.
In the previous article, we saw the core features of Azure Durable Functions and their applicability to Dataverse.
Now it's time to get hands-on and apply that information to a real-world scenario. In this article we'll import a large volume of data from a data source (Cosmos DB) into our Dataverse instance.
Those who want to jump straight to the sample code can clone the repository from here.
Create an Azure Durable Function
Follow the steps below to create the function app:
Create a function app in Azure.
Create an Azure Function as shown below.
What is our use case?
We are going to import data from Cosmos DB (500,000 records) into a Dataverse table ("Durable Sales"); refer to the previous article (link at the top) to download the solution.
Application Design
We are going to write an HTTP-triggered durable function. The trigger will spawn an orchestration instance and return the instance ID and a tracking URL; we'll use the tracking URL to track the completion of the instance.
As we can see below, a single HTTP trigger will instantiate a single durable orchestration, which will queue up a huge number of activities for execution. Please note that, as discussed in the previous article, we'll write the orchestrator code in a way that it fans the activities out and then goes back to sleep until they are all complete; once done, the orchestrator will wake up again and perform aggregation logic to collect the state of all the activities.
Needless to say, you can trigger as many parallel HTTP instances as desired, and this is where the extra firepower we bought earlier will come to our aid to achieve higher speeds. A minimal starter sketch is shown below.
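The repository has its own starter; the following is only a minimal sketch of what such an HTTP starter could look like, assuming the .NET isolated worker model. The function and orchestrator names are placeholders, not the ones from the sample code.

using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Http;
using Microsoft.DurableTask.Client;

public static class ImportStarter
{
    [Function("ImportData_HttpStart")]
    public static async Task<HttpResponseData> HttpStart(
        [HttpTrigger(AuthorizationLevel.Function, "post")] HttpRequestData req,
        [DurableClient] DurableTaskClient client)
    {
        // The body carries NumberOfRecords, BatchSize and StartFrom (see "How the batching works").
        string input = await req.ReadAsStringAsync() ?? string.Empty;

        // Spawn one orchestration instance and return its instance ID plus the
        // status-query (tracking) URLs to the caller.
        string instanceId = await client.ScheduleNewOrchestrationInstanceAsync("ImportOrchestrator", input);
        return await client.CreateCheckStatusResponseAsync(req, instanceId);
    }
}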
Request Batching
The next important piece is request batching. Our source database is Cosmos DB and the target is Dataverse; both systems provide batch reading and writing capabilities.
Cosmos DB batching: Cosmos DB provides the OFFSET and LIMIT keywords to specify how many records we want to skip (OFFSET) and how many we want to read (LIMIT). More details here.
Dataverse batching: Dataverse provides batch request messages; we'll make the CreateMultiple Web API call. Refer to my previous article on how to use CreateMultiple in the Web API.
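For orientation, here is a minimal sketch of what a CreateMultiple Web API call looks like, assuming a hypothetical custom table (logical name "new_durablesale", entity set "new_durablesaleses") and a token you have already acquired; substitute your own org URL, table, columns and auth.

using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;

string accessToken = "<acquired via MSAL or managed identity>";   // assumption: auth handled elsewhere

var http = new HttpClient { BaseAddress = new Uri("https://yourorg.crm.dynamics.com/api/data/v9.2/") };
http.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", accessToken);

// CreateMultiple is bound to the entity set and takes a Targets array,
// so a whole batch of rows goes to Dataverse in one HTTP round trip.
string payload = @"{
  ""Targets"": [
    { ""@odata.type"": ""Microsoft.Dynamics.CRM.new_durablesale"", ""new_name"": ""Sale 1"" },
    { ""@odata.type"": ""Microsoft.Dynamics.CRM.new_durablesale"", ""new_name"": ""Sale 2"" }
  ]
}";

var response = await http.PostAsync(
    "new_durablesaleses/Microsoft.Dynamics.CRM.CreateMultiple",
    new StringContent(payload, Encoding.UTF8, "application/json"));
response.EnsureSuccessStatusCode();   // the response body contains the IDs of the created rows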
How the batching works
We'll pass three parameters in our HTTP request message:
NumberOfRecords: how many records to fetch in total.
BatchSize: how many records to fetch in each batch.
StartFrom: where to start reading from.
So, if we pass 100, 10, 0, this is how the orchestrator will create the queries for parallel reading:
2024-12-09T14:13:12Z [Information] select * from c OFFSET 0 LIMIT 10
2024-12-09T14:13:12Z [Information] select * from c OFFSET 10 LIMIT 10
2024-12-09T14:13:12Z [Information] select * from c OFFSET 20 LIMIT 10
2024-12-09T14:13:12Z [Information] select * from c OFFSET 30 LIMIT 10
2024-12-09T14:13:12Z [Information] select * from c OFFSET 40 LIMIT 10
2024-12-09T14:13:12Z [Information] select * from c OFFSET 50 LIMIT 10
2024-12-09T14:13:12Z [Information] select * from c OFFSET 60 LIMIT 10
2024-12-09T14:13:12Z [Information] select * from c OFFSET 70 LIMIT 10
2024-12-09T14:13:12Z [Information] select * from c OFFSET 80 LIMIT 10
2024-12-09T14:13:12Z [Information] select * from c OFFSET 90 LIMIT 10
Once we get our queries, we want to use the full thrust of the Durable Functions framework and execute each query in its own activity in a fan-out/fan-in approach. Below is the code we use to parse the input parameters and convert them into a list of queries.
var queries = await context.CallActivityAsync<List<string>>(nameof(GetCosmoQueries), rcbc);
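As a reference, here is a minimal sketch of what the GetCosmoQueries activity behind that call could look like (.NET isolated worker model assumed; the BatchRequest record is an illustration only, in the repo rcbc is simply whatever the HTTP trigger parsed). The arithmetic matches the log output shown above.

using Microsoft.Azure.Functions.Worker;

public record BatchRequest(int NumberOfRecords, int BatchSize, int StartFrom);

public static class QueryActivities
{
    [Function(nameof(GetCosmoQueries))]
    public static List<string> GetCosmoQueries([ActivityTrigger] BatchRequest input)
    {
        var queries = new List<string>();

        // One query per batch: skip what earlier batches cover (OFFSET), read one batch (LIMIT).
        for (int offset = input.StartFrom; offset < input.StartFrom + input.NumberOfRecords; offset += input.BatchSize)
        {
            queries.Add($"select * from c OFFSET {offset} LIMIT {input.BatchSize}");
        }
        return queries;
    }
}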
Now, we execute all the tasks at once.
var cosmosTasks = queries.Select(query => context.CallActivityAsync<CosmosResponse>(nameof(GetCosmoData), query)).ToList();
await Task.WhenAll(cosmosTasks);
Remember from our previous article: the moment the code hits await, the orchestrator goes to sleep, and at this point the framework may unload it from memory; an unloaded instance doesn't count towards the active orchestrator limits.
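For completeness, here is a hypothetical sketch of the GetCosmoData activity (not the repo's exact code): it runs one OFFSET/LIMIT query and reports failures through the response object instead of throwing, which is what lets the orchestrator count and retry failed pages later. The database, container and setting names, and the CosmosResponse shape, are assumptions based on the orchestrator code shown here.

using Microsoft.Azure.Cosmos;
using Microsoft.Azure.Functions.Worker;
using Newtonsoft.Json.Linq;

public class CosmosResponse
{
    public string Query { get; set; }
    public bool IsFaulted { get; set; }
    public List<JObject> TotalRecords { get; set; } = new();
}

public static class CosmosActivities
{
    // In real code, share a single CosmosClient (e.g. via dependency injection)
    // rather than creating one per activity invocation.
    private static readonly CosmosClient Client =
        new(Environment.GetEnvironmentVariable("CosmosConnection"));

    [Function(nameof(GetCosmoData))]
    public static async Task<CosmosResponse> GetCosmoData([ActivityTrigger] string query)
    {
        var response = new CosmosResponse { Query = query };
        try
        {
            var container = Client.GetContainer("SalesDb", "SalesContainer");   // assumed names
            var iterator = container.GetItemQueryIterator<JObject>(new QueryDefinition(query));
            while (iterator.HasMoreResults)
            {
                var page = await iterator.ReadNextAsync();
                response.TotalRecords.AddRange(page);
            }
        }
        catch (CosmosException)
        {
            response.IsFaulted = true;   // e.g. 429 throttling; the orchestrator retries this query later
        }
        return response;
    }
}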
Error Handling
This is tricky in a fan-out/fan-in approach; some important points are illustrated in the code below.
// Fan-in: wait for every Cosmos read activity to finish.
await Task.WhenAll(cosmosTasks);

// Each successful query returns one full batch, so successes * batch size gives the record count.
finalResponse.CosmosReadSuccessCount = cosmosTasks.Count(task => !task.Result.IsFaulted) * int.Parse(rcbc.Item2);

List<Task<List<CrmResponse>>> crmTasks = new List<Task<List<CrmResponse>>>();

// Collect the queries whose activities reported a failure (e.g. throttling) and retry them after a delay.
var failedQueries = cosmosTasks.Where(task => task.Result.IsFaulted).Select(task => task.Result.Query).ToList();
if (failedQueries.Any())
{
    await LogInformation(context, logger, $"now trying failed tasks after 10 mins, total failed tasks {failedQueries.Count()}");

    // Durable timer: the orchestrator can be unloaded while it waits.
    await context.CreateTimer(TimeSpan.FromMinutes(10), CancellationToken.None);

    var failedCosmosTasks = failedQueries.Select(query => context.CallActivityAsync<CosmosResponse>(nameof(GetCosmoData), query)).ToList();
    await Task.WhenAll(failedCosmosTasks);

    // A plain foreach instead of List.ForEach with an async lambda, so the awaits are actually awaited.
    foreach (var t in failedCosmosTasks)
    {
        if (t.Result.TotalRecords.Count > 0)
            crmTasks.Add(context.CallActivityAsync<List<CrmResponse>>(nameof(InsertCRM), t.Result.TotalRecords));
        else
            await LogInformation(context, logger, $"skipping CRM insert because of 0 records");
    }

    finalResponse.CosmosReadSuccessCount += failedCosmosTasks.Count(task => !task.Result.IsFaulted) * int.Parse(rcbc.Item2);
    finalResponse.CosmosReadFailureCount = failedCosmosTasks.Count(task => task.Result.IsFaulted) * int.Parse(rcbc.Item2);
}
Once we complete the read, we aggregate all the returned results in a list and send them to Dataverse, once again in parallel fashion, as shown in the final fan-out/fan-in image below.
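Here is a sketch of what that write-side fan-out could look like inside the orchestrator; it is an assumed fragment that reuses the cosmosTasks, crmTasks and finalResponse variables from the snippet above, and the CrmResponse/finalResponse property names are illustrative rather than the repo's exact code.

// Fan-out: one InsertCRM activity per successful Cosmos page (assumed orchestrator fragment).
foreach (var task in cosmosTasks.Where(t => !t.Result.IsFaulted && t.Result.TotalRecords.Count > 0))
{
    crmTasks.Add(context.CallActivityAsync<List<CrmResponse>>(nameof(InsertCRM), task.Result.TotalRecords));
}

// Fan-in: the orchestrator sleeps (and can be unloaded) until every insert activity is done.
await Task.WhenAll(crmTasks);

// Aggregate the outcome for the final status report (CrmInsertSuccessCount / IsFaulted are assumed names).
finalResponse.CrmInsertSuccessCount = crmTasks.SelectMany(t => t.Result).Count(r => !r.IsFaulted);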
Measuring the performance
It's time to test the speed of our durable function. To trigger and track the instances, we created a quick Canvas app to launch orchestrations and monitor progress. The UI isn't great, heheh.
25,000 records: 25,000 records were inserted in 54 seconds flat. Impressive? Let me know in the comments.
40,000 records: 40,000 records were inserted in 12 minutes. Does this look like a lot more? Please remember that we've added a blind 10-minute sleep timer after we get a 429 (throttling) error, so the effective time here is only 2 minutes, which isn't bad. In real-life scenarios, however, you may have to plan your retry intervals more carefully; see the retry sketch after the snippet below.
var failedQueries = cosmosTasks.Where(task => task.Result.IsFaulted).Select(task => task.Result.Query).ToList();
if (failedQueries.Any())
{
    await LogInformation(context, logger, $"now trying failed tasks after 10 mins, total failed tasks {failedQueries.Count()}");
    await context.CreateTimer(TimeSpan.FromMinutes(10), CancellationToken.None);
}
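On the "plan your retries more carefully" point: Durable Functions (isolated worker model) has built-in per-activity retry policies, which avoid pausing the whole orchestration for a fixed window. Below is a sketch, assuming the activity rethrows on a 429 instead of swallowing it, since the built-in retry only fires when the activity throws; the intervals are illustrative, not a recommendation from this article.

// Assumed alternative to the blind 10-minute timer: built-in retry with exponential backoff.
// Requires: using Microsoft.DurableTask;
var retryOptions = TaskOptions.FromRetryPolicy(new RetryPolicy(
    maxNumberOfAttempts: 5,
    firstRetryInterval: TimeSpan.FromSeconds(30),
    backoffCoefficient: 2.0));   // waits roughly 30s, 1m, 2m, 4m between attempts

// The framework re-runs the failed activity on that schedule; note this only triggers when the
// activity throws, so GetCosmoData would need to rethrow on 429 rather than returning IsFaulted.
var page = await context.CallActivityAsync<CosmosResponse>(nameof(GetCosmoData), query, retryOptions);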
Happy learning!!!