From 50a978d1bfbde86799acb8433cb542d0a9f9a028 Mon Sep 17 00:00:00 2001
From: elliVM <49@teragrep.com>
Date: Wed, 11 Oct 2023 12:58:13 +0300
Subject: [PATCH 1/3] use blf_01 Token inside buffer

---
 pom.xml                                         |  2 +-
 .../functions/dpf_03/TokenAggregator.scala      | 12 ++-
 .../functions/dpf_03/TokenBuffer.scala          |  9 ++-
 src/test/scala/TokenAggregatorTest.scala        | 78 ++++++++++---------
 src/test/scala/TokenBufferTest.scala            | 72 +++++++++++++++++
 5 files changed, 128 insertions(+), 45 deletions(-)
 create mode 100644 src/test/scala/TokenBufferTest.scala

diff --git a/pom.xml b/pom.xml
index a6b793c..b84794d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,7 +47,7 @@
     <dependency>
      <groupId>com.teragrep</groupId>
      <artifactId>blf_01</artifactId>
-      <version>1.1.2</version>
+      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
diff --git a/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala b/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
index bfbbb33..d6828d8 100644
--- a/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
+++ b/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
@@ -46,11 +46,13 @@

 package com.teragrep.functions.dpf_03

-import java.io.Serializable
-import com.teragrep.blf_01.tokenizer.Tokenizer
+import java.io.{ByteArrayInputStream, Serializable}
+import com.teragrep.blf_01.{Token, Tokenizer}
 import org.apache.spark.sql.{Encoder, Encoders, Row}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.expressions.Aggregator
+
+import java.nio.charset.StandardCharsets
 import scala.reflect.ClassTag

 class TokenAggregator(private final val columnName: String) extends Aggregator[Row, TokenBuffer, Set[String]]
@@ -59,8 +61,10 @@ class TokenAggregator(private final val columnName: String) extends Aggregator[R
   override def zero(): TokenBuffer = new TokenBuffer()

   override def reduce(b: TokenBuffer, a: Row): TokenBuffer = {
+    val tokenizer: Tokenizer = new Tokenizer;
     val input: String = a.getAs(columnName).toString
-    Tokenizer.tokenize(input).forEach(i => b.addKey(i))
+    val stream = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8))
+    tokenizer.tokenize(stream).forEach(token => b.addKey(token))
     b
   }

@@ -70,7 +74,7 @@ class TokenAggregator(private final val columnName: String) extends Aggregator[R
   }

   override def finish(reduction: TokenBuffer): Set[String] = {
-    reduction.getBuffer.keySet.toSet
+    reduction.getBuffer.keySet.map(token => token.toString).toSet
   }

   override def bufferEncoder: Encoder[TokenBuffer] = customKryoEncoder[TokenBuffer]
diff --git a/src/main/scala/com/teragrep/functions/dpf_03/TokenBuffer.scala b/src/main/scala/com/teragrep/functions/dpf_03/TokenBuffer.scala
index 0b90235..2276870 100644
--- a/src/main/scala/com/teragrep/functions/dpf_03/TokenBuffer.scala
+++ b/src/main/scala/com/teragrep/functions/dpf_03/TokenBuffer.scala
@@ -47,20 +47,21 @@

 package com.teragrep.functions.dpf_03

 import scala.collection.mutable
