
allow custom type converters to be utilized for collection types #1204

Open · wants to merge 3 commits into base: master
Conversation

@aashishs101 (Author)

Currently, the connector limits users' ability to create custom type converters for collection types. It only allows the default collection converters, which apply in-scope converters to the elements of a collection while ignoring any user-defined converters for the collection type itself. This prevents use cases where a converter turns a non-collection type into a collection, or one collection into another (for instance, storing only the keys of a map as a list). When such a custom converter is specified today, it fails with the following exception:
[info] - should write to a table with HLLs *** FAILED ***
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 11 in stage 0.0 failed 1 times, most recent failure: Lost task 11.0 in stage 0.0 (TID 11, localhost, executor driver): com.datastax.spark.connector.types.TypeConversionException: Cannot convert object OurType(14,16384,0.7212525005219688) of type class com.us.commons.internal.util.fn.OurType to Map[AnyRef,AnyRef].
[info]   at com.datastax.spark.connector.types.TypeConverter$$anonfun$convert$1.apply(TypeConverter.scala:45)
[info]   at com.datastax.spark.connector.types.TypeConverter$CollectionConverter$$anonfun$convertPF$35.applyOrElse(TypeConverter.scala:694)
[info]   at com.datastax.spark.connector.types.TypeConverter$class.convert(TypeConverter.scala:43)
[info]   at com.datastax.spark.connector.types.TypeConverter$CollectionConverter.convert(TypeConverter.scala:682)
This happens because of lines like this one, which don't look for a matching custom type converter in scope. The proposed change keeps the current behavior of using the default collection converters, but only when a custom converter isn't also in scope (see the sketches below).
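To make the use case concrete, here is a minimal sketch of the kind of custom converter this change would enable, written against the connector's public TypeConverter API. The MapKeysConverter name and the map-keys-to-list conversion are illustrative, not part of the connector:

    import scala.reflect.runtime.universe._
    import com.datastax.spark.connector.types.TypeConverter

    // Illustrative converter: when writing a Scala Map to a Cassandra
    // list<text> column, keep only the keys of the map.
    object MapKeysConverter extends TypeConverter[List[String]] {
      def targetTypeTag = typeTag[List[String]]
      def convertPF: PartialFunction[Any, List[String]] = {
        case m: Map[_, _] => m.keys.map(_.toString).toList
      }
    }

    // Registration uses the connector's existing hook; today the default
    // collection converter is chosen anyway, which is what this PR fixes.
    TypeConverter.registerConverter(MapKeysConverter)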

JIRA Ticket: https://datastax-oss.atlassian.net/browse/SPARKC-556
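For reference, the shape of the proposed fallback is roughly the following, as a hedged sketch with hypothetical names (resolveConverter, registered, and default are illustrative; the actual change lives in TypeConverter.scala):

    import scala.reflect.runtime.universe._
    import com.datastax.spark.connector.types.TypeConverter

    // Prefer a user-registered converter whose target type matches the full
    // collection type; otherwise fall back to the default element-wise
    // collection converter, preserving the current behavior.
    def resolveConverter(
        registered: Seq[TypeConverter[_]],
        default: => TypeConverter[_],
        tpe: Type): TypeConverter[_] =
      registered.find(_.targetTypeTag.tpe =:= tpe).getOrElse(default)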

@aashishs101 (Author)

test this please

@aashishs101 (Author)

test this please

@aashishs101 (Author)

@RussellSpitzer I'm not sure why the automated testing isn't working on this branch

@aashishs101 (Author)

@RussellSpitzer what steps would I need to go through to get this merged?

@aashishs101 (Author)

@RussellSpitzer: sorry to bother you, but I'm wondering if it's possible to get this PR in?

@RussellSpitzer (Contributor)

@aashishs101 Sorry about that, we only check JIRA, so I miss it when people post PRs without an accompanying ticket. Please file one and we'll get to review and testing ASAP.

@RussellSpitzer (Contributor)

test this please

@RussellSpitzer (Contributor)

Only authorized users can start the tests :)

@ds-jenkins-builds

Build against Scala 2.11 finished with failure

@RussellSpitzer (Contributor)

Error Message
java.io.IOException: Couldn't find table test_table_converter5 in test_cassandra_rdd_spec - Found similar tables in that keyspace: test_cassandra_rdd_spec.test_table_converter5
Stacktrace
sbt.ForkMain$ForkError: java.io.IOException: Couldn't find table test_table_converter5 in test_cassandra_rdd_spec - Found similar tables in that keyspace:
test_cassandra_rdd_spec.test_table_converter5
	at com.datastax.spark.connector.cql.Schema$.tableFromCassandra(Schema.scala:358)
	at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:383)
	at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:35)
	at com.datastax.spark.connector.rdd.CassandraRDDSpec$$anonfun$141$$anonfun$apply$54.apply(CassandraRDDSpec.scala:1581)
	at com.datastax.spark.connector.rdd.CassandraRDDSpec$$anonfun$141$$anonfun$apply$54.apply(CassandraRDDSpec.scala:1572)
	at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
	at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
	at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
	at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
	at com.datastax.spark.connector.rdd.CassandraRDDSpec$$anonfun$141.apply(CassandraRDDSpec.scala:1572)
	at com.datastax.spark.connector.rdd.CassandraRDDSpec$$anonfun$141.apply(CassandraRDDSpec.scala:1572)
