Apache spark to convert a sequence file into tabular format

Apache spark to convert a sequence file into tabular format

Typically when you do first pass of flattening of hierarchal or nested file format like json, XML, HD5 you get a format like <colName><ColVal1,ColVal2,ColVal3,...><ts1,ts2,ts3,...>

For further analysis in spark dataframe/dataset you need to bring these values in tabular format. Here is apache spark code to do that with java api.

import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;
import java.util.logging.Logger;


import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SaveMode;


import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.api.java.function.*;




public class Hd5toparquet {


	private static final Logger logger = Logger.getLogger(Hd5toparquet.class.getName());
	// private static List<String> columnValue = new ArrayList<String> ();
	// private static String columnName =  "";
	
	public static Dataset<Row> getDataFrame(SparkSession sqlContext, String format, String inputPath, String header) {


		Dataset<Row> dataFrame = null;
		String localheader = "false";
		if (!header.equalsIgnoreCase("true")) {
			localheader = "false";
		} else {
			localheader = "true";
		}


		try {
			if (format.equals("csv")) {
				dataFrame = sqlContext.read().format("com.databricks.spark.csv").option("header", localheader)
						.option("inferSchema", "true").load(inputPath);
			} else if (format.equals("parquet")) {
				dataFrame = sqlContext.read().format("parquet").option("inferSchema", "true").load(inputPath);
			} else if (format.equals("tab")) {
				dataFrame = sqlContext.read().format("com.databricks.spark.csv").option("header", localheader)
						.option("inferSchema", "true").option("delimiter", "\\t").load(inputPath);
			} else {
				dataFrame = sqlContext.read().format("com.databricks.spark.csv").option("header", localheader)
						.option("inferSchema", "true").option("delimiter", format).load(inputPath);
			}
			return dataFrame;


		} catch (Exception e) {
			logger.severe(("Exception in get Dataframe util - " + e.getMessage()));
		}
		return null;
	}
	


	public static Dataset<Row> getSelectedColumns(Dataset<Row> df, List<String> selectedColumns) {


		// scala.collection.Seq seqCols =
		// scala.collection.JavaConverters.asScalaIteratorConverter(selectedColumns.subList(1,
		// selectedColumns.size()-1).iterator()).asScala().toSeq();
		// scala.collection.Seq<String> seqCols =
		// scala.collection.JavaConverters.asScalaIteratorConverter(selectedColumns.iterator()).asScala().toSeq();


		List<String> dropCols = new ArrayList<>();
		if (!selectedColumns.isEmpty()) {
			for (String s : df.columns()) {
				if (!selectedColumns.contains(s))
					dropCols.add(s);
			}


			// logger.severe("--------BEFORE---------");
			// df.printSchema();


			for (String col : dropCols) {
				df = df.drop(col);
			}
		}


		// logger.severe("--------AFTER---------");
		// df.printSchema();


		return df;


	}


	public static void writeDataframe(Dataset<Row> df, String outputFormat, String outputPath,
			List<String> selectedColumns, String savemode) {


		writeDataFrame(getSelectedColumns(df, selectedColumns), outputFormat, outputPath,savemode);


	}


	public static void writeDataFrame(Dataset<Row> df, String outputFormat, String outputPath, String savemode) {


		String localheader = "true";
		SaveMode sm = SaveMode.Overwrite;
		if ( savemode.equalsIgnoreCase("append") == true)
			sm = SaveMode.Append;


		if (outputFormat.equalsIgnoreCase("csv") || outputFormat.equalsIgnoreCase("parquet")) {


			df.write().mode(sm).option("header", localheader).format(outputFormat).save(outputPath);
		} else if (outputFormat.equalsIgnoreCase("tab")) {
			df.write().mode(sm).format("com.databricks.spark.csv").option("header", localheader)
					.option("delimiter", "	").save(outputPath);
		} else {
			df.write().mode(sm).format("com.databricks.spark.csv").option("header", localheader)
					.option("delimiter", outputFormat).save(outputPath);
		}
	}
	
