Optimizing Spark Configuration with Genetic Algorithm - Encoding
Patrick Nicolas
Director of Data Engineering @ Aidéo Technologies | software & data engineering, operations, and machine learning.
Struggling with the complex task of configuring your Apache Spark application?
Consider a genetic algorithm, a search heuristic inspired by Charles Darwin's theory of evolution [1]. This method applies the concepts of natural selection to efficiently determine the best configuration for your Apache Spark application, aiming for an ideal balance between minimizing production costs and maximizing customer satisfaction.
What you will learn: How to apply a genetic algorithm to optimize configuration parameters for deployment to production.
Apache Spark configuration
The goal is to identify the best settings for Apache Spark [2] configuration parameters that minimize deployment expenses while ensuring high-quality service, as indicated by reduced latency. This goal will be converted into a fitness function during the genetic algorithm's development.
Tunable parameters
We define an Apache Spark configuration parameter with the following attributes:
The following excerpt of the Apache Spark configuration file illustrates the attributes of various configuration parameters, including parameters that are not tunable (non-dynamic).
{
"sparkParameters": [
{
"key": "spark.io.compression.snappy.blockSize",
"value":"32k",
"isDynamic": false,
"paramType":"String"
},
{
"key": "spark.KryoSerializer.unsafe",
"value":true,
"isDynamic": true,
"paramType":"Boolean"
},
{
"key": "spark.KryoSerializer.buffer.max",
"value":"64m",
"isDynamic": true,
"paramType":"Int",
"range":["16","32", "64", "96", "128"]
},
{
"key": "spark.KryoSerializer.buffer",
"value":"64m",
"isDynamic": true,
"paramType":"Int",
"range":["16","32", "64", "96"]
},
{
"key": "spark.worker.cleanup.interval"
"value":900,
"isDynamic": false,
"paramType":"Int",
"range":["300","600", "900", "1200"]
},
...
{
"key": "spark.shuffle.file.buffer",
"value":"32k",
"isDynamic": true,
"paramType":"Int",
"range":["16","32", "64", "128","256"]
},
{
"key": "spark.driver.memory",
"value":"6g",
"isDynamic": true,
"paramType":"Int",
"range":["2","4", "6", "8","10"]
},
{
"key": "spark.executor.driverOverheadFactor",
"value":0.1,
"isDynamic": true,
"paramType":"Float",
"range":["0.05","0.1", "0.15", "0.2","0.25"]
},
{
"key": "spark.executor.memory",
"value":"4g",
"isDynamic": true,
"paramType":"Int",
"range":["2","3","4","5","6"]
},
{
"key": "spark.executor.memoryOverheadFactor",
"value":0.1,
"isDynamic": true,
"paramType":"Float",
"range":["0.05","0.1","0.15","0.2","0.25"]
},
{
"key": "spark.default.parallelism",
"value":8,
"isDynamic": true,
"paramType":"Int",
"range":["4","8", "16", "24","32"]
},
...
]
}
Example of Spark configuration parameters
The parameter spark.executor.memory is a dynamic, tunable integer variable whose valid values are 2, 3, 4, 5, and 6 gigabytes. The parameter spark.io.compression.snappy.blockSize is not tunable, so its value of 32k is immutable.
Fitness function
The fitness function is a balance between service delivery quality and the hourly deployment cost. The fitness is consequently defined as:

fitness = 1 / (exp(alpha * L) + beta * N)
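The symbols are not defined in this excerpt; as a sketch, assume L denotes the measured latency, N the number of executors (a stand-in for the hourly deployment cost), and alpha, beta the respective weights. Both interpretations are assumptions for illustration:

```scala
// Sketch of the fitness function: higher fitness rewards lower latency (L)
// and fewer executors (N). The interpretation of L and N, and the weights
// alpha and beta, are assumptions, not values from the original post.
def fitness(latency: Double, numExecutors: Int, alpha: Double, beta: Double): Double =
  1.0 / (math.exp(alpha * latency) + beta * numExecutors)
```

A configuration with lower latency or fewer executors scores strictly higher, so the genetic algorithm maximizes this value.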
Genetic algorithm basics
Inspired by Charles Darwin's theory of natural evolution, a genetic algorithm is a search method that reflects the principles of natural selection. In this approach, the most suitable individuals are selected for reproduction to produce the next generation.
Scala's object-oriented and functional programming capabilities are well suited to implementing the computational steps of an evolutionary algorithm.
Developed by John Holland in the 1970s [3], genetic algorithms draw their characteristics from Darwin's theory of evolution. A living organism is composed of cells, which contain identical chromosomes. These chromosomes, made up of DNA strands, act as a blueprint for the entire organism. A chromosome is made up of genes, which are DNA segments encoding specific proteins.
The first step in reproduction is recombination (or crossover), where genes from the parents form a completely new chromosome (the offspring), which may then undergo mutation. During mutation, one or more elements of the DNA strand are altered, primarily because of errors in copying genes from the parents. The organism's success in its environment determines its fitness.
In computer science, genetic algorithms are a problem-solving technique that emulates these natural processes. They employ a combination of selection, recombination, and mutation to evolve a population of candidate solutions to a given problem. The fundamental computational steps are as follows:
Each genetic operator (selection, crossover, and mutation) depends on a specific parameter: for instance, crossover and mutation are gated by the probability thresholds xOverProbThreshold and mutationProbThreshold used in the implementation below.
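As an illustration of these steps, here is a minimal, self-contained sketch of the evolutionary cycle. The Individual type, rank-based selection, and all parameter choices are assumptions made for the sake of the example, not the implementation described in this post:

```scala
import scala.util.Random

// Illustrative skeleton of a genetic algorithm loop; the Individual type
// and the operator details are hypothetical, not the post's implementation.
final case class Individual(genes: Vector[Int], var fitness: Double = -1.0)

def evolve(
    population: Seq[Individual],
    score: Individual => Double,
    maxGenerations: Int,
    xOverProb: Double,
    mutationProb: Double): Individual = {

  var current = population
  (0 until maxGenerations).foreach { _ =>
    // Evaluate the fitness of each candidate
    current.foreach(ind => ind.fitness = score(ind))
    // Selection: keep the fitter half of the population
    val selected = current.sortBy(-_.fitness).take(current.size / 2)
    // Crossover and mutation refill the population
    val offspring = Seq.fill(current.size - selected.size) {
      val shuffled = Random.shuffle(selected)
      val (p1, p2) = (shuffled.head, shuffled(1))
      val cut =
        if (Random.nextDouble() < xOverProb) Random.nextInt(p1.genes.size) else 0
      val genes = (p1.genes.take(cut) ++ p2.genes.drop(cut)).map(g =>
        if (Random.nextDouble() < mutationProb) 1 - g else g)
      Individual(genes)
    }
    current = selected ++ offspring
  }
  current.maxBy(score)
}
```

Selection keeps the fitter half of the population, and crossover plus mutation refill it; the loop repeats for a fixed number of generations.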
Implementation
The initial phase involves converting the configuration parameters into genetic components. Every parameter is represented as a gene whose type corresponds to that of the parameter. For example, a parameter with an integer value is associated with an instance of Gene[Int].
A configuration represents a distinctive, tunable collection of Spark parameters, which the genetic algorithm treats and manages as a chromosome. The goal of the genetic algorithm is to identify the most suitable chromosome and its corresponding Spark configuration for the application, thereby optimizing the objective (or fitness).
Encoding
The first step is to implement the configuration and parameters as the classes SparkConfiguration and ParameterDefinition, respectively.
case class SparkConfiguration(sparkParameters: Seq[ParameterDefinition])

case class ParameterDefinition(
  key: String,
  value: String,
  isDynamic: Boolean,
  paramType: String,
  range: Seq[String] = Seq.empty[String]
)
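For instance, the spark.executor.memory entry of the configuration file above maps to these case classes as follows (the definitions are repeated so the snippet is self-contained):

```scala
// Case classes from this post, reproduced for a self-contained example
case class ParameterDefinition(
  key: String,
  value: String,
  isDynamic: Boolean,
  paramType: String,
  range: Seq[String] = Seq.empty[String]
)

case class SparkConfiguration(sparkParameters: Seq[ParameterDefinition])

// The spark.executor.memory entry from the JSON configuration file
val executorMemory = ParameterDefinition(
  key = "spark.executor.memory",
  value = "4g",
  isDynamic = true,
  paramType = "Int",
  range = Seq("2", "3", "4", "5", "6")
)

val sparkConfig = SparkConfiguration(Seq(executorMemory))
```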
Next, we establish the genetic encoder responsible for transforming genes to and from configuration parameters [4]. The GAEncoder trait encompasses two attributes: the number of bits of the encoding (encodingLength) and the range of valid values (range).
The sequence of bits, termed BitsRepr, is defined as a Seq[Int] consisting of either 0 or 1 values.
There are three primary methods: rand, which draws a random valid value; apply, which encodes a value into a sequence of bits; and unapply, which decodes a bit sequence back into a value.
trait GAEncoder[T] {
  val encodingLength: Int
  val range: Seq[T]

  def rand: T
  def apply(t: T): BitsRepr
  def unapply(bitsRepr: BitsRepr): T
}
Here is a genetic encoder for an Apache Spark configuration parameter of the Int type. The encoding method, apply, checks whether the parameter value, t, falls within the valid range.
final class GAEncoderInt(
  override val encodingLength: Int,
  override val range: Seq[Int]) extends GAEncoder[Int] {

  private[this] val encoder = new BitsIntEncoder(encodingLength)

  override def rand: Int = Random.shuffle(range).head

  @throws(classOf[GAException])
  override def apply(t: Int): BitsRepr = {
    if (!range.contains(t))
      throw new GAException(s"Value $t violates constraint of quantizer")
    encoder(t)
  }

  override def unapply(bitsRepr: BitsRepr): Int = encoder.unapply(bitsRepr)
}
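The BitsIntEncoder used above is not listed in this post; a minimal sketch, assuming a plain fixed-length binary encoding with the least significant bit first, could look like this:

```scala
// Hypothetical sketch of BitsIntEncoder: converts an Int to and from a
// fixed-length sequence of {0, 1} bits, least significant bit first.
final class BitsIntEncoder(encodingLength: Int) {

  // Int => sequence of {0, 1} bits
  def apply(n: Int): Seq[Int] =
    (0 until encodingLength).map(i => (n >> i) & 1)

  // Sequence of {0, 1} bits => Int
  def unapply(bits: Seq[Int]): Int =
    bits.zipWithIndex.foldLeft(0) { case (acc, (bit, index)) => acc | (bit << index) }
}
```

With encodingLength = 6, any value in the 0..63 range round-trips through the encoder.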
Gene
A Gene serves as the genetic representation of a configuration parameter. Consequently, its constructor requires the parameter's identifier (id), its typed value (t), and the associated genetic encoder (gaEncoder).
To minimize memory usage and facilitate direct bit manipulation, the sequence of bits, bitsSequence, is converted into a Java BitSet.
class Gene[T : Ordering](id: String, t: T, gaEncoder: GAEncoder[T]) {
  // Encoding as a sequence of {0, 1}
  private[this] val bitsSequence: BitsRepr = gaEncoder(t)

  // Encoding as a bit set
  private[this] val encoded: util.BitSet = {
    val bs = new java.util.BitSet(gaEncoder.encodingLength)
    bitsSequence.indices.foreach(index => bs.set(index, bitsSequence(index) == 1))
    bs
  }

  def mutate(mutationProb: Double): Gene[T] =
    (new MutationOp {
      override val mutationProbThreshold: Double = mutationProb
    }).mutate(this)
}
The method mutate invokes the mutation operator, MutationOp, described in the next section.
Chromosome
A chromosome symbolizes a Spark configuration. Assuming the configuration parameters are of two types (namely Float and Int), the constructor accepts two arguments: the sequence of genes for each of the two parameter types (features1 and features2).
Additionally, the attribute fitness accumulates the score for the given set of configuration parameters.
class Chromosome[T : Ordering, U : Ordering](
  features1: Seq[Gene[T]],
  features2: Seq[Gene[U]]) {

  var fitness: Double = -1.0

  def xOver(
    otherChromosome: Chromosome[T, U],
    xOverThreshold: Double): (Chromosome[T, U], Chromosome[T, U]) =
    (new XOverOp {
      override val xOverProbThreshold: Double = xOverThreshold
    }).xOver(this, otherChromosome)

  def mutate(mutationProb: Double): Chromosome[T, U] =
    (new MutationOp {
      override val mutationProbThreshold: Double = mutationProb
    }).mutate(this)
}
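The XOverOp and MutationOp traits are not shown in this excerpt. As an illustration of the underlying operation, a single-point crossover over two equal-length bit sequences, gated by a probability threshold (an assumption about how xOverProbThreshold is used), might look like:

```scala
import scala.util.Random

// Illustrative single-point crossover: swaps the tails of two equal-length
// bit sequences beyond a random cut point. The crossover fires only when a
// random draw falls below the probability threshold.
def xOver(
    parent1: Seq[Int],
    parent2: Seq[Int],
    xOverProbThreshold: Double): (Seq[Int], Seq[Int]) =
  if (Random.nextDouble() < xOverProbThreshold && parent1.nonEmpty) {
    val cut = Random.nextInt(parent1.size)
    (parent1.take(cut) ++ parent2.drop(cut),
     parent2.take(cut) ++ parent1.drop(cut))
  } else (parent1, parent2)
```

A threshold of 0 always returns the parents unchanged; a threshold close to 1 almost always recombines them.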
The method 'xOver' executes the genetic crossover as outlined in the 'XOverOp' trait. A crucial part of encoding a chromosome involves transforming a Spark configuration into a typed?Chromosome, featuring integer and floating-point parameters/genes.
The encoding of a Spark configuration is carried out by the encode method. It first strips the parameter values of any units (cleansedParamValue), then uses the parameter's type (paramType) to instantiate the encoder and gene of the appropriate type.
def encode(sparkConfig: SparkConfiguration): Chromosome[Int, Float] = {
  val floatGenes = ListBuffer[Gene[Float]]()
  val intGenes = ListBuffer[Gene[Int]]()

  sparkConfig.sparkParameters.foreach { paramValue =>
    val value = paramValue.value
    // Strip the unit suffix (e.g., 'k', 'g') if present
    val cleansedParamValue: String =
      if (!value.last.isDigit) value.substring(0, value.length - 1)
      else value

    paramValue.paramType match {
      case "Int" =>
        val encoder = new GAEncoderInt(
          encodingLength = 6,
          paramValue.range.map(_.toInt)
        )
        val intGene = Gene[Int](paramValue.key, cleansedParamValue.toInt, encoder)
        intGenes.append(intGene)
      ....
The corresponding method, decode, is omitted from this post but is available on GitHub in ConfigEncoderDecoder.scala.
References
[1] Charles Darwin biography, Britannica
[2] Apache Spark
[3] Genetic Algorithms, John H. Holland
[4] Encoding Methods in Genetic Algorithm, GeeksforGeeks
[5] Stack Overflow: Choosing parents to crossover in genetic algorithms
[6] Tracking storms with Kafka/Spark streaming
[7] Hyperparameter optimization, Wikipedia
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design and end-to-end deployment and support with extensive knowledge in machine learning. He has been director of data engineering at Aideo Technologies since 2017 and he is the author of "Scala for Machine Learning", Packt Publishing ISBN 978-1-78712-238-3