Kontinuation commented on code in PR #2297:
URL: https://github.com/apache/sedona/pull/2297#discussion_r2285729418


##########
spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetFileFormat.scala:
##########
@@ -179,29 +184,29 @@ class GeoParquetFileFormat(val spatialFilter: Option[GeoParquetSpatialFilter])
     hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
     hadoopConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, requiredSchema.json)
     hadoopConf.set(ParquetWriteSupport.SPARK_ROW_SCHEMA, requiredSchema.json)
-    hadoopConf.set(
-      SQLConf.SESSION_LOCAL_TIMEZONE.key,
-      sparkSession.sessionState.conf.sessionLocalTimeZone)
+    val conf = new PortableSQLConf(sparkSession.sessionState.conf)
+    hadoopConf.set(PortableSQLConf.SESSION_LOCAL_TIMEZONE.key, conf.sessionLocalTimeZone)
     hadoopConf.setBoolean(
-      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
-      sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
-    hadoopConf.setBoolean(
-      SQLConf.CASE_SENSITIVE.key,
-      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+      PortableSQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+      conf.nestedSchemaPruningEnabled)
+    hadoopConf.setBoolean(PortableSQLConf.CASE_SENSITIVE.key, conf.caseSensitiveAnalysis)
 
     // Sets flags for `ParquetToSparkSchemaConverter`
     hadoopConf.setBoolean(
-      SQLConf.PARQUET_BINARY_AS_STRING.key,
-      sparkSession.sessionState.conf.isParquetBinaryAsString)
+      PortableSQLConf.PARQUET_BINARY_AS_STRING.key,
+      conf.isParquetBinaryAsString)
     hadoopConf.setBoolean(
-      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+      PortableSQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      conf.isParquetINT96AsTimestamp)
     hadoopConf.setBoolean(
-      SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
-      sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+      PortableSQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
+      conf.parquetInferTimestampNTZEnabled)
     hadoopConf.setBoolean(
-      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
-      sparkSession.sessionState.conf.legacyParquetNanosAsLong)
+      PortableSQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      conf.legacyParquetNanosAsLong)
+
+    // Workaround "The file might have been updated during query execution" on Databricks
+    hadoopConf.setBoolean("spark.databricks.scan.modTimeCheck.enabled", false)

Review Comment:
   Databricks Runtime is weirder than I thought. Stripping off the dependencies on Spark's `ParquetFileFormat` results in another weird problem: when reading GeoParquet files, the following error occurs (a sketch of the kind of read that triggers it follows the trace):
   
   ```
   Job aborted due to stage failure: Task 0 in stage 58.0 failed 4 times, most recent failure: Lost task 0.3 in stage 58.0 (TID 204) (ip-10-24-152-151.us-west-2.compute.internal executor driver): org.apache.spark.SparkException: Exception thrown in awaitResult: The file might have been updated during query execution. Ensure that no pipeline updates existing files during query execution and try again.
       at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:51)
       at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:519)
       at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:604)
       at org.apache.spark.sql.execution.datasources.geoparquet.internal.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:434)
       at org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(GeoParquetFileFormat.scala:385)
       at org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(GeoParquetFileFormat.scala:383)
       at org.apache.spark.sql.execution.datasources.geoparquet.internal.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$4(SchemaMergeUtils.scala:117)
       at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:910)
       at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:910)
       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
       at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:420)
       at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:417)
       at org.apache.spark.rdd.RDD.iterator(RDD.scala:384)
       at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$2(ResultTask.scala:76)
       at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
       at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:76)
       at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
       at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:227)
       at org.apache.spark.scheduler.Task.doRunTask(Task.scala:204)
       at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:166)
       at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
       at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
       at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
       at scala.util.Using$.resource(Using.scala:269)
       at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
       at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:160)
       at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
       at org.apache.spark.scheduler.Task.run(Task.scala:105)
       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$11(Executor.scala:1224)
       at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
       at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:112)
       at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:1228)
       at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
       at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:1080)
       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
       at java.base/java.lang.Thread.run(Thread.java:840)
   Caused by: com.databricks.common.filesystem.InconsistentReadException: The file might have been updated during query execution. Ensure that no pipeline updates existing files during query execution and try again.
       at com.databricks.common.filesystem.LokiS3AInputStream.withExceptionRewrites(LokiS3FS.scala:247)
       at com.databricks.common.filesystem.LokiS3AInputStream.read(LokiS3FS.scala:251)
       at java.base/java.io.FilterInputStream.read(FilterInputStream.java:82)
       at com.databricks.spark.metrics.FSInputStreamWithMetrics.$anonfun$read$2(FileSystemWithMetrics.scala:83)
       at com.databricks.spark.metrics.FSInputStreamWithMetrics.withTimeAndBytesReadMetric(FileSystemWithMetrics.scala:67)
       at com.databricks.spark.metrics.FSInputStreamWithMetrics.read(FileSystemWithMetrics.scala:82)
       at java.base/java.io.FilterInputStream.read(FilterInputStream.java:82)
       at org.apache.parquet.io.DelegatingSeekableInputStream.read(DelegatingSeekableInputStream.java:61)
       at org.apache.parquet.bytes.BytesUtils.readIntLittleEndian(BytesUtils.java:83)
       at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:570)
       at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:828)
       at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:682)
       at org.apache.spark.sql.execution.datasources.geoparquet.internal.ParquetFooterReader.readFooter(ParquetFooterReader.java:89)
       at org.apache.spark.sql.execution.datasources.geoparquet.internal.ParquetFooterReader.readFooter(ParquetFooterReader.java:78)
       at org.apache.spark.sql.execution.datasources.geoparquet.internal.ParquetFileFormat$.$anonfun$readParquetFootersInParallel$1(ParquetFileFormat.scala:442)
       at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$3(ThreadUtils.scala:601)
       at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
       at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$6(SparkThreadLocalForwardingThreadPoolExecutor.scala:119)
       at com.databricks.sql.transaction.tahoe.mst.MSTThreadHelper$.runWithMstTxnId(MSTThreadHelper.scala:57)
       at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$5(SparkThreadLocalForwardingThreadPoolExecutor.scala:118)
       at com.databricks.spark.util.IdentityClaim$.withClaim(IdentityClaim.scala:48)
       at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$4(SparkThreadLocalForwardingThreadPoolExecutor.scala:117)
       at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
       at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:116)
       at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:93)
       at org.apache.spark.util.threads.CapturedSparkThreadLocals.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:141)
       at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:601)
       at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
       at scala.util.Success.$anonfun$map$1(Try.scala:255)
       at scala.util.Success.map(Try.scala:213)
       at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
       at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
       at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
       at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
       at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395)
       at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
       at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1193)
       at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1666)
       at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1633)
       at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
   Caused by: shaded.databricks.org.apache.hadoop.fs.s3a.RemoteFileChangedException: open `s3a://databricks-workspace-stack-9216f-bucket/oregon-prod/1030888365966037/FileStore/geoparquet_test/part-00000-tid-6314889273487124623-1c5646d6-0432-4113-b7dd-a6a5c1bc3864-87-1-c000.snappy.parquet': Change reported by S3 during open at position 1002. File s3a://databricks-workspace-stack-9216f-bucket/oregon-prod/1030888365966037/FileStore/geoparquet_test/part-00000-tid-6314889273487124623-1c5646d6-0432-4113-b7dd-a6a5c1bc3864-87-1-c000.snappy.parquet at given modTime (1000) was unavailable, null
       at shaded.databricks.org.apache.hadoop.fs.s3a.impl.ChangeTracker.processResponse(ChangeTracker.java:210)
       at shaded.databricks.org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:307)
       at shaded.databricks.org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$2(S3AInputStream.java:469)
       at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$3(Invoker.java:247)
       at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:134)
       at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:128)
       at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$5(Invoker.java:371)
       at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:435)
       at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:367)
       at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:245)
       at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:289)
       at shaded.databricks.org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:462)
       at shaded.databricks.org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:495)
       at java.base/java.io.FilterInputStream.read(FilterInputStream.java:82)
       at com.databricks.common.filesystem.LokiS3AInputStream.$anonfun$read$1(LokiS3FS.scala:251)
       at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.java:23)
       at com.databricks.common.filesystem.LokiS3AInputStream.withExceptionRewrites(LokiS3FS.scala:244)
       ... 39 more
   ```
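   For context, the trace above comes from an ordinary GeoParquet read on Databricks: schema inference goes through `mergeSchemasInParallel` and reads the Parquet footers in parallel, which is where the modTime check fires. A minimal sketch of such a read (the bucket and prefix are placeholders, not the actual test location):
   
   ```scala
   // Hypothetical reproduction sketch: any GeoParquet read on Databricks whose
   // schema inference ends up in readParquetFootersInParallel can hit the
   // InconsistentReadException shown above. The S3 path is a placeholder.
   val df = spark.read
     .format("geoparquet")
     .load("s3a://some-bucket/some-prefix/geoparquet_test/")
   df.printSchema()
   df.count()
   ```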
   
   Setting `spark.databricks.scan.modTimeCheck.enabled` to `false` gets rid of the above error. This looks very hacky, but I have not found a better way to resolve it.
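   
   For what it's worth, a session-wide variant of the same workaround should also be possible if the per-read Hadoop conf ever turns out not to be enough; a sketch, assuming (untested) that Databricks also honors the flag when it is set on the SparkContext-level Hadoop configuration:
   
   ```scala
   // Untested sketch: disable the Databricks modTime check globally instead of only
   // on the Hadoop configuration that GeoParquetFileFormat hands to its readers.
   // Assumes the flag is read from the shared SparkContext Hadoop configuration.
   spark.sparkContext.hadoopConfiguration
     .setBoolean("spark.databricks.scan.modTimeCheck.enabled", false)
   ```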



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
