Building a Simple Data Pipeline with Mage: A Beginner's Guide
Rana Sheharyar
Building Data, Analytics, and AI Engineering teams at CYBRNODE
Introduction:
In today's dynamic world of data engineering, tools like Mage are becoming increasingly popular due to their simplicity and versatility in managing data pipelines. Mage provides an intuitive user interface that feels familiar, much like using notebooks, which makes it accessible even to those without deep technical expertise. In this article, we'll take a step-by-step journey to build a basic data pipeline using Mage, starting from extracting data to aggregating and generating reports. So, let's dive in and explore the power of Mage in simplifying the data pipeline process!
Why Choose Mage.ai Over Airflow?
Mage.ai offers a specialized and simplified approach to data pipeline management compared to Airflow. Here's why Mage.ai may be the preferred option: you write and test each step interactively in a notebook-style UI, pipelines are assembled from reusable loader, transformer, and exporter blocks rather than hand-written DAG files, and Mage builds the DAG and scheduling triggers for you, cutting much of the boilerplate Airflow typically requires.
Getting Started with Mage:
Before we delve into pipeline construction, let's familiarize ourselves with Mage's setup process. Follow these simple steps to install and start Mage:
I Would Recommend Using Docker:
Managing dependencies manually (e.g., with pip or conda) for data integrations can be challenging because of their complexity. To simplify this process and ensure a smooth experience, Docker is highly recommended for running Mage.
Why Docker?
Docker offers a lightweight and portable solution for managing dependencies, ensuring consistency across different environments. By encapsulating Mage and its dependencies within Docker containers, you can avoid compatibility issues and streamline the installation process.
Installation using Docker (recommended):
docker run -it -p 6789:6789 -v $(pwd):/home/src mageai/mageai /app/run_app.sh mage start [project_name]
Installation using pip (not recommended):
pip install mage-ai
mage start [project_name]
Building a pipeline
After installation, open http://localhost:6789 in your browser to start building your pipeline.
Running a pipeline
docker run -it -p 6789:6789 -v $(pwd):/home/src mageai/mageai mage run [project_name] [pipeline]
Creating a new project
docker run -it -p 6789:6789 -v $(pwd):/home/src mageai/mageai mage init [project_name]
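For reference, the generated project typically contains a layout along these lines (the exact folders vary by Mage version, so treat this as an illustrative sketch); the io_config.yaml and requirements.txt files are the ones we'll edit below:
[project_name]/
├── data_loaders/
├── transformers/
├── data_exporters/
├── pipelines/
├── io_config.yaml
├── metadata.yaml
└── requirements.txt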
Understanding Mage's Pipeline Blocks:
Mage simplifies pipeline creation through modular blocks, each serving a specific purpose:
Data Loader: fetches raw data from an API, file, or database.
Transformer: cleans, filters, or reshapes the loaded data.
Data Exporter: writes the results to a destination such as PostgreSQL.
Custom: free-form code or SQL, useful for building aggregated reporting tables.
Constructing Our Pipeline:
Let's build a simple pipeline to illustrate Mage's capabilities:
Data Loading: We'll start by extracting data from a public API and parsing it into a DataFrame. The setup involves three parts:
1. io_config.yaml Configuration: Add the PostgreSQL connection details to the project's io_config.yaml file (shown below).
2. Batch Pipeline: Create a new batch pipeline and update the requirements.txt file with the necessary libraries (a sample is sketched after the configuration below).
3. Data Loader: Write a function to fetch data from a remote source or load it from disk, then test-run the block to verify its output.
# io_config.yaml
dev:
  POSTGRES_CONNECT_TIMEOUT: 10
  POSTGRES_DBNAME: "{{ env_var('POSTGRES_DBNAME') }}"
  POSTGRES_SCHEMA: "{{ env_var('POSTGRES_SCHEMA') }}"
  POSTGRES_USER: "{{ env_var('POSTGRES_USER') }}"
  POSTGRES_PASSWORD: "{{ env_var('POSTGRES_PASSWORD') }}"
  POSTGRES_HOST: "{{ env_var('POSTGRES_HOST') }}"
  POSTGRES_PORT: "{{ env_var('POSTGRES_PORT') }}"
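For step 2, a minimal requirements.txt for this pipeline might look like the following; the exact package list is an assumption based on the imports used in the loader below (statsbombpy for the StatsBomb API, soccerdata as an optional extra source):
# requirements.txt
statsbombpy
soccerdata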
Data Loader:
Write your code for fetching data from a remote source or loading it from disk.
import io
import pandas as pd
import requests
from pandas import DataFrame
import soccerdata as sd
from statsbombpy import sb

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_data_from_api(**kwargs) -> DataFrame:
    """
    Template for loading data from API
    """
    events = sb.events(match_id=3923881)  # Final match of the 2024 AFCON
    return events


@test
def test_output(df) -> None:
    """
    Template code for testing the output of the block.
    """
    assert df is not None, 'The output is undefined'
This code retrieves data from the StatsBomb API for the final match of the 2024 AFCON between Côte d'Ivoire and Nigeria, identified by match_id=3923881. It returns a DataFrame of detailed event information comprising 2699 rows and 89 columns.
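If you want a quick sanity check before moving on, a hypothetical scratchpad snippet like this (not part of the original block, and assuming events holds the DataFrame returned by the loader) confirms the shape and the event types present:
events.shape                          # (2699, 89) for this match, as noted above
events['type'].value_counts().head()  # event types such as Pass, Carry, and Shot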
Data Transformation:
With the data loaded, we'll use a Transformer block to filter the raw events DataFrame down to shots and keep only the shot-related columns:
from pandas import DataFrame

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer


@transformer
def transform_df(df: DataFrame, *args, **kwargs) -> DataFrame:
    """
    This function filters the DataFrame for shots and selects relevant columns.
    """
    # Define the columns to keep
    columns_to_keep = [
        'player', 'team', 'location', 'shot_aerial_won',
        'shot_body_part', 'shot_end_location', 'shot_first_time',
        'shot_freeze_frame', 'shot_key_pass_id', 'shot_one_on_one',
        'shot_outcome', 'shot_statsbomb_xg', 'shot_technique', 'shot_type'
    ]

    # Filter rows where type is 'Shot'
    shots_df = df[df['type'] == 'Shot']

    # Select the relevant columns
    transformed_df = shots_df[columns_to_keep]
    return transformed_df
Data Exporting:
Next, we'll export the transformed DataFrame to a PostgreSQL table using the Data Exporter block. Connection details are read from the io_config.yaml file within the project directory. (For reporting, a Custom block with SQL queries can later build aggregated tables on top of the exported data.)
import os

from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.postgres import Postgres
from mage_ai.settings.repo import get_repo_path  # older Mage versions expose this via mage_ai.data_preparation.repo_manager
from pandas import DataFrame

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_postgres(df: DataFrame, **kwargs) -> None:
    schema_name = 'statsbomb_data'  # Name of the schema to export data to
    table_name = 'events_data'  # Name of the table to export data to
    config_path = os.path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'dev'

    with Postgres.with_config(ConfigFileLoader(config_path, config_profile)) as loader:
        loader.export(
            df,
            schema_name,
            table_name,
            index=False,  # Whether to include the DataFrame index in the exported table
            if_exists='replace',  # Resolution policy if the table already exists
        )
To ensure that all data has been successfully loaded into PostgreSQL, we'll create another data loader block, this time establishing a connection to the PostgreSQL database and selecting the development environment. We can then use a simple SQL command to retrieve the data from the relevant table.
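For example, a minimal query for that check, assuming the schema and table names used in the exporter above, could be:
SELECT * FROM statsbomb_data.events_data LIMIT 10;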
Orchestrating the Pipeline:
Mage simplifies pipeline orchestration with its DAG (Directed Acyclic Graph) representation of blocks. As you add blocks to your pipeline, Mage automatically constructs the DAG, ready for scheduling. Triggers within Mage allow for configuring pipeline execution based on various conditions or time-based schedules.
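As a rough illustration, a time-based trigger can also be kept in code in the pipeline's triggers.yaml; the field names below are an approximation and may differ between Mage versions, so the trigger editor in the UI remains the authoritative reference:
# pipelines/[pipeline]/triggers.yaml (illustrative sketch)
triggers:
- name: daily_refresh          # hypothetical trigger name
  schedule_type: time
  schedule_interval: '@daily'
  start_time: 2024-03-01 00:00:00
  status: active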
Conclusion:
In this beginner's guide, we've explored the fundamentals of building a data pipeline using Mage. From installation to pipeline construction and orchestration, Mage streamlines the entire process, empowering users to focus on data insights rather than infrastructure.
While we've covered a basic batch pipeline here, Mage offers a plethora of advanced capabilities for building data integrations, streaming pipelines, and more, awaiting exploration by curious minds in the data engineering community.