Optimizing Spark Configuration with Genetic Algorithm - Encoding

Struggling with the complex task of configuring your Apache Spark application?

Consider the innovative approach of a genetic algorithm, a search heuristic inspired by Charles Darwin's theory of evolution [ref 1]. This method applies the principles of natural selection to efficiently determine the best configuration for your Apache Spark application, aiming for an ideal balance between minimizing deployment costs and maximizing customer satisfaction.

What you will learn: How to apply a genetic algorithm to optimize configuration parameters for deployment to production.


Notes:

  • This is the first installment of our series on optimizing Apache Spark configuration using Genetic Algorithm.
  • Source code available at GitHub - Patrick Nicolas - Genetic Algorithm
  • Versions: Apache Spark 3.3.1, Scala 2.13.4


Apache Spark configuration

The goal is to identify the best settings for Apache Spark [ref 2] configuration parameters that minimize deployment expenses while ensuring high-quality service, as indicated by reduced latency. This goal will be converted into a fitness function during the genetic algorithm's development.


Tunable parameters

We define an Apache Spark configuration parameter with the following attributes:

  • key: Spark parameter identifier
  • value: Default or initial value for the parameter
  • isDynamic: Is this parameter tunable?
  • paramType: Type of the parameter (Float, Int, etc.)
  • range: Range of valid values (any values if undefined)

The following excerpt from the Apache Spark configuration file illustrates the attributes of various configuration parameters, including parameters that are not tunable (non-dynamic).

{
"sparkParameters": [
  {
    "key": "spark.io.compression.snappy.blockSize",
    "value":"32k",
    "isDynamic": false,
    "paramType":"String"
  },
  {
    "key": "spark.KryoSerializer.unsafe",
    "value":true,
    "isDynamic": true,
    "paramType":"Boolean"
  },
  {
    "key": "spark.KryoSerializer.buffer.max",
    "value":"64m",
    "isDynamic": true,
    "paramType":"Int",
    "range":["16","32", "64", "96", "128"]
  },
  {
    "key": "spark.KryoSerializer.buffer",
    "value":"64m",
    "isDynamic": true,
    "paramType":"Int",
    "range":["16","32", "64", "96"]
  },
  {
    "key": "spark.worker.cleanup.interval"
    "value":900,
    "isDynamic": false,
    "paramType":"Int",
    "range":["300","600", "900", "1200"]
  },

            ...


  {
    "key": "spark.shuffle.file.buffer",
    "value":"32k",
    "isDynamic": true,
    "paramType":"Int",
    "range":["16","32", "64", "128","256"]
  },
  {
    "key": "spark.driver.memory",
    "value":"6g",
    "isDynamic": true,
    "paramType":"Int",
    "range":["2","4", "6", "8","10"]
  },
  {
    "key": "spark.executor.driverOverheadFactor",
    "value":0.1,
    "isDynamic": true,
    "paramType":"Float",
    "range":["0.05","0.1", "0.15", "0.2","0.25"]
  },
  {
    "key": "spark.executor.memory",
    "value":"4g",
    "isDynamic": true,
    "paramType":"Int",
    "range":["2","3","4","5","6"]
  },
  {
    "key": "spark.executor.memoryOverheadFactor",
    "value":0.1,
    "isDynamic": true,
    "paramType":"Float",
    "range":["0.05","0.1","0.15","0.2","0.25"]
  },
  {
    "key": "spark.default.parallelism",
    "value":8,
    "isDynamic": true,
    "paramType":"Int",
    "range":["4","8", "16", "24","32"]
  },
             ...
  ]
}

Example of Spark configuration parameters


The parameter spark.executor.memory is a dynamic, tunable integer variable with valid values of 2, 3, 4, 5, and 6g. The parameter spark.io.compression.snappy.blockSize is not tunable, so its value of 32k is immutable.


Fitness function

The fitness function is a balance between service delivery quality and the hourly deployment cost. Our assumptions are as follows:

  • The deployment cost per hour rises linearly with the number of servers or containers used, denoted by N.
  • The adverse effect of latency on customer satisfaction and retention is estimated to grow exponentially with increasing latency L.


The fitness is consequently defined as: fitness = 1 / (exp(alpha * L) + beta * N)
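This definition can be sketched in Scala as a simple function; note that the constants alpha and beta are hypothetical weights chosen for illustration, and would have to be calibrated against an actual deployment:

```scala
// Sketch of the fitness function defined above.
// alpha weights the exponential latency penalty; beta weights the
// linear per-server cost. Both values here are illustrative assumptions.
def fitness(
    latency: Double,
    numServers: Int,
    alpha: Double = 0.005,
    beta: Double = 0.01): Double =
  1.0 / (math.exp(alpha * latency) + beta * numServers)
```

As expected, fitness decreases when either latency or the number of servers grows, so the genetic algorithm is steered toward cheap, low-latency configurations.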


Genetic algorithm basics

Inspired by Charles Darwin's theory of natural evolution, a genetic algorithm is a search method that reflects the principles of natural selection. In this approach, the most suitable individuals are selected for reproduction to produce the next generation.

Scala's object-oriented and functional programming capabilities are well suited to implementing the computational flow of an evolutionary algorithm.


Developed by John Holland in the 1970s [ref 3], genetic algorithms draw their characteristics from Darwin's theory of evolution. A living organism is composed of cells, which contain identical chromosomes. These chromosomes, made up of DNA strands, act as a blueprint for the entire organism. A chromosome is made up of genes, which are DNA segments encoding specific proteins.

The first step in reproduction is recombination (or crossover), where genes from the parents form a completely new chromosome (offspring), which may then undergo mutation. During mutation, one or more elements of the DNA strand or chromosome are altered. These changes are primarily due to errors in copying genes from the parents. The organism's success in its environment determines its fitness.


In the field of computer science, Genetic Algorithms represent a problem-solving technique that emulates natural processes. They employ a combination of selection, recombination, and mutation to evolve a group of candidates for solving a particular problem. The fundamental computational steps are as follows:

  1. Initialize the population (search space) with a set of random chromosomes, each representing a specific Apache Spark configuration.
  2. Convert these chromosomes into a vector of real or integer values, or a string of bits.
  3. Pair chromosomes for crossover. This involves using a crossover rate to exchange a fragment or section of the "parent" chromosomes from a random point in the encoded string or vector.
  4. Mutate chromosomes by altering one or more of their elements (bits) using a randomly generated index, governed by a mutation rate.
  5. Evaluate each chromosome using a fitness function.
  6. Choose the most fit chromosomes (those that meet the minimum fitness criteria) for reproduction.
  7. Repeat the reproduction cycle (steps 2 to 6) for the newly formed population until a stop condition is reached.

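To make the workflow concrete, here is a minimal, self-contained Scala sketch of these seven steps applied to the classic OneMax toy problem (maximize the number of 1-bits in a vector). It illustrates the loop only; all names are invented for this sketch and it is not the article's implementation:

```scala
import scala.util.Random

object TinyGA {
  type Bits = Vector[Int]
  private val rnd = new Random(42L)

  // Step 5: evaluate a chromosome (toy fitness: count the 1-bits)
  def fitness(bits: Bits): Double = bits.sum.toDouble

  // Step 3: single-point crossover, applied with probability xOverRate
  def xOver(a: Bits, b: Bits, xOverRate: Double): (Bits, Bits) =
    if (rnd.nextDouble() < xOverRate) {
      val cut = rnd.nextInt(a.size)
      (a.take(cut) ++ b.drop(cut), b.take(cut) ++ a.drop(cut))
    } else (a, b)

  // Step 4: flip each bit with probability mutationRate
  def mutate(bits: Bits, mutationRate: Double): Bits =
    bits.map(bit => if (rnd.nextDouble() < mutationRate) 1 - bit else bit)

  def evolve(popSize: Int, nBits: Int, generations: Int): Bits = {
    // Step 1: random initial population
    var population = Vector.fill(popSize)(Vector.fill(nBits)(rnd.nextInt(2)))

    (0 until generations).foreach { _ =>
      // Step 6: keep the fittest half as parents
      val parents = population.sortBy(b => -fitness(b)).take(popSize / 2)
      // Steps 3 & 4: pair parents, then recombine and mutate offspring
      val offspring = parents.grouped(2).collect { case Vector(a, b) =>
        val (c1, c2) = xOver(a, b, xOverRate = 0.8)
        Seq(mutate(c1, 0.02), mutate(c2, 0.02))
      }.flatten.toVector
      // Step 7: form the next generation and repeat
      population = parents ++ offspring
    }
    population.maxBy(fitness)
  }
}
```

In the Spark use case, the bit vector decodes to a configuration (step 2, covered in the Encoding section below) and the fitness is derived from latency and deployment cost rather than a bit count.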

Each genetic operator (selection, crossover, and mutation) depends on a specific parameter:

  • The selection rate is a random threshold value used to reduce the current chromosome population based on their fitness.
  • The crossover rate determines the index at which elements or bits of two parent chromosomes are swapped.
  • The mutation rate determines the index of the element(s) or bit(s) in a chromosome that undergo mutation (or flipping).

Implementation

The initial phase involves converting the configuration parameters into genetic components. Every parameter is represented as a gene, corresponding to the type of that parameter. For example, a parameter with an integer attribute is linked with an instance of Gene[Int].

Fig 2. Illustration of genetic configuration of Apache Spark deployment


A configuration represents a distinctive, tunable collection of Spark parameters, which the genetic algorithm treats and manages as a chromosome. The goal of the genetic algorithm is to identify the most suitable chromosome and its corresponding Spark configuration for the application, thereby optimizing the objective (or fitness).


Encoding

The first step is to implement the configuration and parameters as respective classes, SparkConfiguration and ParameterDefinition.

case class SparkConfiguration(sparkParameters: Seq[ParameterDefinition])

case class ParameterDefinition(
   key: String,
   value: String,
   isDynamic: Boolean,
   paramType: String,
   range: Seq[String] = Seq.empty[String]
)        
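To make the mapping concrete, here is how the spark.executor.memory entry of the JSON file above would be represented (the two case classes are repeated so the snippet stands alone):

```scala
// Repeated from above so the snippet is self-contained
case class SparkConfiguration(sparkParameters: Seq[ParameterDefinition])

case class ParameterDefinition(
   key: String,
   value: String,
   isDynamic: Boolean,
   paramType: String,
   range: Seq[String] = Seq.empty[String]
)

// The "spark.executor.memory" entry from the configuration file above
val executorMemory = ParameterDefinition(
  key = "spark.executor.memory",
  value = "4g",
  isDynamic = true,
  paramType = "Int",
  range = Seq("2", "3", "4", "5", "6")
)
val sparkConfig = SparkConfiguration(Seq(executorMemory))
```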

Next, we establish the genetic encoder responsible for transforming genes to and from configuration parameters [ref 4]. The GAEncoder trait has two attributes:

  • encodingLength: The number of bits required to represent a parameter's value.
  • range: A sequence of valid, adjustable values applicable to this parameter.

The sequence of bits, termed BitsRepr, is defined as a Seq[Int] consisting of either 0 or 1 values.


There are three primary methods:

  • rand: This method initializes a parameter value randomly.
  • apply: This function encodes a parameter's value into a sequence of bits.
  • unapply: This procedure decodes a bit sequence back into a parameter value.

trait GAEncoder[T]{
   val encodingLength: Int
   val range: Seq[T]

   def rand: T
   def apply(t: T): BitsRepr
   def unapply(bitsRepr: BitsRepr): T
}        

Here is a genetic encoder for an Apache Spark configuration parameter of the Int type. The encoding function, 'apply', checks whether the parameter value, t, falls within the valid range.

final class GAEncoderInt(
  override val encodingLength: Int,
  override val range: Seq[Int]) extends GAEncoder[Int]{

  private[this] val encoder = new BitsIntEncoder(encodingLength)

  override def rand: Int = Random.shuffle(range).head

  @throws(classOf[GAException])
  override def apply(t: Int): BitsRepr = {
      if(!range.contains(t))
         throw new GAException(s"Value $t violates constraint of quantizer")
      encoder(t)
  }

  override def unapply(bitsRepr: BitsRepr): Int = encoder.unapply(bitsRepr)
}        
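The BitsIntEncoder used by GAEncoderInt is defined in the GitHub repository. A plausible minimal version, assuming a fixed-length, little-endian bit layout, could be sketched as follows (this is an illustration, not the repository's exact code):

```scala
type BitsRepr = Seq[Int]

// Hypothetical sketch of BitsIntEncoder: converts an Int to and from
// a fixed-length, little-endian sequence of 0/1 values.
class BitsIntEncoder(encodingLength: Int) {
  // Encode: extract each of the encodingLength low-order bits
  def apply(n: Int): BitsRepr =
    Seq.tabulate(encodingLength)(i => (n >> i) & 1)

  // Decode: recombine the bits back into an Int
  def unapply(bits: BitsRepr): Int =
    bits.zipWithIndex.foldLeft(0) { case (acc, (bit, i)) => acc | (bit << i) }
}
```

With encodingLength = 6, any value in the ranges listed in the configuration file (up to 63) round-trips through apply/unapply unchanged.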

Gene

A Gene serves as the genetic representation of a configuration parameter. Consequently, its constructor requires the following:

  • The name of the parameter, referred to as 'id'.
  • The value of the parameter denoted as 't'.
  • An encoder, gaEncoder, that corresponds to the parameter's type.

To minimize its memory usage and facilitate direct bit manipulation, the sequence of bits, 'bitsSequence', is transformed into a Java?BitSet.

class Gene[T : Ordering] (id: String, t: T, gaEncoder: GAEncoder[T]) {
  // Encoding as a  sequence of {0, 1}
  private[this] val bitsSequence: BitsRepr = gaEncoder(t)

  // Encoding as Bit set
  private[this] val encoded: util.BitSet = {
    val bs =  new java.util.BitSet(gaEncoder.encodingLength)
    bitsSequence.indices.foreach(index => bs.set(index, bitsSequence(index) == 1))
    bs
  }


  def mutate(mutationProb: Double): Gene[T] = {
     (new MutationOp{
        override val mutationProbThreshold: Double = mutationProb
     }).mutate(this)
  }
}        

The method mutate invokes the mutation operator, MutationOp, described in the next section.


Chromosome

A chromosome symbolizes a Spark configuration. Assuming the configuration parameters are of two types (namely Float and Int), the constructor accepts two parameters:

  • features1: This represents the features/genes of one type.
  • features2: This encompasses the features/genes of the other type.

Additionally, the attribute 'fitness' accumulates the score for the specified set of configuration parameters.

class Chromosome[T : Ordering, U : Ordering](
  features1: Seq[Gene[T]],
  features2: Seq[Gene[U]]){

  var fitness: Double = -1.0


  def xOver(
     otherChromosome: Chromosome[T, U],
     xOverThreshold: Double
  ): (Chromosome[T, U], Chromosome[T, U]) = 
     (new XOverOp{
        override val xOverProbThreshold: Double = xOverThreshold
     }).xOver(this, otherChromosome)

  
  def mutate(mutationProb: Double): Chromosome[T, U] = {
     (new MutationOp{
       override val mutationProbThreshold: Double = mutationProb
    }).mutate(this)
  }
}        

The method 'xOver' executes the genetic crossover as outlined in the 'XOverOp' trait. A crucial part of encoding a chromosome involves transforming a Spark configuration into a typed Chromosome, featuring integer and floating-point parameters/genes.

The process of encoding a Spark configuration is carried out by the 'encode' method. It strips any unit suffix from the parameter values (producing 'cleansedParamValue'). The type of the configuration parameter, 'paramType', is used to create the encoder and gene of the appropriate type.

def encode(sparkConfig: SparkConfiguration): Chromosome[Int, Float] = {
   val floatGenes = ListBuffer[Gene[Float]]()
   val intGenes = ListBuffer[Gene[Int]]()

   sparkConfig.sparkParameters.foreach(paramValue => {
      val value = paramValue.value
      val cleansedParamValue: String =
         if (!value.last.isDigit) value.substring(0, value.length - 1)
         else value

      paramValue.paramType match {
         case "Int" =>
            val encoder = new GAEncoderInt(
                encodingLength = 6, 
                paramValue.range.map(_.toInt)
            )
            val intGene = Gene[Int](
                  paramValue.key, 
                  cleansedParamValue.toInt, 
                  encoder)
            intGenes.append(intGene)
            ....        

The corresponding method decode is omitted from this post but is available on GitHub: ConfigEncoderDecoder.scala
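The unit-stripping step of 'encode' can be isolated as a small helper; this is a sketch of the 'cleansedParamValue' logic shown above, with an invented function name:

```scala
// Drops a trailing unit character if present: "64m" -> "64", "6g" -> "6".
// Purely numeric values such as "0.1" or "900" are left untouched.
def cleanse(value: String): String =
  if (value.nonEmpty && !value.last.isDigit) value.dropRight(1) else value
```

Note that this assumes single-character unit suffixes ("k", "m", "g"), which matches the values used throughout the configuration file above.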


References

[1] Charles Darwin biography - Britannica

[2] Apache Spark

[3] Genetic Algorithms by John H. Holland

[4] Encoding Methods in Genetic Algorithm - GeeksforGeeks

[5] Stack Overflow: Choosing parents to crossover in genetic algorithms

[6] Tracking storms with Kafka/Spark streaming

[7] Hyperparameter optimization - Wikipedia



Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design, and end-to-end deployment and support, with extensive knowledge of machine learning. He has been director of data engineering at Aideo Technologies since 2017 and is the author of "Scala for Machine Learning", Packt Publishing, ISBN 978-1-78712-238-3.

