Skip to content

Commit

Permalink
Add support for STRING and BYTES to query step (#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet authored Jul 15, 2023
1 parent edb2daa commit 364f215
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ public void process(TransformContext transformContext) {
private Object getField(
String key, String field, TransformSchemaType keySchemaType, Object keyObject) {
String fieldName = field.substring((key.length() + 1));
if (keyObject instanceof Map) {
return ((Map<String, Object>) keyObject).get(fieldName);
}
switch (keySchemaType) {
case AVRO:
GenericRecord avroRecord = (GenericRecord) keyObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.functions.api.Record;
Expand Down Expand Up @@ -134,6 +136,45 @@ public List<Map<String, String>> fetchData(String query, List<Object> params) {
Utils.process(record, queryStep);
}

@Test
void testKVStringJson() throws Exception {
Schema<KeyValue<String, String>> keyValueSchema =
Schema.KeyValue(Schema.STRING, Schema.STRING, KeyValueEncodingType.SEPARATED);

String key = "{\"keyField1\": \"key1\", \"keyField2\": \"key2\", \"keyField3\": \"key3\"}";
String value =
"{\"valueField1\": \"value1\", \"valueField2\": \"value2\", \"valueField3\": \"value3\"}";

KeyValue<String, String> keyValue = new KeyValue<>(key, value);

Record<GenericObject> record =
new Utils.TestRecord<>(
keyValueSchema,
AutoConsumeSchema.wrapPrimitiveObject(keyValue, SchemaType.KEY_VALUE, new byte[] {}),
null);

QueryStepDataSource dataSource =
new QueryStepDataSource() {
@Override
public List<Map<String, String>> fetchData(String query, List<Object> params) {
assertEquals(query, "select 1");
List<Object> expectedParams = List.of("value1", "key2");
assertEquals(params, expectedParams);
return List.of(Map.of());
}
};
List<String> fields = Arrays.asList("value.valueField1", "key.keyField2");
QueryStep queryStep =
QueryStep.builder()
.dataSource(dataSource)
.outputFieldName("value.result")
.query("select 1")
.fields(fields)
.build();

Utils.process(record, queryStep);
}

@Test
void testKVAvro() throws Exception {
Record<GenericObject> record = Utils.createTestAvroKeyValueRecord();
Expand Down

0 comments on commit 364f215

Please sign in to comment.