Skip to content

Commit

Permalink
Use builders to create steps
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet committed Sep 2, 2022
1 parent 88d9e32 commit 91db9a0
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,16 @@
*/
package com.datastax.oss.pulsar.functions.transforms;

import lombok.extern.slf4j.Slf4j;
import lombok.Builder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaType;

@Slf4j
@Builder
public class CastStep implements TransformStep {

private final SchemaType keySchemaType;
private final SchemaType valueSchemaType;

public CastStep(SchemaType keySchemaType, SchemaType valueSchemaType) {
if (keySchemaType != null && keySchemaType != SchemaType.STRING) {
throw new IllegalArgumentException("Unsupported key schema-type for Cast: " + keySchemaType);
}
if (valueSchemaType != null && valueSchemaType != SchemaType.STRING) {
throw new IllegalArgumentException(
"Unsupported value schema-type for Cast: " + valueSchemaType);
}
this.keySchemaType = keySchemaType;
this.valueSchemaType = valueSchemaType;
}

@Override
public void process(TransformContext transformContext) {
if (transformContext.getKeySchema() != null) {
Expand All @@ -54,4 +42,27 @@ public void process(TransformContext transformContext) {
transformContext.setValueObject(transformContext.getValueObject().toString());
}
}

public static class CastStepBuilder {
private SchemaType keySchemaType;
private SchemaType valueSchemaType;

public CastStepBuilder keySchemaType(SchemaType keySchemaType) {
if (keySchemaType != null && keySchemaType != SchemaType.STRING) {
throw new IllegalArgumentException(
"Unsupported key schema-type for Cast: " + keySchemaType);
}
this.keySchemaType = keySchemaType;
return this;
}

public CastStepBuilder valueSchemaType(SchemaType valueSchemaType) {
if (valueSchemaType != null && valueSchemaType != SchemaType.STRING) {
throw new IllegalArgumentException(
"Unsupported value schema-type for Cast: " + valueSchemaType);
}
this.valueSchemaType = valueSchemaType;
return this;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,28 @@
*/
package com.datastax.oss.pulsar.functions.transforms;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import lombok.Builder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.pulsar.common.schema.SchemaType;

/** This function removes a "field" from a message. */
@Slf4j
@Builder
public class DropFieldStep implements TransformStep {

private final List<String> keyFields;
private final List<String> valueFields;
@Builder.Default private final List<String> keyFields = new ArrayList<>();
@Builder.Default private final List<String> valueFields = new ArrayList<>();

private final Map<org.apache.avro.Schema, org.apache.avro.Schema> keySchemaCache =
new ConcurrentHashMap<>();
private final Map<org.apache.avro.Schema, org.apache.avro.Schema> valueSchemaCache =
new ConcurrentHashMap<>();

public DropFieldStep(List<String> keyFields, List<String> valueFields) {
this.keyFields = keyFields;
this.valueFields = valueFields;
}

@Override
public void process(TransformContext transformContext) {
dropKeyFields(keyFields, transformContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,11 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaType;

@Slf4j
public class MergeKeyValueStep implements TransformStep {

private final Map<org.apache.avro.Schema, Map<org.apache.avro.Schema, org.apache.avro.Schema>>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,35 +159,39 @@ public void process(TransformContext transformContext) throws Exception {
public static DropFieldStep newRemoveFieldFunction(Map<String, Object> step) {
String fields = getRequiredStringConfig(step, "fields");
List<String> fieldList = Arrays.asList(fields.split(","));
DropFieldStep.DropFieldStepBuilder builder = DropFieldStep.builder();
return getStringConfig(step, "part")
.map(
part -> {
if (part.equals("key")) {
return new DropFieldStep(fieldList, new ArrayList<>());
return builder.keyFields(fieldList);
} else if (part.equals("value")) {
return new DropFieldStep(new ArrayList<>(), fieldList);
return builder.valueFields(fieldList);
} else {
throw new IllegalArgumentException("invalid 'part' parameter: " + part);
}
})
.orElseGet(() -> new DropFieldStep(fieldList, fieldList));
.orElseGet(() -> builder.keyFields(fieldList).valueFields(fieldList))
.build();
}

public static CastStep newCastFunction(Map<String, Object> step) {
String schemaTypeParam = getRequiredStringConfig(step, "schema-type");
SchemaType schemaType = SchemaType.valueOf(schemaTypeParam);
CastStep.CastStepBuilder builder = CastStep.builder();
return getStringConfig(step, "part")
.map(
part -> {
if (part.equals("key")) {
return new CastStep(schemaType, null);
return builder.keySchemaType(schemaType);
} else if (part.equals("value")) {
return new CastStep(null, schemaType);
return builder.valueSchemaType(schemaType);
} else {
throw new IllegalArgumentException("invalid 'part' parameter: " + part);
}
})
.orElseGet(() -> new CastStep(schemaType, schemaType));
.orElseGet(() -> builder.keySchemaType(schemaType).valueSchemaType(schemaType))
.build();
}

public static FlattenStep newFlattenFunction(Map<String, Object> step) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ public class CastStepTest {
@Test
void testKeyValueAvroToString() throws Exception {
Record<GenericObject> record = Utils.createTestAvroKeyValueRecord();
CastStep step = new CastStep(SchemaType.STRING, SchemaType.STRING);
CastStep step =
CastStep.builder()
.keySchemaType(SchemaType.STRING)
.valueSchemaType(SchemaType.STRING)
.build();
Record<GenericObject> outputRecord = Utils.process(record, step);

KeyValueSchema messageSchema = (KeyValueSchema) outputRecord.getSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import org.apache.avro.generic.GenericData;
Expand Down Expand Up @@ -63,7 +62,7 @@ void testAvro() throws Exception {
Record<GenericObject> record = new Utils.TestRecord<>(genericSchema, genericRecord, "test-key");

DropFieldStep step =
new DropFieldStep(new ArrayList<>(), Arrays.asList("firstName", "lastName"));
DropFieldStep.builder().valueFields(Arrays.asList("firstName", "lastName")).build();
Record<?> outputRecord = Utils.process(record, step);
assertEquals(outputRecord.getKey().orElse(null), "test-key");

Expand All @@ -77,8 +76,10 @@ void testAvro() throws Exception {
@Test
void testKeyValueAvro() throws Exception {
DropFieldStep step =
new DropFieldStep(
Arrays.asList("keyField1", "keyField2"), Arrays.asList("valueField1", "valueField2"));
DropFieldStep.builder()
.keyFields(Arrays.asList("keyField1", "keyField2"))
.valueFields(Arrays.asList("valueField1", "valueField2"))
.build();
Record<GenericObject> outputRecord = Utils.process(Utils.createTestAvroKeyValueRecord(), step);
KeyValueSchema messageSchema = (KeyValueSchema) outputRecord.getSchema();
KeyValue messageValue = (KeyValue) outputRecord.getValue();
Expand Down Expand Up @@ -118,7 +119,8 @@ void testAvroNotModified() throws Exception {

Record<GenericObject> record = new Utils.TestRecord<>(genericSchema, genericRecord, "test-key");

DropFieldStep step = new DropFieldStep(new ArrayList<>(), Collections.singletonList("other"));
DropFieldStep step =
DropFieldStep.builder().valueFields(Collections.singletonList("other")).build();
Record<GenericObject> outputRecord = Utils.process(record, step);
assertSame(outputRecord.getSchema(), record.getSchema());
assertSame(outputRecord.getValue(), record.getValue());
Expand All @@ -129,8 +131,10 @@ void testKeyValueAvroNotModified() throws Exception {
Record<GenericObject> record = Utils.createTestAvroKeyValueRecord();

DropFieldStep step =
new DropFieldStep(
Collections.singletonList("otherKey"), Collections.singletonList("otherValue"));
DropFieldStep.builder()
.keyFields(Collections.singletonList("otherKey"))
.valueFields(Collections.singletonList("otherValue"))
.build();
Record<GenericObject> outputRecord = Utils.process(record, step);
KeyValueSchema messageSchema = (KeyValueSchema) outputRecord.getSchema();
KeyValue messageValue = (KeyValue) outputRecord.getValue();
Expand All @@ -148,8 +152,10 @@ void testKeyValueAvroCached() throws Exception {
Record<GenericObject> record = Utils.createTestAvroKeyValueRecord();

DropFieldStep step =
new DropFieldStep(
Arrays.asList("keyField1", "keyField2"), Arrays.asList("valueField1", "valueField2"));
DropFieldStep.builder()
.keyFields(Arrays.asList("keyField1", "keyField2"))
.valueFields(Arrays.asList("valueField1", "valueField2"))
.build();
Record<GenericObject> outputRecord = Utils.process(record, step);
KeyValueSchema messageSchema = (KeyValueSchema) outputRecord.getSchema();

Expand Down Expand Up @@ -183,7 +189,10 @@ void testPrimitives() throws Exception {
"test-key");

DropFieldStep step =
new DropFieldStep(Collections.singletonList("key"), Collections.singletonList("value"));
DropFieldStep.builder()
.keyFields(Collections.singletonList("key"))
.valueFields(Collections.singletonList("value"))
.build();
Record<GenericObject> outputRecord = Utils.process(record, step);

assertSame(outputRecord.getSchema(), record.getSchema());
Expand All @@ -204,7 +213,10 @@ void testKeyValuePrimitives() throws Exception {
null);

DropFieldStep step =
new DropFieldStep(Collections.singletonList("key"), Collections.singletonList("value"));
DropFieldStep.builder()
.keyFields(Collections.singletonList("key"))
.valueFields(Collections.singletonList("value"))
.build();
Record<GenericObject> outputRecord = Utils.process(record, step);
KeyValueSchema messageSchema = (KeyValueSchema) outputRecord.getSchema();
KeyValue messageValue = (KeyValue) outputRecord.getValue();
Expand Down

0 comments on commit 91db9a0

Please sign in to comment.