tsValues = new ArrayList<>();
- for (Object field : (Collection>) fieldValue) {
- tsValues.add(getTime(field));
- }
- return tsValues;
+ return ((Collection>) fieldValue).stream().map(this::getTime).collect(Collectors.toList());
}
-
return getTime(fieldValue);
}
@@ -38,6 +34,7 @@ private Instant getTime(Object field) {
@Override
public boolean matches() {
return descriptor.getType() == Descriptors.FieldDescriptor.Type.MESSAGE
- && descriptor.getMessageType().getFullName().equals(com.google.protobuf.Timestamp.getDescriptor().getFullName());
+ && descriptor.getMessageType().getFullName()
+ .equals(com.google.protobuf.Timestamp.getDescriptor().getFullName());
}
}
diff --git a/src/main/java/io/odpf/depot/metrics/BigQueryMetrics.java b/src/main/java/org/raystack/depot/metrics/BigQueryMetrics.java
similarity index 67%
rename from src/main/java/io/odpf/depot/metrics/BigQueryMetrics.java
rename to src/main/java/org/raystack/depot/metrics/BigQueryMetrics.java
index 19b209e2..307eec1f 100644
--- a/src/main/java/io/odpf/depot/metrics/BigQueryMetrics.java
+++ b/src/main/java/org/raystack/depot/metrics/BigQueryMetrics.java
@@ -1,10 +1,10 @@
-package io.odpf.depot.metrics;
+package org.raystack.depot.metrics;
-import io.odpf.depot.config.OdpfSinkConfig;
+import org.raystack.depot.config.SinkConfig;
public class BigQueryMetrics extends SinkMetrics {
- public BigQueryMetrics(OdpfSinkConfig config) {
+ public BigQueryMetrics(SinkConfig config) {
super(config);
}
@@ -16,6 +16,16 @@ public enum BigQueryAPIType {
TABLE_INSERT_ALL,
}
+ public enum BigQueryStorageAPIType {
+ STREAM_WRITER_CREATED,
+ STREAM_WRITER_CLOSED,
+ STREAM_WRITER_APPEND
+ }
+
+ public enum BigQueryStorageAPIError {
+ ROW_APPEND_ERROR
+ }
+
public enum BigQueryErrorType {
UNKNOWN_ERROR,
INVALID_SCHEMA_ERROR,
@@ -26,6 +36,7 @@ public enum BigQueryErrorType {
public static final String BIGQUERY_SINK_PREFIX = "bigquery_";
public static final String BIGQUERY_TABLE_TAG = "table=%s";
public static final String BIGQUERY_DATASET_TAG = "dataset=%s";
+ public static final String BIGQUERY_PROJECT_TAG = "project=%s";
public static final String BIGQUERY_API_TAG = "api=%s";
public static final String BIGQUERY_ERROR_TAG = "error=%s";
@@ -40,4 +51,8 @@ public String getBigqueryOperationLatencyMetric() {
public String getBigqueryTotalErrorsMetrics() {
return getApplicationPrefix() + SINK_PREFIX + BIGQUERY_SINK_PREFIX + "errors_total";
}
+
+ public String getBigqueryPayloadSizeMetrics() {
+ return getApplicationPrefix() + SINK_PREFIX + BIGQUERY_SINK_PREFIX + "payload_size_bytes";
+ }
}
diff --git a/src/main/java/io/odpf/depot/metrics/BigTableMetrics.java b/src/main/java/org/raystack/depot/metrics/BigTableMetrics.java
similarity index 89%
rename from src/main/java/io/odpf/depot/metrics/BigTableMetrics.java
rename to src/main/java/org/raystack/depot/metrics/BigTableMetrics.java
index 125a8982..772ec976 100644
--- a/src/main/java/io/odpf/depot/metrics/BigTableMetrics.java
+++ b/src/main/java/org/raystack/depot/metrics/BigTableMetrics.java
@@ -1,6 +1,6 @@
-package io.odpf.depot.metrics;
+package org.raystack.depot.metrics;
-import io.odpf.depot.config.OdpfSinkConfig;
+import org.raystack.depot.config.SinkConfig;
public class BigTableMetrics extends SinkMetrics {
@@ -9,11 +9,10 @@ public class BigTableMetrics extends SinkMetrics {
public static final String BIGTABLE_TABLE_TAG = "table=%s";
public static final String BIGTABLE_ERROR_TAG = "error=%s";
- public BigTableMetrics(OdpfSinkConfig config) {
+ public BigTableMetrics(SinkConfig config) {
super(config);
}
-
public enum BigTableErrorType {
QUOTA_FAILURE, // A quota check failed.
PRECONDITION_FAILURE, // Some preconditions have failed.
diff --git a/src/main/java/io/odpf/depot/metrics/Instrumentation.java b/src/main/java/org/raystack/depot/metrics/Instrumentation.java
similarity index 98%
rename from src/main/java/io/odpf/depot/metrics/Instrumentation.java
rename to src/main/java/org/raystack/depot/metrics/Instrumentation.java
index 6f1ee9c6..8ea5415b 100644
--- a/src/main/java/io/odpf/depot/metrics/Instrumentation.java
+++ b/src/main/java/org/raystack/depot/metrics/Instrumentation.java
@@ -1,4 +1,4 @@
-package io.odpf.depot.metrics;
+package org.raystack.depot.metrics;
import lombok.Getter;
import org.slf4j.Logger;
@@ -8,7 +8,6 @@
import java.io.IOException;
import java.time.Instant;
-
/**
* Instrumentation.
*
@@ -64,7 +63,6 @@ public boolean isDebugEnabled() {
return logger.isDebugEnabled();
}
-
// ===================== CountTelemetry =================
public void captureCount(String metric, Long count, String... tags) {
@@ -91,7 +89,6 @@ public void captureDuration(String metric, long duration, String... tags) {
statsDReporter.captureDuration(metric, duration, tags);
}
-
// =================== ERROR ===================
public void captureNonFatalError(String metric, Throwable e, String template, Object... t) {
diff --git a/src/main/java/io/odpf/depot/metrics/JsonParserMetrics.java b/src/main/java/org/raystack/depot/metrics/JsonParserMetrics.java
similarity index 70%
rename from src/main/java/io/odpf/depot/metrics/JsonParserMetrics.java
rename to src/main/java/org/raystack/depot/metrics/JsonParserMetrics.java
index 63094e9c..4448db28 100644
--- a/src/main/java/io/odpf/depot/metrics/JsonParserMetrics.java
+++ b/src/main/java/org/raystack/depot/metrics/JsonParserMetrics.java
@@ -1,9 +1,9 @@
-package io.odpf.depot.metrics;
+package org.raystack.depot.metrics;
-import io.odpf.depot.config.OdpfSinkConfig;
+import org.raystack.depot.config.SinkConfig;
public class JsonParserMetrics extends SinkMetrics {
- public JsonParserMetrics(OdpfSinkConfig config) {
+ public JsonParserMetrics(SinkConfig config) {
super(config);
}
diff --git a/src/main/java/io/odpf/depot/metrics/SinkMetrics.java b/src/main/java/org/raystack/depot/metrics/SinkMetrics.java
similarity index 84%
rename from src/main/java/io/odpf/depot/metrics/SinkMetrics.java
rename to src/main/java/org/raystack/depot/metrics/SinkMetrics.java
index 17335968..26065b44 100644
--- a/src/main/java/io/odpf/depot/metrics/SinkMetrics.java
+++ b/src/main/java/org/raystack/depot/metrics/SinkMetrics.java
@@ -1,6 +1,6 @@
-package io.odpf.depot.metrics;
+package org.raystack.depot.metrics;
-import io.odpf.depot.config.OdpfSinkConfig;
+import org.raystack.depot.config.SinkConfig;
import lombok.Getter;
public class SinkMetrics {
@@ -15,7 +15,7 @@ public class SinkMetrics {
@Getter
private final String applicationPrefix;
- public SinkMetrics(OdpfSinkConfig config) {
+ public SinkMetrics(SinkConfig config) {
this.applicationPrefix = config.getMetricsApplicationPrefix();
}
diff --git a/src/main/java/io/odpf/depot/metrics/StatsDReporter.java b/src/main/java/org/raystack/depot/metrics/StatsDReporter.java
similarity index 98%
rename from src/main/java/io/odpf/depot/metrics/StatsDReporter.java
rename to src/main/java/org/raystack/depot/metrics/StatsDReporter.java
index 57974113..286d2a03 100644
--- a/src/main/java/io/odpf/depot/metrics/StatsDReporter.java
+++ b/src/main/java/org/raystack/depot/metrics/StatsDReporter.java
@@ -1,4 +1,4 @@
-package io.odpf.depot.metrics;
+package org.raystack.depot.metrics;
import com.timgroup.statsd.StatsDClient;
import org.slf4j.Logger;
diff --git a/src/main/java/io/odpf/depot/metrics/StatsDReporterBuilder.java b/src/main/java/org/raystack/depot/metrics/StatsDReporterBuilder.java
similarity index 96%
rename from src/main/java/io/odpf/depot/metrics/StatsDReporterBuilder.java
rename to src/main/java/org/raystack/depot/metrics/StatsDReporterBuilder.java
index 9bc2eb5a..90a51fd6 100644
--- a/src/main/java/io/odpf/depot/metrics/StatsDReporterBuilder.java
+++ b/src/main/java/org/raystack/depot/metrics/StatsDReporterBuilder.java
@@ -1,9 +1,9 @@
-package io.odpf.depot.metrics;
+package org.raystack.depot.metrics;
import com.timgroup.statsd.NoOpStatsDClient;
import com.timgroup.statsd.NonBlockingStatsDClientBuilder;
import com.timgroup.statsd.StatsDClient;
-import io.odpf.depot.config.MetricsConfig;
+import org.raystack.depot.config.MetricsConfig;
import lombok.extern.slf4j.Slf4j;
/**
diff --git a/src/main/java/io/odpf/depot/redis/RedisSink.java b/src/main/java/org/raystack/depot/redis/RedisSink.java
similarity index 51%
rename from src/main/java/io/odpf/depot/redis/RedisSink.java
rename to src/main/java/org/raystack/depot/redis/RedisSink.java
index 6407142a..c2e93966 100644
--- a/src/main/java/io/odpf/depot/redis/RedisSink.java
+++ b/src/main/java/org/raystack/depot/redis/RedisSink.java
@@ -1,22 +1,22 @@
-package io.odpf.depot.redis;
+package org.raystack.depot.redis;
-import io.odpf.depot.OdpfSink;
-import io.odpf.depot.OdpfSinkResponse;
-import io.odpf.depot.error.ErrorInfo;
-import io.odpf.depot.message.OdpfMessage;
-import io.odpf.depot.metrics.Instrumentation;
-import io.odpf.depot.redis.client.RedisClient;
-import io.odpf.depot.redis.client.response.RedisResponse;
-import io.odpf.depot.redis.parsers.RedisParser;
-import io.odpf.depot.redis.util.RedisSinkUtils;
-import io.odpf.depot.redis.record.RedisRecord;
+import org.raystack.depot.message.Message;
+import org.raystack.depot.metrics.Instrumentation;
+import org.raystack.depot.redis.client.RedisClient;
+import org.raystack.depot.redis.client.response.RedisResponse;
+import org.raystack.depot.redis.parsers.RedisParser;
+import org.raystack.depot.redis.record.RedisRecord;
+import org.raystack.depot.redis.util.RedisSinkUtils;
+import org.raystack.depot.Sink;
+import org.raystack.depot.SinkResponse;
+import org.raystack.depot.error.ErrorInfo;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-public class RedisSink implements OdpfSink {
+public class RedisSink implements Sink {
private final RedisClient redisClient;
private final RedisParser redisParser;
private final Instrumentation instrumentation;
@@ -28,20 +28,23 @@ public RedisSink(RedisClient redisClient, RedisParser redisParser, Instrumentati
}
@Override
- public OdpfSinkResponse pushToSink(List messages) {
+ public SinkResponse pushToSink(List messages) {
List records = redisParser.convert(messages);
- Map> splitterRecords = records.stream().collect(Collectors.partitioningBy(RedisRecord::isValid));
+ Map> splitterRecords = records.stream()
+ .collect(Collectors.partitioningBy(RedisRecord::isValid));
List invalidRecords = splitterRecords.get(Boolean.FALSE);
List validRecords = splitterRecords.get(Boolean.TRUE);
- OdpfSinkResponse odpfSinkResponse = new OdpfSinkResponse();
- invalidRecords.forEach(invalidRecord -> odpfSinkResponse.addErrors(invalidRecord.getIndex(), invalidRecord.getErrorInfo()));
+ SinkResponse sinkResponse = new SinkResponse();
+ invalidRecords.forEach(
+ invalidRecord -> sinkResponse.addErrors(invalidRecord.getIndex(), invalidRecord.getErrorInfo()));
if (validRecords.size() > 0) {
List responses = redisClient.send(validRecords);
- Map errorInfoMap = RedisSinkUtils.getErrorsFromResponse(validRecords, responses, instrumentation);
- errorInfoMap.forEach(odpfSinkResponse::addErrors);
+ Map errorInfoMap = RedisSinkUtils.getErrorsFromResponse(validRecords, responses,
+ instrumentation);
+ errorInfoMap.forEach(sinkResponse::addErrors);
instrumentation.logInfo("Pushed a batch of {} records to Redis", validRecords.size());
}
- return odpfSinkResponse;
+ return sinkResponse;
}
@Override
diff --git a/src/main/java/io/odpf/depot/redis/RedisSinkFactory.java b/src/main/java/org/raystack/depot/redis/RedisSinkFactory.java
similarity index 66%
rename from src/main/java/io/odpf/depot/redis/RedisSinkFactory.java
rename to src/main/java/org/raystack/depot/redis/RedisSinkFactory.java
index db7d41c6..3d4a5804 100644
--- a/src/main/java/io/odpf/depot/redis/RedisSinkFactory.java
+++ b/src/main/java/org/raystack/depot/redis/RedisSinkFactory.java
@@ -1,21 +1,20 @@
-package io.odpf.depot.redis;
-
+package org.raystack.depot.redis;
+import org.raystack.depot.common.Tuple;
+import org.raystack.depot.config.RedisSinkConfig;
+import org.raystack.depot.message.MessageParser;
+import org.raystack.depot.message.MessageParserFactory;
+import org.raystack.depot.message.MessageSchema;
+import org.raystack.depot.message.SinkConnectorSchemaMessageMode;
+import org.raystack.depot.metrics.Instrumentation;
+import org.raystack.depot.metrics.StatsDReporter;
+import org.raystack.depot.redis.client.RedisClientFactory;
+import org.raystack.depot.redis.parsers.RedisEntryParser;
+import org.raystack.depot.redis.parsers.RedisEntryParserFactory;
+import org.raystack.depot.redis.parsers.RedisParser;
+import org.raystack.depot.utils.MessageConfigUtils;
import com.timgroup.statsd.NoOpStatsDClient;
-import io.odpf.depot.OdpfSink;
-import io.odpf.depot.common.Tuple;
-import io.odpf.depot.config.RedisSinkConfig;
-import io.odpf.depot.message.OdpfMessageParser;
-import io.odpf.depot.message.OdpfMessageParserFactory;
-import io.odpf.depot.message.OdpfMessageSchema;
-import io.odpf.depot.message.SinkConnectorSchemaMessageMode;
-import io.odpf.depot.metrics.Instrumentation;
-import io.odpf.depot.metrics.StatsDReporter;
-import io.odpf.depot.redis.client.RedisClientFactory;
-import io.odpf.depot.redis.parsers.RedisEntryParser;
-import io.odpf.depot.redis.parsers.RedisEntryParserFactory;
-import io.odpf.depot.redis.parsers.RedisParser;
-import io.odpf.depot.utils.MessageConfigUtils;
+import org.raystack.depot.Sink;
import java.io.IOException;
@@ -37,7 +36,8 @@ public RedisSinkFactory(RedisSinkConfig sinkConfig) {
public void init() {
try {
Instrumentation instrumentation = new Instrumentation(statsDReporter, RedisSinkFactory.class);
- String redisConfig = String.format("\n\tredis.urls = %s\n\tredis.key.template = %s\n\tredis.sink.data.type = %s"
+ String redisConfig = String.format(
+ "\n\tredis.urls = %s\n\tredis.key.template = %s\n\tredis.sink.data.type = %s"
+ "\n\tredis.deployment.type = %s\n\tredis.ttl.type = %s\n\tredis.ttl.value = %d\n\t",
sinkConfig.getSinkRedisUrls(),
sinkConfig.getSinkRedisKeyTemplate(),
@@ -53,16 +53,19 @@ public void init() {
redisConfig += "redis.keyvalue.data.field.name=" + sinkConfig.getSinkRedisKeyValueDataFieldName();
break;
case HASHSET:
- redisConfig += "redis.hashset.field.to.column.mapping=" + sinkConfig.getSinkRedisHashsetFieldToColumnMapping().toString();
+ redisConfig += "redis.hashset.field.to.column.mapping="
+ + sinkConfig.getSinkRedisHashsetFieldToColumnMapping().toString();
break;
default:
}
instrumentation.logInfo(redisConfig);
instrumentation.logInfo("Redis server type = {}", sinkConfig.getSinkRedisDeploymentType());
- OdpfMessageParser messageParser = OdpfMessageParserFactory.getParser(sinkConfig, statsDReporter);
- Tuple modeAndSchema = MessageConfigUtils.getModeAndSchema(sinkConfig);
- OdpfMessageSchema schema = messageParser.getSchema(modeAndSchema.getSecond());
- RedisEntryParser redisEntryParser = RedisEntryParserFactory.getRedisEntryParser(sinkConfig, statsDReporter, schema);
+ MessageParser messageParser = MessageParserFactory.getParser(sinkConfig, statsDReporter);
+ Tuple modeAndSchema = MessageConfigUtils
+ .getModeAndSchema(sinkConfig);
+ MessageSchema schema = messageParser.getSchema(modeAndSchema.getSecond());
+ RedisEntryParser redisEntryParser = RedisEntryParserFactory.getRedisEntryParser(sinkConfig, statsDReporter,
+ schema);
this.redisParser = new RedisParser(messageParser, redisEntryParser, modeAndSchema);
instrumentation.logInfo("Connection to redis established successfully");
} catch (IOException e) {
@@ -75,7 +78,7 @@ public void init() {
*
* @return RedisSink
*/
- public OdpfSink create() {
+ public Sink create() {
return new RedisSink(
RedisClientFactory.getClient(sinkConfig, statsDReporter),
redisParser,
diff --git a/src/main/java/io/odpf/depot/redis/client/RedisClient.java b/src/main/java/org/raystack/depot/redis/client/RedisClient.java
similarity index 58%
rename from src/main/java/io/odpf/depot/redis/client/RedisClient.java
rename to src/main/java/org/raystack/depot/redis/client/RedisClient.java
index 16d4894b..af52de1d 100644
--- a/src/main/java/io/odpf/depot/redis/client/RedisClient.java
+++ b/src/main/java/org/raystack/depot/redis/client/RedisClient.java
@@ -1,7 +1,7 @@
-package io.odpf.depot.redis.client;
+package org.raystack.depot.redis.client;
-import io.odpf.depot.redis.client.response.RedisResponse;
-import io.odpf.depot.redis.record.RedisRecord;
+import org.raystack.depot.redis.client.response.RedisResponse;
+import org.raystack.depot.redis.record.RedisRecord;
import java.io.Closeable;
import java.util.List;
diff --git a/src/main/java/io/odpf/depot/redis/client/RedisClientFactory.java b/src/main/java/org/raystack/depot/redis/client/RedisClientFactory.java
similarity index 62%
rename from src/main/java/io/odpf/depot/redis/client/RedisClientFactory.java
rename to src/main/java/org/raystack/depot/redis/client/RedisClientFactory.java
index 57bc3995..f3f1cd34 100644
--- a/src/main/java/io/odpf/depot/redis/client/RedisClientFactory.java
+++ b/src/main/java/org/raystack/depot/redis/client/RedisClientFactory.java
@@ -1,13 +1,12 @@
-package io.odpf.depot.redis.client;
-
-
-import io.odpf.depot.config.RedisSinkConfig;
-import io.odpf.depot.exception.ConfigurationException;
-import io.odpf.depot.metrics.Instrumentation;
-import io.odpf.depot.metrics.StatsDReporter;
-import io.odpf.depot.redis.enums.RedisSinkDeploymentType;
-import io.odpf.depot.redis.ttl.RedisTTLFactory;
-import io.odpf.depot.redis.ttl.RedisTtl;
+package org.raystack.depot.redis.client;
+
+import org.raystack.depot.config.RedisSinkConfig;
+import org.raystack.depot.exception.ConfigurationException;
+import org.raystack.depot.metrics.Instrumentation;
+import org.raystack.depot.metrics.StatsDReporter;
+import org.raystack.depot.redis.enums.RedisSinkDeploymentType;
+import org.raystack.depot.redis.ttl.RedisTTLFactory;
+import org.raystack.depot.redis.ttl.RedisTtl;
import org.apache.commons.lang3.StringUtils;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
@@ -30,18 +29,22 @@ public static RedisClient getClient(RedisSinkConfig redisSinkConfig, StatsDRepor
: getRedisStandaloneClient(redisTTL, redisSinkConfig, statsDReporter);
}
- private static RedisStandaloneClient getRedisStandaloneClient(RedisTtl redisTTL, RedisSinkConfig redisSinkConfig, StatsDReporter statsDReporter) {
+ private static RedisStandaloneClient getRedisStandaloneClient(RedisTtl redisTTL, RedisSinkConfig redisSinkConfig,
+ StatsDReporter statsDReporter) {
HostAndPort hostAndPort;
try {
hostAndPort = HostAndPort.parseString(StringUtils.trim(redisSinkConfig.getSinkRedisUrls()));
} catch (IllegalArgumentException e) {
- throw new ConfigurationException(String.format("Invalid url for redis standalone: %s", redisSinkConfig.getSinkRedisUrls()));
+ throw new ConfigurationException(
+ String.format("Invalid url for redis standalone: %s", redisSinkConfig.getSinkRedisUrls()));
}
Jedis jedis = new Jedis(hostAndPort);
- return new RedisStandaloneClient(new Instrumentation(statsDReporter, RedisStandaloneClient.class), redisTTL, jedis);
+ return new RedisStandaloneClient(new Instrumentation(statsDReporter, RedisStandaloneClient.class), redisTTL,
+ jedis);
}
- private static RedisClusterClient getRedisClusterClient(RedisTtl redisTTL, RedisSinkConfig redisSinkConfig, StatsDReporter statsDReporter) {
+ private static RedisClusterClient getRedisClusterClient(RedisTtl redisTTL, RedisSinkConfig redisSinkConfig,
+ StatsDReporter statsDReporter) {
String[] redisUrls = redisSinkConfig.getSinkRedisUrls().split(DELIMITER);
HashSet nodes = new HashSet<>();
try {
@@ -49,9 +52,11 @@ private static RedisClusterClient getRedisClusterClient(RedisTtl redisTTL, Redis
nodes.add(HostAndPort.parseString(StringUtils.trim(redisUrl)));
}
} catch (IllegalArgumentException e) {
- throw new ConfigurationException(String.format("Invalid url(s) for redis cluster: %s", redisSinkConfig.getSinkRedisUrls()));
+ throw new ConfigurationException(
+ String.format("Invalid url(s) for redis cluster: %s", redisSinkConfig.getSinkRedisUrls()));
}
JedisCluster jedisCluster = new JedisCluster(nodes);
- return new RedisClusterClient(new Instrumentation(statsDReporter, RedisClusterClient.class), redisTTL, jedisCluster);
+ return new RedisClusterClient(new Instrumentation(statsDReporter, RedisClusterClient.class), redisTTL,
+ jedisCluster);
}
}
diff --git a/src/main/java/io/odpf/depot/redis/client/RedisClusterClient.java b/src/main/java/org/raystack/depot/redis/client/RedisClusterClient.java
similarity index 74%
rename from src/main/java/io/odpf/depot/redis/client/RedisClusterClient.java
rename to src/main/java/org/raystack/depot/redis/client/RedisClusterClient.java
index 7dea53fe..44568bed 100644
--- a/src/main/java/io/odpf/depot/redis/client/RedisClusterClient.java
+++ b/src/main/java/org/raystack/depot/redis/client/RedisClusterClient.java
@@ -1,9 +1,9 @@
-package io.odpf.depot.redis.client;
+package org.raystack.depot.redis.client;
-import io.odpf.depot.metrics.Instrumentation;
-import io.odpf.depot.redis.client.response.RedisResponse;
-import io.odpf.depot.redis.record.RedisRecord;
-import io.odpf.depot.redis.ttl.RedisTtl;
+import org.raystack.depot.redis.client.response.RedisResponse;
+import org.raystack.depot.redis.record.RedisRecord;
+import org.raystack.depot.redis.ttl.RedisTtl;
+import org.raystack.depot.metrics.Instrumentation;
import lombok.AllArgsConstructor;
import redis.clients.jedis.JedisCluster;
diff --git a/src/main/java/io/odpf/depot/redis/client/RedisStandaloneClient.java b/src/main/java/org/raystack/depot/redis/client/RedisStandaloneClient.java
similarity index 81%
rename from src/main/java/io/odpf/depot/redis/client/RedisStandaloneClient.java
rename to src/main/java/org/raystack/depot/redis/client/RedisStandaloneClient.java
index 12046cab..5ece6753 100644
--- a/src/main/java/io/odpf/depot/redis/client/RedisStandaloneClient.java
+++ b/src/main/java/org/raystack/depot/redis/client/RedisStandaloneClient.java
@@ -1,10 +1,10 @@
-package io.odpf.depot.redis.client;
+package org.raystack.depot.redis.client;
-import io.odpf.depot.metrics.Instrumentation;
-import io.odpf.depot.redis.client.response.RedisResponse;
-import io.odpf.depot.redis.client.response.RedisStandaloneResponse;
-import io.odpf.depot.redis.record.RedisRecord;
-import io.odpf.depot.redis.ttl.RedisTtl;
+import org.raystack.depot.redis.client.response.RedisResponse;
+import org.raystack.depot.redis.client.response.RedisStandaloneResponse;
+import org.raystack.depot.redis.record.RedisRecord;
+import org.raystack.depot.redis.ttl.RedisTtl;
+import org.raystack.depot.metrics.Instrumentation;
import lombok.AllArgsConstructor;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
diff --git a/src/main/java/io/odpf/depot/redis/client/entry/RedisEntry.java b/src/main/java/org/raystack/depot/redis/client/entry/RedisEntry.java
similarity index 72%
rename from src/main/java/io/odpf/depot/redis/client/entry/RedisEntry.java
rename to src/main/java/org/raystack/depot/redis/client/entry/RedisEntry.java
index 2ced72ab..3cf93e1b 100644
--- a/src/main/java/io/odpf/depot/redis/client/entry/RedisEntry.java
+++ b/src/main/java/org/raystack/depot/redis/client/entry/RedisEntry.java
@@ -1,8 +1,8 @@
-package io.odpf.depot.redis.client.entry;
+package org.raystack.depot.redis.client.entry;
-import io.odpf.depot.redis.client.response.RedisClusterResponse;
-import io.odpf.depot.redis.client.response.RedisStandaloneResponse;
-import io.odpf.depot.redis.ttl.RedisTtl;
+import org.raystack.depot.redis.client.response.RedisStandaloneResponse;
+import org.raystack.depot.redis.client.response.RedisClusterResponse;
+import org.raystack.depot.redis.ttl.RedisTtl;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.Pipeline;
diff --git a/src/main/java/io/odpf/depot/redis/client/entry/RedisHashSetFieldEntry.java b/src/main/java/org/raystack/depot/redis/client/entry/RedisHashSetFieldEntry.java
similarity index 86%
rename from src/main/java/io/odpf/depot/redis/client/entry/RedisHashSetFieldEntry.java
rename to src/main/java/org/raystack/depot/redis/client/entry/RedisHashSetFieldEntry.java
index 9160c5c9..fea0e924 100644
--- a/src/main/java/io/odpf/depot/redis/client/entry/RedisHashSetFieldEntry.java
+++ b/src/main/java/org/raystack/depot/redis/client/entry/RedisHashSetFieldEntry.java
@@ -1,9 +1,9 @@
-package io.odpf.depot.redis.client.entry;
+package org.raystack.depot.redis.client.entry;
-import io.odpf.depot.metrics.Instrumentation;
-import io.odpf.depot.redis.client.response.RedisClusterResponse;
-import io.odpf.depot.redis.client.response.RedisStandaloneResponse;
-import io.odpf.depot.redis.ttl.RedisTtl;
+import org.raystack.depot.redis.client.response.RedisClusterResponse;
+import org.raystack.depot.redis.client.response.RedisStandaloneResponse;
+import org.raystack.depot.metrics.Instrumentation;
+import org.raystack.depot.redis.ttl.RedisTtl;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import redis.clients.jedis.JedisCluster;
diff --git a/src/main/java/io/odpf/depot/redis/client/entry/RedisKeyValueEntry.java b/src/main/java/org/raystack/depot/redis/client/entry/RedisKeyValueEntry.java
similarity index 83%
rename from src/main/java/io/odpf/depot/redis/client/entry/RedisKeyValueEntry.java
rename to src/main/java/org/raystack/depot/redis/client/entry/RedisKeyValueEntry.java
index 7d55ffb4..8c4d3a10 100644
--- a/src/main/java/io/odpf/depot/redis/client/entry/RedisKeyValueEntry.java
+++ b/src/main/java/org/raystack/depot/redis/client/entry/RedisKeyValueEntry.java
@@ -1,9 +1,9 @@
-package io.odpf.depot.redis.client.entry;
+package org.raystack.depot.redis.client.entry;
-import io.odpf.depot.metrics.Instrumentation;
-import io.odpf.depot.redis.client.response.RedisClusterResponse;
-import io.odpf.depot.redis.client.response.RedisStandaloneResponse;
-import io.odpf.depot.redis.ttl.RedisTtl;
+import org.raystack.depot.redis.client.response.RedisClusterResponse;
+import org.raystack.depot.redis.client.response.RedisStandaloneResponse;
+import org.raystack.depot.metrics.Instrumentation;
+import org.raystack.depot.redis.ttl.RedisTtl;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import redis.clients.jedis.JedisCluster;
diff --git a/src/main/java/io/odpf/depot/redis/client/entry/RedisListEntry.java b/src/main/java/org/raystack/depot/redis/client/entry/RedisListEntry.java
similarity index 84%
rename from src/main/java/io/odpf/depot/redis/client/entry/RedisListEntry.java
rename to src/main/java/org/raystack/depot/redis/client/entry/RedisListEntry.java
index 23975404..f33ad671 100644
--- a/src/main/java/io/odpf/depot/redis/client/entry/RedisListEntry.java
+++ b/src/main/java/org/raystack/depot/redis/client/entry/RedisListEntry.java
@@ -1,9 +1,9 @@
-package io.odpf.depot.redis.client.entry;
+package org.raystack.depot.redis.client.entry;
-import io.odpf.depot.metrics.Instrumentation;
-import io.odpf.depot.redis.client.response.RedisClusterResponse;
-import io.odpf.depot.redis.client.response.RedisStandaloneResponse;
-import io.odpf.depot.redis.ttl.RedisTtl;
+import org.raystack.depot.redis.client.response.RedisClusterResponse;
+import org.raystack.depot.redis.client.response.RedisStandaloneResponse;
+import org.raystack.depot.metrics.Instrumentation;
+import org.raystack.depot.redis.ttl.RedisTtl;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import redis.clients.jedis.JedisCluster;
diff --git a/src/main/java/io/odpf/depot/redis/client/response/RedisClusterResponse.java b/src/main/java/org/raystack/depot/redis/client/response/RedisClusterResponse.java
similarity index 92%
rename from src/main/java/io/odpf/depot/redis/client/response/RedisClusterResponse.java
rename to src/main/java/org/raystack/depot/redis/client/response/RedisClusterResponse.java
index e245f90a..6a0555c7 100644
--- a/src/main/java/io/odpf/depot/redis/client/response/RedisClusterResponse.java
+++ b/src/main/java/org/raystack/depot/redis/client/response/RedisClusterResponse.java
@@ -1,4 +1,4 @@
-package io.odpf.depot.redis.client.response;
+package org.raystack.depot.redis.client.response;
import lombok.Getter;
diff --git a/src/main/java/io/odpf/depot/redis/client/response/RedisResponse.java b/src/main/java/org/raystack/depot/redis/client/response/RedisResponse.java
similarity index 63%
rename from src/main/java/io/odpf/depot/redis/client/response/RedisResponse.java
rename to src/main/java/org/raystack/depot/redis/client/response/RedisResponse.java
index 4e5c6c28..dad50441 100644
--- a/src/main/java/io/odpf/depot/redis/client/response/RedisResponse.java
+++ b/src/main/java/org/raystack/depot/redis/client/response/RedisResponse.java
@@ -1,4 +1,4 @@
-package io.odpf.depot.redis.client.response;
+package org.raystack.depot.redis.client.response;
public interface RedisResponse {
String getMessage();
diff --git a/src/main/java/io/odpf/depot/redis/client/response/RedisStandaloneResponse.java b/src/main/java/org/raystack/depot/redis/client/response/RedisStandaloneResponse.java
similarity index 95%
rename from src/main/java/io/odpf/depot/redis/client/response/RedisStandaloneResponse.java
rename to src/main/java/org/raystack/depot/redis/client/response/RedisStandaloneResponse.java
index 8791c190..bc168ccc 100644
--- a/src/main/java/io/odpf/depot/redis/client/response/RedisStandaloneResponse.java
+++ b/src/main/java/org/raystack/depot/redis/client/response/RedisStandaloneResponse.java
@@ -1,4 +1,4 @@
-package io.odpf.depot.redis.client.response;
+package org.raystack.depot.redis.client.response;
import lombok.Getter;
import redis.clients.jedis.Response;
diff --git a/src/main/java/io/odpf/depot/redis/enums/RedisSinkDataType.java b/src/main/java/org/raystack/depot/redis/enums/RedisSinkDataType.java
similarity index 64%
rename from src/main/java/io/odpf/depot/redis/enums/RedisSinkDataType.java
rename to src/main/java/org/raystack/depot/redis/enums/RedisSinkDataType.java
index 23d791a2..bb9688cd 100644
--- a/src/main/java/io/odpf/depot/redis/enums/RedisSinkDataType.java
+++ b/src/main/java/org/raystack/depot/redis/enums/RedisSinkDataType.java
@@ -1,4 +1,4 @@
-package io.odpf.depot.redis.enums;
+package org.raystack.depot.redis.enums;
public enum RedisSinkDataType {
LIST,
diff --git a/src/main/java/io/odpf/depot/redis/enums/RedisSinkDeploymentType.java b/src/main/java/org/raystack/depot/redis/enums/RedisSinkDeploymentType.java
similarity index 63%
rename from src/main/java/io/odpf/depot/redis/enums/RedisSinkDeploymentType.java
rename to src/main/java/org/raystack/depot/redis/enums/RedisSinkDeploymentType.java
index 85a70bcd..e4f8968b 100644
--- a/src/main/java/io/odpf/depot/redis/enums/RedisSinkDeploymentType.java
+++ b/src/main/java/org/raystack/depot/redis/enums/RedisSinkDeploymentType.java
@@ -1,4 +1,4 @@
-package io.odpf.depot.redis.enums;
+package org.raystack.depot.redis.enums;
public enum RedisSinkDeploymentType {
STANDALONE,
diff --git a/src/main/java/io/odpf/depot/redis/enums/RedisSinkTtlType.java b/src/main/java/org/raystack/depot/redis/enums/RedisSinkTtlType.java
similarity index 65%
rename from src/main/java/io/odpf/depot/redis/enums/RedisSinkTtlType.java
rename to src/main/java/org/raystack/depot/redis/enums/RedisSinkTtlType.java
index 41d76a4f..f06b2178 100644
--- a/src/main/java/io/odpf/depot/redis/enums/RedisSinkTtlType.java
+++ b/src/main/java/org/raystack/depot/redis/enums/RedisSinkTtlType.java
@@ -1,4 +1,4 @@
-package io.odpf.depot.redis.enums;
+package org.raystack.depot.redis.enums;
public enum RedisSinkTtlType {
EXACT_TIME,
diff --git a/src/main/java/org/raystack/depot/redis/parsers/RedisEntryParser.java b/src/main/java/org/raystack/depot/redis/parsers/RedisEntryParser.java
new file mode 100644
index 00000000..6c6c493d
--- /dev/null
+++ b/src/main/java/org/raystack/depot/redis/parsers/RedisEntryParser.java
@@ -0,0 +1,11 @@
+package org.raystack.depot.redis.parsers;
+
+import org.raystack.depot.redis.client.entry.RedisEntry;
+import org.raystack.depot.message.ParsedMessage;
+
+import java.util.List;
+
+public interface RedisEntryParser {
+
+ List getRedisEntry(ParsedMessage parsedMessage);
+}
diff --git a/src/main/java/io/odpf/depot/redis/parsers/RedisEntryParserFactory.java b/src/main/java/org/raystack/depot/redis/parsers/RedisEntryParserFactory.java
similarity index 86%
rename from src/main/java/io/odpf/depot/redis/parsers/RedisEntryParserFactory.java
rename to src/main/java/org/raystack/depot/redis/parsers/RedisEntryParserFactory.java
index 974522c6..c699f192 100644
--- a/src/main/java/io/odpf/depot/redis/parsers/RedisEntryParserFactory.java
+++ b/src/main/java/org/raystack/depot/redis/parsers/RedisEntryParserFactory.java
@@ -1,10 +1,10 @@
-package io.odpf.depot.redis.parsers;
+package org.raystack.depot.redis.parsers;
-import io.odpf.depot.common.Template;
-import io.odpf.depot.config.RedisSinkConfig;
-import io.odpf.depot.exception.InvalidTemplateException;
-import io.odpf.depot.message.OdpfMessageSchema;
-import io.odpf.depot.metrics.StatsDReporter;
+import org.raystack.depot.common.Template;
+import org.raystack.depot.config.RedisSinkConfig;
+import org.raystack.depot.exception.InvalidTemplateException;
+import org.raystack.depot.message.MessageSchema;
+import org.raystack.depot.metrics.StatsDReporter;
import java.util.Map;
import java.util.Properties;
@@ -18,7 +18,7 @@ public class RedisEntryParserFactory {
public static RedisEntryParser getRedisEntryParser(
RedisSinkConfig redisSinkConfig,
StatsDReporter statsDReporter,
- OdpfMessageSchema schema) {
+ MessageSchema schema) {
Template keyTemplate;
try {
keyTemplate = new Template(redisSinkConfig.getSinkRedisKeyTemplate());
@@ -51,8 +51,7 @@ public static RedisEntryParser getRedisEntryParser(
} catch (InvalidTemplateException e) {
throw new IllegalArgumentException(e.getMessage());
}
- }
- ));
+ }));
return new RedisHashSetEntryParser(statsDReporter, keyTemplate, fieldTemplates, schema);
}
}
diff --git a/src/main/java/org/raystack/depot/redis/parsers/RedisHashSetEntryParser.java b/src/main/java/org/raystack/depot/redis/parsers/RedisHashSetEntryParser.java
new file mode 100644
index 00000000..913685bf
--- /dev/null
+++ b/src/main/java/org/raystack/depot/redis/parsers/RedisHashSetEntryParser.java
@@ -0,0 +1,41 @@
+package org.raystack.depot.redis.parsers;
+
+import org.raystack.depot.message.field.GenericFieldFactory;
+import org.raystack.depot.redis.client.entry.RedisEntry;
+import org.raystack.depot.redis.client.entry.RedisHashSetFieldEntry;
+import org.raystack.depot.common.Template;
+import org.raystack.depot.message.MessageSchema;
+import org.raystack.depot.message.ParsedMessage;
+import org.raystack.depot.metrics.Instrumentation;
+import org.raystack.depot.metrics.StatsDReporter;
+import lombok.AllArgsConstructor;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Redis hash set parser.
+ */
+@AllArgsConstructor
+public class RedisHashSetEntryParser implements RedisEntryParser {
+ private final StatsDReporter statsDReporter;
+ private final Template keyTemplate;
+ private final Map fieldTemplates;
+ private final MessageSchema schema;
+
+ @Override
+ public List getRedisEntry(ParsedMessage parsedMessage) {
+ String redisKey = keyTemplate.parse(parsedMessage, schema);
+ return fieldTemplates
+ .entrySet()
+ .stream()
+ .map(fieldTemplate -> {
+ String field = fieldTemplate.getValue().parse(parsedMessage, schema);
+ String redisValue = GenericFieldFactory
+ .getField(parsedMessage.getFieldByName(fieldTemplate.getKey(), schema)).getString();
+ return new RedisHashSetFieldEntry(redisKey, field, redisValue,
+ new Instrumentation(statsDReporter, RedisHashSetFieldEntry.class));
+ }).collect(Collectors.toList());
+ }
+}
diff --git a/src/main/java/org/raystack/depot/redis/parsers/RedisKeyValueEntryParser.java b/src/main/java/org/raystack/depot/redis/parsers/RedisKeyValueEntryParser.java
new file mode 100644
index 00000000..1ff13b08
--- /dev/null
+++ b/src/main/java/org/raystack/depot/redis/parsers/RedisKeyValueEntryParser.java
@@ -0,0 +1,31 @@
+package org.raystack.depot.redis.parsers;
+
+import org.raystack.depot.message.field.GenericFieldFactory;
+import org.raystack.depot.redis.client.entry.RedisEntry;
+import org.raystack.depot.redis.client.entry.RedisKeyValueEntry;
+import org.raystack.depot.common.Template;
+import org.raystack.depot.message.MessageSchema;
+import org.raystack.depot.message.ParsedMessage;
+import org.raystack.depot.metrics.Instrumentation;
+import org.raystack.depot.metrics.StatsDReporter;
+import lombok.AllArgsConstructor;
+
+import java.util.Collections;
+import java.util.List;
+
+@AllArgsConstructor
+public class RedisKeyValueEntryParser implements RedisEntryParser {
+ private final StatsDReporter statsDReporter;
+ private final Template keyTemplate;
+ private final String fieldName;
+ private final MessageSchema schema;
+
+ @Override
+ public List getRedisEntry(ParsedMessage parsedMessage) {
+ String redisKey = keyTemplate.parse(parsedMessage, schema);
+ String redisValue = GenericFieldFactory.getField(parsedMessage.getFieldByName(fieldName, schema)).getString();
+ RedisKeyValueEntry redisKeyValueEntry = new RedisKeyValueEntry(redisKey, redisValue,
+ new Instrumentation(statsDReporter, RedisKeyValueEntry.class));
+ return Collections.singletonList(redisKeyValueEntry);
+ }
+}
diff --git a/src/main/java/org/raystack/depot/redis/parsers/RedisListEntryParser.java b/src/main/java/org/raystack/depot/redis/parsers/RedisListEntryParser.java
new file mode 100644
index 00000000..d7873c6c
--- /dev/null
+++ b/src/main/java/org/raystack/depot/redis/parsers/RedisListEntryParser.java
@@ -0,0 +1,33 @@
+package org.raystack.depot.redis.parsers;
+
+import org.raystack.depot.message.field.GenericFieldFactory;
+import org.raystack.depot.redis.client.entry.RedisEntry;
+import org.raystack.depot.redis.client.entry.RedisListEntry;
+import org.raystack.depot.common.Template;
+import org.raystack.depot.message.MessageSchema;
+import org.raystack.depot.message.ParsedMessage;
+import org.raystack.depot.metrics.Instrumentation;
+import org.raystack.depot.metrics.StatsDReporter;
+import lombok.AllArgsConstructor;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Redis list parser.
+ */
+@AllArgsConstructor
+public class RedisListEntryParser implements RedisEntryParser {
+ private final StatsDReporter statsDReporter;
+ private final Template keyTemplate;
+ private final String field;
+ private final MessageSchema schema;
+
+ @Override
+ public List getRedisEntry(ParsedMessage parsedMessage) {
+ String redisKey = keyTemplate.parse(parsedMessage, schema);
+ String redisValue = GenericFieldFactory.getField(parsedMessage.getFieldByName(field, schema)).getString();
+ return Collections.singletonList(
+ new RedisListEntry(redisKey, redisValue, new Instrumentation(statsDReporter, RedisListEntry.class)));
+ }
+}
diff --git a/src/main/java/io/odpf/depot/redis/parsers/RedisParser.java b/src/main/java/org/raystack/depot/redis/parsers/RedisParser.java
similarity index 59%
rename from src/main/java/io/odpf/depot/redis/parsers/RedisParser.java
rename to src/main/java/org/raystack/depot/redis/parsers/RedisParser.java
index bed1237d..16372e5e 100644
--- a/src/main/java/io/odpf/depot/redis/parsers/RedisParser.java
+++ b/src/main/java/org/raystack/depot/redis/parsers/RedisParser.java
@@ -1,16 +1,16 @@
-package io.odpf.depot.redis.parsers;
+package org.raystack.depot.redis.parsers;
-import io.odpf.depot.common.Tuple;
-import io.odpf.depot.error.ErrorInfo;
-import io.odpf.depot.error.ErrorType;
-import io.odpf.depot.exception.ConfigurationException;
-import io.odpf.depot.exception.DeserializerException;
-import io.odpf.depot.message.OdpfMessage;
-import io.odpf.depot.message.OdpfMessageParser;
-import io.odpf.depot.message.ParsedOdpfMessage;
-import io.odpf.depot.message.SinkConnectorSchemaMessageMode;
-import io.odpf.depot.redis.client.entry.RedisEntry;
-import io.odpf.depot.redis.record.RedisRecord;
+import org.raystack.depot.redis.client.entry.RedisEntry;
+import org.raystack.depot.redis.record.RedisRecord;
+import org.raystack.depot.common.Tuple;
+import org.raystack.depot.error.ErrorInfo;
+import org.raystack.depot.error.ErrorType;
+import org.raystack.depot.exception.ConfigurationException;
+import org.raystack.depot.exception.DeserializerException;
+import org.raystack.depot.message.Message;
+import org.raystack.depot.message.MessageParser;
+import org.raystack.depot.message.ParsedMessage;
+import org.raystack.depot.message.SinkConnectorSchemaMessageMode;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -19,26 +19,27 @@
import java.util.List;
import java.util.stream.IntStream;
-
/**
- * Convert Odpf messages to RedisRecords.
+ * Convert Messages to RedisRecords.
*/
@AllArgsConstructor
@Slf4j
public class RedisParser {
- private final OdpfMessageParser odpfMessageParser;
+ private final MessageParser messageParser;
private final RedisEntryParser redisEntryParser;
private final Tuple modeAndSchema;
- public List convert(List messages) {
+ public List convert(List messages) {
List records = new ArrayList<>();
IntStream.range(0, messages.size()).forEach(index -> {
try {
- ParsedOdpfMessage parsedOdpfMessage = odpfMessageParser.parse(messages.get(index), modeAndSchema.getFirst(), modeAndSchema.getSecond());
- List redisDataEntries = redisEntryParser.getRedisEntry(parsedOdpfMessage);
+ ParsedMessage parsedMessage = messageParser.parse(messages.get(index), modeAndSchema.getFirst(),
+ modeAndSchema.getSecond());
+ List redisDataEntries = redisEntryParser.getRedisEntry(parsedMessage);
for (RedisEntry redisEntry : redisDataEntries) {
- records.add(new RedisRecord(redisEntry, (long) index, null, messages.get(index).getMetadataString(), true));
+ records.add(new RedisRecord(redisEntry, (long) index, null, messages.get(index).getMetadataString(),
+ true));
}
} catch (UnsupportedOperationException e) {
records.add(createAndLogErrorRecord(e, ErrorType.INVALID_MESSAGE_ERROR, index, messages));
@@ -53,9 +54,10 @@ public List convert(List messages) {
return records;
}
- private RedisRecord createAndLogErrorRecord(Exception e, ErrorType type, int index, List messages) {
+ private RedisRecord createAndLogErrorRecord(Exception e, ErrorType type, int index, List messages) {
ErrorInfo errorInfo = new ErrorInfo(e, type);
- RedisRecord record = new RedisRecord(null, (long) index, errorInfo, messages.get(index).getMetadataString(), false);
+ RedisRecord record = new RedisRecord(null, (long) index, errorInfo, messages.get(index).getMetadataString(),
+ false);
log.error("Error while parsing record for message. Record: {}, Error: {}", record, errorInfo);
return record;
}
diff --git a/src/main/java/io/odpf/depot/redis/record/RedisRecord.java b/src/main/java/org/raystack/depot/redis/record/RedisRecord.java
similarity index 72%
rename from src/main/java/io/odpf/depot/redis/record/RedisRecord.java
rename to src/main/java/org/raystack/depot/redis/record/RedisRecord.java
index d526306d..cf471450 100644
--- a/src/main/java/io/odpf/depot/redis/record/RedisRecord.java
+++ b/src/main/java/org/raystack/depot/redis/record/RedisRecord.java
@@ -1,16 +1,15 @@
-package io.odpf.depot.redis.record;
+package org.raystack.depot.redis.record;
-import io.odpf.depot.error.ErrorInfo;
-import io.odpf.depot.redis.client.entry.RedisEntry;
-import io.odpf.depot.redis.client.response.RedisClusterResponse;
-import io.odpf.depot.redis.client.response.RedisStandaloneResponse;
-import io.odpf.depot.redis.ttl.RedisTtl;
+import org.raystack.depot.redis.client.entry.RedisEntry;
+import org.raystack.depot.redis.client.response.RedisClusterResponse;
+import org.raystack.depot.redis.client.response.RedisStandaloneResponse;
+import org.raystack.depot.error.ErrorInfo;
+import org.raystack.depot.redis.ttl.RedisTtl;
import lombok.AllArgsConstructor;
import lombok.Getter;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.Pipeline;
-
@AllArgsConstructor
public class RedisRecord {
private RedisEntry redisEntry;
diff --git a/src/main/java/io/odpf/depot/redis/ttl/DurationTtl.java b/src/main/java/org/raystack/depot/redis/ttl/DurationTtl.java
similarity index 93%
rename from src/main/java/io/odpf/depot/redis/ttl/DurationTtl.java
rename to src/main/java/org/raystack/depot/redis/ttl/DurationTtl.java
index 557056a5..e6bbb299 100644
--- a/src/main/java/io/odpf/depot/redis/ttl/DurationTtl.java
+++ b/src/main/java/org/raystack/depot/redis/ttl/DurationTtl.java
@@ -1,11 +1,10 @@
-package io.odpf.depot.redis.ttl;
+package org.raystack.depot.redis.ttl;
import lombok.AllArgsConstructor;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
-
@AllArgsConstructor
public class DurationTtl implements RedisTtl {
private int ttlInSeconds;
diff --git a/src/main/java/io/odpf/depot/redis/ttl/ExactTimeTtl.java b/src/main/java/org/raystack/depot/redis/ttl/ExactTimeTtl.java
similarity index 93%
rename from src/main/java/io/odpf/depot/redis/ttl/ExactTimeTtl.java
rename to src/main/java/org/raystack/depot/redis/ttl/ExactTimeTtl.java
index e678a754..971b3c31 100644
--- a/src/main/java/io/odpf/depot/redis/ttl/ExactTimeTtl.java
+++ b/src/main/java/org/raystack/depot/redis/ttl/ExactTimeTtl.java
@@ -1,11 +1,10 @@
-package io.odpf.depot.redis.ttl;
+package org.raystack.depot.redis.ttl;
import lombok.AllArgsConstructor;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
-
@AllArgsConstructor
public class ExactTimeTtl implements RedisTtl {
private long unixTime;
diff --git a/src/main/java/io/odpf/depot/redis/ttl/NoRedisTtl.java b/src/main/java/org/raystack/depot/redis/ttl/NoRedisTtl.java
similarity index 90%
rename from src/main/java/io/odpf/depot/redis/ttl/NoRedisTtl.java
rename to src/main/java/org/raystack/depot/redis/ttl/NoRedisTtl.java
index 076f45cd..8b22434b 100644
--- a/src/main/java/io/odpf/depot/redis/ttl/NoRedisTtl.java
+++ b/src/main/java/org/raystack/depot/redis/ttl/NoRedisTtl.java
@@ -1,4 +1,4 @@
-package io.odpf.depot.redis.ttl;
+package org.raystack.depot.redis.ttl;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.Pipeline;
diff --git a/src/main/java/io/odpf/depot/redis/ttl/RedisTTLFactory.java b/src/main/java/org/raystack/depot/redis/ttl/RedisTTLFactory.java
similarity index 78%
rename from src/main/java/io/odpf/depot/redis/ttl/RedisTTLFactory.java
rename to src/main/java/org/raystack/depot/redis/ttl/RedisTTLFactory.java
index f6fb0642..5f6c2f80 100644
--- a/src/main/java/io/odpf/depot/redis/ttl/RedisTTLFactory.java
+++ b/src/main/java/org/raystack/depot/redis/ttl/RedisTTLFactory.java
@@ -1,9 +1,8 @@
-package io.odpf.depot.redis.ttl;
+package org.raystack.depot.redis.ttl;
-
-import io.odpf.depot.config.RedisSinkConfig;
-import io.odpf.depot.exception.ConfigurationException;
-import io.odpf.depot.redis.enums.RedisSinkTtlType;
+import org.raystack.depot.redis.enums.RedisSinkTtlType;
+import org.raystack.depot.config.RedisSinkConfig;
+import org.raystack.depot.exception.ConfigurationException;
public class RedisTTLFactory {
diff --git a/src/main/java/io/odpf/depot/redis/ttl/RedisTtl.java b/src/main/java/org/raystack/depot/redis/ttl/RedisTtl.java
similarity index 88%
rename from src/main/java/io/odpf/depot/redis/ttl/RedisTtl.java
rename to src/main/java/org/raystack/depot/redis/ttl/RedisTtl.java
index 2ebc8292..255340cc 100644
--- a/src/main/java/io/odpf/depot/redis/ttl/RedisTtl.java
+++ b/src/main/java/org/raystack/depot/redis/ttl/RedisTtl.java
@@ -1,4 +1,4 @@
-package io.odpf.depot.redis.ttl;
+package org.raystack.depot.redis.ttl;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.Pipeline;
diff --git a/src/main/java/io/odpf/depot/redis/util/RedisSinkUtils.java b/src/main/java/org/raystack/depot/redis/util/RedisSinkUtils.java
similarity index 56%
rename from src/main/java/io/odpf/depot/redis/util/RedisSinkUtils.java
rename to src/main/java/org/raystack/depot/redis/util/RedisSinkUtils.java
index 466d60ab..ba9c7b17 100644
--- a/src/main/java/io/odpf/depot/redis/util/RedisSinkUtils.java
+++ b/src/main/java/org/raystack/depot/redis/util/RedisSinkUtils.java
@@ -1,10 +1,10 @@
-package io.odpf.depot.redis.util;
+package org.raystack.depot.redis.util;
-import io.odpf.depot.error.ErrorInfo;
-import io.odpf.depot.error.ErrorType;
-import io.odpf.depot.metrics.Instrumentation;
-import io.odpf.depot.redis.client.response.RedisResponse;
-import io.odpf.depot.redis.record.RedisRecord;
+import org.raystack.depot.redis.client.response.RedisResponse;
+import org.raystack.depot.redis.record.RedisRecord;
+import org.raystack.depot.error.ErrorInfo;
+import org.raystack.depot.error.ErrorType;
+import org.raystack.depot.metrics.Instrumentation;
import java.util.HashMap;
import java.util.List;
@@ -12,7 +12,8 @@
import java.util.stream.IntStream;
public class RedisSinkUtils {
- public static Map getErrorsFromResponse(List redisRecords, List responses, Instrumentation instrumentation) {
+ public static Map getErrorsFromResponse(List redisRecords,
+ List responses, Instrumentation instrumentation) {
Map errors = new HashMap<>();
IntStream.range(0, responses.size()).forEach(
index -> {
@@ -21,10 +22,10 @@ public static Map getErrorsFromResponse(List redis
RedisRecord record = redisRecords.get(index);
instrumentation.logError("Error while inserting to redis for message. Record: {}, Error: {}",
record.toString(), response.getMessage());
- errors.put(record.getIndex(), new ErrorInfo(new Exception(response.getMessage()), ErrorType.DEFAULT_ERROR));
+ errors.put(record.getIndex(),
+ new ErrorInfo(new Exception(response.getMessage()), ErrorType.DEFAULT_ERROR));
}
- }
- );
+ });
return errors;
}
}
diff --git a/src/main/java/io/odpf/depot/stencil/OdpfStencilUpdateListener.java b/src/main/java/org/raystack/depot/stencil/DepotStencilUpdateListener.java
similarity index 55%
rename from src/main/java/io/odpf/depot/stencil/OdpfStencilUpdateListener.java
rename to src/main/java/org/raystack/depot/stencil/DepotStencilUpdateListener.java
index d5ff22a6..f795535f 100644
--- a/src/main/java/io/odpf/depot/stencil/OdpfStencilUpdateListener.java
+++ b/src/main/java/org/raystack/depot/stencil/DepotStencilUpdateListener.java
@@ -1,17 +1,17 @@
-package io.odpf.depot.stencil;
+package org.raystack.depot.stencil;
import com.google.protobuf.Descriptors;
-import io.odpf.depot.message.OdpfMessageParser;
-import io.odpf.stencil.SchemaUpdateListener;
+import org.raystack.depot.message.MessageParser;
+import org.raystack.stencil.SchemaUpdateListener;
import lombok.Getter;
import lombok.Setter;
import java.util.Map;
-public abstract class OdpfStencilUpdateListener implements SchemaUpdateListener {
+public abstract class DepotStencilUpdateListener implements SchemaUpdateListener {
@Getter
@Setter
- private OdpfMessageParser odpfMessageParser;
+ private MessageParser messageParser;
public void onSchemaUpdate(final Map newDescriptor) {
// default implementation is empty
diff --git a/src/main/java/io/odpf/depot/utils/DateUtils.java b/src/main/java/org/raystack/depot/utils/DateUtils.java
similarity index 94%
rename from src/main/java/io/odpf/depot/utils/DateUtils.java
rename to src/main/java/org/raystack/depot/utils/DateUtils.java
index 5b57d93d..b244e1c3 100644
--- a/src/main/java/io/odpf/depot/utils/DateUtils.java
+++ b/src/main/java/org/raystack/depot/utils/DateUtils.java
@@ -1,4 +1,4 @@
-package io.odpf.depot.utils;
+package org.raystack.depot.utils;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
diff --git a/src/main/java/io/odpf/depot/utils/JsonUtils.java b/src/main/java/org/raystack/depot/utils/JsonUtils.java
similarity index 87%
rename from src/main/java/io/odpf/depot/utils/JsonUtils.java
rename to src/main/java/org/raystack/depot/utils/JsonUtils.java
index 37c3e3c2..89e77e01 100644
--- a/src/main/java/io/odpf/depot/utils/JsonUtils.java
+++ b/src/main/java/org/raystack/depot/utils/JsonUtils.java
@@ -1,6 +1,6 @@
-package io.odpf.depot.utils;
+package org.raystack.depot.utils;
-import io.odpf.depot.config.OdpfSinkConfig;
+import org.raystack.depot.config.SinkConfig;
import org.json.JSONObject;
public class JsonUtils {
@@ -12,7 +12,7 @@ public class JsonUtils {
* @param payload Json Payload in byyes
* @return Json object
*/
- public static JSONObject getJsonObject(OdpfSinkConfig config, byte[] payload) {
+ public static JSONObject getJsonObject(SinkConfig config, byte[] payload) {
JSONObject jsonObject = new JSONObject(new String(payload));
if (!config.getSinkConnectorSchemaJsonParserStringModeEnabled()) {
return jsonObject;
diff --git a/src/main/java/io/odpf/depot/utils/MessageConfigUtils.java b/src/main/java/org/raystack/depot/utils/MessageConfigUtils.java
similarity index 56%
rename from src/main/java/io/odpf/depot/utils/MessageConfigUtils.java
rename to src/main/java/org/raystack/depot/utils/MessageConfigUtils.java
index fcc3f7eb..c884e935 100644
--- a/src/main/java/io/odpf/depot/utils/MessageConfigUtils.java
+++ b/src/main/java/org/raystack/depot/utils/MessageConfigUtils.java
@@ -1,15 +1,16 @@
-package io.odpf.depot.utils;
+package org.raystack.depot.utils;
-import io.odpf.depot.common.Tuple;
-import io.odpf.depot.config.OdpfSinkConfig;
-import io.odpf.depot.message.SinkConnectorSchemaMessageMode;
+import org.raystack.depot.config.SinkConfig;
+import org.raystack.depot.message.SinkConnectorSchemaMessageMode;
+import org.raystack.depot.common.Tuple;
public class MessageConfigUtils {
- public static Tuple getModeAndSchema(OdpfSinkConfig sinkConfig) {
+ public static Tuple getModeAndSchema(SinkConfig sinkConfig) {
SinkConnectorSchemaMessageMode mode = sinkConfig.getSinkConnectorSchemaMessageMode();
String schemaClass = mode == SinkConnectorSchemaMessageMode.LOG_MESSAGE
- ? sinkConfig.getSinkConnectorSchemaProtoMessageClass() : sinkConfig.getSinkConnectorSchemaProtoKeyClass();
+ ? sinkConfig.getSinkConnectorSchemaProtoMessageClass()
+ : sinkConfig.getSinkConnectorSchemaProtoKeyClass();
return new Tuple<>(mode, schemaClass);
}
}
diff --git a/src/main/java/io/odpf/depot/utils/ProtoUtils.java b/src/main/java/org/raystack/depot/utils/ProtoUtils.java
similarity index 92%
rename from src/main/java/io/odpf/depot/utils/ProtoUtils.java
rename to src/main/java/org/raystack/depot/utils/ProtoUtils.java
index 48bacb87..d47a1b1f 100644
--- a/src/main/java/io/odpf/depot/utils/ProtoUtils.java
+++ b/src/main/java/org/raystack/depot/utils/ProtoUtils.java
@@ -1,4 +1,4 @@
-package io.odpf.depot.utils;
+package org.raystack.depot.utils;
import com.google.protobuf.DynamicMessage;
@@ -37,6 +37,7 @@ private static List collectNestedFields(DynamicMessage node) {
}
private static List