Bulk Data Load using Apache Sqoop

Apache Sqoop is an open-source tool that is widely used for transferring bulk data between Hadoop and structured data sources such as relational databases, data warehouses, and NoSQL databases. Sqoop was initially developed at Cloudera and became a top-level Apache Software Foundation project in 2012. Sqoop provides a simple command-line interface to import and export data, as well as a scalable and fault-tolerant architecture that can handle large volumes of data.

Bulk Data Load:

Bulk data loading is the process of transferring large volumes of data from one system to another. It is often required when migrating data between different systems or when loading data into a new system. Bulk data loading can be a time-consuming and complex process, especially when dealing with large datasets. Apache Sqoop provides a convenient and efficient way to load bulk data into Hadoop, where it can be easily analyzed and integrated with other data sources.

Apache Sqoop for Bulk Data Load:

Apache Sqoop can be used to import data from various sources, including relational databases, data warehouses, and NoSQL databases. Sqoop uses JDBC drivers to connect to these sources and extract data using SQL queries. Sqoop supports a wide range of databases, including Oracle, MySQL, PostgreSQL, SQL Server, and many more. Sqoop can also import data from data warehouses such as Teradata and Netezza, as well as NoSQL databases such as MongoDB and Cassandra.
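For example, before importing anything you can verify the JDBC connectivity and inspect the source. This is a minimal sketch; the host, port, database, and credentials are placeholders:

# List the tables visible through the JDBC connection (placeholder host, database, and credentials)
sqoop list-tables \
--connect jdbc:postgresql://<host>:5432/<database> \
--username <user> --password <password>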

The import process in Sqoop is straightforward. First, Sqoop connects to the source database and retrieves metadata about the table(s) to be imported. Sqoop then generates a MapReduce job to extract the data using SQL queries. The extracted data is stored in Hadoop Distributed File System (HDFS) in a format that is optimized for querying and processing with Hadoop tools such as MapReduce, Hive, and Pig.

Sqoop provides several options for controlling the import process, including selecting specific columns, filtering rows, and controlling the number of mappers used to extract the data. Sqoop can also handle complex data types such as BLOBs, CLOBs, and binary data.
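As an illustration, the sketch below combines a few of these options: it imports only selected columns, filters rows with a WHERE clause, and limits the number of parallel mappers. The connection string, table, and column names are hypothetical:

# Import selected columns of a hypothetical "orders" table, filtered by date, with 4 mappers
sqoop import \
--connect jdbc:postgresql://<host>:5432/<database> \
--username <user> --password <password> \
--table orders \
--columns "id,customer_id,amount,created_at" \
--where "created_at >= '2020-01-01'" \
--split-by id \
--num-mappers 4 \
--target-dir /data/orders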

In addition, Sqoop provides several features to optimize the import process for large datasets, including:

  1. Parallelism: Sqoop can split the import process into multiple tasks to run in parallel, which can significantly reduce the import time.
  2. Compression: Sqoop can compress the data during the import process, which can reduce the size of the imported data and improve performance.
  3. Incremental imports: Sqoop can perform incremental imports, which allows you to import only the data that has changed since the last import. This can save time and reduce the amount of data that needs to be imported.
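For example, compression and incremental imports (points 2 and 3 above) can be combined as in the sketch below; the table name, check column, and last value are hypothetical:

# Append-mode incremental import of rows with id > 100000, compressed with Snappy
sqoop import \
--connect jdbc:postgresql://<host>:5432/<database> \
--username <user> --password <password> \
--table orders \
--incremental append \
--check-column id \
--last-value 100000 \
--compress \
--compression-codec org.apache.hadoop.io.compress.SnappyCodec \
--target-dir /data/orders_incremental \
-m 4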


Now let's walk through a use case with Apache Sqoop.


Use-case: Migration of historical data from RDBMS like Postgres or MySQL



Source Database: Postgres

Target Database: GCP BigQuery

Here, we are going to talk about bulk migration from Postgres databases to GCP BigQuery.

Prerequisites:

  • Basic knowledge of Apache Spark
  • Basic knowledge of Postgres DB
  • Basic knowledge of GCP services like BigQuery, GCS, Dataproc, and Dataflow
  • Programming skills: Python (PySpark)/Java/Scala


STEP 1: Creation of Hadoop cluster.

  • We can use any Hadoop cluster, but for this demo we will use the Dataproc service from GCP.
  • Create a cluster. Please find a sample config below:


GCS_BUCKET_NAME = "<BUCKET NAME>"    # Name of the staging bucket
GCP_PROJECT_NAME = "<PROJECT NAME>"  # Name of the GCP project
SUBNETWORK_URI = f"projects/{GCP_PROJECT_NAME}/regions/<region>/subnetworks/<subnet name>"


DATAPROC_CLUSTER_CONFIGURATION = {
    "config_bucket": GCS_BUCKET_NAME,
    "master_config": {
        "num_instances": 1,
        "machine_type_uri": "n1-standard-2",  # Machine type
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 100},
    },
    "worker_config": {
        "num_instances": 2,  # No. of primary worker instances
        "machine_type_uri": "n1-standard-2",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 100},
    },
    "secondary_worker_config": {
        "num_instances": 2,
        "machine_type_uri": "n1-standard-2",
        "disk_config": {"boot_disk_type": "pd-standard", "boot_disk_size_gb": 100},
    },
    "software_config": {
        "image_version": "1.5-debian10",  # Dataproc image version (OS + Hadoop stack)
        "properties": {
            "dataproc:dataproc.conscrypt.provider.enable": "false"
        },
    },
    "lifecycle_config": {
        "idle_delete_ttl": {"seconds": 1800}  # Delete the cluster after 30 minutes of inactivity
    },
    "gce_cluster_config": {
        "subnetwork_uri": SUBNETWORK_URI,
        "internal_ip_only": True,
        "service_account_scopes": ["https://www.googleapis.com/auth/cloud-platform"],
    },
    "initialization_actions": [
        {"executable_file": f"gs://{GCS_BUCKET_NAME}/<Folder name>/<Script name>.sh"}
    ],
    "autoscaling_config": {
        "policy_uri": f"projects/{GCP_PROJECT_NAME}/regions/<region>/autoscalingPolicies/postgres-migration-autoscaling-policy"
    },
}

Note: The cluster can be created using the Dataproc SDK or an Airflow operator, as sketched below.
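Here is a minimal sketch of creating the cluster with the Dataproc Python client (google-cloud-dataproc); the region and cluster name are placeholders, and DATAPROC_CLUSTER_CONFIGURATION is the dictionary defined above:

from google.cloud import dataproc_v1

REGION = "<region>"  # placeholder; use the same region as the subnetwork and autoscaling policy

# The client must point at the regional Dataproc endpoint
cluster_client = dataproc_v1.ClusterControllerClient(
    client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
)

operation = cluster_client.create_cluster(
    request={
        "project_id": GCP_PROJECT_NAME,
        "region": REGION,
        "cluster": {
            "project_id": GCP_PROJECT_NAME,
            "cluster_name": "postgres-migration-cluster",  # hypothetical cluster name
            "config": DATAPROC_CLUSTER_CONFIGURATION,
        },
    }
)
cluster = operation.result()  # blocks until the cluster is provisioned
print(f"Cluster created: {cluster.cluster_name}")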

STEP 2: Bulk migration of data from Postgres DB to Google Cloud Storage bucket.

  • Sqoop migration takes place at the table level within the source database.
  • Once the cluster is created, we need to submit our Python/Scala job to it.
  • The Sqoop import command looks like this:

# To load an entire table
sqoop import \
--connect <JDBC url> \
--username root --password <Actual password> \
--table <Table name> -m <Number of mappers> \
--split-by <field name> --target-dir <Target directory> \
--<file-type>


# To transform data in the table using a free-form query; you can apply filters here.
# Note: the query must include the $CONDITIONS token in its WHERE clause.
sqoop import \
--connect <JDBC url> \
--username root --password <Actual password> \
--query <Query string> -m <Number of mappers> \
--split-by <field name> --target-dir <Target directory> \
--<file-type>

  • The target directory can be an HDFS path or a GCS bucket path.
  • The file type can be Parquet, Avro, or plain text. If you need another format such as JSON or CSV, convert the imported files afterwards.
  • If data transformation is required, we can create external Hive tables on top of the imported files and perform the transformation there.
  • Data can also be loaded into DataFrames so we can leverage the distributed processing features of Apache Spark (see the PySpark sketch after this list).
  • Sqoop best practices:
  • Data type handling: Sqoop does not support all data types, so some columns have to be converted in the import query. For example:
      • The UUID data type is not supported by Sqoop, so convert it to a string.
      • Array data types are not supported, so convert them to a delimiter-separated string and split them back into an array when writing the data out.
      • By default, date and timestamp fields are converted to epoch values (integer and long respectively), so convert them back to timestamps explicitly.
      • Other data types may also need special handling.
  • Do not set a very high number of mappers, since each mapper opens its own JDBC connection to fetch data; too many connections can slow down the source instance.
  • Always run the import against a replica or a snapshot of the instance, otherwise the performance of the master instance might be impacted.
  • For large tables, increase the persistent disk size of the cluster and use autoscaling policies. Otherwise, the job can fail due to insufficient resources.
  • If you are using Scala, use the scopt library for argument parsing and sanitization.
  • For passwords, it is recommended to use the Secret Manager SDK, since job arguments are visible in the Dataproc console and displaying a password there is not a good idea.
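As referenced above, here is a minimal PySpark sketch of the post-import cleanup described in the data-type notes: converting epoch values back to timestamps and splitting a delimiter-separated string back into an array. The bucket, paths, and column names (created_at, tags) are hypothetical, and it assumes the timestamp landed as epoch milliseconds:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("sqoop-postprocess").getOrCreate()

# Read the files Sqoop wrote to the GCS target directory (hypothetical path)
df = spark.read.parquet("gs://<BUCKET NAME>/<Target directory>/")

df_clean = (
    df
    # Timestamp arrived as epoch milliseconds (long); cast it back to a timestamp column
    .withColumn("created_at", (F.col("created_at") / 1000).cast("timestamp"))
    # Array was exported as a pipe-separated string; split it back into an array
    .withColumn("tags", F.split(F.col("tags"), r"\|"))
)

# Write the cleaned data back to GCS for the BigQuery load in Step 3
df_clean.write.mode("overwrite").parquet("gs://<BUCKET NAME>/<Cleaned directory>/")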


STEP 3: Load data from GCS to BigQuery using Dataflow

  • Once the data has landed in the GCS bucket, we load the files into BigQuery using the Dataflow service.
  • We can parse the files, perform data checks, sanitization, transformation, and aggregation in the job, and load the data using the BigQueryIO APIs.
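A minimal Apache Beam (Python) sketch of such a Dataflow job is shown below; the bucket path, BigQuery table, and the sanitize step are assumptions for illustration, and the target table is assumed to already exist:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder locations - replace with real values
GCS_INPUT = "gs://<BUCKET NAME>/<Cleaned directory>/*.parquet"
BQ_TABLE = "<PROJECT NAME>:<dataset>.<table>"

options = PipelineOptions(
    runner="DataflowRunner",
    project="<PROJECT NAME>",
    region="<region>",
    temp_location="gs://<BUCKET NAME>/tmp",
)

def sanitize(record):
    # Hypothetical data-check hook: drop null values; add casts/validation as needed
    return {k: v for k, v in record.items() if v is not None}

with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | "Read Parquet from GCS" >> beam.io.ReadFromParquet(GCS_INPUT)
        | "Sanitize records" >> beam.Map(sanitize)
        | "Write to BigQuery" >> beam.io.WriteToBigQuery(
            BQ_TABLE,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,  # table assumed to exist
        )
    )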