	public static JavaPairRDD<String,Row> dfToPairRDD(String keyColumn,Dataset<Row> df){
		
		return df.toJavaRDD().keyBy(row -> row.getAs(keyColumn).toString());
		
	}
	
	public  static class FlattenForEachFunc implements ForeachFunction<Row> {


		/**
		 * 
		 */
		private static final long serialVersionUID = 1L;


		private static SparkSession ss = null;
		private List<Row> rows = new ArrayList<Row> ();
		
		public FlattenForEachFunc(SparkSession spark) {
			ss = spark;
		}
		public  static Dataset<Row> dsunion = null;
		
		StructType schema = new StructType(new StructField[] {
				new StructField("ColumnName",DataTypes.StringType, false,Metadata.empty()),
				new StructField("ColumnValue",DataTypes.FloatType, true,Metadata.empty()),
				
		});
		@Override
		public void call(Row r) throws Exception {
			String name = r.getString(0);
			String val = r.getString(1);
			if (val == null)  
				rows.add(RowFactory.create(name,null));
			else {
				String[] vals = val.split(",");
				for (String s : vals)
					rows.add(RowFactory.create(name,Float.parseFloat(s)));
				
				//System.out.println();
			}
			
			 //Dataset<Row> ds = ss.createDataset(columnValue, Encoders.STRING()).toDF();
			 Dataset<Row> ds = ss.createDataFrame(rows, schema);
			 // ds = ds.withColumnRenamed("value", name) ;
			 
			
			 if (dsunion == null)
				 dsunion = ds;
			 else
				 dsunion = dsunion.union(ds); 
			 
			 System.out.println(name);
			 System.out.println(ds.count());
			 System.out.println(dsunion.count());
			// dsunion.show();
			
		}
		
	}
	
	 public static void main(String[] args) {
		 
		String fileLoc = "C:\\Users\\VISING\\Documents\\SEQUENCEFILE.csv";
		
		System.setProperty("hadoop.home.dir", "C:\\Users\\VISING\\Downloads");
		
		SparkSession spark = SparkSession.builder().appName("myapp")
				.master("local[*]") // only for demo and testing purposes, use spark-submit instead
				.getOrCreate();
		
		
		 Dataset<Row> df = getDataFrame(spark,"csv",fileLoc,"true");
		 df = df.limit(10);
		 df.show();
		 df.printSchema();
		 List<String> colN = df.select("signalName").as(Encoders.STRING()).collectAsList();
		 //spark.sparkContext().parallelize((colN.toArra,10);
		 
//		 int i = 0;
//		 for (String s: colN) {
//			 if (i== 50) break;
//			 df = df.withColumn(s, functions.lit(null));
//		 }
		
		 Hd5toparquet.FlattenForEachFunc flatfunc = new Hd5toparquet.FlattenForEachFunc(spark);
		 df.foreach( flatfunc);
//		 List<String> value = getColumnValue();
//		 String name  = getColumnName();
//		 Dataset<Row> ds = spark.createDataset(value, Encoders.STRING()).toDF();
//		 ds = ds.withColumnRenamed("value", name) ;
//		 ds.show();
		 
		 Dataset<Row> ds  = flatfunc.dsunion;
		 
		 ds.show(false);
		 ds.printSchema();
		 
		 Scanner input = new Scanner(System.in);
	    	
	    	System.out.print("Enter the Signal to profile :");
	    	String  signame = input.nextLine();
	    	
	    	Dataset<Row> dsfilter = ds.filter(ds.col("ColumnName").equalTo(signame)).describe("ColumnValue");
	    	dsfilter.show();
	    	
	    	writeDataFrame(ds, "parquet", "C:\\\\Users\\\\VISING\\\\Documents\\\\FLATFILE.parq", "overwrite");
	    	
	    	
	    	input.close();
	    	
		 spark.stop();
	}
}

Feel free to reach me if you have any questions.

要查看或添加评论,请登录

社区洞察

其他会员也浏览了