Using Apache Spark to convert a sequence file into tabular format
Typically, when you do a first pass of flattening a hierarchical or nested file format like JSON, XML, or HDF5, you end up with records of the form <colName><ColVal1,ColVal2,ColVal3,...><ts1,ts2,ts3,...>.
For further analysis with Spark DataFrames/Datasets you need to bring these values into a tabular format. Here is Apache Spark code to do that with the Java API.
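For example, one input record and the flattened rows produced from it might look like this (a hypothetical sample; the code reads the signal name from the first column, here signalName, and the comma-separated values from the second column, whose name will be whatever your source file uses):

    signalName,signalValues,timestamps
    engine_rpm,"810.5,812.0,811.2","1,2,3"

which flattens into one (ColumnName, ColumnValue) row per value:

    ColumnName   ColumnValue
    engine_rpm   810.5
    engine_rpm   812.0
    engine_rpm   811.2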
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.logging.Logger;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.ForeachFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Hd5toparquet {

    private static final Logger logger = Logger.getLogger(Hd5toparquet.class.getName());

    // private static List<String> columnValue = new ArrayList<String>();
    // private static String columnName = "";

    // Reads the input into a DataFrame. "format" can be csv, parquet, tab,
    // or any custom delimiter string.
    public static Dataset<Row> getDataFrame(SparkSession spark, String format, String inputPath, String header) {
        Dataset<Row> dataFrame = null;
        String localheader = header.equalsIgnoreCase("true") ? "true" : "false";
        try {
            if (format.equals("csv")) {
                dataFrame = spark.read().format("com.databricks.spark.csv").option("header", localheader)
                        .option("inferSchema", "true").load(inputPath);
            } else if (format.equals("parquet")) {
                dataFrame = spark.read().format("parquet").option("inferSchema", "true").load(inputPath);
            } else if (format.equals("tab")) {
                dataFrame = spark.read().format("com.databricks.spark.csv").option("header", localheader)
                        .option("inferSchema", "true").option("delimiter", "\\t").load(inputPath);
            } else {
                dataFrame = spark.read().format("com.databricks.spark.csv").option("header", localheader)
                        .option("inferSchema", "true").option("delimiter", format).load(inputPath);
            }
            return dataFrame;
        } catch (Exception e) {
            logger.severe("Exception in getDataFrame util - " + e.getMessage());
        }
        return null;
    }

    // Keeps only the requested columns by dropping everything else.
    public static Dataset<Row> getSelectedColumns(Dataset<Row> df, List<String> selectedColumns) {
        // scala.collection.Seq seqCols = scala.collection.JavaConverters.asScalaIteratorConverter(
        //         selectedColumns.subList(1, selectedColumns.size() - 1).iterator()).asScala().toSeq();
        // scala.collection.Seq<String> seqCols = scala.collection.JavaConverters.asScalaIteratorConverter(
        //         selectedColumns.iterator()).asScala().toSeq();
        List<String> dropCols = new ArrayList<>();
        if (!selectedColumns.isEmpty()) {
            for (String s : df.columns()) {
                if (!selectedColumns.contains(s)) {
                    dropCols.add(s);
                }
            }
            // logger.severe("--------BEFORE---------");
            // df.printSchema();
            for (String col : dropCols) {
                df = df.drop(col);
            }
        }
        // logger.severe("--------AFTER---------");
        // df.printSchema();
        return df;
    }

    public static void writeDataframe(Dataset<Row> df, String outputFormat, String outputPath,
            List<String> selectedColumns, String savemode) {
        writeDataFrame(getSelectedColumns(df, selectedColumns), outputFormat, outputPath, savemode);
    }

    // Writes a DataFrame as csv, parquet, tab-delimited, or with a custom delimiter.
    public static void writeDataFrame(Dataset<Row> df, String outputFormat, String outputPath, String savemode) {
        String localheader = "true";
        SaveMode sm = savemode.equalsIgnoreCase("append") ? SaveMode.Append : SaveMode.Overwrite;
        if (outputFormat.equalsIgnoreCase("csv") || outputFormat.equalsIgnoreCase("parquet")) {
            df.write().mode(sm).option("header", localheader).format(outputFormat).save(outputPath);
        } else if (outputFormat.equalsIgnoreCase("tab")) {
            df.write().mode(sm).format("com.databricks.spark.csv").option("header", localheader)
                    .option("delimiter", "\\t").save(outputPath);
        } else {
            df.write().mode(sm).format("com.databricks.spark.csv").option("header", localheader)
                    .option("delimiter", outputFormat).save(outputPath);
        }
    }

    // Keys each row of the DataFrame by the given column, for pair-RDD style processing.
    public static JavaPairRDD<String, Row> dfToPairRDD(String keyColumn, Dataset<Row> df) {
        return df.toJavaRDD().keyBy(row -> row.getAs(keyColumn).toString());
    }

    // Splits each <name, "v1,v2,v3,..."> row into one (ColumnName, ColumnValue) row per value
    // and accumulates the result as a union of per-signal DataFrames.
    // Note: this builds DataFrames inside an executor function and shares state through
    // static fields, so it is only suitable for local-mode demos, not a real cluster.
    public static class FlattenForEachFunc implements ForeachFunction<Row> {

        private static final long serialVersionUID = 1L;

        private static SparkSession ss = null;

        public FlattenForEachFunc(SparkSession spark) {
            ss = spark;
        }

        public static Dataset<Row> dsunion = null;

        StructType schema = new StructType(new StructField[] {
                new StructField("ColumnName", DataTypes.StringType, false, Metadata.empty()),
                new StructField("ColumnValue", DataTypes.FloatType, true, Metadata.empty()) });

        @Override
        public void call(Row r) throws Exception {
            String name = r.getString(0);
            String val = r.getString(1);
            // Local list per row, so previously processed signals are not unioned again.
            List<Row> rows = new ArrayList<Row>();
            if (val == null) {
                rows.add(RowFactory.create(name, null));
            } else {
                String[] vals = val.split(",");
                for (String s : vals) {
                    rows.add(RowFactory.create(name, Float.parseFloat(s)));
                }
            }
            // Dataset<Row> ds = ss.createDataset(columnValue, Encoders.STRING()).toDF();
            Dataset<Row> ds = ss.createDataFrame(rows, schema);
            // ds = ds.withColumnRenamed("value", name);
            if (dsunion == null) {
                dsunion = ds;
            } else {
                dsunion = dsunion.union(ds);
            }
            System.out.println(name);
            System.out.println(ds.count());
            System.out.println(dsunion.count());
            // dsunion.show();
        }
    }

    public static void main(String[] args) {
        String fileLoc = "C:\\Users\\VISING\\Documents\\SEQUENCEFILE.csv";
        System.setProperty("hadoop.home.dir", "C:\\Users\\VISING\\Downloads");

        SparkSession spark = SparkSession.builder().appName("myapp")
                .master("local[*]") // only for demo and testing purposes, use spark-submit instead
                .getOrCreate();

        Dataset<Row> df = getDataFrame(spark, "csv", fileLoc, "true");
        df = df.limit(10);
        df.show();
        df.printSchema();

        // Signal names collected on the driver (currently unused; kept for the commented-out experiments below).
        List<String> colN = df.select("signalName").as(Encoders.STRING()).collectAsList();
        // spark.sparkContext().parallelize((colN.toArra,10);
        // int i = 0;
        // for (String s : colN) {
        //     if (i == 50) break;
        //     df = df.withColumn(s, functions.lit(null));
        // }

        Hd5toparquet.FlattenForEachFunc flatfunc = new Hd5toparquet.FlattenForEachFunc(spark);
        df.foreach(flatfunc);

        // List<String> value = getColumnValue();
        // String name = getColumnName();
        // Dataset<Row> ds = spark.createDataset(value, Encoders.STRING()).toDF();
        // ds = ds.withColumnRenamed("value", name);
        // ds.show();

        Dataset<Row> ds = FlattenForEachFunc.dsunion;
        ds.show(false);
        ds.printSchema();

        Scanner input = new Scanner(System.in);
        System.out.print("Enter the Signal to profile :");
        String signame = input.nextLine();
        Dataset<Row> dsfilter = ds.filter(ds.col("ColumnName").equalTo(signame)).describe("ColumnValue");
        dsfilter.show();

        writeDataFrame(ds, "parquet", "C:\\Users\\VISING\\Documents\\FLATFILE.parq", "overwrite");

        input.close();
        spark.stop();
    }
}
Feel free to reach out to me if you have any questions.