Pig To Spark Conversion Framework
Apache Pig to Spark Converter


In this blog, we will discuss the framework we developed to convert Pig scripts to Spark scripts.

The need for this framework arose from component changes in CDP Private Cloud Base 7.0: Pig, Flume, Sentry, and Navigator have been removed. Since Pig is no longer supported in CDP 7.0, we had a major requirement to migrate all existing Pig jobs to Spark. Most of our development had been done in Pig, so we needed a quick way to migrate those jobs. We identified that most of them were sourcing jobs built around functions such as LOAD, UNION, GROUP BY, and STORE, so we decided to build a framework that converts the LOAD, UNION, GROUP BY, and STORE functions of Pig to Spark.

In its current scope, the framework converts the LOAD, UNION, GROUP BY, and STORE functions of Pig to Spark using shell scripting; a sketch of the kind of Spark code it generates is shown after the workflow diagram below.

Below is the flow diagram of the framework:

Pig to Spark conversion workflow
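
For illustration only, here is a minimal sketch of the Spark (Scala) code the framework aims to generate, assuming a hypothetical Pig script that LOADs two Ctrl-A delimited files, UNIONs them, GROUPs by a key with an ORDER inside the group, and STOREs the result. The aliases, column names, and paths are made up; in practice the framework parses them out of the Pig statements, and the snippet assumes it runs inside spark-shell, where the spark session is predefined.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}
import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical schemas; the framework derives these from the Pig LOAD ... AS (...) clauses.
val src_a = new StructType().add("id", StringType, true).add("name", StringType, true)
val src_b = new StructType().add("id", StringType, true).add("name", StringType, true)

// LOAD: read Ctrl-A delimited files with the parsed schema (paths are illustrative).
val df_src_a = spark.read.format("csv").option("header", "false")
  .option("delimiter", "\u0001").schema(src_a).load("/data/src_a")
val df_src_b = spark.read.format("csv").option("header", "false")
  .option("delimiter", "\u0001").schema(src_b).load("/data/src_b")

// UNION: combine the two relations.
val union_cv = df_src_a.union(df_src_b)

// GROUP BY id with ORDER BY name: keep the first row of each group via a window function.
val w2 = Window.partitionBy("id").orderBy(col("name"))
val deduped = union_cv.withColumn("row", row_number().over(w2))
  .where(col("row") === 1).drop("row")

// STORE: write the result back out as CSV (target path is illustrative).
deduped.write.format("csv").save("/data/target")

Note that the framework expresses Pig's grouping and ordering as a window specification and keeps the first row of each group rather than using a relational groupBy; the generated code in the next section follows the same pattern.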

Pseudocode


BEGIN

    read the pig script
    replace all the new lines with a space separator

    for i in lines
    do

        echo $i | grep -w LOAD
        if [ $? == 0 ]
        then
            create .scala script with LOAD statement
        else
            echo "No LOAD Statement"
        fi

        echo $i | grep -w UNION
        if [ $? == 0 ]
        then
            append .scala file with UNION statement
        else
            echo "No UNION Statement"
        fi

        ... (repeat for GROUP/ORDER and STORE)

    done

END

Actual Code

# Write the import statements and create the Spark/SQL contexts by opening a spark-shell heredoc.
# Positional parameters: $1 num-executors, $2 executor-cores, $3 driver-memory (GB), $4 executor-memory (GB), $5 Pig script path, $6 directory containing dependency jars.

File=`echo $5|cut -d'.' -f1|awk -F'/' '{print $NF}'`

echo -e "spark-shell --master yarn --num-executors $1 --executor-cores $2 --driver-memory ${3}g --executor-memory ${4}g --conf \"spark.dynamicAllocation.enabled=false\" --jars? \"${6}/*.jar\" << EOF \n\nimport com.typesafe.config.ConfigFactory\nimport org.apache.log4j.Logger\nimport org.apache.spark.sql.SparkSession\nimport org.apache.spark.sql.SQLContext\nimport org.apache.spark.sql.SaveMode\nimport org.apache.spark.sql.functions.upper\nimport org.apache.spark.sql.functions.substring\nimport org.apache.spark.sql.functions.locate\nimport org.apache.spark.sql.functions.expr\nimport org.apache.spark.sql.functions.length\nimport org.apache.spark.sql.functions.col\nimport org.apache.spark.sql.functions.asc\nimport org.apache.spark.sql.functions.row_number\nimport org.apache.spark.sql.expressions.Window\nimport org.apache.spark.sql.functions.regexp_replace\nimport org.apache.spark.sql.functions.count\nimport org.apache.spark.sql.types.{StructType,StructField,StringType,IntegerType};\n\nimport org.apache.spark.sql.Row;\n\nval props = ConfigFactory.load()\nval envProps = props.getConfig(\"TESTPROD\")\nval configuration = new Configuration()\nval storageService = new StorageService()\n\nval sparkContext = spark.sparkContext\n\nval sqlContext = new SQLContext(spark.sparkContext)\n\n" > ./${File}.scala


rm -r -f ./group_key.param

rm -r -f ./order_key.param


# Iterate over the Pig script using ';' as the statement delimiter and generate temporary param files.

SAVEIFS=$IFS

IFS=$";"

for i in `cat ${5}`

do

alias=`echo $i|tr '\n' ' '|grep -w LOAD|awk -F'=' '{print $1}'|sed -e 's/ //g'`

path=`echo $i|tr '\n' ' '|grep -w LOAD|awk -F' ' '{print $4}'|sed -e 's/ //g'|tr "'" '"'`

echo $path > ${alias}.txt

echo $i|tr '\n' ' '|grep -w "LOAD"|awk -F' as ' 'BEGIN{IGNORECASE = 1} {print $2}'|sed -e 's/ //g'|sed -e 's/(\|)//g'|sed -e 's/:chararray/ /g'|sed -e 's/,//g' >> ${alias}.txt

echo $i|tr '\n' ' '|grep -w "GROUP"|awk -F' ' '{print $7}'|sed -e 's/(\|)//g' >> ./group_key.param

echo $i|tr '\n' ' '|grep -w "ORDER"|awk -F' ' '{print $14}'|sed -e 's/ //g' >> ./order_key.param

store_loc+=`echo $i|tr '\n' ' '|grep -w "STORE"|awk -F' ' '{print $4}'|tr "'" " "|sed -e 's/ //g'`


# Grep for the keyword LOAD; if the statement matches, generate the corresponding Spark load code.

echo $i|tr '\n' ' '|grep -w LOAD

if [ $? == 0 ]

then

x=`ls -lrt ./*.txt|awk -F' ' '{print $9}'`

for files in $x

do

df_schema_name+=`echo $files|awk -F'.txt' '{print $1}'|sed -e 's/\///g'|sed -e 's/\.//g'`

schema_name=`echo $files|awk -F'.txt' '{print $1}'|sed -e 's/\///g'|sed -e 's/\.//g'`

echo "val $schema_name = new StructType()"|tr -d '\n' >> ./${File}.scala

load_path=`cat ${files}|head -1`

SAVEIFS1=$IFS

IFS=$" "

for y in `cat ${files}|tail -1`

do

echo .add\(\"${y}\",StringType,true\)|tr -d '\n' >> ./${File}.scala

done

IFS=$SAVEIFS1

echo -e "\n\n" >> ./${File}.scala

echo "val df_${schema_name} = spark.read.format(\"csv\").option(\"header\", \"false\").option(\"delimiter\", \"\\u0001\").schema(${schema_name}).load(${load_path})" >> ./${File}.scala

done

else

echo "Not a LOAD Statement"

fi

rm -r -f ${alias}.txt


# Grep for the keyword UNION; if the statement matches, append the Spark union code.

echo $i|tr '\n' ' '|grep -w UNION

if [ $? == 0 ]

then

first_schema=`echo $i|tr '\n' ' '|grep -w UNION|awk -F',' '{print $1}'|awk -F' ' '{print $4}'|sed -e 's/ //g'`

second_schema=`echo $i|tr '\n' ' '|grep -w UNION|awk -F',' '{print $2}'|sed -e 's/ //g'`

echo val union_cv = 'df_'`echo $first_schema`.union'(df_'`echo $second_schema`')' >> ./${File}.scala
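# Example of the union line this writes (aliases are illustrative):
# val union_cv = df_customer.union(df_customer_hist)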

else

echo "Not a UNION Statement"

fi


# Grep for the keywords GROUP and ORDER; if the statement matches, generate the Spark window specification.

echo $i|tr '\n' ' '|grep -w GROUP|grep ORDER

if [ $? == 0 ]

then

group_key=`cat ./group_key.param`

rm -r -f ./group_key_val.param

x=`echo $group_key|awk -F"," '{print NF}'`

for ((j=1; j<=$x; j++))

do

a=`echo $group_key|cut -d',' -f $j`

mm=`echo $group_key|cut -d',' -f $x`

if [ $j == $x ]

then

field_delim=

else

field_delim=,

fi

echo -e "\"$a\"$field_delim" >> ./group_key_val.param

done


group_key_val=`cat ./group_key_val.param|tr '\n' ' '|sed -e 's/ //g'`

echo val w2 = 'Window.partitionBy('`echo $group_key_val`').orderBy(col("'`cat ./order_key.param`'"))' >> ./${File}.scala
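# Example of the window line this writes (group and order keys are illustrative):
# val w2 = Window.partitionBy("cust_id").orderBy(col("load_ts"))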

else

echo "Not a GROUP Statement"

fi


# Grep for the keyword STORE; if the statement matches, generate the Spark write statement.

echo $i|tr '\n' ' '|grep -w STORE

if [ $? == 0 ]

then

echo union_cv.withColumn\(\"row\",row_number.over\(w2\)\).where\(\$\"row\" === 1\).drop\(\"row\"\).write.format\(\"csv\"\).save\(\"`echo $store_loc`\"\) >> ./${File}.scala
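# Example of the final write this produces (target path is illustrative):
# union_cv.withColumn("row",row_number.over(w2)).where($"row" === 1).drop("row").write.format("csv").save("/data/output")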

else

echo "Not a STORE Statement"

fi

done

IFS=$SAVEIFS


echo -e "\nEOF" >> ./${File}.scala

rm -r -f ./*.txt

rm -r -f ./*.param

With this framework, we were able to migrate all the sourcing Pig scripts to Spark scripts rapidly. Since the generation of the Spark scripts was automated, there were no human errors, which reduced our testing effort.

Future scope of the utility: make it more robust by adding more Pig-to-Spark function conversions; a rough sketch of how additional functions might map is shown below.
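
As a rough illustration of what such extensions could look like (this is not part of the current framework; the relations reuse the hypothetical df_src_a and df_src_b from the earlier sketch), Pig's FILTER and JOIN would map to DataFrame operations along these lines:

import org.apache.spark.sql.functions.col

// Hypothetical extension: FILTERED = FILTER A BY id != ''; maps to a DataFrame filter.
val df_filtered = df_src_a.filter(col("id") =!= "")

// Hypothetical extension: JOINED = JOIN A BY id, B BY id; maps to a DataFrame join on the key.
val df_joined = df_src_a.join(df_src_b, Seq("id"), "inner")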

Hope this blog was helpful; please provide feedback and comments.

