vinothchandar commented on code in PR #13489:
URL: https://github.com/apache/hudi/pull/13489#discussion_r2178811227


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.IndexVersionUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class EightToNineUpgradeHandler implements UpgradeHandler {
+
+  @Override
+  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context,
+                                             String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+
+    // Populate missing index versions indexes
+    Option<HoodieIndexMetadata> indexMetadataOpt = metaClient.getIndexMetadata();
+    if (indexMetadataOpt.isPresent()) {
+      IndexVersionUtils.populateIndexVersionIfMissing(metaClient.getTableConfig().getTableVersion(), indexMetadataOpt);
+

Review Comment:
   Can this util method remain within this class? We don't anticipate using it anywhere else, right?
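A minimal sketch of keeping the helper private to the handler. `EightToNineUpgradeSketch`, `IndexDefinition` and `IndexVersion` are hypothetical, simplified stand-ins for the Hudi types in the diff. Defaulting a missing version to V1 is an assumption; the PR does not state it.

```java
import java.util.Map;

// Hypothetical, simplified stand-ins for HoodieIndexDefinition / HoodieIndexVersion;
// the point is only where the helper lives, not the real Hudi types.
public class EightToNineUpgradeSketch {

  enum IndexVersion { V1, V2 }

  static final class IndexDefinition {
    final String indexName;
    IndexVersion version; // null when the definition predates the version field

    IndexDefinition(String indexName, IndexVersion version) {
      this.indexName = indexName;
      this.version = version;
    }
  }

  // Entry point mirroring upgrade(): fills in missing versions, then returns the definitions.
  static Map<String, IndexDefinition> upgrade(Map<String, IndexDefinition> indexDefinitions) {
    populateIndexVersionIfMissing(indexDefinitions);
    return indexDefinitions;
  }

  // Private to the handler: no caller outside the eight-to-nine upgrade is expected.
  // Assumption: definitions written at table version eight have no version and should be V1.
  private static void populateIndexVersionIfMissing(Map<String, IndexDefinition> indexDefinitions) {
    for (IndexDefinition definition : indexDefinitions.values()) {
      if (definition.version == null) {
        definition.version = IndexVersion.V1;
      }
    }
  }
}
```

Making the helper `private` signals that only the upgrade path should call it. If a second caller ever appears, it can be promoted back to a shared utility.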



##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -226,6 +227,11 @@ public String getPartitionPath(HoodieTableMetaClient metaClient, String indexNam
       checkArgument(metaClient.getIndexMetadata().isPresent(), "Index definition is not present for index: " + indexName);
       return metaClient.getIndexMetadata().get().getIndexDefinitions().get(indexName).getIndexName();
     }
+
+    @Override
+    public SerializableBiFunction<String, Integer, Integer> getFileGroupIndexFunction(HoodieIndexVersion indexVersion) {

Review Comment:
   rename: getShardingFunction() or getFileGroupMappingFunction()
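This sketch shows what the function returns under one of the suggested names. It assumes the two `Integer` arguments in the signature are a key and a file group count, and that the result is a file group index. The hash-mod scheme is also an assumption and may not match how the PR computes the index. `java.util.function.BiFunction` stands in for Hudi's `SerializableBiFunction`, and the per-`HoodieIndexVersion` dispatch is omitted.

```java
import java.util.function.BiFunction;

// Hypothetical sketch under the suggested name; the hash-mod scheme is an assumption,
// and java.util.function.BiFunction stands in for Hudi's SerializableBiFunction.
public class FileGroupMappingSketch {

  // Returns a function mapping (key, numFileGroups) to a file group index in [0, numFileGroups).
  static BiFunction<String, Integer, Integer> getFileGroupMappingFunction() {
    return (key, numFileGroups) -> {
      if (numFileGroups <= 0) {
        throw new IllegalArgumentException("numFileGroups must be positive, got " + numFileGroups);
      }
      // floorMod stays non-negative even when hashCode() is negative (unlike Math.abs(h) % n,
      // which can go negative for Integer.MIN_VALUE).
      return Math.floorMod(key.hashCode(), numFileGroups);
    };
  }
}
```

Either proposed name describes what the caller gets, a key-to-file-group mapping, rather than the "index" wording that collides with Hudi's own notion of an index.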



##########
hudi-common/src/main/java/org/apache/hudi/common/util/IndexVersionUtils.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieIndexMetadata;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.metadata.HoodieIndexVersion;
+
+/**
+ * Utility class for managing index version operations.
+ */
+public class IndexVersionUtils {
+
+  /**
+   * Populates missing version attributes in index definitions based on table version.
+   *
+   * @param tableVersion the table version to determine appropriate index versions
+   * @param indexDefOption optional index metadata containing index definitions
+   */
+  public static void populateIndexVersionIfMissing(HoodieTableVersion tableVersion, Option<HoodieIndexMetadata> indexDefOption) {
+    indexDefOption.ifPresent(idxDefs ->

Review Comment:
   Let's move this closer to the upgrade/downgrade code unless it's used elsewhere.



##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java:
##########
@@ -129,4 +129,21 @@ public abstract <I, K, V> List<V> reduceByKey(
   public abstract <I, O> O aggregate(HoodieData<I> data, O zeroValue, Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp);
 
   public abstract <T> ReaderContextFactory<T> getReaderContextFactory(HoodieTableMetaClient metaClient);
+
+  /**
+   * Groups values by key and applies a function to each group of values.
+   * [1 iterator maps to 1 key] It only guarantees that items returned by the same iterator shares to the same key.
+   * [exact once across iterators] The item returned by the same iterator will not be returned by other iterators.
+   * [1 key maps to >= 1 iterators] Items belong to the same shard can be load-balanced across multiple iterators. It's up to API implementations to decide
+   *                                load balancing pattern and how many iterators to split into.
+   *
+   * @param data The input pair<ShardIndex, Item> to process.
+   * @param func Function to apply to each group of items with the same shard
+   * @param maxShardIndex The range of ShardIndex in data parameter. If data contain ShardIndex 1,2,6, any maxShardIndex >=6 is valid.
+   * @param preservesPartitioning whether to preserve partitioning in the resulting collection.

Review Comment:
   this still does not feel generic enough to shove into the engine context.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class NineToEightDowngradeHandler implements DowngradeHandler {
+
+  @Override
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    UpgradeDowngradeUtils.dropNonV1SecondaryIndexPartitions(
+        config, context, table, upgradeDowngradeHelper, "downgrading from table version nine to eight");

Review Comment:
   Just making sure: is the extra `version` field in the index metadata OK? It won't cause failures after a downgrade?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java:
##########
@@ -214,4 +219,35 @@ static void rollbackFailedWritesAndCompact(HoodieTable table, HoodieEngineContex
       throw new HoodieException(e);
     }
   }
+
+  /**
+   * Drops secondary index partitions from metadata table that are V2 or higher.
+   *
+   * @param config        Write config
+   * @param context       Engine context
+   * @param table         Hoodie table
+   * @param operationType Type of operation (upgrade/downgrade)
+   */
+  public static void dropNonV1SecondaryIndexPartitions(HoodieWriteConfig config, HoodieEngineContext context,
+                                                       HoodieTable table, SupportsUpgradeDowngrade upgradeDowngradeHelper, String operationType) {

Review Comment:
   Let's ensure this has unit tests.
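One possible shape for that UT, sketched with hypothetical names (`DropNonV1SecondaryIndexSketch`, `selectPartitionsToDrop`). It makes two assumptions: secondary index partitions are identified by a `secondary_index_` prefix, and a missing version counts as V1. It exercises only the selection step described in the javadoc ("V2 or higher"). The actual partition deletion would need a real table and metadata table.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a unit test for the selection step of dropNonV1SecondaryIndexPartitions.
// The partition prefix and the version model are assumptions, not Hudi's actual constants.
public class DropNonV1SecondaryIndexSketch {

  enum IndexVersion { V1, V2 }

  static final String SECONDARY_INDEX_PREFIX = "secondary_index_";

  // Returns, sorted, the secondary index partitions whose version is V2 or higher.
  // A missing (null) version is treated as V1, so that partition is kept.
  static List<String> selectPartitionsToDrop(Map<String, IndexVersion> partitionVersions) {
    List<String> toDrop = new ArrayList<>();
    for (Map.Entry<String, IndexVersion> entry : partitionVersions.entrySet()) {
      IndexVersion version = entry.getValue();
      boolean isSecondaryIndex = entry.getKey().startsWith(SECONDARY_INDEX_PREFIX);
      if (isSecondaryIndex && version != null && version.compareTo(IndexVersion.V1) > 0) {
        toDrop.add(entry.getKey());
      }
    }
    Collections.sort(toDrop);
    return toDrop;
  }

  // UT-style check: only V2 secondary indexes are selected; other partitions are untouched.
  static void testSelectsOnlyNonV1SecondaryIndexes() {
    Map<String, IndexVersion> versions = new HashMap<>();
    versions.put("secondary_index_by_city", IndexVersion.V1);
    versions.put("secondary_index_by_rider", IndexVersion.V2);
    versions.put("secondary_index_legacy", null);
    versions.put("column_stats", IndexVersion.V2);
    List<String> expected = Collections.singletonList("secondary_index_by_rider");
    List<String> actual = selectPartitionsToDrop(versions);
    if (!expected.equals(actual)) {
      throw new AssertionError("expected " + expected + " but got " + actual);
    }
  }
}
```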



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -270,13 +263,49 @@ public boolean buildIndexDefinition(HoodieIndexDefinition indexDefinition) {
   public void deleteIndexDefinition(String indexName) {
     checkState(indexMetadataOpt.isPresent(), "Index metadata is not present");
     indexMetadataOpt.get().getIndexDefinitions().remove(indexName);
+    writeIndexMetadataToStorage();
+  }
+
+  /**
+   * Writes the current index metadata to storage.
+   */
+  public void writeIndexMetadataToStorage() {
+    if (!indexMetadataOpt.isPresent()) {
+      return;
+    }
+    writeIndexMetadataToStorage(indexMetadataOpt.get());
+  }
+
+  /**
+   * Writes the provided index metadata to storage.
+   *
+   * @param indexMetadata the index metadata to write
+   */
+  public void writeIndexMetadataToStorage(HoodieIndexMetadata indexMetadata) {
     String indexMetaPath = getIndexDefinitionPath();
     try {
       // TODO[HUDI-9094]: should not write byte array directly
       FileIOUtils.createFileInPath(storage, new StoragePath(indexMetaPath),
-          Option.of(HoodieInstantWriter.convertByteArrayToWriter(getUTF8Bytes(indexMetadataOpt.get().toJson()))));
-    }  catch (IOException e) {
-      throw new HoodieIOException("Could not write expression index metadata at path: " + indexMetaPath, e);
+          Option.of(HoodieInstantWriter.convertByteArrayToWriter(getUTF8Bytes(indexMetadata.toJson()))));
+    } catch (IOException e) {
+      throw new HoodieIOException("Could not write index metadata at path: " + indexMetaPath, e);
+    }
+  }
+
+  /**
+   * Static method to write index metadata to storage.
+   *
+   * @param storage the storage to write to
+   * @param indexDefinitionPath the path where the index metadata should be written
+   * @param indexMetadata the index metadata to write
+   */
+  public static void writeIndexMetadataToStorage(HoodieStorage storage, String indexDefinitionPath, HoodieIndexMetadata indexMetadata) {
+    try {

Review Comment:
   Please add UTs for these.
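A round-trip UT could follow roughly this shape. The sketch uses `java.nio.file` in place of `HoodieStorage`. `IndexMetadataWriteSketch`, `writeIndexMetadata` and the file name are hypothetical, and the JSON strings are placeholders rather than real `HoodieIndexMetadata` output. The body of the static overload is cut off in the diff, so the sketch does not reproduce it. It checks only the write, read-back and overwrite behaviour that `deleteIndexDefinition` now depends on.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of a round-trip unit test for the index metadata writer,
// using java.nio.file instead of HoodieStorage. Payloads are placeholder JSON.
public class IndexMetadataWriteSketch {

  // Stand-in for writeIndexMetadataToStorage(storage, indexDefinitionPath, indexMetadata).
  static void writeIndexMetadata(Path indexDefinitionPath, String indexMetadataJson) {
    try {
      Files.createDirectories(indexDefinitionPath.getParent());
      // Files.write truncates an existing file, so a second call replaces the first payload.
      Files.write(indexDefinitionPath, indexMetadataJson.getBytes(StandardCharsets.UTF_8));
    } catch (IOException e) {
      throw new UncheckedIOException("Could not write index metadata at path: " + indexDefinitionPath, e);
    }
  }

  static String readIndexMetadata(Path indexDefinitionPath) throws IOException {
    return new String(Files.readAllBytes(indexDefinitionPath), StandardCharsets.UTF_8);
  }

  // UT-style check: write, read back, overwrite (as after a deleted definition), read back again.
  static void testRoundTripAndOverwrite() throws IOException {
    Path baseDir = Files.createTempDirectory("index-metadata-test");
    Path indexFile = baseDir.resolve("meta").resolve("index-definitions.json");
    try {
      String withTwoIndexes = "{\"indexDefinitions\":{\"idx_a\":{},\"idx_b\":{}}}";
      writeIndexMetadata(indexFile, withTwoIndexes);
      if (!withTwoIndexes.equals(readIndexMetadata(indexFile))) {
        throw new AssertionError("first write was not read back intact");
      }
      String withOneIndex = "{\"indexDefinitions\":{\"idx_a\":{}}}";
      writeIndexMetadata(indexFile, withOneIndex);
      if (!withOneIndex.equals(readIndexMetadata(indexFile))) {
        throw new AssertionError("overwrite left stale content behind");
      }
    } finally {
      Files.deleteIfExists(indexFile);
      Files.deleteIfExists(indexFile.getParent());
      Files.deleteIfExists(baseDir);
    }
  }
}
```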



##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java:
##########
@@ -129,4 +129,21 @@ public abstract <I, K, V> List<V> reduceByKey(
   public abstract <I, O> O aggregate(HoodieData<I> data, O zeroValue, Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp);
 
   public abstract <T> ReaderContextFactory<T> getReaderContextFactory(HoodieTableMetaClient metaClient);
+
+  /**
+   * Groups values by key and applies a function to each group of values.
+   * [1 iterator maps to 1 key] It only guarantees that items returned by the same iterator shares to the same key.
+   * [exact once across iterators] The item returned by the same iterator will not be returned by other iterators.
+   * [1 key maps to >= 1 iterators] Items belong to the same shard can be load-balanced across multiple iterators. It's up to API implementations to decide
+   *                                load balancing pattern and how many iterators to split into.
+   *
+   * @param data The input pair<ShardIndex, Item> to process.
+   * @param func Function to apply to each group of items with the same shard
+   * @param maxShardIndex The range of ShardIndex in data parameter. If data contain ShardIndex 1,2,6, any maxShardIndex >=6 is valid.
+   * @param preservesPartitioning whether to preserve partitioning in the resulting collection.
+   * @param <R> Type of the result
+   * @return Result of applying the function to each group
+   */
+  public abstract <R> HoodieData<R> processValuesOfTheSameShards(
+      HoodiePairData<Integer, String> data, SerializableFunction<Iterator<String>, Iterator<R>> func, Integer maxShardIndex, boolean preservesPartitioning);

Review Comment:
   Here, we need to use partitions, not shards, since this class has no knowledge of the MDT etc.
   
   >Groups values by key and applies a function to each group of values.
   
   Let's name this `mapGroupsByKey()` or something similar. Please look at Spark's RDD API for inspiration.
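A minimal, engine-agnostic sketch of the suggested contract. It speaks only of keys and groups, with no shards or MDT. `MapGroupsByKeySketch` is hypothetical and runs in memory on one thread. The PR's javadoc allows one key to be split across several iterators; this sketch instead hands each key's values to `func` as exactly one iterator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, in-memory illustration of a generic "map groups by key" primitive.
// An engine-backed implementation would distribute the groups; this one runs on one thread.
public class MapGroupsByKeySketch {

  // Groups values by key (preserving first-seen key order), applies func once per group,
  // and concatenates the per-group outputs.
  static <K, V, R> List<R> mapGroupsByKey(List<Map.Entry<K, V>> data,
                                          Function<Iterator<V>, Iterator<R>> func) {
    Map<K, List<V>> groups = new LinkedHashMap<>();
    for (Map.Entry<K, V> entry : data) {
      groups.computeIfAbsent(entry.getKey(), key -> new ArrayList<>()).add(entry.getValue());
    }
    List<R> results = new ArrayList<>();
    for (List<V> values : groups.values()) {
      Iterator<R> groupResults = func.apply(values.iterator());
      while (groupResults.hasNext()) {
        results.add(groupResults.next());
      }
    }
    return results;
  }
}
```

For naming, Spark's closest analogues are `KeyValueGroupedDataset.flatMapGroups` and, at the RDD level, `mapPartitions`. The RDD method is also where the `preservesPartitioning` flag in the PR's signature comes from.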
   
   



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -183,8 +225,23 @@ public Builder withIndexOptions(Map<String, String> indexOptions) {
       return this;
     }
 
+    public Builder withVersion(HoodieIndexVersion version) {
+      // Make sure the version enum matching the metadata partition is used.
+      this.version = version;
+      return this;
+    }
+
     public HoodieIndexDefinition build() {
-      return new HoodieIndexDefinition(indexName, indexType, indexFunction, sourceFields, indexOptions);
+      ValidationUtils.checkArgument(indexName != null, "Could not build index definition with a null index name");
+      ValidationUtils.checkArgument(indexType != null, "Could not build index definition with a null index type");
+      return new HoodieIndexDefinition(
+          indexName,

Review Comment:
   How do we ensure the code does not skip writing versions? If it's covered in another place, I am good.
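One way to enforce this is to make `build()` reject a missing version, next to the existing name/type checks. The sketch below is a trimmed-down, hypothetical builder (`IndexDefinitionBuilderSketch`), not `HoodieIndexDefinition` itself, and the extra null-version check is an assumption about how this could be enforced.

```java
// Hypothetical, trimmed-down builder; field names follow the diff, but this is not HoodieIndexDefinition.
public class IndexDefinitionBuilderSketch {

  enum IndexVersion { V1, V2 }

  static final class IndexDefinition {
    final String indexName;
    final String indexType;
    final IndexVersion version;

    private IndexDefinition(String indexName, String indexType, IndexVersion version) {
      this.indexName = indexName;
      this.indexType = indexType;
      this.version = version;
    }
  }

  static final class Builder {
    private String indexName;
    private String indexType;
    private IndexVersion version;

    Builder withIndexName(String indexName) {
      this.indexName = indexName;
      return this;
    }

    Builder withIndexType(String indexType) {
      this.indexType = indexType;
      return this;
    }

    Builder withVersion(IndexVersion version) {
      this.version = version;
      return this;
    }

    IndexDefinition build() {
      checkArgument(indexName != null, "Could not build index definition with a null index name");
      checkArgument(indexType != null, "Could not build index definition with a null index type");
      // The extra check: a call site that forgets withVersion(...) fails here instead of
      // silently producing a definition without a version.
      checkArgument(version != null, "Could not build index definition with a null version");
      return new IndexDefinition(indexName, indexType, version);
    }
  }

  private static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }
}
```

Failing fast surfaces missed call sites in tests. The trade-off applies if deserialized definitions pass through `build()`: definitions read from older metadata would then need their version filled in first, as `populateIndexVersionIfMissing` does in this PR.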



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
