The Intagr8 Framework: Preview

For more than 20 years I've been consulting in the IT sector in one form or another. Having risen through the ranks, I've become senior in a number of disciplines, including software development (C#), database administration and, more recently, the cloud (Azure and AWS). This has given me a wide skillset and a unique viewpoint on solution design, and in recent years it has led me to design the architecture for a number of solutions for customers ranging from the private sector to the NHS.

As I get more and more customers I realise that a lot of the problems we're trying to solve are the same, and often we're building variations on a theme. Yes, each customer has their own unique requirements, but the basics of a solution often require many of the same capabilities, with only a fraction of the development being dedicated to the unique requirements of the customer. The lion's share of the development seems to go into building a framework on which to start developing everything else, ensuring the solution is secure and that all the various components work together.

Over the last few years I've built around five microservice and event-driven architectures, some in Java and some in C#, and these have been built under a traditional development agreement. In such an agreement, all the intellectual property for the consultancy work that we do is owned by the customer. While the specific requirements of each customer are unique, the framework is often only unique in the sense that it uses the customer's choice of technologies.

This means that we often end up building a similar framework for each customer from scratch so that they own the intellectual property. This is wasteful: we've already written the majority of the code and could simply extend it to the customer's needs.

I've worked in places where this was recognised, so they designed their own framework internally. However, they rarely allow the customer access to it, so the customer becomes completely reliant on that supplier for any future work. Many of my best repeat customers are SMEs that don't necessarily have the money to keep me around all of the time, so I will often hand over to local teams once I've done my improvements.

The Intagr8 Framework addresses this by creating a standardised approach to certain challenges, specifically surrounding workflow and microservices implementations. This framework is offered open source under the Apache 2.0 licence and we would like to encourage others to contribute to it.

Because of the work that this cuts out in building new products, I'm probably shooting myself in the foot with this, but it is my hope that by building this I can build projects for customers much quicker and offer much better value for money as a result!

Framework Features

Each framework feature is split into its own class library, available via NuGet, allowing you to include the bits of the framework that you want without the bloat of unnecessary libraries.

Event Entity Serialization

The core of the Intagr8 framework is the Event Entity Framework Serializer for JSON, which allows us to handle derivatives of classes with relative ease.

This can easily be used to serialize and deserialize objects belonging to abstract classes:

var serializer = new EventEntityFrameworkSerializer()
    .WithType<Ledger>()
    .WithType<Purchase>()
    .WithType<Sale>();

var json = serializer.Serialize(ledger);
var deserializedLedger = serializer.Deserialize<Ledger>(json)!;
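
The internals of the Intagr8 serializer aren't shown here, so as a rough illustration of the general technique (writing a type discriminator into the JSON so that derived classes survive a round trip), here is a minimal sketch using the polymorphism attributes built into System.Text.Json from .NET 7 onwards. The LedgerEntry base class and the shapes of Ledger, Purchase and Sale are assumptions made up for this example, and the framework may well work differently.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

// Hypothetical model: the real Ledger/Purchase/Sale classes are not shown in the article.
// The attributes tell System.Text.Json to write a "$type" discriminator for each entry.
[JsonPolymorphic(TypeDiscriminatorPropertyName = "$type")]
[JsonDerivedType(typeof(Purchase), "purchase")]
[JsonDerivedType(typeof(Sale), "sale")]
public abstract class LedgerEntry
{
    public decimal Amount { get; set; }
}

public class Purchase : LedgerEntry
{
    public string? Supplier { get; set; }
}

public class Sale : LedgerEntry
{
    public string? Customer { get; set; }
}

public class Ledger
{
    // Declared as the abstract base type; the discriminator restores the concrete type.
    public List<LedgerEntry> Entries { get; set; } = new();
}

public static class PolymorphicJsonSketch
{
    // Serializes the ledger to JSON and reads it straight back.
    public static Ledger RoundTrip(Ledger ledger)
    {
        string json = JsonSerializer.Serialize(ledger);
        return JsonSerializer.Deserialize<Ledger>(json)!;
    }
}
```

After a round trip, each item in Entries comes back as a Purchase or a Sale rather than failing on the abstract base class, which is the same problem the WithType<> registrations above appear to address.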

It also allows you to tell the system that an entity is an old version that has been replaced and provide upgrade logic:

[UpgradeableModel(typeof(UserV2))]
public class User : EventEntity
{
    ...
}

public class UserV2 : EventEntity, IUpgradeable
{
    ...

    public void Upgrade(object previousVersion)
    {
        /* Your upgrade logic */
    }
}

Working with external topics

When you work with topics from external systems, the way this is tackled usually varies according to which external system you are using. As a developer, most of the time you don't really care whether it is Kafka, Event Hubs, Service Bus, or something else entirely. Your main interest is in reading from the topic, and dequeuing the item or updating your checkpoints when you've finished processing it.

The Intagr8 framework creates a generic methodology for working with consumers and producers, making it easier to work with rather than having to worry about the underlying technology (unless you have a specific need).

Example using Amazon Kinesis:

var topic = new AWSKinesisTopic(kinesisClient, s3Client, streamName, bucketName, bucketPathPrefix, serializer);
var consumer = topic.CreateConsumer()
    .WithReceive<SaleV3>(async (consumedEvent) =>
    {
        // consumedEvent has a property called Data, which here is of type SaleV3
        Console.WriteLine(consumedEvent.Data!.Id);

        // This tells the consumer that it is done with this item; in this instance it will update the checkpoint in S3
        await consumedEvent.Metadata!.MarkCompletedAsync();
    })
    .OnError((message, ex, cancellationToken) =>
    {
        // you can choose what to do with exceptions
        throw ex;
    });

The above code defines an AWSKinesisTopic and a consumer based upon it. The consumer will watch for inbound SaleV3 objects and will process them accordingly.

You can add as many WithReceive<> methods as you need to tell it to handle different types. By default, if it receives a message of an unexpected type it will throw a NotImplementedException, which you can then choose to handle by sending the message to a poison message queue or through a process of your choice.

This can be applied to other technologies such as Azure Cosmos:

var topic = new AzureCosmosTopic("UseDevelopmentStorage=true", Guid.NewGuid().ToString());
var consumer = topic.CreateConsumer()
    .WithReceive<SaleV3>(async (consumedEvent) =>
    {
        Console.WriteLine(consumedEvent.Data!.Id);
        await consumedEvent.Metadata!.MarkCompleteAsync();
    })
    .OnError((message, ex, cancellationToken) =>
    {
        // you can choose what to do with exceptions
        throw ex;
    });

As you can see, the workflow is identical, even though we're using Cosmos rather than Kinesis. Additional consumers are expected to be built in due course.

Workflow

Handling workflow can always be a challenge. Sometimes it is built into the code itself, which makes it very difficult to change. This is sometimes desirable, but sometimes you need a bit more flexibility.

The Intagr8 Workflow engine allows you to define workflows either in code, or dynamically if you need to. These workflows can then be used to drive the system based on the criteria of the data that is passed to it.

var workflow = new Intagr8Workflow()
    .WithIfElseStep("CheckValidity", "Source.Validity == \"Full\"", "CheckIsValuable", "InvalidEntry")
    .WithIfElseStep("CheckIsValuable", "Source.IsValuable", "ConvertToValuableOrderSummary", "NotValuable")
    .WithConversionStep("ConvertToValuableOrderSummary", typeof(TestOrder), typeof(TestValuableOrderSummary), conversionProperties, "ConvertToJson")
    .WithJsonSerializerStep("ConvertToJson", JsonSerializerStep.JsonSerializerDirection.ToJson, "ConvertFromJson")
    .WithJsonSerializerStep("ConvertFromJson", JsonSerializerStep.JsonSerializerDirection.FromJson, "Successful")
    .WithWorkflowOutputStep("NotValuable", onNotValuable)
    .WithWorkflowOutputStep("Successful", onSuccess)
    .WithWorkflowOutputStep("InvalidEntry", onInvalid);

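The above is an example workflow. It first checks the validity of the source, then whether the order is valuable. Valuable orders are converted to a summary and passed to JSON and back again, and every path ends at one of three outputs: Successful, NotValuable or InvalidEntry.

[Figure: diagram of the example workflow]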

You can execute this workflow locally using:

await workflow.ExecuteAsync(instance);        
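
To make the idea of named steps that hand over to one another a little more concrete, here is a minimal sketch of a step graph. It is not the Intagr8 engine: StepGraph, WithIfElse and WithOutput are hypothetical names invented for this example, conditions are plain delegates rather than expression strings, and there is no protection against cycles.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical sketch: these types are NOT the Intagr8 API.
public sealed class StepGraph<T>
{
    // Each step inspects the instance and returns the name of the next step,
    // or null when the workflow has reached an end point.
    private readonly Dictionary<string, Func<T, Task<string?>>> _steps = new();

    // Branches to ifTrue or ifFalse depending on the condition.
    public StepGraph<T> WithIfElse(string name, Func<T, bool> condition, string ifTrue, string ifFalse)
    {
        _steps[name] = instance => Task.FromResult<string?>(condition(instance) ? ifTrue : ifFalse);
        return this;
    }

    // Terminal step: invokes the callback and stops the workflow.
    public StepGraph<T> WithOutput(string name, Action<T> onReached)
    {
        _steps[name] = instance =>
        {
            onReached(instance);
            return Task.FromResult<string?>(null);
        };
        return this;
    }

    // Follows the chain of step names from startStep until a terminal step is reached.
    public async Task ExecuteAsync(string startStep, T instance)
    {
        string? current = startStep;
        while (current is not null)
        {
            if (!_steps.TryGetValue(current, out var step))
                throw new InvalidOperationException($"Unknown step '{current}'.");
            current = await step(instance);
        }
    }
}
```

Because steps refer to each other only by name, the graph can be built up in code or from data loaded at runtime, which is the flexibility the section above describes.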

We are currently planning to extend this to allow you to convert the workflow either to actor instances using Akka.NET or to container-based microservices using Kubernetes.

Please note

The workflow engine just provides a framework for defining the workflow of your services. To get the most out of it you will need to design your own WorkflowStep classes.

API Orchestration

API orchestration is a common issue in microservices. Other services, such as Azure API Management, are available, but these often have their own issues, so we wanted to create a framework of our own that can be containerised as you see fit.

app.UseApiOrchestrator("/example", exampleYaml, "https://petstore3.swagger.io/api/v3", options, async (request) => {
    // you can put in any modifications to the request before it is sent onward here
    // this can be useful to add audit, inject authorization headers, etc
});

The ApiOrchestrator will read from the YAML file provided, expose the same APIs that are available on the original, and forward each request to the same API at "https://petstore3.swagger.io/api/v3". It will then return the response to the requester.

The API orchestrator has basic throttling capabilities to throttle inbound messages. The throttlers are designed to be extensible, allowing us to add additional throttling components in future with relative ease.

It can also be wrapped with authentication modules to ensure that only authenticated users can access the APIs. However, you will need to implement this yourself according to your own authentication requirements.

Kubernetes Interaction

The class libraries for interacting with Kubernetes on .NET can leave a little to be desired, especially with regard to how they handle custom resources. We've therefore built our own extensions for the K8s library and will be looking to contribute some of this into the main K8s project in due course.

We can define custom resources as follows:

[KubernetesCustomResource("test.intagr8.io", "CustomNamespacedClass", "v1")]
public class CustomNamespacedClass
{
    [KubernetesNamespaceProperty]
    [KubernetesPropertyDescription("Namespace for the resource in kubernetes")]
    public string? Namespace { get; set; }

    [KubernetesNameProperty]
    [KubernetesPropertyDescription("Name for the resource in kubernetes")]
    public string? Name { get; set; }
}

This creates a basic class called CustomNamespacedClass. It uses the KubernetesNamespaceProperty and KubernetesNameProperty attributes to determine which fields supply the namespace and the name used to populate the metadata. The KubernetesCustomResource attribute provides the ApiGroup, Kind and Version fields. The optional KubernetesPropertyDescription attribute allows the CRD to be populated with a description.

This can be exported to a Kubernetes custom resource definition using the CRD builder:

var crd = KubernetesCustomResourceDefinitionBuilder.Generate(typeof(CustomNamespacedClass));        

When serialized this will give you:

{
  "apiVersion": "apiextensions.k8s.io/v1",
  "kind": "CustomResourceDefinition",
  "metadata": {
    "name": "customnamespacedclass.test.intagr8.io"
  },
  "spec": {
    "group": "test.intagr8.io",
    "names": {
      "categories": [],
      "kind": "CustomNamespacedClass",
      "plural": "customnamespacedclasses",
      "singular": "customnamespacedclass"
    },
    "scope": "Namespaced",
    "versions": [
      {
        "name": "v1",
        "schema": {
          "openAPIV3Schema": {
            "properties": {
              "spec": {
                "properties": {
                  "namespace": {
                    "description": "Namespace for the resource in kubernetes",
                    "type": "string"
                  },
                  "name": {
                    "description": "Name for the resource in kubernetes",
                    "type": "string"
                  }
                },
                "type": "object"
              }
            },
            "type": "object"
          }
        },
        "served": true,
        "storage": true
      }
    ]
  }
}

As you can see, it has fully built the custom resource definition, and you should be able to apply this against your server. To work with the resources themselves, create a client:

var k8sClient = new KubernetesExtended(config, client, new DelegatingHandler[] { });
// "namespace" is a reserved word in C#, so the variable needs the @ prefix
var customNamespacedClassClient = new NamespacedCustomResourceClient<CustomNamespacedClass>(k8sClient, @namespace);

You can see here that we've got our own extended version of the client. This is because there are certain things, such as the HTTP client, which are not exposed to the outside world.

Unlike the standard client, we register the provider against a specific namespace. This makes it easier to work with, as we can create new clients for different namespaces while still being able to read in the items in a more generic fashion.

var list = await customNamespacedClassClient.ListAsync();

We can also do deletes, patches and replacements using the appropriate methods. In addition to this we can watch the resources using the WatchAsync command.

genericClient.Watch(
    onEvent: (eventType, evt) =>
    {
        switch (eventType)
        {
            case WatchEventType.Added:
                // what to do if it was added
                break;

            case WatchEventType.Modified:
                // what to do if it was modified
                break;

            case WatchEventType.Deleted:
                // what to do if it was deleted
                break;

            case WatchEventType.Error:
                // what to do if it was an error from the client
                break;

            case WatchEventType.Bookmark:
                // what to do if it was a bookmark
                break;
        }
    },
    onError: (ex) =>
    {
        // what to do if there is an error calling the client or in serialization, etc
        // (a bare "throw;" is only valid inside a catch block, so rethrow the exception explicitly)
        throw ex;
    }
);

Resilience

The resilience libraries provide classes to improve how your code interacts with external services, allowing us to perform exponential backoff for any function.

var retryHandler = new RetryHandler();
retryHandler.MaxRetries = 5;
retryHandler.AllowedDelta = TimeSpan.FromMilliseconds(10);
retryHandler.StartingBackOff = TimeSpan.FromMilliseconds(100);
var result = await retryHandler.RunAsync<string>(async () => {
    // what to run
});

You can also run it without needing an output:

await retryHandler.RunAsync(async () => {
    // what to run
});
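
For anyone unfamiliar with the pattern, exponential backoff simply means waiting longer after each failed attempt. Below is a minimal sketch of the idea, assuming a doubling delay and a fixed retry limit. BackoffSketch is a hypothetical helper written for this article, not the Intagr8 RetryHandler, and it leaves out any jitter or cancellation support.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper, not the Intagr8 RetryHandler.
public static class BackoffSketch
{
    // Runs the operation, retrying up to maxRetries times after a failure.
    // The wait starts at startingBackOff and doubles after every failed attempt.
    public static async Task<T> RunAsync<T>(
        Func<Task<T>> operation,
        int maxRetries,
        TimeSpan startingBackOff)
    {
        TimeSpan delay = startingBackOff;
        for (int attempt = 0; ; attempt++)
        {
            try
            {
                return await operation();
            }
            catch (Exception) when (attempt < maxRetries)
            {
                // Retries remain: wait, then double the delay for next time.
                // Once retries are exhausted the filter is false and the exception propagates.
                await Task.Delay(delay);
                delay = TimeSpan.FromTicks(delay.Ticks * 2);
            }
        }
    }
}
```

With a starting delay of 100 milliseconds, the waits between attempts would be 100, 200, 400 milliseconds and so on, until either the operation succeeds or the retries run out and the last exception is rethrown to the caller.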

This is just a starting point for the framework. We are hoping to grow it massively over the coming months and would welcome anyone who would like to contribute.
