Commit 949223a

Merge pull request #12 from elliVM/tokens

use blf_01 Token inside buffer

kortemik authored Oct 11, 2023
2 parents c8b4d0a + 0c0ce49, commit 949223a

Showing 5 changed files with 135 additions and 47 deletions.
pom.xml: 2 changes (1 addition, 1 deletion)

@@ -47,7 +47,7 @@
     <dependency>
       <groupId>com.teragrep</groupId>
       <artifactId>blf_01</artifactId>
-      <version>1.1.2</version>
+      <version>2.0.0</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
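The version bump is not cosmetic: as the next file shows, blf_01 2.0.0 replaces the static, String-based `Tokenizer.tokenize` with an instantiable `Tokenizer` that consumes an `InputStream` and emits `Token` objects. A minimal sketch of the new call pattern, assembled only from calls that appear in this commit (the constructor argument 32 is copied from `TokenAggregator.zero`; its meaning is not documented in this diff):

```scala
import com.teragrep.blf_01.Tokenizer

import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets

object TokenizeSketch extends App {
  // blf_01 2.0.0: instantiate the tokenizer, then feed it a byte stream
  val tokenizer = new Tokenizer(32)
  val stream = new ByteArrayInputStream(
    "ip=172.17.255.1,port=8080".getBytes(StandardCharsets.UTF_8))

  // tokenize(...) yields Token objects; toString recovers the text form,
  // mirroring what TokenAggregator.finish does below
  tokenizer.tokenize(stream).forEach(token => println(token.toString))
}
```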
src/main/scala/com/teragrep/functions/dpf_03/TokenAggregator.scala: 21 changes (15 additions, 6 deletions)

@@ -46,21 +46,30 @@

 package com.teragrep.functions.dpf_03

-import java.io.Serializable
-import com.teragrep.blf_01.tokenizer.Tokenizer
+import java.io.{ByteArrayInputStream, Serializable}
+import com.teragrep.blf_01.Tokenizer
 import org.apache.spark.sql.{Encoder, Encoders, Row}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.expressions.Aggregator
+import org.apache.spark.unsafe.types.UTF8String
+
+import java.nio.charset.StandardCharsets
 import scala.reflect.ClassTag

 class TokenAggregator(private final val columnName: String) extends Aggregator[Row, TokenBuffer, Set[String]]
   with Serializable {

-  override def zero(): TokenBuffer = new TokenBuffer()
+  var tokenizer: Option[Tokenizer] = None
+
+  override def zero(): TokenBuffer = {
+    tokenizer = Some(new Tokenizer(32))
+    new TokenBuffer()
+  }

   override def reduce(b: TokenBuffer, a: Row): TokenBuffer = {
-    val input: String = a.getAs(columnName).toString
-    Tokenizer.tokenize(input).forEach(i => b.addKey(i))
+    val input = a.getAs[String](columnName).getBytes(StandardCharsets.UTF_8)
+    val stream = new ByteArrayInputStream(input)
+    tokenizer.get.tokenize(stream).forEach(token => b.addKey(token))
     b
   }

@@ -70,7 +79,7 @@ class TokenAggregator(private final val columnName: String) extends Aggregator[R
   }

   override def finish(reduction: TokenBuffer): Set[String] = {
-    reduction.getBuffer.keySet.toSet
+    reduction.getBuffer.keySet.map(token => token.toString).toSet
   }

   override def bufferEncoder: Encoder[TokenBuffer] = customKryoEncoder[TokenBuffer]
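Since the diff does not show a caller, here is a hypothetical sketch of how the reworked aggregator would be attached to a DataFrame; `new TokenAggregator("_raw")` appears in the test below, while the `.toColumn.name("tokens")` wiring is an assumption inferred from the `SELECT tokens` query that test runs:

```scala
import com.teragrep.functions.dpf_03.TokenAggregator
import org.apache.spark.sql.DataFrame

// Hypothetical wiring: aggregate the "_raw" column of a DataFrame into a
// single set-of-token-strings column named "tokens" (name assumed).
def withTokens(rowDataset: DataFrame): DataFrame =
  rowDataset.agg(new TokenAggregator("_raw").toColumn.name("tokens"))
```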
src/main/scala/com/teragrep/functions/dpf_03/TokenBuffer.scala: 9 changes (5 additions, 4 deletions)

@@ -47,20 +47,21 @@

 package com.teragrep.functions.dpf_03

 import scala.collection.mutable
+import com.teragrep.blf_01.Token

 class TokenBuffer() {

-  private var hashMap: mutable.HashMap[String, Int] = mutable.HashMap[String, Int]()
+  private var hashMap: mutable.HashMap[Token, Int] = mutable.HashMap[Token, Int]()

-  def getBuffer: mutable.HashMap[String, Int] = hashMap
+  def getBuffer: mutable.HashMap[Token, Int] = hashMap

-  def mergeBuffer(other: mutable.HashMap[String, Int]): Unit = {
+  def mergeBuffer(other: mutable.HashMap[Token, Int]): Unit = {
     hashMap = hashMap ++ other
   }

   def getSize: Int = hashMap.size

-  def addKey(key: String): Unit = {
+  def addKey(key: Token): Unit = {
     hashMap.put(key, 1)
   }
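Worth noting about the buffer's semantics: `addKey` always stores the value 1, so the map behaves as a set of distinct `Token`s rather than a frequency table, and `mergeBuffer`'s `++` (which keeps the right-hand value on key collisions) therefore loses nothing. A small sketch of merge behavior, assuming tokenization splits the way the new TokenBufferTest below demonstrates ("one,one" yields the two distinct tokens "one" and ","):

```scala
import com.teragrep.blf_01.Tokenizer
import com.teragrep.functions.dpf_03.TokenBuffer

import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets

object MergeSketch extends App {
  val tokenizer = new Tokenizer(32)

  // Hypothetical helper: tokenize a string into a fresh buffer.
  def bufferOf(s: String): TokenBuffer = {
    val buffer = new TokenBuffer
    val stream = new ByteArrayInputStream(s.getBytes(StandardCharsets.UTF_8))
    tokenizer.tokenize(stream).forEach(token => buffer.addKey(token))
    buffer
  }

  val left = bufferOf("one,two")
  val right = bufferOf("two,three")
  left.mergeBuffer(right.getBuffer)

  // Under the splitting assumption above, the union holds "one", "two",
  // "three" and "," : four distinct keys.
  println(left.getSize)
}
```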
src/test/scala/TokenAggregatorTest.scala: 78 changes (42 additions, 36 deletions)

@@ -49,14 +49,25 @@ import com.teragrep.functions.dpf_03.TokenBuffer
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
-import org.apache.spark.sql.{Dataset, Row, RowFactory, SparkSession}
+import org.apache.spark.sql.{DataFrame, Dataset, Row, RowFactory, SparkSession}
 import org.apache.spark.sql.types.{DataTypes, MetadataBuilder, StructField, StructType}
 import org.junit.Assert.assertEquals

 import java.sql.Timestamp
 import java.time.{Instant, LocalDateTime, ZoneOffset}
+import java.util
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer

 class TokenAggregatorTest {
+  val exampleString: String = "NetScreen row=[Root]system-notification-00257" +
+    "(traffic\uD83D\uDE41 start_time=\"2022-09-02 10:13:40\"" +
+    " duration=0 policy_id=320000 service=tcp/port:8151 proto=6" +
+    " src zone=Null dst zone=Null action=Deny sent=0 rcvd=40" +
+    " src=127.127.127.127 dst=127.0.0.1 src_port=52362" +
+    " dst_port=8151 session_id=0 reason=Traffic Denied"
+
+  val amount: Long = 10
+
   val testSchema: StructType = new StructType(
     Array[StructField]

@@ -75,9 +86,9 @@ class TokenAggregatorTest {
     val sparkSession = SparkSession.builder.master("local[*]").getOrCreate
     val sqlContext = sparkSession.sqlContext
     sparkSession.sparkContext.setLogLevel("ERROR")
-
     val encoder = RowEncoder.apply(testSchema)
     val rowMemoryStream = new MemoryStream[Row](1, sqlContext)(encoder)
+
     var rowDataset = rowMemoryStream.toDF

     val tokenAggregator = new TokenAggregator("_raw")

@@ -94,59 +105,54 @@
     while (streamingQuery.isActive) {
       val time = Timestamp.valueOf(LocalDateTime.ofInstant(Instant.now, ZoneOffset.UTC))
       rowMemoryStream.addData(
-        makeRows(time, String.valueOf(run), 10))
+        makeRows(time, String.valueOf(run)))

       run += 1

       if (run == 10) {
         streamingQuery.processAllAvailable
         sparkSession.sql("SELECT * FROM TokenAggregatorQuery").show(100)
         streamingQuery.stop
         streamingQuery.awaitTermination()
       }
     }
-  }

-  @org.junit.jupiter.api.Test
-  def testTokenBuffer(): Unit = {
-    val buffer1 = new TokenBuffer
-    val buffer2 = new TokenBuffer
-
-    // Test no duplicates
-    buffer1.addKey("one")
-    buffer1.addKey("one")
-
-    buffer2.addKey("two")
-    buffer2.addKey("three")
-
-    assert(buffer1.getSize == 1)
-
-    buffer1.mergeBuffer(buffer2.getBuffer)
-    assert(buffer1.getSize == 3)
+    val finalResult = sqlContext.sql("SELECT tokens FROM TokenAggregatorQuery").collectAsList()
+    println(finalResult.size())
+    println(finalResult)
   }

-  private def makeRows(time: Timestamp, partition: String, amount: Long): Seq[Row] = {
+  private def makeRows(time: Timestamp, partition: String): Seq[Row] = {

     val rowList: ArrayBuffer[Row] = new ArrayBuffer[Row]
+    val rowData = generateRawData()

-    val row = RowFactory.create(
-      time,
-      "data Data",
-      "topic",
-      "stream",
-      "host",
-      "input",
-      partition,
-      "0L")
-
-    var temp = amount
-    while (temp > 0) {
-      rowList += row
-      temp -= 1
-    }
+    for (i <- 0 until amount.toInt) {
+      val row = RowFactory.create(time,
+        exampleString,
+        "topic",
+        "stream",
+        "host",
+        "input",
+        partition,
+        "0L")
+      rowList += row
+    }
     rowList
   }

+  private def generateRawData(): Array[String] = {
+    val testDataList = new Array[String](amount.toInt)
+
+    for (i <- testDataList.indices) {
+      val randomVal = Math.floor(Math.random() * 999)
+      val text = "ip=172.17.255." + randomVal + ",port=8080,session_id=46889"
+      testDataList.update(i, text)
+    }
+    testDataList
+  }
+
   private def startStream(rowDataset: Dataset[Row]): StreamingQuery =
     rowDataset.writeStream.queryName("TokenAggregatorQuery")
       .outputMode("complete")
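For readers unfamiliar with the harness: MemoryStream lets the test push rows into a streaming DataFrame without an external source, and a memory sink registered under the query name is what makes the `SELECT ... FROM TokenAggregatorQuery` statements resolvable. A condensed sketch of that pattern under those assumptions (the `.format("memory")` call and the aggregator wiring are not shown in this diff):

```scala
import com.teragrep.functions.dpf_03.TokenAggregator
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.types.StructType

// Condensed from the test above: drive a streaming aggregation from
// in-memory rows and expose the result as an in-memory table.
def startTokenQuery(schema: StructType, sqlContext: SQLContext): StreamingQuery = {
  val encoder = RowEncoder.apply(schema)
  val rowMemoryStream = new MemoryStream[Row](1, sqlContext)(encoder)

  rowMemoryStream.toDF
    .agg(new TokenAggregator("_raw").toColumn.name("tokens")) // assumed wiring
    .writeStream
    .queryName("TokenAggregatorQuery")
    .outputMode("complete") // complete mode requires an aggregation
    .format("memory")       // assumed: memory sink backs the SELECTs above
    .start()
}
```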
src/test/scala/TokenBufferTest.scala: 72 changes (72 additions, 0 deletions; new file)

@@ -0,0 +1,72 @@
/*
* Teragrep Tokenizer DPF-03
* Copyright (C) 2019, 2020, 2021, 2022, 2023 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://github.com/teragrep/teragrep/blob/main/LICENSE>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/

import com.teragrep.blf_01.Tokenizer
import com.teragrep.functions.dpf_03.TokenBuffer

import java.io.{ByteArrayInputStream, InputStream}
import java.nio.charset.StandardCharsets

class TokenBufferTest {

  @org.junit.jupiter.api.Test
  def testNoDuplicateKeys(): Unit = {

    val tokenizer: Tokenizer = new Tokenizer

    val tokenBuffer: TokenBuffer = new TokenBuffer

    val input: String = "one,one"

    val is: InputStream = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8))

    tokenizer.tokenize(is).forEach(token => tokenBuffer.addKey(token))

    // "one" and ","
    assert(tokenBuffer.getSize == 2)
  }
}
