If you know exactly which version of the Google connector is used, then the source can be checked to see what really happened: https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/83a6c9809ad49a44895d59558e666e5fc183e0bf/gcs/src/main/java/com/google/cloud/hadoop/fs/gcs/GoogleHadoopOutputStream.java#L114
The linked code is from master, but it just doesn't fit...

G

On Thu, Jan 21, 2021 at 9:18 AM Gabor Somogyi <[email protected]> wrote:

> I've double-checked this and came to the same conclusion as Jungtaek.
> I've added a comment to the Stack Overflow post to reach more people with
> the answer.
>
> G
>
>
> On Thu, Jan 21, 2021 at 6:53 AM Jungtaek Lim <[email protected]>
> wrote:
>
>> I quickly looked into the log attached to the SO post, and the problem
>> doesn't seem to be related to Kafka. The error stack trace is from
>> checkpointing to GCS, and the implementation of OutputStream for GCS seems
>> to be provided by Google.
>>
>> Could you please elaborate on the stack trace, or upload the log with
>> sensitive text redacted?
>>
>> On Thu, Jan 21, 2021 at 2:38 PM German Schiavon <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> I couldn't reproduce this error :/ I wonder if there is something else
>>> underlying causing it...
>>>
>>> *Input*
>>> ➜ kafka_2.12-2.5.0 ./bin/kafka-console-producer.sh --bootstrap-server
>>> localhost:9092 --topic test1
>>> {"name": "pedro", "age": 50}
>>> >{"name": "pedro", "age": 50}
>>> >{"name": "pedro", "age": 50}
>>> >{"name": "pedro", "age": 50}
>>>
>>> *Output*
>>> ➜ kafka_2.12-2.5.0 ./bin/kafka-console-consumer.sh --bootstrap-server
>>> localhost:9092 --topic sink
>>> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":1}
>>> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":2}
>>> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":3}
>>> {"value":"{\"name\": \"pedro\", \"age\": 50}","count":4}
>>>
>>>
>>> // count and lit come from here
>>> import org.apache.spark.sql.functions.{count, lit}
>>>
>>> // Read the raw Kafka records and keep only the value as a string
>>> val rawDF = spark
>>>   .readStream
>>>   .format("kafka")
>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>   .option("subscribe", "test1")
>>>   .load
>>>   .selectExpr("CAST(value AS STRING)")
>>>
>>> // Count occurrences of each distinct value and serialize the row as JSON
>>> val groupDF = rawDF.groupBy("value").agg(count(lit(1)).alias("count"))
>>> val kafka_stream_output = groupDF.selectExpr("to_json(struct(*)) AS value")
>>>
>>> // Write the updated counts back to Kafka, checkpointing to local disk
>>> kafka_stream_output
>>>   .writeStream
>>>   .format("kafka")
>>>   .outputMode("update")
>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>   .option("topic", "sink")
>>>   .option("checkpointLocation", "/tmp/check")
>>>   .start()
>>>
>>> spark.streams.awaitAnyTermination()
>>>
>>>
>>> On Wed, 20 Jan 2021 at 23:22, gshen <[email protected]> wrote:
>>>
>>>> This SO post is pretty much the exact same issue:
>>>>
>>>> https://stackoverflow.com/questions/59962680/spark-structured-streaming-error-while-sending-aggregated-result-to-kafka-topic
>>>>
>>>> The user mentions it's an issue with
>>>> org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4
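
One way to find out which connector build is actually on the classpath, so that the matching source revision can be checked, is to ask the JVM where the class was loaded from. The sketch below is only an illustration: `ConnectorJarLocator` and `jarOf` are hypothetical names, and the class name is taken from the file path in the GitHub link at the top of the thread. The jar file name often carries the version, but that is a packaging convention and not a guarantee.

```scala
// Hypothetical helper (not part of Spark or the GCS connector):
// reports the location a class was loaded from, if it can be found.
object ConnectorJarLocator {

  def jarOf(className: String): Option[String] =
    try {
      // Load without running static initializers; we only need the metadata.
      val loader = Thread.currentThread().getContextClassLoader
      val cls = Class.forName(className, false, loader)
      // The code source can be null (e.g. for JDK bootstrap classes).
      Option(cls.getProtectionDomain.getCodeSource)
        .flatMap(cs => Option(cs.getLocation))
        .map(_.toString)
    } catch {
      case _: ClassNotFoundException => None
      case _: LinkageError           => None
    }

  def main(args: Array[String]): Unit = {
    // Class name assumed from the path in the linked source file.
    val name = "com.google.cloud.hadoop.fs.gcs.GoogleHadoopOutputStream"
    println(jarOf(name).getOrElse(s"$name is not on the classpath"))
  }
}
```

Calling `ConnectorJarLocator.jarOf(...)` from the same spark-shell or driver that runs the streaming query would show the jar that the failing job really uses, which may differ from what the build file suggests.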
