Skip to content

Commit

Permalink
[CALCITE-4768] Upgrade DataStax Driver for Apache Cassandra® version …
Browse files Browse the repository at this point in the history
…to latest 4.x

* Updated Cassandra from 3.11.2 to 4.0.1
* Updated Datastax Driver from 3.6.0 to 4.13.0
* Updated cassandra-unit from 3.5.0.1 to 4.3.1.0
* Cassandra tests are:
 - now compatible with Guava >= 25 but are not anymore compatible with Guava < 20
 - still incompatible with Eclipse OpenJ9 (due to Cassandra)
 - still incompatible with JDK11+ (due to cassandra-unit)
  • Loading branch information
asolimando authored and NobiGo committed Oct 29, 2021
1 parent 2c17f7a commit cbe6a7b
Show file tree
Hide file tree
Showing 23 changed files with 1,503 additions and 619 deletions.
2 changes: 1 addition & 1 deletion bom/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ dependencies {
apiv("com.alibaba.database:innodb-java-reader")
apiv("com.beust:jcommander")
apiv("org.checkerframework:checker-qual", "checkerframework")
apiv("com.datastax.cassandra:cassandra-driver-core")
apiv("com.datastax.oss:java-driver-core", "cassandra-java-driver-core")
apiv("com.esri.geometry:esri-geometry-api")
apiv("com.fasterxml.jackson.core:jackson-databind")
apiv("com.github.kstyrc:embedded-redis")
Expand Down
2 changes: 1 addition & 1 deletion cassandra/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ dependencies {
api(project(":core"))
api(project(":linq4j"))

api("com.datastax.cassandra:cassandra-driver-core")
api("com.datastax.oss:java-driver-core")
api("com.google.guava:guava")
api("org.slf4j:slf4j-api")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,39 @@
package org.apache.calcite.adapter.cassandra;

import org.apache.calcite.avatica.util.ByteString;
import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.linq4j.Enumerator;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;

import com.datastax.driver.core.LocalDate;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.TupleValue;
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.data.TupleValue;

import org.checkerframework.checker.nullness.qual.Nullable;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.stream.IntStream;

/** Enumerator that reads from a Cassandra column family. */
class CassandraEnumerator implements Enumerator<Object> {
private Iterator<Row> iterator;
private Row current;
private List<RelDataTypeField> fieldTypes;
private final Iterator<Row> iterator;
private final List<RelDataTypeField> fieldTypes;
@Nullable private Row current;

/** Creates a CassandraEnumerator.
*
* @param results Cassandra result set ({@link com.datastax.driver.core.ResultSet})
* @param results Cassandra result set ({@link com.datastax.oss.driver.api.core.cql.ResultSet})
* @param protoRowType The type of resulting rows
*/
CassandraEnumerator(ResultSet results, RelProtoDataType protoRowType) {
Expand Down Expand Up @@ -80,10 +84,11 @@ class CassandraEnumerator implements Enumerator<Object> {
*
* @param index Index of the field within the Row object
*/
private Object currentRowField(int index) {
private @Nullable Object currentRowField(int index) {
assert current != null;
final Object o = current.get(index,
CassandraSchema.CODEC_REGISTRY.codecFor(
current.getColumnDefinitions().getType(index)));
current.getColumnDefinitions().get(index).getType()));

return convertToEnumeratorObject(o);
}
Expand All @@ -92,23 +97,26 @@ private Object currentRowField(int index) {
*
* @param obj Object to convert, if needed
*/
private Object convertToEnumeratorObject(Object obj) {
private @Nullable Object convertToEnumeratorObject(@Nullable Object obj) {
if (obj instanceof ByteBuffer) {
ByteBuffer buf = (ByteBuffer) obj;
byte [] bytes = new byte[buf.remaining()];
buf.get(bytes, 0, bytes.length);
return new ByteString(bytes);
} else if (obj instanceof LocalDate) {
// converts dates to the expected numeric format
return ((LocalDate) obj).getMillisSinceEpoch()
/ DateTimeUtils.MILLIS_PER_DAY;
return ((LocalDate) obj).toEpochDay();
} else if (obj instanceof Date) {
@SuppressWarnings("JdkObsolete")
long milli = ((Date) obj).toInstant().toEpochMilli();
return milli;
} else if (obj instanceof Instant) {
return ((Instant) obj).toEpochMilli();
} else if (obj instanceof LocalTime) {
return ((LocalTime) obj).toNanoOfDay();
} else if (obj instanceof LinkedHashSet) {
// MULTISET is handled as an array
return ((LinkedHashSet) obj).toArray();
return ((LinkedHashSet<?>) obj).toArray();
} else if (obj instanceof TupleValue) {
// STRUCT can be handled as an array
final TupleValue tupleValue = (TupleValue) obj;
Expand All @@ -119,6 +127,7 @@ private Object convertToEnumeratorObject(Object obj) {
CassandraSchema.CODEC_REGISTRY.codecFor(
tupleValue.getType().getComponentTypes().get(i)))
).map(this::convertToEnumeratorObject)
.map(Objects::requireNonNull) // "null" cannot appear inside collections
.toArray();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
Expand All @@ -44,6 +45,7 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

import static org.apache.calcite.util.DateTimeStringUtils.ISO_DATETIME_FRACTIONAL_SECOND_FORMAT;
Expand All @@ -57,9 +59,9 @@ public class CassandraFilter extends Filter implements CassandraRel {
private final List<String> partitionKeys;
private Boolean singlePartition;
private final List<String> clusteringKeys;
private List<RelFieldCollation> implicitFieldCollations;
private RelCollation implicitCollation;
private String match;
private final List<RelFieldCollation> implicitFieldCollations;
private final RelCollation implicitCollation;
private final String match;

public CassandraFilter(
RelOptCluster cluster,
Expand Down Expand Up @@ -188,7 +190,7 @@ private String translateMatch(RexNode condition) {
* @return The value of the literal in the form of the actual type.
*/
private static Object literalValue(RexLiteral literal) {
Comparable value = RexLiteral.value(literal);
Comparable<?> value = RexLiteral.value(literal);
switch (literal.getTypeName()) {
case TIMESTAMP:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
Expand All @@ -200,7 +202,8 @@ private static Object literalValue(RexLiteral literal) {
assert value instanceof DateString;
return value.toString();
default:
return literal.getValue3();
Object val = literal.getValue3();
return val == null ? "null" : val;
}
}

Expand Down Expand Up @@ -255,7 +258,7 @@ private String translateBinary(String op, String rop, RexCall call) {
}

/** Translates a call to a binary operator. Returns null on failure. */
private String translateBinary2(String op, RexNode left, RexNode right) {
private @Nullable String translateBinary2(String op, RexNode left, RexNode right) {
switch (right.getKind()) {
case LITERAL:
break;
Expand Down Expand Up @@ -289,7 +292,9 @@ private String translateOp2(String op, String name, RexLiteral right) {
Object value = literalValue(right);
String valueString = value.toString();
if (value instanceof String) {
SqlTypeName typeName = rowType.getField(name, true, false).getType().getSqlTypeName();
RelDataTypeField field =
Objects.requireNonNull(rowType.getField(name, true, false));
SqlTypeName typeName = field.getType().getSqlTypeName();
if (typeName != SqlTypeName.CHAR) {
valueString = "'" + valueString + "'";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@
* Implementation of limits in Cassandra.
*/
public class CassandraLimit extends SingleRel implements CassandraRel {
public final RexNode offset;
public final RexNode fetch;
public final @Nullable RexNode offset;
public final @Nullable RexNode fetch;

public CassandraLimit(RelOptCluster cluster, RelTraitSet traitSet,
RelNode input, RexNode offset, RexNode fetch) {
RelNode input, @Nullable RexNode offset, @Nullable RexNode fetch) {
super(cluster, traitSet, input);
this.offset = offset;
this.fetch = fetch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public enum CassandraMethod {
MAP = builder.build();
}

CassandraMethod(Class clazz, String methodName, Class... argumentTypes) {
CassandraMethod(Class<?> clazz, String methodName, Class<?>... argumentTypes) {
this.method = Types.lookupMethod(clazz, methodName, argumentTypes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public CassandraProject(RelOptCluster cluster, RelTraitSet traitSet,
CassandraRules.cassandraFieldNames(getInput().getRowType()));
final Map<String, String> fields = new LinkedHashMap<>();
for (Pair<RexNode, String> pair : getNamedProjects()) {
assert pair.left != null;
final String name = pair.right;
final String originalName = pair.left.accept(translator);
fields.put(originalName, name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.rel.RelNode;

import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
Expand All @@ -43,15 +45,15 @@ class Implementor {
int fetch = -1;
final List<String> order = new ArrayList<>();

RelOptTable table;
CassandraTable cassandraTable;
@Nullable RelOptTable table;
@Nullable CassandraTable cassandraTable;

/** Adds newly projected fields and restricted predicates.
*
* @param fields New fields to be projected from a query
* @param predicates New predicates to be applied to the query
*/
public void add(Map<String, String> fields, List<String> predicates) {
public void add(@Nullable Map<String, String> fields, @Nullable List<String> predicates) {
if (fields != null) {
selectFields.putAll(fields);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import org.apache.calcite.rex.RexVisitorImpl;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.util.Pair;

import org.checkerframework.checker.nullness.qual.Nullable;
import org.immutables.value.Value;

import java.util.HashSet;
Expand Down Expand Up @@ -72,7 +72,7 @@ private CassandraRules() {}
.toRule(CassandraToEnumerableConverterRule.class);

@SuppressWarnings("MutablePublicArray")
public static final RelOptRule[] RULES = {
protected static final RelOptRule[] RULES = {
FILTER,
PROJECT,
SORT,
Expand Down Expand Up @@ -128,8 +128,11 @@ protected CassandraFilterRule(CassandraFilterRuleConfig config) {

// Get field names from the scan operation
CassandraTableScan scan = call.rel(1);
Pair<List<String>, List<String>> keyFields = scan.cassandraTable.getKeyFields();
Set<String> partitionKeys = new HashSet<>(keyFields.left);

List<String> partitionKeys = scan.cassandraTable.getPartitionKeys();
List<String> clusteringKeys = scan.cassandraTable.getClusteringKeys();
Set<String> partitionKeysSet = new HashSet<>(scan.cassandraTable.getPartitionKeys());

List<String> fieldNames = CassandraRules.cassandraFieldNames(filter.getInput().getRowType());

List<RexNode> disjunctions = RelOptUtil.disjunctions(condition);
Expand All @@ -139,14 +142,14 @@ protected CassandraFilterRule(CassandraFilterRuleConfig config) {
// Check that all conjunctions are primary key equalities
condition = disjunctions.get(0);
for (RexNode predicate : RelOptUtil.conjunctions(condition)) {
if (!isEqualityOnKey(predicate, fieldNames, partitionKeys, keyFields.right)) {
if (!isEqualityOnKey(predicate, fieldNames, partitionKeysSet, clusteringKeys)) {
return false;
}
}
}

// Either all of the partition keys must be specified or none
return partitionKeys.size() == keyFields.left.size() || partitionKeys.size() == 0;
// Either all the partition keys must be specified or none
return partitionKeysSet.size() == partitionKeys.size() || partitionKeysSet.isEmpty();
}

/** Check if the node is a supported predicate (primary key equality).
Expand All @@ -168,7 +171,7 @@ private static boolean isEqualityOnKey(RexNode node, List<String> fieldNames,
final RexNode right = call.operands.get(1);
String key = compareFieldWithLiteral(left, right, fieldNames);
if (key == null) {
key = compareFieldWithLiteral(right, left, fieldNames);
key = compareFieldWithLiteral(left, right, fieldNames);
}
if (key != null) {
return partitionKeys.remove(key) || clusteringKeys.contains(key);
Expand All @@ -184,17 +187,16 @@ private static boolean isEqualityOnKey(RexNode node, List<String> fieldNames,
* @param fieldNames Names of all columns in the table
* @return The field being compared or null if there is no key equality
*/
private static String compareFieldWithLiteral(RexNode left, RexNode right,
List<String> fieldNames) {
private static @Nullable String compareFieldWithLiteral(
RexNode left, RexNode right, List<String> fieldNames) {
// FIXME Ignore casts for new and assume they aren't really necessary
if (left.isA(SqlKind.CAST)) {
left = ((RexCall) left).getOperands().get(0);
}

if (left.isA(SqlKind.INPUT_REF) && right.isA(SqlKind.LITERAL)) {
final RexInputRef left1 = (RexInputRef) left;
String name = fieldNames.get(left1.getIndex());
return name;
RexInputRef left1 = (RexInputRef) left;
return fieldNames.get(left1.getIndex());
} else {
return null;
}
Expand All @@ -211,20 +213,22 @@ private static String compareFieldWithLiteral(RexNode left, RexNode right,
}
}

RelNode convert(LogicalFilter filter, CassandraTableScan scan) {
@Nullable RelNode convert(LogicalFilter filter, CassandraTableScan scan) {
final RelTraitSet traitSet = filter.getTraitSet().replace(CassandraRel.CONVENTION);
final Pair<List<String>, List<String>> keyFields = scan.cassandraTable.getKeyFields();
final List<String> partitionKeys = scan.cassandraTable.getPartitionKeys();
final List<String> clusteringKeys = scan.cassandraTable.getClusteringKeys();

return new CassandraFilter(
filter.getCluster(),
traitSet,
convert(filter.getInput(), CassandraRel.CONVENTION),
filter.getCondition(),
keyFields.left,
keyFields.right,
partitionKeys,
clusteringKeys,
scan.cassandraTable.getClusteringOrder());
}

/** Deprecated in favor of CassandraFilterRuleConfig. **/
/** Deprecated in favor of {@link CassandraFilterRuleConfig}. **/
@Deprecated
public interface Config extends CassandraFilterRuleConfig { }

Expand Down Expand Up @@ -322,7 +326,7 @@ private static boolean collationsCompatible(RelCollation sortCollation,
if (sortFieldCollations.size() > implicitFieldCollations.size()) {
return false;
}
if (sortFieldCollations.size() == 0) {
if (sortFieldCollations.isEmpty()) {
return true;
}

Expand Down Expand Up @@ -354,12 +358,9 @@ private static boolean collationsCompatible(RelCollation sortCollation,
}

@Override public void onMatch(RelOptRuleCall call) {
final Sort sort = call.rel(0);
Sort sort = call.rel(0);
CassandraFilter filter = call.rel(2);
final RelNode converted = convert(sort, filter);
if (converted != null) {
call.transformTo(converted);
}
call.transformTo(convert(sort, filter));
}

/** Deprecated in favor of CassandraSortRuleConfig. **/
Expand Down Expand Up @@ -413,11 +414,8 @@ public RelNode convert(EnumerableLimit limit) {
}

@Override public void onMatch(RelOptRuleCall call) {
final EnumerableLimit limit = call.rel(0);
final RelNode converted = convert(limit);
if (converted != null) {
call.transformTo(converted);
}
EnumerableLimit limit = call.rel(0);
call.transformTo(convert(limit));
}

/** Deprecated in favor of CassandraLimitRuleConfig. **/
Expand Down
Loading

0 comments on commit cbe6a7b

Please sign in to comment.