From 0edd189a6ac2a34a4001946e36a060392b862f9c Mon Sep 17 00:00:00 2001 From: elliVM <126466762+elliVM@users.noreply.github.com> Date: Tue, 5 Dec 2023 11:25:14 +0200 Subject: [PATCH] Use estimate spark column to select size for bloom filter (#19) * use estimate column to select filter size * use sorted map to ensure smallest matching filter is used * test aggregator with multiple partitions * refactoring --- .../dpf_03/BloomFilterAggregator.scala | 34 +++++++++++++++--- .../scala/BloomFilterAggregatorTest.scala | 36 ++++++++++--------- src/test/scala/BloomFilterBufferTest.scala | 5 ++- src/test/scala/TokenizerTest.scala | 36 +++++++++---------- 4 files changed, 69 insertions(+), 42 deletions(-) diff --git a/src/main/scala/com/teragrep/functions/dpf_03/BloomFilterAggregator.scala b/src/main/scala/com/teragrep/functions/dpf_03/BloomFilterAggregator.scala index c99bf2a..879e5df 100644 --- a/src/main/scala/com/teragrep/functions/dpf_03/BloomFilterAggregator.scala +++ b/src/main/scala/com/teragrep/functions/dpf_03/BloomFilterAggregator.scala @@ -56,27 +56,36 @@ import org.apache.spark.util.sketch.BloomFilter import scala.collection.mutable import scala.reflect.ClassTag -class BloomFilterAggregator(final val columnName: String, final val bloomfilterExpectedItems: Long, final val bloomfilterFfp: Double ) extends Aggregator[Row, BloomFilter, Array[Byte]] +class BloomFilterAggregator(final val columnName: String, final val estimateName: String, sizeMap: mutable.SortedMap[Long, Double]) extends Aggregator[Row, BloomFilter, Array[Byte]] with Serializable { var tokenizer: Option[Tokenizer] = None override def zero(): BloomFilter = { - BloomFilter.create(bloomfilterExpectedItems, bloomfilterFfp) + BloomFilter.create(1, 0.01) } override def reduce(buffer: BloomFilter, row: Row): BloomFilter = { + var newBuffer = buffer val tokens : mutable.WrappedArray[mutable.WrappedArray[Byte]] = row.getAs[mutable.WrappedArray[mutable.WrappedArray[Byte]]](columnName) + val estimate: Long = row.getAs[Long](estimateName) + + if (newBuffer.bitSize() == 64) { // zero() will have 64 bitSize + newBuffer = selectFilterFromMap(estimate) + } for (token : mutable.WrappedArray[Byte] <- tokens) { val tokenByteArray: Array[Byte] = token.toArray - buffer.putBinary(tokenByteArray) + newBuffer.putBinary(tokenByteArray) } - buffer + newBuffer } override def merge(ours: BloomFilter, their: BloomFilter): BloomFilter = { + // ignore merge with zero buffer + if (!ours.isCompatible(their)) return their + ours.mergeInPlace(their) } @@ -96,4 +105,21 @@ class BloomFilterAggregator(final val columnName: String, final val bloomfilterE override def outputEncoder: Encoder[Array[Byte]] = ExpressionEncoder[Array[Byte]] implicit def customKryoEncoder[A](implicit ct: ClassTag[A]): Encoder[A] = Encoders.kryo[A](ct) + + private def selectFilterFromMap(estimate: Long): BloomFilter = { + + var backupExpected = 0L + var backupFpp = 0.01 + + for (entry <- sizeMap) { + backupExpected = entry._1 + backupFpp = entry._2 + + if (estimate <= entry._1) { + return BloomFilter.create(entry._1, entry._2) + } + } + + BloomFilter.create(backupExpected, backupFpp) + } } \ No newline at end of file diff --git a/src/test/scala/BloomFilterAggregatorTest.scala b/src/test/scala/BloomFilterAggregatorTest.scala index 9325344..f00c91e 100644 --- a/src/test/scala/BloomFilterAggregatorTest.scala +++ b/src/test/scala/BloomFilterAggregatorTest.scala @@ -55,6 +55,7 @@ import org.apache.spark.util.sketch.BloomFilter import java.io.ByteArrayInputStream import java.sql.Timestamp import 
java.time.{Instant, LocalDateTime, ZoneOffset} +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer class BloomFilterAggregatorTest { @@ -68,16 +69,18 @@ class BloomFilterAggregatorTest { val amount: Long = 10 val testSchema: StructType = new StructType( - Array[StructField] - (StructField("_time", DataTypes.TimestampType, nullable = false, new MetadataBuilder().build), - StructField("_raw", DataTypes.StringType, nullable = false, new MetadataBuilder().build), - StructField("index", DataTypes.StringType, nullable = false, new MetadataBuilder().build), - StructField("sourcetype", DataTypes.StringType, nullable = false, new MetadataBuilder().build), - StructField("host", DataTypes.StringType, nullable = false, new MetadataBuilder().build), - StructField("source", DataTypes.StringType, nullable = false, new MetadataBuilder().build), - StructField("partition", DataTypes.StringType, nullable = false, new MetadataBuilder().build), - // Offset set as string instead of Long. - StructField("offset", DataTypes.StringType, nullable = false, new MetadataBuilder().build))) + Array[StructField]( + StructField("_time", TimestampType, nullable = false, new MetadataBuilder().build), + StructField("_raw", StringType, nullable = false, new MetadataBuilder().build), + StructField("index", StringType, nullable = false, new MetadataBuilder().build), + StructField("sourcetype", StringType, nullable = false, new MetadataBuilder().build), + StructField("host", StringType, nullable = false, new MetadataBuilder().build), + StructField("source", StringType, nullable = false, new MetadataBuilder().build), + StructField("partition", StringType, nullable = false, new MetadataBuilder().build), + StructField("offset", LongType, nullable = false, new MetadataBuilder().build), + StructField("estimate(tokens)", LongType, nullable = false, new MetadataBuilder().build) + ) + ) @org.junit.jupiter.api.Test def testTokenization(): Unit = { @@ -88,7 +91,7 @@ class BloomFilterAggregatorTest { val rowMemoryStream = new MemoryStream[Row](1,sqlContext)(encoder) var rowDataset = rowMemoryStream.toDF - + val sizeMap: mutable.TreeMap[Long, Double] = mutable.TreeMap(1000L -> 0.01, 10000L -> 0.01) // create Scala udf @@ -99,9 +102,8 @@ class BloomFilterAggregatorTest { // apply udf to column rowDataset = rowDataset.withColumn("tokens", tokenizerUDF.apply(functions.col("_raw"))) - // run bloomfilter on the column - val tokenAggregator = new BloomFilterAggregator("tokens", 50000L, 0.01) + val tokenAggregator = new BloomFilterAggregator("tokens", "estimate(tokens)", sizeMap) val tokenAggregatorColumn = tokenAggregator.toColumn val aggregatedDataset = rowDataset @@ -147,14 +149,16 @@ class BloomFilterAggregatorTest { val rowData = generateRawData() for (i <- 0 until amount.toInt) { - val row = RowFactory.create(time, + val row = Row( + time, exampleString, "topic", "stream", "host", "input", - partition, - "0L") + i.toString, + 0L, + exampleString.length.toLong) rowList += row } diff --git a/src/test/scala/BloomFilterBufferTest.scala b/src/test/scala/BloomFilterBufferTest.scala index 4b217fb..7e2884e 100644 --- a/src/test/scala/BloomFilterBufferTest.scala +++ b/src/test/scala/BloomFilterBufferTest.scala @@ -61,8 +61,7 @@ class BloomFilterBufferTest { def testNoDuplicateKeys(): Unit = { // TODO test other sizes / size categorization - val bloomfilterExpectedItems = 50000L - val bloomfilterFpp = 0.01D + val sizeMap: mutable.TreeMap[Long, Double] = mutable.TreeMap(1000L -> 0.01, 10000L -> 0.01) // single token, converted 
to WrappedArray val input: String = "one,one" @@ -80,7 +79,7 @@ class BloomFilterBufferTest { val schema = StructType(Seq(StructField(columnName, ArrayType(ArrayType(ByteType))))) val row = new GenericRowWithSchema(columns, schema) - val bfAgg : BloomFilterAggregator = new BloomFilterAggregator(columnName, bloomfilterExpectedItems, bloomfilterFpp) + val bfAgg : BloomFilterAggregator = new BloomFilterAggregator(columnName, "estimate(tokens)", sizeMap) val bfAggBuf = bfAgg.zero() bfAgg.reduce(bfAggBuf, row) diff --git a/src/test/scala/TokenizerTest.scala b/src/test/scala/TokenizerTest.scala index b38a773..8efd5c7 100644 --- a/src/test/scala/TokenizerTest.scala +++ b/src/test/scala/TokenizerTest.scala @@ -44,15 +44,13 @@ * a licensee so wish it. */ -import com.teragrep.functions.dpf_03.{BloomFilterAggregator, ByteArrayListAsStringListUDF, TokenizerUDF} +import com.teragrep.functions.dpf_03.{ByteArrayListAsStringListUDF, TokenizerUDF} import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.execution.streaming.MemoryStream import org.apache.spark.sql.streaming.{StreamingQuery, Trigger} import org.apache.spark.sql.types._ -import org.apache.spark.util.sketch.BloomFilter -import java.io.ByteArrayInputStream import java.sql.Timestamp import java.time.{Instant, LocalDateTime, ZoneOffset} import scala.collection.mutable @@ -70,15 +68,17 @@ class TokenizerTest { val testSchema: StructType = new StructType( Array[StructField] - (StructField("_time", DataTypes.TimestampType, nullable = false, new MetadataBuilder().build), - StructField("_raw", DataTypes.StringType, nullable = false, new MetadataBuilder().build), - StructField("index", DataTypes.StringType, nullable = false, new MetadataBuilder().build), - StructField("sourcetype", DataTypes.StringType, nullable = false, new MetadataBuilder().build), - StructField("host", DataTypes.StringType, nullable = false, new MetadataBuilder().build), - StructField("source", DataTypes.StringType, nullable = false, new MetadataBuilder().build), - StructField("partition", DataTypes.StringType, nullable = false, new MetadataBuilder().build), - // Offset set as string instead of Long. 
- StructField("offset", DataTypes.StringType, nullable = false, new MetadataBuilder().build))) + (StructField("_time", TimestampType, nullable = false, new MetadataBuilder().build), + StructField("_raw", StringType, nullable = false, new MetadataBuilder().build), + StructField("index", StringType, nullable = false, new MetadataBuilder().build), + StructField("sourcetype", StringType, nullable = false, new MetadataBuilder().build), + StructField("host", StringType, nullable = false, new MetadataBuilder().build), + StructField("source", StringType, nullable = false, new MetadataBuilder().build), + StructField("partition", StringType, nullable = false, new MetadataBuilder().build), + StructField("offset", LongType, nullable = false, new MetadataBuilder().build), + StructField("estimate(tokens)", LongType, nullable = false, new MetadataBuilder().build) + ) + ) @org.junit.jupiter.api.Test def testTokenization(): Unit = { @@ -90,8 +90,6 @@ class TokenizerTest { var rowDataset = rowMemoryStream.toDF - - // create Scala udf for tokenizer val tokenizerUDF = functions.udf(new TokenizerUDF, DataTypes.createArrayType(DataTypes.createArrayType(ByteType, false), false)) // register tokenizer udf @@ -138,19 +136,19 @@ class TokenizerTest { private def makeRows(time: Timestamp, partition: String): Seq[Row] = { val rowList: ArrayBuffer[Row] = new ArrayBuffer[Row] - val rowData = generateRawData() for (i <- 0 until amount.toInt) { - val row = RowFactory.create(time, + rowList += Row( + time, exampleString, "topic", "stream", "host", "input", partition, - "0L") - - rowList += row + 0L, + exampleString.length.toLong + ) } rowList }