Pig To Spark Conversion Framework
In this blog, we will discuss the framework we developed to convert Pig scripts to Spark scripts.
The need for this framework arose from component changes in CDP Private Cloud Base 7.0: Pig, Flume, Sentry, and Navigator have been removed. Since Pig is no longer supported in CDP 7.0, we had a major requirement to migrate all of our existing Pig jobs to Spark. Most of our development had been done in Pig, so we needed a quick framework to migrate those jobs. We identified that most of the jobs were sourcing jobs built around functions such as LOAD, UNION, GROUP BY and STORE, so we decided to build a framework to convert the LOAD, UNION, GROUP BY and STORE functions of Pig to Spark.
In its current scope, the framework converts the LOAD, UNION, GROUP BY and STORE functions of Pig to Spark using shell scripting; a concrete example of the conversion is sketched below.
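To make that scope concrete, here is a rough sketch of the kind of Spark (Scala) code the framework emits for a simple Pig pipeline. The Pig statements are shown as comments, and the relation names, paths and columns (src_a, src_b, id, name) are made up purely for illustration; the exact output depends on the Pig script being converted.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}
import org.apache.spark.sql.types.{StringType, StructType}

// Runs inside spark-shell, where the `spark` session is predefined.
// Pig (illustrative): a = LOAD '/data/src_a' USING PigStorage('\u0001') AS (id:chararray, name:chararray);
val schema_a = new StructType().add("id", StringType, true).add("name", StringType, true)
val df_a = spark.read.format("csv").option("header", "false").option("delimiter", "\u0001").schema(schema_a).load("/data/src_a")

// Pig (illustrative): b = LOAD '/data/src_b' USING PigStorage('\u0001') AS (id:chararray, name:chararray);
val df_b = spark.read.format("csv").option("header", "false").option("delimiter", "\u0001").schema(schema_a).load("/data/src_b")

// Pig (illustrative): c = UNION a, b;
val union_cv = df_a.union(df_b)

// Pig (illustrative): GROUP c BY id with an ORDER on name; the framework keeps one row per group.
val w2 = Window.partitionBy("id").orderBy(col("name"))

// Pig (illustrative): STORE ... INTO '/data/out';
union_cv.withColumn("row", row_number().over(w2)).where(col("row") === 1).drop("row").write.format("csv").save("/data/out")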
Below is the flow diagram of the framework:
Pseudocode
BEGIN

  read the pig script
  replace all the new lines with a space separator

  for i in lines
  do

    echo $i | grep -w LOAD
    if [ $? == 0 ]
    then
      create .scala script with LOAD statement
    else
      echo "No LOAD Statement"
    fi

    echo $i | grep -w UNION
    if [ $? == 0 ]
    then
      append .scala file with UNION statement
    else
      echo "No UNION Statement"
    fi

    ... (similar checks follow for GROUP/ORDER and STORE) ...

  done

END
Actual Code
#Writing import statements and creating Spark Context
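# Expected positional parameters (inferred from their usage below; adjust as needed):
#   $1 = number of executors        $2 = executor cores
#   $3 = driver memory in GB        $4 = executor memory in GB
#   $5 = path to the input Pig script
#   $6 = directory containing the dependency jars
# Hypothetical invocation: ./pig_to_spark.sh 4 2 4 8 /path/to/job.pig /path/to/jars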
File=`echo $5|cut -d'.' -f1|awk -F'/' '{print $NF}'`
echo -e "spark-shell --master yarn --num-executors $1 --executor-cores $2 --driver-memory ${3}g --executor-memory ${4}g --conf \"spark.dynamicAllocation.enabled=false\" --jars? \"${6}/*.jar\" << EOF \n\nimport com.typesafe.config.ConfigFactory\nimport org.apache.log4j.Logger\nimport org.apache.spark.sql.SparkSession\nimport org.apache.spark.sql.SQLContext\nimport org.apache.spark.sql.SaveMode\nimport org.apache.spark.sql.functions.upper\nimport org.apache.spark.sql.functions.substring\nimport org.apache.spark.sql.functions.locate\nimport org.apache.spark.sql.functions.expr\nimport org.apache.spark.sql.functions.length\nimport org.apache.spark.sql.functions.col\nimport org.apache.spark.sql.functions.asc\nimport org.apache.spark.sql.functions.row_number\nimport org.apache.spark.sql.expressions.Window\nimport org.apache.spark.sql.functions.regexp_replace\nimport org.apache.spark.sql.functions.count\nimport org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};\n\nimport org.apache.spark.sql.Row;\n\nval props = ConfigFactory.load()\nval envProps = props.getConfig(\"TESTPROD\")\nval configuration = new Configuration()\nval storageService = new StorageService()\n\nval sparkContext = spark.sparkContext\n\nval sqlContext = new SQLContext(spark.sparkContext)\n\n" > ./${File}.scala
rm -r -f ./group_key.param
rm -r -f ./order_key.param
# Iterate over the Pig script using ';' as the statement delimiter and generate temporary param files.
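# NOTE: the awk field positions used below assume the Pig statements follow a fixed layout
# (e.g. alias = LOAD '<path>' USING ... AS (col1:chararray, ...)); scripts written with a
# different spacing or clause order may need the field indexes adjusted.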
SAVEIFS=$IFS
IFS=$";"
for i in `cat ${5}`
do
alias=`echo $i|tr '\n' ' '|grep -w LOAD|awk -F'=' '{print $1}'|sed -e 's/ //g'`
path=`echo $i|tr '\n' ' '|grep -w LOAD|awk -F' ' '{print $4}'|sed -e 's/ //g'|tr "'" '"'`
echo $path > ${alias}.txt
echo $i|tr '\n' ' '|grep -w "LOAD"|awk -F' as ' 'BEGIN{IGNORECASE = 1} {print $2}'|sed -e 's/ //g'|sed -e 's/(\|)//g'|sed -e 's/:chararray/ /g'|sed -e 's/,//g' >> ${alias}.txt
echo $i|tr '\n' ' '|grep -w "GROUP"|awk -F' ' '{print $7}'|sed -e 's/(\|)//g' >> ./group_key.param
echo $i|tr '\n' ' '|grep -w "ORDER"|awk -F' ' '{print $14}'|sed -e 's/ //g' >> ./order_key.param
store_loc+=`echo $i|tr '\n' ' '|grep -w "STORE"|awk -F' ' '{print $4}'|tr "'" " "|sed -e 's/ //g'`
# Grep for the keyword LOAD and, if present, generate the Spark schema and load statements.
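# Each LOAD alias gets a temporary <alias>.txt file: the first line holds the input path and
# the last line the space-separated column names, which are turned into a StructType below.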
echo $i|tr '\n' ' '|grep -w LOAD
if [ $? == 0 ]
then
x=`ls -lrt ./*.txt|awk -F' ' '{print $9}'`
for files in $x
do
df_schema_name+=`echo $files|awk -F'.txt' '{print $1}'|sed -e 's/\///g'|sed -e 's/\.//g'`
schema_name=`echo $files|awk -F'.txt' '{print $1}'|sed -e 's/\///g'|sed -e 's/\.//g'`
echo "val $schema_name = new StructType()"|tr -d '\n' >> ./${File}.scala
load_path=`cat ${files}|head -1`
SAVEIFS1=$IFS
IFS=$" "
for y in `cat ${files}|tail -1`
do
echo .add\(\"${y}\",StringType,true\)|tr -d '\n' >> ./${File}.scala
done
IFS=$SAVEIFS1
echo -e "\n\n" >> ./${File}.scala
echo "val df_${schema_name} = spark.read.format(\"csv\").option(\"header\", \"false\").option(\"delimiter\", \"\\u0001\").schema(${schema_name}).load(${load_path})" >> ./${File}.scala
done
else
echo "Not a LOAD Statement"
fi
rm -r -f ${alias}.txt
# Grep for the keyword UNION and, if present, generate the Spark union statement.
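# Only the first two comma-separated relations of the UNION are parsed; the generated
# DataFrames are combined with union(), which expects matching column layouts.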
echo $i|tr '\n' ' '|grep -w UNION
if [ $? == 0 ]
then
first_schema=`echo $i|tr '\n' ' '|grep -w UNION|awk -F',' '{print $1}'|awk -F' ' '{print $4}'|sed -e 's/ //g'`
second_schema=`echo $i|tr '\n' ' '|grep -w UNION|awk -F',' '{print $2}'|sed -e 's/ //g'`
echo val union_cv = 'df_'`echo $first_schema`.union'(df_'`echo $second_schema`')' >> ./${File}.scala
else
echo "Not a UNION Statement"
fi
# Grep for the keywords GROUP and ORDER and, if present, generate the Spark window specification.
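# A combined GROUP ... ORDER statement is translated into a Window spec: partition by the
# group keys and order by the order key, so row_number() can pick one row per group when
# the result is written out in the STORE step.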
echo $i|tr '\n' ' '|grep -w GROUP|grep ORDER
if [ $? == 0 ]
then
group_key=`cat ./group_key.param`
rm -r -f ./group_key_val.param
x=`echo $group_key|awk -F"," '{print NF}'`
# Use a separate counter (j) so the outer loop variable $i is not clobbered.
for ((j=1; j<=$x; j++))
do
a=`echo $group_key|cut -d',' -f $j`
mm=`echo $group_key|cut -d',' -f $x`
if [ $j == $x ]
then
field_delim=
else
field_delim=,
fi
echo -e "\"$a\"$field_delim" >> ./group_key_val.param
done
group_key_val=`cat ./group_key_val.param|tr '\n' ' '|sed -e 's/ //g'`
echo val w2 = 'Window.partitionBy('`echo $group_key_val`').orderBy(col("'`cat ./order_key.param`'"))' >> ./${File}.scala
else
echo "Not a GROUP Statement"
fi
# Grep for the keyword STORE and, if present, generate the Spark write statement.
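# Keep only the first row of each window partition (row_number == 1) and write the result
# as CSV to the path captured from the Pig STORE clause.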
echo $i|tr '\n' ' '|grep -w STORE
if [ $? == 0 ]
then
echo union_cv.withColumn\(\"row\",row_number.over\(w2\)\).where\(\$\"row\" === 1\).drop\(\"row\"\).write.format\(\"csv\"\).save\(\"`echo $store_loc`\"\) >> ./${File}.scala
else
echo "Not a STORE Statement"
fi
done
IFS=$SAVEIFS
echo -e "\nEOF" >> ./${File}.scala
rm -r -f ./*.txt
rm -r -f ./*.param
With this framework we were able to migrate all of the sourcing Pig scripts to Spark scripts rapidly. Since the generation of the Spark scripts was automated by the framework, there were no human errors, and this reduced our testing effort.
Future scope of the utility: make it more robust by adding conversions for more Pig functions to Spark.
Hope this blog was helpful; please provide feedback and comments.