Kontinuation commented on code in PR #2297:
URL: https://github.com/apache/sedona/pull/2297#discussion_r2285729418
##########
spark/common/src/main/scala/org/apache/spark/sql/execution/datasources/geoparquet/GeoParquetFileFormat.scala:
##########
@@ -179,29 +184,29 @@ class GeoParquetFileFormat(val spatialFilter: Option[GeoParquetSpatialFilter])
     hadoopConf.set(ParquetInputFormat.READ_SUPPORT_CLASS, classOf[ParquetReadSupport].getName)
     hadoopConf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, requiredSchema.json)
     hadoopConf.set(ParquetWriteSupport.SPARK_ROW_SCHEMA, requiredSchema.json)
-    hadoopConf.set(
-      SQLConf.SESSION_LOCAL_TIMEZONE.key,
-      sparkSession.sessionState.conf.sessionLocalTimeZone)
+    val conf = new PortableSQLConf(sparkSession.sessionState.conf)
+    hadoopConf.set(PortableSQLConf.SESSION_LOCAL_TIMEZONE.key, conf.sessionLocalTimeZone)
     hadoopConf.setBoolean(
-      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
-      sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
-    hadoopConf.setBoolean(
-      SQLConf.CASE_SENSITIVE.key,
-      sparkSession.sessionState.conf.caseSensitiveAnalysis)
+      PortableSQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+      conf.nestedSchemaPruningEnabled)
+    hadoopConf.setBoolean(PortableSQLConf.CASE_SENSITIVE.key, conf.caseSensitiveAnalysis)
     // Sets flags for `ParquetToSparkSchemaConverter`
     hadoopConf.setBoolean(
-      SQLConf.PARQUET_BINARY_AS_STRING.key,
-      sparkSession.sessionState.conf.isParquetBinaryAsString)
+      PortableSQLConf.PARQUET_BINARY_AS_STRING.key,
+      conf.isParquetBinaryAsString)
     hadoopConf.setBoolean(
-      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-      sparkSession.sessionState.conf.isParquetINT96AsTimestamp)
+      PortableSQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
+      conf.isParquetINT96AsTimestamp)
     hadoopConf.setBoolean(
-      SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
-      sparkSession.sessionState.conf.parquetInferTimestampNTZEnabled)
+      PortableSQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED.key,
+      conf.parquetInferTimestampNTZEnabled)
     hadoopConf.setBoolean(
-      SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
-      sparkSession.sessionState.conf.legacyParquetNanosAsLong)
+      PortableSQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key,
+      conf.legacyParquetNanosAsLong)
+
+    // Workaround "The file might have been updated during query execution" on Databricks
+    hadoopConf.setBoolean("spark.databricks.scan.modTimeCheck.enabled", false)
Review Comment:
Databricks Runtime is weirder than I thought. Stripping off the dependencies on Spark's ParquetFileFormat results in another strange problem. When reading GeoParquet files, the following error occurs:
```
Job aborted due to stage failure: Task 0 in stage 58.0 failed 4 times, most recent failure: Lost task 0.3 in stage 58.0 (TID 204) (ip-10-24-152-151.us-west-2.compute.internal executor driver): org.apache.spark.SparkException: Exception thrown in awaitResult: The file might have been updated during query execution. Ensure that no pipeline updates existing files during query execution and try again.
    at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:51)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:519)
    at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:604)
    at org.apache.spark.sql.execution.datasources.geoparquet.internal.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:434)
    at org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(GeoParquetFileFormat.scala:385)
    at org.apache.spark.sql.execution.datasources.geoparquet.GeoParquetFileFormat$.$anonfun$mergeSchemasInParallel$1$adapted(GeoParquetFileFormat.scala:383)
    at org.apache.spark.sql.execution.datasources.geoparquet.internal.SchemaMergeUtils$.$anonfun$mergeSchemasInParallel$4(SchemaMergeUtils.scala:117)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:910)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:910)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:420)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:417)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:384)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$2(ResultTask.scala:76)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:76)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:227)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:204)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:166)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
    at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
    at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
    at scala.util.Using$.resource(Using.scala:269)
    at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:160)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:105)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$11(Executor.scala:1224)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:112)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:1228)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:1080)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: com.databricks.common.filesystem.InconsistentReadException: The file might have been updated during query execution. Ensure that no pipeline updates existing files during query execution and try again.
    at com.databricks.common.filesystem.LokiS3AInputStream.withExceptionRewrites(LokiS3FS.scala:247)
    at com.databricks.common.filesystem.LokiS3AInputStream.read(LokiS3FS.scala:251)
    at java.base/java.io.FilterInputStream.read(FilterInputStream.java:82)
    at com.databricks.spark.metrics.FSInputStreamWithMetrics.$anonfun$read$2(FileSystemWithMetrics.scala:83)
    at com.databricks.spark.metrics.FSInputStreamWithMetrics.withTimeAndBytesReadMetric(FileSystemWithMetrics.scala:67)
    at com.databricks.spark.metrics.FSInputStreamWithMetrics.read(FileSystemWithMetrics.scala:82)
    at java.base/java.io.FilterInputStream.read(FilterInputStream.java:82)
    at org.apache.parquet.io.DelegatingSeekableInputStream.read(DelegatingSeekableInputStream.java:61)
    at org.apache.parquet.bytes.BytesUtils.readIntLittleEndian(BytesUtils.java:83)
    at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:570)
    at org.apache.parquet.hadoop.ParquetFileReader.<init>(ParquetFileReader.java:828)
    at org.apache.parquet.hadoop.ParquetFileReader.open(ParquetFileReader.java:682)
    at org.apache.spark.sql.execution.datasources.geoparquet.internal.ParquetFooterReader.readFooter(ParquetFooterReader.java:89)
    at org.apache.spark.sql.execution.datasources.geoparquet.internal.ParquetFooterReader.readFooter(ParquetFooterReader.java:78)
    at org.apache.spark.sql.execution.datasources.geoparquet.internal.ParquetFileFormat$.$anonfun$readParquetFootersInParallel$1(ParquetFileFormat.scala:442)
    at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$3(ThreadUtils.scala:601)
    at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$6(SparkThreadLocalForwardingThreadPoolExecutor.scala:119)
    at com.databricks.sql.transaction.tahoe.mst.MSTThreadHelper$.runWithMstTxnId(MSTThreadHelper.scala:57)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$5(SparkThreadLocalForwardingThreadPoolExecutor.scala:118)
    at com.databricks.spark.util.IdentityClaim$.withClaim(IdentityClaim.scala:48)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.$anonfun$runWithCaptured$4(SparkThreadLocalForwardingThreadPoolExecutor.scala:117)
    at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:116)
    at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper.runWithCaptured$(SparkThreadLocalForwardingThreadPoolExecutor.scala:93)
    at org.apache.spark.util.threads.CapturedSparkThreadLocals.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:141)
    at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:601)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
    at scala.util.Success.$anonfun$map$1(Try.scala:255)
    at scala.util.Success.map(Try.scala:213)
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1193)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1666)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1633)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)
Caused by: shaded.databricks.org.apache.hadoop.fs.s3a.RemoteFileChangedException: open `s3a://databricks-workspace-stack-9216f-bucket/oregon-prod/1030888365966037/FileStore/geoparquet_test/part-00000-tid-6314889273487124623-1c5646d6-0432-4113-b7dd-a6a5c1bc3864-87-1-c000.snappy.parquet': Change reported by S3 during open at position 1002. File s3a://databricks-workspace-stack-9216f-bucket/oregon-prod/1030888365966037/FileStore/geoparquet_test/part-00000-tid-6314889273487124623-1c5646d6-0432-4113-b7dd-a6a5c1bc3864-87-1-c000.snappy.parquet at given modTime (1000) was unavailable, null
    at shaded.databricks.org.apache.hadoop.fs.s3a.impl.ChangeTracker.processResponse(ChangeTracker.java:210)
    at shaded.databricks.org.apache.hadoop.fs.s3a.S3AInputStream.reopen(S3AInputStream.java:307)
    at shaded.databricks.org.apache.hadoop.fs.s3a.S3AInputStream.lambda$lazySeek$2(S3AInputStream.java:469)
    at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$3(Invoker.java:247)
    at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:134)
    at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:128)
    at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.lambda$maybeRetry$5(Invoker.java:371)
    at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:435)
    at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:367)
    at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:245)
    at shaded.databricks.org.apache.hadoop.fs.s3a.Invoker.maybeRetry(Invoker.java:289)
    at shaded.databricks.org.apache.hadoop.fs.s3a.S3AInputStream.lazySeek(S3AInputStream.java:462)
    at shaded.databricks.org.apache.hadoop.fs.s3a.S3AInputStream.read(S3AInputStream.java:495)
    at java.base/java.io.FilterInputStream.read(FilterInputStream.java:82)
    at com.databricks.common.filesystem.LokiS3AInputStream.$anonfun$read$1(LokiS3FS.scala:251)
    at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.java:23)
    at com.databricks.common.filesystem.LokiS3AInputStream.withExceptionRewrites(LokiS3FS.scala:244)
    ... 39 more
```
Setting `spark.databricks.scan.modTimeCheck.enabled` to `false` gets rid of the above error. This looks very hacky, but I have not found a better way to resolve it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]