Automate Your Microsoft Fabric Workloads with Apache Airflow
Sai Prudhvi Neelakantam
Data Engineer @Evidi | Building Scalable Data & AI Solutions | Microsoft Fabric, PowerBI, Databricks, ETL/ELT, Azure, Azure AI | Top 3% of Most Active Speakers 2024
Automation plays a crucial role in streamlining processes and maximizing efficiency in data engineering. One of the most powerful combinations for automating data workflows is using Microsoft Fabric alongside Apache Airflow.
In this blog post, I’ll guide you through the steps to run Microsoft Fabric jobs using Apache Airflow. After spending two weeks getting my first Airflow job to run against a Fabric item, I’m excited to share the steps that finally worked. I hope this guide helps anyone facing similar challenges.
What You Will Learn:
How to set up the prerequisites, create an Airflow connection to Microsoft Fabric, build a DAG that triggers Fabric notebook and pipeline runs, and monitor those runs from the Airflow UI.
Prerequisites
Before we dive into the implementation, make sure you have the following prerequisites:
1. Enable Apache Airflow Job in Your Tenant:
The Apache Airflow Job feature is currently in preview. To enable it, your tenant admin needs to navigate to Admin Portal > Tenant Settings > Microsoft Fabric and apply the setting under the "Users can create and use Apache Airflow Job (preview)" section.
2. Create a Microsoft Entra ID App:
Create a Microsoft Entra ID app if you don’t have one. Ensure your tenant admin has enabled "Allow user consent for apps", and add the API permission scopes listed under step 4 below to the app.
3. Add Your Service Principal as a Contributor:
Add your app service principal as a "Contributor" in your Microsoft Fabric workspace to grant the necessary permissions.
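If you prefer to script this step rather than use the workspace UI, the Fabric REST API exposes a workspace role-assignment endpoint. The following is a rough sketch under a few assumptions: you already hold an access token for https://api.fabric.microsoft.com with workspace permissions, the endpoint and payload shape match the current API version, and the placeholder values are replaced with your own IDs.

import requests

# Assumed placeholders: an access token for the Fabric API, the target workspace ID,
# and the object ID of your app's service principal (all hypothetical values).
access_token = "<ACCESS_TOKEN>"
workspace_id = "<WORKSPACE_ID>"
service_principal_object_id = "<SERVICE_PRINCIPAL_OBJECT_ID>"

url = f"https://api.fabric.microsoft.com/v1/workspaces/{workspace_id}/roleAssignments"
payload = {
    "principal": {"id": service_principal_object_id, "type": "ServicePrincipal"},
    "role": "Contributor",
}

# Assign the Contributor role to the service principal in the workspace
response = requests.post(url, json=payload, headers={"Authorization": f"Bearer {access_token}"})
print(response.status_code, response.text)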
4. Obtain a Refresh Token for Authentication:
Generate a refresh token that allows the Airflow connection to access Microsoft Fabric. When creating your Microsoft Entra ID app, make sure to set the following required scopes:
https://api.fabric.microsoft.com/Item.Execute.All
https://api.fabric.microsoft.com/Item.ReadWrite.All
https://api.fabric.microsoft.com/Workspace.ReadWrite.All
offline_access
openid
profile
Follow the Microsoft documentation to get your refresh token, or run the following code in a Python environment with your credentials:
import requests
import webbrowser

# Replace these with your actual credentials
tenant_id = "<YOUR_TENANT_ID>"
client_id = "<YOUR_CLIENT_ID>"
client_secret = "<YOUR_CLIENT_SECRET>"
redirect_uri = "https://login.microsoftonline.com/common/oauth2/nativeclient"
scopes = (
    "https://api.fabric.microsoft.com/Item.Execute.All "
    "https://api.fabric.microsoft.com/Item.ReadWrite.All "
    "https://api.fabric.microsoft.com/Workspace.ReadWrite.All "
    "offline_access openid profile"
)

# Parameters for the authorization request
# (the client secret is not sent here; it is only needed later at the token endpoint)
params = {
    "client_id": client_id,
    "response_type": "code",
    "redirect_uri": redirect_uri,
    "response_mode": "query",
    "state": "12345",
    "scope": scopes,
}

# Construct the full authorization URL
base_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/authorize"
auth_url = requests.Request("GET", base_url, params=params).prepare().url

# Output the full authorization URL
print("Authorization URL:", auth_url)

# Optionally open the URL in the default web browser
webbrowser.open(auth_url)
Make sure to replace <YOUR_TENANT_ID>, <YOUR_CLIENT_ID>, and <YOUR_CLIENT_SECRET> with your actual values. Running the script prints an authorization URL; open it, sign in, and consent. The browser then lands on the redirect URI with a code query parameter. Copy that full URL and paste it into the next snippet, which exchanges the authorization code for tokens:
import requests
from urllib.parse import urlparse, parse_qs

# Paste the full URL from the browser window (it contains the ?code=... parameter)
url = """
replace with the url from the browser window.
"""

# Parse the URL and extract the query parameters
parsed_url = urlparse(url)
query_params = parse_qs(parsed_url.query)

# Get the authorization code value
auth_code = query_params.get("code", [None])[0]

token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"

# Prepare the data for the token request
data = {
    "client_id": client_id,
    "client_secret": client_secret,
    "grant_type": "authorization_code",
    "code": auth_code,
    "redirect_uri": redirect_uri,
    "scope": scopes,
}

# Send the POST request to exchange the authorization code for tokens
response = requests.post(token_url, data=data)

# Check the response
if response.status_code == 200:
    tokens = response.json()
    access_token = tokens["access_token"]
    refresh_token = tokens["refresh_token"]
    print(refresh_token)
else:
    print("Failed to get tokens:", response.status_code)
    print("Error:", response.json())
5. Enable Triggers in Apache Airflow Job:
Enable triggers in your Apache Airflow Job so you can use deferrable operators, which free up worker slots while long-running Fabric jobs are polled for completion.
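With triggers enabled, you can set deferrable=True on the plugin’s operator so the wait for the Fabric item run is handed off to the Airflow triggerer instead of blocking a worker slot. A minimal sketch, reusing the same FabricRunItemOperator and placeholder IDs as the DAG later in this post:

from apache_airflow_microsoft_fabric_plugin.operators.fabric import FabricRunItemOperator

# Same operator as in the DAG below, but deferred: the task releases its worker
# slot while the triggerer polls the Fabric item run for completion.
run_notebook_deferred = FabricRunItemOperator(
    task_id="run_fabric_notebook_deferred",
    fabric_conn_id="fabric_conn",
    workspace_id="<workspace_id>",
    item_id="<item_id>",
    job_type="RunNotebook",
    wait_for_termination=True,
    deferrable=True,
)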
Apache Airflow Plugin
To trigger an on-demand Microsoft Fabric item run, we will use the apache-airflow-microsoft-fabric-plugin, which is pre-installed in the Apache Airflow job requirements.
Create an Airflow Connection for Microsoft Fabric
Click on "Add Connection" and configure it as follows:
{
"tenantId": "<YOUR_TENANT_ID>",
"client_secret": "<YOUR_CLIENT_SECRET>",
"scopes": "https://api.fabric.microsoft.com/Item.Execute.All https://api.fabric.microsoft.com/Item.ReadWrite.All https://api.fabric.microsoft.com/Workspace.ReadWrite.All offline_access openid profile"
}
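Before saving the connection, it can help to verify that the refresh token actually works by exchanging it for an access token yourself. The snippet below is a quick sanity check, assuming the same tenant_id, client_id, client_secret, and scopes used earlier:

import requests

tenant_id = "<YOUR_TENANT_ID>"
client_id = "<YOUR_CLIENT_ID>"
client_secret = "<YOUR_CLIENT_SECRET>"
refresh_token = "<YOUR_REFRESH_TOKEN>"
scopes = (
    "https://api.fabric.microsoft.com/Item.Execute.All "
    "https://api.fabric.microsoft.com/Item.ReadWrite.All "
    "https://api.fabric.microsoft.com/Workspace.ReadWrite.All "
    "offline_access openid profile"
)

# Standard OAuth2 refresh-token grant against the Microsoft identity platform
token_url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
response = requests.post(
    token_url,
    data={
        "client_id": client_id,
        "client_secret": client_secret,
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
        "scope": scopes,
    },
)

if response.status_code == 200:
    print("Refresh token is valid; received a new access token.")
else:
    print("Token request failed:", response.status_code, response.json())

If this call fails, fix the token (or the app’s scopes) before troubleshooting anything on the Airflow side.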
Create a DAG to Trigger Microsoft Fabric Job Items
Now that you have your connection set up, it’s time to create a DAG file in the dags folder within Fabric managed storage:
from datetime import datetime

from airflow import DAG
from apache_airflow_microsoft_fabric_plugin.operators.fabric import FabricRunItemOperator

workspace_id = "<workspace_id>"

with DAG(
    dag_id="Run_Fabric_Items",
    schedule_interval=None,
    start_date=datetime(2023, 8, 7),
    catchup=False,
    concurrency=20,
) as dag:

    # Trigger a Fabric notebook run and wait for it to finish
    run_notebook = FabricRunItemOperator(
        task_id="run_fabric_notebook",
        fabric_conn_id="fabric_conn",
        workspace_id=workspace_id,
        item_id="<item1_id>",
        job_type="RunNotebook",
        wait_for_termination=True,
        deferrable=False,
    )

    # Trigger a Fabric data pipeline run after the notebook completes
    run_pipeline = FabricRunItemOperator(
        task_id="run_fabric_pipeline",
        fabric_conn_id="fabric_conn",
        workspace_id=workspace_id,
        item_id="<item2_id>",
        job_type="Pipeline",
        wait_for_termination=True,
        deferrable=False,
    )

    run_notebook >> run_pipeline
Replace <workspace_id>, <item1_id>, and <item2_id> with the values for your own workspace and items. You can find these IDs in the URL of a Fabric item.
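As a small convenience, you can also pull the IDs out of the URL programmatically. This sketch assumes the common URL shape https://app.fabric.microsoft.com/groups/<workspace_id>/<item_type>/<item_id>; if your URLs look different, verify the extracted values against the workspace.

from urllib.parse import urlparse

def extract_fabric_ids(item_url: str) -> tuple[str, str]:
    # Assumes the path looks like /groups/<workspace_id>/<item_type>/<item_id>
    parts = [segment for segment in urlparse(item_url).path.split("/") if segment]
    groups_index = parts.index("groups")
    return parts[groups_index + 1], parts[groups_index + 3]

# Hypothetical example:
# workspace_id, item_id = extract_fabric_ids(
#     "https://app.fabric.microsoft.com/groups/<workspace_id>/synapsenotebooks/<item_id>"
# )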
Create a Plugin File for the Custom Operator
If you want to include an external monitoring link for Microsoft Fabric item runs, create a plugin file in the plugins folder:
from airflow.plugins_manager import AirflowPlugin
from apache_airflow_microsoft_fabric_plugin.hooks.fabric import FabricHook
from apache_airflow_microsoft_fabric_plugin.operators.fabric import FabricRunItemLink


class AirflowFabricPlugin(AirflowPlugin):
    # Registers the hook and the external monitoring link shown on Fabric task instances
    name = "fabric_plugin"
    operator_extra_links = [FabricRunItemLink()]
    hooks = [FabricHook]
Monitor Your DAG
Now that your DAG is created, you can monitor its execution in the Apache Airflow Job UI. Go to the Airflow UI and select your DAG. If you added the plugin, you’ll see an external monitoring link that takes you directly to the item run in Fabric.
XCom Integration (Optional):
Trigger the DAG and inspect the task outputs in the XCom tab for better visibility and debugging; downstream tasks can also consume these values, as in the sketch below.
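The sketch assumes the run status is exposed under an XCom key named run_status; check the XCom tab of a completed task for the exact keys your plugin version pushes, and add the task inside the DAG defined above.

from airflow.operators.python import PythonOperator

def report_run_status(ti):
    # Pull the value the Fabric notebook task pushed to XCom
    # (the key name is an assumption; verify it in the XCom tab).
    status = ti.xcom_pull(task_ids="run_fabric_notebook", key="run_status")
    print("Fabric notebook run finished with status:", status)

# Place this inside the "with DAG(...)" block and chain it after run_pipeline,
# e.g. run_pipeline >> report_status
report_status = PythonOperator(
    task_id="report_run_status",
    python_callable=report_run_status,
)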
By automating your Microsoft Fabric workloads with Apache Airflow, you can streamline your data processes and focus on deriving insights rather than managing workflows manually. After overcoming the challenges of getting my first Airflow job running, I encourage you to explore these powerful automation capabilities. The combination of Microsoft Fabric and Apache Airflow opens up new possibilities for managing and executing complex data workflows efficiently.
Feel free to reach out if you have any questions or would like to share your own experiences with Airflow and Fabric!
If you found this guide helpful, connect with me for more insights on data engineering and automation tips! :)