How Azure Data Factory pipelines can be analysed using Python recursion
Rahul Biswas
Azure Data and AI Architect| Principal Data Engineer| Lakehouse| Databricks| Fabric| MLOPs| DataOPs| Career Coach| Mentor
SPOILER ALERT! This article might technically seem to be more about recursion using Python, than Azure Data Factory per se.
However, recursion can be used to solve interesting problems, and this can have application to Azure Data Factory code analysis. Imagine you've inherited an Azure Data Factory codebase you know next to nothing about. Shit hits the roof in production environment, some table field is not getting populated as expected and all hell breaks loose. How are you going to know which ADF pipeline is responsible for populating the table and that particular field?
Let's think about this for a couple of minutes. Imagine you have downloaded all the code of your ADF pipelines, triggers, datasets, linked services onto your local machine. This isn't difficult to do if you have your ADF integrated with Git, which is a standard best practice anyway. You can just download the ADF repo in your local and to your delight, you shall find the downloaded code has pipelines, triggers, datasets and linked services all neatly classified into their own folders. So there will be a pipelines folder with all your pipelines json code, a triggers folder with all your triggers json code and so on. Every pipeline would have its own json file with the file name as the name of the pipeline. Same goes for other objects, like triggers, datasets and linked services.
Json is just human readable text, so in essence, you can easily search that json codebase for any search text. In this case, our search text can be the table name or the field name. Once you find a json file where that search text occurs, you can look at the contents of the file and figure out how the search text is being referenced in that file. And the file would always belong to and represent some concrete entity in the ADF codebase, be it a pipeline, a trigger, a dataset or a linked service.
Now this is where things get interesting. A table name in ADF code can be within a dataset or it can be in a Copy Activity. A field name would most probably be in a Copy Activity schema mapping part. What if the table name occurs in a dataset? That won't tell you which pipeline is invoking it, does it? To find that, you'd have to search your codebase again with the dataset name now as your search text. Maybe now you shall find the file of the pipeline which uses that dataset. But maybe to understand the whole flow of events, you also need to know if that pipeline is itself a child pipeline being invoked by another parent pipeline. So now you have to search with the pipeline name as the search text.
I think you're slowly getting the idea that in order to discover the entire dependency tree of a search text in a codebase, you kind of have to keep doing the same kind of search activity over and over again, with the parameters to the search activity changing with each iteration. And you don't know at the outset how deep you may have to go to discover the whole chain. If you knew, for example, that you only have to go 2 levels deeps, you could have handled the search using 2 levels of calls to your search routine one after the other.
- 1st level - pass search text as parameter and search the whole codebase. Output may be zero or more file names where the search text occurs. Fun thing about ADF json files is that all of them invariably have a top level name attribute, which denotes the name of the object as is visible in ADF editor. Even more importantly, if a parent ADF object refers to a child ADF object, a pipeline referring to a dataset for example, the parent json file is going to contain the name attribute value of the child file, and not necessarily the file name of the child file per se. In most cases, the file name without the extension should match the name attribute, but I don't want my code to bet on it. So once I find the files where my original search text occurs, I can then extract the name attribute from the json in those files. The name attribute of these files become the search text arguments to my 2nd call to the search routine.
- 2nd level - Get the name attributes found from the first level, loop over each one of them, and again call the search routine, passing the name attributes as search texts one by one. You can, at this point, potentially land up with exactly the same or more number of files as compared to the 1st level. Why exactly the same or more? Why not less? So if my first level call identified N no. of files, why am I saying the second level call will definitely identify at least N no. of files, if not more?
See, here's the thing. The search routine scans through the whole codebase. What this means is - if I take any file, extract its name attribute, and search the whole codebase using that name attribute as search text, I will most definitely stumble upon at least one match - which is that file itself. So if my first level call identifies 5 files where a particular search text occurs, and I extract 5 different name attributes from those 5 files, and in my second level call, I use these 5 name attributes as search texts, at the very minimum, my second level call shall identify those very same 5 files. It can do more, provided any of those files are being referred to by some other file.
Can you mentally imagine the tree-like search path we are getting here yet?
The above is an important thing to grasp. This has implications for the code design. If I have a data structure for storing the file names which occur in the dependency tree of a search text, I have to make sure the data structure doesn't allow repetitions of the same file to be inserted into itself. These repetitions don't add any value to the dependency tree, for one, and secondly, storing these repetitions can open up a can of worms leading to infinite circular references and our overall code endlessly moving in circles with no escape.
But we don't know ahead of time how many levels deep we have to go. All we kind of know is that we can stop searching down a path in the tree whenever a search doesn't yield any more new file names. So, we have a piece of operation that we have to go on doing over and over, varying the arguments to that operation with each invocation, and we don't know how long we have to keep on doing it. We can imagine a tree-like explosion of various pathways our search shall take us down and all we know is how to be sure when one can safely terminate one's search down a branch or path in that tree.
And so we opt for recursion!
Wait what? Isn't that the topic in 1st year college that I could never wrap my head around? Guess it's time to redeem myself by writing my home-grown piece of Python recursion!
My code is going to have 2 inputs-
- The full file path of the folder which contains the ADF codebase I want to examine
- The search text
My output is going to be a tree like depiction of the dependency tree.
And here is the code you've all been waiting for (I hope ;-)).
First, let's do some handy imports and initialize our data structures.
Ok, so below is just the main method where I've hardcoded the 2 inputs I discussed earlier.
directory is the file path of the ADF codebase. phrase is the search text.
Below is my definition of create_recursive_dependency_tree function referred to in the above screenshot. You can ignore the orange text if you want as that's just my comments.
Now coming to find_text_in_json_file definition-
And that's it! I imagine this recursive approach can be slightly altered and adapted to other form of code as well, besides ADF.
For all it's worth, here's the whole code.
import os import json from treelib import Node, Tree dependency_tree={} tree = Tree() def find_text_in_json_file(directory, phrase): """ This function finds all the files containing a search phrase recursively in a directory :paramdirectory: The directory in which recursive search is to be conducted :paramphrase: The search phrase. It must be enclosed in double quotes in a file in order for the file to qualify as a match :return: List of fully qualified jsonfile paths containing search phrase """ files_in_directory=[] files_from_recursive_search=[] for file_or_dirin os.listdir(directory): path = directory + "/" + file_or_dir if os.path.isdir(path): files_from_recursive_search= find_text_in_json_file(path, phrase) else: #errors='ignore' was added to avoid exceptions from opening files if ('\"'+phrase+'\"') in open(path, errors='ignore').read(): if path.endswith('json'): files_in_directory.append(path) files_in_directory.extend(files_from_recursive_search) return files_in_directory def create_recursive_dependency_tree(directory, path): """ This function takes a ADF object (pipeline,trigger,dataset) file path and- a. Extracts top level 'name' property from it b. Creates a dictionary entry in deptreewith key as 'name' extracted above c. Populates the value of the above key as the list of jsonfile paths where 'name' occurs. It finds file paths via find_text_in_json_filefunction :paramdirectory: The directory in which recursive search is to be conducted :parampath: An ADF object (pipeline,trigger,dataset) file path """ # JSON file f = open(path, "r") # Reading from file data = json.loads(f.read()) phrase = data["name"] if phrase not in dependency_tree.keys(): files = find_text_in_json_file(directory, phrase) #dependency_tree[phrase] = [item for item in files if item != path] dependency_tree[path] = [item for item in files if item != path] for element in dependency_tree[path]: tree.create_node(element, element, parent=path) """ What situations can serve as terminating conditions in the recursion? 1. Important point is the 'name' property value shall always find a search hit in at least 1 file, that is, **the one file of which it is the 'name' property** 2. So, when I'm searching for 'name' property and all I come up with is a list containing just 1 member, I don't need to search further from there. That recursion sub-tree terminates there. """ if len(files) != 1 or files[0] != path: for file in files: if file != path: create_recursive_dependency_tree(directory, file) if __name__ == '__main__': directory = 'some file path which contains your ADF codebase' phrase = 'some search phrase' files = find_text_in_json_file(directory, phrase) dependency_tree[phrase] = files tree.create_node(phrase, phrase) for element in files: tree.create_node(element, element, parent=phrase) for file in files: create_recursive_dependency_tree(directory, file) #pretty print the deptreedictionary with indentations and nice formatting #print(json.dumps(dependency_tree, indent=4, sort_keys=True)) tree.show()
Please don't hesitate to comment if you have any questions or feedback about the code.
Adios amigos!
Healthcare Data Consultant & PhD Candidate
3 年Thank you for sharing this!
Partner | Data and AI Product Leader | PhD | Author
4 年Awesome work!