Building a Streaming Pipeline
Using Amazon Kinesis Data Streams and Redshift to develop a Modern Streaming Pipeline Architecture
Intro
The demand for near real-time data access is surging across industries. Businesses are increasingly reliant on up-to-the-minute insights to make critical decisions, optimize operations, and deliver exceptional customer experiences. At Squadra Labs, we recognized this growing need and explored implementing streaming solutions in the cloud using the robust and scalable tools offered by AWS. In this blog post, we'll share our learnings and guide you through building a serverless streaming pipeline for near real-time data access on AWS.
Modern Streaming Data Architecture
A modern streaming data stack consists of five key components: a data producer that emits events, an ingestion service that receives them, a streaming topic that buffers them, a consumer that processes them, and a destination store, typically a data warehouse, where the data is transformed and analyzed.
As mentioned before, this article focuses on building a serverless, scalable streaming pipeline for near real-time data access. We'll follow an ELT (Extract, Load, Transform) approach, storing raw data first and transforming it later for data warehouse consumption. This is also a perfect opportunity to leverage tools like dbt.
Our experience across various client projects has revealed a common streaming architecture that's increasingly becoming the standard for real-time data processing. This proven approach is depicted in the following diagram, which details its key components.
In essence, this streaming pipeline demonstrates how data can be continuously produced from a source, published to a central topic, and then subscribed to and processed by a designated consumer. This architecture enables real-time data processing and analytics.
Common Streaming Architecture on AWS
The practical implementation of this architecture on AWS uses three services: an AWS Lambda function as the data producer, Amazon Kinesis Data Streams as the streaming topic, and Amazon Redshift as both the consumer and the destination warehouse.
One advantage of this topology is its simplicity. Redshift acts as the consumer, eliminating the need for a separate service. You simply configure a specific schema in Redshift for Kinesis, allowing you to read each Kinesis topic as a table within that schema.
Kinesis Data Streams provides high-throughput, durable data ingestion, ensuring producers can send data without information loss.
We've included a Lambda function as a data producer, mimicking data from an external IoT device. This approach demonstrates how to extend the solution to real-world applications. The following picture shows the specific AWS architecture diagram.
Once you have a grasp of the components and their roles, you're well on your way to building your own streaming pipeline! That said, it can seem complex; let Squadra Labs bridge the gap. Our data engineering team can translate these concepts into a tailored solution for your specific needs. Contact us today to discuss your project and get started!
Walkthrough
In this setup, we focus on directly storing data in Amazon Redshift using Redshift streaming ingestion, eliminating the need for Firehose to save data to the Redshift warehouse.
Grant Permissions
First, create an IAM role that Redshift can assume (for example, kinesis-permission-redshift) and attach a policy granting read access to your Kinesis streams:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ReadStream",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStreamSummary",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:DescribeStream"
      ],
      "Resource": "arn:aws:kinesis:[aws-region]:*:stream/*"
    },
    {
      "Sid": "ListStream",
      "Effect": "Allow",
      "Action": [
        "kinesis:ListStreams",
        "kinesis:ListShards"
      ],
      "Resource": "*"
    }
  ]
}
Make sure the role's trust policy allows the Redshift service (redshift.amazonaws.com) to assume it, then copy the ARN associated with the role, which has the following format:
arn:aws:iam::[account-id]:role/kinesis-permission-redshift
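For reference, the same role can also be created programmatically. The following is a minimal boto3 sketch, not part of the original console-based walkthrough; it reuses the role name above, an inline policy mirroring the one shown, and assumes you substitute your own region in the stream ARN:

import json
import boto3

iam = boto3.client("iam")

# Trust policy letting the Redshift service assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "redshift.amazonaws.com"},
        "Action": "sts:AssumeRole",
    }],
}

# Read permissions mirroring the policy document shown above.
read_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream",
            ],
            "Resource": "arn:aws:kinesis:[aws-region]:*:stream/*",  # replace [aws-region] with your region
        },
        {
            "Sid": "ListStream",
            "Effect": "Allow",
            "Action": ["kinesis:ListStreams", "kinesis:ListShards"],
            "Resource": "*",
        },
    ],
}

role = iam.create_role(
    RoleName="kinesis-permission-redshift",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)
iam.put_role_policy(
    RoleName="kinesis-permission-redshift",
    PolicyName="kinesis-read",
    PolicyDocument=json.dumps(read_policy),
)
print(role["Role"]["Arn"])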
Create a Kinesis Data Stream
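The walkthrough assumes a stream named data_stream_demo with a single shard, created from the Kinesis console. As a scripted alternative, a minimal boto3 sketch (the region is an assumption) would be:

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # assumed region

# Create the demo stream with a single shard, matching the setup used later in this post.
kinesis.create_stream(StreamName="data_stream_demo", ShardCount=1)

# Wait until the stream is ACTIVE before sending records to it.
kinesis.get_waiter("stream_exists").wait(StreamName="data_stream_demo")
summary = kinesis.describe_stream_summary(StreamName="data_stream_demo")
print(summary["StreamDescriptionSummary"]["StreamStatus"])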
Set Up Amazon Redshift
Amazon offers a free trial of Redshift Serverless with a $300 credit. If you're trying the service on the AWS free tier, make sure this trial option is active.
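Redshift Serverless can be set up entirely from the console, which is what this walkthrough assumes. If you prefer to script it, the following boto3 sketch is one possible approach; the namespace and workgroup names, region, and capacity are illustrative assumptions, and the IAM role from the previous step is attached so the external schema created later can use it:

import boto3

rs = boto3.client("redshift-serverless", region_name="us-east-1")  # assumed region

# Namespace holds databases and users; attach the Kinesis-read role created earlier.
rs.create_namespace(
    namespaceName="streaming-demo-ns",   # hypothetical name
    adminUsername="admin",
    adminUserPassword="ChangeMe123!",    # store real credentials in Secrets Manager
    dbName="dev",
    iamRoles=["arn:aws:iam::[account-id]:role/kinesis-permission-redshift"],
)

# Workgroup provides the serverless compute capacity.
rs.create_workgroup(
    workgroupName="streaming-demo-wg",   # hypothetical name
    namespaceName="streaming-demo-ns",
    baseCapacity=8,                      # small RPU setting, assumed sufficient for this demo
)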
Connect Kinesis to Redshift
Now it's time to subscribe Redshift to Kinesis in order to fetch the data present in the topic. To do so, create an external schema that maps to the stream, using the IAM role created earlier:
CREATE EXTERNAL SCHEMA kinesis_datastream FROM KINESIS
IAM_ROLE 'arn:aws:iam::[iam-#]:role/kinesis-permission-redshift';
Notice that a schema kinesis_datastream has been created and the associated topic data_stream_demo is shown as a table. Next, create a materialized view that parses the raw Kinesis payload as JSON and refreshes automatically:
CREATE MATERIALIZED VIEW demo_view AUTO REFRESH YES AS
SELECT approximate_arrival_timestamp, JSON_PARSE(kinesis_data) AS raw_data FROM "kinesis_datastream"."data_stream_demo"
WHERE CAN_JSON_PARSE(kinesis_data);
REFRESH MATERIALIZED VIEW demo_view;
SELECT * FROM demo_view;
Create a Lambda Function
The following Python function acts as the producer, publishing five sample IoT readings (vibration, temperature, and pressure) to the data_stream_demo topic:
import json
import boto3
import random
from typing import Any

# Kinesis client used to publish records to the stream.
client = boto3.client("kinesis")


def send_data_kinesis() -> None:
    # Send 5 messages with pseudo-random sensor readings.
    for x in range(1, 6):
        v = x * random.randint(1, 4)
        t = x * random.randint(1, 3)
        p = x * random.randint(4, 7)
        mydata = f'{{ "vibration": {v}, "temperature": {t}, "pressure": {p} }}'
        # With a single shard the partition key is not critical; pick one at random.
        partitionkey = random.randint(10, 100)
        client.put_record(StreamName="data_stream_demo", Data=mydata, PartitionKey=str(partitionkey))
    print("Ingestion Done")


def lambda_handler(event, context) -> dict[str, Any]:
    send_data_kinesis()
    return {
        'statusCode': 200,
        'body': json.dumps('Code successfully run!')
    }
Finally, attach a policy to the Lambda function's execution role so it can write to the stream:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "VisualEditor0",
      "Effect": "Allow",
      "Action": "kinesis:PutRecord",
      "Resource": "arn:aws:kinesis:[aws-region]:[account-id]:stream/data_stream_demo"
    }
  ]
}
Testing
With everything set up, we're ready to test our pipeline. To do so, we'll open the console for the three services involved.
Executing the Lambda Function
We'll first execute the Lambda function. It sends five messages with vibration, temperature, and pressure data. Notice that we only need to declare the name of the topic and set a partition key. In this case the partition key is not relevant, since we've configured our Kinesis topic with only one shard. However, should the amount of data increase, additional shards would have to be considered; that is outside the scope of this tutorial.
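You can trigger the function from the console's Test button; alternatively, the following sketch invokes it with boto3. The function name streaming-demo-producer and the region are assumptions, so substitute whatever name you gave your function:

import json
import boto3

lambda_client = boto3.client("lambda", region_name="us-east-1")  # assumed region

# Synchronously invoke the producer function; the payload is ignored by the handler.
response = lambda_client.invoke(
    FunctionName="streaming-demo-producer",  # hypothetical name
    InvocationType="RequestResponse",
    Payload=json.dumps({}).encode("utf-8"),
)
print(json.loads(response["Payload"].read()))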
Monitoring the Kinesis Data Stream
After executing the producer, we can monitor the messages received by the topic.
Notice the GetRecords - sum (Count) plot, which indicates the number of messages that have reached the topic.
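The same information is available outside the console. As an illustrative sketch, the stream's IncomingRecords CloudWatch metric can be pulled with boto3; the region, time window, and period here are arbitrary assumptions:

from datetime import datetime, timedelta, timezone
import boto3

cloudwatch = boto3.client("cloudwatch", region_name="us-east-1")  # assumed region

# Sum of records ingested by the stream over the last hour, in 5-minute buckets.
stats = cloudwatch.get_metric_statistics(
    Namespace="AWS/Kinesis",
    MetricName="IncomingRecords",
    Dimensions=[{"Name": "StreamName", "Value": "data_stream_demo"}],
    StartTime=datetime.now(timezone.utc) - timedelta(hours=1),
    EndTime=datetime.now(timezone.utc),
    Period=300,
    Statistics=["Sum"],
)
for point in sorted(stats["Datapoints"], key=lambda d: d["Timestamp"]):
    print(point["Timestamp"], point["Sum"])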
Reading the data in Redshift
Finally, we can fetch our data from the pipeline destination, consuming and querying it at (almost) the same time.
REFRESH MATERIALIZED VIEW demo_view;
SELECT
approximate_arrival_timestamp,
raw_data,
CAST(JSON_EXTRACT_PATH_TEXT(JSON_SERIALIZE(raw_data), 'vibration') AS INT) AS vibration,
CAST(JSON_EXTRACT_PATH_TEXT(JSON_SERIALIZE(raw_data), 'temperature') AS INT) AS temperature,
CAST(JSON_EXTRACT_PATH_TEXT(JSON_SERIALIZE(raw_data), 'pressure') AS INT) AS pressure
FROM demo_view;
And voilà, the data is there. Notice that we're extracting the fields from the raw_data JSON; hence, no matter how complex raw_data is, we can extract the needed information using the JSON functions provided by the Redshift engine.
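If you prefer to run the refresh and query programmatically instead of from the query editor, the Redshift Data API is one option. A minimal sketch, assuming a Redshift Serverless workgroup named streaming-demo-wg and the default dev database (both assumptions):

import time
import boto3

rsd = boto3.client("redshift-data", region_name="us-east-1")  # assumed region

def run_sql(sql: str) -> dict:
    # Submit the statement to the Serverless workgroup and poll until it finishes.
    stmt = rsd.execute_statement(WorkgroupName="streaming-demo-wg", Database="dev", Sql=sql)
    while True:
        desc = rsd.describe_statement(Id=stmt["Id"])
        if desc["Status"] in ("FINISHED", "FAILED", "ABORTED"):
            return desc
        time.sleep(1)

run_sql("REFRESH MATERIALIZED VIEW demo_view;")
result = run_sql("SELECT * FROM demo_view;")
rows = rsd.get_statement_result(Id=result["Id"])["Records"]
print(f"{len(rows)} rows in demo_view")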
Closing Notes
This blog post covered several key concepts. We began by exploring the advantages of a data streaming pipeline for near-real-time processing. We then designed an architecture using specific AWS services, leveraging their capabilities for seamless interconnection. Notably, all three services are serverless, incurring costs only when in use.
Despite the serverless nature of the services, we observed a minimal cost associated with keeping the Kinesis Data Stream active. To keep costs down for this demo environment, it's best to stop the services when not actively in use. Production environments would naturally require continuous operation.
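For reference, a quick clean-up sketch that removes the demo stream (and could be extended to the Lambda function and the Redshift workgroup) might look like this; it assumes the resource names and region used throughout this post:

import boto3

kinesis = boto3.client("kinesis", region_name="us-east-1")  # assumed region

# Delete the demo stream so it stops accruing shard-hour charges.
kinesis.delete_stream(StreamName="data_stream_demo", EnforceConsumerDeletion=True)
kinesis.get_waiter("stream_not_exists").wait(StreamName="data_stream_demo")
print("data_stream_demo deleted")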
Thank you for reading to the end! This has been a comprehensive exploration, and I truly appreciate your feedback on this project. Special thanks to the Squadra Labs team for their encouragement in writing this article and invaluable revision assistance. Their support motivates me to continue creating informative content.
If you need help translating these concepts into a solution for your specific needs, Squadra Labs' data engineering experts can guide you through the entire process, from design to implementation. Contact us today to discuss your project!