From f5665caf884678b44cbf7ab06d0e494d0138695f Mon Sep 17 00:00:00 2001
From: Motoko Kusanagi
Date: Wed, 18 Oct 2023 22:17:15 +0300
Subject: [PATCH] change to Array[Byte] return value

---
 README.adoc                                   |  89 +----------
 .../dpf_03/BloomFilterAggregator.scala        | 140 ++++++++++++++++++
 ...enBuffer.scala => BloomFilterBuffer.scala} |  29 ++--
 .../functions/dpf_03/TokenAggregator.scala    |  90 -----------
 ....scala => BloomFilterAggregatorTest.scala} |  12 +-
 ...Test.scala => BloomFilterBufferTest.scala} |  46 ++++--
 6 files changed, 196 insertions(+), 210 deletions(-)
 create mode 100644 src/main/scala/com/teragrep/functions/dpf_03/BloomFilterAggregator.scala
 rename src/main/scala/com/teragrep/functions/dpf_03/{TokenBuffer.scala => BloomFilterBuffer.scala} (80%)
 delete mode 100644 src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
 rename src/test/scala/{TokenAggregatorTest.scala => BloomFilterAggregatorTest.scala} (93%)
 rename src/test/scala/{TokenBufferTest.scala => BloomFilterBufferTest.scala} (64%)

diff --git a/README.adoc b/README.adoc
index e960237..c1ceecc 100644
--- a/README.adoc
+++ b/README.adoc
@@ -1,88 +1 @@
-
-[source, scala]
-----
-%spark
-
-import org.apache.spark.SparkConf // for accessing properties
-import org.apache.spark.sql.types._
-import org.apache.spark.sql.streaming.Trigger
-import java.sql.Timestamp
-import org.apache.spark.sql.streaming.StreamingQuery
-
-// token aggregator
-import com.teragrep.functions.dpf_03.TokenAggregator
-import org.apache.spark.sql._
-
-private def isArchiveDone(streamingQuery: StreamingQuery) = {
-  var archiveDone = true
-  for (i <- streamingQuery.lastProgress.sources.indices) {
-    val startOffset = streamingQuery.lastProgress.sources(i).startOffset
-    val endOffset = streamingQuery.lastProgress.sources(i).endOffset
-    val description = streamingQuery.lastProgress.sources(i).description
-    if (description != null && description.startsWith("com.teragrep.pth06.ArchiveMicroBatchReader@")) { // ignore others than archive
-      if (startOffset != null) {
-        if (startOffset != endOffset) {
-          archiveDone = false
-        }
-      }
-      else {
-        archiveDone = false
-      }
-    }
-  }
-  archiveDone
-}
-
-val s3identity: String = System.getProperty("user.name")
-var s3credential: String = _
-
-try {
-  val path: String = "hdfs:///user/" + s3identity + "/s3credential";
-  val df = spark.read.textFile(path);
-  s3credential = df.first()
-}
-catch {
-  case e: Exception => {
-    println("Unable to get s3credential from HDFS: " + e)
-    System.exit(1)
-  }
-}
-
-val df = spark
-  .readStream
-  .format("com.teragrep.pth06.ArchiveSourceProvider")
-  .option("S3endPoint", sc.getConf.get("fs.s3a.endpoint"))
-  .option("S3identity", s3identity)
-  .option("S3credential", s3credential)
-  .option("DBusername", sc.getConf.get("dpl.archive.db.username"))
-  .option("DBpassword", sc.getConf.get("dpl.archive.db.password"))
-  .option("DBurl", sc.getConf.get("dpl.archive.db.url"))
-  .option("DBstreamdbname", sc.getConf.get("dpl.archive.db.streamdb.name"))
-  .option("DBjournaldbname", sc.getConf.get("dpl.archive.db.journaldb.name"))
-  .option("num_partitions", "192")
-  .option("queryXML", """""")
-  .load()
-
-val tokenAggregator: TokenAggregator = new TokenAggregator("_raw")
-val tokenAggregatorColumn: Column = tokenAggregator.toColumn
-
-val df2 = df.groupBy($"partition").agg(tokenAggregatorColumn)
-
-val query = df2
-  .writeStream
-  .outputMode("complete")
-  .format("memory")
-  .trigger(Trigger.ProcessingTime(0))
-  .queryName("ArchiveAccessExample")
-  .start()
-
-
-while (!query.awaitTermination(1000)) {
-  val dfOut = sqlContext.sql("SELECT * FROM ArchiveAccessExample")
-  z.getInterpreterContext.out.clear(true);
-  z.show(dfOut)
-  if(query.lastProgress != null && isArchiveDone(query))
-    query.stop()
-}
-s3credential="" // clear so it's not present on the output
-----
\ No newline at end of file
+See test cases
\ No newline at end of file
diff --git a/src/main/scala/com/teragrep/functions/dpf_03/BloomFilterAggregator.scala b/src/main/scala/com/teragrep/functions/dpf_03/BloomFilterAggregator.scala
new file mode 100644
index 0000000..cceb3ff
--- /dev/null
+++ b/src/main/scala/com/teragrep/functions/dpf_03/BloomFilterAggregator.scala
@@ -0,0 +1,140 @@
+/*
+ * Teragrep Tokenizer DPF-03
+ * Copyright (C) 2019, 2020, 2021, 2022, 2023 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+
+package com.teragrep.functions.dpf_03
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, Serializable}
+import com.teragrep.blf_01.Tokenizer
+import org.apache.spark.sql.{Encoder, Encoders, Row}
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.util.sketch.BloomFilter
+
+import java.nio.charset.StandardCharsets
+import scala.reflect.ClassTag
+
+class BloomFilterAggregator(final val columnName: String, final val maxMinorTokens: Long, final val sizeSplit: Map[Long, Double]) extends Aggregator[Row, BloomFilterBuffer, Array[Byte]]
+  with Serializable {
+
+  var tokenizer: Option[Tokenizer] = None
+
+  override def zero(): BloomFilterBuffer = {
+    tokenizer = Some(new Tokenizer(maxMinorTokens))
+    new BloomFilterBuffer(sizeSplit)
+  }
+
+  override def reduce(buffer: BloomFilterBuffer, row: Row): BloomFilterBuffer = {
+    val input = row.getAs[String](columnName).getBytes(StandardCharsets.UTF_8)
+    val stream = new ByteArrayInputStream(input)
+
+    // tokenize only once: the stream can be consumed a single time, and the
+    // same tokens must be inserted into every filter size of the buffer
+    val tokens = tokenizer.get.tokenize(stream)
+
+    for ((size: Long, bfByteArray: Array[Byte]) <- buffer.sizeToBloomFilterMap) {
+      val bios: ByteArrayInputStream = new ByteArrayInputStream(bfByteArray)
+      val bf = BloomFilter.readFrom(bios)
+
+      tokens.forEach(token => bf.put(token.bytes))
+
+      val baos = new ByteArrayOutputStream()
+      bf.writeTo(baos)
+
+      buffer.sizeToBloomFilterMap.put(size, baos.toByteArray)
+    }
+
+    buffer
+  }
+
+  override def merge(ours: BloomFilterBuffer, their: BloomFilterBuffer): BloomFilterBuffer = {
+    for ((size: Long, bfByteArray: Array[Byte]) <- ours.sizeToBloomFilterMap) {
+      val ourBios: ByteArrayInputStream = new ByteArrayInputStream(bfByteArray)
+      val ourBf = BloomFilter.readFrom(ourBios)
+
+      val maybeArray: Option[Array[Byte]] = their.sizeToBloomFilterMap.get(size)
+      val theirBios = new ByteArrayInputStream(maybeArray.get)
+      val theirBf = BloomFilter.readFrom(theirBios)
+
+      ourBf.mergeInPlace(theirBf)
+
+      val ourBaos = new ByteArrayOutputStream()
+      ourBf.writeTo(ourBaos)
+
+      ours.sizeToBloomFilterMap.put(size, ourBaos.toByteArray)
+    }
+    ours
+  }
+
+  /**
+   * Finds the best BloomFilter candidate to return.
+   * @param buffer BloomFilterBuffer returned by the reduce step
+   * @return smallest filter whose expected fpp is within the target requested for its size
+   */
+  override def finish(buffer: BloomFilterBuffer): Array[Byte] = {
+
+    // seek the best candidate from smallest to largest: the first filter
+    // whose expected fpp still meets the target requested for its size
+    val candidate = buffer.sizeToBloomFilterMap.keys.toSeq.sorted.find(size => {
+      val bios = new ByteArrayInputStream(buffer.sizeToBloomFilterMap(size))
+      val bf = BloomFilter.readFrom(bios)
+      val sizeFpp: Double = sizeSplit(size)
+
+      bf.expectedFpp() <= sizeFpp
+    })
+
+    // default to the largest filter when no candidate qualifies
+    candidate match {
+      case Some(size) => buffer.sizeToBloomFilterMap(size)
+      case None => buffer.sizeToBloomFilterMap(buffer.sizeToBloomFilterMap.keys.max)
+    }
+  }
+
+  override def bufferEncoder: Encoder[BloomFilterBuffer] = customKryoEncoder[BloomFilterBuffer]
+
+  override def outputEncoder: Encoder[Array[Byte]] = ExpressionEncoder[Array[Byte]]
+
+  implicit def customKryoEncoder[A](implicit ct: ClassTag[A]): Encoder[A] = Encoders.kryo[A](ct)
+}
\ No newline at end of file
diff --git a/src/main/scala/com/teragrep/functions/dpf_03/TokenBuffer.scala b/src/main/scala/com/teragrep/functions/dpf_03/BloomFilterBuffer.scala
similarity index 80%
rename from src/main/scala/com/teragrep/functions/dpf_03/TokenBuffer.scala
rename to src/main/scala/com/teragrep/functions/dpf_03/BloomFilterBuffer.scala
index 2276870..c5f5201 100644
--- a/src/main/scala/com/teragrep/functions/dpf_03/TokenBuffer.scala
+++ b/src/main/scala/com/teragrep/functions/dpf_03/BloomFilterBuffer.scala
@@ -47,26 +47,25 @@
 package com.teragrep.functions.dpf_03
 
 import scala.collection.mutable
 
-import com.teragrep.blf_01.Token
+import org.apache.spark.util.sketch.BloomFilter
 
-class TokenBuffer() {
+import java.io.ByteArrayOutputStream
 
-  private var hashMap: mutable.HashMap[Token, Int] = mutable.HashMap[Token, Int]()
+class BloomFilterBuffer(final val sizeSplit: Map[Long, Double]) {
 
-  def getBuffer: mutable.HashMap[Token, Int] = hashMap
+  val sizeToBloomFilterMap: mutable.HashMap[Long, Array[Byte]] = {
+    val rv = mutable.HashMap[Long, Array[Byte]]()
 
-  def mergeBuffer(other: mutable.HashMap[Token, Int]): Unit ={
-    hashMap = hashMap ++ other
-  }
+    for ((size, fpp) <- sizeSplit) {
 
-  def getSize: Int = hashMap.size
+      val bf: BloomFilter = BloomFilter.create(size, fpp)
 
-  def addKey(key: Token): Unit = {
-    hashMap.put(key, 1)
-  }
+      val baos: ByteArrayOutputStream = new ByteArrayOutputStream()
 
-  override def toString: String =
-    s"""Buffer{
-       |map=$hashMap
-       |}""".stripMargin
+      bf.writeTo(baos)
+      rv.put(size, baos.toByteArray)
+    }
+
+    rv
+  }
 }
\ No newline at end of file
diff --git a/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala b/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
deleted file mode 100644
index 5eefa29..0000000
--- a/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Teragrep Tokenizer DPF-03
- * Copyright (C) 2019, 2020, 2021, 2022, 2023 Suomen Kanuuna Oy
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <https://www.gnu.org/licenses/>.
- *
- *
- * Additional permission under GNU Affero General Public License version 3
- * section 7
- *
- * If you modify this Program, or any covered work, by linking or combining it
- * with other code, such other code is not for that reason alone subject to any
- * of the requirements of the GNU Affero GPL version 3 as long as this Program
- * is the same Program as licensed from Suomen Kanuuna Oy without any additional
- * modifications.
- *
- * Supplemented terms under GNU Affero General Public License version 3
- * section 7
- *
- * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
- * versions must be marked as "Modified version of" The Program.
- *
- * Names of the licensors and authors may not be used for publicity purposes.
- *
- * No rights are granted for use of trade names, trademarks, or service marks
- * which are in The Program if any.
- *
- * Licensee must indemnify licensors and authors for any liability that these
- * contractual assumptions impose on licensors and authors.
- *
- * To the extent this program is licensed as part of the Commercial versions of
- * Teragrep, the applicable Commercial License may apply to this file if you as
- * a licensee so wish it.
- */
-
-package com.teragrep.functions.dpf_03
-
-import java.io.{ByteArrayInputStream, Serializable}
-import com.teragrep.blf_01.Tokenizer
-import org.apache.spark.sql.{Encoder, Encoders, Row}
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.expressions.Aggregator
-import org.apache.spark.unsafe.types.UTF8String
-
-import java.nio.charset.StandardCharsets
-import scala.reflect.ClassTag
-
-class TokenAggregator(final val columnName: String, final val maxMinorTokens: Long) extends Aggregator[Row, TokenBuffer, Set[String]]
-  with Serializable {
-
-  var tokenizer: Option[Tokenizer] = None
-
-  override def zero(): TokenBuffer = {
-    tokenizer = Some(new Tokenizer(maxMinorTokens))
-    new TokenBuffer()
-  }
-
-  override def reduce(b: TokenBuffer, a: Row): TokenBuffer = {
-    val input = a.getAs[String](columnName).getBytes(StandardCharsets.UTF_8)
-    val stream = new ByteArrayInputStream(input)
-    tokenizer.get.tokenize(stream).forEach(token => b.addKey(token))
-    b
-  }
-
-  override def merge(b1: TokenBuffer, b2: TokenBuffer): TokenBuffer = {
-    b1.mergeBuffer(b2.getBuffer)
-    b1
-  }
-
-  override def finish(reduction: TokenBuffer): Set[String] = {
-    reduction.getBuffer.keySet.map(token => token.toString).toSet
-  }
-
-  override def bufferEncoder: Encoder[TokenBuffer] = customKryoEncoder[TokenBuffer]
-
-  override def outputEncoder: Encoder[Set[String]] = ExpressionEncoder[Set[String]]
-
-  implicit def customKryoEncoder[A](implicit ct: ClassTag[A]): Encoder[A] = Encoders.kryo[A](ct)
-}
\ No newline at end of file
diff --git a/src/test/scala/TokenAggregatorTest.scala b/src/test/scala/BloomFilterAggregatorTest.scala
similarity index 93%
rename from src/test/scala/TokenAggregatorTest.scala
rename to src/test/scala/BloomFilterAggregatorTest.scala
index aa4d7d2..e818534 100644
--- a/src/test/scala/TokenAggregatorTest.scala
+++ b/src/test/scala/BloomFilterAggregatorTest.scala
@@ -44,8 +44,8 @@
  * a licensee so wish it.
 */
 
-import com.teragrep.functions.dpf_03.TokenAggregator
-import com.teragrep.functions.dpf_03.TokenBuffer
+import com.teragrep.functions.dpf_03.BloomFilterAggregator
+import com.teragrep.functions.dpf_03.BloomFilterBuffer
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
@@ -59,7 +59,7 @@
 import java.util
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
-class TokenAggregatorTest {
+class BloomFilterAggregatorTest {
   val exampleString: String = "NetScreen row=[Root]system-notification-00257" +
     "(traffic\uD83D\uDE41 start_time=\"2022-09-02 10:13:40\"" +
     " duration=0 policy_id=320000 service=tcp/port:8151 proto=6" +
@@ -91,13 +91,13 @@
 
     var rowDataset = rowMemoryStream.toDF
 
-    val tokenAggregator = new TokenAggregator("_raw", 32)
+    val tokenAggregator = new BloomFilterAggregator("_raw", 32, Map(50000L -> 0.01))
     val tokenAggregatorColumn = tokenAggregator.toColumn
 
     rowDataset = rowDataset
      .groupBy("partition")
      .agg(tokenAggregatorColumn)
-     .withColumnRenamed("TokenAggregator(org.apache.spark.sql.Row)", "tokens")
+     .withColumnRenamed("BloomFilterAggregator(org.apache.spark.sql.Row)", "bloomfilter")
 
     val streamingQuery = startStream(rowDataset)
     var run: Long = 0
@@ -116,7 +116,7 @@
       }
     }
 
-    val finalResult = sqlContext.sql("SELECT tokens FROM TokenAggregatorQuery").collectAsList()
+    val finalResult = sqlContext.sql("SELECT bloomfilter FROM TokenAggregatorQuery").collectAsList()
     println(finalResult.size())
     println(finalResult)
   }
diff --git a/src/test/scala/TokenBufferTest.scala b/src/test/scala/BloomFilterBufferTest.scala
similarity index 64%
rename from src/test/scala/TokenBufferTest.scala
rename to src/test/scala/BloomFilterBufferTest.scala
index 00429b7..79743f9 100644
--- a/src/test/scala/TokenBufferTest.scala
+++ b/src/test/scala/BloomFilterBufferTest.scala
@@ -44,29 +44,53 @@
  * a licensee so wish it.
 */
 
-import com.teragrep.blf_01.Tokenizer
-import com.teragrep.functions.dpf_03.TokenBuffer
+import com.teragrep.functions.dpf_03.BloomFilterAggregator
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.util.sketch.BloomFilter
 
-import java.io.{ByteArrayInputStream, InputStream}
-import java.nio.charset.StandardCharsets
+import java.io.ByteArrayInputStream
 
-class TokenBufferTest {
+class BloomFilterBufferTest {
 
   @org.junit.jupiter.api.Test
   def testNoDuplicateKeys(): Unit = {
 
-    val tokenizer: Tokenizer = new Tokenizer
+    // TODO test other sizes / size categorization
+    val sizeSplit = Map(50000L -> 0.01D)
 
-    val tokenBuffer: TokenBuffer = new TokenBuffer
+    val expectedBfBitSize = {
+      val size = sizeSplit.keys.max
+      val fpp = sizeSplit(size)
+      val bf = BloomFilter.create(size, fpp)
+      bf.bitSize()
+    }
 
     val input: String = "one,one"
-    val is: InputStream = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8))
-    tokenizer.tokenize(is).forEach(token => tokenBuffer.addKey(token))
+    val columnName = "column1"
 
-    // "one" and ","
-    assert(tokenBuffer.getSize == 2)
+    val schema = StructType(Seq(StructField(columnName, StringType)))
+    val row = new GenericRowWithSchema(Array(input), schema)
+
+    val bfAgg: BloomFilterAggregator = new BloomFilterAggregator(columnName, 32, sizeSplit)
+
+    val bfAggBuf = bfAgg.zero()
+    bfAgg.reduce(bfAggBuf, row)
+
+    // TODO test merge
+
+    val outArr: Array[Byte] = bfAgg.finish(bfAggBuf)
+    val bios = new ByteArrayInputStream(outArr)
+
+    val bf = BloomFilter.readFrom(bios)
+
+    // the tokenizer splits "one,one" into "one" and ","
+    assert(bf.mightContain("one"))
+    assert(bf.mightContain(","))
+    assert(bf.bitSize() == expectedBfBitSize)
   }
+
 }
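
Usage note: since the README now simply says "See test cases", here is a minimal usage sketch of the new aggregator, distilled from BloomFilterAggregatorTest. It is illustrative only and not part of the patch: the local SparkSession, the toy DataFrame with its "partition" and "_raw" columns, and the size-to-fpp map are assumptions.

[source, scala]
----
import java.io.ByteArrayInputStream

import com.teragrep.functions.dpf_03.BloomFilterAggregator
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.sketch.BloomFilter

// assumption: a local session and a toy DataFrame standing in for real data
val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._
val df = Seq(("p1", "one,one"), ("p1", "two"), ("p2", "three")).toDF("partition", "_raw")

// one candidate filter per (expected item count -> target fpp) entry;
// finish() returns the smallest serialized filter that meets its fpp target
val sizeSplit = Map(50000L -> 0.01, 100000L -> 0.01)

val bloomColumn = new BloomFilterAggregator("_raw", 32, sizeSplit).toColumn

val aggregated = df.groupBy($"partition").agg(bloomColumn.as("bloomfilter"))

// each result row carries one serialized org.apache.spark.util.sketch.BloomFilter
aggregated.collect().foreach { row =>
  val bytes = row.getAs[Array[Byte]]("bloomfilter")
  val bf = BloomFilter.readFrom(new ByteArrayInputStream(bytes))
  println(s"partition=${row.getAs[String]("partition")} expectedFpp=${bf.expectedFpp()}")
}
----

Returning Array[Byte] instead of Set[String] keeps the result portable: any consumer with org.apache.spark.util.sketch on the classpath can rehydrate the filter with BloomFilter.readFrom, without needing the tokenizer.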
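
The buffer itself is only a map from expected size to serialized filter bytes, which is what makes the Kryo buffer encoder workable. A small sketch of the merge semantics, under the same illustrative assumptions (equal size and fpp on both sides, as reduce would produce on two executors):

[source, scala]
----
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.util.sketch.BloomFilter

// helper mirroring how the aggregator stores filters as byte arrays
def serialize(bf: BloomFilter): Array[Byte] = {
  val baos = new ByteArrayOutputStream()
  bf.writeTo(baos)
  baos.toByteArray
}

// two partial filters with identical parameters, filled independently
val left = BloomFilter.create(50000L, 0.01)
left.put("one".getBytes("UTF-8"))
val right = BloomFilter.create(50000L, 0.01)
right.put("two".getBytes("UTF-8"))

// merge mirrors BloomFilterAggregator.merge: deserialize, mergeInPlace, reserialize
val merged = BloomFilter.readFrom(new ByteArrayInputStream(serialize(left)))
merged.mergeInPlace(BloomFilter.readFrom(new ByteArrayInputStream(serialize(right))))

assert(merged.mightContain("one") && merged.mightContain("two"))
----

Note that mergeInPlace throws when the two filters were created with different parameters, which is why both buffers must be built from the same sizeSplit.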