Hortonworks Use Case Discovery Workshop with Apache PySpark and HDFS for data correlation.
I am on-site at a Big Data client in Denver. As I have mentioned in the past, my new formula for training is: A) Train the students in Apache Spark. B) Ask the class if they wish to tackle a real-world use case they are currently struggling to solve.
A. In this case, our reputation for being able to do this work during the training session preceded us, and we were able to schedule a two-day engagement to fast-track the whole process. So, we did a four-day Apache Spark training in one day (lightning fast). Then we set aside day two for just the use case discovery.
B. In their case, I was shown (before I even arrived) their Python-only solution, without any Apache Spark: a lot of code similar to this...
import os
import sys
import pandas as pd
import encoder  # their in-house sentiment encoder module (not a standard library)

model = encoder.Model()
# Directory path where the review data is located (passed on the command line)
data_sources_path = sys.argv[1]
# Read the tab-separated review file and concatenate title with text
data = pd.read_table(os.path.join(data_sources_path, 'file.csv'), sep='\t', header=0, encoding='utf-8').fillna('-')
data_full = data.assign(review_full = data.title + ', ' + data.review)
rv = data_full['review_full'].str.lower().values.tolist()
text_features = model.transform(rv)
# Append the sentiment feature to the data and write it out
data_sent = data
data_sent['sentiment'] = pd.Series(text_features[:, 2388])
data_sent.to_csv(path_or_buf=os.path.join(data_sources_path, 'output.text'), index=False, header=False)
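For comparison, here is a minimal sketch of what that same read-and-concatenate step could look like with Spark DataFrames. The file name, the tab separator, and the title/review column names are taken from the pandas snippet above; everything else (including reading via sqlContext on a Spark 2.x cluster) is an assumption on my part, not their code.
# Minimal sketch (not the client's code): same TSV read and title + review
# concatenation, expressed with Spark DataFrames.
from pyspark.sql import functions as F

reviews = (sqlContext.read
           .option('sep', '\t')
           .option('header', 'true')
           .csv(os.path.join(data_sources_path, 'file.csv'))  # same path as above
           .fillna('-'))

# mirror the pandas assign() + str.lower() steps
reviews = reviews.withColumn(
    'review_full',
    F.lower(F.concat_ws(', ', F.col('title'), F.col('review'))))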
Let's get started on the use case itself:
1. Short Name: "Time and Data Throughput Optimization."
2. Problem Statement: "An already solved problem needs to be optimized for time and SerDe, that is, time and data throughput, because the Python-only solution is too slow."
3. Proposed Solution: A) Port the whole thing to PySpark. B) Perhaps include Spark Machine Learning.
4. Data Sources: One month of data (August 2017) in an existing Apache Hive table, as a PoC.
5. Reviewer(s): VP = Mike.
6. Stakeholder(s): CEO, Exec. VP, SVP, Customers, "Buffy the Vampire Slayer," "Butchers," & "Jesus." /* Yes, they really gave me all those "stakeholders"*/
7. Further Thoughts:
A) How important is this? IOW, does solving this really affect our bottom line?
B) Classification for prediction may be useful here.
C) Correlation variables (not shown) and possible other useful target variables. (A minimal sketch of checking one correlation follows below.)
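On that last thought, here is a minimal sketch of how a pairwise correlation could be checked once the data is in a Spark DataFrame. The DataFrame df and the time_ms column appear in the team's code further below; the second column, tune_time_ms, is purely a hypothetical name for illustration:
# Minimal sketch, assuming a DataFrame df with numeric columns 'time_ms'
# (seen in the team's query below) and a hypothetical 'tune_time_ms'.
pearson = df.stat.corr('time_ms', 'tune_time_ms')  # Pearson correlation
print('corr(time_ms, tune_time_ms) = %f' % pearson)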
We are about to implement a Spark DataFrames solution on their Hortonworks cluster here!
Just to recap: right now the Python code (sans Spark) takes roughly five hours to run on a single month of data. Naturally we want to make this faster and more efficient with Spark. Can we? Let's try...
We used an Agile-Scrum hybrid, with 30-minute Sprint cycles, since we have only one day to solve this. I want to tip my hat to my Agile mentor Alistair Cockburn, who taught me the value of shorter Sprints (he gave us 7-minute Sprints!).
{ "Yizhe," at left (and team) implementing the Use Case on Day 2 of class. Go Team! }
Here are the Scrum "User Stories" we generated as a team (very quickly). We all got to be Product Owner this morning (mostly Bob and me, though). We agreed as a team that all development will include test-driven development (TDD):
A) Get the data we need from Hive <my_place>.slash_underscoredot [1]
B) Divide the dataset by label (abandoned or not) [2] - depends on A.
# PySpark code by Matt & Gavin
# import libraries (sqlContext is provided by the PySpark shell on the cluster)
from pyspark.sql.functions import when
# hive query: pull only the two stream labels we care about
df = sqlContext.sql("""
select *
from <my_place>.<my_table>
where label in
('Streams lasted longer than 1 minute'
,'Streams could have been impacted by QoS Factors')
""")
# create new column 'stream_ok': 1 = stream lasted longer than 1 minute, 0 = possibly abandoned
df = df.withColumn('stream_ok', when(df.label == "Streams lasted longer than 1 minute", 1).otherwise(0))
# split into two datasets:
nonAbandon = df.filter(df.stream_ok == 1)
abandon = df.filter(df.stream_ok == 0)
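(Added here, not part of the original story B code: a tiny sanity check in the spirit of the team's TDD agreement, just to confirm the two splits cover every filtered row.)
# sanity check: the two label-based splits should add up to the full filtered set
assert nonAbandon.count() + abandon.count() == df.count()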
C) Calculate target variable (unnamed here)[2]
D) Visualize the two datasets and see how they differ. Correlation.[3] = "Yizhe" (she will use a PySpark Logistic Regression)
E) Group data by StreamID.[2]
F) Create an infographic for the CEO. (High-res and pretty).[5]
G) Integrate all "done" stories and test.[4]
H) Hypothesis Test by "Trevor": Is there a difference in mean tune-time between abandoned and non-abandoned streams? (A sketch of one approach follows below.)
{ Above L to R: "Gavin, Trevor, and Matt." }
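One way Trevor's hypothesis test (user story H) could be run is to aggregate per-group summary statistics in Spark and feed them to a Welch's t-test on the driver. This is only a sketch: the tune_time_ms column name is a placeholder, df is the DataFrame from the story B snippet above, and SciPy being available on the driver is an assumption.
# Sketch for user story H (hypothetical 'tune_time_ms' column; df from story B above).
from pyspark.sql import functions as F
from scipy import stats

grp = (df.groupBy('stream_ok')
         .agg(F.count('tune_time_ms').alias('n'),
              F.avg('tune_time_ms').alias('mean'),
              F.stddev('tune_time_ms').alias('std'))
         .collect())

g0, g1 = grp  # one row per group (abandoned vs. not abandoned)
t, p = stats.ttest_ind_from_stats(g0['mean'], g0['std'], g0['n'],
                                  g1['mean'], g1['std'], g1['n'],
                                  equal_var=False)  # Welch's t-test
print('t = %.3f, p = %.4f' % (t, p))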
So we worked from 9:00am until 1:30pm (with a one-hour lunch break), and we are almost DONE! We are on to integration and testing, and here is just some of our code:
# Thank you! Here's Yizhe's code (some parts are obfuscated by Laurent):
# Launched as: pyspark --num-executors 5 --driver-memory 32g --executor-memory 32g
# with these set in the shell environment beforehand:
#   export PYSPARK_DRIVER_PYTHON=jupyter
#   export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
from pyspark import StorageLevel

sqlContext.sql('USE myplace')
df1 = sqlContext.sql("SELECT <what-i-need> FROM slash_underscoredot WHERE ISNOTNULL(<something>) AND label IN ('message', 'message')").repartition(16)
df2 = df1.where((df1.label == 'Streams could have been impacted by QoS Factors') | (df1.label == 'Streams lasted longer than 1 minute')).filter(df1.time_ms > 0).persist(StorageLevel.DISK_ONLY)
abandoned = df2.filter(df2.label == 'Streams could have been impacted by QoS Factors')
not_abandoned = df2.filter(df2.label == 'Streams lasted longer than 1 minute')
# unfinished
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.classification import LogisticRegression
# convert categorical column into binary SparseVectors
stringIndexer = StringIndexer(inputCol='application_type', outputCol='application_index')
encoder = OneHotEncoder(inputCol='application_index', outputCol='application_vec')
stages = [stringIndexer, encoder]
# Transform all features into a vector using VectorAssembler
assemblerInputs = ['application_vec', 'time_ms']
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')
stages += [assembler]
# Convert label into label indices using the StringIndexer
label_stringIdx = StringIndexer(inputCol = 'label', outputCol = 'label_index')
stages += [label_stringIdx]
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df2)
df3 = pipelineModel.transform(df2)
# randomSplit normalizes the weights, so [0.3, 0.1] is a 75% / 25% split
(trainingData, testData) = df3.randomSplit([0.3, 0.1], seed = 25)
# Create initial LogisticRegression model (label column is the one produced by the StringIndexer above)
lr = LogisticRegression(labelCol='label_index', featuresCol='features', maxIter=7)
# Train model with Training Data
lrModel = lr.fit(trainingData)
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))
predictions = lrModel.transform(testData)
predictions.printSchema()
selected = predictions.select('label', 'prediction', 'probability', 'application_type')
selected.show()
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate model (areaUnderROC by default, using the model's rawPrediction output)
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label_index')
evaluator.evaluate(predictions)
evaluator.getMetricName()
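One last piece that could be bolted on, mirroring the to_csv step in the original pandas script: writing the scored rows back out. This is only a sketch; the output path is a placeholder, and the probability column is left out because Spark's CSV writer cannot serialize vector columns.
# Sketch only: persist the predictions (placeholder HDFS path).
(predictions
 .select('label', 'prediction', 'application_type')
 .write
 .mode('overwrite')
 .option('header', 'true')
 .csv('/tmp/stream_predictions_csv'))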
We then integrated all the "done" stories and tested (user story G), and we just finished, with success!
Results: The Spark job ran 25 times faster than the original raw Python:
The original Python-only solution took 5 hours; the refactored Apache Spark version took 12 minutes to run. That is a 25x speedup. Needless to say, we are happy here.
Laurent Weichberger, Big Data Bear, Hortonworks: [email protected]