Skip to content
This repository has been archived by the owner on Mar 11, 2024. It is now read-only.
Alexander Kolb edited this page Aug 4, 2017 · 38 revisions

Testing Apache Flink's DataSet and DataStream API with Flinkspector is almost identical, except for data streams working with time characteristics. That's why no separate documentation, for testing these API's, is provided. The chapter on input describes how to define timed input for streaming data flows. Most examples are made for DataSet and DataStream.

Introduction

The Framework can be utilised by using one of the base classes for JUnit:

class Test extends DataSetTestBase {

    @org.junit.Test
    public myTest() {
        DataSet<Integer> dataSet = createTestDataSet(asList(1,2,3))
            .map((MapFunction<Integer,Integer>) (value) -> {return value + 1});

        ExpectedRecords<Integer> expected = 
            new ExpectedRecords<Integer>().expectAll(asList(2,3,4))

        assertDataSet(dataSet, expected);
    }

}

Philosophy

The concept of Flinkspector is to define a list of input for each input of a transformation and specify expectations for each endpoint.

The best tactic for testing a Flink job, with Flinkspector, is to divide the whole processing logic of the job into smaller processing steps. Do this by bundling multiple transformations into a method.

public static DataStream<Tuple2<Integer, String>> aggregateViews(
		DataStream<Tuple2<Integer, String>> stream) {
	return stream.timeWindowAll(Time.of(20, seconds)).sum(0);
}

You will be rewarded with the opportunity to incrementally test processing steps while the whole logic of the job has not been defined. Side effects include: more comprehensible code and the possibility to easily recompose steps.

Don't use anonymous or private inner classes for MapFunction, FilterFunction etc., even if Flinkspector enables you to test them. Test all functions you've written using separate unit tests.

Documentation