Building a Streaming Pipeline
Artistic representation of a modern streaming data pipeline - created with https://deepai.org/

Using Amazon Kinesis Data Streams and Redshift to develop a Modern Streaming Pipeline Architecture


Intro

The demand for near real-time data access is surging across industries. Businesses are increasingly reliant on up-to-the-minute insights to make critical decisions, optimize operations, and deliver exceptional customer experiences. At Squadra Labs, we recognized this growing need and explored implementing streaming solutions in the cloud using the robust and scalable tools offered by AWS. In this blog post, we'll share our learnings and guide you through building a serverless streaming pipeline for near real-time data access on AWS.

Modern Streaming Data Architecture

A modern streaming data stack consists of five key components:

  1. Source: Where your data originates (e.g., databases, sensors, applications)
  2. Ingestion: Process of bringing data into the pipeline
  3. Storage: Where data is temporarily or permanently stored
  4. Processing: Where data is transformed or analyzed
  5. Destination: Where the processed data is delivered (e.g., data warehouse, analytics platform)

As mentioned before, this article focuses on building a serverless, scalable streaming pipeline for near real-time data access. We'll follow an ELT (Extract, Load, Transform) approach, storing raw data first and transforming it later for data warehouse consumption. This makes it a good opportunity to leverage tools like dbt for the transformation step.

Our experience across various client projects has revealed a common streaming architecture that's increasingly becoming the standard for real-time data processing. This proven approach is depicted in the following diagram, which details its key components.

  1. Producer: This is the source of your data. It can be any system or application that generates data continuously, such as an IoT device, a sensor network, a social media feed, or a website clickstream. It holds the source and ingestion steps from the modern streaming data architecture.
  2. Topic: A topic acts as a named queue or stream within a messaging system like Apache Kafka. It’s the storage step in the modern streaming data architecture. The producer publishes data to the topic, making it available to consumers.
  3. Consumer: The consumer is the destination or receiver of the data stream. It subscribes to the topic and processes the data as it arrives. Consumers can subscribe independently and process data at their own pace. They may also act as data processors, transforming the data sent to the topic.
  4. DW (Data Warehouse): Data warehouses are a common destination for data pipelines. The data warehouse is likely where the data from the Consumer would be stored for further analysis and transformations.

In essence, this streaming pipeline demonstrates how data can be continuously produced from a source, published to a central topic, and then subscribed to and processed by a designated consumer. This architecture enables real-time data processing and analytics.
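To make the producer, topic, and consumer roles concrete, here is a minimal in-memory sketch. It is not how Kafka or Kinesis are implemented; the `Topic` class, the `dw-loader` consumer name, and the Celsius-to-Fahrenheit transformation are all hypothetical and only illustrate that each consumer reads from the same log at its own pace.

```python
from collections import defaultdict


class Topic:
    """Toy append-only log; each consumer keeps its own read offset."""

    def __init__(self) -> None:
        self._records: list[dict] = []
        self._offsets: dict[str, int] = defaultdict(int)

    def publish(self, record: dict) -> None:
        # Producer side: append the record to the end of the log.
        self._records.append(record)

    def poll(self, consumer_id: str, max_records: int = 10) -> list[dict]:
        # Consumer side: return unread records and advance this consumer's offset.
        start = self._offsets[consumer_id]
        batch = self._records[start:start + max_records]
        self._offsets[consumer_id] = start + len(batch)
        return batch


def transform(record: dict) -> dict:
    # Hypothetical processing step: add the temperature in Fahrenheit.
    return {**record, "temperature_f": record["temperature"] * 9 / 5 + 32}


topic = Topic()
for reading in ({"temperature": 20}, {"temperature": 25}):
    topic.publish(reading)

# The consumer reads what is available and hands it to the "warehouse".
warehouse = [transform(r) for r in topic.poll("dw-loader")]
print(warehouse)
```

A second consumer polling the same topic would start from its own offset and receive both records again, which is the property that lets consumers work independently.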

Common Streaming Architecture on AWS

Our practical implementation of this streaming architecture on AWS uses:

  • Kinesis Data Streams: A highly scalable service for ingesting high-throughput data streams.
  • Redshift Serverless: A serverless data warehouse for storing and analyzing data.

One advantage of this topology is its simplicity. Redshift acts as the consumer, eliminating the need for a separate service. You simply configure a specific schema in Redshift for Kinesis, allowing you to read each Kinesis topic as a table within that schema.

Kinesis Data Streams is designed for high-throughput data ingestion, so producers can keep sending data without information loss as long as the stream has enough shard capacity.

We've included a Lambda function as a data producer, mimicking data from an external IoT device. This approach demonstrates how to extend the solution to real-world applications. The following picture shows the specific AWS architecture diagram.

Once you have a grasp of the components and their roles, you're well on your way to building your own streaming pipeline! If it still seems complex, let Squadra Labs bridge the gap. Our data engineering team can translate these concepts into a tailored solution for your specific needs. Contact us today to discuss your project and get started!

Walkthrough

In this setup, we focus on storing data directly in Amazon Redshift using Redshift streaming ingestion, eliminating the need for Firehose to save data to the Redshift warehouse.

Grant Permissions

  1. Go to the IAM Console, navigate to the Roles section, and click on Create role.
  2. Select AWS service and choose Redshift as the use case. Choose the Redshift - Customizable option and click on Next.

  1. Leave Permission policies and Set permissions boundary at their defaults (empty).
  2. Enter kinesis-permission-redshift as the role name and add a relevant description. Click on Create role.
  3. Open the Permissions policies of the role you just created and choose Create inline policy.

  1. Paste the following permissions and name the policy redshift-access-kinesis-policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:[aws-region]:*:stream/*"
        },
        {
            "Sid": "ListStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:ListShards"
            ],
            "Resource": "*"
        }
    ]
}        

  1. Make sure the policy is correctly created

Copy the ARN associated with the role, which has the following format:

arn:aws:iam::[role-id-number]:role/kinesis-permission-redshift
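If you prefer to generate the read policy rather than paste it, the following sketch builds the same document in Python. The function name `redshift_read_policy` and the `us-east-1` region are assumptions for illustration; restricting the resource to a single stream name instead of `stream/*` is an optional tightening, not something the walkthrough requires.

```python
import json


def redshift_read_policy(region: str, account_id: str = "*", stream: str = "*") -> dict:
    """Build the inline policy that lets Redshift read from Kinesis."""
    stream_arn = f"arn:aws:kinesis:{region}:{account_id}:stream/{stream}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ReadStream",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStreamSummary",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                    "kinesis:DescribeStream",
                ],
                "Resource": stream_arn,
            },
            {
                "Sid": "ListStream",
                "Effect": "Allow",
                "Action": ["kinesis:ListStreams", "kinesis:ListShards"],
                "Resource": "*",
            },
        ],
    }


# Assumed region; replace it with the region where your stream lives.
policy = redshift_read_policy("us-east-1", stream="data_stream_demo")
print(json.dumps(policy, indent=4))
```

The printed JSON can be pasted into the inline policy editor in place of the hand-written version.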

Create a Kinesis Data Stream

  1. Open the Amazon Kinesis Console and navigate to Data streams. Then, click on Create data stream.

  1. Provide a name for the Data stream, such as “data_stream_demo”.
  2. Specify the Number of open shards as "1". (For production scenarios, use the shard calculator to determine the appropriate number of shards.)
  3. Click Create data stream.
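For a rough idea of the arithmetic behind a shard estimate, the sketch below applies the per-shard limits of a provisioned stream: 1 MB/s or 1,000 records/s for writes, and 2 MB/s for reads shared by standard consumers. The function `estimate_shards` and its parameters are hypothetical, and the result is only a starting point, not a replacement for the shard calculator.

```python
import math

# Per-shard limits for a provisioned Kinesis data stream.
WRITE_MB_PER_SHARD = 1.0
WRITE_RECORDS_PER_SHARD = 1000
READ_MB_PER_SHARD = 2.0


def estimate_shards(records_per_sec: float, avg_record_kb: float, consumers: int = 1) -> int:
    """Return the smallest shard count that satisfies all three limits."""
    write_mb = records_per_sec * avg_record_kb / 1024
    # Assumption: every consumer reads the full stream over the shared read limit.
    read_mb = write_mb * consumers
    return max(
        1,
        math.ceil(write_mb / WRITE_MB_PER_SHARD),
        math.ceil(records_per_sec / WRITE_RECORDS_PER_SHARD),
        math.ceil(read_mb / READ_MB_PER_SHARD),
    )


# The demo sends a handful of tiny records, so one shard is plenty.
print(estimate_shards(records_per_sec=5, avg_record_kb=0.1))
```

Whichever limit is hit first (write bandwidth, write record rate, or read bandwidth) determines the shard count.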

Setup Amazon Redshift

Amazon offers a free trial of Redshift Serverless with a $300 credit. If you're trying the service on the AWS free tier, make sure this option is active.

  1. Open the Amazon Redshift Console and navigate to the serverless dashboard. After that, click on Create Workgroup.
  2. Select a proper name for the workgroup. For this demo, name it as demo-workgroup. Leave the default options for the Network and Security section. Click Next.

  1. Choose Create a new namespace and name it as demo-namespace

  1. Select Customize admin user credentials, add an admin user name and specify a password for the service

  1. In the permissions section, assign the role previously created by clicking on Associate IAM role. Select kinesis-permission-redshift.

  1. Verify that the role is correctly attached.

  1. Leave the default values for Encryption and security. Click Next, Review and create the Redshift Workgroup.
  2. After some minutes you should see the workgroup Available in the Serverless dashboard.

Connect Kinesis to Redshift

Now it's time to subscribe Redshift to Kinesis in order to fetch the data present in the topic. To do so:

  1. Open the Query editor v2 from the Redshift console and subscribe to the Kinesis topic using the following syntax. Replace the ARN with the one copied when creating the role (see the Grant Permissions section).

CREATE EXTERNAL SCHEMA kinesis_datastream FROM KINESIS 
IAM_ROLE 'arn:aws:iam::[iam-#]:role/kinesis-permission-redshift';        

Notice that a schema kinesis_datastream has been created and the associated topic data_stream_demo is shown as a table.

  1. To read data from the topic, create a materialized view:

CREATE MATERIALIZED VIEW demo_view AUTO REFRESH YES AS
SELECT
  approximate_arrival_timestamp,
  JSON_PARSE(kinesis_data) AS raw_data
FROM "kinesis_datastream"."data_stream_demo"
WHERE CAN_JSON_PARSE(kinesis_data);

  1. Refresh materialized view using the following syntax and verify that the demo_view doesn’t have any data, since the producer hasn’t sent any message to the topic yet.

REFRESH MATERIALIZED VIEW demo_view;
SELECT * FROM demo_view;
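The statements in this section can also be run outside the Query editor. The sketch below assumes the Redshift Data API through boto3 (`boto3.client("redshift-data")`); the helper name `run_sql` and the database name `dev` are assumptions, and result pagination is left out for brevity. The client is passed in as a parameter so the helper itself does not depend on credentials.

```python
import time


def run_sql(client, sql: str, workgroup: str, database: str, poll_seconds: float = 1.0) -> list[list]:
    """Run one statement through the Redshift Data API and return its rows."""
    statement_id = client.execute_statement(
        WorkgroupName=workgroup, Database=database, Sql=sql
    )["Id"]

    # The Data API is asynchronous, so poll until the statement settles.
    while True:
        description = client.describe_statement(Id=statement_id)
        status = description["Status"]
        if status == "FINISHED":
            break
        if status in ("FAILED", "ABORTED"):
            raise RuntimeError(description.get("Error", status))
        time.sleep(poll_seconds)

    if not description.get("HasResultSet"):
        return []

    result = client.get_statement_result(Id=statement_id)
    # Each field is a one-key dict such as {"stringValue": "x"} or {"isNull": True}.
    return [
        [None if field.get("isNull") else next(iter(field.values())) for field in row]
        for row in result["Records"]
    ]


# Usage, assuming AWS credentials and the resources from this walkthrough:
#   import boto3
#   client = boto3.client("redshift-data")
#   run_sql(client, "REFRESH MATERIALIZED VIEW demo_view;", "demo-workgroup", "dev")
#   rows = run_sql(client, "SELECT * FROM demo_view;", "demo-workgroup", "dev")
```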

Create a Lambda Function

  1. Open the AWS Lambda Console and click on Create a Function.
  2. Select Author from scratch option
  3. Choose a name for the Lambda function. For this example we’ve selected push_data_kinesis.
  4. Choose Python in the dropdown menu from the Runtime section.
  5. Leave Architecture, Execution role, and Advanced settings at their defaults. Click on Create function.

  1. In the code source section of the lambda function insert the following code. This will generate the data which will be sent to the Kinesis Data Stream.

import json
import random
from typing import Any

import boto3

# Name of the stream created in the "Create a Kinesis Data Stream" section.
STREAM_NAME = "data_stream_demo"

client = boto3.client("kinesis")


def send_data_kinesis() -> None:
    """Send five simulated sensor readings to the Kinesis data stream."""
    for x in range(1, 6):
        reading = {
            "vibration": x * random.randint(1, 4),
            "temperature": x * random.randint(1, 3),
            "pressure": x * random.randint(4, 7),
        }
        # The partition key decides which shard receives the record.
        partition_key = str(random.randint(10, 100))
        client.put_record(
            StreamName=STREAM_NAME,
            Data=json.dumps(reading).encode("utf-8"),
            PartitionKey=partition_key,
        )

    print("Ingestion Done")


def lambda_handler(event, context) -> dict[str, Any]:
    send_data_kinesis()
    return {
        "statusCode": 200,
        "body": json.dumps("Code successfully run!"),
    }

  1. Click on Deploy

  1. Go back to the IAM Console, navigate to the Roles section, open the execution role of the Lambda function, and add the permission to write to the Kinesis stream as an inline policy. The required policy is:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": "kinesis:PutRecord",
            "Resource": "arn:aws:kinesis:[aws-region]:[kinesis-topic-#]:stream/data_stream_demo"
        }
    ]
}        
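When a producer has more than a few readings to send, calling `put_record` once per message becomes chatty. A possible alternative is the Kinesis `put_records` call, which accepts up to 500 records per request. The sketch below is an assumption-laden variant of the producer, not part of the original demo: `send_batch` and the `device_id` field used as partition key are hypothetical, and the execution role would additionally need the `kinesis:PutRecords` action.

```python
import json
from typing import Iterator

# PutRecords accepts at most 500 records per request.
MAX_RECORDS_PER_CALL = 500


def chunked(items: list, size: int) -> Iterator[list]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def send_batch(client, stream_name: str, readings: list[dict]) -> int:
    """Send readings in batches and return how many records Kinesis rejected."""
    entries = [
        {
            "Data": json.dumps(reading).encode("utf-8"),
            # Hypothetical field: readings from one device share a partition key.
            "PartitionKey": str(reading["device_id"]),
        }
        for reading in readings
    ]
    failed = 0
    for batch in chunked(entries, MAX_RECORDS_PER_CALL):
        response = client.put_records(StreamName=stream_name, Records=batch)
        # A successful call can still contain individually rejected records.
        failed += response["FailedRecordCount"]
    return failed


# Usage inside the Lambda function, assuming the boto3 client from the code above:
#   failed = send_batch(client, "data_stream_demo", readings)
```

A production producer would typically retry the rejected entries, which are flagged individually in the response.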

Testing

With everything set up, we’re ready to test our pipeline. To do so, we’ll open the console for each of the three services involved.

Executing the Lambda Function

We’ll first execute the Lambda function, which sends five messages with vibration, temperature, and pressure data. Notice that we only need to declare the name of the topic and set a partition key. In this case the partition key is not relevant, since we’ve configured our Kinesis topic with only one shard. Should the amount of data increase, additional shards would have to be considered, which is outside the scope of this tutorial.
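To see why the partition key stops mattering with a single shard, it helps to look at how a key is routed. Kinesis maps each partition key to a 128-bit integer using MD5 and sends the record to the shard whose hash key range contains that value. The sketch below assumes the hash space is split evenly across shards, which is a simplification; the function name `shard_for_key` is hypothetical.

```python
import hashlib

# Kinesis hash keys are 128-bit integers.
HASH_SPACE = 2 ** 128


def shard_for_key(partition_key: str, shard_count: int) -> int:
    """Return the index of the shard that would receive this partition key."""
    digest = hashlib.md5(partition_key.encode("utf-8"), usedforsecurity=False).digest()
    hash_value = int.from_bytes(digest, "big")
    # Assumption: shards own equal, contiguous slices of the hash space.
    range_size = HASH_SPACE // shard_count
    return min(hash_value // range_size, shard_count - 1)


# With one shard, every key lands on shard 0, as in this demo.
print({shard_for_key(str(key), 1) for key in range(10, 101)})
```

With several shards, records sharing a partition key always land on the same shard, which preserves their relative order.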

  1. Go to the Lambda console and click on Test. Verify that the Execution result is as follows.

Monitoring the Kinesis Data Stream

After executing the producer, we can monitor the messages received by the topic.

  1. Head to the Amazon Kinesis console and open the Data streams dashboard. Select the data_stream_demo topic. You should see the following metrics.

Notice the GetRecords - sum (Count) plot, which indicates the number of messages that have reached the topic.
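The same check can be scripted against CloudWatch. The sketch below reads the `IncomingRecords` metric, which counts records written to the stream, from the `AWS/Kinesis` namespace using the boto3 `get_metric_statistics` call. The helper name `incoming_records` and the 15-minute window are assumptions, and the client is injected so the function can be exercised without credentials.

```python
from datetime import datetime, timedelta, timezone


def incoming_records(cloudwatch, stream_name: str, minutes: int = 15) -> int:
    """Sum the records written to the stream over the last `minutes` minutes."""
    end = datetime.now(timezone.utc)
    response = cloudwatch.get_metric_statistics(
        Namespace="AWS/Kinesis",
        MetricName="IncomingRecords",
        Dimensions=[{"Name": "StreamName", "Value": stream_name}],
        StartTime=end - timedelta(minutes=minutes),
        EndTime=end,
        Period=60,  # one datapoint per minute
        Statistics=["Sum"],
    )
    return int(sum(point["Sum"] for point in response["Datapoints"]))


# Usage, assuming AWS credentials:
#   import boto3
#   print(incoming_records(boto3.client("cloudwatch"), "data_stream_demo"))
```

CloudWatch metrics can lag by a minute or two, so a freshly executed producer may not show up immediately.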

Reading the data in Redshift

Finally we can fetch our data from the pipeline destination. We’ll be consuming and querying the data at (almost) the same time.

  1. Open the Query editor v2 from the Redshift console and refresh the materialized view, then fetch the data. Use the following queries:

REFRESH MATERIALIZED VIEW demo_view;

SELECT 
  approximate_arrival_timestamp, 
  raw_data, 
  CAST(JSON_EXTRACT_PATH_TEXT(JSON_SERIALIZE(raw_data), 'vibration') AS INT) AS vibration, 
  CAST(JSON_EXTRACT_PATH_TEXT(JSON_SERIALIZE(raw_data), 'temperature') AS INT) AS temperature, 
  CAST(JSON_EXTRACT_PATH_TEXT(JSON_SERIALIZE(raw_data), 'pressure') AS INT) AS pressure
FROM demo_view;

And voilà, the data is there. Notice that we’re extracting the fields from the raw_data JSON. Hence, no matter how complex raw_data is, we can extract the needed information using the JSON functions provided by the Redshift engine.
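As a rough Python analogue of path-based extraction, the sketch below walks a chain of keys through a JSON document. It only illustrates the idea and is not Redshift's implementation; the function name `extract_path` and the nested `sensor` payload are hypothetical.

```python
import json


def extract_path(raw: str, *path: str):
    """Follow `path` through nested JSON objects; return None if a key is missing."""
    node = json.loads(raw)
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


# A flat message like the ones sent by the demo producer.
flat = '{"vibration": 4, "temperature": 3, "pressure": 12}'
print(extract_path(flat, "vibration"))

# A hypothetical nested message: the same call style reaches deeper fields.
nested = '{"sensor": {"id": "a1", "readings": {"pressure": 12}}}'
print(extract_path(nested, "sensor", "readings", "pressure"))
```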

Closing Notes

This blog post covered several key concepts. We began by exploring the advantages of a data streaming pipeline for near-real-time processing. We then designed an architecture using specific AWS services, leveraging their capabilities for seamless interconnection. Notably, all three services are serverless, incurring costs only when in use.

Despite the serverless nature of the services, we observed a minimal cost associated with keeping the Kinesis Data Stream active. To keep costs down for this demo environment, it's best to delete the data stream and the other demo resources when they are not actively in use. Production environments would naturally require continuous operation.

Thank you for reading to the end! This has been a comprehensive exploration, and I truly appreciate your feedback on this project. Special thanks to the Squadra Labs team for their encouragement in writing this article and invaluable revision assistance. Their support motivates me to continue creating informative content.

If you need help translating these concepts into a solution for your specific needs, Squadra Labs' data engineering experts can guide you through the entire process, from design to implementation. Contact us today to discuss your project!

