How to create a simple and powerful data pipeline in Google Cloud

How to create a simple and powerful data pipeline using only Google Cloud managed services.

In this post, we’ll learn how to create a simple and powerful data pipeline using only Google Cloud managed services. You'll only need basic Python and SQL to build it!

There are many tools available today for building data pipelines. Some are fully customizable, others are fully managed but expensive, and others require expertise in a specific language.

Among all the options on the market, choosing a cloud solution puts you on the path to success. Data and AI are closely tied to the cloud nowadays because cloud solutions are scalable, fast, and, if well designed, cost-effective.

Simplicity, carried to an extreme, becomes elegance.

The solution described in this article can be used as a starting point for complex data pipelines or to solve specific ELT jobs. No dataframes or collections here; you can get started with basic Python and SQL in Google Cloud.

Problem to be solved

Suppose you have to ingest employee information into a BigQuery table. Every time a .csv file is placed in a Cloud Storage bucket, you want it to automatically load the information into the final table and process it according to rules based on the employee ID.

id,name,birth_date,occupation,gender
1,John,05/23/1985,Doctor,M
2,Mary,09/12/1992,Engineer,F
3,Joe,02/08/1978,Lawyer,M
4,Anna,07/31/1989,Programmer,F
5,Paul,11/17/1996,Student,M
6,Renata,04/02/1981,Nurse,F
7,Luke,08/19/1993,Architect,M
8,Fernanda,12/27/1975,Psychologist,F
9,Gus,03/14/1990,Veterinarian,M
10,Julie,06/21/1982,Programmer,F
11,Raphael,10/09/1995,Student,M
12,Carol,01/25/1987,Journalist,F
13,Matthew,05/07/1979,Entrepreneur,M
14,Bruna,08/01/1994,Designer,F
15,Fabio,12/18/1986,Pharmacist,M
16,Camille,02/06/1980,Teacher,F
17,Andrew,06/22/1991,Engineer,M
18,Maryann,11/13/1983,Psychiatrist,F
19,Pete,03/30/1998,Student,M
20,Luana,07/15/1976,Doctor,F

If, for instance, we upload a file with one line changed (employee id 20):

  • name: Luana
  • birth_date: 07/15/1976
  • occupation: Teacher
  • gender: F

Notice that only the occupation changed, and now we need to update that specific employee's row to reflect it. This is our pipeline's goal: when a file is loaded, we update existing employees' information and, if an employee doesn't exist, we create an entry for them.

Pipeline Overview

  1. When a file is placed in Google Cloud Storage (GCS) it triggers a cloud function passing the GCS bucket and file name as parameters.
  2. The raw information from the file is loaded into the raw_employee BigQuery table.
  3. A deduplication SQL statement runs on top of raw_employee to merge data from the new file with existing rows in the final employee table.
  4. The final employee table is saved with up-to-date information.

Benefits of the solution

  • Since both Cloud Functions and BigQuery are fully managed, you don't need to worry about configuring any infrastructure. The solution is also Big Data ready.
  • Compared to other pipeline options in Google Cloud, such as Composer + Dataflow or Dataproc, this pipeline is more cost-effective and simpler as a starting point.
  • You can visualize BigQuery data in Data Studio or feed machine learning models hosted in GCP.

Implementation

  1. Create a Cloud Storage bucket for input files, where we will place the files we want to ingest. In Google Cloud, go to Cloud Storage → Buckets → Create. You can name it as you like and keep the default options. Take note of the region you choose; you'll need it later. If you are not sure which region to use, I recommend US (multiple regions in the United States).
  2. Create another Cloud Storage bucket that will be used as a temporary resource for BigQuery jobs, using the same region. You can name it temp_[GCP project ID].
  3. Now, in the same Google Cloud project, go to Cloud Functions (you can find it in the search bar). Enable any required APIs if prompted. Click on Create Function.
  4. In Configuration → Basics, set up the Cloud Function as in the image below. Just be aware that the Cloud Function region should be the same as that of the Cloud Storage bucket created in the first step.


5. Hit "Add Eventarc Trigger". Set the event provider to "Cloud Storage" and the event to "google.cloud.storage.object.v1.finalized", then choose the input file bucket and finally hit "Save trigger". Now, every time a file is created inside the bucket, the Cloud Function runs. You may be prompted to enable some APIs; hit "Enable". Finally, hit the "Next" button.


6. Under Runtime, select "Python 3.11" and set the entry point to "main". Then, in the left panel, create each of the files listed below; you can find the code to copy/paste below. Replace the placeholders where necessary with your Google Cloud project ID and BigQuery dataset.

  • main.py (already exists by default; we are going to replace its content)
  • requirements.txt (already exists by default; we are going to replace its content)
  • merge.sql (hit the cross sign to create the file)

When you finish, hit "Deploy". Deployment will take a few minutes to complete.
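For reference, here is a minimal sketch of what requirements.txt and main.py could look like, assuming a 2nd gen Cloud Function triggered through Eventarc and the standard google-cloud-bigquery client. The project ID, dataset, table names, and file pattern are placeholders to replace with your own values, and the line numbers in this sketch will not match the line references discussed below, which refer to the original file. A plain load-from-GCS job with this client also does not need the temporary bucket mentioned later, so it is omitted here.

```
# requirements.txt
functions-framework==3.*
google-cloud-bigquery
```

```python
# main.py (a sketch, not the original file): load the CSV into a staging table, then merge.
import re

import functions_framework
from google.cloud import bigquery

# Placeholders (assumptions): replace with your own project, dataset, and naming rules.
PROJECT_ID = "your-gcp-project"
DATASET = "your_dataset"
STAGE_TABLE = f"{PROJECT_ID}.{DATASET}.employee_stage"
FINAL_TABLE = f"{PROJECT_ID}.{DATASET}.employee"
FILE_PATTERN = r"^sample_\d{8}\.csv$"  # e.g. sample_20230101.csv


@functions_framework.cloud_event
def main(cloud_event):
    """Triggered by google.cloud.storage.object.v1.finalized events."""
    data = cloud_event.data
    bucket = data["bucket"]
    file_name = data["name"]

    # Only process files that match the expected naming convention.
    if not re.match(FILE_PATTERN, file_name):
        print(f"Ignoring {file_name}: name does not match {FILE_PATTERN}")
        return

    client = bigquery.Client(project=PROJECT_ID)

    # 1. Load the raw CSV from Cloud Storage into the staging table.
    load_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,  # skip the CSV header row
        autodetect=True,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )
    load_job = client.load_table_from_uri(
        f"gs://{bucket}/{file_name}", STAGE_TABLE, job_config=load_config
    )
    load_job.result()  # wait for the load job to complete

    # 2. Read the merge template and point it at the actual tables.
    with open("merge.sql") as sql_file:
        merge_sql = sql_file.read()
    merge_sql = merge_sql.replace("{staging_table}", STAGE_TABLE)
    merge_sql = merge_sql.replace("{final_table}", FINAL_TABLE)

    # 3. Run the merge and wait for it to finish.
    merge_job = client.query(merge_sql)
    merge_job.result()
    print(f"{file_name} merged into {FINAL_TABLE}")
```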


Replace the variables at lines 10, 11, and 46 in main.py. Before testing, let's discuss some points in the code.

  • main.py — line 34: The Cloud Function is triggered every time a file is placed in the bucket (this is the default behavior in Google Cloud), but we want to filter the files we process. We use a regex to accept the files matching our pattern and stop otherwise. In our scenario, the files should match the format sample_[date].csv (line 13).
  • main.py — line 40: We use the skip_leading_rows=1 option so that the header row of the .csv file is not loaded into BigQuery. If your file doesn't have a header, set it to 0.
  • main.py — line 46: BigQuery jobs need a temporary Cloud Storage bucket for internal operations. Once the job terminates, its contents are deleted. Set this to the bucket you created in step 2.
  • main.py — line 52: Wait for the load job to complete.
  • main.py — lines 58 to 61: Once the load job is complete, the table [gcp-project].[dataset].employee_stage contains all employee records from the file; the next step is to merge them into the final table. Here we read the merge SQL template (merge.sql) and replace its variables with the actual tables we want to use (see the sketch after this list).
  • main.py — line 73: Wait for the merge job to complete. Results are saved in the final table.
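Below is a minimal sketch of what merge.sql could look like, assuming the column names from the sample file (id, name, birth_date, occupation, gender) and the {staging_table} / {final_table} placeholders used in the sketch above; the original template may differ. The CREATE TABLE IF NOT EXISTS statement covers the very first run, when the final table doesn't exist yet.

```sql
-- merge.sql (a sketch): create the final table on the first run, then upsert by employee id.
CREATE TABLE IF NOT EXISTS `{final_table}`
AS SELECT * FROM `{staging_table}` WHERE FALSE;

MERGE `{final_table}` AS target
USING (
  -- Deduplicate the staged rows: keep one row per employee id.
  SELECT * EXCEPT (row_num)
  FROM (
    SELECT *, ROW_NUMBER() OVER (PARTITION BY id) AS row_num
    FROM `{staging_table}`
  )
  WHERE row_num = 1
) AS source
ON target.id = source.id
WHEN MATCHED THEN
  UPDATE SET
    name = source.name,
    birth_date = source.birth_date,
    occupation = source.occupation,
    gender = source.gender
WHEN NOT MATCHED THEN
  INSERT (id, name, birth_date, occupation, gender)
  VALUES (source.id, source.name, source.birth_date, source.occupation, source.gender);
```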

Testing

Open your Cloud Function in the Google Cloud console and go to the "LOGS" section. Keep this tab open.


Now, open another tab in your browser, open your Cloud Storage input bucket, and drop a .csv file there (you can download the example .csv from this article here; remember to name it in the right format, for example sample_20230101.csv, before loading it into Cloud Storage).

Once you place the file, the Cloud Function execution begins. Switch back to the logs tab and follow along until it finishes, then open BigQuery and check your final table. If it's the first time you run the pipeline, the table will be created from scratch with the initial records.

Now, if you modify your employee CSV file by changing a value in a random column and upload it again, you should see that just that column is updated for the given employee ID. If you include a new employee in your file with a non-existing employee ID, a new row is created in the table.
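As a quick sanity check, assuming the placeholder project and dataset names from the sketches above, you could run a query like this in the BigQuery console after the second upload:

```sql
-- Confirm that employee 20 now shows the updated occupation.
SELECT id, name, occupation
FROM `your-gcp-project.your_dataset.employee`
WHERE id = 20;
```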

Congratulations!

In this article, you learned how to create a simple pipeline that can import, process, and deduplicate data in Google Cloud. This automated solution is very cost-effective and flexible, and it can take advantage of BigQuery SQL directly in your pipeline. Try changing your merge.sql file to add new features and explore the possibilities!

Post-notes

  • Cloud Functions have a maximum execution time (at the time of writing) of 540 seconds. So, if the entire pipeline execution takes longer than the limit, consider using Cloud Functions + Dataflow for your jobs. You can move the data processing and load steps to Dataflow and use the Cloud Function as a trigger for the Dataflow job.
  • If your data is date partitioned, consider creating partitioned BigQuery tables. This will improve performance and save costs. Make sure to make the necessary adjustments in the code, specifically in the load job (line 38); see the sketch after this list.
  • If your pipeline grows and you have several files flowing through several tables with more complex processing requirements, consider using Cloud Composer as the orchestrator. It is the fully managed Apache Airflow service for Google Cloud and includes many GCP connectors by default, including BigQuery, Cloud Storage, Dataflow, and more.
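To illustrate the partitioning note above, here is a hedged sketch of how the load job configuration could be adjusted with the standard BigQuery client, assuming daily ingestion-time partitioning for the staging table; adapt it if you partition on a column of your own:

```python
from google.cloud import bigquery

# A sketch only: write each load into the current day's ingestion-time partition.
load_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    time_partitioning=bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY
    ),
)
```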

Thank you so much! I hope it will be useful for you!

Rodolfo Marcos, Sr. Data Engineer
