ML Combining R with Spark and H2O DL framework

R is a functional programming and scripting language written in a functional style that helps make the intent of an analysis clear and ensures that the implementation corresponds to that intent. R is becoming an essential tool for research, finance and analytics-driven companies such as Google, Facebook, LinkedIn, IBM, Microsoft and Amazon, and surveys conducted by IEEE point to its growing importance. Google used R to demonstrate the value of massively parallel computational infrastructure for forecasting based on the MapReduce paradigm. Both R and Python are on the rise with the convergence of the Internet of Things (IoT) and machine learning (ML).

Spark emerged as a framework to address two limitations of Hadoop: speed and tolerance to partitioning. Spark works on top of Hadoop, and while Hadoop processes data on disk, Spark processes data in memory. Spark also uses the resilient distributed dataset (RDD) concept to handle the partitioning required for scalable parallel processing on a computing cluster.

In addition to R's own machine learning algorithms, Spark provides machine learning algorithms of its own, including linear methods such as generalized linear models (GLM) and non-linear methods such as decision trees, as well as feature transformations that can be chained together with dplyr pipelines, as shown in the sketch below.
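For example, a dplyr transformation and a Spark ML fit can be chained in a single pipeline. The following is only an illustrative sketch: it assumes the mtcars_tbl Spark table created further down, and wt_tons is a made-up derived column.

# a dplyr transformation chained with a Spark ML model fit (assumes mtcars_tbl, defined below)
mtcars_tbl %>%
  filter(hp >= 100) %>%
  mutate(wt_tons = wt / 2) %>%   # illustrative derived feature
  ml_linear_regression(response = "mpg", features = c("wt_tons", "cyl"))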

The machine learning algorithms run on the Spark cluster through sparklyr. As an example, consider predicting a car's fuel consumption (mpg) from two features, its weight (wt) and the number of cylinders in its engine (cyl), using a GLM regression model. The assumption is that the relationship between mpg and each of the two features is linear.

This relationship can be expressed as a function in which fuel consumption (mpg), the response variable, depends on the weight and the number of cylinders of the car:

mpg = function (wt, cyl)
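Before moving to Spark, the same linear relationship can be checked locally with base R on the built-in mtcars data; a minimal sketch:

# fit the same linear relationship locally with base R on the built-in mtcars data
local_fit <- lm(mpg ~ wt + cyl, data = mtcars)
summary(local_fit)  # coefficients for wt and cyl, residuals and R-squared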

To copy the car table with the mpg, wt and cyl variables into Spark, the function copy_to() can be used:

# copy mtcars into spark
mtcars_tbl <- copy_to(sc, mtcars)

The connection object sc itself is created first with spark_connect():

sc <- spark_connect(master = "local", version = "2.1.0")

mtcars_tbl <- copy_to(sc, mtcars, "mtcars")

Data preparation involves transforming and indexing the data to split it into a training set and a test set of 70% and 30% respectively:

# transform the car data set, then partition into training and test sets (70%/30%)
partitions <- mtcars_tbl %>%
  filter(hp >= 100) %>%
  mutate(cyl8 = cyl == 8) %>%
  sdf_partition(training = 0.7, test = 0.3, seed = 1240)

Once the car data is split into the two sets, the model is fitted to the training set:

# fit a linear model to the training set
fit <- partitions$training %>%
  ml_linear_regression(response = "mpg", features = c("wt", "cyl"))


## * No rows dropped by 'na.omit' call

The same model can also be fitted with H2O through rsparkling:

library(rsparkling)

library(sparklyr)

library(dplyr)

library(h2o)
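The h2o.glm() call below expects an H2O frame rather than a Spark table. A minimal sketch of the conversion step, assuming rsparkling's as_h2o_frame() helper and the mtcars_tbl table created earlier:

# convert the Spark table into an H2O frame (as_h2o_frame() from rsparkling, assumed available)
mtcars_h2o <- as_h2o_frame(sc, mtcars_tbl)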

 

mtcars_glm <- h2o.glm(x = c("wt", "cyl"),
                      y = "mpg",
                      training_frame = mtcars_h2o,
                      lambda_search = TRUE)

 

 

The models produced by Spark can be summarized, as in R, using summary() to display information on the extent and quality of the model fit as well as the accuracy of the ML predictions.

summary(fit)
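Predictions on the held-out test partition can then be generated from the fitted model; a minimal sketch, assuming a sparklyr version that provides ml_predict() (older versions use sdf_predict()):

# score the held-out test partition with the fitted model
pred <- ml_predict(fit, partitions$test)
head(pred)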

Instead of using the built-in car data, we can read and write data in CSV or JSON format, then store it in HDFS, S3, or on the local filesystem of the cluster nodes.

iris_tbl <- copy_to(sc, iris, "iris")
temp_csv <- tempfile(fileext = ".csv")
temp_json <- tempfile(fileext = ".json")

spark_write_csv(iris_tbl, temp_csv)
iris_csv_tbl <- spark_read_csv(sc, "iris_csv", temp_csv)

 

spark_write_json(iris_tbl, temp_json)

iris_json_tbl <- spark_read_json(sc, "iris_json", temp_json)

 

src_tbls(sc)

 Distributed computing - Spark

Instead of using all of the data, large datasets can be cut down to manageable sizes or groups by making queries directly against tables within the Spark cluster.

The spark_connection object implements a DBI interface for Spark, and dbGetQuery() can run SQL against it and return the result, which can in turn be used as a data frame in R.

library(DBI)

iris_preview <- dbGetQuery(sc, "SELECT * FROM iris LIMIT 10")

iris_preview

As an example, arbitrary R code can be run across the (distributed) cluster using spark_apply() on the iris data:

spark_apply(iris_tbl, function(data) {
  data[1:4] + rgamma(1, 2)
})
 

Another example groups the data: the iris data is grouped by species and a GLM model is then fitted for each group (species).

iris_tbl %>%
  spark_apply(nrow, group_by = "Species")

iris_tbl %>%
  spark_apply(
    function(e) summary(glm(Petal_Length ~ Petal_Width, e))$r.squared,
    names = "r.squared",
    group_by = "Species")

 

It is also possible to perform an operation over each group of rows, grouped by a column, and make use of any package within the closure:

spark_apply(
  iris_tbl,
  function(e) broom::tidy(lm(Petal_Width ~ Petal_Length, e)),
  names = c("term", "estimate", "std.error", "statistic", "p.value"),
  group_by = "Species"
)

 

 

H2O ML-Deep Learning

Parallel distributed ML algorithms can be run with R and H2O, an open-source math engine for Big Data that computes parallel distributed machine learning algorithms.

Download H2O directly at https://h2o.ai/download, or install H2O's R package from CRAN at https://cran.r-project.org/web/packages/h2o/.

# install.packages("h2o")

# run h2o

library(h2o)

Start H2O on your local machine using all available processor cores. By default, CRAN policies limit use to only 2 cores.

h2o.init(nthreads = -1)

To launch H2O locally with default initialization arguments, use the following:

h2o.init()


# Checking Cluster Status

# To check the status and health of the H2O cluster, use h2o.clusterInfo()

h2o.clusterInfo()


# Import dataset into H2O and display summary

data(iris)

# Converts R object "iris" into H2O object "iris.hex"

iris.hex = as.h2o(iris, destination_frame= "iris.hex")

head(iris.hex)

 

# Search for factors in the data, if any

h2o.anyFactor(iris.hex)

 

##Displays the titles of the columns

colnames(iris.hex)

names(iris.hex)

 

##Displays data in a graph

plot(iris$Petal.Length,
     iris$Petal.Width,
     pch = 21, bg = c("red", "green3", "blue")[unclass(iris$Species)],
     xlab = "Petal Length",
     ylab = "Petal Width")

 

## Getting Quantiles of the data

# Returns the percentiles at 10, 20, ..., 100%

iris.qs <- quantile(iris.hex$Sepal.Length, probs =(1:10)/10)

iris.qs

 
# Take the outliers or the bottom and top 10% of iris data

Sepal.Length.outliers <- iris.hex[iris.hex$Sepal.Length <= iris.qs["10%"] |
                                  iris.hex$Sepal.Length >= iris.qs["90%"], ]

# Check that the number of rows return is about 20% of the original data

nrow(iris.hex)

nrow(Sepal.Length.outliers)

nrow(Sepal.Length.outliers)/nrow(iris.hex)

 

# View quantiles and histograms

quantile(x = iris.hex$Sepal.Length, na.rm = TRUE)

h2o.hist(iris.hex$Sepal.Length)

 

###################

# Data split with the target assigned categorical values

library(keras)

# The keras package uses the pipe operator (%>%) to chain functions or operations together;
# kerasR uses the dollar sign ($) operator instead. The pipe operator generally improves code readability.

# Determine sample size

ind <- sample(2, nrow(iris), replace=TRUE, prob=c(0.67, 0.33))

# Split the `iris` data

iris.training <- iris[ind==1, 1:4]

iris.test <- iris[ind==2, 1:4]

 

# Split the class attribute (converted to zero-based integers, as to_categorical() expects)
iris.trainingtarget <- as.numeric(iris[ind==1, 5]) - 1
iris.testtarget <- as.numeric(iris[ind==2, 5]) - 1

 

# One hot encode training target values

iris.trainLabels <- to_categorical(iris.trainingtarget)

 

# One hot encode test target values

iris.testLabels <- to_categorical(iris.testtarget)

 

# Print out the iris.testLabels to double check the result

print(iris.testLabels)
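These one-hot labels would typically feed a small keras network. A minimal sketch, assuming a working keras/TensorFlow installation; the layer sizes are arbitrary choices for illustration:

# a small sequential network: 4 iris features in, 3 class probabilities out
model <- keras_model_sequential() %>%
  layer_dense(units = 8, activation = "relu", input_shape = c(4)) %>%
  layer_dense(units = 3, activation = "softmax")

model %>% compile(loss = "categorical_crossentropy", optimizer = "adam", metrics = "accuracy")

# train on the split created above and evaluate on the held-out test data
model %>% fit(as.matrix(iris.training), iris.trainLabels, epochs = 100, batch_size = 5)
model %>% evaluate(as.matrix(iris.test), iris.testLabels)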

 

# Generating Random Numbers

# Creates object for uniform distribution on iris data set

rnd_i <- h2o.runif(iris.hex)

summary (rnd_i) ## Summarize the results of h2o.runif

 

## Construct test and train sets

## Create training set with threshold of 0.67

iris.train <- iris.hex[rnd_i <= 0.67,]

##Assign name to training set

iris.train <- h2o.assign(iris.train, "iris.train")

## Create test set with threshold to filter values greater than 0.67

iris.test <- iris.hex[rnd_i > 0.67,]

## Assign name to test set

iris.test <- h2o.assign(iris.test, "iris.test")

## Combine results of test & training sets, then display result

nrow(iris.train) + nrow(iris.test)

nrow(iris.hex) ## Matches the full set

 

# Or by splitting the frame into two separate subsets based on a specified ratio

# Splits the data in the iris frame with a ratio of 0.75

iris.split <- h2o.splitFrame(data = iris.hex , ratios = 0.75)

# Creates training set from 1st data set in split

iris.train <- iris.split[[1]]

# Creates testing set from 2nd data set in split

iris.test <- iris.split[[2]]

 

# Getting frames by creating a reference object to the data frame in H2O

# index.hex <- h2o.getFrame(id = "index.hex")

  

# Getting Models in H2O by creating a reference object for the model in H2O, using h2o.getModel().

gbm.model <- h2o.getModel(model_id = "GBM_8e4591a9b413407b983d73fbd9eb44cf")

 
# Listing H2O objects by generating a list of all H2O objects created during a session and each object's size in bytes.

h2o.ls()

 

Running models in H2O. Some of the model types are:

1 Gradient Boosting Machine (GBM)

2 Generalized Linear Models (GLM)

3 K-means

4 Principal Components Analysis (PCA)

library(h2o)

h2o.init(nthreads = -1)

data(iris)

iris.hex <- as.h2o(iris,destination_frame = "iris.hex")

iris.gbm01 <- h2o.gbm(y = 1, x = 2:5, training_frame = iris.hex, ntrees = 10,
                      max_depth = 3, min_rows = 2, learn_rate = 0.2, distribution = "gaussian")


To find the most important variables, type:

names(iris.gbm01@model)

iris.gbm01@model$variable_importances


# For a classification model that uses labels, we use distribution = "multinomial":

iris.gbm02 <- h2o.gbm(y = 5, x = 1:4, training_frame = iris.hex, ntrees = 15, max_depth = 5,
                      min_rows = 2, learn_rate = 0.01, distribution = "multinomial")

names(iris.gbm02@model)

iris.gbm02@model$variable_importances
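The other model types listed above can be fitted on the same frame in a similar way; a minimal sketch for K-means and PCA (the values of k are arbitrary choices for illustration):

# K-means with 3 clusters on the four numeric iris columns
iris.km <- h2o.kmeans(training_frame = iris.hex, x = 1:4, k = 3)

# PCA retaining the first two principal components
iris.pca <- h2o.prcomp(training_frame = iris.hex, x = 1:4, k = 2, transform = "STANDARDIZE")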

Generalized linear models (GLM) are commonly used as they are extremely fast and scale extremely well for models with a limited number of predictors.

binary_data <- read.csv("https://raw.githubusercontent.com/abari212/data/master/PRESENCE_ABSENCE_CASE.csv", header=T, dec=".",sep=",")

# Converts R object "binary_data" into H2O object "binary_dat.hex"

binary_data.hex = as.h2o(binary_data, destination_frame= "binary_data.hex")

head(binary_data.hex)

names(binary_data.hex)

binary_data.glm01 <- h2o.glm(y = "Y", x = c("X1","X2","X3","X4","X5","X6","X7","X8"),
                             training_frame = binary_data.hex, family = "binomial",
                             nfolds = 10, alpha = 0.5)

binary_data.glm01@model$cross_validation_metrics

names(binary_data.glm01@model)
 



Checking the accuracy of the model

The accuracy is assessed with the Receiver Operating Characteristic (ROC) curve. The ROC curve has its origin in engineering, where it was used for diagnostic test evaluation. The curve has two main axes: the true positive rate (sensitivity) is plotted as a function of the false positive rate (100 - specificity), and each point on the curve corresponds to a particular decision threshold. The ROC curve is a way to carry out a cost/benefit analysis of the diagnostic decision-making process.
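With H2O, the ROC curve and its metrics can be inspected directly from the cross-validation results; a minimal sketch using the binary_data.glm01 model fitted above:

# cross-validated performance of the binomial GLM
perf <- h2o.performance(binary_data.glm01, xval = TRUE)
plot(perf)      # ROC curve: true positive rate against false positive rate
h2o.auc(perf)   # area under the ROC curve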


AUC, the Area Under the Curve of the ROC curve, is the most common accuracy metric for machine learning techniques on binary problems. AUC values range from 1.0, where the entire graph falls beneath the curve, down to 0.0, where the area under the curve is zero; a value of 0.5 corresponds to random guessing.

# AUC values

binary_data.glm01@model$cross_validation_metrics_summary


# Set predictor and response variables

names(binary_data.hex)

Y = "Y"

X = c("X1","X2","X3","X4","X5","X6","X7","X8")

 

# Define the data for the model and display the results

## Construct test and train sets

 

# Generating Random Numbers

# Creates object for uniform distribution on the binary data set

rnd_i <- h2o.runif(binary_data.hex)

summary (rnd_i) ## Summarize the results of h2o.runif

 

## Create training set with threshold of 0.67

binary_data.train <- binary_data.hex[rnd_i <= 0.67,]

##Assign name to training set

binary_data.train <- h2o.assign(binary_data.train, "binary_data.train")

## Create test set with threshold to filter values greater than 0.67

binary_data.test <- binary_data.hex[rnd_i > 0.67,]

## Assign name to test set

binary_data.test <- h2o.assign(binary_data.test, "binary_data.test")

## Combine results of test & training sets, then display result

nrow(binary_data.train) + nrow(binary_data.test)

nrow(binary_data.hex) ## Matches the full set

 

binary_data.glm02 <- h2o.glm(training_frame = binary_data.train, x = X, y = Y, family = "binomial", alpha = 0.5)

# View model information: training statistics, performance, important variables

summary(binary_data.glm02)

 

# Predict using GLM model

binary_data.pred = h2o.predict(object = binary_data.glm02, newdata =binary_data.test)

# Look at summary of predictions: probability of TRUE

names(binary_data.pred)

binary_data.pred
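The same performance checks can be run on the held-out test set for the refitted model; a minimal sketch with h2o.performance(), assuming the binomial fit above:

# evaluate the refitted GLM on the held-out test set
perf_test <- h2o.performance(binary_data.glm02, newdata = binary_data.test)
h2o.auc(perf_test)              # area under the ROC curve on the test set
h2o.confusionMatrix(perf_test)  # confusion matrix at the metric-optimal threshold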

 






More on ML in this book on Big Data.



