giannisp-verneek opened a new issue, #5386: URL: https://github.com/apache/hudi/issues/5386
I have a pipeline where I send simple messages to a Kafka topic, write them via HoodieDeltaStreamer to a Hudi table, and finally read the table through Spark. I first send a set of messages and read them with Spark as a DataFrame; I see the expected output. I then send a new set of messages that update the same keys. Although the delta streamer receives the messages, I cannot read the Hudi table incrementally through Spark. The messages' schema does not include a date field, but I am getting an error for `creteDateRebaseFuncInWrite`.

I am using a VM from Oracle's cloud to run this small pipeline:

```
Oracle Linux Server release 8.5
NAME="Oracle Linux Server"
VERSION="8.5"
ID="ol"
ID_LIKE="fedora"
VARIANT="Server"
VARIANT_ID="server"
VERSION_ID="8.5"
PLATFORM_ID="platform:el8"
PRETTY_NAME="Oracle Linux Server 8.5"
ANSI_COLOR="0;31"
CPE_NAME="cpe:/o:oracle:linux:8:5:server"
HOME_URL="https://linux.oracle.com/"
BUG_REPORT_URL="https://bugzilla.oracle.com/"
ORACLE_BUGZILLA_PRODUCT="Oracle Linux 8"
ORACLE_BUGZILLA_PRODUCT_VERSION=8.5
ORACLE_SUPPORT_PRODUCT="Oracle Linux"
ORACLE_SUPPORT_PRODUCT_VERSION=8.5
Red Hat Enterprise Linux release 8.5 (Ootpa)
Oracle Linux Server release 8.5
```

I'm using the same properties and a very similar schema to the Docker Hudi streaming example (a sketch of the `kafka.properties` contents is at the end of this issue).

**To Reproduce**

Steps to reproduce the behavior:

1. Start Kafka and create a topic
2. Run HoodieDeltaStreamer:

   ```
   spark-submit --jars /<abs_path>/hudi-utilities-bundle_2.12-0.10.1.jar \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /<abs_path>/hudi-utilities-bundle_2.12-0.10.1.jar \
     --spark-master <master_url> \
     --table-type MERGE_ON_READ \
     --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
     --source-ordering-field ts \
     --target-base-path /<abs_path> \
     --target-table <name> \
     --props /<abs_path>/kafka.properties \
     --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
     --disable-compaction \
     --continuous
   ```

3. Send a set of messages
4. Read the table through PySpark (see the read sketch below)
5. Update the same keys
6. Read the same table

**Expected behavior**

See a table with the updated values.
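The read in steps 4 and 6 is done from a small PySpark script (`hudi_read_test.py` in the stack trace below). Roughly, it is a plain snapshot read along these lines; the table path and app name are placeholders rather than the exact script:

```python
from pyspark.sql import SparkSession

# The Hudi Spark bundle jar is assumed to be on the classpath (e.g. passed via --jars),
# and Kryo serialization is enabled as recommended by the Hudi docs.
spark = (
    SparkSession.builder
    .appName("hudi_read_test")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Snapshot read of the MERGE_ON_READ table written by the DeltaStreamer;
# "/<abs_path>" stands for the --target-base-path used above.
new_df = spark.read.format("hudi").load("/<abs_path>")

new_df.show()
```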
**Environment Description**

* Hudi version : 0.10.1 Scala 2.12
* Spark version : 3.1.3 Scala 2.12
* Hive version : N/A
* Hadoop version : N/A
* Storage (HDFS/S3/GCS..) : Local
* Running on Docker? (yes/no) : No
* Kafka version : 3.1.0 Scala 2.12
* Java version : openjdk version "11.0.14.1" 2022-02-08 LTS, OpenJDK Runtime Environment 18.9 (build 11.0.14.1+1-LTS), OpenJDK 64-Bit Server VM 18.9 (build 11.0.14.1+1-LTS, mixed mode, sharing)

**Stacktrace**

```
22/04/21 14:35:38 INFO DAGScheduler: ResultStage 0 (showString at NativeMethodAccessorImpl.java:0) failed in 1.711 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.0.0.91 executor 0): java.lang.NoSuchMethodError: 'scala.Function1 org.apache.spark.sql.execution.datasources.DataSourceUtils$.creteDateRebaseFuncInWrite(scala.Enumeration$Value, java.lang.String)'
	at org.apache.hudi.spark.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:63)
	at org.apache.hudi.spark.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:55)
	at org.apache.hudi.spark.org.apache.spark.sql.avro.HoodieAvroSerializer.<init>(HoodieAvroSerializer.scala:28)
	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:206)
	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:200)
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:78)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Driver stacktrace:
22/04/21 14:35:38 INFO DAGScheduler: Job 0 failed: showString at NativeMethodAccessorImpl.java:0, took 1.741374 s
Traceback (most recent call last):
  File "hudi_read_test.py", line 27, in <module>
    new_df.show()
  File "/mnt/data/spark-3.1.3-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 484, in show
  File "/mnt/data/spark-3.1.3-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/mnt/data/spark-3.1.3-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/mnt/data/spark-3.1.3-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o46.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.0.0.91 executor 0): java.lang.NoSuchMethodError: 'scala.Function1 org.apache.spark.sql.execution.datasources.DataSourceUtils$.creteDateRebaseFuncInWrite(scala.Enumeration$Value, java.lang.String)'
	at org.apache.hudi.spark.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:63)
	at org.apache.hudi.spark.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:55)
	at org.apache.hudi.spark.org.apache.spark.sql.avro.HoodieAvroSerializer.<init>(HoodieAvroSerializer.scala:28)
	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:206)
	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:200)
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:78)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2303)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2252)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2251)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2251)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2490)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2432)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2421)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3709)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2735)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3700)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3698)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2735)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2942)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:302)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:339)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NoSuchMethodError: 'scala.Function1 org.apache.spark.sql.execution.datasources.DataSourceUtils$.creteDateRebaseFuncInWrite(scala.Enumeration$Value, java.lang.String)'
	at org.apache.hudi.spark.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:63)
	at org.apache.hudi.spark.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:55)
	at org.apache.hudi.spark.org.apache.spark.sql.avro.HoodieAvroSerializer.<init>(HoodieAvroSerializer.scala:28)
	at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:206)
	at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:200)
	at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:78)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
22/04/21 14:35:38 INFO SparkContext: Invoking stop() from shutdown hook
```
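For reference, the `kafka.properties` file passed via `--props` mirrors the Docker streaming example mentioned above. A rough sketch of the keys it sets; the topic name, broker address, field names, and schema file paths below are placeholders, not the exact values:

```
# DeltaStreamer / Kafka source properties (placeholder values, not the exact file)
hoodie.datasource.write.recordkey.field=<record_key_field>
hoodie.datasource.write.partitionpath.field=<partition_field>
hoodie.deltastreamer.schemaprovider.source.schema.file=/<abs_path>/source.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=/<abs_path>/target.avsc
hoodie.deltastreamer.source.kafka.topic=<topic>
bootstrap.servers=localhost:9092
auto.offset.reset=earliest
```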