psendyk opened a new issue, #12652: URL: https://github.com/apache/hudi/issues/12652
**Describe the problem you faced**

After enabling the timeline server and using the `REMOTE_ONLY` file system view, Spark Structured Streaming ingestion into Hudi fails on the second micro-batch with `org.apache.hudi.exception.HoodieRemoteException: Connect to localhost:26754 [localhost/127.0.0.1] failed: Connection refused`.

**To Reproduce**

Steps to reproduce the behavior:

1. Start Spark Structured Streaming ingestion into Hudi with the following file system view config (a minimal sketch of such a streaming write is shown right after these steps):
```
HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE.key() -> "true",
FileSystemViewStorageConfig.VIEW_TYPE.key() -> FileSystemViewStorageType.REMOTE_ONLY.name(),
FileSystemViewStorageConfig.REMOTE_BACKUP_VIEW_ENABLE.key() -> "false"
```
2. Wait for the second micro-batch; the job should fail immediately during the `Getting small files from partitions` stage.
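For reference, here's a simplified sketch of such a streaming write. The source, table name, record key/precombine/partition fields, and S3 paths are placeholders; only the last three Hudi options are the ones relevant to this issue:

```scala
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, FileSystemViewStorageType}

// Stand-in source: the real pipeline reads from an actual stream, but the built-in
// `rate` source keeps this sketch self-contained.
val inputDf = spark.readStream.format("rate").load()
  .selectExpr("value as id", "timestamp as ts", "date_format(timestamp, 'yyyy-MM-dd') as dt")

val query = inputDf.writeStream
  .format("hudi")
  .option(HoodieWriteConfig.TBL_NAME.key(), "events")
  .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
  .option(DataSourceWriteOptions.RECORDKEY_FIELD.key(), "id")
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key(), "ts")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "dt")
  // The three options below are the ones from step 1 above.
  .option(HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_ENABLE.key(), "true")
  .option(FileSystemViewStorageConfig.VIEW_TYPE.key(), FileSystemViewStorageType.REMOTE_ONLY.name())
  .option(FileSystemViewStorageConfig.REMOTE_BACKUP_VIEW_ENABLE.key(), "false")
  .option("checkpointLocation", "s3://<bucket>/checkpoints/events")
  .outputMode("append")
  .start("s3://<bucket>/tables/events")

query.awaitTermination()
```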
**Expected behavior**

The application should continue without failure.

**Environment Description**

* Hudi version : 0.15.0
* Spark version : 3.3.0
* Hive version :
* Hadoop version : 3.2.1
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no

**Additional context**

This issue occurs because the timeline service is stopped after the first write and is never restarted. Specifically, the write client is closed, which in turn closes the timeline server in the `finally` block of `HoodieSparkSqlWriterInternal.writeInternal`. Here's a stack trace showing how this part of the code is reached:
```
at org.apache.hudi.HoodieSparkSqlWriterInternal.writeInternal(HoodieSparkSqlWriter.scala:508)
at org.apache.hudi.HoodieSparkSqlWriterInternal.write(HoodieSparkSqlWriter.scala:187)
at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:125)
at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$3(HoodieStreamingSink.scala:141)
at org.apache.hudi.HoodieStreamingSink$$Lambda$3032/943647142.apply(Unknown Source)
at scala.util.Try$.apply(Try.scala:213)
at org.apache.hudi.HoodieStreamingSink.$anonfun$addBatch$2(HoodieStreamingSink.scala:133)
at org.apache.hudi.HoodieStreamingSink$$Lambda$3031/1476625006.apply(Unknown Source)
at org.apache.hudi.HoodieStreamingSink.retry(HoodieStreamingSink.scala:237)
at org.apache.hudi.HoodieStreamingSink.addBatch(HoodieStreamingSink.scala:132)
- locked <0x0000ffefe71a1808> (a org.apache.hudi.HoodieStreamingSink)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:660)
```
The timeline server is only started when instantiating `HoodieBaseClient` via `startEmbeddedServerView`, but the write client is reused across batches within `HoodieStreamingSink.addBatch`. Therefore, the second batch uses a client that has already closed its timeline server.

The error may at first look like a config issue, but that is only because the view storage config is updated with the timeline service's address when the service is started, then reset to the client-provided config when the service is stopped. We can see the correct config pointing to the timeline service being used for the first write:
```
25/01/16 15:18:13 INFO FileSystemViewManager: Creating remote view for basePath <REDACTED>. Server=ip-<REDACTED>:37393, Timeout=300
```
but not for the second write:
```
25/01/16 15:20:15 INFO FileSystemViewManager: Creating remote view for basePath <REDACTED>. Server=localhost:26754, Timeout=300
```
I was able to work around this failure mode by modifying `HoodieSparkSqlWriterInternal.writeInternal` to restart the timeline server at the beginning of each write, but as you can imagine this only gives a partial improvement (within a batch, not across batches). I'm wondering if there's a reason why the write client, and subsequently the timeline server, is stopped after each batch?
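To make the sequence above concrete, here's a simplified, hypothetical sketch of the lifecycle. `EmbeddedServer` and `StreamingWriteClient` are illustrative stand-ins, not Hudi's actual classes:

```scala
// Toy illustration of the described lifecycle; not Hudi's real classes.
object TimelineServerLifecycleSketch extends App {

  class EmbeddedServer {
    private var running = true
    def stop(): Unit = running = false
    def getLatestFileSlices(): String =
      if (running) "file slices" else throw new RuntimeException("Connection refused")
  }

  class StreamingWriteClient {
    // Started once, when the client is instantiated (cf. startEmbeddedServerView).
    val timelineServer = new EmbeddedServer
    def write(batch: Int): Unit = println(s"batch $batch: " + timelineServer.getLatestFileSlices())
    def close(): Unit = timelineServer.stop() // closing the client also stops the server
  }

  // HoodieStreamingSink.addBatch reuses the same client across micro-batches.
  val client = new StreamingWriteClient

  // Batch 1: the write succeeds, but writeInternal's finally block closes the client.
  try client.write(1) finally client.close()

  // Batch 2: the cached client is reused, but its timeline server is gone,
  // so the remote file system view fails with "Connection refused".
  client.write(2)
}
```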
**Stacktrace**

```
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 47 in stage 11.0 failed 4 times, most recent failure: Lost task 47.3 in stage 11.0 (TID 12380) (ip-10-18-138-81.heap executor 6): org.apache.hudi.exception.HoodieRemoteException: Connect to localhost:26754 [localhost/127.0.0.1] failed: Connection refused (Connection refused)
at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.getLatestFileSlicesStreamFromParams(RemoteHoodieTableFileSystemView.java:313)
at org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView.getLatestFileSlicesBeforeOrOn(RemoteHoodieTableFileSystemView.java:347)
at org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.getSmallFileCandidates(SparkUpsertDeltaCommitPartitioner.java:106)
at org.apache.hudi.table.action.deltacommit.SparkUpsertDeltaCommitPartitioner.getSmallFiles(SparkUpsertDeltaCommitPartitioner.java:66)
at org.apache.hudi.table.action.commit.UpsertPartitioner.lambda$getSmallFilesForPartitions$f1d92f9e$1(UpsertPartitioner.java:285)
at org.apache.spark.api.java.JavaPairRDD$.$anonfun$pairFunToScalaFun$1(JavaPairRDD.scala:1073)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
at scala.collection.AbstractIterator.to(Iterator.scala:1431)
at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1431)
at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1431)
at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2269)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:138)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
```