A Story of Data Infrastructure at VideoBlocks
We heart data at VideoBlocks, and having the right data infrastructure is essential for us to make data-driven decisions. In this article, I would like to tell you the story of our data infrastructure, from a single MariaDB server to a full system built around AWS S3 and Redshift, along with the reasons behind each improvement decision. I hope this story helps you make better decisions about your own data infrastructure.
Moving Data from MySQL to Redshift
When I first joined VideoBlocks, our data infrastructure looked like the following:
This system had several pain points:
- MySQL-to-MariaDB replication is near real time, but it's fragile, and the MariaDB triggers we set up to aggregate data make it even worse. Every time we make a big database change (schema update or bulk insert/update) on MySQL, we need to double-check MariaDB to make sure the triggers are not broken. And if a trigger does break, recovery is painful because of the broken state: we have to delete the aggregated table and rebuild it from scratch.
- We use Rackspace to manage MariaDB, so we have to file tickets for anything we need (for example, when replication breaks or the disk fills up). It's expensive and inefficient; we need to move faster than this.
- Even with less than 300 GB of data, MariaDB is slow on full-table analysis; for example, one A/B test analysis query took 30 minutes to run.
The first improvement I made was to introduce a columnar data store for the analytics team. I picked Redshift because it is cheap (we started with a single-node cluster at $180 per month) and easy to scale horizontally as we accumulate more data.
I leveraged mysql_to_redshift.py (from AWS Data Pipeline: https://s3.amazonaws.com/datapipeline-us-east-1/sample-scripts/mysql_to_redshift.py) and built a program to load all tables from our three MySQL servers into Redshift. In addition, our analytics team converted the existing MariaDB triggers (which aggregate all three sites' data into one) into SQL code. Then I set up a cronjob to run the load program plus the aggregation SQL at 12am every day. The downside of this approach is that we no longer have real-time data like before, but it is far more stable, and Redshift is much faster than MariaDB on analytics queries (1.5 minutes compared to 30 minutes on MariaDB). And we almost never need same-day data for analysis, so the tradeoff makes sense for us.
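The typical pattern for this kind of load is to dump each MySQL table to S3 and then COPY it into Redshift. Here is a minimal sketch of that COPY step; the bucket, table, IAM role, and gzipped-CSV format are assumptions for illustration, not necessarily what our loader uses:
-- Illustrative only: load one table's nightly dump from S3 into Redshift.
-- Bucket, table, and IAM role names are placeholders, not our real ones.
copy public.orders
from 's3://example-etl-bucket/mysql-dumps/2016-05-01/orders/'
iam_role 'arn:aws:iam::123456789012:role/example-redshift-copy-role'
csv gzip;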
Backend Data Pipeline
Once we could efficiently analyze our operational data, my next goal was to tackle clickstream data. Like many other companies, we want to know everything about our (potential) customers. Therefore, we want to save every keyword our customers searched and the results of their searches, every page they viewed, and which funnel (Google Search, Facebook ad, blog post, etc.) brought them to those pages; basically, everything :)
To achieve this goal, we built a backend data pipeline, which looks like this:
The web server sends events as JSON strings to a fluentd Docker container running on the same EC2 instance, and fluentd forwards the events to AWS Kinesis Firehose. Kinesis Firehose aggregates the events (JSON strings), gzips them, and puts the gzipped file on AWS S3. S3 notifies AWS SQS about the new file, and Event Loader (a small Scala program we built that subscribes to SQS) reads the S3 messages from SQS, loads the S3 file into Redshift, and runs SQL commands to process each event type (different events have different attributes) and load it into its own event table.
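To make the load step concrete, here is a minimal sketch of the COPY that Event Loader issues for each new file. The bucket, staging-table, and IAM role names are hypothetical, and the sketch assumes each event's nested attributes arrive in a JSON-encoded payload string so they land in a plain VARCHAR column:
-- An event file contains one JSON object per line, roughly shaped like:
-- {"name": "page_view", "uuid": "...", "requestid": "...", "vid": "...",
--  "mid": "...", "usid": "...", "date": "2016-05-01 12:34:56",
--  "payload": "{\"uri\": \"/video/search\", \"utm\": {\"source\": \"google\"}}"}
-- Event Loader copies it into a temporary staging table (names are placeholders):
copy temp_event_batch
from 's3://example-event-bucket/firehose/2016/05/01/events-0001.gz'
iam_role 'arn:aws:iam::123456789012:role/example-redshift-copy-role'
gzip
json 'auto';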
An example of the SQL that processes an event:
insert into event.{{ siteName }}_page_view
(uuid, request_id, vid, mid, usid, date,
uri, referrer_uri, campaign, source, medium, term, utm_version)
select uuid, requestid, vid, mid, usid, date,
etl_text(json_extract_path_text(payload, 'uri'), 1000),
etl_text(json_extract_path_text(payload, 'referrerUri'), 200),
etl_text(json_extract_path_text(payload, 'utm', 'campaign'), 80),
etl_text(json_extract_path_text(payload, 'utm', 'source'), 80),
etl_text(json_extract_path_text(payload, 'utm', 'medium'), 80),
etl_text(json_extract_path_text(payload, 'utm', 'term'), 80),
etl_int(json_extract_path_text(payload, 'utm', 'utmV'))
from {{ tempEventTable }}
where name = '{{ eventName }}'
and length(vid)=64 and length(uuid)=36 and length(requestid)=36
and uuid not in (select uuid from event.{{ siteName }}_page_view where date >= (select min(date) from {{ tempEventTable }}))
and etl_text(json_extract_path_text(payload, 'uri'), 1000) not like '/api/%';
Some comments about the above code snippet:
- uuid is the unique identifier used to de-duplicate events, because Kinesis only guarantees at-least-once delivery.
- etl_text and etl_int are Redshift UDFs we built. We use these UDFs to enforce data types and make sure the rest of the data still loads when a file contains one malformed JSON event.
The following are examples of these UDFs:
CREATE OR REPLACE FUNCTION etl_int (payload VARCHAR(max))
RETURNS INT
IMMUTABLE
AS $$
# Return NULL for empty or non-numeric input, and for values outside
# the 4-byte INT range, so one bad value never fails the whole load.
if not payload:
    return None
try:
    x = int(payload)
    if -2147483648 <= x <= 2147483647:
        return x
    else:
        return None
except ValueError:
    return None
$$ LANGUAGE PLPYTHONU;
CREATE OR REPLACE FUNCTION etl_boolean (payload VARCHAR(max))
RETURNS BOOLEAN
IMMUTABLE
AS $$
# Accept 'true'/'false' and '1'/'0' (case-insensitive); anything else becomes NULL.
if not payload:
    return None
x = payload.strip().lower()
if x in ('true', '1'):
    return True
elif x in ('false', '0'):
    return False
else:
    return None
$$ LANGUAGE PLPYTHONU;
CREATE OR REPLACE FUNCTION etl_date (payload VARCHAR(max))
RETURNS TIMESTAMP
IMMUTABLE
AS $$
# Parse a 'YYYY-MM-DD' string; malformed dates become NULL instead of failing the load.
from datetime import datetime
if not payload:
    return None
try:
    return datetime.strptime(payload, '%Y-%m-%d')
except ValueError:
    return None
$$ LANGUAGE PLPYTHONU;
CREATE OR REPLACE FUNCTION etl_text (payload VARCHAR(max), len INT)
RETURNS VARCHAR(max)
IMMUTABLE
AS $$
# Strip whitespace, drop non-ASCII characters, and truncate to len characters.
if not payload:
    return None
try:
    return payload.strip().encode('ascii', 'ignore')[:len]
except ValueError:
    return None
$$ LANGUAGE PLPYTHONU;
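These UDFs let a single malformed value degrade to NULL instead of aborting the whole insert. For instance (the literal values below are purely illustrative):
-- Illustrative only: malformed input becomes NULL, well-formed input is coerced.
select etl_int('42'),                  -- 42
       etl_int('not a number'),        -- NULL
       etl_boolean('1'),               -- true
       etl_date('2016-05-01'),         -- 2016-05-01 00:00:00
       etl_text('  hello world  ', 5); -- 'hello'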
Several benefits of this design:
- All data processing logic is in SQL; Event Loader (a Scala program) only handles SQS and the COPY command from S3 objects into Redshift. The analytics team (who are SQL experts) can add or update an event's process.sql to customize the data format without engineers' help. This improves the analytics team's efficiency a lot.
- The SQS queue can hold jobs if Event Loader crashes or has bugs. We can take down Event Loader at any time, refactor its code, and deploy it, say, two days later. This is more robust than our original design, where the Event Loader logic lived in AWS Lambda and was invoked directly by S3 events.
- Event Loader's operation is idempotent thanks to the uuid de-duplication. This makes failure recovery and manual reloads simple.
And of course, there are some drawbacks as well:
- Coding in SQL is not as straightforward as Python or Spark (at least for traditional software engineers), and unit testing the SQL code is also tricky. But once you express your logic in a declarative language like SQL, scalability comes for free: Redshift translates your SQL into an execution plan and runs it on all the nodes in the cluster. It's the same idea as Hive or Spark SQL.
- Reading and processing events on the same Redshift cluster can cause resource contention. This can be solved by using a separate cluster to process events and then using UNLOAD and COPY to move the processed data into the read-only cluster, as sketched below.
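A minimal sketch of that UNLOAD/COPY hand-off, with hypothetical bucket, table, and IAM role names:
-- On the processing cluster: export a processed event table to S3.
-- Table, bucket, and role names are placeholders.
unload ('select * from event.videoblocks_page_view where date >= ''2016-05-01''')
to 's3://example-etl-bucket/handoff/page_view_'
iam_role 'arn:aws:iam::123456789012:role/example-redshift-unload-role'
escape gzip;

-- On the read-only cluster: load the exported files back in.
copy event.videoblocks_page_view
from 's3://example-etl-bucket/handoff/page_view_'
iam_role 'arn:aws:iam::123456789012:role/example-redshift-copy-role'
escape gzip;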
As you can see, S3 is the source of truth in our data infrastructure; everything else is derived from it. For example, Redshift is the SQL interface to our S3 data, and we can load all of the S3 data back into a new Redshift cluster within an hour or two. We also have a Spark job (running on AWS EMR) that uses the S3 data to build our recommendation model.
In upcoming articles, I would like to talk about our several attempts to capture front-end data, the data coming directly from users' browsers. Stay tuned.
Currently, our production Redshift cluster has 24 nodes with 2.7 TB of data (clickstream, operational data, third-party data, aggregation tables, etc.), and it is the most crucial internal tool at VideoBlocks. In fact, we treat the data infrastructure as seriously as our production sites, with a staging environment, integration tests, and release tests.
I hope this article helps you design your own data infrastructure. Hat tip to our engineering and analytics teams; we built this simple and robust data infrastructure together, and it was fun.