psendyk opened a new issue, #12652:
URL: https://github.com/apache/hudi/issues/12652

   **Describe the problem you faced**
   
   After enabling timeline server and using `REMOTE_ONLY` file system view, 
Spark Structured Streaming ingestion into Hudi fails on the second microbatch 
with `org.apache.hudi.exception.HoodieRemoteException: Connect to 
localhost:26754 [localhost/127.0.0.1] failed: Connection refused`
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Start Spark Structured Streaming ingestion into Hudi with following 
filesystem view config:
   ```
           HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE.key() -> "true",
           FileSystemViewStorageConfig.VIEW_TYPE.key() -> 
FileSystemViewStorageType.REMOTE_ONLY.name(),
           FileSystemViewStorageConfig.REMOTE_BACKUP_VIEW_ENABLE.key() -> 
"false"
   ```
   2. Wait for the second micro-batch, the job should fail immediately during 
`Getting small files from partitions` stage
   
   **Expected behavior**
   
   The application should continue without failure
   
   **Environment Description**
   
   * Hudi version : 0.15.0
   
   * Spark version : 3.3.0
   
   * Hive version :
   
   * Hadoop version : 3.2.1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   This issue occurs because the timeline service is stopped after the first 
write and is never restarted. Specifically, the write client is closed, which 
then closes the timeline server in the `finally` block in 
`HoodieSparkSqlWriterInternal.writeInternal`. Here's a stack trace showing how 
this part of the code is reached:
   ```
           at 
org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:508)
        at 
org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
        at 
org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
        at 
org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$3(HoodieStreamingSink.scala:141)
        at 
org.apache.hudi.HoodieStreamingSink$$Lambda$3032/943647142.apply(Unknown Source)
        at scala.util.Try$.apply(Try.scala:213)
        at 
org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:133)
        at 
org.apache.hudi.HoodieStreamingSink$$Lambda$3031/1476625006.apply(Unknown 
Source)
        at 
org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:237)
        at 
org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:132)
          - locked <0x0000ffefe71a1808> (a org.apache.hudi.HoodieStreamingSink)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:660)
   ```
   
   The timeline server is only started when instantiating `HoodieBaseClient` 
via `startEmbeddedServerView` but the write client is reused across batches 
within `HoodieStreamingSink.addBatch`. Therefore, the second batch uses a 
client that has closed the timeline server. The error may first seem like a 
config issue but it only looks like this because the view storage config is 
updated with the timeline service config when the service is started, then 
reset to the client-provided config when the service is stopped. We can see the 
correct config pointing to the timeline service is used for the first write:
   ```
   25/01/16 15:18:13 INFO FileSystemViewManager: Creating remote view for 
basePath <REDACTED>. Server=ip-<REDACTED>:37393, Timeout=300
   ```
   but not for the second write:
   ```
   25/01/16 15:20:15 INFO FileSystemViewManager: Creating remote view for 
basePath <REDACTED>. Server=localhost:26754, Timeout=300
   ```
   
   I was able to work around this failure mode by modifying 
`HoodieSparkSqlWriterInternal.writeInternal` to restart the timeline server at 
the beginning of each write but as you can imagine this only gives us partial 
improvement (within a batch but not across batches). I'm wondering if there's a 
reason why the write client, and subsequently timeline server, is stopped after 
each batch?
   
   **Stacktrace**
   
   ```
   Caused by: org.apache.spark.SparkException: Job aborted due to stage 
failure: Task 47 in stage 11.0 failed 4 times, most recent failure: Lost task 
47.3 in stage 11.0 (TID 12380) (ip-10-18-138-81.heap executor 6): 
org.apache.hudi.exception.HoodieRemoteException: Connect to localhost:26754 
[localhost/127.0.0.1] failed: Connection refused (Connection refused)
        at 
org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.getLatestFileSlicesStreamFromParams(RemoteHoodieTableFileSystemView.java:313)
        at 
org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.getLatestFileSlicesBeforeOrOn(RemoteHoodieTableFileSystemView.java:347)
        at 
org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.getSmallFileCandidates(SparkUpsertDeltaCommitPartitioner.java:106)
        at 
org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.getSmallFiles(SparkUpsertDeltaCommitPartitioner.java:66)
        at 
org.apache.hudi.table.action.commit.UpsertPartitioner.lambda$getSmallFilesForPartitions$f1d92f9e$1(UpsertPartitioner.java:285)
        at 
org.apache.spark.api.java.JavaPairRDD$.$anonfun$pairFunToScalaFun$1(JavaPairRDD.scala:1073)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
        at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
        at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
        at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
        at scala.collection.AbstractIterator.to(Iterator.scala:1431)
        at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
        at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
        at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
        at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
        at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
        at 
org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2269)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:138)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to