
java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.encoders.ExpressionEncoder org.apache.spark.sql.catalyst.encoders.RowEncoder.apply(org.apache.spark.sql.types.StructType)' #1209

Open
CatalinMihaiIonescu opened this issue Apr 9, 2024 · 6 comments

CatalinMihaiIonescu commented Apr 9, 2024

Hello,

I am trying to update my application to Spark 3.5.1 (Scala 2.12.18, OpenJDK 64-Bit Server VM 17.0.10), but the Scala 2.12 connector keeps throwing this error.

While trying to stream data, I get the following:

Driver stacktrace:

java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.encoders.ExpressionEncoder org.apache.spark.sql.catalyst.encoders.RowEncoder.apply(org.apache.spark.sql.types.StructType)'
	at org.apache.spark.sql.PreScala213SparkSqlUtils.createExpressionEncoder(PreScala213SparkSqlUtils.java:53)
	at com.google.cloud.spark.bigquery.spark3.Spark3DataFrameToRDDConverter.convertToRDD(Spark3DataFrameToRDDConverter.java:40)
	at com.google.cloud.spark.bigquery.BigQueryStreamWriter$.writeBatch(BigQueryStreamWriter.scala:50)
	at com.google.cloud.spark.bigquery.BigQueryStreamingSink.addBatch(BigQueryStreamingSink.scala:53)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:732)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:729)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:729)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:286)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:427)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:425)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:249)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:67)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:239)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:311)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:289)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.$anonfun$run$1(StreamExecution.scala:211)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:211)

I am using Spark 3.5.1 with spark-bigquery-with-dependencies_2.12:0.37.0; everything works fine on Spark 3.4.2.
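
For context, the failing call corresponds to a plain streaming write through the connector's sink, per the stack trace above. A minimal sketch of that pattern (the rate source and all table/bucket/checkpoint values here are placeholders, not the actual job):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bq-stream-repro").getOrCreate()

# Placeholder input stream; the real job reads from its own source.
df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

# writeStream with format "bigquery" goes through BigQueryStreamingSink.addBatch,
# which is where the NoSuchMethodError above is thrown on Spark 3.5.x.
query = (
    df.writeStream
    .format("bigquery")
    .option("table", "my_project.my_dataset.my_table")       # placeholder
    .option("temporaryGcsBucket", "my-temp-bucket")          # placeholder
    .option("checkpointLocation", "gs://my-temp-bucket/cp")  # placeholder
    .start()
)
query.awaitTermination()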

davidrabinowitz (Member) commented

Please try the spark-3.5-bigquery:0.37.0 connector.
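
For anyone following along: one way to pull that artifact in, assuming the jar is fetched from Maven via spark.jars.packages rather than pre-installed (the app name is a placeholder):

from pyspark.sql import SparkSession

# spark-3.5-bigquery is the Spark-3.5-specific build, replacing
# spark-bigquery-with-dependencies_2.12 in the dependency list.
spark = (
    SparkSession.builder
    .appName("bq-app")  # placeholder
    .config("spark.jars.packages",
            "com.google.cloud.spark:spark-3.5-bigquery:0.37.0")
    .getOrCreate()
)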

CatalinMihaiIonescu (Author) commented Apr 10, 2024

I have tried it, and it gives me the following error:

org.apache.spark.SparkUnsupportedOperationException: Data source bigquery does not support streamed writing.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.streamedOperatorUnsupportedByDataSourceError(QueryExecutionErrors.scala:696)
	at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:326)
	at org.apache.spark.sql.streaming.DataStreamWriter.createV1Sink(DataStreamWriter.scala:442)
	at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:404)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:251)

I am using a standalone Spark cluster.

malhomaid commented

I'm using the same version (spark-3.5-bigquery:0.37.0) and I have the same issue.

malhomaid commented

I worked around it by using writeStream.foreachBatch:

# Write each micro-batch with the regular batch writer, which the
# connector does support. Option values were redacted here.
def write_to_bigquery(batch_df, batch_id):
    batch_df.write.format('bigquery') \
        .option("table", "") \
        .option("temporaryGcsBucket", "") \
        .option("checkpointLocation", "") \
        .option("writeMethod", "direct") \
        .save()

df.writeStream \
    .foreachBatch(write_to_bigquery) \
    .start() \
    .awaitTermination()
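
Two small notes on that sketch, both standard Structured Streaming behavior rather than anything connector-specific: checkpointLocation is read from the streaming query, not from the per-batch writer, and with writeMethod set to direct the temporaryGcsBucket staging option is not needed. A variant with placeholder names:

def write_to_bigquery(batch_df, batch_id):
    # The direct write method streams straight into BigQuery through the
    # Storage Write API, so no GCS staging bucket is required.
    batch_df.write.format('bigquery') \
        .option("table", "my_project.my_dataset.my_table") \
        .option("writeMethod", "direct") \
        .save()

df.writeStream \
    .foreachBatch(write_to_bigquery) \
    .option("checkpointLocation", "gs://my-bucket/checkpoints/my-query") \
    .start() \
    .awaitTermination()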

CatalinMihaiIonescu (Author) commented

It seems to me like you are using direct writing in that workaround; we need to do batch streaming.

CatalinMihaiIonescu (Author) commented Sep 11, 2024

This still happens on Spark 3.5.2 with version 0.41.0 of the connector.
