Distributed Bloom Filter
Patrick Nicolas
Director of Data Engineering @ aidéo technologies | software & data engineering, operations, and machine learning.
Have you ever needed a fast, low-latency way to check whether a particular object or piece of data belongs to a very large dataset?
In this article, we explore the concept of a distributed Bloom filter built with Apache Spark and a cryptographic digest, and we estimate its false-positive rate.
What you will learn: Creating and analyzing a Bloom filter for processing extremely large datasets with Apache Spark and cryptographic hashes, with a focus on understanding and managing false positives.
Overview
The Bloom filter has become a popular probabilistic data structure for membership queries (does object x belong to set Y?). Its main benefit is a much smaller memory footprint: unlike a HashSet or HashMap, it does not store the objects themselves. This compact representation comes with a trade-off: the filter never produces false negatives, but it cannot guarantee the absence of false positives.
In other words, a query returns either "the element is possibly in the set" or "the element is definitely not in the set".
A Bloom filter is quite often used as a front end to a deterministic algorithm.
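As a minimal sketch of that pattern (assuming the BloomFilter[T] trait that the implementations below extend, and a hypothetical exactStore standing in for any deterministic lookup such as a hash table or a database):
def membership[T](filter: BloomFilter[T], exactStore: Set[T])(t: T): Boolean =
  // A negative answer from the filter is definitive, so the costly exact lookup is skipped
  filter.mightContain(t) && exactStore.contains(t)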
Theory
Let's consider a set A = {a0, ..., an-1} of n elements for which membership queries are executed. The data structure consists of a bit vector V of m bits and k independent hash functions, each mapping an element to a position in the bit vector. The mapping of hash functions to bit positions must follow a uniform distribution [ref 1].
The diagram below illustrates the basic mechanism behind the Bloom filter. The set A consists of the two elements a1 and a2. The hash functions h1 and h2 map each element to positions in the bit vector (bits set to 1). The element b has one of its positions set to 0 and therefore does not belong to the set. The element c is reported as belonging to the set because all of its associated positions have bits set to 1.
However, the algorithm does not prevent false positives. For instance, a bit may have been set to 1 during the insertion of previous elements, and the query then erroneously reports that the element belongs to the set.
Inserting an element requires evaluating the k hash functions, so the time needed to add a new element is proportional to k and independent of the size of the bit vector: asymptotic insertion time = O(k). However, the filter still requires several bits per element and is less space-efficient than a traditional bit array for small sets.
The probability of false positives decreases as the number n of inserted elements decreases and as the size m of the bit vector increases. The number of hash functions that minimizes the probability of false positives is k = (m/n)·ln 2.
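As a quick sketch (not part of the original implementation), these standard estimates can be expressed directly in Scala:
// Optimal number of hash functions: k = (m/n)·ln 2, for m bits and n elements
def optimalNumHashFunctions(m: Int, n: Int): Int =
  math.max(1, math.round((m.toDouble / n) * math.log(2)).toInt)

// Approximate false-positive probability: p ≈ (1 - e^(-k·n/m))^k
def falsePositiveRate(m: Int, n: Int, k: Int): Double =
  math.pow(1.0 - math.exp(-k.toDouble * n / m), k)

// Example: m = 10,000 bits and n = 1,000 elements give k = 7 and p ≈ 0.008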
Digest-based filter
Scala implementation
The approach utilizes cryptographic hash functions [ref 2], employing the MessageDigest class from the Java library [ref 3] to create unique hash codes. Details such as auxiliary methods and specific conditions for method parameters are omitted for simplicity.
The initial step involves establishing the DigestBloomFilter class along with its properties:
import java.math.BigInteger
import java.security.MessageDigest
import scala.reflect.ClassTag
import scala.util.Try

class DigestBloomFilter[T: ClassTag](
    length: Int,                            // Length or capacity of the Bloom filter
    numHashFunctions: Int,                  // Number of hash functions
    hashingAlgo: HashingAlgo = SHA1Algo()   // Hashing algorithm: SHA1, MD5, ...
) extends BloomFilter[T] {

  private[this] val set: Array[Byte] = new Array[Byte](length)
  private[this] val digest = Try(MessageDigest.getInstance(hashingAlgo.toString))
  private[this] var size: Int = 0

  // Add a new element of type T to the set of the Bloom filter
  override def add(t: T): Unit = {
    hashToArray(t).foreach(set(_) = 1)
    size += 1
  }

  // Add an array of elements of type T to the filter
  override def addAll(ts: Array[T]): Unit =
    if (ts.nonEmpty)
      digest.foreach(_ => ts.foreach(add))

  // Test whether the filter might contain a given element
  override def mightContain(t: T): Boolean =
    digest.map(_ => hashToArray(t).forall(set(_) == 1)).getOrElse(false)
The digest is an instance of java.security.MessageDigest from the Java standard library.
The next step consists of defining the methods to add a single generic element, add(t: T), and an array of elements, addAll(ts: Array[T]).
The method mightContain evaluates whether an element is contained in the filter. It returns true if the element may belong to the set and false if it definitely does not.
The add and mightContain methods rely on the private method hashToArray to compute the set of bit positions, with the first position derived from the hashCode of the new entry.
def hashToArray(t: T): Array[Int] =
  (0 until numHashFunctions).foldLeft(new Array[Int](numHashFunctions))(
    (buf, idx) => {
      // Chain the hash values: the first is seeded with the entry's hashCode
      val value = if (idx > 0) hash(buf(idx - 1)) else hash(t.hashCode)
      buf.update(idx, value)
      buf
    }
  )
The hash method is the core of the Bloom filter: it computes the index in the bit vector for a given value.
def hash(value: Int): Int = digest.map(
  d => {
    d.reset()
    d.update(value)   // Relies on the implicit Int => Array[Byte] conversion defined below
    // Map the digest to a valid index within the bit vector
    Math.abs(new BigInteger(1, d.digest).intValue) % (set.length - 1)
  }
).getOrElse(-1)
The instance of the MessageDigest class, digest, generates a hash value using either the MD5 or SHA-1 algorithm. Tail recursion could be used as an alternative to the fold-based iteration that generates the set of positions.
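A tail-recursive variant of hashToArray could look like the following sketch (same chaining as the foldLeft version above; hashToArrayRec is an illustrative name, not part of the original code):
import scala.annotation.tailrec

def hashToArrayRec(t: T): Array[Int] = {
  @tailrec
  def loop(idx: Int, previous: Int, buf: Array[Int]): Array[Int] =
    if (idx >= numHashFunctions) buf
    else {
      val value = hash(previous)   // Chain each hash from the previous value
      buf(idx) = value
      loop(idx + 1, value, buf)
    }

  loop(0, t.hashCode, new Array[Int](numHashFunctions))
}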
The next code snippet implements a very simple implicit conversion from Int to Array[Byte].
val numBytes: Int = 4
val lastByte: Int = numBytes - 1

implicit def int2Bytes(value: Int): Array[Byte] = Array.tabulate(numBytes)(
  n => {
    val offset = (lastByte - n) << 3   // Bit offset of byte n: 24, 16, 8, 0 (most significant first)
    ((value >>> offset) & 0xFF).toByte
  }
)
The conversion manipulates the bits of a 32-bit integer to produce 4 bytes. Alternatively, you may consider a conversion from a Long value to an 8-byte array.
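A sketch of that alternative, converting a 64-bit Long into an 8-byte array (long2Bytes is an illustrative name, not part of the original code):
implicit def long2Bytes(value: Long): Array[Byte] = Array.tabulate(8)(
  n => ((value >>> ((7 - n) << 3)) & 0xFF).toByte   // Most significant byte first
)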
Use case
This simple test checks whether a few values are indeed contained in the set. The filter will definitely reject 22 and very likely accept 5 and 97. If the objective is to confirm that 5 and 97 belong to the set, then a full-fledged hash table would have to be used.
val filter = new DigestBloomFilter[Long](100, 100)
val newValues = Array[Long](5L, 97L, 91L, 23L, 67L, 33L)
filter.addAll(newValues)
assert(filter.mightContain(5))
assert(filter.mightContain(97))
assert(!filter.mightContain(22))
Performance evaluation
Let's look at the behavior of the Bloom filter under load. The test consists of adding 100,000,000 new random values, then querying the filter for a value 10,000 times. The test is run 10 times after a warm-up of the JVM.
The first performance test evaluates the average time required to insert a new element into a Bloom filter whose size ranges from 100 million to 1 billion entries. The second test evaluates the average search/query time for Bloom filters over the same range of sizes.
As expected, the average time to load a new set of values and to check whether the filter contains a specific value is fairly constant.
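A minimal timing harness along those lines could look like the sketch below (names and sizes are illustrative and scaled down; the actual evaluation used 100 million inserts and 10,000 queries):
import scala.util.Random

def timeMs[A](block: => A): Double = {
  val start = System.nanoTime()
  block
  (System.nanoTime() - start) * 1e-6
}

val filter = new DigestBloomFilter[Long](1000000, 8)
val values = Array.fill(100000)(Random.nextLong())

val insertMs = timeMs(filter.addAll(values))
val queryMs = timeMs((0 until 10000).foreach(_ => filter.mightContain(Random.nextLong())))
println(f"insert: $insertMs%.1f ms, 10,000 queries: $queryMs%.1f ms")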
Spark-based filter
Apache Spark
Apache Spark is an open-source cluster-computing framework designed for large-scale distributed data processing [ref 4]. Its primary applications include batch and streaming data processing, SQL analytics, and machine learning.
Implementation
Apache Spark includes a Bloom filter implementation, BloomFilter, that is suitable for handling large datasets within data frames [ref 5]. It is configured through two primary attributes: the expected number of items (capacity) and the target false-positive probability.
The class SparkBloomFilter is parameterized with the type of elements T to be inserted and searched.
import org.apache.spark.util.sketch._

class SparkBloomFilter[T](bloomFilter: BloomFilter) extends TBloomFilter[T] {

  def getExpectedFPRate: Double = bloomFilter.expectedFpp()

  override def mightContain(t: T): Boolean = bloomFilter.mightContain(t)

  override def add(t: T): Unit = bloomFilter.put(t)

  override def addAll(ts: Array[T]): Unit =
    if (ts.nonEmpty)
      ts.foreach(add)
}
The three overridden methods are mightContain, add, and addAll; getExpectedFPRate exposes Spark's expected false-positive probability for the current state of the filter.
A generic constructor allows the customization of the Bloom filter on Spark with two attributes: the capacity (expected number of items) and the target false-positive rate.
def apply[T](capacity: Int, targetFPRate: Float): SparkBloomFilter[T] =
  new SparkBloomFilter[T](BloomFilter.create(capacity, targetFPRate))
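An illustrative use of this constructor (capacity and target false-positive rate chosen arbitrarily):
val sparkFilter = SparkBloomFilter[Long](capacity = 100000, targetFPRate = 0.05F)
sparkFilter.add(42L)
assert(sparkFilter.mightContain(42L))    // Very likely true
println(sparkFilter.getExpectedFPRate)   // Expected false-positive probability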
Impact of capacity on expected false positives
The anticipated false-positive rate indicates the probability of hash function collisions. As the filter's capacity increases, it becomes less probable that a new item will clash with one already in the filter. We'll delve into how the filter's capacity influences the collision rate within the hashing mechanism.
def computeExpectedFPRate(capacity: Int, input: Array[Long]): Double = {
  val filter = SparkBloomFilter[Long](capacity, 0.05F)
  input.foreach(n => filter.add(n))
  filter.getExpectedFPRate
}
val input = Array[Long](5L, 97L, 91L, 23L, 67L, 33L) ++
  Array.tabulate(10000)(n => n.toLong + 100L)

(1000 until 12000 by 500).foreach(
  capacity => println(s"$capacity ${computeExpectedFPRate(capacity, input)}")
)
In this experiment, we chose capacity values in the range of 1,000 to 12,000 for a Bloom filter containing 10,006 entries. The expected false-positive rate decreases from 1.0 towards nearly 0.0. Specifically, at a capacity of 10,006, where each entry is allocated a distinct slot, the observed false-positive rate of 0.0509 aligns with the predetermined target of 0.05.
Application to datasets
A Bloom filter can be applied to a specific column, referred to as columnName, of a dataset containing a vast number of values, named dataSet. This requires setting a defined capacity and a target false-positive rate.
def apply[T](
    dataSet: Dataset[T],
    columnName: String,
    capacity: Int,
    targetFPRate: Double
)(implicit sparkSession: SparkSession): SparkBloomFilter[T] = {
  val filter = dataSet.stat.bloomFilter(columnName, capacity, targetFPRate)
  new SparkBloomFilter[T](filter)
}
Finally, we can apply this constructor to create a Bloom filter on a very large Spark Dataset.
import scala.util.Random
import sparkSession.implicits._   // Required for the toDS() conversion

case class TEntry(id: String, value: Float)

val dataSize = 1000000
val dataSet = Seq.tabulate(dataSize)(
  n => TEntry(n.toString, Random.nextFloat())
).toDS()

val filter = SparkBloomFilter(dataSet, "id", dataSize, 0.05)
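Since the filter is built on the "id" column, membership queries should be made with id values rather than whole TEntry rows. A possible query, assuming a wrapper parameterized with String for this purpose:
val idFilter = new SparkBloomFilter[String](dataSet.stat.bloomFilter("id", dataSize, 0.05))
assert(idFilter.mightContain("42"))   // "42" is one of the generated ids
println(s"expected FP rate: ${idFilter.getExpectedFPRate}")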
References
---------------------------
Patrick Nicolas has over 25 years of experience in software and data engineering, architecture design, and end-to-end deployment and support, with extensive knowledge in machine learning. He has been director of data engineering at Aideo Technologies since 2017 and is the author of "Scala for Machine Learning", Packt Publishing, ISBN 978-1-78712-238-3.
Keywords: #bloomfilter #hash #scala #spark