Automate Your Microsoft Fabric Workloads with Apache Airflow

Automation plays a crucial role in streamlining processes and maximizing efficiency in data engineering. One of the most powerful combinations for automating data workflows is using Microsoft Fabric alongside Apache Airflow.

In this blog post, I’ll guide you through the steps to run Microsoft Fabric jobs using Apache Airflow. After struggling for two weeks to successfully run my first Airflow job on a Fabric item, I am excited to share the steps that ultimately led to success. I hope this guide will be useful for someone facing similar challenges.

What You Will Learn:

  1. How to enable Apache Airflow Job in your tenant
  2. Setting up the Apache Airflow plugin
  3. Creating an Airflow connection for Microsoft Fabric
  4. Building a Directed Acyclic Graph (DAG) to trigger Fabric jobs
  5. Monitoring your DAG runs


Prerequisites

Before we dive into the implementation, make sure you have the following prerequisites:

1. Enable Apache Airflow Job in Your Tenant:

The Apache Airflow Job feature is currently in preview. To enable it, your tenant admin needs to navigate to Admin Portal > Tenant Settings > Microsoft Fabric and apply the setting under the "Users can create and use Apache Airflow Job (preview)" section.

2. Create a Microsoft Entra ID App:

Create a Microsoft Entra ID app if you don’t have one. Ensure your tenant admin has enabled "Allow user consent for apps", and add the required API permission scopes to the app (the same scopes listed under prerequisite 4 below).

3. Add Your Service Principal as a Contributor:

Add your app service principal as a "Contributor" in your Microsoft Fabric workspace to grant the necessary permissions.

4. Obtain a Refresh Token for Authentication:

Generate a refresh token that allows the Airflow connection to access Microsoft Fabric. When creating your Microsoft Entra ID app, make sure to set the following required scopes:

  • https://api.fabric.microsoft.com/Item.Execute.All
  • https://api.fabric.microsoft.com/Item.ReadWrite.All
  • https://api.fabric.microsoft.com/Workspace.ReadWrite.All
  • offline_access
  • openid
  • profile

Follow the Microsoft documentation to get your refresh token or replicate this code in a Python environment with your credentials:

import requests
import webbrowser

# Replace these with your actual credentials
tenant_id = "<YOUR_TENANT_ID>"
client_id = "<YOUR_CLIENT_ID>"
client_secret = "<YOUR_CLIENT_SECRET>"
redirect_uri = "https://login.microsoftonline.com/common/oauth2/nativeclient"
scopes = "https://api.fabric.microsoft.com/Item.Execute.All https://api.fabric.microsoft.com/Item.ReadWrite.All https://api.fabric.microsoft.com/Workspace.ReadWrite.All offline_access openid profile"

# Parameters for the authorization request
# (the client secret is not needed here; it is only used in the token request later)
params = {
    "client_id": client_id,
    "response_type": "code",
    "redirect_uri": redirect_uri,
    "response_mode": "query",
    "state": "12345",
    "scope": scopes,
}

# Construct the full authorization URL
base_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/authorize"
auth_url = requests.Request('GET', base_url, params=params).prepare().url

# Output the full authorization URL
print("Authorization URL:", auth_url)

# Optionally open the URL in the default web browser
webbrowser.open(auth_url)        

Make sure to replace <YOUR_TENANT_ID>, <YOUR_CLIENT_ID>, and <YOUR_CLIENT_SECRET> with your actual credentials when using the code.

from urllib.parse import urlparse, parse_qs

# Paste the full redirect URL from the browser address bar after you sign in and consent
url = """
<replace with the URL from the browser window>
"""

# Parse the URL
parsed_url = urlparse(url)

# Extract the query parameters
query_params = parse_qs(parsed_url.query)

# Get the 'code' value
auth_code = query_params.get('code', [None])[0]

#print(auth_code)

token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"

# Prepare the data for the token request
data = {
    'client_id': client_id,
    'client_secret': client_secret,
    'grant_type': 'authorization_code',
    'code': auth_code,
    'redirect_uri': redirect_uri,
    "scope": scopes
}

# Send the POST request to get tokens
response = requests.post(token_url, data=data)

# Check the response
if response.status_code == 200:
    tokens = response.json()
    access_token = tokens['access_token']
    refresh_token = tokens['refresh_token']
    #print("Access Token:", access_token)
    #print("Refresh Token:", refresh_token)
    print(refresh_token)
else:
    print("Failed to get tokens:", response.status_code)
    print("Error:", response.json())        

5. Enable Triggers in Apache Airflow Job:

Enable triggers in your Apache Airflow Job so you can use deferrable operators, which hand long waits over to the triggerer instead of occupying a worker slot.


Enable Triggers in Apache Airflow Job
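
With triggers enabled, the FabricRunItemOperator (from the plugin introduced in the next section) can run in deferrable mode. A minimal, hedged sketch with placeholder IDs; the full DAG later in this post keeps deferrable=False:

from apache_airflow_microsoft_fabric_plugin.operators.fabric import FabricRunItemOperator

# A hedged sketch: the same operator in deferrable mode once triggers are enabled.
# The workspace and item IDs are placeholders; add this inside a `with DAG(...)` block.
run_notebook_deferred = FabricRunItemOperator(
    task_id="run_fabric_notebook_deferred",
    fabric_conn_id="fabric_conn",
    workspace_id="<workspace_id>",
    item_id="<item_id>",
    job_type="RunNotebook",
    wait_for_termination=True,
    deferrable=True,   # waits via the triggerer instead of blocking a worker
)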

Apache Airflow Plugin

To trigger an on-demand Microsoft Fabric item run, we will use the apache-airflow-microsoft-fabric-plugin, which comes pre-installed in the Apache Airflow Job environment.

Create an Airflow Connection for Microsoft Fabric

  1. Navigate to Airflow Connections: Open the connections view of your Apache Airflow Job.
  2. Add a New Connection: Click "Add Connection" and configure it as follows:

  • Connection ID: Choose a meaningful name (e.g., fabric_conn, which is the ID used in the DAG below).
  • Connection Type: Select Generic.
  • Login: Enter the Client ID of your service principal.
  • Password: Paste the refresh token obtained via Microsoft OAuth 2.0 in prerequisite 4 above.
  • Extra: Paste the following JSON, replacing <YOUR_TENANT_ID> and <YOUR_CLIENT_SECRET> with your actual values:

{
  "tenantId": "<YOUR_TENANT_ID>",
  "client_secret": "<YOUR_CLIENT_SECRET>",
  "scopes": "https://api.fabric.microsoft.com/Item.Execute.All https://api.fabric.microsoft.com/Item.ReadWrite.All https://api.fabric.microsoft.com/Workspace.ReadWrite.All offline_access openid profile"
}        
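
If you run Airflow yourself rather than using the Fabric-managed Apache Airflow Job (which expects connections to be added through its UI), the same connection can also be created programmatically. A hedged sketch with placeholder values:

# A hedged sketch: creating the connection in code on a self-hosted Airflow instance.
# All <...> values are placeholders; the scopes string is the same one shown above.
import json

from airflow import settings
from airflow.models import Connection

conn = Connection(
    conn_id="fabric_conn",
    conn_type="generic",
    login="<YOUR_CLIENT_ID>",
    password="<YOUR_REFRESH_TOKEN>",
    extra=json.dumps({
        "tenantId": "<YOUR_TENANT_ID>",
        "client_secret": "<YOUR_CLIENT_SECRET>",
        "scopes": "<SAME_SCOPES_STRING_AS_ABOVE>",
    }),
)

session = settings.Session()
session.add(conn)
session.commit()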

Create a DAG to Trigger Microsoft Fabric Job Items

Now that you have your connection set up, it’s time to create a DAG file in the dags folder within Fabric managed storage:

  1. Create a New DAG File: Create a new Python file (e.g., run_fabric_item_dag.py) in the dags folder and add the following content:

from datetime import datetime

from airflow import DAG
from apache_airflow_microsoft_fabric_plugin.operators.fabric import FabricRunItemOperator

# Workspace that contains the Fabric items to run
workspace_id = "<workspace_id>"

with DAG(
    dag_id="Run_Fabric_Items",
    schedule_interval=None,
    start_date=datetime(2023, 8, 7),
    catchup=False,
    concurrency=20,
) as dag:

    # Trigger a Fabric notebook run and wait for it to finish
    run_notebook = FabricRunItemOperator(
        task_id="run_fabric_notebook",
        fabric_conn_id="fabric_conn",
        workspace_id=workspace_id,
        item_id="<item1_id>",
        job_type="RunNotebook",
        wait_for_termination=True,
        deferrable=False,
    )

    # Trigger a Fabric data pipeline run and wait for it to finish
    run_pipeline = FabricRunItemOperator(
        task_id="run_fabric_pipeline",
        fabric_conn_id="fabric_conn",
        workspace_id=workspace_id,
        item_id="<item2_id>",
        job_type="Pipeline",
        wait_for_termination=True,
        deferrable=False,
    )

    # Run the notebook first, then the pipeline
    run_notebook >> run_pipeline

Replace <workspace_id>, <item1_id>, and <item2_id> with the appropriate values for your use case. You can find them in the URL of a Fabric item.

Fabric Workspace & Item IDs
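
If you prefer to pull the IDs out of the URL programmatically, here is a small, hedged sketch. It assumes the common pattern https://app.fabric.microsoft.com/groups/<workspace_id>/<item_type>/<item_id>, which may differ for some item types:

from urllib.parse import urlparse

# A hedged sketch: extract the workspace and item IDs from a Fabric item URL.
# The URL below is a placeholder; the path layout is an assumption.
item_url = "https://app.fabric.microsoft.com/groups/<workspace_id>/synapsenotebooks/<item_id>"

path_parts = urlparse(item_url).path.strip("/").split("/")
workspace_id = path_parts[1]  # segment after "groups"
item_id = path_parts[3]       # segment after the item type

print("workspace_id:", workspace_id)
print("item_id:", item_id)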

Create a Plugin File for the Custom Operator

If you want to include an external monitoring link for Microsoft Fabric item runs, create a plugin file in the plugins folder:

  1. Create a New Plugin File: Create a new Python file (e.g., fabric_plugin.py) in the plugins folder with the following content:

from airflow.plugins_manager import AirflowPlugin
from apache_airflow_microsoft_fabric_plugin.hooks.fabric import FabricHook
from apache_airflow_microsoft_fabric_plugin.operators.fabric import FabricRunItemLink

class AirflowFabricPlugin(AirflowPlugin):
    # Registers the external monitoring link for Fabric item runs in the Airflow UI
    name = "fabric_plugin"
    operator_extra_links = [FabricRunItemLink()]
    hooks = [FabricHook]

Monitor Your DAG

Now that your DAG is created, you can monitor its execution in the Apache Airflow Job UI. Go to the Airflow UI and select your DAG. If you added the plugin, you’ll see an external monitoring link that navigates directly to the item run.

DAG Graph


DAG Gantt

XCom Integration (Optional):

After triggering the DAG, you can view task outputs in the XCom tab for better visibility and debugging.
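
If you want to use those outputs downstream, a task can pull them from XCom. A minimal, hedged sketch to add inside the same DAG (the exact keys pushed by FabricRunItemOperator may vary by plugin version; with no key, xcom_pull returns the task's default return value):

from airflow.operators.python import PythonOperator

def print_fabric_run_info(ti):
    # Pull whatever the Fabric notebook task pushed to XCom (default return value)
    run_info = ti.xcom_pull(task_ids="run_fabric_notebook")
    print("Fabric notebook run info:", run_info)

show_run_info = PythonOperator(
    task_id="show_run_info",
    python_callable=print_fabric_run_info,
)

# Chain it after the notebook task from the DAG above
run_notebook >> show_run_info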


By automating your Microsoft Fabric workloads with Apache Airflow, you can streamline your data processes and focus on deriving insights rather than managing workflows manually. After overcoming the challenges of getting my first Airflow job running, I encourage you to explore these powerful automation capabilities. The combination of Microsoft Fabric and Apache Airflow opens up new possibilities for managing and executing complex data workflows efficiently.

Feel free to reach out if you have any questions or would like to share your own experiences with Airflow and Fabric!

If you found this guide helpful, connect with me for more insights on data engineering and automation tips! :)
