Moving data from one system to another with some transformations is Data Integration. That is fine for ad hoc integrations, but if such a pipeline is executed regularly and is part of an entire ecosystem, I would add more requirements to the data integration process. Let's coin it Enterprise-grade Data Integration. To make things more tangible, the task shall be very basic: move data from a source database into a different target database. This can be achieved in multiple ways. Some examples:
- Create a SQL select command, execute it, write the results to a file and import the file.
- A Python script executing the SQL select row by row and, for each returned row, executing an insert statement into the target database.
- If the source or target database allows creating a database link, meaning it can make remote tables appear as if they were local, an insert...select statement does the trick.
- A script executed in an Apache Spark cluster.
- Various ELT/ETL tools, on-prem, cloud, whatever.
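To make the second option concrete, here is a minimal sketch of such a row-by-row copy script, assuming two already-opened DB-API connections; the table, columns and the '?' parameter marker are made up for illustration and depend on the actual driver.

```python
# A minimal sketch of the naive row-by-row approach, assuming two already
# opened DB-API 2.0 connections; names and the parameter marker are illustrative.

def copy_row_by_row(src_conn, tgt_conn):
    src_cur = src_conn.cursor()
    tgt_cur = tgt_conn.cursor()
    src_cur.execute("SELECT id, name, amount FROM source_table")
    for row in src_cur:              # fetch rows from the source
        tgt_cur.execute(             # one statement execution per row
            "INSERT INTO target_table (id, name, amount) VALUES (?, ?, ?)",
            row,
        )
    tgt_conn.commit()
```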
All of these methods (and countless more) do work. But the core quality of any Data Integration tool must be performance. It can't be that an initial load takes multiple days, and perhaps even requires the source system to be locked for changes to guarantee consistent data, or that a daily delta batch takes more than 24 hours. Unfortunately, the data volumes we deal with on a regular basis are large enough to require every available technique to speed up the process.
Just imagine we must move a laughable 10 million rows and the batch window is a gracious 4 hours. That requires moving 694 rows/sec on average. Not an impressive number, but for some approaches it might already be challenging. If we assume one row has 100 columns and the data must be converted from the SQL driver's format into an internal format and then into the target database driver's format, 2 billion conversions will happen. CPUs are fast, so that should pose no problem. On the other hand, it is not nothing: a few string-to-datetime conversions, some column-based transformations (ltrim?) and it can get tight for a single CPU quickly. Let's further assume one column has an average width of 4 bytes. Then a row is 4 bytes * 100 columns = 400 bytes. A single SSD can process 500MB/s, so it can handle 1.25 million rows per second. Nice. Even better, if that happens to be too slow, doubling the disks in a RAID setup doubles the throughput. The network is different, though. A standard local 1GBit LAN has a throughput of only 110MByte/sec, which means 275'000 rows/sec. Still okay. But the network is a shared medium. If the data moves from the source database via a single network hop directly into the target database, we get the full 275'000 rows/sec. If the architecture has a separate server running the integration, the network bandwidth is used for reading and writing, so the throughput is cut in half: 137'500 rows/sec.
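For reference, the back-of-the-envelope arithmetic written out; all values are the assumptions from this paragraph, not measurements.

```python
# The estimates from the paragraph above, written out.

rows          = 10_000_000               # rows to move
window_s      = 4 * 3600                 # 4-hour batch window
cols          = 100                      # columns per row
bytes_per_col = 4                        # average column width
row_bytes     = cols * bytes_per_col     # 400 bytes per row

required_rate = rows / window_s          # ~694 rows/sec needed
conversions   = rows * cols * 2          # driver -> internal -> driver: 2 billion

ssd_rate      = 500_000_000 // row_bytes # 500 MB/s SSD   -> 1'250'000 rows/sec
lan_rate      = 110_000_000 // row_bytes # 1 GBit LAN     ->   275'000 rows/sec
via_server    = lan_rate // 2            # read + write over the same LAN -> 137'500 rows/sec

print(required_rate, conversions, ssd_rate, lan_rate, via_server)
```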
Informatica deploys an agent on demand that is capable of performing the entire transformation without moving any data to the cloud and back. Most cloud integration tools, however, perform the transformation in the cloud, hence they must read from the source across the WAN into the cloud data center and write back the same way. A WAN connection will certainly not be a dedicated 1GBit line from source to cloud to target. There is no chance of getting decent performance with such a cloud-centric tool.
Things get even worse when a cluster is used and the architecture is such that each node performs part of the transformations. Say we have 10 nodes and 10 distinct steps. The reader node reads all rows - one network hop - and hands the data over to the next node - one network hop - which performs transformation number two and passes the data on - one network hop - and so on, until the final writer sends all data into the target - one network hop. In the worst case, all data must cross the network 11 times and the overall performance drops to 275'000/11 = 25'000 rows/sec. If the software is designed badly, a cluster gets slower the larger it is, and a single laptop would outperform the expensive solution.
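The hop-count dilution, written out under the same assumptions as before:

```python
# Effective throughput when the same rows cross the network multiple times.

lan_rate = 275_000           # rows/sec across a single 1 GBit hop (from the earlier estimate)
hops     = 11                # one read hop, 9 hand-overs between nodes, one write hop
print(lan_rate // hops)      # -> 25'000 rows/sec end to end
```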
Frankly, the assumed data volumes are unrealistically low. Real projects will require moving ten times more data in a tenth of the time (24 minutes for 10m rows instead of 4 hours). But all of that is beside the point: there is not a single data integration project at a customer where the data movement is considered too quick. The conclusion is hence that the tool of choice must support every single optimization technique available to maximize performance. These techniques are no rocket science and well known, yet for some reason they are not implemented by all data integration tools. It is your job to assess whether your tool of choice supports all of them, how well, and whether you can live with the compromises.
- As few data conversions as possible. If the source is an integer column and the target is an integer column, the tool should not force an intermediate format. It sounds like an obvious requirement, yet there are tools where the data is exchanged internally in e.g. a JSON format, which has no native integers. JSON is a textual representation where numbers follow a standardized notation, so a four-byte integer like 1234 is turned into the text {"value": 1234}. If your tool uses such an intermediate format, I would not even consider it any further. If the vendor failed at the basics with no focus on performance, it will be worse in other areas. Rule: The data integration tool must support as many native data types as possible: integer, float, decimal, Unicode strings, ASCII strings, timestamp in UTC, timestamp with time zone, etc. A good candidate list is the set of all JDBC/ODBC data types.
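As an illustration of the point, here is what a JSON-based intermediate format does to a single integer; a conceptual sketch, not a benchmark of any specific tool.

```python
# A native four-byte integer versus the same value pushed through a JSON
# intermediate format.

import json

value = 1234                          # native integer in the driver's format

wire = json.dumps({"value": value})   # '{"value": 1234}' - a text document
restored = json.loads(wire)["value"]  # parse the text back into an integer

assert restored == value              # same value, but two extra conversions
print(f"{len(wire)} characters on the wire instead of 4 bytes")
```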
- Parallel processing must be the default. In the 1960s, sequential batch processing was the only option: read the source, apply the first transformation, write the intermediate result to a file. Then read that file, apply the next transformation and write its result to another file, until after n stages the final result is created. Such a process cannot be parallelized. Only once the first intermediate file is written completely(!) can the second transformation start, and only once that is completed can the next one start. The entire server is idle while a single CPU and disk are busy. I can recall such a case in SAP Data Services. Before the ABAP datastore supported the RFC streaming method, first the ABAP code was executed, writing a result file on the SAP server while the jobserver was idle. Then the file was downloaded and all systems waited for this to complete. And then the jobserver read the file and did all the processing. Three distinct sequential steps. With RFC streaming the ABAP sends the data to the jobserver where it is transformed and loaded all at the same time. So what took 1h + 10 minutes + 1h before is now done in 1.2h. Another tool category lacking in this area are ELT-type tools: they extract the source data and load it into a staging area of the target database - step one - and then execute an insert..select from the staging table into the final table with all the transformations - step two. Rule: No sequential transformation executions inside a pipeline - everything should happen as one stream of data - for maximum performance.
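A minimal sketch of what "one stream of data" means, using plain Python generators; the function and column names are invented for illustration.

```python
# Streamed pipeline sketch: every transformation is a generator, so rows flow
# through all steps concurrently instead of being staged in files between steps.

def read_source():
    for i in range(1_000_000):                 # stand-in for the source reader
        yield {"id": i, "name": f"  name {i}  "}

def trim_names(rows):
    for row in rows:                           # transformation step 1
        row["name"] = row["name"].strip()
        yield row

def add_audit_column(rows):
    for row in rows:                           # transformation step 2
        row["loaded_by"] = "pipeline"
        yield row

def write_target(rows):
    for row in rows:                           # stand-in for the target writer
        pass                                   # e.g. buffer and bulk-insert here

# All steps run as one stream - no step waits for the previous one to finish.
write_target(add_audit_column(trim_names(read_source())))
```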
- All possible types of source partitions must be supported out of the box. Large database tables are typically partitioned inside the database. This allows the database to process data in parallel, e.g. first process the data per partition and then create the final result set by combining the per-partition results. For data integration tools it is the same thing: instead of a single reader processing all rows, one reader per partition can be used and multiple processes read the data in parallel and independently of each other, thus utilizing the system much better. Important: as the partitions can change at any point in time and will differ between DEV and PROD, the partitioned-read definition must be determined at runtime, not manually by the developer. A tool that requires the user to either write one pipeline per partition (e.g. because the SQL is different: "select * from table partition(0)") or to manually specify the partition parameters is a tool I would question. Also keep in mind that partition support comes in different flavors. In SQL it is controlled via the select statement; in Kafka, a consumer group is automatically assigned equal shares of the topic partitions - best is to start as many consumers as there are topic partitions. In a Data Lake it could be 10 parquet files in different directories representing 10 partitions of the same table. Even a single CSV file can be read by multiple readers, each working on a specified range of the file. Rule: A single pipeline must support partitions, and the partition information is read at execution time.
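A sketch of what runtime partition discovery could look like, assuming a DB-API style connection factory; the catalog view user_tab_partitions shown here is Oracle-style and purely an example, since every database exposes its partition metadata differently.

```python
# Runtime partition discovery with one parallel reader per partition.

from concurrent.futures import ThreadPoolExecutor

def process(row):
    pass                                       # hand the row to the transformation stream

def list_partitions(conn, table):
    cur = conn.cursor()
    cur.execute(
        "SELECT partition_name FROM user_tab_partitions WHERE table_name = :t",
        {"t": table.upper()},
    )
    return [name for (name,) in cur.fetchall()]

def read_partition(conn_factory, table, partition):
    cur = conn_factory().cursor()              # one connection per reader
    cur.execute(f"SELECT * FROM {table} PARTITION ({partition})")
    for row in cur:
        process(row)

def parallel_read(conn_factory, table):
    # The partition list is determined at execution time, not by the developer.
    partitions = list_partitions(conn_factory(), table)
    with ThreadPoolExecutor(max_workers=max(1, len(partitions))) as pool:
        for p in partitions:
            pool.submit(read_partition, conn_factory, table, p)
```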
- A pipeline should use as few network hops as possible. All high performance computing algorithms use a vertical parallel processing approach. Vertical parallelism means that each compute node does the complete end-to-end processing, but on a subset of the data. Video compression, for example, splits the image into rectangles and each gets compressed independently. It is the 101 of parallel processing, taught in kindergarten. Horizontal parallelism means that one node does a subset of the transformations, hands the result via the TCP stack to the next node, which does the next set, and via another network hop to the next node, and so on. As a result, where vertical parallelism transfers 1GB of source data once across the network, horizontal parallelism transfers 1GB per transformation step: 10 transformations must move 10GB of data over the slowest component - the network. Note: even if the network hop is within the same physical machine, e.g. from one container to another, all the data conversions and memory copies are still required to move the data. What a waste! Look carefully at (7). Rule: A tool that splits the pipeline per transformation will decrease performance - it can even scale with a negative coefficient. A tool that assigns the entire work of one data partition to a node scales, in theory, linearly.
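The data volume difference spelled out with the numbers from this paragraph:

```python
# Bytes crossing the network for the two parallelism styles, 1 GB example.

source_gb = 1
steps     = 10

vertical   = source_gb          # each node processes its data subset end to end
horizontal = source_gb * steps  # each step runs on another node: data moves per step

print(vertical, "GB vs", horizontal, "GB over the network")
```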
- As few switches of execution engines within a pipeline as possible. There will always be different execution engines: the reader is probably SQL based and the transformation engine is built in another programming language (C++, C#, Python, ...). But a tool where you must use different engines for different tasks will be slow - it is more busy with data type conversions than with actual processing. SAP Data Services (6) is a good example: its join operation has been implemented in SQL for every single database plus in its C++ engine. Yes, that is expensive for the vendor if every transform is implemented in each engine, but the customer gets stellar performance in return. What is more important? Rule: All engines must support all transformations. A tool with different engines for different transformations should be considered a bad sign. It is not a sign of flexibility but a sign that the vendor's primary focus was on their development budget and not on customer performance. Customers will have to invest a lot more into hardware to compensate for these shortcomings.
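To illustrate what "all engines must support all transformations" means in practice, here is a sketch of the same join either pushed down to the source database as SQL or executed inside the tool's own engine; table and column names are invented and this is not any specific tool's implementation.

```python
# The same join, once pushed down as SQL and once executed in the engine.

def join_pushed_down(conn):
    # The database does the join; only the joined result crosses the wire.
    cur = conn.cursor()
    cur.execute(
        "SELECT o.order_id, c.customer_name "
        "FROM orders o JOIN customers c ON c.customer_id = o.customer_id"
    )
    return cur.fetchall()

def join_in_engine(orders, customers):
    # The engine does the same join on already-read rows, e.g. when one side
    # is a flat file and cannot be pushed down to a database at all.
    names = {c["customer_id"]: c["customer_name"] for c in customers}
    return [
        {"order_id": o["order_id"], "customer_name": names.get(o["customer_id"])}
        for o in orders
    ]
```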
- Native drivers for every source and target. Trivial tools either support only the basic options or require the end user to implement advanced features themselves. For example, to read from a database, SQL is the way to go. Every database supports a select * from table - it is a standard. But how to read data from a single partition, and how to get the information about the database partitions, is neither covered by the SQL standard nor do the JDBC/ODBC drivers have an API for it. Without that, several of the above requirements cannot be met. For writing data into a database it is even worse. A regular insert..values will give you a performance of maybe 100 rows/sec, a parameterized SQL 1'000 rows/sec, a parameterized SQL with array inserts 10'000 rows/sec. The native low-level driver supports bulkload methods bypassing the SQL layer in certain scenarios (database dependent), and then you get 100'000 rows/sec. Partition-aware loading allows unlimited scaling. While JDBC/ODBC supports parameterized SQL and often array inserts, bulkload methods cannot be implemented with these drivers, and partition-aware loading, like partitioned reading, is database dependent. Examples: SAP Cloud Integration (8) uses Camel as a shared abstraction layer - very limited. SAP Data Intelligence (7) and its reuse in Data Sphere use JDBC without partitions. SAP Data Services (6) supports everything including partition-aware bulkloading. Rule: A Data Integration tool without database-specific low-level drivers will be slow at reading and writing - usually the limiting factors in any pipeline.
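A sketch of the lower rungs of that loading ladder using the generic Python DB-API: row-by-row inserts versus array inserts via executemany. The bulkload and partition-aware levels are not reachable this way and need the database's native driver. Table names and the '?' parameter marker are illustrative and driver dependent.

```python
# Row-by-row inserts versus array inserts with the generic DB-API.

SQL = "INSERT INTO target_table (id, name, amount) VALUES (?, ?, ?)"

def load_row_by_row(conn, rows):                  # ~100 rows/sec in the estimate above
    cur = conn.cursor()
    for r in rows:
        cur.execute(SQL, r)
    conn.commit()

def load_array_insert(conn, rows, batch=10_000):  # ~10'000 rows/sec in the estimate above
    cur = conn.cursor()
    buffer = []
    for r in rows:
        buffer.append(r)
        if len(buffer) == batch:
            cur.executemany(SQL, buffer)          # one round trip per batch of rows
            buffer.clear()
    if buffer:
        cur.executemany(SQL, buffer)
    conn.commit()
```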