How to launch multiple databricks notebooks using Databricks API calls
The explosive Databricks - Python combination

How to launch multiple databricks notebooks using Databricks API calls

Databricks is a fascinating tool which boosts a plethora of features and capabilities. Amongst this depth of capabilities is the ability of parallel processing offered by the spark engine. However, often times in data engineering, one might want to push limits and extend this capability into launching multiple Databricks notebooks to run in parallel from another Databricks notebook .

There are probably many ways to do this, but one way is to call the notebook you want to run in multiple parallel instances from another notebook which will make calls to the Databricks Jobs API and kick-off the jobs on a given Databricks cluster.

A common use case of this is when one wants to consume messages from a Kafka topic, process them and write them to a storage location (as delta tables or parquet files). Using this technic, each notebook being executed becomes a Kafka consumer. This is especially useful when one is consuming thousands to millions of messages in a short space of time. Each message will be consumed exactly once (this is possible due to the Kafka consumer group).

The calling of a notebook from another notebook for "notebook parallelism" can be executed using a python class. Depending on the use case, you might want to pass different parameters to each “Worker” notebook from the “Boss” notebook. The use case of passing parameters will be explored in detail in a future article..

The sequence of events is as illustrated in the diagram below.

1.??????? The “Boss” notebook is launched. This could be a manual launch in the Databricks workspace, or the notebook could be part of a data pipeline and triggered from an Azure Data Pipeline for example.

2.??????? Once launched, the “Boss” notebook makes calls to the Databricks API and launches multiple “Worker” notebooks as per the desired number of parallel notebooks.

3.??????? “Worker” notebooks are launched and run in parallel. Each notebook run will be assigned a JobID. As noted in the diagram, its important to note that as each notebook instance executes, it will still have multiple spark jobs running in parallel.

The process flow

Note: You need to ensure that your cluster can handle multiple job runs. Your cluster should have enough computing resources to execute the job. This is especially important for memory intensive tasks.

Now let’s move on to the actual code. I will not focus much on actual coding principles, rather we will focus on the practical usage of the code. That being said, there is a few things the reader might want to be aware of in python in order to fully understand the code.

Note that l said to understand not to use the code. To use the code is quite simple and an example will be offered. Key python concepts: Classes and Object-Oriented Programming, Dunder methods, doc-strings, f-strings, and some basic error handling.

The code below consists of a class which can be used to launch multiple notebooks. I recommended you view the code on the Github link.

https://github.com/jp0793/public_code_shares/blob/main/DatabricksParallelJobsLaunch

Should you not be too fond of clicking links, you can view the code here (though Linkedin's indentation is quite shocking)

import requests
import json


class DatabricksParallelJobsLaunch:
    """
    This class contains methods to create and submit parallel jobs of a given Databricks          notebook onto a Databricks cluster.

    :param notebook_path: This is the path to the Databricks notebook you want to run in parallel
    :param databricks_token: This is the databricks token to authenticate into the Databricks workspace
    :param databricks_instance: This is the databricks instance id
    :param databricks_cluster_id: This is the cluster id of the databricks cluster which will run the jobs
    :param databricks_instance_id: This is the databricks workspace instance id
    :param num_of_parallel_instances: The number of desired parallel runs
    """

    def __init__(self, databricks_instance: str, databricks_token: str, databricks_instance_id: str,
                 databricks_cluster_id: str, notebook_path: str, num_of_parallel_instances: int):

        """
        Class initiation method
        """

        self.notebook_path = notebook_path
        self.databricks_token = databricks_token
        self.databricks_instance = databricks_instance
        self.databricks_cluster_id = databricks_cluster_id
        self.databricks_instance_id = databricks_instance_id
        self.num_of_parallel_instances = num_of_parallel_instances

    def __str__(self) -> str:

        """
        Dunder method for an end user-friendly string of the class object description
        """

        return f"DatabricksParallelJobsLaunch: notebook_path:{self.notebook_path}, databricks_token: {self.databricks_token}, databricks_instance: {self.databricks_instance}, databricks_cluster_id: {self.databricks_cluster_id}, databricks_instance_id: {self.databricks_instance_id}, number_of_parallel_instances: {self.num_of_parallel_instances}"

    def __repr__(self) -> str:

        """
        Dunder method for a programmer-friendly description of the class objec
        """

        class_name = type().__name__

        return f"{class_name}, notebook_path:{self.notebook_path}, databricks_token: {self.databricks_token}, databricks_instance: {self.databricks_instance}, databricks_cluster_id: {self.databricks_cluster_id}, databricks_instance_id: {self.databricks_instance_id}, number_of_parallel_instances: {self.num_of_parallel_instances}"

    def __create_jobs_run(self):

        """
        This method creates the job request to be sent to the API. This is a private method thus not meant to be exposed outside the class. Private methods are named with a preceding "__" in the same manner as Dunder methods.

        Return: Response data from the API call
        """

        url = f"{self.databricks_instance}/api/2.1/jobs/runs/submit"
        headers = {"Authorization": f"Bearer {self.databricks_token}"}
        data = {
            "run_name": f"KafkaConsumer-{self.databricks_instance_id}",
            "existing_cluster_id": self.databricks_cluster_id,
            "notebook_task": {
                "notebook_path": self.notebook_path,
                "base_parameters": {
                    "instance_id": str(self.databricks_instance_id)
                }
            }
        }

        try:
            # Call the API
            response = requests.post(url, headers=headers, data=json.dumps(data))

            # Print the full response for debugging purposes
            print("Response Status Code:", response.status_code)
            print("Response JSON:", response.json())

            # Parse the response
            response_data = response.json()

            # Return response data
            return response_data

        except AttributeError as e:
            print("API call failed due to error {e}")

    def submit_jobs_in_parallel(self):

        """
        This method calls the method 'create_job_runs' and submits the jobs in parallel as per the desired number of parallel instances. It also prints the returned 'job-id' for each successful submission
        Return: None
        """

        try:
            # Submit jobs in parallel
            job_responses = [self.create_jobs_run() for i in range(self.num_of_parallel_instances)]

            # Print job run ids
            for response in job_responses:
                print(f"Job Run ID: {response.get('run_id')}")

        except AttributeError as e:
            print(f"Failed due to error : {e}")
        

To find the different parameters, see below

Find the notebook path:

First go to the workspace where your notebook is saved. Then follow the steps below:

1.??????? Click on the 3 “dots” as shown in Figure 2 below.

2.??????? Click on “Copy URL/path”

3.??????? Select “Full path”

Upon executing step 3, you will be presented with a URL path you can copy and use in your code.

Finding the worker Notebook URL


The Databricks token:

This link from Databricks shows how to obtain one Databricks token:

https://docs.databricks.com/en/dev-tools/auth/pat.html

Databricks instance, cluster id, instance id:

The link below from Databricks shows how one can access the Databricks instance, cluster id and instance id

https://learn.microsoft.com/en-us/azure/databricks/workspace/workspace-details

Number of parallel instances

This number depicts how many worker notebooks are going to be launched in parallel. The thing to note is that your cluster should have enough resources in order for you to achieve the desired number of parallelism.

Finally, this is how one would use the class:

Note: In practice it is not advisable to pass the parameters in plain text. One would use a key vault or alternatively environmental variables so that sensitive information like the Databricks token is not easily accessible

Calling the class


?

要查看或添加评论,请登录

Prince Baloyi的更多文章

  • API requests with Rate Limit in Python

    API requests with Rate Limit in Python

    This is a short article to detail how one can apply API rate limiting in python using the ratelimit library and to be…

    2 条评论

社区洞察

其他会员也浏览了