Skip to content

Commit

Permalink
Change package to com.datastax
Browse files Browse the repository at this point in the history
  • Loading branch information
cbornet committed Jun 16, 2022
1 parent 71beb55 commit 6afd5c5
Show file tree
Hide file tree
Showing 15 changed files with 35 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pulsar.functions.transforms;
package com.datastax.pulsar.functions.transforms;

import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pulsar.functions.transforms;
package com.datastax.pulsar.functions.transforms;

import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pulsar.functions.transforms;
package com.datastax.pulsar.functions.transforms;

import java.util.List;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pulsar.functions.transforms;
package com.datastax.pulsar.functions.transforms;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pulsar.functions.transforms;
package com.datastax.pulsar.functions.transforms;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pulsar.functions.transforms;
package com.datastax.pulsar.functions.transforms;

public interface TransformStep {
void process(TransformContext transformContext) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pulsar.functions.transforms;
package com.datastax.pulsar.functions.transforms;

public class UnwrapKeyValueStep implements TransformStep {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pulsar.functions.transforms;
package com.datastax.pulsar.functions.transforms;
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@

name: transforms
description: Transformation function
functionClass: org.apache.pulsar.functions.transforms.TransformFunction
functionClass: com.datastax.pulsar.functions.transforms.TransformFunction
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pulsar.functions.transforms;
package com.datastax.pulsar.functions.transforms;

import static org.apache.pulsar.functions.transforms.Utils.createTestAvroKeyValueRecord;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertSame;

Expand All @@ -31,7 +30,7 @@ public class CastStepTest {

@Test
void testKeyValueAvroToString() throws Exception {
Record<GenericObject> record = createTestAvroKeyValueRecord();
Record<GenericObject> record = Utils.createTestAvroKeyValueRecord();
CastStep step = new CastStep(SchemaType.STRING, SchemaType.STRING);
Utils.TestTypedMessageBuilder<?> message = Utils.process(record, step);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pulsar.functions.transforms;
package com.datastax.pulsar.functions.transforms;

import static org.apache.pulsar.functions.transforms.Utils.createTestAvroKeyValueRecord;
import static org.apache.pulsar.functions.transforms.Utils.getRecord;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertSame;
Expand Down Expand Up @@ -69,7 +67,7 @@ void testAvro() throws Exception {
Utils.TestTypedMessageBuilder<?> message = Utils.process(record, step);
assertEquals(message.getKey(), "test-key");

GenericData.Record read = getRecord(message.getSchema(), (byte[]) message.getValue());
GenericData.Record read = Utils.getRecord(message.getSchema(), (byte[]) message.getValue());
assertEquals(read.get("age"), 42);
assertNull(read.getSchema().getField("firstName"));
assertNull(read.getSchema().getField("lastName"));
Expand All @@ -80,18 +78,18 @@ void testKeyValueAvro() throws Exception {
DropFieldStep step =
new DropFieldStep(
Arrays.asList("keyField1", "keyField2"), Arrays.asList("valueField1", "valueField2"));
Utils.TestTypedMessageBuilder<?> message = Utils.process(createTestAvroKeyValueRecord(), step);
Utils.TestTypedMessageBuilder<?> message = Utils.process(Utils.createTestAvroKeyValueRecord(), step);
KeyValueSchema messageSchema = (KeyValueSchema) message.getSchema();
KeyValue messageValue = (KeyValue) message.getValue();

GenericData.Record keyAvroRecord =
getRecord(messageSchema.getKeySchema(), (byte[]) messageValue.getKey());
Utils.getRecord(messageSchema.getKeySchema(), (byte[]) messageValue.getKey());
assertEquals(keyAvroRecord.get("keyField3"), new Utf8("key3"));
assertNull(keyAvroRecord.getSchema().getField("keyField1"));
assertNull(keyAvroRecord.getSchema().getField("keyField2"));

GenericData.Record valueAvroRecord =
getRecord(messageSchema.getValueSchema(), (byte[]) messageValue.getValue());
Utils.getRecord(messageSchema.getValueSchema(), (byte[]) messageValue.getValue());
assertEquals(valueAvroRecord.get("valueField3"), new Utf8("value3"));
assertNull(valueAvroRecord.getSchema().getField("valueField1"));
assertNull(valueAvroRecord.getSchema().getField("valueField2"));
Expand Down Expand Up @@ -127,7 +125,7 @@ void testAvroNotModified() throws Exception {

@Test
void testKeyValueAvroNotModified() throws Exception {
Record<GenericObject> record = createTestAvroKeyValueRecord();
Record<GenericObject> record = Utils.createTestAvroKeyValueRecord();

DropFieldStep step =
new DropFieldStep(
Expand All @@ -146,15 +144,15 @@ void testKeyValueAvroNotModified() throws Exception {

@Test
void testKeyValueAvroCached() throws Exception {
Record<GenericObject> record = createTestAvroKeyValueRecord();
Record<GenericObject> record = Utils.createTestAvroKeyValueRecord();

DropFieldStep step =
new DropFieldStep(
Arrays.asList("keyField1", "keyField2"), Arrays.asList("valueField1", "valueField2"));
Utils.TestTypedMessageBuilder<?> message = Utils.process(record, step);
KeyValueSchema messageSchema = (KeyValueSchema) message.getSchema();

message = Utils.process(createTestAvroKeyValueRecord(), step);
message = Utils.process(Utils.createTestAvroKeyValueRecord(), step);
KeyValueSchema newMessageSchema = (KeyValueSchema) message.getSchema();

// Schema was modified by process operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pulsar.functions.transforms;
package com.datastax.pulsar.functions.transforms;

import static org.apache.pulsar.functions.transforms.Utils.createTestAvroKeyValueRecord;
import static org.apache.pulsar.functions.transforms.Utils.getRecord;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertSame;
Expand All @@ -37,13 +35,13 @@ public class MergeKeyValueStepTest {

@Test
void testKeyValueAvro() throws Exception {
Record<GenericObject> record = createTestAvroKeyValueRecord();
Record<GenericObject> record = Utils.createTestAvroKeyValueRecord();
Utils.TestTypedMessageBuilder<?> message = Utils.process(record, new MergeKeyValueStep());
KeyValueSchema messageSchema = (KeyValueSchema) message.getSchema();
KeyValue messageValue = (KeyValue) message.getValue();

GenericData.Record read =
getRecord(messageSchema.getValueSchema(), (byte[]) messageValue.getValue());
Utils.getRecord(messageSchema.getValueSchema(), (byte[]) messageValue.getValue());
assertEquals(
read.toString(),
"{\"keyField1\": \"key1\", \"keyField2\": \"key2\", \"keyField3\": \"key3\", "
Expand Down Expand Up @@ -96,13 +94,13 @@ void testKeyValuePrimitives() throws Exception {

@Test
void testKeyValueAvroCached() throws Exception {
Record<GenericObject> record = createTestAvroKeyValueRecord();
Record<GenericObject> record = Utils.createTestAvroKeyValueRecord();

MergeKeyValueStep step = new MergeKeyValueStep();
Utils.TestTypedMessageBuilder<?> message = Utils.process(record, step);
KeyValueSchema messageSchema = (KeyValueSchema) message.getSchema();

message = Utils.process(createTestAvroKeyValueRecord(), step);
message = Utils.process(Utils.createTestAvroKeyValueRecord(), step);
KeyValueSchema newMessageSchema = (KeyValueSchema) message.getSchema();

// Schema was modified by process operation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pulsar.functions.transforms;
package com.datastax.pulsar.functions.transforms;

import static org.apache.pulsar.functions.transforms.Utils.createTestAvroKeyValueRecord;
import static org.apache.pulsar.functions.transforms.Utils.getRecord;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertThrows;
import static org.testng.AssertJUnit.assertNull;
Expand Down Expand Up @@ -107,7 +105,7 @@ void testDropFields() throws Exception {
new Gson().fromJson(userConfig, new TypeToken<Map<String, Object>>() {}.getType());
TransformFunction transformFunction = new TransformFunction();

Record<GenericObject> record = createTestAvroKeyValueRecord();
Record<GenericObject> record = Utils.createTestAvroKeyValueRecord();
Utils.TestContext context = new Utils.TestContext(record, config);
transformFunction.initialize(context);
transformFunction.process(record.getValue(), context);
Expand All @@ -117,13 +115,13 @@ void testDropFields() throws Exception {
KeyValue messageValue = (KeyValue) message.getValue();

GenericData.Record keyAvroRecord =
getRecord(messageSchema.getKeySchema(), (byte[]) messageValue.getKey());
Utils.getRecord(messageSchema.getKeySchema(), (byte[]) messageValue.getKey());
assertEquals(keyAvroRecord.get("keyField3"), new Utf8("key3"));
assertNull(keyAvroRecord.getSchema().getField("keyField1"));
assertNull(keyAvroRecord.getSchema().getField("keyField2"));

GenericData.Record valueAvroRecord =
getRecord(messageSchema.getValueSchema(), (byte[]) messageValue.getValue());
Utils.getRecord(messageSchema.getValueSchema(), (byte[]) messageValue.getValue());
assertEquals(valueAvroRecord.get("valueField2"), new Utf8("value2"));
assertNull(valueAvroRecord.getSchema().getField("valueField1"));
assertNull(valueAvroRecord.getSchema().getField("valueField3"));
Expand All @@ -145,7 +143,7 @@ void testRemoveMergeAndToString() throws Exception {
new Gson().fromJson(userConfig, new TypeToken<Map<String, Object>>() {}.getType());
TransformFunction transformFunction = new TransformFunction();

Record<GenericObject> record = createTestAvroKeyValueRecord();
Record<GenericObject> record = Utils.createTestAvroKeyValueRecord();
Utils.TestContext context = new Utils.TestContext(record, config);
transformFunction.initialize(context);
transformFunction.process(record.getValue(), context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pulsar.functions.transforms;
package com.datastax.pulsar.functions.transforms;

import static org.apache.pulsar.functions.transforms.Utils.createTestAvroKeyValueRecord;
import static org.apache.pulsar.functions.transforms.Utils.getRecord;
import static org.junit.Assert.assertSame;
import static org.testng.Assert.assertEquals;

Expand All @@ -32,10 +30,10 @@ public class UnwrapKeyValueStepTest {

@Test
void testKeyValueUnwrapValue() throws Exception {
Record<GenericObject> record = createTestAvroKeyValueRecord();
Record<GenericObject> record = Utils.createTestAvroKeyValueRecord();
Utils.TestTypedMessageBuilder<?> message = Utils.process(record, new UnwrapKeyValueStep(false));

GenericData.Record read = getRecord(message.getSchema(), (byte[]) message.getValue());
GenericData.Record read = Utils.getRecord(message.getSchema(), (byte[]) message.getValue());
assertEquals(
read.toString(),
"{\"valueField1\": \"value1\", \"valueField2\": \"value2\", \"valueField3\": "
Expand All @@ -44,10 +42,10 @@ void testKeyValueUnwrapValue() throws Exception {

@Test
void testKeyValueUnwrapKey() throws Exception {
Record<GenericObject> record = createTestAvroKeyValueRecord();
Record<GenericObject> record = Utils.createTestAvroKeyValueRecord();
Utils.TestTypedMessageBuilder<?> message = Utils.process(record, new UnwrapKeyValueStep(true));

GenericData.Record read = getRecord(message.getSchema(), (byte[]) message.getValue());
GenericData.Record read = Utils.getRecord(message.getSchema(), (byte[]) message.getValue());
assertEquals(
read.toString(),
"{\"keyField1\": \"key1\", \"keyField2\": \"key2\", \"keyField3\": \"key3\"}");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pulsar.functions.transforms;
package com.datastax.pulsar.functions.transforms;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down

0 comments on commit 6afd5c5

Please sign in to comment.