This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d4bad42de3 [GLUTEN-9335][VL] Support iceberg partition write (#10497)
d4bad42de3 is described below

commit d4bad42de30eba2e53b1d02c6386df62e2f3f8ce
Author: Jin Chengcheng <[email protected]>
AuthorDate: Wed Sep 3 13:17:40 2025 +0100

    [GLUTEN-9335][VL] Support iceberg partition write (#10497)
    
    Add the Protobuf message IcebergPartitionField to carry the Iceberg field id
    information, and IcebergPartitionSpec to carry the partition information.
    Build with tests and benchmarks in CI and fix the IcebergWriteTest build.
    Set the file format to ORC to bypass native Parquet write in the partitioned
    TPC-H Iceberg suite; once facebookincubator/velox#14670, which supports the
    fanout=false mode, is merged, we can relax this restriction.
    
    Relevant PR: facebookincubator/velox#13874
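    
    For illustration only (not part of this commit), here is a minimal Scala
    sketch of how the new proto messages could be populated for a table
    partitioned by bucket(16, a), mirroring convertPartitionField in the diff
    below. The Iceberg field id of column `a` and the repeated-field accessor
    addFields are assumptions for the example, not confirmed API:
    
        import org.apache.gluten.proto.{IcebergPartitionField, IcebergPartitionSpec, TransformType}
    
        // One IcebergPartitionField per partition transform: source_id carries the
        // Iceberg field id of the source column, name carries the source column name.
        val bucketField = IcebergPartitionField
          .newBuilder()
          .setSourceId(1) // assumed Iceberg field id of column `a`
          .setName("a")
          .setTransform(TransformType.BUCKET)
          .setParameter(16) // bucket count for the bucket(16, a) transform
          .build()
        val spec = IcebergPartitionSpec
          .newBuilder()
          .addFields(bucketField) // assumed accessor for a repeated `fields` member
          .build()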
---
 .../write/IcebergColumnarBatchDataWriter.scala     |  1 -
 .../connector/write/IcebergDataWriteFactory.scala  |  2 +-
 .../iceberg/transforms/IcebergTransformUtil.scala  | 35 ++++++++++++--
 .../gluten/execution/VeloxTPCHIcebergSuite.scala   |  4 ++
 .../execution/enhanced/VeloxIcebergSuite.scala     | 54 +++++++++++++++++++++-
 .../apache/gluten/proto/IcebergPartitionSpec.proto |  7 +--
 cpp/velox/compute/iceberg/IcebergWriter.cc         | 12 +++--
 cpp/velox/tests/iceberg/IcebergWriteTest.cc        | 14 +++++-
 ...velox-buildstatic-centos-7-enhanced-features.sh |  2 +-
 .../apache/gluten/execution/IcebergWriteExec.scala | 12 ++---
 .../iceberg/spark/source/IcebergWriteUtil.scala    | 35 ++++----------
 11 files changed, 128 insertions(+), 50 deletions(-)

diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
index 4064c55b27..068cdf6cd4 100644
--- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
+++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergColumnarBatchDataWriter.scala
@@ -62,7 +62,6 @@ case class IcebergColumnarBatchDataWriter(
 
   private def parseDataFile(json: String, spec: PartitionSpec, sortOrder: SortOrder): DataFile = {
     val dataFile = mapper.readValue(json, classOf[DataFileJson])
-
     val builder = DataFiles
       .builder(spec)
       .withPath(dataFile.path)
diff --git a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala
index df7c471dd0..7004fa57ec 100644
--- a/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala
+++ b/backends-velox/src-iceberg/main/scala/org/apache/gluten/connector/write/IcebergDataWriteFactory.scala
@@ -56,7 +56,7 @@ case class IcebergDataWriteFactory(
     val fields = partitionSpec
       .fields()
       .stream()
-      .map[IcebergPartitionField](IcebergTransformUtil.convertPartitionField _)
+      .map[IcebergPartitionField](f => IcebergTransformUtil.convertPartitionField(f, partitionSpec))
       .collect(Collectors.toList[IcebergPartitionField])
     val specProto = IcebergPartitionSpec
       .newBuilder()
diff --git a/backends-velox/src-iceberg/main/scala/org/apache/iceberg/transforms/IcebergTransformUtil.scala b/backends-velox/src-iceberg/main/scala/org/apache/iceberg/transforms/IcebergTransformUtil.scala
index a0de70a7f9..3cd82bd5e5 100644
--- a/backends-velox/src-iceberg/main/scala/org/apache/iceberg/transforms/IcebergTransformUtil.scala
+++ b/backends-velox/src-iceberg/main/scala/org/apache/iceberg/transforms/IcebergTransformUtil.scala
@@ -16,16 +16,18 @@
  */
 package org.apache.iceberg.transforms
 
+import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.proto.{IcebergPartitionField, TransformType}
 
-import org.apache.iceberg.PartitionField
+import org.apache.iceberg.{PartitionField, PartitionSpec}
 
 object IcebergTransformUtil {
 
-  def convertPartitionField(field: PartitionField): IcebergPartitionField = {
+  def convertPartitionField(field: PartitionField, spec: PartitionSpec): IcebergPartitionField = {
     val transform = field.transform()
-    // TODO: if the field is in nest column, concat it.
-    var builder = IcebergPartitionField.newBuilder().setName(field.name())
+    val sourceName = spec.schema().asStruct().field(field.sourceId()).name()
+    var builder =
+      IcebergPartitionField.newBuilder().setName(sourceName).setSourceId(field.sourceId())
     builder = transform match {
       case _: Identity[_] => builder.setTransform(TransformType.IDENTITY)
       case _: Years[_] => builder.setTransform(TransformType.YEAR)
@@ -34,7 +36,32 @@ object IcebergTransformUtil {
       case _: Hours[_] => builder.setTransform(TransformType.HOUR)
      case b: Bucket[_] => builder.setTransform(TransformType.BUCKET).setParameter(b.numBuckets())
      case t: Truncate[_] => builder.setTransform(TransformType.TRUNCATE).setParameter(t.width)
+      case t: Timestamps => builder.setTransform(convertTimestamps(t))
+      case d: Dates => builder.setTransform(convertDates(d))
     }
     builder.build()
   }
+
+  private def convertTimestamps(timestamps: Timestamps): TransformType = {
+    // We cannot match on the enum instance because the Iceberg 1.5.0 enum is different,
+    // and we fall back for the TimestampNano data type.
+    timestamps.toString match {
+      case "hour" => TransformType.HOUR
+      case "day" => TransformType.DAY
+      case "month" => TransformType.MONTH
+      case "year" => TransformType.YEAR
+      case _ => throw new GlutenNotSupportException()
+    }
+  }
+
+  private def convertDates(dates: Dates): TransformType = {
+    // We cannot match on the enum instance because the Iceberg 1.5.0 enum is different,
+    // and we fall back for the TimestampNano data type.
+    dates match {
+      case Dates.DAY => TransformType.DAY
+      case Dates.MONTH => TransformType.MONTH
+      case Dates.YEAR => TransformType.YEAR
+      case _ => throw new GlutenNotSupportException()
+    }
+  }
 }
diff --git a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala
index 5bc26e81b7..5456e0ba89 100644
--- a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala
+++ b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/VeloxTPCHIcebergSuite.scala
@@ -93,6 +93,7 @@ class VeloxTPCHIcebergSuite extends VeloxTPCHSuite {
 }
 
 class VeloxPartitionedTableTPCHIcebergSuite extends VeloxTPCHIcebergSuite {
+
   override protected def createTPCHNotNullTables(): Unit = {
     TPCHTables.map {
       table =>
@@ -103,8 +104,11 @@ class VeloxPartitionedTableTPCHIcebergSuite extends VeloxTPCHIcebergSuite {
           .repartition(table.partitionColumns.map(col): _*)
           .sortWithinPartitions(table.partitionColumns.map(col): _*)
 
+        // Use ORC to disable native write and avoid OOM; Velox behaves as if FANOUT_ENABLED
+        // were true, creating too many writers.
         tableDF.write
           .format("iceberg")
+          .option(SparkWriteOptions.WRITE_FORMAT, "orc")
           .partitionBy(table.partitionColumns: _*)
           .option(SparkWriteOptions.FANOUT_ENABLED, "false")
           .mode("overwrite")
diff --git a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
index ee228d5120..44ee905d01 100644
--- a/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
+++ b/backends-velox/src-iceberg/test/scala/org/apache/gluten/execution/enhanced/VeloxIcebergSuite.scala
@@ -46,8 +46,7 @@ class VeloxIcebergSuite extends IcebergSuite {
     }
   }
 
-  // TODO: support later
-  ignore("iceberg insert partition table identity transform") {
+  test("iceberg insert partition table identity transform") {
     withTable("iceberg_tb2") {
       spark.sql("""
                   |create table if not exists iceberg_tb2(a int, b int)
@@ -111,6 +110,57 @@ class VeloxIcebergSuite extends IcebergSuite {
                                  |""".stripMargin)
       val result = selectDf.collect()
       assert(result.length == 5)
+
+    }
+  }
+
+  test("iceberg insert partition table bucket transform") {
+    withTable("iceberg_tb2") {
+      spark.sql("""
+                  |create table if not exists iceberg_tb2(a int, b int)
+                  |using iceberg
+                  |partitioned by (bucket(16, a));
+                  |""".stripMargin)
+      val df = spark.sql("""
+                           |insert into table iceberg_tb2 values(1098, 189)
+                           |""".stripMargin)
+      assert(
+        df.queryExecution.executedPlan
+          .asInstanceOf[CommandResultExec]
+          .commandPhysicalPlan
+          .isInstanceOf[VeloxIcebergAppendDataExec])
+      val selectDf = spark.sql("""
+                                 |select * from iceberg_tb2;
+                                 |""".stripMargin)
+      val result = selectDf.collect()
+      assert(result.length == 1)
+      assert(result(0).get(0) == 1098)
+      assert(result(0).get(1) == 189)
+    }
+  }
+
+  test("iceberg insert partition table truncate transform") {
+    withTable("iceberg_tb2") {
+      spark.sql("""
+                  |create table if not exists iceberg_tb2(a int, b int)
+                  |using iceberg
+                  |partitioned by (truncate(16, a));
+                  |""".stripMargin)
+      val df = spark.sql("""
+                           |insert into table iceberg_tb2 values(1098, 189)
+                           |""".stripMargin)
+      assert(
+        df.queryExecution.executedPlan
+          .asInstanceOf[CommandResultExec]
+          .commandPhysicalPlan
+          .isInstanceOf[VeloxIcebergAppendDataExec])
+      val selectDf = spark.sql("""
+                                 |select * from iceberg_tb2;
+                                 |""".stripMargin)
+      val result = selectDf.collect()
+      assert(result.length == 1)
+      assert(result(0).get(0) == 1098)
+      assert(result(0).get(1) == 189)
     }
   }
 
diff --git a/backends-velox/src/main/resources/org/apache/gluten/proto/IcebergPartitionSpec.proto b/backends-velox/src/main/resources/org/apache/gluten/proto/IcebergPartitionSpec.proto
index 1a99058d8a..e59fb88a9c 100644
--- a/backends-velox/src/main/resources/org/apache/gluten/proto/IcebergPartitionSpec.proto
+++ b/backends-velox/src/main/resources/org/apache/gluten/proto/IcebergPartitionSpec.proto
@@ -17,9 +17,10 @@ enum TransformType {
 }
 
 message IcebergPartitionField {
-  string name = 1;
-  TransformType transform = 2;
-  optional int32 parameter = 3;  // Optional parameter for transform config
+  int32 source_id = 1;
+  string name = 2;
+  TransformType transform = 3;
+  optional int32 parameter = 4;  // Optional parameter for transform config
 }
 
 message IcebergPartitionSpec {
diff --git a/cpp/velox/compute/iceberg/IcebergWriter.cc b/cpp/velox/compute/iceberg/IcebergWriter.cc
index 5ba500e670..11718c8901 100644
--- a/cpp/velox/compute/iceberg/IcebergWriter.cc
+++ b/cpp/velox/compute/iceberg/IcebergWriter.cc
@@ -20,6 +20,7 @@
 #include "IcebergPartitionSpec.pb.h"
 #include "compute/ProtobufUtils.h"
 #include "compute/iceberg/IcebergFormat.h"
+#include "config/VeloxConfig.h"
 #include "utils/ConfigExtractor.h"
 #include "velox/connectors/hive/iceberg/IcebergDataSink.h"
 #include "velox/connectors/hive/iceberg/IcebergDeleteFile.h"
@@ -100,9 +101,14 @@ IcebergWriter::IcebergWriter(
     std::shared_ptr<facebook::velox::memory::MemoryPool> memoryPool,
     std::shared_ptr<facebook::velox::memory::MemoryPool> connectorPool)
     : rowType_(rowType), field_(convertToIcebergNestedField(field)), pool_(memoryPool), connectorPool_(connectorPool) {
-  auto connectorSessionProperties_ = getHiveConfig(
-      std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(sparkConfs)));
-  connectorConfig_ = std::make_shared<facebook::velox::connector::hive::HiveConfig>(connectorSessionProperties_);
+  auto veloxCfg =
+      std::make_shared<facebook::velox::config::ConfigBase>(std::unordered_map<std::string, std::string>(sparkConfs));
+  connectorSessionProperties_ = std::make_shared<facebook::velox::config::ConfigBase>(
+      std::unordered_map<std::string, std::string>(), true);
+  connectorSessionProperties_->set(
+      facebook::velox::connector::hive::HiveConfig::kMaxPartitionsPerWritersSession,
+      std::to_string(veloxCfg->get<int32_t>(kMaxPartitions, 10000)));
+  connectorConfig_ = std::make_shared<facebook::velox::connector::hive::HiveConfig>(getHiveConfig(veloxCfg));
   connectorQueryCtx_ = std::make_unique<connector::ConnectorQueryCtx>(
       pool_.get(),
       connectorPool_.get(),
diff --git a/cpp/velox/tests/iceberg/IcebergWriteTest.cc b/cpp/velox/tests/iceberg/IcebergWriteTest.cc
index 59b2dc754f..574d749370 100644
--- a/cpp/velox/tests/iceberg/IcebergWriteTest.cc
+++ b/cpp/velox/tests/iceberg/IcebergWriteTest.cc
@@ -29,7 +29,7 @@ namespace gluten {
 class VeloxIcebergWriteTest : public ::testing::Test, public test::VectorTestBase {
  protected:
   static void SetUpTestCase() {
-    memory::MemoryManager::testingSetInstance({});
+    memory::MemoryManager::testingSetInstance(memory::MemoryManager::Options{});
     parquet::registerParquetWriterFactory();
     Type::registerSerDe();
     dwio::common::registerFileSinks();
@@ -43,11 +43,23 @@ class VeloxIcebergWriteTest : public ::testing::Test, public test::VectorTestBas
 TEST_F(VeloxIcebergWriteTest, write) {
   auto vector = makeRowVector({makeFlatVector<int8_t>({1, 2}), makeFlatVector<int16_t>({1, 2})});
   auto tmpPath = tmpDir_->getPath();
+  std::vector<connector::hive::iceberg::IcebergPartitionSpec::Field> fields;
+  auto partitionSpec = std::make_shared<const connector::hive::iceberg::IcebergPartitionSpec>(0, fields);
+
+  gluten::IcebergNestedField root;
+  root.set_id(0);
+  gluten::IcebergNestedField* child1 = root.add_children();
+  child1->set_id(1);
+  gluten::IcebergNestedField* child2 = root.add_children();
+  child2->set_id(2);
+
   auto writer = std::make_unique<IcebergWriter>(
       asRowType(vector->type()),
       1,
       tmpPath + "/iceberg_write_test_table",
       common::CompressionKind::CompressionKind_ZSTD,
+      partitionSpec,
+      root,
       std::unordered_map<std::string, std::string>(),
       pool_,
       connectorPool_);
diff --git a/dev/ci-velox-buildstatic-centos-7-enhanced-features.sh b/dev/ci-velox-buildstatic-centos-7-enhanced-features.sh
index 84b3281bae..586db397a8 100755
--- a/dev/ci-velox-buildstatic-centos-7-enhanced-features.sh
+++ b/dev/ci-velox-buildstatic-centos-7-enhanced-features.sh
@@ -20,5 +20,5 @@ set -e
 source /opt/rh/devtoolset-11/enable
 source /opt/rh/rh-git227/enable
 export NUM_THREADS=4
-./dev/builddeps-veloxbe.sh --enable_vcpkg=ON --build_arrow=OFF --build_tests=OFF --build_benchmarks=OFF \
+./dev/builddeps-veloxbe.sh --enable_vcpkg=ON --build_arrow=OFF --build_tests=ON --build_benchmarks=ON \
                            --build_examples=OFF --enable_s3=ON --enable_gcs=ON --enable_hdfs=ON --enable_abfs=ON --enable_enhanced_features=ON
diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
index 0f040f8775..5565ccbf35 100644
--- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergWriteExec.scala
@@ -55,7 +55,7 @@ trait IcebergWriteExec extends ColumnarV2TableWriteExec {
 
   private def validatePartitionType(schema: Schema, field: PartitionField): Boolean = {
     val partitionType = schema.findType(field.sourceId())
-    val unSupportType = Seq(TypeID.DOUBLE, TypeID.FLOAT, TypeID.BINARY, TypeID.DECIMAL)
+    val unSupportType = Seq(TypeID.DOUBLE, TypeID.FLOAT, TypeID.DECIMAL)
     !unSupportType.contains(partitionType.typeId())
   }
 
@@ -70,9 +70,6 @@ trait IcebergWriteExec extends ColumnarV2TableWriteExec {
       return ValidationResult.failed("Contains unsupported data type")
     }
     val spec = IcebergWriteUtil.getTable(write).spec()
-    if (spec.isPartitioned) {
-      return ValidationResult.failed("Not support write partition table")
-    }
     if (spec.isPartitioned) {
       val topIds = spec.schema().columns().asScala.map(c => c.fieldId())
       if (
@@ -81,12 +78,11 @@ trait IcebergWriteExec extends ColumnarV2TableWriteExec {
           .stream()
           .anyMatch(
             f =>
-              !f.transform().isIdentity
-                || !validatePartitionType(spec.schema(), f) || !topIds.contains(f.sourceId()))
+              !validatePartitionType(spec.schema(), f) || !topIds
+                .contains(f.sourceId()) || f.transform().isVoid)
       ) {
         return ValidationResult.failed(
-          "Not support write non identity partition table," +
-            "or contains unsupported partition type, or is nested partition 
column")
+          "Not support write unsupported partition type, or is nested 
partition column")
       }
     }
     if (IcebergWriteUtil.getTable(write).sortOrder().isSorted) {
diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
index 1913eca5ef..7ebd4cad44 100644
--- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/IcebergWriteUtil.scala
@@ -16,18 +16,16 @@
  */
 package org.apache.iceberg.spark.source
 
-import org.apache.spark.sql.connector.write.{BatchWrite, Write, WriterCommitMessage}
+import org.apache.spark.sql.connector.write.{Write, WriterCommitMessage}
 
 import org.apache.iceberg._
+import org.apache.iceberg.spark.SparkWriteConf
 import org.apache.iceberg.spark.source.SparkWrite.TaskCommit
 import org.apache.iceberg.types.Type
 import org.apache.iceberg.types.Type.TypeID
 import org.apache.iceberg.types.Types.{ListType, MapType}
 
 object IcebergWriteUtil {
-  def isBatchAppend(write: BatchWrite): Boolean = {
-    write.getClass.getSimpleName.equals("BatchAppend")
-  }
 
   def isDataWrite(write: Write): Boolean = {
     write.isInstanceOf[SparkWrite]
@@ -60,32 +58,24 @@ object IcebergWriteUtil {
     field.get(write).asInstanceOf[java.util.Map[String, String]]
   }
 
+  def getWriteConf(write: Write): SparkWriteConf = {
+    val field = classOf[SparkWrite].getDeclaredField("writeConf")
+    field.setAccessible(true)
+    field.get(write).asInstanceOf[SparkWriteConf]
+  }
+
   def getTable(write: Write): Table = {
     val field = classOf[SparkWrite].getDeclaredField("table")
     field.setAccessible(true)
     field.get(write).asInstanceOf[Table]
   }
 
-  def getSparkWrite(write: BatchWrite): SparkWrite = {
-    // Access the enclosing SparkWrite instance from BatchAppend
-    val outerInstanceField = write.getClass.getDeclaredField("this$0")
-    outerInstanceField.setAccessible(true)
-    outerInstanceField.get(write).asInstanceOf[SparkWrite]
-  }
-
   def getFileFormat(write: Write): FileFormat = {
     val field = classOf[SparkWrite].getDeclaredField("format")
     field.setAccessible(true)
     field.get(write).asInstanceOf[FileFormat]
   }
 
-  def getFileFormat(write: BatchWrite): FileFormat = {
-    val sparkWrite = getSparkWrite(write)
-    val field = classOf[SparkWrite].getDeclaredField("format")
-    field.setAccessible(true)
-    field.get(sparkWrite).asInstanceOf[FileFormat]
-  }
-
   def getDirectory(write: Write): String = {
     val field = classOf[SparkWrite].getDeclaredField("table")
     field.setAccessible(true)
@@ -100,14 +90,7 @@ object IcebergWriteUtil {
   def getPartitionSpec(write: Write): PartitionSpec = {
     val field = classOf[SparkWrite].getDeclaredField("table")
     field.setAccessible(true)
-    getTable(write).spec()
-  }
-
-  def getDirectory(write: BatchWrite): String = {
-    val sparkWrite = getSparkWrite(write)
-    val field = classOf[SparkWrite].getDeclaredField("table")
-    field.setAccessible(true)
-    getTable(sparkWrite).locationProvider().newDataLocation("")
+    getTable(write).specs().get(getWriteConf(write).outputSpecId())
   }
 
   // Similar to the UnpartitionedDataWriter#commit


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
