From b2bbae897d7308092b10046029f3d7b37b4d755d Mon Sep 17 00:00:00 2001 From: Ioanna Kiriakidou Date: Wed, 25 Jul 2018 00:25:16 +0300 Subject: [PATCH 01/12] created class Cache.scala /* * Cache class keeps a hashmap where it * stores the number of the bucket that holds * the major coreset and the coreset itself * r is the merge threshold in this case * we use r = 2 since we implemented an 2-way * coreset tree */ --- .../streamdm/clusterers/clusters/Cache.scala | 99 +++++++++++++++++++ 1 file changed, 99 insertions(+) create mode 100644 src/main/scala/org/apache/spark/streamdm/clusterers/clusters/Cache.scala diff --git a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/Cache.scala b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/Cache.scala new file mode 100644 index 00000000..13b1a5bc --- /dev/null +++ b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/Cache.scala @@ -0,0 +1,99 @@ +package org.apache.spark.streamdm.clusterers.clusters +import org.apache.spark.streamdm.core.Example +import scala.collection.mutable.Queue +import scala.collection.mutable.HashMap + +/* + * Cache class keeps a hashmap where it + * stores the number of the bucket that holds + * the major coreset and the coreset itself + * r is the merge threshold in this case + * we use r = 2 since we implemented an 2-way + * coreset tree + */ + + +class Cache(val r: Int) extends Serializable { + + val cacheMap: HashMap[Int,Array[Example]] = new HashMap[Int,Array[Example]]() + var counter: Int = 0 // the number of the buckets + + def size: Int = cacheMap.size + + def incrementsCounter(): Unit = { + counter += 1 + } + + def getCounter: Int = counter + /* + * insert the major coreset in the hashmap + * filter the coresets that exists in the partsum list + */ + def insertCoreset(n: Int,coreset:Array[Example] ): Unit = { + cacheMap.put(n,coreset) + val partsumList = partSum(n) + val filteredCache = cacheMap.filter{ case (k, _) => partsumList.contains(k) } + } + + + def removeCoreset(n: Int): Unit = { + cacheMap.remove(n) + } + + def getCoreset(n: Int): Array[Example] = cacheMap.getOrElse(n, null) + + /** + * return the partial sums of number n + * + * @param n + * @return + */ + private def partSum(n: Int) = { + val queue = new Queue[Int]() + val sumQueue = new Queue[Int]() + var weight = 1 + var num = n + while (num > 0) { + val a = num % r + num = num / r + if (a != 0) queue.enqueue(a * weight) + weight = weight * r + } + var temp = 0 + for(i<- queue.size - 1 to 1 by -1 ) { + temp += queue(i) + sumQueue.enqueue(temp) + } + sumQueue + } + + def major(n: Int): Int = n - minor(n) + + def minor(n: Int): Int = { + var num = n + if (num == 0) return 0 + var weight = 1 + while (num%r == 0){ + num = num / r + weight = weight * r + } + val minor = weight * (num % r) + minor + } + + /** + * Start from level 0 + * + * @param n + * @return + */ + def minorLevel(n: Int): Int = { + var level = 0 + var num = n + while (num % r ==0) { + num = num / r + level += 1 + } + level + } +} From d5201d74bda83fc03f02e2908e0b86f55fd1365c Mon Sep 17 00:00:00 2001 From: Ioanna Kiriakidou Date: Wed, 25 Jul 2018 00:29:52 +0300 Subject: [PATCH 02/12] created CachedKM class /** * Implements the CachedKM++ algorithm for data streams. CachedKM++ computes a * small (weighted) sample of the stream by using coresets from cache * to reduce the number of coresets needed for a merge step, and then uses * it as an input to a k-means++ algorithm. It uses a data structure called * BucketManager to handle the coresets. * *

 * It uses the following options:
 *
*/ --- .../spark/streamdm/clusterers/CachedKM.scala | 108 ++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 src/main/scala/org/apache/spark/streamdm/clusterers/CachedKM.scala diff --git a/src/main/scala/org/apache/spark/streamdm/clusterers/CachedKM.scala b/src/main/scala/org/apache/spark/streamdm/clusterers/CachedKM.scala new file mode 100644 index 00000000..96d8b5c8 --- /dev/null +++ b/src/main/scala/org/apache/spark/streamdm/clusterers/CachedKM.scala @@ -0,0 +1,108 @@ +package org.apache.spark.streamdm.clusterers + +import org.apache.spark.streamdm.clusterers.clusters._ +import org.apache.spark.streamdm.clusterers.utils._ +import org.apache.spark.streamdm.core._ +import org.apache.spark.streaming.dstream._ +import com.github.javacliparser._ +import org.apache.spark.streamdm.core.specification.ExampleSpecification +import org.apache.spark.streaming.StreamingContext + +import scala.collection.mutable + +/** + * Implements the CachedKM++ algorithm for data streams. CachedKM++ computes a + * small (weighted) sample of the stream by using coresets from cache + * to reduce the number of coresets needed for a merge step, and then uses + * it as an input to a k-means++ algorithm. It uses a data structure called + * BucketManager to handle the coresets. + * + *

+ * It uses the following options:
+ *
+ */ +class CachedKM extends Clusterer { + + type T = BucketManager + + var bucketmanager: BucketManager = null + var numInstances: Long = 0 + var initialBuffer: Array[Example] = Array[Example]() + var clusters: Array[Example] = null + + val kOption: IntOption = new IntOption("numClusters", 'k', + "Number of clusters for output", 10, 1, Integer.MAX_VALUE) + + val repOption: IntOption = new IntOption("kMeansIters", 'i', + "Number of k-means iterations", 1000, 1, Integer.MAX_VALUE) + + val sizeCoresetOption: IntOption = new IntOption("sizeCoreset", 's', + "Size of coreset", 10000, 1, Integer.MAX_VALUE) + + val widthOption: IntOption = new IntOption("width", + 'w', "Size of window for training learner.", 100000, 1, Integer.MAX_VALUE); + + var exampleLearnerSpecification: ExampleSpecification = null + + /** + * Init the StreamKM++ algorithm. + */ + def init(exampleSpecification: ExampleSpecification) : Unit = { + exampleLearnerSpecification = exampleSpecification + bucketmanager = new BucketManager(widthOption.getValue, sizeCoresetOption.getValue) + } + + /** + * Maintain the BucketManager for coreset extraction, given an input DStream of Example. + * @param input a stream of instances + */ + def train(input: DStream[Example]): Unit = { + //do nothing + } + + /** + * Gets the current Model used for the Learner. + * @return the Model object used for training + */ + def getModel: BucketManager = bucketmanager + + /** + * Get the currently computed clusters + * @return an Array of Examples representing the clusters + */ + def getClusters: Array[Example] = { + if (clusters==null) Array[Example]() else clusters + } + + /** + * Assigns examples to clusters, given the current Clusters data structure. + * @param input the DStream of Examples to be assigned a cluster + * @return a DStream of tuples containing the original Example and the + * assigned cluster. 
+ */ + def assign(input: DStream[Example]): DStream[(Example,Double)] = { + input.map(ex=> { + numInstances += 1 + bucketmanager = bucketmanager.updateWithCache(ex) + if(numInstances <= sizeCoresetOption.getValue){ + clusters = KMeans.cluster(bucketmanager.buckets(0).points.toArray,kOption.getValue,repOption.getValue) + } + else + { + val streamingCoreset = bucketmanager.getCachedCoreset + clusters = KMeans.cluster(streamingCoreset,kOption.getValue,repOption.getValue) + } + + val assignedCl = clusters.foldLeft((0, Double.MaxValue, 0))( + (cl, centr) => { + val dist = centr.in.distanceTo(ex.in) + if (dist < cl._2) ((cl._3, dist, cl._3 + 1)) + else ((cl._1, cl._2, cl._3 + 1)) + })._1 + (ex,assignedCl) + }) + } +} From 0589aeb750f60a2d56a639d46f339b13f6be953e Mon Sep 17 00:00:00 2001 From: Ioanna Kiriakidou Date: Wed, 25 Jul 2018 00:37:52 +0300 Subject: [PATCH 03/12] added updatedWithCache and getCachedCoreset --- .../clusterers/clusters/BucketManager.scala | 113 +++++++++++++++++- 1 file changed, 112 insertions(+), 1 deletion(-) diff --git a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala index e9f4f5a9..ded82171 100755 --- a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala +++ b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala @@ -31,7 +31,8 @@ import scala.util.control.Breaks._ class BucketManager(val n : Int, val maxsize : Int) extends Clusters { type T = BucketManager - + val cacheMap: Cache = new Cache(2) + var counter: Int = 0 /** * Inner class Bucket for new instance management, this class has two buffers for * recursively computing the coresets. @@ -109,6 +110,81 @@ class BucketManager(val n : Int, val maxsize : Int) extends Clusters { this } + def updateWithCache(change: Example): BucketManager = { + // Check if there is enough space in the first bucket + var majorCoreset = Array[Example]() + if (buckets(0).isFull) { + var curbucket: Int = 0 + var nextbucket: Int = 1 + // Check if the next bucket is empty + if (!buckets(nextbucket).isFull) { + // Copy curbucket points to nextbucket points + val backpoints = buckets(curbucket).points.clone() + for (point <- backpoints) buckets(nextbucket).points.enqueue(point) + // Clear curbucket to empty + buckets(curbucket).points.clear() + counter = nextbucket + + } else { + // Copy curbucket points to nextbucket spillover and continue + val backspillover = buckets(curbucket).points.clone() + buckets(nextbucket).spillover.clear() + for (point <- backspillover) buckets(nextbucket).spillover.enqueue(point) + // Clear curbucket to empty + buckets(curbucket).points.clear() + curbucket += 1 + nextbucket += 1 + /* + * As long as the nextbucket is full, output the coreset to the spillover + * of the next bucket + */ + while (buckets(nextbucket).isFull) { + val examples = (buckets(curbucket).points union buckets(curbucket).spillover).toArray + val tree = new TreeCoreset + val coreset = tree.retrieveCoreset(tree.buildCoresetTree(examples, maxsize), + new Array[Example](0)) + // Copy coreset to nextbucket spillover + buckets(nextbucket).spillover.clear() + for (point <- coreset) buckets(nextbucket).spillover.enqueue(point) + // Clear curbucket + buckets(curbucket).points.clear() + buckets(curbucket).spillover.clear() + curbucket += 1 + nextbucket += 1 + cacheMap.incrementsCounter() + counter = nextbucket + } + val examples = (buckets(curbucket).points union 
buckets(curbucket).spillover).toArray + val tree = new TreeCoreset + //caching + //compute the minor and major numbers to select + //which coresets are going to be inserted in cache (major) + //and which are going to be pulled from the tree (minor) + val minor = cacheMap.minor(curbucket) + val major = curbucket - minor + val minorLevel = cacheMap.minorLevel(curbucket) + if (major != 0) { + majorCoreset = cacheMap.getCoreset(major) + } + /* Merge minor coreset (aka coreset) with major coreset + * examples have the lower level of the tree + * then we add major coreset-if that exists- to extract a coreset + * and push it in the cache + */ + val exPlusMajor = examples++majorCoreset + val mergedCoreset = tree.retrieveCoreset(tree.buildCoresetTree(exPlusMajor, maxsize), new Array[Example](0)) + buckets(nextbucket).points.clear() + cacheMap.insertCoreset(curbucket, mergedCoreset) + for (point <- mergedCoreset) buckets(nextbucket).points.enqueue(point) + // Clear curbucket + buckets(curbucket).points.clear() + buckets(curbucket).spillover.clear() + } + } + buckets(0).points.enqueue(change) + this + } + /** * Return an array of weighted examples corresponding to the coreset extracted from * the TreeCoreset data structure, in order to be used in k-means. @@ -143,4 +219,39 @@ class BucketManager(val n : Int, val maxsize : Int) extends Clusters { coreset } } + + /* Retrieve a coreset from the cache */ + def getCachedCoreset: Array[Example] = { + var majorCoreset = Array[Example]() + var coreset = Array[Example]() + var cnt = cacheMap.getCounter + if (cnt != 0) { + // smart start + if (cacheMap.getCoreset(cnt) != null) { + coreset = cacheMap.getCoreset(cnt) + coreset + } + // smart end + else { + val minor = cacheMap.minor(cnt) + val major = cnt - minor + + + val minorLevel = cacheMap.minorLevel(cnt) + if (major != 0) { + majorCoreset = cacheMap.getCoreset(major) + } + val examples = buckets(minorLevel).points.toArray ++ majorCoreset + val tree = new TreeCoreset + coreset = tree.retrieveCoreset(tree.buildCoresetTree(examples, maxsize),new Array[Example](0)) + cacheMap.insertCoreset(cnt, coreset) + coreset + } + } + coreset + } + + + + } From 706d0e0e4a57c1fcfc2558c63b787ad28b597bbd Mon Sep 17 00:00:00 2001 From: Ioanna Kiriakidou Date: Wed, 25 Jul 2018 00:38:26 +0300 Subject: [PATCH 04/12] Update BucketManager.scala --- .../spark/streamdm/clusterers/clusters/BucketManager.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala index ded82171..e29f9e53 100755 --- a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala +++ b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala @@ -249,9 +249,5 @@ class BucketManager(val n : Int, val maxsize : Int) extends Clusters { } } coreset - } - - - - + } } From ae1bbfaca7c6b9623e3832fb2a9aefb1eaea14fb Mon Sep 17 00:00:00 2001 From: Ioanna Kiriakidou Date: Fri, 10 Aug 2018 17:13:51 +0300 Subject: [PATCH 05/12] Update StreamKM.scala --- .../spark/streamdm/clusterers/StreamKM.scala | 124 +++++++++--------- 1 file changed, 62 insertions(+), 62 deletions(-) diff --git a/src/main/scala/org/apache/spark/streamdm/clusterers/StreamKM.scala b/src/main/scala/org/apache/spark/streamdm/clusterers/StreamKM.scala index 59e475e9..7ac0f5dc 100644 --- a/src/main/scala/org/apache/spark/streamdm/clusterers/StreamKM.scala +++ 
b/src/main/scala/org/apache/spark/streamdm/clusterers/StreamKM.scala @@ -25,96 +25,96 @@ import com.github.javacliparser._ import org.apache.spark.streamdm.core.specification.ExampleSpecification /** - * Implements the StreamKM++ algorithm for data streams. StreamKM++ computes a - * small (weighted) sample of the stream by using coresets, and then uses - * it as an input to a k-means++ algorithm. It uses a data structure called - * BucketManager to handle the coresets. - * - *

- * It uses the following options:
- *  - Number of microclusters (-m)
- *  - Initial buffer size (-b)
- *  - Size of coresets (-s)
- *  - Learning window (-w)
- *
- */ + * Implements the StreamKM++ algorithm for data streams. StreamKM++ computes a + * small (weighted) sample of the stream by using coresets, and then uses + * it as an input to a k-means++ algorithm. It uses a data structure called + * BucketManager to handle the coresets. + * + *

+ * It uses the following options:
+ *  - Number of microclusters (-m)
+ *  - Initial buffer size (-b)
+ *  - Size of coresets (-s)
+ *  - Learning window (-w)
+ *
+ */ class StreamKM extends Clusterer { - - type T = BucketManager + type T = BucketManager var bucketmanager: BucketManager = null var numInstances: Long = 0 - var initialBuffer: Array[Example] = Array[Example]() - + var clusters: Array[Example] = null + val kOption: IntOption = new IntOption("numClusters", 'k', "Number of clusters for output", 10, 1, Integer.MAX_VALUE) - + val repOption: IntOption = new IntOption("kMeansIters", 'i', "Number of k-means iterations", 1000, 1, Integer.MAX_VALUE) val sizeCoresetOption: IntOption = new IntOption("sizeCoreset", 's', "Size of coreset", 10000, 1, Integer.MAX_VALUE) - + val widthOption: IntOption = new IntOption("width", - 'w', "Size of window for training learner.", 100000, 1, Integer.MAX_VALUE); - + 'w', "Size of window for training learner.", 100000, 1, Integer.MAX_VALUE); + var exampleLearnerSpecification: ExampleSpecification = null - - /** - * Init the StreamKM++ algorithm. - */ + + /** + * Init the StreamKM++ algorithm. + */ def init(exampleSpecification: ExampleSpecification) : Unit = { exampleLearnerSpecification = exampleSpecification bucketmanager = new BucketManager(widthOption.getValue, sizeCoresetOption.getValue) } - - /** - * Maintain the BucketManager for coreset extraction, given an input DStream of Example. - * @param input a stream of instances - */ - def train(input: DStream[Example]): Unit = { - input.foreachRDD(rdd => { - rdd.foreach(ex => { - bucketmanager = bucketmanager.update(ex) - numInstances += 1 - }) - }) - } - + + + def train(input: DStream[Example]): Unit = {} + /** - * Gets the current Model used for the Learner. - * @return the Model object used for training - */ + * Gets the current Model used for the Learner. + * @return the Model object used for training + */ def getModel: BucketManager = bucketmanager - - /** - * Get the currently computed clusters - * @return an Array of Examples representing the clusters - */ + + /** + * Get the currently computed clusters + * @return an Array of Examples representing the clusters + */ def getClusters: Array[Example] = { if(numInstances <= sizeCoresetOption.getValue) { bucketmanager.buckets(0).points.toArray - } + } else { - val streamingCoreset = bucketmanager.getCoreset - KMeans.cluster(streamingCoreset, kOption.getValue, repOption.getValue) + val streamingCoreset = bucketmanager.getCoreset + KMeans.cluster(streamingCoreset, kOption.getValue, repOption.getValue) } } - + /** - * Assigns examples to clusters, given the current Clusters data structure. - * @param input the DStream of Examples to be assigned a cluster - * @return a DStream of tuples containing the original Example and the - * assigned cluster. - */ + * Maintain the BucketManager for coreset extraction, given an input DStream of Example. + * @param input a stream of instances + * @return a DStream of tuples containing the original Example and the + * assigned cluster. 
+ */ def assign(input: DStream[Example]): DStream[(Example,Double)] = { - input.map(x => { - val assignedCl = getClusters.foldLeft((0,Double.MaxValue,0))( - (cl,centr) => { - val dist = centr.in.distanceTo(x.in) - if(dist { + numInstances += 1 + bucketmanager = bucketmanager.update(ex) + if(numInstances <= sizeCoresetOption.getValue){ + clusters = KMeans.cluster(bucketmanager.buckets(0).points.toArray,kOption.getValue,repOption.getValue) + } + else + { + val streamingCoreset = bucketmanager.getCoreset + clusters = KMeans.cluster(streamingCoreset,kOption.getValue,repOption.getValue) + } + + val assignedCl = clusters.foldLeft((0, Double.MaxValue, 0))( + (cl, centr) => { + val dist = centr.in.distanceTo(ex.in) + if (dist < cl._2) ((cl._3, dist, cl._3 + 1)) + else ((cl._1, cl._2, cl._3 + 1)) })._1 - (x,assignedCl) + (ex,assignedCl) }) } } From 4378ebd26d924f2c41a6127e6ff4cbf4467d4ee3 Mon Sep 17 00:00:00 2001 From: Ioanna Kiriakidou Date: Fri, 10 Aug 2018 17:14:54 +0300 Subject: [PATCH 06/12] Update BucketManager.scala --- .../clusterers/clusters/BucketManager.scala | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala index e29f9e53..b565a5e3 100755 --- a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala +++ b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala @@ -19,7 +19,6 @@ package org.apache.spark.streamdm.clusterers.clusters import org.apache.spark.streamdm.core._ import scala.collection.mutable.Queue -import scala.util.control.Breaks._ /** @@ -198,23 +197,25 @@ class BucketManager(val n : Int, val maxsize : Int) extends Clusters { * @return the coreset for the examples entered into the buckets. 
*/ def getCoreset: Array[Example] = { - if(buckets(L-1).isFull) { - buckets(L-1).points.toArray - }else { + var isFound: Boolean = false + if (buckets(L - 1).isFull) { + buckets(L - 1).points.toArray + } else { var i = 0 var coreset = Array[Example]() - for(i <- 0 until L) { - if(buckets(i).isFull) { + + for (i <- 0 until L) { + if (buckets(i).isFull && isFound == false) { coreset = buckets(i).points.toArray - break + isFound=true } } - val start = i+1 - for(j <- start until L) { + val start = i + 1 + for (j <- start until L) { val examples = buckets(j).points.toArray ++ coreset val tree = new TreeCoreset coreset = tree.retrieveCoreset(tree.buildCoresetTree(examples, maxsize), - new Array[Example](0)) + new Array[Example](0)) } coreset } From 567d3fc37da059309b0e006b0ccabedb8e388cdf Mon Sep 17 00:00:00 2001 From: Ioanna Kiriakidou Date: Wed, 22 Aug 2018 19:45:13 +0300 Subject: [PATCH 07/12] Delete CachedKM.scala --- .../spark/streamdm/clusterers/CachedKM.scala | 108 ------------------ 1 file changed, 108 deletions(-) delete mode 100644 src/main/scala/org/apache/spark/streamdm/clusterers/CachedKM.scala diff --git a/src/main/scala/org/apache/spark/streamdm/clusterers/CachedKM.scala b/src/main/scala/org/apache/spark/streamdm/clusterers/CachedKM.scala deleted file mode 100644 index 96d8b5c8..00000000 --- a/src/main/scala/org/apache/spark/streamdm/clusterers/CachedKM.scala +++ /dev/null @@ -1,108 +0,0 @@ -package org.apache.spark.streamdm.clusterers - -import org.apache.spark.streamdm.clusterers.clusters._ -import org.apache.spark.streamdm.clusterers.utils._ -import org.apache.spark.streamdm.core._ -import org.apache.spark.streaming.dstream._ -import com.github.javacliparser._ -import org.apache.spark.streamdm.core.specification.ExampleSpecification -import org.apache.spark.streaming.StreamingContext - -import scala.collection.mutable - -/** - * Implements the CachedKM++ algorithm for data streams. CachedKM++ computes a - * small (weighted) sample of the stream by using coresets from cache - * to reduce the number of coresets needed for a merge step, and then uses - * it as an input to a k-means++ algorithm. It uses a data structure called - * BucketManager to handle the coresets. - * - *

- * It uses the following options:
- *  - Number of microclusters (-m)
- *  - Initial buffer size (-b)
- *  - Size of coresets (-s)
- *  - Learning window (-w)
- *
- */ -class CachedKM extends Clusterer { - - type T = BucketManager - - var bucketmanager: BucketManager = null - var numInstances: Long = 0 - var initialBuffer: Array[Example] = Array[Example]() - var clusters: Array[Example] = null - - val kOption: IntOption = new IntOption("numClusters", 'k', - "Number of clusters for output", 10, 1, Integer.MAX_VALUE) - - val repOption: IntOption = new IntOption("kMeansIters", 'i', - "Number of k-means iterations", 1000, 1, Integer.MAX_VALUE) - - val sizeCoresetOption: IntOption = new IntOption("sizeCoreset", 's', - "Size of coreset", 10000, 1, Integer.MAX_VALUE) - - val widthOption: IntOption = new IntOption("width", - 'w', "Size of window for training learner.", 100000, 1, Integer.MAX_VALUE); - - var exampleLearnerSpecification: ExampleSpecification = null - - /** - * Init the StreamKM++ algorithm. - */ - def init(exampleSpecification: ExampleSpecification) : Unit = { - exampleLearnerSpecification = exampleSpecification - bucketmanager = new BucketManager(widthOption.getValue, sizeCoresetOption.getValue) - } - - /** - * Maintain the BucketManager for coreset extraction, given an input DStream of Example. - * @param input a stream of instances - */ - def train(input: DStream[Example]): Unit = { - //do nothing - } - - /** - * Gets the current Model used for the Learner. - * @return the Model object used for training - */ - def getModel: BucketManager = bucketmanager - - /** - * Get the currently computed clusters - * @return an Array of Examples representing the clusters - */ - def getClusters: Array[Example] = { - if (clusters==null) Array[Example]() else clusters - } - - /** - * Assigns examples to clusters, given the current Clusters data structure. - * @param input the DStream of Examples to be assigned a cluster - * @return a DStream of tuples containing the original Example and the - * assigned cluster. 
- */ - def assign(input: DStream[Example]): DStream[(Example,Double)] = { - input.map(ex=> { - numInstances += 1 - bucketmanager = bucketmanager.updateWithCache(ex) - if(numInstances <= sizeCoresetOption.getValue){ - clusters = KMeans.cluster(bucketmanager.buckets(0).points.toArray,kOption.getValue,repOption.getValue) - } - else - { - val streamingCoreset = bucketmanager.getCachedCoreset - clusters = KMeans.cluster(streamingCoreset,kOption.getValue,repOption.getValue) - } - - val assignedCl = clusters.foldLeft((0, Double.MaxValue, 0))( - (cl, centr) => { - val dist = centr.in.distanceTo(ex.in) - if (dist < cl._2) ((cl._3, dist, cl._3 + 1)) - else ((cl._1, cl._2, cl._3 + 1)) - })._1 - (ex,assignedCl) - }) - } -} From f9e70f4264d093d922469738b904a61f8e0716d9 Mon Sep 17 00:00:00 2001 From: Ioanna Kiriakidou Date: Wed, 22 Aug 2018 19:48:11 +0300 Subject: [PATCH 08/12] Update BucketManager.scala --- .../clusterers/clusters/BucketManager.scala | 104 ------------------ 1 file changed, 104 deletions(-) diff --git a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala index b565a5e3..7287fe67 100755 --- a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala +++ b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala @@ -109,81 +109,6 @@ class BucketManager(val n : Int, val maxsize : Int) extends Clusters { this } - def updateWithCache(change: Example): BucketManager = { - // Check if there is enough space in the first bucket - var majorCoreset = Array[Example]() - if (buckets(0).isFull) { - var curbucket: Int = 0 - var nextbucket: Int = 1 - // Check if the next bucket is empty - if (!buckets(nextbucket).isFull) { - // Copy curbucket points to nextbucket points - val backpoints = buckets(curbucket).points.clone() - for (point <- backpoints) buckets(nextbucket).points.enqueue(point) - // Clear curbucket to empty - buckets(curbucket).points.clear() - counter = nextbucket - - } else { - // Copy curbucket points to nextbucket spillover and continue - val backspillover = buckets(curbucket).points.clone() - buckets(nextbucket).spillover.clear() - for (point <- backspillover) buckets(nextbucket).spillover.enqueue(point) - // Clear curbucket to empty - buckets(curbucket).points.clear() - curbucket += 1 - nextbucket += 1 - /* - * As long as the nextbucket is full, output the coreset to the spillover - * of the next bucket - */ - while (buckets(nextbucket).isFull) { - val examples = (buckets(curbucket).points union buckets(curbucket).spillover).toArray - val tree = new TreeCoreset - val coreset = tree.retrieveCoreset(tree.buildCoresetTree(examples, maxsize), - new Array[Example](0)) - // Copy coreset to nextbucket spillover - buckets(nextbucket).spillover.clear() - for (point <- coreset) buckets(nextbucket).spillover.enqueue(point) - // Clear curbucket - buckets(curbucket).points.clear() - buckets(curbucket).spillover.clear() - curbucket += 1 - nextbucket += 1 - cacheMap.incrementsCounter() - counter = nextbucket - } - val examples = (buckets(curbucket).points union buckets(curbucket).spillover).toArray - val tree = new TreeCoreset - //caching - //compute the minor and major numbers to select - //which coresets are going to be inserted in cache (major) - //and which are going to be pulled from the tree (minor) - val minor = cacheMap.minor(curbucket) - val major = curbucket - minor - val minorLevel = cacheMap.minorLevel(curbucket) - if (major != 
0) { - majorCoreset = cacheMap.getCoreset(major) - } - /* Merge minor coreset (aka coreset) with major coreset - * examples have the lower level of the tree - * then we add major coreset-if that exists- to extract a coreset - * and push it in the cache - */ - val exPlusMajor = examples++majorCoreset - val mergedCoreset = tree.retrieveCoreset(tree.buildCoresetTree(exPlusMajor, maxsize), new Array[Example](0)) - buckets(nextbucket).points.clear() - cacheMap.insertCoreset(curbucket, mergedCoreset) - for (point <- mergedCoreset) buckets(nextbucket).points.enqueue(point) - // Clear curbucket - buckets(curbucket).points.clear() - buckets(curbucket).spillover.clear() - } - } - buckets(0).points.enqueue(change) - this - } - /** * Return an array of weighted examples corresponding to the coreset extracted from * the TreeCoreset data structure, in order to be used in k-means. @@ -221,34 +146,5 @@ class BucketManager(val n : Int, val maxsize : Int) extends Clusters { } } - /* Retrieve a coreset from the cache */ - def getCachedCoreset: Array[Example] = { - var majorCoreset = Array[Example]() - var coreset = Array[Example]() - var cnt = cacheMap.getCounter - if (cnt != 0) { - // smart start - if (cacheMap.getCoreset(cnt) != null) { - coreset = cacheMap.getCoreset(cnt) - coreset - } - // smart end - else { - val minor = cacheMap.minor(cnt) - val major = cnt - minor - - val minorLevel = cacheMap.minorLevel(cnt) - if (major != 0) { - majorCoreset = cacheMap.getCoreset(major) - } - val examples = buckets(minorLevel).points.toArray ++ majorCoreset - val tree = new TreeCoreset - coreset = tree.retrieveCoreset(tree.buildCoresetTree(examples, maxsize),new Array[Example](0)) - cacheMap.insertCoreset(cnt, coreset) - coreset - } - } - coreset - } } From 6d95c27f0e8aa6fdff8157bc72e142c1a1c668c9 Mon Sep 17 00:00:00 2001 From: Ioanna Kiriakidou Date: Wed, 22 Aug 2018 19:52:20 +0300 Subject: [PATCH 09/12] updated treeCoreset.scala prevent sum from NaN when funcost is zero. splitCoresetTree needed more cases when leaf has no elements --- .../clusterers/clusters/TreeCoreset.scala | 62 ++++++++++++++----- 1 file changed, 46 insertions(+), 16 deletions(-) diff --git a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/TreeCoreset.scala b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/TreeCoreset.scala index c0a3208c..bec85ff4 100755 --- a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/TreeCoreset.scala +++ b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/TreeCoreset.scala @@ -82,17 +82,23 @@ class TreeCoreset { * Select a new centre from the leaf node for splitting. 
* @return the new centre */ - def chooseCentre() : Example = { + def chooseCentre(): Example = { val funcost = this.weightedLeaf().cost - val points = elem.points + val points = this.elem.points var sum = 0.0 + val point = points.find(e => { + if (funcost == 0) false + else { + sum += costOfPoint(e) / funcost + if (sum >= Random.nextDouble) { + if (e.weight > 0.0) true + else false + } + else false + } + }) - for(point <- points) { - sum += costOfPoint(point)/funcost - if(sum >= Random.nextDouble) - return point - } - elem.centre + point.getOrElse(elem.centre) } } @@ -159,15 +165,39 @@ class TreeCoreset { splitCoresetTreeLeaf(CoresetTreeLeaf(e, c)) } case CoresetTreeNode(e, l, r, c) => { - if (Random.nextDouble > 0.5) { - val lchild = splitCoresetTree(l) - val newcost = lchild.cost + r.cost - CoresetTreeNode(e, lchild, r, newcost) + if(l.cost == 0 && r.cost == 0) { + if (l.elem.n == 0) { + val rchild = splitCoresetTree(r) + val newcost = l.cost + rchild.cost + CoresetTreeNode(e, l, rchild, newcost) + } + if (r.elem.n == 0) { + val lchild = splitCoresetTree(l) + val newcost = lchild.cost + r.cost + CoresetTreeNode(e, lchild, r, newcost) + } + else if (Random.nextDouble > 0.5) { + val lchild = splitCoresetTree(l) + val newcost = lchild.cost + r.cost + CoresetTreeNode(e, lchild, r, newcost) + } + else { + val rchild = splitCoresetTree(r) + val newcost = l.cost + rchild.cost + CoresetTreeNode(e, l, rchild, newcost) + } } - else { - val rchild = splitCoresetTree(r) - val newcost = l.cost + rchild.cost - CoresetTreeNode(e, l, rchild, newcost) + else + { + if(Random.nextDouble < l.cost/root.cost){ + val lchild = splitCoresetTree(l) + val newcost = lchild.cost + r.cost + CoresetTreeNode(e, lchild, r, newcost) + } else { + val rchild = splitCoresetTree(r) + val newcost = l.cost + rchild.cost + CoresetTreeNode(e, l, rchild, newcost) + } } } } From 57321ec0e920200784ea0cbcb9b02872b7b2dc8d Mon Sep 17 00:00:00 2001 From: Ioanna Kiriakidou Date: Wed, 22 Aug 2018 19:57:05 +0300 Subject: [PATCH 10/12] Delete Cache.scala --- .../streamdm/clusterers/clusters/Cache.scala | 99 ------------------- 1 file changed, 99 deletions(-) delete mode 100644 src/main/scala/org/apache/spark/streamdm/clusterers/clusters/Cache.scala diff --git a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/Cache.scala b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/Cache.scala deleted file mode 100644 index 13b1a5bc..00000000 --- a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/Cache.scala +++ /dev/null @@ -1,99 +0,0 @@ -package org.apache.spark.streamdm.clusterers.clusters -import org.apache.spark.streamdm.core.Example -import scala.collection.mutable.Queue -import scala.collection.mutable.HashMap - -/* - * Cache class keeps a hashmap where it - * stores the number of the bucket that holds - * the major coreset and the coreset itself - * r is the merge threshold in this case - * we use r = 2 since we implemented an 2-way - * coreset tree - */ - - -class Cache(val r: Int) extends Serializable { - - val cacheMap: HashMap[Int,Array[Example]] = new HashMap[Int,Array[Example]]() - var counter: Int = 0 // the number of the buckets - - def size: Int = cacheMap.size - - def incrementsCounter(): Unit = { - counter += 1 - } - - def getCounter: Int = counter - /* - * insert the major coreset in the hashmap - * filter the coresets that exists in the partsum list - */ - def insertCoreset(n: Int,coreset:Array[Example] ): Unit = { - cacheMap.put(n,coreset) - val partsumList = partSum(n) - val filteredCache = 
cacheMap.filter{ case (k, _) => partsumList.contains(k) } - } - - - def removeCoreset(n: Int): Unit = { - cacheMap.remove(n) - } - - def getCoreset(n: Int): Array[Example] = cacheMap.getOrElse(n, null) - - /** - * return the partial sums of number n - * - * @param n - * @return - */ - private def partSum(n: Int) = { - val queue = new Queue[Int]() - val sumQueue = new Queue[Int]() - var weight = 1 - var num = n - while (num > 0) { - val a = num % r - num = num / r - if (a != 0) queue.enqueue(a * weight) - weight = weight * r - } - var temp = 0 - for(i<- queue.size - 1 to 1 by -1 ) { - temp += queue(i) - sumQueue.enqueue(temp) - } - sumQueue - } - - def major(n: Int): Int = n - minor(n) - - def minor(n: Int): Int = { - var num = n - if (num == 0) return 0 - var weight = 1 - while (num%r == 0){ - num = num / r - weight = weight * r - } - val minor = weight * (num % r) - minor - } - - /** - * Start from level 0 - * - * @param n - * @return - */ - def minorLevel(n: Int): Int = { - var level = 0 - var num = n - while (num % r ==0) { - num = num / r - level += 1 - } - level - } -} From 9b9e94414a4f8000b4d15e5f49e69cac2352a8d3 Mon Sep 17 00:00:00 2001 From: Ioanna Kiriakidou Date: Thu, 23 Aug 2018 13:44:30 +0300 Subject: [PATCH 11/12] reduce duplicates --- .../clusterers/clusters/TreeCoreset.scala | 58 +++++++++++-------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/TreeCoreset.scala b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/TreeCoreset.scala index bec85ff4..426cf709 100755 --- a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/TreeCoreset.scala +++ b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/TreeCoreset.scala @@ -82,23 +82,16 @@ class TreeCoreset { * Select a new centre from the leaf node for splitting. 
* @return the new centre */ - def chooseCentre(): Example = { + def chooseCentre() : Example = { val funcost = this.weightedLeaf().cost - val points = this.elem.points + val points = elem.points var sum = 0.0 - val point = points.find(e => { - if (funcost == 0) false - else { - sum += costOfPoint(e) / funcost - if (sum >= Random.nextDouble) { - if (e.weight > 0.0) true - else false - } - else false + for (point <- points) { + sum += costOfPoint(point) / funcost + if (sum >= Random.nextDouble) + return point } - }) - - point.getOrElse(elem.centre) + elem.centre } } @@ -125,34 +118,49 @@ class TreeCoreset { * @param leaf coreset tree leaf for spliting * @return a coreset tree node with two leaves */ - private def splitCoresetTreeLeaf(leaf : CoresetTreeLeaf) : CoresetTreeNode = { - // Select a example from the points associated with the leaf as a new centre - // for one of the new leaf + private def splitCoresetTreeLeaf(leaf : CoresetTreeLeaf) : CoresetTreeNode = { + // Select a example from the points associated with the leaf as a new centre + // for one of the new leaf + val newcentre = leaf.chooseCentre + // The original centre as the other leaf centre val oldcentre = leaf.elem.centre // The points associated with the orignial leaf, the points will be assigned the new leaves val points = leaf.elem.points - // Assign points to leftpoints and rightpoints var leftpoints = new Array[Example](0) var rightpoints = new Array[Example](0) - for(point <- points) { - if(squaredDistance(point, newcentre) < squaredDistance(point, oldcentre)) + for (point <- points) { + if (squaredDistance(point, newcentre) < squaredDistance(point, oldcentre)) leftpoints = leftpoints :+ point else rightpoints = rightpoints :+ point } - - // Create new leaves + //prevent assigning all points to one child + //resplit points to leftpoints and rightpoints + if((leftpoints.length == 0 || rightpoints.length==0 ) && points.length>1){ + val newcentre = leaf.chooseCentre + var leftpoints = new Array[Example](0) + var rightpoints = new Array[Example](0) + for (point <- points) { + if (squaredDistance(point, newcentre) < squaredDistance(point, oldcentre)) + leftpoints = leftpoints :+ point + else + rightpoints = rightpoints :+ point + } + } + + // Create new leaves val leftElem = new CoresetTreeElem(leftpoints.length, leftpoints, newcentre) val leftleaf = CoresetTreeLeaf(leftElem, 0.0).weightedLeaf - + val rightElem = new CoresetTreeElem(rightpoints.length, rightpoints, oldcentre) val rightleaf = CoresetTreeLeaf(rightElem, 0.0).weightedLeaf - + // Return a coreset tree node with two leaves - new CoresetTreeNode(leaf.elem, leftleaf, rightleaf, leftleaf.cost+rightleaf.cost) + new CoresetTreeNode(leaf.elem, leftleaf, rightleaf, leftleaf.cost + rightleaf.cost) + } /** From 27d069f4829cc17afa7c10855b18eb912e1e5ab3 Mon Sep 17 00:00:00 2001 From: Ioanna Kiriakidou Date: Thu, 23 Aug 2018 13:46:14 +0300 Subject: [PATCH 12/12] Update BucketManager.scala --- .../spark/streamdm/clusterers/clusters/BucketManager.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala index 7287fe67..d3140664 100755 --- a/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala +++ b/src/main/scala/org/apache/spark/streamdm/clusterers/clusters/BucketManager.scala @@ -30,8 +30,6 @@ import scala.collection.mutable.Queue class BucketManager(val n : Int, val 
maxsize : Int) extends Clusters { type T = BucketManager - val cacheMap: Cache = new Cache(2) - var counter: Int = 0 /** * Inner class Bucket for new instance management, this class has two buffers for * recursively computing the coresets. @@ -57,7 +55,7 @@ class BucketManager(val n : Int, val maxsize : Int) extends Clusters { // Check if there is enough space in the first bucket if(buckets(0).isFull){ var curbucket : Int = 0 - var nextbucket : Int =1 + var nextbucket : Int = 1 // Check if the next bucket is empty if(!buckets(nextbucket).isFull) { // Copy curbucket points to nextbucket points