giannisp-verneek opened a new issue, #5386:
URL: https://github.com/apache/hudi/issues/5386

   I have a pipeline where I send simple messages to a Kafka topic, write them 
to a Hudi table via HoodieDeltaStreamer, and finally read the table through 
Spark. I first send a set of messages and read them into a Spark DataFrame; I 
see the expected output. I then send a new set of messages that update the same 
keys. Although the delta streamer receives the messages, I cannot read the Hudi 
table incrementally through Spark. The message schema does not include a date 
field, but I am getting a `NoSuchMethodError` for `creteDateRebaseFuncInWrite`. 
I am using a VM from Oracle's cloud to run this small pipeline:
   
   ```
   Oracle Linux Server release 8.5
   NAME="Oracle Linux Server"
   VERSION="8.5"
   ID="ol"
   ID_LIKE="fedora"
   VARIANT="Server"
   VARIANT_ID="server"
   VERSION_ID="8.5"
   PLATFORM_ID="platform:el8"
   PRETTY_NAME="Oracle Linux Server 8.5"
   ANSI_COLOR="0;31"
   CPE_NAME="cpe:/o:oracle:linux:8:5:server"
   HOME_URL="https://linux.oracle.com/"
   BUG_REPORT_URL="https://bugzilla.oracle.com/"
   
   ORACLE_BUGZILLA_PRODUCT="Oracle Linux 8"
   ORACLE_BUGZILLA_PRODUCT_VERSION=8.5
   ORACLE_SUPPORT_PRODUCT="Oracle Linux"
   ORACLE_SUPPORT_PRODUCT_VERSION=8.5
   Red Hat Enterprise Linux release 8.5 (Ootpa)
   Oracle Linux Server release 8.5
   ```
   
   I'm using the same properties as the Docker Hudi streaming example, and a 
very similar schema.
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Start Kafka and create a topic
   2. Run HoodieDeltaStreamer:
   ```
   spark-submit --jars /<abs_path>/hudi-utilities-bundle_2.12-0.10.1.jar \
                --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
                /<abs_path>/hudi-utilities-bundle_2.12-0.10.1.jar \
                --spark-master <master_url> \
                --table-type MERGE_ON_READ \
                --source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
                --source-ordering-field ts \
                --target-base-path /<abs_path> \
                --target-table <name> \
                --props /<abs_path>/kafka.properties \
                --schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
                --disable-compaction \
                --continuous
   ```
   3. Send a set of messages
   4. Read the table through PySpark
   5. Send a second set of messages that update the same keys
   6. Read the same table again
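
A minimal sketch of the reads in steps 4 and 6 follows. It is not the `hudi_read_test.py` script from this report: `hudi_read_options` and `read_hudi_table` are hypothetical helper names, and the base path and begin instant are placeholders. The option keys are the Hudi datasource read options `hoodie.datasource.query.type` and `hoodie.datasource.read.begin.instanttime`.

```python
from typing import Dict, Optional


def hudi_read_options(begin_instant: Optional[str] = None) -> Dict[str, str]:
    """Build Hudi datasource read options (hypothetical helper).

    Without begin_instant this requests a snapshot query; with it, an
    incremental query for records committed after that instant.
    """
    if begin_instant is None:
        return {"hoodie.datasource.query.type": "snapshot"}
    return {
        "hoodie.datasource.query.type": "incremental",
        "hoodie.datasource.read.begin.instanttime": begin_instant,
    }


def read_hudi_table(spark, base_path: str, begin_instant: Optional[str] = None):
    """Load the table at base_path with an existing SparkSession.

    Assumes the session was started with a Hudi Spark bundle on its classpath.
    """
    options = hudi_read_options(begin_instant)
    return spark.read.format("hudi").options(**options).load(base_path)
```

With a running session, `read_hudi_table(spark, "/<abs_path>").show()` would correspond to step 4, and passing a `begin_instant` taken from the table's commit timeline would correspond to the incremental read in step 6.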
   
   **Expected behavior**
   
   See a table with the updated values. 
   
   **Environment Description**
   
   * Hudi version : 0.10.1 Scala 2.12
   
   * Spark version : 3.1.3 Scala 2.12
   
   * Hive version : N/A
   
   * Hadoop version : N/A
   
   * Storage (HDFS/S3/GCS..) : Local
   
   * Running on Docker? (yes/no) : No
   * Kafka version: 3.1.0 Scala 2.12
   
   * Java version: openjdk version "11.0.14.1" 2022-02-08 LTS
   OpenJDK Runtime Environment 18.9 (build 11.0.14.1+1-LTS)
   OpenJDK 64-Bit Server VM 18.9 (build 11.0.14.1+1-LTS, mixed mode, sharing)
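
The bundle jar name in the `spark-submit` command encodes both the Scala binary version and the Hudi version, so it can be cross-checked against the versions listed above. The sketch below is only an illustration: `parse_bundle_name` and `BundleInfo` are hypothetical names, and the regular expression assumes the `<artifact>_<scala>-<version>.jar` naming seen in this report.

```python
import re
from typing import NamedTuple


class BundleInfo(NamedTuple):
    artifact: str
    scala_version: str
    hudi_version: str


# Assumed naming pattern: <artifact>_<scala binary version>-<hudi version>.jar
_BUNDLE_RE = re.compile(
    r"^(?P<artifact>.+)_(?P<scala>\d+\.\d+)-(?P<version>.+)\.jar$"
)


def parse_bundle_name(jar_name: str) -> BundleInfo:
    """Split a bundle jar file name into artifact, Scala and Hudi versions."""
    match = _BUNDLE_RE.match(jar_name)
    if match is None:
        raise ValueError(f"unrecognized bundle name: {jar_name!r}")
    return BundleInfo(match["artifact"], match["scala"], match["version"])


info = parse_bundle_name("hudi-utilities-bundle_2.12-0.10.1.jar")
# Compare against the Scala version reported for Spark in this environment.
scala_matches_spark = info.scala_version == "2.12"
print(info, scala_matches_spark)
```

This only confirms that the Scala binary versions line up. It says nothing about which Spark release the bundle was compiled against, which is what a `NoSuchMethodError` on a Spark-internal class usually points to.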
   
   **Stacktrace**
   
   ```
   22/04/21 14:35:38 INFO DAGScheduler: ResultStage 0 (showString at NativeMethodAccessorImpl.java:0) failed in 1.711 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.0.0.91 executor 0): java.lang.NoSuchMethodError: 'scala.Function1 org.apache.spark.sql.execution.datasources.DataSourceUtils$.creteDateRebaseFuncInWrite(scala.Enumeration$Value, java.lang.String)'
        at org.apache.hudi.spark.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:63)
        at org.apache.hudi.spark.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:55)
        at org.apache.hudi.spark.org.apache.spark.sql.avro.HoodieAvroSerializer.<init>(HoodieAvroSerializer.scala:28)
        at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:206)
        at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:200)
        at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:78)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
   
   Driver stacktrace:
   22/04/21 14:35:38 INFO DAGScheduler: Job 0 failed: showString at NativeMethodAccessorImpl.java:0, took 1.741374 s
   Traceback (most recent call last):
     File "hudi_read_test.py", line 27, in <module>
       new_df.show()
     File "/mnt/data/spark-3.1.3-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 484, in show
     File "/mnt/data/spark-3.1.3-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
     File "/mnt/data/spark-3.1.3-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
     File "/mnt/data/spark-3.1.3-bin-hadoop3.2/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
   py4j.protocol.Py4JJavaError: An error occurred while calling o46.showString.
   : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (10.0.0.91 executor 0): java.lang.NoSuchMethodError: 'scala.Function1 org.apache.spark.sql.execution.datasources.DataSourceUtils$.creteDateRebaseFuncInWrite(scala.Enumeration$Value, java.lang.String)'
        at org.apache.hudi.spark.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:63)
        at org.apache.hudi.spark.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:55)
        at org.apache.hudi.spark.org.apache.spark.sql.avro.HoodieAvroSerializer.<init>(HoodieAvroSerializer.scala:28)
        at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:206)
        at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:200)
        at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:78)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
   
   Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2303)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2252)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2251)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2251)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2490)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2432)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2421)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
        at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3709)
        at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2735)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3700)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3698)
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2735)
        at org.apache.spark.sql.Dataset.take(Dataset.scala:2942)
        at org.apache.spark.sql.Dataset.getRows(Dataset.scala:302)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:339)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:829)
   Caused by: java.lang.NoSuchMethodError: 'scala.Function1 org.apache.spark.sql.execution.datasources.DataSourceUtils$.creteDateRebaseFuncInWrite(scala.Enumeration$Value, java.lang.String)'
        at org.apache.hudi.spark.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:63)
        at org.apache.hudi.spark.org.apache.spark.sql.avro.AvroSerializer.<init>(AvroSerializer.scala:55)
        at org.apache.hudi.spark.org.apache.spark.sql.avro.HoodieAvroSerializer.<init>(HoodieAvroSerializer.scala:28)
        at org.apache.hudi.HoodieMergeOnReadRDD$$anon$3.<init>(HoodieMergeOnReadRDD.scala:206)
        at org.apache.hudi.HoodieMergeOnReadRDD.payloadCombineFileIterator(HoodieMergeOnReadRDD.scala:200)
        at org.apache.hudi.HoodieMergeOnReadRDD.compute(HoodieMergeOnReadRDD.scala:78)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        ... 1 more
   
   22/04/21 14:35:38 INFO SparkContext: Invoking stop() from shutdown hook
   ```

