This library is a port of the PROUD platform for distance-based outlier detection in streaming environments to Hazelcast Jet version 4.3.
The PROUD platform was originally developed in Scala for Apache Flink by Theodoros Toliopoulos as an implementation of the algorithms presented in the academic paper "PROUD: PaRallel OUtlier Detection for Streams" [1] by Theodoros Toliopoulos, Christos Bellas, Anastasios Gounaris, and Apostolos Papadopoulos.
PROUD offers a variety of algorithms for distance-based outlier detection that support queries in both single-query [2] (S) and multi-query [3] parameter spaces. For the multi-query case there are two distinct types. The first supports multiple application-level spatial parameters, i.e. the minimum number of neighbours K within a specific spatial range R (RK). The second is an extension that additionally supports multiple window sizes W and window slide sizes S (RKWS).
Moreover, PROUD provides two different ways of partitioning incoming data, namely a grid-based and a tree-based technique. The first can only be used in Euclidean space, while the second can be used in any metric space.
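As background, the distance-based definition underlying all of these algorithms can be illustrated with a small self-contained sketch: a point is an outlier if fewer than K other points in the current window lie within distance R of it. This is a conceptual illustration only, not the library's implementation, and all names in it are hypothetical:

```java
import java.util.List;

// Conceptual sketch of the distance-based outlier definition (not library code):
// a point is an outlier if fewer than K other window points lie within range R.
public class OutlierDefinitionSketch {

    static double euclidean(double[] a, double[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    static boolean isOutlier(double[] point, List<double[]> window, double r, int k) {
        int neighbours = 0;
        for (double[] other : window) {
            if (other == point) continue; // skip the point itself
            if (euclidean(point, other) <= r) neighbours++;
        }
        return neighbours < k; // fewer than k neighbours within range r => outlier
    }

    public static void main(String[] args) {
        List<double[]> window = List.of(
                new double[]{0.0}, new double[]{0.1}, new double[]{0.2}, new double[]{5.0});
        System.out.println(isOutlier(window.get(3), window, 0.45, 2)); // far point: true
        System.out.println(isOutlier(window.get(0), window, 0.45, 2)); // clustered point: false
    }
}
```

The streaming algorithms in PROUD avoid recomputing this naive check for every point on every window slide, but the outlier criterion itself is the one sketched above.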
This version of PROUD was developed in Java with Hazelcast Jet by Athanasios Kefalas as part of a dissertation project. This library is provided AS IS, and it is NOT recommended for use in a production setting or as part of any critical downstream systems.
PROUD Version | Jet Version | Changes
---|---|---
1.0 | 4.3 | First Release
- Installation
- General
- Configuration
- PROUD Pipeline API
- Extension Points
- Execution
- Testing
- References
## Installation

The library can be installed as a dependency using either Gradle or Maven.

To install PROUD using Maven, add the following dependency declaration to the `dependencies` section of your `pom.xml` file:
```xml
<dependency>
    <groupId>edu.auth</groupId>
    <artifactId>jet-proud</artifactId>
    <version>1.0</version>
</dependency>
```
Alternatively, to install PROUD using Gradle, add one of the following to your build script:

```groovy
compile group: 'edu.auth', name: 'jet-proud', version: '1.0'
```

or

```groovy
compile "edu.auth:jet-proud:1.0"
```
## General

The PROUD library is designed to mimic the general pipeline model used by Hazelcast Jet while retaining compatibility with the native Jet pipeline API. A new PROUD pipeline can be created from scratch or by upgrading an existing native Jet pipeline, and a PROUD pipeline can be downgraded to a native Jet pipeline at any point.
## Configuration

Before the PROUD pipeline can be used, the PROUD library must be correctly configured. The library can be configured either in code or through command line arguments.
Code based configuration uses a builder object. A new instance of the configuration builder can be created as follows:

```java
import edu.auth.jetproud.proud.context.Proud;

Proud proud = Proud.builder()
```
From this basic builder instance the client is guided through configuring the context of the PROUD task. The builder compartmentalizes the configuration options to significantly minimize the chance of an invalid configuration, which would result in a runtime error during the creation of the PROUD pipeline. Some of the options available in the builder include the algorithm, the outlier query type, the outlier query parameters, input selection, partitioning method selection and output selection.

The guided configuration of the builder leverages the Java type system to provide a safer way to configure PROUD. However, when using user defined algorithms or partitioning methods, the correctness of the configuration relies on the client, as the exact specifications of the user defined component are unknown. Additionally, the proper combination of algorithm and outlier query type is not checked by the builder at compile time in version 1.0, so the client must select a valid combination. For a list of supported algorithm and outlier query type combinations, see the next section.
This option sets the algorithm that will be used for detecting outliers in the stream and can be set as follows:

```java
// Proud with Naive algorithm Configuration
Proud proud = Proud.builder()
        .forAlgorithm(ProudAlgorithmOption.Naive)
```
The available algorithms along with their compatible outlier query types are:
- Naive - S
- Advanced - S
- AdvancedExtended - S
- Slicing - S
- PMCod - S
- PMCodNet - S
- AMCod - RK
- Sop - RK, RKWS
- PSod - RK, RKWS
- PMCSky - RK, RKWS
A user defined algorithm can also be used by selecting the option:

- UserDefined - Unknown
This option selects the type of outlier detection query to be used by PROUD. There are three available outlier query types:
- Single Space Query (S)
- Multi Space Query (RK)
- Multi Space Query with Multiple Windowing Parameters (RKWS)
The required query type can be set right after selecting the algorithm, like so:
```java
// Single Space Outlier Query Type
Proud proud = Proud.builder()
        .forAlgorithm(ProudAlgorithmOption.Naive)
        .inSingleSpace()

// Multi Space Outlier Query Type
Proud proud = Proud.builder()
        .forAlgorithm(ProudAlgorithmOption.AMCod)
        .inMultiQuerySpace()

// Multi Space Query with Multiple Windowing Parameters Outlier Query Type
Proud proud = Proud.builder()
        .forAlgorithm(ProudAlgorithmOption.Sop)
        .inMultiQueryMultiWindowSpace()
```
After the required outlier query type is selected, the parameters for the query must be configured. The builder automatically provides a function named `querying(...)` that receives the appropriate parameters based on the query type. All query types accept one or more of four parameters: K (number of neighbours), R (distance range), W (window size in milliseconds) and S (window slide size in milliseconds). Examples of query parameter definitions for each query type can be seen below.
```java
// Single Space Outlier Query Type
Proud proud = Proud.builder()
        .forAlgorithm(ProudAlgorithmOption.Naive)
        .inSingleSpace()
        .querying(50, 0.45, 10000, 500) // querying(int kNeighbours, double range, int window, int slide)

// Multi Space Outlier Query Type
Proud proud = Proud.builder()
        .forAlgorithm(ProudAlgorithmOption.AMCod)
        .inMultiQuerySpace()
        .querying(List.of(50), List.of(0.45), 10000, 500) // querying(List<Integer> kNeighboursDimensions, List<Double> rangeDimensions, int window, int slide)

// Multi Space Query with Multiple Windowing Parameters Outlier Query Type
Proud proud = Proud.builder()
        .forAlgorithm(ProudAlgorithmOption.Sop)
        .inMultiQueryMultiWindowSpace()
        .querying(List.of(50), List.of(0.45), List.of(10000), List.of(500)) // querying(List<Integer> kNeighboursDimensions, List<Double> rangeDimensions, List<Integer> windowDimensions, List<Integer> slideDimensions)
```
The next step is configuring the input dataset and the source to read the dataset items from. By default, PROUD supports a file input datasource and a Kafka input datasource, both of which can later be automatically resolved to a Hazelcast Jet `Source`.
```java
// Dataset - File Datasource
Proud proud = Proud.builder()
        .forAlgorithm(ProudAlgorithmOption.Naive)
        .inSingleSpace()
        .querying(50, 0.45, 10000, 500)
        .forDatasetNamed("$DATASET_NAME")
        .locatedIn("$ABSOLUTE_PATH_TO_DATASET_FILE")

// Dataset - Kafka Datasource
Proud proud = Proud.builder()
        .forAlgorithm(ProudAlgorithmOption.Naive)
        .inSingleSpace()
        .querying(50, 0.45, 10000, 500)
        .forDatasetNamed("$DATASET_NAME")
        .fromKafka()
        .inTopic("$KAFKA_TOPIC")
```
The partitioning method used by PROUD can also be configured via the builder instance. The available partitioning methods, and the algorithms that support them, are the following:

- Replication - Naive, Advanced
- Value Based with a Tree - all algorithms except Naive & Advanced
- Value Based with a Grid - all algorithms except Naive & Advanced
- User Defined - Unknown

The options for each partitioning method are described in the following sections.
Replication partitioning replicates the stream items to all Jet partitions, flagging the items that truly belong to a partition with 0 and replicated items with 1.
```java
// Proud with Replication Partitioning Configuration
Proud proud = Proud.builder()
        .forAlgorithm(ProudAlgorithmOption.Naive)
        .inSingleSpace()
        .querying(50, 0.45, 10000, 500)
        .forDatasetNamed("$DATASET_NAME")
        .fromKafka()
        .inTopic("$KAFKA_TOPIC")
        .replicationPartitioned()
```
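The flagging scheme described above can be sketched conceptually. This is an illustration of the idea only, not the library's implementation, and all names in it are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Conceptual sketch of replication partitioning (not the library's code):
// every stream item is sent to every partition; the owning partition
// receives it flagged with 0, all other partitions receive a copy flagged with 1.
public class ReplicationSketch {

    record Flagged(int itemId, int partition, int flag) {}

    static List<Flagged> replicate(int itemId, int ownerPartition, int partitionCount) {
        List<Flagged> out = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            out.add(new Flagged(itemId, p, p == ownerPartition ? 0 : 1));
        }
        return out;
    }

    public static void main(String[] args) {
        // Item 42 owned by partition 1, replicated across 3 partitions
        replicate(42, 1, 3).forEach(System.out::println);
    }
}
```

Replication makes every point's full neighbourhood visible in every partition, which is why it is the method required by the Naive and Advanced algorithms.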
Value based partitioning with a tree uses a vantage point tree that queries the spatial coordinates of an item to assign it to a primary partition, and also replicates it to a number of neighbouring partitions. An item that belongs to a specific partition is flagged with 0, while replicated copies of the item in neighbouring partitions are flagged with 1. The vantage point tree must be initialized with a sample of the dataset, which must be available as a file.

Tree based partitioning can be configured as follows:
```java
// Tree Partitioning
Proud proud = Proud.builder()
        .forAlgorithm(ProudAlgorithmOption.PMCod)
        .inSingleSpace()
        .querying(50, 0.45, 10000, 500)
        .forDatasetNamed("$DATASET_NAME")
        .fromKafka()
        .inTopic("$KAFKA_TOPIC")
        .treePartitionedUsing("$ABSOLUTE_PATH_TO_TREE_INIT_FILE")

// Tree Partitioning with a custom initial node count
Proud proud = Proud.builder()
        .forAlgorithm(ProudAlgorithmOption.PMCod)
        .inSingleSpace()
        .querying(50, 0.45, 10000, 500)
        .forDatasetNamed("$DATASET_NAME")
        .fromKafka()
        .inTopic("$KAFKA_TOPIC")
        .treePartitionedUsing("$ABSOLUTE_PATH_TO_TREE_INIT_FILE", 1205)
```
Value based partitioning with a grid uses a grid cell structure and the spatial coordinates of an item to assign it to a primary partition, and also replicates it to a number of neighbouring partitions. An item that belongs to a specific partition is flagged with 0, while replicated copies of the item in neighbouring partitions are flagged with 1. Grid based partitioning requires a user defined function that partitions the items by taking advantage of foreknowledge of the general spatial boundaries of the dataset. The `GridPartitioning.GridPartitioner` interface encapsulates this functionality, and an instance of it is a required parameter when using grid based partitioning. For the Stocks and TAO datasets a grid partitioner is implemented by default, and an instance can be created by invoking `DefaultGridPartitioners.forDatasetNamed("STK")`. The configuration of grid partitioning can be seen below.
```java
// Proud with Grid Partitioning Configuration
Proud proud = Proud.builder()
        .forAlgorithm(ProudAlgorithmOption.PMCod)
        .inSingleSpace()
        .querying(50, 0.45, 10000, 500)
        .forDatasetNamed("$DATASET_NAME")
        .fromKafka()
        .inTopic("$KAFKA_TOPIC")
        .gridPartitionedUsing(new GridPartitioning.GridPartitioner() { ... })
```
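To make the grid partitioner's job concrete, here is a minimal conceptual sketch of grid-based assignment in one dimension. It is illustrative only, assumes known value bounds, and is not the library's `GridPartitioner`:

```java
// Conceptual sketch of grid-based partitioning in one dimension (not library code).
// The value range [min, max] is split into equal-width cells; each cell maps to a
// partition, and a point within distance r of a cell border is also replicated
// (flagged with 1) to the adjacent cell's partition.
public class GridSketch {

    static int cellOf(double value, double min, double max, int cells) {
        int cell = (int) ((value - min) / (max - min) * cells);
        return Math.min(Math.max(cell, 0), cells - 1); // clamp to valid cells
    }

    // Returns the neighbouring cell that needs a flagged replica, or -1 if none.
    static int replicaCell(double value, double min, double max, int cells, double r) {
        int cell = cellOf(value, min, max, cells);
        double width = (max - min) / cells;
        double left = min + cell * width;
        if (cell > 0 && value - left <= r) return cell - 1;          // near left border
        if (cell < cells - 1 && (left + width) - value <= r) return cell + 1; // near right border
        return -1;
    }

    public static void main(String[] args) {
        // 4 cells over [0, 100): cell width 25, range r = 1.0
        System.out.println(cellOf(30.0, 0, 100, 4));            // primary cell: 1
        System.out.println(replicaCell(30.0, 0, 100, 4, 1.0));  // far from borders: -1
        System.out.println(replicaCell(24.5, 0, 100, 4, 1.0));  // near border: replica in cell 1
    }
}
```

A real partitioner works over multiple dimensions and must know the dataset's spatial boundaries in advance, which is why PROUD only ships default grid partitioners for the Stocks and TAO datasets.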
PROUD allows a user defined partitioning method to be used with the PROUD pipeline. This feature is opt-in, however, and must be explicitly configured.

```java
// Proud with User Defined Partitioning Configuration
Proud proud = Proud.builder()
        .forAlgorithm(ProudAlgorithmOption.Naive)
        .inSingleSpace()
        .querying(50, 0.45, 10000, 500)
        .forDatasetNamed("$DATASET_NAME")
        .fromKafka()
        .inTopic("$KAFKA_TOPIC")
        .userDefinedPartitioning()
```
By default, PROUD supports two types of output: InfluxDB and logger. The logger output simply prints the items to the console, while the InfluxDB output saves results to a connected InfluxDB instance. Both types of output can be auto-resolved to a Hazelcast Jet `Sink`. The configuration of PROUD's output can be seen below.
```java
// Logger output
Proud proud = Proud.builder()
        .forAlgorithm(ProudAlgorithmOption.Naive)
        .inSingleSpace()
        .querying(50, 0.45, 10000, 500)
        .forDatasetNamed("$DATASET_NAME")
        .fromKafka()
        .inTopic("$KAFKA_TOPIC")
        .replicationPartitioned()
        .printingOutliers()

// InfluxDB output
Proud proud = Proud.builder()
        .forAlgorithm(ProudAlgorithmOption.Naive)
        .inSingleSpace()
        .querying(50, 0.45, 10000, 500)
        .forDatasetNamed("$DATASET_NAME")
        .fromKafka()
        .inTopic("$KAFKA_TOPIC")
        .replicationPartitioned()
        .writingOutliersToInfluxDB()
        .inDatabase("$DB_NAME")
        .locatedAt("$DB_HOST")
        .authenticatedWith("$USERNAME", "$PASSWORD")
```
After the configuration is created using the builder, the `build()` method must be invoked to finalize the configuration and perform a validation check. Additionally, to enable debug mode in PROUD, call the `enablingDebug()` method on the builder before finalizing the configuration. The finalized configuration is an instance that implements `ProudContext` and is used internally by PROUD to extract the configured options. A complete example of the configuration process can be seen below.
```java
// Building a Proud Configuration
Proud proud = Proud.builder()
        .forAlgorithm(ProudAlgorithmOption.Naive)
        .inSingleSpace()
        .querying(50, 0.45, 10000, 500)
        .forDatasetNamed("$DATASET_NAME")
        .fromKafka()
        .inTopic("$KAFKA_TOPIC")
        .replicationPartitioned()
        .writingOutliersToInfluxDB()
        .inDatabase("$DB_NAME")
        .locatedAt("$DB_HOST")
        .authenticatedWith("$USERNAME", "$PASSWORD")
        .enablingDebug() // Optional
        .build();
```
An alternative to using the builder object to configure PROUD is to extract the configuration options from command line arguments as well as environment variables. This method of configuring PROUD is compatible with the Flink version of PROUD and uses the same options and variables.
Command | Options
---|---
space | Possible values are: "single", "rk", "rkws".
algorithm | Possible values for single-query space are: "naive", "advanced", "advanced_extended", "slicing", "pmcod" and "pmcod_net". Possible values for multi-query space are: "amcod", "sop", "psod" and "pmcsky".
k | One or more integers - minimum kNeighbours, e.g. 50[;80;100]
R | One or more floating point numbers - distance ranges, e.g. 0.45[;0.15;0.97]
W | One or more integers - window sizes, e.g. 1000[;2000;3000]
S | One or more integers - slide sizes, e.g. 250[;500;1000]
dataset | A String - the name of the dataset.
partitioning | Partitioning option. "replication" partitioning is mandatory for the "naive" and "advanced" algorithms, whilst "grid" and "tree" partitioning are available for every other algorithm. The "grid" technique needs pre-recorded data on the dataset's distribution. The "tree" technique needs a file containing data points from the dataset in order to initialize the VP-tree.
tree_init (Optional) | The number of data points to be read for initialization by the "tree" partitioning technique. The default value is 10000.
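As an illustration, a launch command might pass these options in the conventional `--key value` form. The exact flag syntax depends on how your application parses `args`, and the jar name and values below are hypothetical:

```shell
java -jar my-proud-app.jar \
  --space single \
  --algorithm pmcod \
  --k 50 --R 0.45 --W 10000 --S 500 \
  --dataset STK \
  --partitioning tree --tree_init 10000
```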
Variable | Options
---|---
JOB_INPUT | The absolute path of the location of the dataset files.
KAFKA_BROKERS | The Kafka broker to be used as the input.
KAFKA_TOPIC | The Kafka topic to read from.
INFLUXDB_DB | The name of the InfluxDB database.
INFLUXDB_HOST | The host of the InfluxDB database.
INFLUXDB_USER | The username needed for authenticating with the InfluxDB database.
INFLUXDB_PASSWORD | The password needed for authenticating with the InfluxDB database.
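For example, the environment could be prepared before launch with exports like the following (all values here are placeholders, not defaults):

```shell
export JOB_INPUT=/data/proud/datasets
export KAFKA_BROKERS=localhost:9092
export KAFKA_TOPIC=proud-input
export INFLUXDB_DB=proud
export INFLUXDB_HOST=http://localhost:8086
export INFLUXDB_USER=admin
export INFLUXDB_PASSWORD=secret
```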
The code below can be used to create the required `ProudContext` instance by reading the configuration from the command line arguments and environment variables discussed above.
```java
import edu.auth.jetproud.proud.context.Proud;

public class Main {
    public static void main(String[] args) throws ProudArgumentException {
        Proud proud = Proud.builder(args)
                .enablingDebug()
                .build();
    }
}
```
The command line configuration of PROUD is not recommended, as it may only work with the two default datasets (Stocks, TAO), especially when using grid partitioning. Additionally, no user defined functionality can be added with this style of configuration. The builder object is the preferred way to configure PROUD; the command line option exists for compatibility purposes only.
## PROUD Pipeline API

After PROUD is configured, the outlier detection process can be executed by creating a `ProudPipeline` and providing the created configuration by means of a `ProudContext` instance. The complete general flow can be viewed below.
```java
// Configuration
Proud proud = Proud.builder()
        .forAlgorithm(ProudAlgorithmOption.Naive)
        .inSingleSpace()
        .querying(50, 0.45, 10000, 500)
        .forDatasetNamed("$DATASET_NAME")
        .fromKafka()
        .inTopic("$KAFKA_TOPIC")
        .replicationPartitioned()
        .writingOutliersToInfluxDB()
        .inDatabase("$DB_NAME")
        .locatedAt("$DB_HOST")
        .authenticatedWith("$USERNAME", "$PASSWORD")
        .enablingDebug() // Optional
        .build();

// Pipeline
ProudPipeline pipeline = ProudPipeline.create(proud);

pipeline.readData()
        .partition()
        .detectOutliers()
        .aggregateAndWriteData();

// Execute PROUD Pipeline
Job job = ProudExecutor.executeJob(pipeline);
```
The stream items are read from an instance that implements `ProudSource`, which can ultimately be resolved to a native Jet `Source`. The `ProudSource` instances implemented by default in PROUD can read stream items from a file or a Kafka topic.
As mentioned previously, the source can be automatically resolved from the configuration. To create the appropriate source automatically, use the code below:

```java
// Configuration
Proud proud = /* Configuration Creation */ ;

// Create an appropriate source based
// on the specific configuration.
ProudSource.auto(proud);
```
A file source reads stream items from a file located on the local filesystem. All such sources implemented by default in PROUD are instances of `ProudFileSource`, which implements the `ProudSource` interface. By default, the home directory used by these sources is retrieved from the configuration.

PROUD file sources can be created using one of the methods below:
```java
// Configuration
Proud proud = /* Configuration Creation */ ;

// File source by reading the File Name from
// the configuration and using the default
// field and value delimiters ("&" and "," respectively)
ProudSource.file(proud);

// File source by reading the specified file
// and using the default field and value
// delimiters ("&" and "," respectively)
ProudSource.file(proud, "$FILE_NAME");

// File source by reading the File Name from
// the configuration and using the specified
// field and value delimiters
ProudSource.file(proud, "$FIELD_DELIMITER", "$VALUE_DELIMITER");

// File source by reading the specified file
// and using the specified field and value delimiters
ProudSource.file(proud, "$FILE_NAME", "$FIELD_DELIMITER", "$VALUE_DELIMITER");
```
A Kafka source reads stream items from a Kafka broker, listening to a specific topic. The Kafka source implemented by default in PROUD is an instance of `ProudKafkaSource`, which implements the `ProudSource` interface. By default, the Kafka connection information used by the source is retrieved from the configuration.

PROUD Kafka sources can be created using the method below:
```java
// Configuration
Proud proud = /* Configuration Creation */ ;

// Kafka source that connects to the broker and topic
// specified in the configuration
ProudSource.kafkaSource(proud);
```
Custom PROUD sources can be defined by implementing the `ProudSource` interface or by extending the `ProudFileSource` or `ProudKafkaSource` classes. As the `ProudSource` interface is the most abstract type in the PROUD source type hierarchy, it can be implemented to use any type of data connector and source internally. The only constraint is that all implementing types must implement the `readInto(Pipeline pipeline)` method, which reads data from a native Jet source of any type and returns a `StreamStage<T extends AnyProudData>` instance.

The PROUD pipeline provides a method to use any user defined or default source:
```java
// Configuration
Proud proud = /* Configuration Creation */ ;

// Proud Pipeline with a file source
pipeline.readFrom(ProudSource.file(proud))
        .partition()
        .detectOutliers()
        .aggregateAndWriteData();

// Proud Pipeline with a custom source
pipeline.readFrom(new ProudSource<AnyProudData>() { ... })
        .partition()
        .detectOutliers()
        .aggregateAndWriteData();
```
The stream items that were read into the PROUD pipeline can be partitioned by invoking the `partition()` method. The underlying partitioning implementation is automatically selected by checking the configuration options.
The partitioned stream items in the PROUD pipeline can be processed for outlier detection by invoking the `detectOutliers()` method. The underlying algorithm, its parameters and the processing are automatically selected by checking the configuration options.
The results of the outlier detection process can be written to a sink that implements the `ProudSink` interface and can ultimately be resolved to a native Jet `Sink`. The `ProudSink` instances implemented by default in PROUD can write data to an InfluxDB server or print them to the console.
As mentioned previously, the sink can be automatically resolved from the PROUD configuration. To create the appropriate sink automatically, use the code below:

```java
// Configuration
Proud proud = /* Configuration Creation */ ;

// Create an appropriate sink based
// on the specific configuration.
ProudSink.auto(proud);
```
A logger sink writes stream items to the console (stdout). The logger sink implemented by default in PROUD is an instance of `ProudPrintSink`, which implements the `ProudSink` interface.

PROUD logger sinks can be created using the method below:
```java
// Configuration
Proud proud = /* Configuration Creation */ ;

// Create a logger sink.
ProudSink.logger(proud);
```
An InfluxDB sink writes stream items to an InfluxDB database. The InfluxDB sink implemented by default in PROUD is an instance of `ProudInfluxDBSink`, which implements the `ProudSink` interface.

PROUD InfluxDB sinks can be created using the method below:
```java
// Configuration
Proud proud = /* Configuration Creation */ ;

// Create an InfluxDB sink.
ProudSink.influxDB(proud);
```
Custom PROUD sinks can be defined by implementing the `ProudSink` interface or by extending the `AnyProudSink`, `ProudPrintSink` or `ProudInfluxDBSink` classes. The `AnyProudSink` abstract class is the best extension point for using any data connector and sink internally without losing the default implementation of a required internal aggregation step. The only constraint is that all implementing types must implement the `createJetSink()` and `convertResultItem(long slide, OutlierQuery query)` methods.

The PROUD pipeline provides a method to use any user defined or default sink:
```java
// Configuration
Proud proud = /* Configuration Creation */ ;

// Proud Pipeline with a logger sink
pipeline.readData()
        .partition()
        .detectOutliers()
        .aggregateAndWriteTo(ProudSink.logger(proud));

// Proud Pipeline with a custom sink
pipeline.readData()
        .partition()
        .detectOutliers()
        .aggregateAndWriteTo(new AnyProudSink<T>() { ... });
```
The PROUD pipeline can be downgraded to a native Jet pipeline at any point, as the PROUD pipeline is a proxy extension of the native Jet `Pipeline` type. A common downgrade use case is to further process the results produced by the outlier detection process.

PROUD pipeline downgrade example:
```java
// Configuration
Proud proud = /* Configuration Creation */ ;

// Proud Pipeline with a file source
pipeline.readFrom(ProudSource.file(proud))
        .partition()         // Proud Pipeline
        .detectOutliers()    // Proud Pipeline
        .filter((res) -> res.second.outlierCount > 50) // Downgraded to a Jet Pipeline
        .map((it) -> "Alert! Found " + it.second.outlierCount + " outliers.") // Jet Pipeline
        .writeTo(Sinks.logger()); // Jet Pipeline, Jet Sink
```
Alternatively, a native Jet pipeline may be upgraded to a PROUD pipeline to easily incorporate outlier detection. This can be achieved by using the `from(Pipeline, ProudContext)` method of `ProudPipeline`. A rather naive example of this operation can be seen below.
```java
// A generic native Jet Pipeline
StreamStage<ProudDataConvertible> jetPipeline = Pipeline.create()
        .readFrom(TestSources.itemStream(100))
        .withIngestionTimestamps()
        .map((num) -> {
            // Map stream items to ProudDataConvertible items
            final Random random = new Random(num.timestamp());
            final List<Double> coordinates = new ArrayList<>();
            coordinates.add(random.nextDouble());

            return new ProudDataConvertible() {
                @Override
                public int identifier() {
                    return (int) num.sequence();
                }

                @Override
                public List<Double> coordinates() {
                    return coordinates;
                }

                @Override
                public long arrivalTime() {
                    return num.timestamp();
                }
            };
        });

// Proud Configuration
Proud proud = /* Configuration Creation */ ;

// Upgrade to Proud Pipeline
ProudPipeline.from(jetPipeline, proud)
        .partition()
        .detectOutliers()
        .aggregateAndWriteData();
```
## Extension Points

In addition to the extension points for defining custom sources and sinks, the PROUD pipeline supports the definition of user defined partitioning methods and outlier detection algorithms. Both extension points are available through a functional and an object-oriented API defined in the PROUD pipeline.
A user defined partitioning method can be used by invoking the `partition(...)` method on the PROUD pipeline with either a `UserDefinedProudPartitioning` instance or a partitioning function as a parameter.

Example use of user defined partitioning:
```java
// Proud Configuration
Proud proud = /* Configuration Creation */ ;

// Proud Pipeline with user defined partitioning using the Object-Oriented API
ProudPipeline.from(jetPipeline, proud)
        .partition(new UserDefinedProudPartitioning(proud) {
            @Override
            public void partitionPoint(AnyProudData dataPoint, PartitioningState partitioningState) {
                // Custom Partitioning
                // Included in partition 1
                partitioningState.pointIncludedIn(dataPoint, 1);
                // Included in partition 2, but flagged with 1
                partitioningState.pointIncludedFlaggedIn(dataPoint, 2);
            }
        })
        .detectOutliers()
        .aggregateAndWriteData();

// Proud Pipeline with user defined partitioning using the functional API
ProudPipeline.from(jetPipeline, proud)
        .partition((dataPoint, partitioningState) -> {
            // Custom Partitioning
            // Included in partition 1
            partitioningState.pointIncludedIn(dataPoint, 1);
            // Included in partition 2, but flagged with 1
            partitioningState.pointIncludedFlaggedIn(dataPoint, 2);
        })
        .detectOutliers()
        .aggregateAndWriteData();
```
A user defined outlier detection method can be used by invoking the `detectOutliers(...)` method on the PROUD pipeline with either a `UserDefinedProudAlgorithmExecutor` instance as a single parameter, or a converter function and an outlier detection function as parameters. Note that the object-oriented style with a `UserDefinedProudAlgorithmExecutor` is more flexible than the functional style. The functional declaration of a user defined outlier detection algorithm requires two functions: a converter function that maps the stream items to a type the algorithm can handle, and a window processing function that detects outliers in the stream. A custom algorithm defined with the functional style API is expected to gracefully handle the query type set in the PROUD configuration.

Example use of user defined outlier detection:
```java
// Proud Configuration
Proud proud = /* Configuration Creation */ ;

// Proud Pipeline with user defined outlier detection using the Object-Oriented API
ProudPipeline.from(jetPipeline, proud)
        .partition()
        .detectOutliers(new UserDefinedProudAlgorithmExecutor<AnyProudData>(proud) {
            @Override
            public List<ProudSpaceOption> supportedSpaceOptions() {
                return new ArrayList<ProudSpaceOption>() {{
                    add(ProudSpaceOption.Single);
                }};
            }

            @Override
            protected <D extends AnyProudData> AnyProudData transform(D point) {
                // Convert point to a type that is required by the custom algorithm
                return point;
            }

            // Optional Override, if algorithm can process S queries
            @Override
            protected StreamStage<Tuple<Long, OutlierQuery>> processSingleSpace(StreamStage<KeyedWindowResult<Integer, List<Tuple<Integer, AnyProudData>>>> windowedStage) throws UnsupportedSpaceException {
                // Custom outlier detection
                return null;
            }

            // Optional Override, if algorithm can process RK queries
            @Override
            protected StreamStage<Tuple<Long, OutlierQuery>> processMultiQueryParamsSpace(StreamStage<KeyedWindowResult<Integer, List<Tuple<Integer, AnyProudData>>>> windowedStage) throws UnsupportedSpaceException {
                // Custom outlier detection
                return null;
            }

            // Optional Override, if algorithm can process RKWS queries
            @Override
            protected StreamStage<Tuple<Long, OutlierQuery>> processMultiQueryParamsMultiWindowParamsSpace(StreamStage<KeyedWindowResult<Integer, List<Tuple<Integer, AnyProudData>>>> windowedStage) throws UnsupportedSpaceException {
                // Custom outlier detection
                return null;
            }
        })
        .aggregateAndWriteData();

// Proud Pipeline with user defined outlier detection using the functional API
ProudPipeline.from(jetPipeline, proud)
        .partition()
        .detectOutliers((item) -> item, (window, state) -> {
            return new OutlierQuery(0, 0, 0, 0).withOutlierCount(20);
        })
        .aggregateAndWriteData();
```
## Execution

After the PROUD library is configured and the pipeline is defined, the pipeline can be submitted to Jet for execution. This can be achieved either by using the predefined `ProudExecutor` helper or via the native Jet API.
```java
// Proud Executor pipeline execution
Job job = ProudExecutor.executeJob(pipeline);

// Native Jet API Execution
JetInstance jet = Jet.newJetInstance();
Job job = jet.newJob(pipeline.jetPipeline()); // !!! IMPORTANT: Do not submit the ProudPipeline directly !!!
job.join();
```
## Testing

The algorithms have been tested with two datasets: the Stocks dataset and the TAO dataset. All test cases were defined as JUnit tests and cover all algorithms, for all their supported partitioning methods and all their supported outlier query types.

Testing of the included algorithms can be done directly from the predefined unit tests, or by installing Jet PROUD (via Maven or Gradle) in a new project and manually defining the desired tests.
## References

[1] Theodoros Toliopoulos, Christos Bellas, Anastasios Gounaris, and Apostolos Papadopoulos. 2020. PROUD: PaRallel OUtlier Detection for Streams. In Proceedings of the 2020 ACM SIGMOD International Conference on Management of Data (SIGMOD '20). Association for Computing Machinery, New York, NY, USA, 2717–2720. DOI: https://doi.org/10.1145/3318464.3384688
[2] Theodoros Toliopoulos, Anastasios Gounaris, Kostas Tsichlas, Apostolos Papadopoulos, Sandra Sampaio. Continuous outlier mining of streaming data in flink. Information Systems, Volume 93, 2020, 101569, ISSN 0306-4379. DOI:https://doi.org/10.1016/j.is.2020.101569.
[3] Theodoros Toliopoulos and Anastasios Gounaris, Multi-parameter streaming outlier detection. In Proceedings of the 2019 IEEE/WIC/ACM International Conference on Web Intelligence (WI), Thessaloniki, Greece, 2019, pp. 208-216. DOI:https://doi.org/10.1145/3350546.3352520.