+import com.teragrep.blf_01.Token

 class TokenBuffer() {

-  private var hashMap: mutable.HashMap[String, Int] = mutable.HashMap[String, Int]()
+  private var hashMap: mutable.HashMap[Token, Int] = mutable.HashMap[Token, Int]()

-  def getBuffer: mutable.HashMap[String, Int] = hashMap
+  def getBuffer: mutable.HashMap[Token, Int] = hashMap

-  def mergeBuffer(other: mutable.HashMap[String, Int]): Unit ={
+  def mergeBuffer(other: mutable.HashMap[Token, Int]): Unit ={
     hashMap = hashMap ++ other
   }

   def getSize: Int = hashMap.size

-  def addKey(key: String): Unit = {
+  def addKey(key: Token): Unit = {
     hashMap.put(key, 1)
   }
diff --git a/src/test/scala/TokenAggregatorTest.scala b/src/test/scala/TokenAggregatorTest.scala
index c2dc3b6..d37e8e9 100644
--- a/src/test/scala/TokenAggregatorTest.scala
+++ b/src/test/scala/TokenAggregatorTest.scala
@@ -49,14 +49,25 @@ import com.teragrep.functions.dpf_03.TokenBuffer
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
-import org.apache.spark.sql.{Dataset, Row, RowFactory, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, RowFactory, SparkSession}
 import org.apache.spark.sql.types.{DataTypes, MetadataBuilder, StructField, StructType}
+import org.junit.Assert.assertEquals

 import java.sql.Timestamp
 import java.time.{Instant, LocalDateTime, ZoneOffset}
+import java.util
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer

 class TokenAggregatorTest {
+  val exampleString: String = "NetScreen row=[Root]system-notification-00257" +
+    "(traffic\uD83D\uDE41 start_time=\"2022-09-02 10:13:40\"" +
+    " duration=0 policy_id=320000 service=tcp/port:8151 proto=6" +
+    " src zone=Null dst zone=Null action=Deny sent=0 rcvd=40" +
+    " src=127.127.127.127 dst=127.0.0.1 src_port=52362" +
+    " dst_port=8151 session_id=0 reason=Traffic Denied"
+
+  val amount: Long = 10

   val testSchema: StructType = new StructType(
     Array[StructField]
@@ -75,9 +86,9 @@ class TokenAggregatorTest {
     val sparkSession = SparkSession.builder.master("local[*]").getOrCreate
     val sqlContext = sparkSession.sqlContext
     sparkSession.sparkContext.setLogLevel("ERROR")
-
     val encoder = RowEncoder.apply(testSchema)
     val rowMemoryStream = new MemoryStream[Row](1,sqlContext)(encoder)
+
     var rowDataset = rowMemoryStream.toDF
     val tokenAggregator = new TokenAggregator("_raw")
@@ -94,59 +105,54 @@ class TokenAggregatorTest {
     while (streamingQuery.isActive) {
       val time = Timestamp.valueOf(LocalDateTime.ofInstant(Instant.now, ZoneOffset.UTC))
       rowMemoryStream.addData(
-        makeRows(time, String.valueOf(run), 10))
+        makeRows(time, String.valueOf(run)))

       run += 1

       if (run == 10) {
         streamingQuery.processAllAvailable
-        sparkSession.sql("SELECT * FROM TokenAggregatorQuery").show(100)
         streamingQuery.stop
+        streamingQuery.awaitTermination()
       }
     }
-  }
-
-  @org.junit.jupiter.api.Test
-  def testTokenBuffer(): Unit = {
-    val buffer1 = new TokenBuffer
-    val buffer2 = new TokenBuffer
-
-    // Test no duplicates
-    buffer1.addKey("one")
-    buffer1.addKey("one")
-
-    buffer2.addKey("two")
-    buffer2.addKey("three")
-
-    assert(buffer1.getSize == 1)
-
-    buffer1.mergeBuffer(buffer2.getBuffer)
-    assert(buffer1.getSize == 3)
+
+    val finalResult = sqlContext.sql("SELECT tokens FROM TokenAggregatorQuery").collectAsList()
+    println(finalResult.size())
+    println(finalResult)
   }

-  private def makeRows(time: Timestamp, partition: String, amount: Long): Seq[Row] = {
+  private def makeRows(time: Timestamp, partition: String): Seq[Row] = {
     val rowList: ArrayBuffer[Row] = new ArrayBuffer[Row]
+    val rowData = generateRawData()
+
+    for (i <- 0 until amount.toInt) {
+      val row = RowFactory.create(time,
+        exampleString,
+        "topic",
+        "stream",
+        "host",
+        "input",
+        partition,
+        "0L")

-    val row = RowFactory.create(
-      time,
-      "data Data",
-      "topic",
-      "stream",
-      "host",
-      "input",
-      partition,
-      "0L")
-
-    var temp = amount
-    while (temp > 0) {
       rowList += row
-      temp -= 1
     }
     rowList
   }

+  private def generateRawData(): Array[String] = {
+    val testDataList = new Array[String](amount.toInt)
+
+    for (i <- testDataList.indices) {
+      val randomVal = Math.floor(Math.random() * 999)
+      val text = "ip=172.17.255."+randomVal+",port=8080,session_id=46889"
+      testDataList.update(i, text)
+
+    }
+    testDataList
+  }
+
   private def startStream(rowDataset: Dataset[Row]): StreamingQuery =
     rowDataset.writeStream.queryName("TokenAggregatorQuery")
       .outputMode("complete")
diff --git a/src/test/scala/TokenBufferTest.scala b/src/test/scala/TokenBufferTest.scala
new file mode 100644
index 0000000..00429b7
--- /dev/null
+++ b/src/test/scala/TokenBufferTest.scala
@@ -0,0 +1,72 @@
+/*
+ * Teragrep Tokenizer DPF-03
+ * Copyright (C) 2019, 2020, 2021, 2022, 2023 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+
+import com.teragrep.blf_01.Tokenizer
+import com.teragrep.functions.dpf_03.TokenBuffer
+
+import java.io.{ByteArrayInputStream, InputStream}
+import java.nio.charset.StandardCharsets
+
+class TokenBufferTest {
+
+  @org.junit.jupiter.api.Test
+  def testNoDuplicateKeys(): Unit = {
+
+    val tokenizer: Tokenizer = new Tokenizer
+
+    val tokenBuffer: TokenBuffer = new TokenBuffer
+
+    val input: String = "one,one"
+
+    val is: InputStream = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8))
+
+    tokenizer.tokenize(is).forEach(token => tokenBuffer.addKey(token))
+
+    // "one" and ","
+    assert(tokenBuffer.getSize == 2)
+
+  }
+}

From dd70ea50449ad7121a44dd561a4ad832d5ddd7a6 Mon Sep 17 00:00:00 2001
From: elliVM <49@teragrep.com>
Date: Wed, 11 Oct 2023 13:06:44 +0300
Subject: [PATCH 2/3] remove unused import

---
 .../scala/com/teragrep/functions/dpf_03/TokenAggregator.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala b/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
index d6828d8..e20ccc4 100644
--- a/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
+++ b/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
@@ -47,7 +47,7 @@
 package com.teragrep.functions.dpf_03

 import java.io.{ByteArrayInputStream, Serializable}
-import com.teragrep.blf_01.{Token, Tokenizer}
+import com.teragrep.blf_01.Tokenizer
 import org.apache.spark.sql.{Encoder, Encoders, Row}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.expressions.Aggregator

From f26ae739d711e850a48d9540c14319d21eecbc1a Mon Sep 17 00:00:00 2001
From: Motoko Kusanagi
Date: Wed, 11 Oct 2023 14:01:47 +0300
Subject: [PATCH 3/3] less new

---
 .../functions/dpf_03/TokenAggregator.scala | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala b/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
index e20ccc4..a73207f 100644
--- a/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
+++ b/src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala
@@ -51,6 +51,7 @@ import com.teragrep.blf_01.Tokenizer
 import org.apache.spark.sql.{Encoder, Encoders, Row}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.unsafe.types.UTF8String

 import java.nio.charset.StandardCharsets
 import scala.reflect.ClassTag
@@ -58,13 +59,17 @@ import scala.reflect.ClassTag
 class TokenAggregator(private final val columnName: String) extends Aggregator[Row, TokenBuffer, Set[String]]
   with Serializable {

-  override def zero(): TokenBuffer = new TokenBuffer()
+  var tokenizer: Option[Tokenizer] = None
+
+  override def zero(): TokenBuffer = {
+    tokenizer = Some(new Tokenizer(32))
+    new TokenBuffer()
+  }

   override def reduce(b: TokenBuffer, a: Row): TokenBuffer = {
-    val tokenizer: Tokenizer = new Tokenizer;
-    val input: String = a.getAs(columnName).toString
-    val stream = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8))
-    tokenizer.tokenize(stream).forEach(token => b.addKey(token))
+    val input = a.getAs[String](columnName).getBytes(StandardCharsets.UTF_8)
+    val stream = new ByteArrayInputStream(input)
+    tokenizer.get.tokenize(stream).forEach(token => b.addKey(token))
     b
   }