Skip to content
This repository has been archived by the owner on Mar 11, 2024. It is now read-only.

Input Output Translation

Eric Arellano edited this page Jul 6, 2017 · 17 revisions

Input translation

In some cases it can be too much hassle to define the actual test Input of the data flow you're testing:

Input<String> input = InputBuilder
		.startWith("{ \"name\":\"hans\", \"age\":\"65\", \"ts\":12, \"id\":\"joto12345\" }")
		.emit(\"{ \"name\":\"fritz\", \"age\":\"12\", \"ts\":12, \"id\":\"joto12345\" }")
		.emit(\"{ \"name\":\"rose\", \"age\":\"21\", \"ts\":12, \"id\":\"joto12345\" }")
		.emit(\"{ \"name\":\"samanta\", \"age\":\"45\", \"ts\":12, \"id\":\"joto12345\" }");

In this example the dataflow only works with the values of name and age. The remaining data is kept for later stages of processing.

Specifying an InputTranslator lets you build your input with a using a different data type:

class TupleToJsonString 
	extends InputTranslator<Tuple2<String, Integer>, String> {

	public TupleToJsonString(Input<Tuple2<String, Integer>> input) {
		super(input);
	}

	@Override
	protected String translate(Tuple2<String, Integer> elem) {
		return "{ \"name\":\"" +
				elem.f0 +
				"\", \"age\":" +
				elem.f1 +
				", \"ts\":12, \"id\":\"joto12345\"}";
	}
}

Use the translator:

Input<Tuple2<String,Integer>> input = InputBuilder
		.startWith(Tuple2.of("hans",65))
		.emit(Tuple2.of("fritz",12))
		.emit(Tuple2.of("rose",21))
		.emit(Tuple2.of("samanta", 45));

Input<String> translatedInput = new TupleToJsonString(input);

Now test cases for your data flow are easier to set up, allowing you to write more and smaller tests.

Output translation

You can apply the same process to the verification of output from your data flow. Create a wrapper for Matcher:

//Example POJO as output:
class Person(){
	private String id;
	private String name;
	private int age;
	private Long ts;
// insert construtor, getter, setter...
}

class TupleToPerson extends MatcherTranslator<Person,Tuple2<String,Integer>> {

	public TupleToPerson(Matcher<Iterable<Tuple2<String, Integer>>> matcher) {
		super(matcher);
	}

	@Override
	protected Tuple2<String, Integer> translate(Person p) {
		return Tuple2.of(p.getName,p.getAge);
	}
}

Usage:

DataStream<Person> dataStream; 
//...

OutputMatcher<Tuple2<String, Integer>> matcher =
                new MatchTuples<Tuple2<String, Integer>>("name", "age")
                        .assertThat("name", isA(String.class))
                        .assertThat("age", lessThan(55))
                        .onEachRecord();

assertStream(dataStream, new TupleToPerson(matcher));