tglanz opened a new issue, #1833:
URL: https://github.com/apache/datafusion-comet/issues/1833

   ### Describe the bug
   
   Trying to insert into and query from an Iceberg table by following
   https://github.com/apache/datafusion-comet/blob/main/docs/source/user-guide/iceberg.md,
   I encounter the following error during insertion:
   
       java.lang.NoSuchMethodError: 'org.apache.parquet.column.ParquetProperties$Builder org.apache.parquet.column.ParquetProperties$Builder.withStatisticsEnabled(java.lang.String, boolean)'
   
   It seems to be a `parquet-column` version conflict between `1.15.2` and `1.13.1`, tied to the relocation of the package in the Iceberg runtime jar:
   
   - Without the relocation, the insert fails because Spark loads version `1.13.1`, which is missing the method, but the select statement works fine.
   - With the relocation, the insert works, but the select statement fails because the shaded type is not part of the Comet function signature.
   
   (Below are the exact details)
   
   ### Steps to reproduce
   
   Build Comet
   
       make release
   
   Apply the following patch to iceberg (as described 
[here](https://github.com/apache/datafusion-comet/blob/main/docs/source/user-guide/iceberg.md)):
   
   ```diff
   diff --git a/gradle.properties b/gradle.properties
   index 5da56c59d..bd6e35e2f 100644
   --- a/gradle.properties
   +++ b/gradle.properties
   @@ -18,7 +18,7 @@ jmhJsonOutputPath=build/reports/jmh/results.json
    jmhIncludeRegex=.*
    systemProp.defaultFlinkVersions=2.0
    systemProp.knownFlinkVersions=1.19,1.20,2.0
   -systemProp.defaultSparkVersions=4.0
   +systemProp.defaultSparkVersions=3.5
    systemProp.knownSparkVersions=3.4,3.5,4.0
    systemProp.defaultKafkaVersions=3
    systemProp.knownKafkaVersions=3
   diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
   index 89dd5bf45..79ac91f72 100644
   --- a/gradle/libs.versions.toml
   +++ b/gradle/libs.versions.toml
   @@ -37,7 +37,7 @@ awssdk-s3accessgrants = "2.3.0"
    bson-ver = "4.11.5"
    caffeine = "2.9.3"
    calcite = "1.39.0"
   -comet = "0.8.1"
   +comet = "0.9.0-SNAPSHOT"
    datasketches = "6.2.0"
    delta-standalone = "3.3.1"
    delta-spark = "3.3.1"
   diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle
   index af7a1d74d..426997463 100644
   --- a/spark/v3.5/build.gradle
   +++ b/spark/v3.5/build.gradle
   @@ -301,8 +301,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio
        relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro'
        relocate 'avro.shaded', 'org.apache.iceberg.shaded.org.apache.avro.shaded'
        relocate 'com.thoughtworks.paranamer', 'org.apache.iceberg.shaded.com.thoughtworks.paranamer'
   -    relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
   -    relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
   +    // relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
   +    // relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
        relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc'
        relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift'
        relocate 'org.apache.hc.client5', 'org.apache.iceberg.shaded.org.apache.hc.client5'
   diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
   index a361a7f1b..9021cd5c9 100644
   --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
   +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java
   @@ -24,6 +24,7 @@ import java.util.Objects;
    import java.util.Set;
    import java.util.function.Supplier;
    import java.util.stream.Collectors;
   +import org.apache.comet.parquet.SupportsComet;
    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.FileContent;
    import org.apache.iceberg.FileScanTask;
   @@ -63,7 +64,7 @@ import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
   -    implements SupportsRuntimeV2Filtering {
   +    implements SupportsRuntimeV2Filtering, SupportsComet {
    
      private static final Logger LOG = LoggerFactory.getLogger(SparkBatchQueryScan.class);
    
   @@ -290,4 +291,9 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan<PartitionScanTask>
            runtimeFilterExpressions,
            caseSensitive());
      }
   +
   +  @Override
   +  public boolean isCometEnabled() {
   +    return true;
   +  }
    }
   ```
   
   Build Iceberg
   
       ./gradlew build -x test -x integrationTest
   
   Export `COMET_JAR` and `ICEBERG_JAR` accordingly.
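
   For reference, a minimal sketch of what the exports might look like (the paths are hypothetical; point them at the jars produced by the two builds above):
   
   ```bash
   # hypothetical paths; substitute the artifacts produced by your builds
   export COMET_JAR=/path/to/comet-spark-spark3.5_2.12-0.9.0-SNAPSHOT.jar
   export ICEBERG_JAR=/path/to/iceberg-spark-runtime-3.5_2.12.jar
   ```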
   
   Run
   
   ```bash
   $SPARK_HOME/bin/spark-shell \
       --jars $COMET_JAR,$ICEBERG_JAR \
       --conf spark.driver.extraClassPath=$COMET_JAR,$ICEBERG_JAR \
       --conf spark.executor.extraClassPath=$COMET_JAR,$ICEBERG_JAR \
       --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
       --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
       --conf spark.sql.catalog.spark_catalog.type=hadoop \
       --conf spark.sql.catalog.spark_catalog.warehouse=/tmp/warehouse \
       --conf spark.plugins=org.apache.spark.CometPlugin \
       --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
       --conf spark.sql.iceberg.parquet.reader-type=COMET \
       --conf spark.comet.explainFallback.enabled=true \
       --conf spark.memory.offHeap.enabled=true \
       --conf spark.memory.offHeap.size=16g
   ```
   
   and run the following program:
   
   ```scala
   scala> spark.sql(s"CREATE TABLE IF NOT EXISTS t1 (c0 INT, c1 STRING) USING iceberg")
   25/06/03 16:13:45 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized
   25/06/03 16:13:45 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
    CreateTable [COMET: CreateTable is not supported]
   
   res0: org.apache.spark.sql.DataFrame = []
   
   scala> spark.sql(s"INSERT INTO t1 VALUES ${(0 until 100).map(i => (i, i+3)).mkString(",")};")
   25/06/03 16:13:59 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
    AppendData [COMET: AppendData is not supported]
   +-  LocalTableScan [COMET: LocalTableScan is not supported]
   
   25/06/03 16:14:00 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
   java.lang.NoSuchMethodError: 'org.apache.parquet.column.ParquetProperties$Builder org.apache.parquet.column.ParquetProperties$Builder.withStatisticsEnabled(java.lang.String, boolean)'
           at org.apache.iceberg.parquet.Parquet$WriteBuilder.build(Parquet.java:446)
           at org.apache.iceberg.parquet.Parquet$DataWriteBuilder.build(Parquet.java:866)
           at org.apache.iceberg.data.BaseFileWriterFactory.newDataWriter(BaseFileWriterFactory.java:131)
           at org.apache.iceberg.io.RollingDataWriter.newWriter(RollingDataWriter.java:52)
           at org.apache.iceberg.io.RollingDataWriter.newWriter(RollingDataWriter.java:32)
           at org.apache.iceberg.io.RollingFileWriter.openCurrentWriter(RollingFileWriter.java:112)
           at org.apache.iceberg.io.RollingDataWriter.<init>(RollingDataWriter.java:47)
           at org.apache.iceberg.spark.source.SparkWrite$UnpartitionedDataWriter.<init>(SparkWrite.java:717)
           at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:691)
           at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:668)
           at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:441)
           at org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:430)
           at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:496)
           at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:393)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
           at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
           at org.apache.spark.scheduler.Task.run(Task.scala:141)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
           at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
           at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
           at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
           at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
           at java.base/java.lang.Thread.run(Thread.java:829)
   ```
   
   ### Expected behavior
   
   Iceberg insert and select statements should not fail.
   
   ### Additional context
   
   Some insights:
   
   1. The missing method `org.apache.parquet.column.ParquetProperties$Builder.withStatisticsEnabled` is present in parquet-column `1.15.2` (the version Iceberg requires) but not in `1.13.1` (the version bundled with Spark 3.5); see the check below.
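
   One way to confirm this (a rough check, assuming both parquet-column jars are available in the local Maven repository; adjust the paths to wherever the jars live, e.g. the Gradle cache) is to disassemble the builder class and grep for the method:
   
   ```bash
   # present in 1.15.2
   javap -cp ~/.m2/repository/org/apache/parquet/parquet-column/1.15.2/parquet-column-1.15.2.jar \
     'org.apache.parquet.column.ParquetProperties$Builder' | grep withStatisticsEnabled
   # no output for 1.13.1, i.e. the method is missing there
   javap -cp ~/.m2/repository/org/apache/parquet/parquet-column/1.13.1/parquet-column-1.13.1.jar \
     'org.apache.parquet.column.ParquetProperties$Builder' | grep withStatisticsEnabled
   ```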
   
   1. If we bring back the relocation of the parquet packages, the insert works:
   
   ```diff
   -// relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
   -// relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
   +relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet'
   +relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded'
   ```
   
   From some checks, it seems that with the relocation commented out, Spark loads parquet-column `1.13.1`, and hence the method is missing.
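
   A rough way to see which copy ends up on the classpath (assuming a standard Spark 3.5 distribution layout and the `ICEBERG_JAR` variable from the reproduction above) is:
   
   ```bash
   # the parquet-column version bundled with the Spark distribution
   ls "$SPARK_HOME/jars" | grep parquet-column
   # check whether the Iceberg runtime jar still carries an unrelocated copy of the class
   unzip -l "$ICEBERG_JAR" | grep 'org/apache/parquet/column/ParquetProperties'
   ```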
   
   However, assuming there is already data in the table (see next bullet), the 
select statement works fine:
   
   ```scala
   scala> spark.sql("SELECT * from t1").show()
   25/06/03 16:27:51 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized
   25/06/03 16:27:51 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
    CollectLimit [COMET: CollectLimit is not supported]
   +-  Project [COMET: toprettystring is not supported]
      +- CometScanWrapper
   
   +---+---+                                                                    
   
   | c0| c1|
   +---+---+
   |  0|  3|
   |  1|  4|
   |  2|  5|
   |  3|  6|
   |  4|  7|
   |  5|  8|
   |  6|  9|
   |  7| 10|
   |  8| 11|
   |  9| 12|
   | 10| 13|
   | 11| 14|
   | 12| 15|
   | 13| 16|
   | 14| 17|
   | 15| 18|
   | 16| 19|
   | 17| 20|
   | 18| 21|
   | 19| 22|
   +---+---+
   only showing top 20 rows
   ```
   
   1. Tested with the latest releases as well as with the current heads of both repositories.
   
   1. If we keep the relocation, the insert works:
   
   ```scala
   scala> spark.sql(s"INSERT INTO t1 VALUES ${(0 until 100).map(i => (i, i+3)).mkString(",")};")
   25/06/03 16:24:47 INFO core/src/lib.rs: Comet native library version 0.9.0 initialized
   25/06/03 16:24:47 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
    AppendData [COMET: AppendData is not supported]
   +-  LocalTableScan [COMET: LocalTableScan is not supported]
   
   res0: org.apache.spark.sql.DataFrame = []     
   ```
   
   However, the select statement now fails with:
   
   ```scala
   scala> spark.sql("SELECT * from t1").show()
   25/06/03 16:25:27 WARN CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
    CollectLimit [COMET: CollectLimit is not supported]
   +-  Project [COMET: toprettystring is not supported]
      +- CometScanWrapper
   
   25/06/03 16:25:27 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 22)
   java.lang.NoSuchMethodError: 'org.apache.comet.parquet.ColumnReader org.apache.comet.parquet.Utils.getColumnReader(org.apache.spark.sql.types.DataType, org.apache.iceberg.shaded.org.apache.parquet.column.ColumnDescriptor, org.apache.comet.CometSchemaImporter, int, boolean, boolean)'
           at org.apache.iceberg.spark.data.vectorized.CometColumnReader.reset(CometColumnReader.java:95)
           at org.apache.iceberg.spark.data.vectorized.CometColumnarBatchReader.setRowGroupInfo(CometColumnarBatchReader.java:81)
           at org.apache.iceberg.parquet.VectorizedParquetReader$FileIterator.advance(VectorizedParquetReader.java:166)
           at org.apache.iceberg.parquet.VectorizedParquetReader$FileIterator.next(VectorizedParquetReader.java:139)
           at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:129)
           at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120)
           at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158)
           at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
           at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
           at scala.Option.exists(Option.scala:376)
           at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
           at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
           at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
           at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
           at org.apache.spark.sql.comet.CometBatchScanExec$$anon$1.hasNext(CometBatchScanExec.scala:62)
           at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.cometcolumnartorow_nextBatch_0$(generated.java:30)
           at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:43)
           at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
           at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
           at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:893)
           at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:893)
           at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
           at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
           at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
           at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
           at org.apache.spark.scheduler.Task.run(Task.scala:141)
           at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
           at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
           at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
           at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
           at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
           at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
           at java.base/java.lang.Thread.run(Thread.java:829)
   ```
   
   which, from my understanding, is caused by the shaded type `org.apache.iceberg.shaded.org.apache.parquet.column.ColumnDescriptor` no longer matching the `org.apache.comet.parquet.Utils.getColumnReader` signature, which expects the unrelocated `org.apache.parquet.column.ColumnDescriptor`.
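
   This can be double-checked by disassembling the Comet jar and looking at the parameter types `getColumnReader` is actually declared with (a rough check, assuming `javap` from a JDK is on the `PATH` and `COMET_JAR` is still exported):
   
   ```bash
   # shows the declared signature; the ColumnDescriptor parameter should come from
   # org.apache.parquet.column, not org.apache.iceberg.shaded.org.apache.parquet.column
   javap -cp "$COMET_JAR" org.apache.comet.parquet.Utils | grep getColumnReader
   ```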

