Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StreamDM-100 #101

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
124 changes: 62 additions & 62 deletions src/main/scala/org/apache/spark/streamdm/clusterers/StreamKM.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,96 +25,96 @@ import com.github.javacliparser._
import org.apache.spark.streamdm.core.specification.ExampleSpecification

/**
* Implements the StreamKM++ algorithm for data streams. StreamKM++ computes a
* small (weighted) sample of the stream by using <i>coresets</i>, and then uses
* it as an input to a k-means++ algorithm. It uses a data structure called
* <tt>BucketManager</tt> to handle the coresets.
*
* <p>It uses the following options:
* <ul>
* <li> Number of microclusters (<b>-m</b>)
* <li> Initial buffer size (<b>-b</b>)
* <li> Size of coresets (<b>-s</b>)
* <li> Learning window (<b>-w</b>) * </ul>
*/
* Implements the StreamKM++ algorithm for data streams. StreamKM++ computes a
* small (weighted) sample of the stream by using <i>coresets</i>, and then uses
* it as an input to a k-means++ algorithm. It uses a data structure called
* <tt>BucketManager</tt> to handle the coresets.
*
* <p>It uses the following options:
* <ul>
* <li> Number of microclusters (<b>-m</b>)
* <li> Initial buffer size (<b>-b</b>)
* <li> Size of coresets (<b>-s</b>)
* <li> Learning window (<b>-w</b>)
* </ul>
*/
class StreamKM extends Clusterer {

  type T = BucketManager

  // Coreset bookkeeping structure; (re)created in init().
  var bucketmanager: BucketManager = null
  // Number of examples observed so far on the stream.
  var numInstances: Long = 0
  var initialBuffer: Array[Example] = Array[Example]()
  // Most recent cluster centres, recomputed per example in assign().
  var clusters: Array[Example] = null

  val kOption: IntOption = new IntOption("numClusters", 'k',
    "Number of clusters for output", 10, 1, Integer.MAX_VALUE)

  val repOption: IntOption = new IntOption("kMeansIters", 'i',
    "Number of k-means iterations", 1000, 1, Integer.MAX_VALUE)

  val sizeCoresetOption: IntOption = new IntOption("sizeCoreset", 's',
    "Size of coreset", 10000, 1, Integer.MAX_VALUE)

  val widthOption: IntOption = new IntOption("width",
    'w', "Size of window for training learner.", 100000, 1, Integer.MAX_VALUE)

  var exampleLearnerSpecification: ExampleSpecification = null

  /**
   * Init the StreamKM++ algorithm.
   * @param exampleSpecification the specification of the input examples
   */
  def init(exampleSpecification: ExampleSpecification): Unit = {
    exampleLearnerSpecification = exampleSpecification
    bucketmanager = new BucketManager(widthOption.getValue, sizeCoresetOption.getValue)
  }

  /**
   * No-op: the BucketManager is maintained incrementally inside assign(),
   * which sees every example of the stream exactly once.
   * @param input a stream of instances
   */
  def train(input: DStream[Example]): Unit = {}

  /**
   * Gets the current Model used for the Learner.
   * @return the Model object used for training
   */
  def getModel: BucketManager = bucketmanager

  /**
   * Get the currently computed clusters.
   * @return an Array of Examples representing the clusters
   */
  def getClusters: Array[Example] = {
    if (numInstances <= sizeCoresetOption.getValue) {
      // Not enough points for a coreset yet: return the raw buffered points.
      bucketmanager.buckets(0).points.toArray
    } else {
      val streamingCoreset = bucketmanager.getCoreset
      KMeans.cluster(streamingCoreset, kOption.getValue, repOption.getValue)
    }
  }

