Data lake on AWS using native services and cloud-agnostic tools like Attunity Replicate, Striim, StreamSets, Collibra, Confluent Kafka and Tableau
In this article I provide some background and production approaches for implementing a data lake on AWS. The article addresses questions customers commonly ask before deciding to make their organization's culture data-driven and to move on-premises workloads to AWS.
What is a Data Lake?
A data lake is a central storage repository that holds data from various sources in its natural format. A data lake can include structured data from relational databases, semi-structured data (logs, XML, JSON), unstructured data (emails, documents, PDFs) and binary data (images, audio, video).
How is a data lake different from a data warehouse?
A data warehouse is used for reporting. Its data is structured, so the schema has to be defined when data is written (schema-on-write) and is also needed when data is read. In a data lake, no schema is required when writing data; the schema is applied when the data is read (schema-on-read). Data in a data lake can be kept close to real time, whereas a data warehouse typically gets refreshed by a nightly batch job.
We already have an ODS (operational data store), so why do we need a data lake?
Many organizations have been using an ODS for operational reporting for over two decades. In most companies the ODS is an IBM DB2 LUW database that receives transactional data from the IBM mainframe, Oracle, web applications, Salesforce, Marketo, SAP, Avaya, PeopleSoft, etc. The data in the ODS is updated periodically (every hour). The ODS also acts as a staging area for the enterprise data warehouse (EDW).
Because the ODS runs on an RDBMS, it can be scaled vertically but not horizontally. In addition, many concurrent users can slow down the performance of the system.
Can't we move the ODS to a NoSQL database and call it a data lake?
NoSQL databases can definitely scale horizontally. They typically have a master node and data nodes, and we can add data nodes whenever we need them, but compute is still not isolated from storage.
How do we create a data lake on-premises?
Most data-driven organizations have built their data lake on top of Apache Hadoop. Just like a NoSQL database, a Hadoop cluster cannot isolate compute from storage, so organizations that have regulatory requirements or are in the early stages of creating a data lake are storing data in cloud storage such as S3 on AWS, Azure Data Lake Storage (ADLS) on Azure and Google Cloud Storage (GCS) on GCP.
How do we create a data lake on AWS without losing our existing data?
To keep compute and storage separate, we will create the data lake on AWS S3, using three S3 buckets:
- Raw data lake
- Archive data lake, for audit or replay
- Curated data lake
Because S3 bucket names must be globally unique, choose a unique prefix or suffix for your bucket names.
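As a minimal sketch of the naming step, the Python below derives the three bucket names from one organization-specific prefix and checks them against a simplified subset of the S3 bucket naming rules (3 to 63 characters; lowercase letters, digits, dots and hyphens; starting and ending with a letter or digit; no consecutive dots). The `<prefix>-<zone>-<env>` pattern and the function names are hypothetical, and the full rules in the S3 documentation contain further restrictions (for example, a name must not look like an IP address).

```python
import re

# Simplified subset of the S3 bucket naming rules: 3-63 characters made of
# lowercase letters, digits, dots and hyphens, starting and ending with a
# letter or digit. Consecutive dots are rejected separately below.
_BUCKET_RE = re.compile(r"^[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]$")


def is_valid_bucket_name(name: str) -> bool:
    """Return True if name passes the simplified S3 naming checks."""
    return bool(_BUCKET_RE.match(name)) and ".." not in name


def data_lake_bucket_names(org_prefix: str, env: str) -> dict:
    """Build raw/archive/curated bucket names from a unique organization prefix.

    The '<prefix>-<zone>-<env>' pattern is a hypothetical convention.
    """
    names = {
        zone: f"{org_prefix}-{zone}-{env}".lower()
        for zone in ("raw", "archive", "curated")
    }
    for name in names.values():
        if not is_valid_bucket_name(name):
            raise ValueError(f"invalid bucket name: {name}")
    return names


if __name__ == "__main__":
    print(data_lake_bucket_names("example-org", "dev"))
```

Creating the buckets themselves (for example with boto3's `create_bucket`) is left out, because it needs AWS credentials and a Region.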
The following are the stages of implementing a data lake on AWS.
- Identify all data sources within the organization that will feed the data lake.
- Identify a unique data delimiter, because most common delimiters can appear in the data itself. If you use '|', comma or '~', such characters may be part of the data, so either the fields have to be enclosed in quotes or a new delimiter has to be chosen; otherwise reading the data with data science libraries like pandas, Dask or Spark will split fields incorrectly. Redshift and AWS Glue both restrict the delimiter: in Redshift the delimiter can be any character within ASCII 124, and AWS Glue supports only a single-character delimiter. Spark 2.4 also supports only single-character delimiters, so transformations can run into the same issue; Spark needs to be upgraded to version 3.0 or higher, which supports multi-character delimiters.
- Identify PII that requires strong security before moving data to the cloud. Some companies use data masking tools for PII.
- Plan a data split strategy for massive parallelism. A single S3 PUT upload is limited to 5 GB (an object can be up to 5 TB), so objects larger than 5 GB have to be uploaded with multipart upload. It is better to split data into files of around 1 to 2 GB.
- Plan the initial load of data to AWS. The ideal choice on AWS is AWS Snowball, Snowball Edge or Snowmobile. If the initial data is about 50 TB, most customers first move data from on-premises to an Aurora database, decommission their on-premises database and then move the initial data to the data lake.
- Plan data cleansing. If the data has been split into multiple 1 to 2 GB files, an AWS Lambda function can be invoked to perform data cleansing. Because a Lambda function can time out (the maximum timeout is 15 minutes), it is better to use a time-based (scheduled) AWS Glue trigger to clean data in the raw data lake. Before moving cleansed data to the curated data lake, store the original data in the archive data lake.
- Plan change data capture (CDC) of real-time transactional data and the corresponding updates to the data lake. The AWS DMS service can be used for both the initial load and CDC.
- Plan the folder structure in S3.
For the raw data lake, the folder structure can be as follows:
<RawBucketName>/<DataSource>/<InitialLoad>/<SchemaName>/<TableName>/<FileName>
<RawBucketName>/<DataSource>/<CDC>/<SchemaName>/<TableName>/<FileName..1> ... <FileName ... N>
For the archive data lake, the folder structure should be the same as for the raw data lake. The difference is in the bucket's lifecycle configuration: data in the archive data lake needs to be moved to S3 Glacier after 90 days or 6 months.
The curated data lake folder structure will be as follows:
<CuratedDataLakeBucketName>/<DataSource>/<SchemaName>/<TableName>/<FileName...1> ... <FileName ...N>
The initial load and CDC data will be consolidated in the curated data lake, so there is no need for separate initial load and CDC folders.
For example, if the data source is Oracle, the schema name is HR and the table name is EMPLOYEE, then an initial load file in the raw data lake will be stored as raw-example-org-raw-dev/ORACLE/InitialLoad/HR/EMPLOYEE/HR.EMPLOYEE_SPLIT0001.csv
- Create a data catalog on top of the curated data lake. The AWS Glue service can be used to create the data catalog.
- Store the original CDC data in the archive bucket.
- Plan data governance on top of the data lake. The AWS Lake Formation service can be used to govern data in the data lake, with our curated data lake as its data lake source. Those who have used Apache Ranger to govern Hive data will find data governance with AWS Lake Formation easy to relate to.
- Plan data lineage to identify changes at each stage from the source system to the curated data lake. There is no AWS service that can do this, so one has to look at AWS Partner Network tools like Collibra and Informatica EDS or open source tools like Apache Atlas.
- Identify tools that can be used to query data in the data lake. AWS QuickSight can access the data catalog directly. Tableau has an Athena connector that can access the catalog. A better approach is to create an external schema in Redshift on top of the Glue database, so that third-party tools can connect to Redshift directly and access both the catalog and native Redshift data.
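The delimiter check described above can be sketched as follows: scan a sample of field values and keep the first candidate character that never occurs in the data. The candidate list and the `pick_delimiter` name are assumptions for illustration; whether a chosen character is accepted by Redshift COPY, AWS Glue or your Spark version still has to be checked against those tools.

```python
from typing import Iterable, Optional, Sequence

# Candidate delimiters in order of preference (an assumption; adjust them to
# what Redshift, Glue and your Spark version accept). All are single
# characters because Glue and Spark 2.4 accept only one-character delimiters.
CANDIDATES = ("|", "~", "^")


def pick_delimiter(rows: Iterable[Sequence[str]],
                   candidates: Sequence[str] = CANDIDATES) -> Optional[str]:
    """Return the first candidate that never occurs in any field, else None."""
    remaining = list(candidates)
    for row in rows:
        for field in row:
            # Discard every candidate that appears inside this field value.
            remaining = [c for c in remaining if c not in field]
            if not remaining:
                return None
    return remaining[0] if remaining else None


if __name__ == "__main__":
    sample = [["1", "Smith|Jones", "NY"], ["2", "Lee", "CA"]]
    print(pick_delimiter(sample))  # '|' occurs in the data, so '~' is chosen
```

If no candidate is free, the other option mentioned above is quoting: Python's `csv.writer` with `quoting=csv.QUOTE_ALL` encloses every field in quotes.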
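The split step can be sketched with the standard library alone. This hypothetical `split_csv` helper cuts a CSV export into parts of roughly `max_bytes` (1 GiB by default, within the 1 to 2 GB range suggested above), splits only on line boundaries, repeats the header in every part and names parts in the `HR.EMPLOYEE_SPLIT0001.csv` style used in the folder-structure example. It assumes that no field contains an embedded newline.

```python
import os
from typing import List


def split_csv(src_path: str, out_dir: str, prefix: str,
              max_bytes: int = 1024 ** 3) -> List[str]:
    """Split a CSV file into parts of roughly max_bytes, repeating the header.

    Splits only on line boundaries, so a part can exceed max_bytes when a
    single line is larger than the budget.
    """
    os.makedirs(out_dir, exist_ok=True)
    parts: List[str] = []
    with open(src_path, "rb") as src:
        header = src.readline()
        out = None
        size = 0
        try:
            for line in src:
                # Open a new part when none is open yet, or when this line would
                # push the current part (which already holds data) over budget.
                if out is None or (size + len(line) > max_bytes and size > len(header)):
                    if out is not None:
                        out.close()
                    path = os.path.join(out_dir, f"{prefix}_SPLIT{len(parts) + 1:04d}.csv")
                    out = open(path, "wb")
                    out.write(header)
                    size = len(header)
                    parts.append(path)
                out.write(line)
                size += len(line)
        finally:
            if out is not None:
                out.close()
    return parts
```

For the upload itself, boto3's `upload_file` switches to multipart upload automatically once a file exceeds the `multipart_threshold` of its `TransferConfig` (8 MB by default), so the 5 GB single-PUT limit is handled by the SDK; splitting remains useful for parallel processing.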
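The folder structures above can be captured in two small functions so that every ingestion job builds S3 keys the same way. This is a sketch: the literal folder names `InitialLoad` and `CDC` and the function names are assumptions, and the archive bucket simply reuses the raw layout, as described above.

```python
LOAD_TYPES = ("InitialLoad", "CDC")  # assumed literal folder names


def raw_key(source: str, load_type: str, schema: str, table: str, file_name: str) -> str:
    """<DataSource>/<InitialLoad|CDC>/<SchemaName>/<TableName>/<FileName>.

    The archive bucket uses the same layout as the raw bucket.
    """
    if load_type not in LOAD_TYPES:
        raise ValueError(f"load_type must be one of {LOAD_TYPES}")
    return "/".join((source, load_type, schema, table, file_name))


def curated_key(source: str, schema: str, table: str, file_name: str) -> str:
    """<DataSource>/<SchemaName>/<TableName>/<FileName>.

    Initial load and CDC data are consolidated, so there is no load-type folder.
    """
    return "/".join((source, schema, table, file_name))


def s3_uri(bucket: str, key: str) -> str:
    """Combine a bucket name and key into an s3:// URI."""
    return f"s3://{bucket}/{key}"


if __name__ == "__main__":
    key = raw_key("ORACLE", "InitialLoad", "HR", "EMPLOYEE", "HR.EMPLOYEE_SPLIT0001.csv")
    print(s3_uri("example-org-raw-dev", key))
```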
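Moving archive data to Glacier after 90 days or 6 months is normally done with an S3 lifecycle rule on the archive bucket. The sketch below only builds the configuration dictionary in the shape expected by boto3's `put_bucket_lifecycle_configuration`; the rule ID, the helper name and the bucket name in the comment are hypothetical. For the 6-month variant, pass roughly 180 days.

```python
from typing import Dict


def archive_lifecycle_configuration(days: int = 90, prefix: str = "") -> Dict:
    """Lifecycle configuration that transitions archive objects to S3 Glacier."""
    if days <= 0:
        raise ValueError("days must be positive")
    return {
        "Rules": [
            {
                "ID": f"archive-to-glacier-after-{days}-days",
                "Filter": {"Prefix": prefix},  # "" applies the rule to every object
                "Status": "Enabled",
                "Transitions": [{"Days": days, "StorageClass": "GLACIER"}],
            }
        ]
    }


if __name__ == "__main__":
    config = archive_lifecycle_configuration(90)
    print(config)
    # With credentials configured (bucket name is hypothetical):
    # import boto3
    # boto3.client("s3").put_bucket_lifecycle_configuration(
    #     Bucket="example-org-archive-dev", LifecycleConfiguration=config)
```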
We do not want others to touch our data while moving to the cloud, so what do we do?
Please check the compliance of AWS Snowball and other services at the following link.
If the organization is still not comfortable with someone else loading its data into the cloud from Snowball hardware, AWS Direct Connect can be used instead. The data movement then has to happen in a phased manner, and the organization has to keep the on-premises ODS in sync, so egress cost also needs to be considered.
The AWS Glue crawler infers the schema, but the data type of most columns is string, which is not compatible with Redshift. How can we access the catalog using a Redshift external schema?
It is possible that the data contains null bytes or other characters that prevent the Glue crawler from inferring the schema correctly. The string data type size is 16K bytes, which is still okay to use with a Redshift external schema, but if multiple columns are of type string, the query will fail because of the 65K-byte Redshift query limit; more than 3 string columns in a catalog table will throw an error. It is possible to change a data type in the Glue console, but if we select the "var" data type to reduce the string size to 256, it is not compatible with Redshift. String columns also require creating views that use the Redshift CONVERT or CAST functions, so that consumers of the data catalog can join tables without repeatedly writing CONVERT and CAST themselves.
It is possible to enforce data types by creating tables manually or with a script. Creating tables manually is not practical for a large number of tables, so the AWS SDK should be used. When a new column gets added, the catalog table needs to be dropped and recreated.
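A minimal sketch of the SDK approach, assuming Python with boto3: the helper below builds a Glue `TableInput` for an external Parquet table with explicitly typed columns, which can then be passed to the Glue `create_table` API. The database, table, location and column names are hypothetical, and the column types should be chosen to match what your Redshift external schema accepts.

```python
from typing import Dict, Sequence, Tuple

Column = Tuple[str, str]  # (column name, Glue/Hive type)


def parquet_table_input(name: str, location: str, columns: Sequence[Column],
                        partition_keys: Sequence[Column] = ()) -> Dict:
    """Build a Glue TableInput for an external Parquet table with explicit types."""
    return {
        "Name": name,
        "TableType": "EXTERNAL_TABLE",
        "Parameters": {"classification": "parquet"},
        "PartitionKeys": [{"Name": n, "Type": t} for n, t in partition_keys],
        "StorageDescriptor": {
            "Columns": [{"Name": n, "Type": t} for n, t in columns],
            "Location": location,
            "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "SerdeInfo": {
                "SerializationLibrary":
                    "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
            },
        },
    }


if __name__ == "__main__":
    table_input = parquet_table_input(
        "employee",
        "s3://example-org-curated-dev/ORACLE/HR/EMPLOYEE/",
        [("employee_id", "bigint"), ("salary", "decimal(10,2)"),
         ("hire_date", "date"), ("updated_at", "timestamp")],
    )
    print(table_input["StorageDescriptor"]["Columns"])
    # With credentials configured, the table could then be created with:
    # import boto3
    # boto3.client("glue").create_table(DatabaseName="curated_db", TableInput=table_input)
```

To follow the drop-and-recreate approach when a column is added, the Glue `delete_table` API can be called for the table before passing the updated dictionary to `create_table`.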
AWS DMS doesn't support all data sources, so how do we ingest data into the data lake?
No tool or cloud service supports all data sources, so one has to use a mix of AWS native services and tools from the partner network like Qlik Replicate, StreamSets, Striim, NiFi, Talend, Alooma, etc.
Why can't we use the AWS Lake Formation service alone to create the data lake and to handle data security and data governance?
AWS Lake Formation has been built on top of AWS Glue. We can use an AWS Glue connection to create a JDBC connection to read initial load and CDC data. We can also read data from Kafka, whether it is AWS MSK or Confluent Kafka on AWS, so change data capture can still work for most data sources. If the source is supported by AWS DMS and Kafka is used as the DMS target, AWS Lake Formation can be used to update the data. Lake Formation also allows us to create custom connectors. However, its JDBC connector can ingest data only from SQL Server, PostgreSQL, MySQL and Oracle; if you provide a DB2 JDBC connection string, it throws an "Enum not found" error. Since AWS DMS supports more data sources and AWS Glue can be used in conjunction with DMS, that combination is the better option.
What does the AWS data lake architecture look like?
The architecture depends on the data sources and on the organization's compliance requirements, policies, restrictions, etc.
High level Architecture
Below is the high-level architecture of a data lake on AWS.
- Data from various sources can be copied onto AWS-provided Snowball hardware, and the hardware is then shipped to AWS so that the initial load data can be stored in the raw data lake S3 bucket.
- If the organization doesn't want to use AWS Snowball, AWS Direct Connect needs to be used. AWS DMS can ingest data from many, but not all, data sources, so third-party tools like Qlik Replicate, StreamSets, Striim, Talend, etc. can be used. The source database secrets should be stored in AWS Secrets Manager.
- Certain data sources may not be supported by any tool, so the data needs to be exported with the help of the on-premises DBA and then ingested into the raw data lake.
- Flat files or CSV exports of on-premises data can be securely transmitted to AWS using AWS Transfer for SFTP, which stores the data in S3. For higher security, the files can be encrypted using PGP. The decryption key needs to be stored in base64 format in AWS Secrets Manager so that a job running on AWS can decrypt the PGP-encrypted files.
- If we are not using AWS Snowball, we can use an AWS DMS migration task for the initial (full) load of data. For CDC, we can use an AWS DMS ongoing replication task. For some data sources, CDC data may be sent to a Kafka topic, for which we can use AWS MSK. If you have a Confluent Kafka license, you can install Confluent Kafka on AWS EC2. AWS MSK now supports a schema registry just like Confluent Kafka; the schema registry is provided by AWS Glue (the AWS Glue Schema Registry).
- Once the initial raw data is stored in the raw data lake S3 bucket, a scheduled AWS Glue job can perform data cleansing. For example, if a phone number, e-mail address or date is not in the correct format, the record either needs to be fixed or moved to an error bucket. Once the data is fixed at the source, it can be ingested through the CDC channel. After data cleansing, the data will be stored in a staging bucket. The original data needs to be moved to the archive bucket for audit purposes.
- We can create a Spark job on AWS EMR that converts the data to partitioned Parquet format during the initial load. Partitioned data is useful at CDC time because only the partition in which a record is stored needs to be updated. The right partition strategy is required. Most of the time, the update date column is used for partitioning, but if legacy data has no update timestamp column, one option is to hash the primary or composite key and take the hash modulo the desired number of partitions to control the number of partitions.
- After storing data in the data lake, we can either use an AWS Glue crawler or write custom code in Python or Scala/Java to create tables in the Glue database.
- Once the catalog has been created, we can create an external schema in Redshift on top of the Glue database so that the catalog is accessible from Redshift.
- The data lake can act as a staging layer for the Redshift data warehouse. A Spark job running on AWS EMR can transform the data and load it into the fact and dimension tables of Redshift, assuming we are using a star schema for the data warehouse.
- AWS QuickSight can access the Glue catalog directly. For more information, please check my other blog.
- If we would like to use Tableau instead of AWS QuickSight, Tableau users can use either the Athena connector or the Redshift connector to access the data lake. Since we have created an external schema within Redshift, there is no need to use the Athena connector.
- For data governance and lineage we can use Collibra.
- AWS Lake Formation can be used to ingest data, trigger Glue jobs and govern data as well. We can create a JDBC connection in Lake Formation, which is actually a connection setting in Glue, but the JDBC connection allows only MySQL, Oracle, PostgreSQL and SQL Server. If we use DB2, it throws an "Enum not found" error.
- Now that the data in the data lake is clean, predictive analytics can be done with SageMaker. We can create a notebook instance in SageMaker, provide our data lake as input and select an appropriate machine learning algorithm.
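The cleansing rule described in the architecture (fix or divert records with a bad phone number, e-mail address or date) can be sketched in plain Python. In a real pipeline this logic would run inside the scheduled AWS Glue job; the column names, the regular expressions and the date format below are assumptions and are intentionally loose.

```python
import re
from datetime import datetime
from typing import Dict, List, Tuple

Record = Dict[str, str]

EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
# Digits, spaces, hyphens and parentheses; 8 to 20 characters, not counting
# an optional leading '+'.
PHONE_RE = re.compile(r"^\+?[0-9][0-9\-\s()]{6,18}[0-9]$")
DATE_FORMAT = "%Y-%m-%d"  # assumed source date format


def _is_date(value: str) -> bool:
    try:
        datetime.strptime(value, DATE_FORMAT)
        return True
    except ValueError:
        return False


def validate(record: Record) -> List[str]:
    """Return the names of the fields that fail validation."""
    errors = []
    if not EMAIL_RE.match(record.get("email") or ""):
        errors.append("email")
    if not PHONE_RE.match(record.get("phone") or ""):
        errors.append("phone")
    if not _is_date(record.get("hire_date") or ""):
        errors.append("hire_date")
    return errors


def cleanse(records: List[Record]) -> Tuple[List[Record], List[Record]]:
    """Split records into (clean, rejected); rejected rows carry their errors."""
    clean, rejected = [], []
    for record in records:
        errors = validate(record)
        if errors:
            # Rejected rows would be written to the error bucket.
            rejected.append({**record, "_errors": ",".join(errors)})
        else:
            clean.append(record)
    return clean, rejected
```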
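For legacy tables without an update timestamp, the idea of hashing the primary or composite key and taking it modulo the number of partitions can be sketched as below. It uses SHA-256 from `hashlib` rather than Python's built-in `hash()`, because `hash()` of a string is randomized per process and would place the same key in different partitions on different runs. The function name and the separator character are assumptions.

```python
import hashlib
from typing import Sequence

# Joins composite-key parts; assumed not to occur inside key values.
_SEPARATOR = "\x1f"


def hash_partition(key_values: Sequence[object], num_partitions: int) -> int:
    """Stable partition number for a primary or composite key."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    joined = _SEPARATOR.join(str(v) for v in key_values)
    digest = hashlib.sha256(joined.encode("utf-8")).digest()
    # Interpret the first 8 bytes as an unsigned integer, then take the modulo.
    return int.from_bytes(digest[:8], "big") % num_partitions
```

In a Spark job on EMR the same idea would typically be expressed with `pyspark.sql.functions.hash` wrapped in `pmod` (Spark's hash can be negative); it produces different partition numbers from this sketch, so one method should be used consistently.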
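The Redshift side of the architecture can be sketched as two SQL statements generated from Python: a `CREATE EXTERNAL SCHEMA ... FROM DATA CATALOG` statement pointing at the Glue database, and a view that casts string-typed catalog columns once, so consumers do not have to repeat CAST in every join. Redshift requires views over external tables to be late-binding (`WITH NO SCHEMA BINDING`). The schema, database, table, column and IAM role names are placeholders, and identifiers are not escaped, so only trusted names should be passed in.

```python
from typing import List, Tuple


def external_schema_sql(local_schema: str, glue_database: str, iam_role_arn: str) -> str:
    """CREATE EXTERNAL SCHEMA statement that exposes a Glue database in Redshift."""
    return (
        f"CREATE EXTERNAL SCHEMA IF NOT EXISTS {local_schema}\n"
        f"FROM DATA CATALOG DATABASE '{glue_database}'\n"
        f"IAM_ROLE '{iam_role_arn}';"
    )


def typed_view_sql(view_name: str, external_table: str,
                   casts: List[Tuple[str, str]]) -> str:
    """Late-binding view that casts string columns to Redshift types once."""
    select_list = ",\n  ".join(
        f"CAST({column} AS {redshift_type}) AS {column}"
        for column, redshift_type in casts
    )
    return (
        f"CREATE OR REPLACE VIEW {view_name} AS\n"
        f"SELECT\n  {select_list}\n"
        f"FROM {external_table}\n"
        "WITH NO SCHEMA BINDING;"
    )


if __name__ == "__main__":
    print(external_schema_sql("datalake", "curated_db",
                              "arn:aws:iam::123456789012:role/ExampleSpectrumRole"))
    print(typed_view_sql("reporting.employee", "datalake.employee",
                         [("employee_id", "BIGINT"), ("hire_date", "DATE")]))
```

The generated statements can be run with any Redshift SQL client.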
Deployment Architecture
In order to deploy the solution to production, we have to break down the architecture.
Initial load architecture using AWS Snowball
The entire organization's data will be copied to Snowball hardware, and the hardware will be shipped to AWS. Once the hardware reaches AWS, the data will be copied to an AWS S3 bucket. The data at rest will be encrypted using a KMS key.
Initial Load / Historical Load Architecture without AWS Snowball
We have to introduce AWS Direct Connect for the initial load of data, and we will require it for CDC data as well. The AWS VPC virtual private gateway (VGW) will be attached to an AWS transit gateway. The transit gateway will then connect to the router / AWS Direct Connect endpoint within the AWS Direct Connect cage. The AWS Direct Connect router will then connect to the partner or customer router within the customer/partner cage and eventually to the customer data center. We will use a private VIF to keep data private, and a VPC endpoint has been introduced for S3.
AWS DMS in the first VPC will have a migration task for the initial load of data. AWS DMS doesn't support all data sources, so we have to install third-party tools like Qlik, Striim, StreamSets, Talend and Apache NiFi on EC2. It is not necessary to install them in a separate VPC; I have shown the separation only for clarity. The target for each of the data migration services will be AWS S3.
Initial load / CDC architecture with data cleansing to create curated data lake
Limitations of Data Lakes
Data lakes cannot provide transactional guarantees. Specifically, data lakes fail to provide ACID guarantees on:
- Atomicity and isolation
ETL jobs insert or update data in data lakes as many files written in a distributed manner. If the operation fails, there is no mechanism to roll back the files already written, which can leave behind corrupted data.
- Consistency
Lack of atomicity on failed writes further causes readers to get an inconsistent view of the data. In fact, it is hard to ensure data quality even in successfully written data. For example, a very common issue with data lakes is accidentally writing out data files in a format and schema inconsistent with existing data.
Workaround for inconsistent data lake
- Partition the data lake files based on the primary key, composite key or update timestamp. The update timestamp is the most common method. Once the initial data loaded in the data lake has been partitioned, an insert or update process first identifies the partition in which the data needs to be inserted or updated, and only that partition gets rewritten, not the entire data lake, thereby improving the performance and consistency of the data lake.
- Schedule read and update ETL jobs at different times.
- Replace the data lake with a lakehouse, which adds ACID transactions on top of data lake storage. This is the ideal solution.
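The partition-level update in the first workaround can be sketched in memory as follows, with a dictionary per partition standing in for the files of one S3 partition folder. The sketch partitions by a hash of the primary key rather than the update timestamp: with timestamp partitioning an updated row moves to a new partition, and its old copy must also be deleted from the old partition, which needs an extra lookup. The `Op` values I/U/D mirror the operation column AWS DMS writes to S3 CDC files; the function names and the number of partitions are assumptions. Only the partitions in the returned set would need to be rewritten.

```python
import hashlib
from typing import Dict, Iterable, Set

Row = Dict[str, object]
Partitions = Dict[int, Dict[object, Row]]  # partition -> {primary key: row}


def partition_of(key: object, num_partitions: int) -> int:
    """Stable partition number derived from the primary key."""
    digest = hashlib.sha256(str(key).encode("utf-8")).hexdigest()
    return int(digest, 16) % num_partitions


def apply_cdc(partitions: Partitions, changes: Iterable[Row],
              num_partitions: int, key: str = "id") -> Set[int]:
    """Apply insert/update/delete changes and return the partitions touched."""
    touched: Set[int] = set()
    for change in changes:
        pk = change[key]
        part = partition_of(pk, num_partitions)
        rows = partitions.setdefault(part, {})
        if change.get("Op") == "D":
            rows.pop(pk, None)  # delete: drop the row if present
        else:
            # insert or update: upsert the row without the Op column
            rows[pk] = {k: v for k, v in change.items() if k != "Op"}
        touched.add(part)
    return touched
```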
- The statements, views, or opinions expressed in my LinkedIn profile and related articles represent my own views and not those of my current or previous employers or LinkedIn.
- Any comments from those responding to my postings belong to, and only to, the responder posting the comment. I am not responsible or liable for such comments.
- Although I hope that you find my articles helpful and perhaps educational, if you decide to rely on them for any purpose whatsoever, you do so at your own risk, and I will not be held liable.
- "Names, Logos and other proprietary information quoted in this article are property of respective companies/owners and are mentioned here for reference purpose only."
- Only public domain data has been used in the examples. If you have any objection with the data then please let me know with the proof of your ownership and I will be happy to update my article.