From 91db9a0d4374c9cc98e97ff453b7f950183bf5a6 Mon Sep 17 00:00:00 2001 From: Christophe Bornet Date: Fri, 2 Sep 2022 11:16:27 +0200 Subject: [PATCH] Use builders to create steps --- .../pulsar/functions/transforms/CastStep.java | 39 ++++++++++++------- .../functions/transforms/DropFieldStep.java | 15 +++---- .../transforms/MergeKeyValueStep.java | 2 - .../transforms/TransformFunction.java | 16 +++++--- .../functions/transforms/CastStepTest.java | 6 ++- .../transforms/DropFieldStepTest.java | 34 ++++++++++------ 6 files changed, 69 insertions(+), 43 deletions(-) diff --git a/pulsar-transformations/src/main/java/com/datastax/oss/pulsar/functions/transforms/CastStep.java b/pulsar-transformations/src/main/java/com/datastax/oss/pulsar/functions/transforms/CastStep.java index 83ff6f87..cde63972 100644 --- a/pulsar-transformations/src/main/java/com/datastax/oss/pulsar/functions/transforms/CastStep.java +++ b/pulsar-transformations/src/main/java/com/datastax/oss/pulsar/functions/transforms/CastStep.java @@ -15,28 +15,16 @@ */ package com.datastax.oss.pulsar.functions.transforms; -import lombok.extern.slf4j.Slf4j; +import lombok.Builder; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.schema.SchemaType; -@Slf4j +@Builder public class CastStep implements TransformStep { private final SchemaType keySchemaType; private final SchemaType valueSchemaType; - public CastStep(SchemaType keySchemaType, SchemaType valueSchemaType) { - if (keySchemaType != null && keySchemaType != SchemaType.STRING) { - throw new IllegalArgumentException("Unsupported key schema-type for Cast: " + keySchemaType); - } - if (valueSchemaType != null && valueSchemaType != SchemaType.STRING) { - throw new IllegalArgumentException( - "Unsupported value schema-type for Cast: " + valueSchemaType); - } - this.keySchemaType = keySchemaType; - this.valueSchemaType = valueSchemaType; - } - @Override public void process(TransformContext transformContext) { if (transformContext.getKeySchema() != null) { @@ -54,4 +42,27 @@ public void process(TransformContext transformContext) { transformContext.setValueObject(transformContext.getValueObject().toString()); } } + + public static class CastStepBuilder { + private SchemaType keySchemaType; + private SchemaType valueSchemaType; + + public CastStepBuilder keySchemaType(SchemaType keySchemaType) { + if (keySchemaType != null && keySchemaType != SchemaType.STRING) { + throw new IllegalArgumentException( + "Unsupported key schema-type for Cast: " + keySchemaType); + } + this.keySchemaType = keySchemaType; + return this; + } + + public CastStepBuilder valueSchemaType(SchemaType valueSchemaType) { + if (valueSchemaType != null && valueSchemaType != SchemaType.STRING) { + throw new IllegalArgumentException( + "Unsupported value schema-type for Cast: " + valueSchemaType); + } + this.valueSchemaType = valueSchemaType; + return this; + } + } } diff --git a/pulsar-transformations/src/main/java/com/datastax/oss/pulsar/functions/transforms/DropFieldStep.java b/pulsar-transformations/src/main/java/com/datastax/oss/pulsar/functions/transforms/DropFieldStep.java index fd8d2688..9b819e5d 100644 --- a/pulsar-transformations/src/main/java/com/datastax/oss/pulsar/functions/transforms/DropFieldStep.java +++ b/pulsar-transformations/src/main/java/com/datastax/oss/pulsar/functions/transforms/DropFieldStep.java @@ -15,31 +15,28 @@ */ package com.datastax.oss.pulsar.functions.transforms; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import lombok.extern.slf4j.Slf4j; +import lombok.Builder; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.pulsar.common.schema.SchemaType; /** This function removes a "field" from a message. */ -@Slf4j +@Builder public class DropFieldStep implements TransformStep { - private final List keyFields; - private final List valueFields; + @Builder.Default private final List keyFields = new ArrayList<>(); + @Builder.Default private final List valueFields = new ArrayList<>(); + private final Map keySchemaCache = new ConcurrentHashMap<>(); private final Map valueSchemaCache = new ConcurrentHashMap<>(); - public DropFieldStep(List keyFields, List valueFields) { - this.keyFields = keyFields; - this.valueFields = valueFields; - } - @Override public void process(TransformContext transformContext) { dropKeyFields(keyFields, transformContext); diff --git a/pulsar-transformations/src/main/java/com/datastax/oss/pulsar/functions/transforms/MergeKeyValueStep.java b/pulsar-transformations/src/main/java/com/datastax/oss/pulsar/functions/transforms/MergeKeyValueStep.java index 6f0217c3..eb4cda34 100644 --- a/pulsar-transformations/src/main/java/com/datastax/oss/pulsar/functions/transforms/MergeKeyValueStep.java +++ b/pulsar-transformations/src/main/java/com/datastax/oss/pulsar/functions/transforms/MergeKeyValueStep.java @@ -19,13 +19,11 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; -import lombok.extern.slf4j.Slf4j; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.schema.SchemaType; -@Slf4j public class MergeKeyValueStep implements TransformStep { private final Map> diff --git a/pulsar-transformations/src/main/java/com/datastax/oss/pulsar/functions/transforms/TransformFunction.java b/pulsar-transformations/src/main/java/com/datastax/oss/pulsar/functions/transforms/TransformFunction.java index ba31ce10..9d405f51 100644 --- a/pulsar-transformations/src/main/java/com/datastax/oss/pulsar/functions/transforms/TransformFunction.java +++ b/pulsar-transformations/src/main/java/com/datastax/oss/pulsar/functions/transforms/TransformFunction.java @@ -159,35 +159,39 @@ public void process(TransformContext transformContext) throws Exception { public static DropFieldStep newRemoveFieldFunction(Map step) { String fields = getRequiredStringConfig(step, "fields"); List fieldList = Arrays.asList(fields.split(",")); + DropFieldStep.DropFieldStepBuilder builder = DropFieldStep.builder(); return getStringConfig(step, "part") .map( part -> { if (part.equals("key")) { - return new DropFieldStep(fieldList, new ArrayList<>()); + return builder.keyFields(fieldList); } else if (part.equals("value")) { - return new DropFieldStep(new ArrayList<>(), fieldList); + return builder.valueFields(fieldList); } else { throw new IllegalArgumentException("invalid 'part' parameter: " + part); } }) - .orElseGet(() -> new DropFieldStep(fieldList, fieldList)); + .orElseGet(() -> builder.keyFields(fieldList).valueFields(fieldList)) + .build(); } public static CastStep newCastFunction(Map step) { String schemaTypeParam = getRequiredStringConfig(step, "schema-type"); SchemaType schemaType = SchemaType.valueOf(schemaTypeParam); + CastStep.CastStepBuilder builder = CastStep.builder(); return getStringConfig(step, "part") .map( part -> { if (part.equals("key")) { - return new CastStep(schemaType, null); + return builder.keySchemaType(schemaType); } else if (part.equals("value")) { - return new CastStep(null, schemaType); + return builder.valueSchemaType(schemaType); } else { throw new IllegalArgumentException("invalid 'part' parameter: " + part); } }) - .orElseGet(() -> new CastStep(schemaType, schemaType)); + .orElseGet(() -> builder.keySchemaType(schemaType).valueSchemaType(schemaType)) + .build(); } public static FlattenStep newFlattenFunction(Map step) { diff --git a/pulsar-transformations/src/test/java/com/datastax/oss/pulsar/functions/transforms/CastStepTest.java b/pulsar-transformations/src/test/java/com/datastax/oss/pulsar/functions/transforms/CastStepTest.java index ce8975ac..1588a61b 100644 --- a/pulsar-transformations/src/test/java/com/datastax/oss/pulsar/functions/transforms/CastStepTest.java +++ b/pulsar-transformations/src/test/java/com/datastax/oss/pulsar/functions/transforms/CastStepTest.java @@ -31,7 +31,11 @@ public class CastStepTest { @Test void testKeyValueAvroToString() throws Exception { Record record = Utils.createTestAvroKeyValueRecord(); - CastStep step = new CastStep(SchemaType.STRING, SchemaType.STRING); + CastStep step = + CastStep.builder() + .keySchemaType(SchemaType.STRING) + .valueSchemaType(SchemaType.STRING) + .build(); Record outputRecord = Utils.process(record, step); KeyValueSchema messageSchema = (KeyValueSchema) outputRecord.getSchema(); diff --git a/pulsar-transformations/src/test/java/com/datastax/oss/pulsar/functions/transforms/DropFieldStepTest.java b/pulsar-transformations/src/test/java/com/datastax/oss/pulsar/functions/transforms/DropFieldStepTest.java index 2a6df4da..f07bd9cd 100644 --- a/pulsar-transformations/src/test/java/com/datastax/oss/pulsar/functions/transforms/DropFieldStepTest.java +++ b/pulsar-transformations/src/test/java/com/datastax/oss/pulsar/functions/transforms/DropFieldStepTest.java @@ -20,7 +20,6 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertSame; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import org.apache.avro.generic.GenericData; @@ -63,7 +62,7 @@ void testAvro() throws Exception { Record record = new Utils.TestRecord<>(genericSchema, genericRecord, "test-key"); DropFieldStep step = - new DropFieldStep(new ArrayList<>(), Arrays.asList("firstName", "lastName")); + DropFieldStep.builder().valueFields(Arrays.asList("firstName", "lastName")).build(); Record outputRecord = Utils.process(record, step); assertEquals(outputRecord.getKey().orElse(null), "test-key"); @@ -77,8 +76,10 @@ void testAvro() throws Exception { @Test void testKeyValueAvro() throws Exception { DropFieldStep step = - new DropFieldStep( - Arrays.asList("keyField1", "keyField2"), Arrays.asList("valueField1", "valueField2")); + DropFieldStep.builder() + .keyFields(Arrays.asList("keyField1", "keyField2")) + .valueFields(Arrays.asList("valueField1", "valueField2")) + .build(); Record outputRecord = Utils.process(Utils.createTestAvroKeyValueRecord(), step); KeyValueSchema messageSchema = (KeyValueSchema) outputRecord.getSchema(); KeyValue messageValue = (KeyValue) outputRecord.getValue(); @@ -118,7 +119,8 @@ void testAvroNotModified() throws Exception { Record record = new Utils.TestRecord<>(genericSchema, genericRecord, "test-key"); - DropFieldStep step = new DropFieldStep(new ArrayList<>(), Collections.singletonList("other")); + DropFieldStep step = + DropFieldStep.builder().valueFields(Collections.singletonList("other")).build(); Record outputRecord = Utils.process(record, step); assertSame(outputRecord.getSchema(), record.getSchema()); assertSame(outputRecord.getValue(), record.getValue()); @@ -129,8 +131,10 @@ void testKeyValueAvroNotModified() throws Exception { Record record = Utils.createTestAvroKeyValueRecord(); DropFieldStep step = - new DropFieldStep( - Collections.singletonList("otherKey"), Collections.singletonList("otherValue")); + DropFieldStep.builder() + .keyFields(Collections.singletonList("otherKey")) + .valueFields(Collections.singletonList("otherValue")) + .build(); Record outputRecord = Utils.process(record, step); KeyValueSchema messageSchema = (KeyValueSchema) outputRecord.getSchema(); KeyValue messageValue = (KeyValue) outputRecord.getValue(); @@ -148,8 +152,10 @@ void testKeyValueAvroCached() throws Exception { Record record = Utils.createTestAvroKeyValueRecord(); DropFieldStep step = - new DropFieldStep( - Arrays.asList("keyField1", "keyField2"), Arrays.asList("valueField1", "valueField2")); + DropFieldStep.builder() + .keyFields(Arrays.asList("keyField1", "keyField2")) + .valueFields(Arrays.asList("valueField1", "valueField2")) + .build(); Record outputRecord = Utils.process(record, step); KeyValueSchema messageSchema = (KeyValueSchema) outputRecord.getSchema(); @@ -183,7 +189,10 @@ void testPrimitives() throws Exception { "test-key"); DropFieldStep step = - new DropFieldStep(Collections.singletonList("key"), Collections.singletonList("value")); + DropFieldStep.builder() + .keyFields(Collections.singletonList("key")) + .valueFields(Collections.singletonList("value")) + .build(); Record outputRecord = Utils.process(record, step); assertSame(outputRecord.getSchema(), record.getSchema()); @@ -204,7 +213,10 @@ void testKeyValuePrimitives() throws Exception { null); DropFieldStep step = - new DropFieldStep(Collections.singletonList("key"), Collections.singletonList("value")); + DropFieldStep.builder() + .keyFields(Collections.singletonList("key")) + .valueFields(Collections.singletonList("value")) + .build(); Record outputRecord = Utils.process(record, step); KeyValueSchema messageSchema = (KeyValueSchema) outputRecord.getSchema(); KeyValue messageValue = (KeyValue) outputRecord.getValue();