  /**
   * Assigns examples to clusters, while also maintaining the BucketManager
   * for coreset extraction on every incoming example.
   * @param input the DStream of Examples to be assigned a cluster
   * @return a DStream of tuples containing the original Example and the
   * assigned cluster.
   */
  def assign(input: DStream[Example]): DStream[(Example, Double)] = {
    input.map(ex => {
      numInstances += 1
      bucketmanager = bucketmanager.update(ex)
      if (numInstances <= sizeCoresetOption.getValue) {
        // Before the first coreset exists, cluster the raw buffered points.
        clusters = KMeans.cluster(bucketmanager.buckets(0).points.toArray,
          kOption.getValue, repOption.getValue)
      } else {
        val streamingCoreset = bucketmanager.getCoreset
        clusters = KMeans.cluster(streamingCoreset, kOption.getValue, repOption.getValue)
      }

      // Fold carries (bestIndex, bestDistance, currentIndex); the first
      // component is the index of the nearest centre.
      val assignedCl = clusters.foldLeft((0, Double.MaxValue, 0))(
        (cl, centr) => {
          val dist = centr.in.distanceTo(ex.in)
          if (dist < cl._2) (cl._3, dist, cl._3 + 1)
          else (cl._1, cl._2, cl._3 + 1)
        })._1
      (ex, assignedCl)
    })
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package org.apache.spark.streamdm.clusterers.clusters

import org.apache.spark.streamdm.core._
import scala.collection.mutable.Queue
import scala.util.control.Breaks._


/**
Expand All @@ -31,7 +30,6 @@ import scala.util.control.Breaks._
class BucketManager(val n : Int, val maxsize : Int) extends Clusters {

type T = BucketManager

/**
* Inner class Bucket for new instance management, this class has two buffers for
* recursively computing the coresets.
Expand All @@ -57,7 +55,7 @@ class BucketManager(val n : Int, val maxsize : Int) extends Clusters {
// Check if there is enough space in the first bucket
if(buckets(0).isFull){
var curbucket : Int = 0
var nextbucket : Int =1
var nextbucket : Int = 1
// Check if the next bucket is empty
if(!buckets(nextbucket).isFull) {
// Copy curbucket points to nextbucket points
Expand Down Expand Up @@ -122,25 +120,29 @@ class BucketManager(val n : Int, val maxsize : Int) extends Clusters {
* @return the coreset for the examples entered into the buckets.
*/
/**
 * Extract the coreset over all buckets.
 *
 * If the last (largest) bucket is full, its points already form the coreset.
 * Otherwise the first full bucket seeds the coreset, which is then merged
 * with every later bucket through the coreset tree.
 * @return the coreset for the examples entered into the buckets.
 */
def getCoreset: Array[Example] = {
  if (buckets(L - 1).isFull) {
    buckets(L - 1).points.toArray
  } else {
    // Index of the first full bucket, or -1 if none is full. The previous
    // version's for-loop variable shadowed the outer index, so merging
    // always started at bucket 1 regardless of where the seed was found.
    var firstFull = -1
    var coreset = Array[Example]()
    var i = 0
    while (i < L && firstFull < 0) {
      if (buckets(i).isFull) {
        coreset = buckets(i).points.toArray
        firstFull = i
      }
      i += 1
    }
    // Merge each subsequent bucket into the running coreset.
    for (j <- firstFull + 1 until L) {
      val examples = buckets(j).points.toArray ++ coreset
      val tree = new TreeCoreset
      coreset = tree.retrieveCoreset(tree.buildCoresetTree(examples, maxsize),
        new Array[Example](0))
    }
    coreset
  }
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -82,17 +82,16 @@ class TreeCoreset {
* Select a new centre from the leaf node for splitting.
* @return the new centre
*/
/**
 * Select a new centre from the leaf node for splitting, sampling each point
 * with probability proportional to its cost (k-means++ style seeding).
 * @return the sampled point, or the current centre if sampling selects none
 */
def chooseCentre() : Example = {
  // Total cost of the leaf, used to normalise per-point costs.
  val funcost = this.weightedLeaf().cost
  val points = elem.points
  var sum = 0.0

  for (point <- points) {
    sum += costOfPoint(point) / funcost
    // Inverse-CDF sampling: stop at the first point whose cumulative
    // normalised cost crosses a uniform random threshold.
    if (sum >= Random.nextDouble)
      return point
  }
  // Fallback when rounding keeps the cumulative sum below the threshold.
  elem.centre
}
}

Expand All @@ -119,34 +118,49 @@ class TreeCoreset {
* @param leaf coreset tree leaf for spliting
* @return a coreset tree node with two leaves
*/
/**
 * Split a coreset tree leaf into a node with two new leaves.
 *
 * A new centre is sampled from the leaf's points; points closer to the new
 * centre go to the left leaf, the remainder stay with the original centre on
 * the right leaf.
 * @param leaf coreset tree leaf for spliting
 * @return a coreset tree node with two leaves
 */
private def splitCoresetTreeLeaf(leaf : CoresetTreeLeaf) : CoresetTreeNode = {
  // The original centre becomes the centre of the right leaf.
  val oldcentre = leaf.elem.centre
  // The points associated with the original leaf; they will be assigned to
  // the new leaves.
  val points = leaf.elem.points

  // Partition the leaf's points between a candidate new centre and the
  // original centre by squared distance.
  def partition(candidate: Example): (Array[Example], Array[Example]) = {
    var left = new Array[Example](0)
    var right = new Array[Example](0)
    for (point <- points) {
      if (squaredDistance(point, candidate) < squaredDistance(point, oldcentre))
        left = left :+ point
      else
        right = right :+ point
    }
    (left, right)
  }

  // Select an example from the leaf's points as the new (left) centre.
  var newcentre = leaf.chooseCentre
  var (leftpoints, rightpoints) = partition(newcentre)

  // Prevent assigning all points to one child: resample the centre once and
  // repartition. (The earlier version declared shadowed locals inside this
  // branch, so the re-split silently had no effect on the outer arrays.)
  if ((leftpoints.length == 0 || rightpoints.length == 0) && points.length > 1) {
    newcentre = leaf.chooseCentre
    val resplit = partition(newcentre)
    leftpoints = resplit._1
    rightpoints = resplit._2
  }

  // Create the new leaves, each reweighted around its own centre.
  val leftElem = new CoresetTreeElem(leftpoints.length, leftpoints, newcentre)
  val leftleaf = CoresetTreeLeaf(leftElem, 0.0).weightedLeaf

  val rightElem = new CoresetTreeElem(rightpoints.length, rightpoints, oldcentre)
  val rightleaf = CoresetTreeLeaf(rightElem, 0.0).weightedLeaf

  // Return a coreset tree node with two leaves.
  new CoresetTreeNode(leaf.elem, leftleaf, rightleaf, leftleaf.cost + rightleaf.cost)
}

/**
Expand All @@ -159,15 +173,39 @@ class TreeCoreset {
splitCoresetTreeLeaf(CoresetTreeLeaf(e, c))
}
case CoresetTreeNode(e, l, r, c) => {
if (Random.nextDouble > 0.5) {
val lchild = splitCoresetTree(l)
val newcost = lchild.cost + r.cost
CoresetTreeNode(e, lchild, r, newcost)
if(l.cost == 0 && r.cost == 0) {
if (l.elem.n == 0) {
val rchild = splitCoresetTree(r)
val newcost = l.cost + rchild.cost
CoresetTreeNode(e, l, rchild, newcost)
}
if (r.elem.n == 0) {
val lchild = splitCoresetTree(l)
val newcost = lchild.cost + r.cost
CoresetTreeNode(e, lchild, r, newcost)
}
else if (Random.nextDouble > 0.5) {
val lchild = splitCoresetTree(l)
val newcost = lchild.cost + r.cost
CoresetTreeNode(e, lchild, r, newcost)
}
else {
val rchild = splitCoresetTree(r)
val newcost = l.cost + rchild.cost
CoresetTreeNode(e, l, rchild, newcost)
}
}
else {
val rchild = splitCoresetTree(r)
val newcost = l.cost + rchild.cost
CoresetTreeNode(e, l, rchild, newcost)
else
{
if(Random.nextDouble < l.cost/root.cost){
val lchild = splitCoresetTree(l)
val newcost = lchild.cost + r.cost
CoresetTreeNode(e, lchild, r, newcost)
} else {
val rchild = splitCoresetTree(r)
val newcost = l.cost + rchild.cost
CoresetTreeNode(e, l, rchild, newcost)
}
}
}
}
Expand Down