ORC + ADLS = Polybase
Most people know that the fastest way to load Azure SQL Data Warehouse is through Polybase. And most people use delimited text files as their input.
That works well until you need to move data that delimited text handles badly: strings where a null must be kept distinct from an empty string, strings containing embedded carriage returns, and binary values.
Polybase also supports some Hadoop-based file formats - RCFile, ORC and Parquet. That's great if your data already lives in a Hadoop world, but what if you're moving data from an on-premises application? You might find yourself creating a delimited file, moving that file to Hadoop, and rewriting it in one of these formats before loading it to ASDW. Not exactly a fast way to handle the data.
Recently I needed to address this problem, and I looked for ways to dynamically create ORC or Parquet files. There wasn't much information around, so I'm sharing my findings.
There were two challenges in this POC - the ORC and Parquet writers are buried in a mess of Hadoop dependencies, and the documentation on how to use them is virtually non-existent. In the end, I was able to succeed with ORC, but I've got some outstanding issues with Parquet that I'll address in another article.
I'm pleased with the outcome. ORC is a great format for Polybase ingestion. It compresses nicely, and a single file can be loaded in parallel by Polybase. This means that you get a file which uploads fast to ADLS _and_ loads quickly through Polybase.
So here it is - a Groovy code example that will run a query against a database, then write the results of that query to an ORC file.
// Get dependencies.
// WARNING: these pull in a lot of transitive Hadoop dependencies - the first run may take a VERY long time.
@GrabConfig(systemClassLoader=true)
@Grab(group='com.microsoft.sqlserver', module='mssql-jdbc', version='6.4.0.jre8')
@Grab(group='org.apache.hive', module='hive-storage-api', version='2.5.0')
@Grab(group='org.apache.orc', module='orc-core', version='1.4.3')
// Import library definitions
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.type.HiveDecimal
import org.apache.hadoop.hive.ql.exec.vector.*
import org.apache.orc.*
import groovy.sql.Sql
// Set the connection parameters we'll use for the query.
def sqlServer = 'localhost'
def sqlDatabase = 'AdventureWorks2017'
def sqlUser = 'sa'
def sqlPassword = 'pw'
def sqlQuery = 'select ProductID, Name, MakeFlag, StandardCost, Weight, ModifiedDate from Production.Product'
// Set up the ORC file schema.
Configuration conf = new Configuration()
TypeDescription schema = TypeDescription.createStruct()
    .addField("ProductID", TypeDescription.createInt())
    .addField("Name", TypeDescription.createString())
    .addField("MakeFlag", TypeDescription.createBoolean())
    .addField("StandardCost", TypeDescription.createFloat())
    .addField("Weight", TypeDescription.createDecimal().withPrecision(10).withScale(2))
    .addField("ModifiedDate", TypeDescription.createTimestamp())
// Open an ORC file based on the schema.
Writer writer = OrcFile.createWriter(new Path("./product.orc"),
    OrcFile.writerOptions(conf)
        .setSchema(schema)
        .compress(CompressionKind.ZLIB)
);
// Create a batch of ORC vectors to receive the row data.
VectorizedRowBatch batch = schema.createRowBatch()
LongColumnVector vProductId = (LongColumnVector) batch.cols[0]
BytesColumnVector vName = (BytesColumnVector) batch.cols[1]
LongColumnVector vMakeFlag = (LongColumnVector) batch.cols[2]
DoubleColumnVector vStandardCost = (DoubleColumnVector) batch.cols[3]
DecimalColumnVector vWeight = (DecimalColumnVector) batch.cols[4]
TimestampColumnVector vModifiedDate = (TimestampColumnVector) batch.cols[5]
// Connect to the source database
def sql = Sql.newInstance ("jdbc:sqlserver://$sqlServer;databaseName=$sqlDatabase;user=$sqlUser;password=$sqlPassword;")
// Execute SQL query and iterate over rows to create ORC file.
sql.eachRow (sqlQuery) { row ->
    // Shorthand for the next free row slot in the batch
    int v = batch.size
    // Set an integer value
    vProductId.vector[v] = row.getInt('ProductID')
    // Set a string value that may be empty or null. ORC strings are UTF-8, and a null is
    // flagged through isNull/noNulls, which keeps it distinct from an empty string.
    String name = row.getString('Name')
    if (name != null) {
        vName.setVal (v, name.getBytes('UTF-8'))
    }
    else {
        vName.isNull[v] = true
        vName.noNulls = false
    }
    // Bit values are written as integers
    vMakeFlag.vector[v] = row.getInt('MakeFlag')
    // Float example
    vStandardCost.vector[v] = row.getFloat('StandardCost')
    // Decimal. A bit of fiddling here to handle NULLs - set() marks the value as null when passed a null HiveDecimal.
    BigDecimal weight = row.getBigDecimal ('Weight')
    if (weight == null) {
        vWeight.set (v, (HiveDecimal) null)
    }
    else {
        HiveDecimal hWeight = HiveDecimal.create(weight)
        vWeight.set (v, hWeight)
    }
    // DateTime
    vModifiedDate.set (v, row.getTimestamp('ModifiedDate'))
    // Count the row, and write the batch to the file each time it fills.
    ++batch.size
    if (batch.size == batch.getMaxSize()) {
        writer.addRowBatch(batch);
        batch.reset();
    }
}
// Write final batch to disk
if (batch.size != 0) {
    writer.addRowBatch(batch);
    batch.reset();
}
// Finished
writer.close();
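Before pushing the file to the cloud it's worth reading it back, if only to confirm that null strings stayed null rather than collapsing to empty strings. Here's a minimal sketch that reuses the same orc-core library and the imports above (it ignores the isRepeating optimisation flag for brevity):
// Read the ORC file back and print a couple of columns, distinguishing null from empty strings.
def reader = OrcFile.createReader(new Path("./product.orc"), OrcFile.readerOptions(conf))
VectorizedRowBatch readBatch = reader.getSchema().createRowBatch()
def rows = reader.rows()
while (rows.nextBatch(readBatch)) {
    LongColumnVector rProductId = (LongColumnVector) readBatch.cols[0]
    BytesColumnVector rName = (BytesColumnVector) readBatch.cols[1]
    for (int r = 0; r < readBatch.size; r++) {
        // A null string comes back with isNull set; an empty string comes back as a zero-length value.
        String name = (!rName.noNulls && rName.isNull[r]) ? null :
            new String(rName.vector[r], rName.start[r], rName.length[r], "UTF-8")
        println "${rProductId.vector[r]}\t${name}"
    }
}
rows.close()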
Last month I shared some code to transfer the resulting file to Azure Data Lake Store, ready for query through Polybase. You'll find it here:
https://www.dhirubhai.net/pulse/groovy-data-lake-store-ron-dunn/
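If you just want the shape of that upload step, here's a rough sketch using the Azure Data Lake Store Java SDK - the account name, tenant and service principal credentials are placeholders, and the article above covers the details:
// Upload product.orc to Azure Data Lake Store with the ADLS Java SDK.
@Grab(group='com.microsoft.azure', module='azure-data-lake-store-sdk', version='2.3.3')
import com.microsoft.azure.datalake.store.ADLStoreClient
import com.microsoft.azure.datalake.store.IfExists
import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider

// Service principal credentials - placeholders, supply your own.
def tokenProvider = new ClientCredsTokenProvider(
    'https://login.microsoftonline.com/<tenant-id>/oauth2/token',
    '<application-id>',
    '<application-secret>')
def adls = ADLStoreClient.createClient('me.azuredatalakestore.net', tokenProvider)

// Stream the local ORC file into the lake, overwriting any earlier copy.
new File('./product.orc').withInputStream { input ->
    adls.createFile('/product.orc', IfExists.OVERWRITE).withStream { output ->
        output << input
    }
}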
To wrap up the load, here's the Polybase setup you'll need in Azure SQL Data Warehouse.
The external data source connection:
create external data source adls
with (
    type = HADOOP,
    location = 'adl://me.azuredatalakestore.net',
    credential = adlsCredential
);
Next, the external file type:
create external file format adls_orc
with (
    format_type = ORC,
    data_compression = 'org.apache.hadoop.io.compress.DefaultCodec'
);
And finally, the external table definition:
drop external table product_orc;
create external table product_orc (
    ProductId int
    ,ProductName nvarchar(50)
    ,MakeFlag bit
    ,StandardCost real
    ,Weight decimal(10,2) null
    ,ModifiedDate datetime
)
with (
    location = '/product.orc',   -- Location of file I uploaded
    data_source = adls,          -- My external data source
    file_format = adls_orc       -- My external file format (see above)
);
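The external table only gives you a window over the file; to land the data in a warehouse table you still run a CTAS over it. If you're scripting the whole pipeline in Groovy, that final step can be issued the same way as the extract - the server, credentials, target table and distribution below are placeholders to adapt:
// Run the Polybase load: CTAS from the external table into an internal warehouse table.
@GrabConfig(systemClassLoader=true)
@Grab(group='com.microsoft.sqlserver', module='mssql-jdbc', version='6.4.0.jre8')
import groovy.sql.Sql

def dw = Sql.newInstance('jdbc:sqlserver://mydw.database.windows.net;databaseName=mydw;user=loader;password=pw;')
dw.execute('''
    create table dbo.Product
    with (distribution = round_robin, clustered columnstore index)
    as select * from product_orc
''')
dw.close()
And that's the whole path - query to ORC, ORC to ADLS, and ADLS into the warehouse through Polybase.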