tglanz opened a new issue, #1833: URL: https://github.com/apache/datafusion-comet/issues/1833
### Describe the bug

Trying to insert into / query from an Iceberg table according to https://github.com/apache/datafusion-comet/blob/main/docs/source/user-guide/iceberg.md, and encountering the following error during insertion:

```
java.lang.NoSuchMethodError: 'org.apache.parquet.column.ParquetProperties$Builder org.apache.parquet.column.ParquetProperties$Builder.withStatisticsEnabled(java.lang.String, boolean)'
```

It seems to be a `parquet-column` version conflict between `1.15.2` and `1.13.1`, interacting with the relocation of the jar in Iceberg's build:

- Without the relocation, the insert fails but the select statement works fine, because the loaded version is `1.13.1`, which is missing the method.
- With the relocation, the insert works but the select statement fails, because the shaded type is no longer part of the function signature.

(The exact details are below.)

### Steps to reproduce

Build Comet:

```
make release
```

Apply the following patch to Iceberg (as described [here](https://github.com/apache/datafusion-comet/blob/main/docs/source/user-guide/iceberg.md)):

```diff
diff --git a/gradle.properties b/gradle.properties
index 5da56c59d..bd6e35e2f 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -18,7 +18,7 @@ jmhJsonOutputPath=build/reports/jmh/results.json
 jmhIncludeRegex=.*
 systemProp.defaultFlinkVersions=2.0
 systemProp.knownFlinkVersions=1.19,1.20,2.0
-systemProp.defaultSparkVersions=4.0
+systemProp.defaultSparkVersions=3.5
 systemProp.knownSparkVersions=3.4,3.5,4.0
 systemProp.defaultKafkaVersions=3
 systemProp.knownKafkaVersions=3
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 89dd5bf45..79ac91f72 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -37,7 +37,7 @@ awssdk-s3accessgrants = "2.3.0"
 bson-ver = "4.11.5"
 caffeine = "2.9.3"
 calcite = "1.39.0"
-comet = "0.8.1"
+comet = "0.9.0-SNAPSHOT"
 datasketches = "6.2.0"
 delta-standalone = "3.3.1"
 delta-spark = "3.3.1"
diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
index af7a1d74d..426997463 100644
--- a/spark/v3.5/build.gradle
+++ b/spark/v3.5/build.gradle
@@ -301,8 +301,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}") {
     relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro'
     relocate 'avro.shaded', 'org.apache.iceberg.shaded.org.apache.avro.shaded'
     relocate 'com.thoughtworks.paranamer', 'org.apache.iceberg.shaded.com.thoughtworks.paranamer'
-    relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
-    relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
+    // relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
+    // relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
     relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc'
     relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift'
     relocate 'org.apache.hc.client5', 'org.apache.iceberg.shaded.org.apache.hc.client5'
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
index a361a7f1b..9021cd5c9 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
@@ -24,6 +24,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import org.apache.comet.parquet.SupportsComet;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileContent;
 import org.apache.iceberg.FileScanTask;
@@ -63,7 +64,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
-    implements SupportsRuntimeV2Filtering {
+    implements SupportsRuntimeV2Filtering, SupportsComet {
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkBatchQueryScan.class);
 
@@ -290,4 +291,9 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
         runtimeFilterExpressions,
         caseSensitive());
   }
+
+  @Override
+  public boolean isCometEnabled() {
+    return true;
+  }
 }
```

Build Iceberg:

```
./gradlew build -x test -x integrationTest
```

Export `COMET_JAR` and `ICEBERG_JAR` accordingly.

Run:

```bash
$SPARK_HOME/bin/spark-shell \
  --jars $COMET_JAR,$ICEBERG_JAR \
  --conf spark.driver.extraClassPath=$COMET_JAR,$ICEBERG_JAR \
  --conf spark.executor.extraClassPath=$COMET_JAR,$ICEBERG_JAR \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.spark_catalog.type=hadoop \
  --conf spark.sql.catalog.spark_catalog.warehouse=/tmp/warehouse \
  --conf spark.plugins=org.apache.spark.CometPlugin \
  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
  --conf spark.sql.iceberg.parquet.reader-type=COMET \
  --conf spark.comet.explainFallback.enabled=true \
  --conf spark.memory.offHeap.enabled=true \
  --conf spark.memory.offHeap.size=16g
```

and run the following program:

```scala
scala> spark.sql(s"CREATE TABLE IF NOT EXISTS t1 (c0 INT, c1 STRING) USING iceberg")
25/06/03 16:13:45 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized
25/06/03 16:13:45 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
CreateTable [COMET: CreateTable is not supported]
res0: org.apache.spark.sql.DataFrame = []

scala> spark.sql(s"INSERT INTO t1 VALUES ${(0 until 100).map(i => (i, i+3)).mkString(",")};")
25/06/03 16:13:59 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
AppendData [COMET: AppendData is not supported]
+- LocalTableScan [COMET: LocalTableScan is not supported]
25/06/03 16:14:00 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.NoSuchMethodError: 'org.apache.parquet.column.ParquetProperties$Builder org.apache.parquet.column.ParquetProperties$Builder.withStatisticsEnabled(java.lang.String, boolean)'
  at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:446)
  at org.apache.iceberg.parquet.Parquet$DataWriteBuilder.build(Parquet.java:866)
  at org.apache.iceberg.data.BaseFileWriterFactory.newDataWriter(BaseFileWriterFactory.java:131)
  at org.apache.iceberg.io.RollingDataWriter.newWriter(RollingDataWriter.java:52)
  at org.apache.iceberg.io.RollingDataWriter.newWriter(RollingDataWriter.java:32)
  at org.apache.iceberg.io.RollingFileWriter.openCurrentWriter(RollingFileWriter.java:112)
  at org.apache.iceberg.io.RollingDataWriter.<init>(RollingDataWriter.java:47)
  at org.apache.iceberg.spark.source.SparkWrite$UnpartitionedDataWriter.<init>(SparkWrite.java:717)
  at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:691)
  at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:668)
  at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:441)
  at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:430)
  at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:496)
  at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:393)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
  at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
  at org.apache.spark.scheduler.Task.run(Task.scala:141)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
  at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
  at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:829)
```
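As a quick sanity check of the diagnosis (a sketch, not part of the original repro), the following can be pasted into the same spark-shell session; it prints which jar `ParquetProperties$Builder` was loaded from and whether the `withStatisticsEnabled(String, boolean)` overload from the stack trace exists:

```scala
// Sketch: probe the classpath from the running spark-shell.
// getCodeSource can be null for bootstrap-loaded classes, hence the Option.
val builderCls = Class.forName("org.apache.parquet.column.ParquetProperties$Builder")
val source = Option(builderCls.getProtectionDomain.getCodeSource).map(_.getLocation)
println(s"ParquetProperties$$Builder loaded from: $source")

// The overload Iceberg calls; per this report, it is absent in parquet-column 1.13.1.
val hasOverload = builderCls.getMethods.exists { m =>
  m.getName == "withStatisticsEnabled" &&
    m.getParameterTypes.toSeq == Seq(classOf[String], classOf[Boolean])
}
println(s"withStatisticsEnabled(String, boolean) present: $hasOverload")
```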
### Expected behavior

Iceberg insert and select statements should not fail.

### Additional context

Some insights:

1. The missing method `org.apache.parquet.column.ParquetProperties$Builder.withStatisticsEnabled` exists in parquet-column `1.15.2` (Iceberg's required version) but not in `1.13.1` (Spark v3.5's required version).
1. If we bring back the relocation of the jars, the insert works:

   ```diff
   -// relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
   -// relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
   +relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
   +relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
   ```

   From some checks, it seems that with the relocation commented out, Spark loads parquet-column `1.13.1`, hence the missing method. However, assuming there is already data in the table (see the last bullet), the select statement works fine:

   ```scala
   scala> spark.sql("SELECT * from t1").show()
   25/06/03 16:27:51 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized
   25/06/03 16:27:51 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
   CollectLimit [COMET: CollectLimit is not supported]
   +- Project [COMET: toprettystring is not supported]
      +- CometScanWrapper

   +---+---+
   | c0| c1|
   +---+---+
   |  0|  3|
   |  1|  4|
   |  2|  5|
   |  3|  6|
   |  4|  7|
   |  5|  8|
   |  6|  9|
   |  7| 10|
   |  8| 11|
   |  9| 12|
   | 10| 13|
   | 11| 14|
   | 12| 15|
   | 13| 16|
   | 14| 17|
   | 15| 18|
   | 16| 19|
   | 17| 20|
   | 18| 21|
   | 19| 22|
   +---+---+
   only showing top 20 rows
   ```
1. Tested both with the latest releases and on the repository heads.
1. If we keep the relocation, the insert works:

   ```scala
   scala> spark.sql(s"INSERT INTO t1 VALUES ${(0 until 100).map(i => (i, i+3)).mkString(",")};")
   25/06/03 16:24:47 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized
   25/06/03 16:24:47 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
   AppendData [COMET: AppendData is not supported]
   +- LocalTableScan [COMET: LocalTableScan is not supported]
   res0: org.apache.spark.sql.DataFrame = []
   ```

   However, the select statement now fails with:

   ```scala
   scala> spark.sql("SELECT * from t1").show()
   25/06/03 16:25:27 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
   CollectLimit [COMET: CollectLimit is not supported]
   +- Project [COMET: toprettystring is not supported]
      +- CometScanWrapper
   25/06/03 16:25:27 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 22)
   java.lang.NoSuchMethodError: 'org.apache.comet.parquet.ColumnReader org.apache.comet.parquet.Utils.getColumnReader(org.apache.spark.sql.types.DataType, org.apache.iceberg.shaded.org.apache.parquet.column.ColumnDescriptor, org.apache.comet.CometSchemaImporter, int, boolean, boolean)'
     at org.apache.iceberg.spark.data.vectorized.CometColumnReader.reset(CometColumnReader.java:95)
     at org.apache.iceberg.spark.data.vectorized.CometColumnarBatchReader.setRowGroupInfo(CometColumnarBatchReader.java:81)
     at org.apache.iceberg.parquet.VectorizedParquetReader$FileIterator.advance(VectorizedParquetReader.java:166)
     at org.apache.iceberg.parquet.VectorizedParquetReader$FileIterator.next(VectorizedParquetReader.java:139)
     at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:129)
     at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120)
     at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158)
     at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
     at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
     at scala.Option.exists(Option.scala:376)
     at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
     at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
     at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
     at org.apache.spark.sql.comet.CometBatchScanExec$$anon$1.hasNext(CometBatchScanExec.scala:62)
     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.cometcolumnartorow_nextBatch_0$(generated.java:30)
     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:43)
     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
     at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
     at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
     at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
     at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
     at org.apache.spark.scheduler.Task.run(Task.scala:141)
     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
     at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
     at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
     at java.base/java.lang.Thread.run(Thread.java:829)
   ```

   which, as far as I understand, happens because the shaded type `org.apache.iceberg.shaded.org.apache.parquet.column.ColumnDescriptor` is not part of the `org.apache.comet.parquet.Utils.getColumnReader` function signature (see the probe sketches after this list).
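To make the signature mismatch visible, the overloads that the loaded Comet jar actually exports can be listed via reflection (again a sketch for the same spark-shell session, not from the original repro); a shaded `ColumnDescriptor` parameter type should not appear here, which is exactly what the `NoSuchMethodError` above complains about:

```scala
// Sketch: print the parameter types of every getColumnReader overload defined
// by the Comet jar on the classpath. Iceberg's relocated CometColumnReader links
// against org.apache.iceberg.shaded.org.apache.parquet.column.ColumnDescriptor,
// which is expected to be missing from this list.
val utilsCls = Class.forName("org.apache.comet.parquet.Utils")
utilsCls.getMethods
  .filter(_.getName == "getColumnReader")
  .foreach { m =>
    println(m.getParameterTypes.map(_.getName).mkString("getColumnReader(", ", ", ")"))
  }
```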
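Similarly, to see which copies of the parquet classes each jar ships (shaded vs. unshaded), a simple jar scan works; `COMET_JAR` and `ICEBERG_JAR` are the environment variables exported in the repro, assumed visible to the shell's JVM:

```scala
// Sketch: list every ParquetProperties.class entry in the two jars.
// Without the relocation we would expect org/apache/parquet/... entries in the
// Iceberg runtime jar; with it, org/apache/iceberg/shaded/org/apache/parquet/...
import java.util.jar.JarFile
for (path <- Seq(sys.env("COMET_JAR"), sys.env("ICEBERG_JAR"))) {
  val jar = new JarFile(path)
  println(s"$path:")
  val entries = jar.entries()
  while (entries.hasMoreElements) {
    val name = entries.nextElement().getName
    if (name.endsWith("/ParquetProperties.class")) println(s"  $name")
  }
  jar.close()
}
```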