vinothchandar commented on code in PR #13414:
URL: https://github.com/apache/hudi/pull/13414#discussion_r2163836677


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -146,7 +146,9 @@ public class HoodieWriteConfig extends HoodieConfig {
       .defaultValue(HoodieTableVersion.current().versionCode())
       .withValidValues(
           String.valueOf(HoodieTableVersion.SIX.versionCode()),
-          String.valueOf(HoodieTableVersion.current().versionCode())
+          String.valueOf(HoodieTableVersion.current().versionCode()),

Review Comment:
   I think it's better we explicitly list the versions here: 6, 8, 9? That will make this simpler going forward as well.
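   For illustration, a minimal sketch of what this suggestion asks for: the supported versions enumerated explicitly rather than derived from `current()`. The `ValidVersions` class and the literal codes are stand-ins, not the actual `HoodieWriteConfig` API.

```java
import java.util.Arrays;
import java.util.List;

// Stand-in for the suggested config change: enumerate the valid table
// versions (6, 8, 9) explicitly so a reader need not resolve current().
class ValidVersions {
  static final List<String> VALID_TABLE_VERSIONS = Arrays.asList("6", "8", "9");

  static boolean isValid(String versionCode) {
    return VALID_TABLE_VERSIONS.contains(versionCode);
  }
}
```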



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class EightToNineUpgradeHandler implements UpgradeHandler {
+
+  @Override
+  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, 
HoodieEngineContext context,
+                                             String instantTime, 
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    SecondaryIndexUpgradeDowngradeHelper.dropSecondaryIndexPartitions(config, 
context, table, "upgrading");

Review Comment:
   Change to "upgrading to table version 9"?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1515,7 +1514,7 @@ protected void tryUpgrade(HoodieTableMetaClient 
metaClient, Option<String> insta
       }
 
       new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper)
-          .run(HoodieTableVersion.current(), instantTime.orElse(null));
+          .run(config.getWriteVersion(), instantTime.orElse(null));
 
       metaClient.reloadTableConfig();
       metaClient.reloadActiveTimeline();

Review Comment:
   This makes sense.. wondering why this was hard-coded to `current()` before. Does this break anything or change any behavior?



##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java:
##########
@@ -263,6 +263,17 @@ public static <K, V> HoodieListPairData<K, V> lazy(Map<K, 
List<V>> data) {
     return new HoodieListPairData<>(explode(data), true);
   }
 
+  public static <K, V> HoodieListPairData<K, V> eagerMapKV(Map<K, V> data) {

Review Comment:
   This is turning each entry into a list.. Can we do this massaging outside? I'd prefer to have one `eager` method that takes a map, inside the xxxxData abstraction.
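   A minimal sketch of the suggested shape, assuming a hypothetical single `eager(Map)` overload; plain `java.util` types stand in for the Hudi data abstractions here.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical stand-in: one eager(Map) entry point that does the
// Map<K, V> -> Map<K, List<V>> massaging inside the abstraction,
// so callers never wrap values in singleton lists themselves.
class PairDataSketch {
  static <K, V> Map<K, List<V>> eager(Map<K, V> data) {
    return data.entrySet().stream()
        .collect(Collectors.toMap(
            Map.Entry::getKey,
            e -> Collections.singletonList(e.getValue())));
  }
}
```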



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SecondaryIndexUpgradeDowngradeHelper.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;
+
+/**
+ * Helper class to handle secondary index operations during upgrade/downgrade.
+ */
+public class SecondaryIndexUpgradeDowngradeHelper {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SecondaryIndexUpgradeDowngradeHelper.class);
+
+  /**
+   * Drops secondary index partitions from metadata table.
+   *
+   * @param config Write config
+   * @param context Engine context
+   * @param table Hoodie table
+   * @param operationType Type of operation (upgrade/downgrade)
+   */
+  public static void dropSecondaryIndexPartitions(HoodieWriteConfig config, 
HoodieEngineContext context,

Review Comment:
   This seems like a generic index management method.. Can we house it with the other util methods around secondary index / expression index management?
   
   Otherwise another author may duplicate this.



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -67,15 +70,25 @@ public class HoodieIndexDefinition implements Serializable {
   // Any other configuration or properties specific to the index
   private Map<String, String> indexOptions;
 
+  // Version of the index
+  private HoodieIndexVersion version;
+
   public HoodieIndexDefinition() {

Review Comment:
   What is this used for? If we have a builder, can we clean this up?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -206,7 +210,8 @@ public Map<Pair<String, String>, BloomFilter> 
getBloomFilters(final List<Pair<St
 
     List<String> partitionIDFileIDStringsList = new 
ArrayList<>(partitionIDFileIDStrings);
     Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
-        getRecordsByKeys(partitionIDFileIDStringsList, metadataPartitionName);
+        getRecordsByKeys(HoodieListData.eager(partitionIDFileIDStringsList), 
metadataPartitionName)

Review Comment:
   Why does it matter that this is eager?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -162,7 +166,8 @@ private void initIfNeeded() {
 
   @Override
   protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String 
key, String partitionName) {
-    Map<String, HoodieRecord<HoodieMetadataPayload>> recordsByKeys = 
getRecordsByKeys(Collections.singletonList(key), partitionName);
+    Map<String, HoodieRecord<HoodieMetadataPayload>> recordsByKeys = 
getRecordsByKeys(
+        HoodieListData.eager(Collections.singletonList(key)), 
partitionName).collectAsMapWithOverwriteStrategy();

Review Comment:
   Same question: why eager?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -287,21 +290,44 @@ public Option<HoodieIndexMetadata> getIndexMetadata() {
     if (indexMetadataOpt.isPresent() && 
!indexMetadataOpt.get().getIndexDefinitions().isEmpty()) {
       return indexMetadataOpt;
     }
+    Option<HoodieIndexMetadata> indexDefOption = Option.empty();
     if (tableConfig.getRelativeIndexDefinitionPath().isPresent() && 
StringUtils.nonEmpty(tableConfig.getRelativeIndexDefinitionPath().get())) {
-      StoragePath indexDefinitionPath =
-          new StoragePath(basePath, 
tableConfig.getRelativeIndexDefinitionPath().get());
-      try {
-        Option<byte[]> bytesOpt = FileIOUtils.readDataFromPath(storage, 
indexDefinitionPath, true);
-        if (bytesOpt.isPresent()) {
-          return Option.of(HoodieIndexMetadata.fromJson(new 
String(bytesOpt.get())));
-        } else {
-          return Option.of(new HoodieIndexMetadata());
-        }
-      } catch (IOException e) {
-        throw new HoodieIOException("Could not load index definition at path: 
" + tableConfig.getRelativeIndexDefinitionPath().get(), e);
+      indexDefOption = loadIndexDefFromDisk(basePath, 
tableConfig.getRelativeIndexDefinitionPath().get(), storage);
+    }
+    populateIndexVersionIfMissing(tableConfig.getTableVersion(), 
indexDefOption);
+    return indexDefOption;
+  }
+
+  public static void populateIndexVersionIfMissing(HoodieTableVersion 
tableVersion, Option<HoodieIndexMetadata> indexDefOption) {

Review Comment:
   Instead of populating like this, can we set a default value and let the Jackson JSON -> POJO mapping fill this out?
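   The idea here, sketched with a plain POJO (the field name and default are hypothetical, and this is not the real `HoodieIndexDefinition`): give the field an initializer so a mapper that never sees the attribute leaves the default in place, rather than patching the object after the fact.

```java
// Hypothetical POJO: "version" defaults to 1 at construction time, so a
// json -> pojo mapper (e.g. Jackson) that encounters no "version"
// attribute leaves the default intact - no post-hoc populate step needed.
class IndexDefSketch {
  private int version = 1;

  int getVersion() {
    return version;
  }

  void setVersion(int version) {
    this.version = version;
  }
}
```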



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -294,18 +303,20 @@ public Map<String, HoodieRecordGlobalLocation> 
readRecordIndex(List<String> reco
    * @param secondaryKeys The list of secondary keys to read
    */
   @Override
-  public Map<String, HoodieRecordGlobalLocation> 
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
+  public HoodiePairData<String, HoodieRecordGlobalLocation> 
readSecondaryIndex(HoodieData<String> secondaryKeys, String partitionName) {

Review Comment:
   Can you point me to the changes where we read v1 and v2 here?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class EightToNineUpgradeHandler implements UpgradeHandler {
+
+  @Override
+  public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, 
HoodieEngineContext context,
+                                             String instantTime, 
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    SecondaryIndexUpgradeDowngradeHelper.dropSecondaryIndexPartitions(config, 
context, table, "upgrading");

Review Comment:
   Do we need to recreate the index as well, or do we push that to the first write after upgrade?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -294,18 +303,20 @@ public Map<String, HoodieRecordGlobalLocation> 
readRecordIndex(List<String> reco
    * @param secondaryKeys The list of secondary keys to read
    */
   @Override
-  public Map<String, HoodieRecordGlobalLocation> 
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
+  public HoodiePairData<String, HoodieRecordGlobalLocation> 
readSecondaryIndex(HoodieData<String> secondaryKeys, String partitionName) {

Review Comment:
   Or do we assume we just read v2? Reason: the writer version can be 6, 8, or 9, and if it's 6, don't we write v1 of the SI? So shouldn't the code here read both?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -543,6 +545,10 @@ static HoodieIndexDefinition 
getSecondaryOrExpressionIndexDefinition(HoodieTable
     String fullIndexName = indexType.equals(PARTITION_NAME_SECONDARY_INDEX)
         ? PARTITION_NAME_SECONDARY_INDEX_PREFIX + userIndexName
         : PARTITION_NAME_EXPRESSION_INDEX_PREFIX + userIndexName;
+    HoodieTableVersion tableVersion = 
metaClient.getTableConfig().getTableVersion();
+    HoodieIndexVersion version = 
indexType.equals(PARTITION_NAME_SECONDARY_INDEX)

Review Comment:
   nit: indexVersion
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -543,6 +545,10 @@ static HoodieIndexDefinition 
getSecondaryOrExpressionIndexDefinition(HoodieTable
     String fullIndexName = indexType.equals(PARTITION_NAME_SECONDARY_INDEX)
         ? PARTITION_NAME_SECONDARY_INDEX_PREFIX + userIndexName
         : PARTITION_NAME_EXPRESSION_INDEX_PREFIX + userIndexName;
+    HoodieTableVersion tableVersion = 
metaClient.getTableConfig().getTableVersion();

Review Comment:
   Should we use the tableVersion in storage or the writer version?
   
   In general, do we need to tie tableVersion and indexVersion?
   
   - This is helpful for ensuring a certain table version writes a certain index version.
   - When reading, we should deduce this purely from what's in the index definition.
   
   Ignore this comment if this is only in the create-index path (which I think it is).
   
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1236,7 +1237,7 @@ protected Pair<List<HoodieFileGroupId>, 
HoodieData<HoodieRecord>> tagRecordsWith
       String mdtPartition = mdtRecord.getPartitionPath();
       List<FileSlice> latestFileSlices = 
partitionToLatestFileSlices.get(mdtPartition);
       FileSlice slice = 
latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(mdtRecord.getRecordKey(),
-          latestFileSlices.size()));
+          latestFileSlices.size(), mdtPartition, 
getExistingHoodieIndexVersionOrDefault(mdtPartition, metadataMetaClient)));

Review Comment:
   rename: existingIndexVersionOrDefault()



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -183,8 +226,25 @@ public Builder withIndexOptions(Map<String, String> 
indexOptions) {
       return this;
     }
 
+    public Builder withVersion(HoodieIndexVersion version) {
+      if (indexName == null) {
+        throw new IllegalStateException("Please set index name first before 
setting version");
+      }
+      // Make sure the version enum matching the metadata partition is used.
+      
version.ensureVersionCanBeAssignedToIndexType(MetadataPartitionType.fromPartitionPath(indexName));

Review Comment:
   Hmm, this predates this PR, but shouldn't we be able to directly use `indexType` to check the version?



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.ConfigProperty;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class TestEightToNineUpgradeHandler extends BaseUpgradeDowngradeHandlerTest {
+  private EightToNineUpgradeHandler upgradeHandler;
+
+  @BeforeEach
+  void setUp() {
+    upgradeHandler = new EightToNineUpgradeHandler();
+  }
+
+  @Test
+  void testUpgradeWithSecondaryIndexPartitions() {
+    // Setup test data

Review Comment:
   nit: go given-when-then style for comments? 
https://martinfowler.com/bliki/GivenWhenThen.html
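   As a sketch of the suggested comment style (the upgrade step below is a trivial stand-in, not the real handler API):

```java
// Given-when-then shaped test body, per the linked Fowler bliki entry.
class GivenWhenThenSketch {
  static int runUpgrade(int fromVersion) {
    return fromVersion + 1; // stand-in for the real upgrade handler
  }

  static boolean upgradeMovesEightToNine() {
    // given: a table at version 8
    int tableVersion = 8;
    // when: the upgrade handler runs
    int upgraded = runUpgrade(tableVersion);
    // then: the table is at version 9
    return upgraded == 9;
  }
}
```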



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -183,8 +226,25 @@ public Builder withIndexOptions(Map<String, String> 
indexOptions) {
       return this;
     }
 
+    public Builder withVersion(HoodieIndexVersion version) {
+      if (indexName == null) {
+        throw new IllegalStateException("Please set index name first before 
setting version");
+      }
+      // Make sure the version enum matching the metadata partition is used.
+      
version.ensureVersionCanBeAssignedToIndexType(MetadataPartitionType.fromPartitionPath(indexName));
+      this.version = version;
+      return this;
+    }
+
     public HoodieIndexDefinition build() {
-      return new HoodieIndexDefinition(indexName, indexType, indexFunction, 
sourceFields, indexOptions);
+      return new HoodieIndexDefinition(

Review Comment:
   Let's validate that `version` is set.
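   A minimal sketch of the requested fail-fast check, assuming a simplified builder (names illustrative):

```java
// Simplified builder: build() refuses to produce an instance when
// withVersion(...) was never called, surfacing the bug at build time.
class IndexDefBuilderSketch {
  private Integer version; // null until withVersion() is called

  IndexDefBuilderSketch withVersion(int version) {
    this.version = version;
    return this;
  }

  int build() {
    if (version == null) {
      throw new IllegalStateException("version must be set before build()");
    }
    return version;
  }
}
```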



##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java:
##########
@@ -129,4 +130,6 @@ public abstract <I, K, V> List<V> reduceByKey(
   public abstract <I, O> O aggregate(HoodieData<I> data, O zeroValue, 
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp);
 
   public abstract <T> ReaderContextFactory<T> 
getReaderContextFactory(HoodieTableMetaClient metaClient);
+
+  public abstract void dropIndex(HoodieConfig config, List<String> 
metadataPartitions);

Review Comment:
   The engine context cannot be aware of things like tables and indexes; please relocate.



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieIndexingConfig.java:
##########
@@ -286,4 +286,5 @@ public boolean isIndexUsingColumnStats() {
   public boolean isIndexUsingRecordIndex() {
     return 
getIndexType().equalsIgnoreCase(MetadataPartitionType.RECORD_INDEX.name());
   }
+

Review Comment:
   revert



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class NineToEightDowngradeHandler implements DowngradeHandler {
+
+  @Override
+  public Pair<Map<ConfigProperty, String>, List<ConfigProperty>> 
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String 
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+    HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    SecondaryIndexUpgradeDowngradeHelper.dropSecondaryIndexPartitions(config, 
context, table, "downgrading");

Review Comment:
   A more verbose message here as well?



##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java:
##########
@@ -153,6 +154,23 @@ <L, W> HoodiePairData<L, W> mapToPair(
    */
   List<Pair<K, V>> collectAsList();
 
+  /**
+   * Collects results of the underlying collection into a {@link Map<Pair<K, 
V>>}
+   * If there are multiple pairs sharing the same key, the resulting map 
randomly picks one among them.

Review Comment:
   Is it random, or based on map ordering with whatever comes last winning? Fix the docs to be exact.
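   For reference, a sketch of the semantics in question: collecting into a `java.util.HashMap` with plain `put()` is deterministic last-write-wins over input order, not random (plain types stand in for the `HoodiePairData` collection here).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// put() on a HashMap overwrites the previous value for a duplicate key,
// so the LAST pair in input order wins - deterministic, not random.
class OverwriteSketch {
  static Map<String, Integer> collectLastWins(List<Map.Entry<String, Integer>> pairs) {
    Map<String, Integer> out = new HashMap<>();
    for (Map.Entry<String, Integer> p : pairs) {
      out.put(p.getKey(), p.getValue());
    }
    return out;
  }
}
```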



##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java:
##########
@@ -185,6 +186,12 @@ public <I, O> O aggregate(HoodieData<I> data, O zeroValue, 
Functions.Function2<O
     return data.collectAsList().stream().reduce(zeroValue, seqOp::apply, 
combOp::apply);
   }
 
+  @Override
+  public void dropIndex(HoodieConfig config, List<String> metadataPartitions) {

Review Comment:
   Same: the engine context should be the equivalent of, e.g., SparkContext, so it cannot know about indexes or partitions.



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -145,20 +165,43 @@ public static Builder newBuilder() {
     return new Builder();
   }
 
+  /**
+   * Create a new Builder pre-populated with values from this instance.
+   */
+  public Builder toBuilder() {
+    Builder builder = new Builder();
+    builder.withIndexName(this.indexName)
+        .withIndexType(this.indexType)
+        .withIndexFunction(this.indexFunction)
+        .withSourceFields(new ArrayList<>(this.sourceFields))
+        .withIndexOptions(new HashMap<>(this.indexOptions));
+    if (this.version != null) {
+      builder.withVersion(this.version);
+    }
+    return builder;
+  }
+
   public static class Builder {
 
+    // e.g. create index <user index name> on myTable using column_stats(ts) 
options(expr='from_unixtime', format='yyyy-MM-dd')

Review Comment:
   Do we need this? It's a good comment, but it feels like it should be on the actual field in IndexDefinition, not the builder.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -287,21 +290,44 @@ public Option<HoodieIndexMetadata> getIndexMetadata() {
     if (indexMetadataOpt.isPresent() && 
!indexMetadataOpt.get().getIndexDefinitions().isEmpty()) {
       return indexMetadataOpt;
     }
+    Option<HoodieIndexMetadata> indexDefOption = Option.empty();
     if (tableConfig.getRelativeIndexDefinitionPath().isPresent() && 
StringUtils.nonEmpty(tableConfig.getRelativeIndexDefinitionPath().get())) {
-      StoragePath indexDefinitionPath =
-          new StoragePath(basePath, 
tableConfig.getRelativeIndexDefinitionPath().get());
-      try {
-        Option<byte[]> bytesOpt = FileIOUtils.readDataFromPath(storage, 
indexDefinitionPath, true);
-        if (bytesOpt.isPresent()) {
-          return Option.of(HoodieIndexMetadata.fromJson(new 
String(bytesOpt.get())));
-        } else {
-          return Option.of(new HoodieIndexMetadata());
-        }
-      } catch (IOException e) {
-        throw new HoodieIOException("Could not load index definition at path: 
" + tableConfig.getRelativeIndexDefinitionPath().get(), e);
+      indexDefOption = loadIndexDefFromDisk(basePath, 
tableConfig.getRelativeIndexDefinitionPath().get(), storage);
+    }
+    populateIndexVersionIfMissing(tableConfig.getTableVersion(), 
indexDefOption);
+    return indexDefOption;
+  }
+
+  public static void populateIndexVersionIfMissing(HoodieTableVersion 
tableVersion, Option<HoodieIndexMetadata> indexDefOption) {
+    indexDefOption.ifPresent(idxDefs ->
+        idxDefs.getIndexDefinitions().replaceAll((indexName, idxDef) -> {
+          
ValidationUtils.checkArgument(HoodieIndexVersion.isValidIndexDefinition(tableVersion,
 idxDef),
+              String.format("Table version %s, index definition %s", 
tableVersion, idxDef));
+          if (idxDef.getVersion() == null) {
+            // If version field is missing, it implies either of the cases 
(validated by isValidIndexDefinition):
+            // - It is table version 8, because we don't write version 
attributes in some hudi releases
+            // - It is table version 9, and it is not secondary index. Since 
we drop SI on upgrade and we always write version attributes.
+            return 
idxDef.toBuilder().withVersion(getCurrentVersion(tableVersion, 
idxDef.getIndexName())).build();
+          } else {
+            return idxDef;
+          }
+        }));
+  }
+
+  public static Option<HoodieIndexMetadata> loadIndexDefFromDisk(

Review Comment:
   rename: loadIndexDefsFromStorage



##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -262,7 +267,9 @@ public Map<Pair<String, String>, 
List<HoodieMetadataColumnStats>> getColumnStats
    * @param recordKeys The list of record keys to read
    */
   @Override
-  public Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String> 
recordKeys) {
+  public HoodiePairData<String, HoodieRecordGlobalLocation> 
readRecordIndex(HoodieData<String> recordKeys) {
+    ValidationUtils.checkState(recordKeys instanceof HoodieListData, 
"readRecordIndex only support HoodieListData at the moment");

Review Comment:
   Why? Can we make this layer generally work with `HoodieData`? I'd like to avoid this special casing.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -876,7 +902,7 @@ public static class Builder {
     private ConsistencyGuardConfig consistencyGuardConfig = 
ConsistencyGuardConfig.newBuilder().build();
     private FileSystemRetryConfig fileSystemRetryConfig = 
FileSystemRetryConfig.newBuilder().build();
     private HoodieMetaserverConfig metaserverConfig = 
HoodieMetaserverConfig.newBuilder().build();
-    private Option<TimelineLayoutVersion> layoutVersion = Option.empty();
+    private Option<TimelineLayoutVersion> layoutVersion = 
org.apache.hudi.common.util.Option.empty();

Review Comment:
   ?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -457,9 +457,17 @@ public static List<MetadataPartitionType> 
getEnabledPartitions(HoodieMetadataCon
         .collect(Collectors.toList());
   }
 
+  private static boolean isPartitionType(String partitionPath, String 
typePartition) {

Review Comment:
   typePartition -> partitionType?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -202,11 +207,12 @@ public List<String> 
getPartitionPathWithPathPrefixes(List<String> relativePathPr
   }
 
   @Override
-  public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(List<String> keyPrefixes,
+  public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
                                                                                
  String partitionName,
                                                                                
  boolean shouldLoadInMemory) {
+    ValidationUtils.checkState(keyPrefixes instanceof HoodieListData, 
"getRecordsByKeyPrefixes only support HoodieListData at the moment");

Review Comment:
   Same: let's remove the special casing.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -457,9 +457,17 @@ public static List<MetadataPartitionType> 
getEnabledPartitions(HoodieMetadataCon
         .collect(Collectors.toList());
   }
 
+  private static boolean isPartitionType(String partitionPath, String 
typePartition) {
+    return partitionPath.equals(typePartition) || 
partitionPath.startsWith(typePartition);
+  }
+
+  public boolean isPartitionType(String partitionPath) {
+    return isPartitionType(partitionPath, getPartitionPath());
+  }
+
   public static MetadataPartitionType fromPartitionPath(String partitionPath) {
     for (MetadataPartitionType partitionType : getValidValues()) {
-      if (partitionPath.equals(partitionType.getPartitionPath()) || 
partitionPath.startsWith(partitionType.getPartitionPath())) {
+      if (partitionType.isPartitionType(partitionPath)) {

Review Comment:
   Rename: isPartitionType -> matches, so it'll read `partitionType.matches(partitionPath)`.
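   A sketch of how the rename reads at the call site; the enum below is a simplified stand-in for `MetadataPartitionType`, and the prefix value is illustrative.

```java
// Simplified stand-in enum: matches(path) is true for the exact partition
// path or any path using it as a prefix, mirroring the logic in the diff.
enum PartitionTypeSketch {
  SECONDARY_INDEX("secondary_index_");

  private final String partitionPath;

  PartitionTypeSketch(String partitionPath) {
    this.partitionPath = partitionPath;
  }

  boolean matches(String path) {
    return path.equals(partitionPath) || path.startsWith(partitionPath);
  }
}
```

The call site then reads `partitionType.matches(partitionPath)`, as suggested.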



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieIndexVersion.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Arrays;
+import java.util.List;
+
+public enum HoodieIndexVersion {
+  ALL_PARTITIONS_ONE(MetadataPartitionType.ALL_PARTITIONS, 1, 
Arrays.asList("0.14.0")),

Review Comment:
   Can we break this up into one enum per index type?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java:
##########
@@ -317,7 +319,10 @@ public int getPartition(Object key) {
 
       // NOTE: It's crucial that [[targetPartitions]] be congruent w/ the 
number of
       //       actual file-groups in the Bloom Index in MT
-      return mapRecordKeyToFileGroupIndex(bloomIndexEncodedKey, 
targetPartitions);
+      String bloomPartitionPath = BLOOM_FILTERS.getPartitionPath();
+      return mapRecordKeyToFileGroupIndex(bloomIndexEncodedKey, 
targetPartitions, bloomPartitionPath,
+          // TODO[HUDI-9530] The version should come from the index def json 
file instead of a hard code value.

Review Comment:
   are we fixing this in this PR?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -287,21 +290,44 @@ public Option<HoodieIndexMetadata> getIndexMetadata() {
     if (indexMetadataOpt.isPresent() && !indexMetadataOpt.get().getIndexDefinitions().isEmpty()) {
       return indexMetadataOpt;
     }
+    Option<HoodieIndexMetadata> indexDefOption = Option.empty();
     if (tableConfig.getRelativeIndexDefinitionPath().isPresent() && StringUtils.nonEmpty(tableConfig.getRelativeIndexDefinitionPath().get())) {
-      StoragePath indexDefinitionPath =
-          new StoragePath(basePath, tableConfig.getRelativeIndexDefinitionPath().get());
-      try {
-        Option<byte[]> bytesOpt = FileIOUtils.readDataFromPath(storage, indexDefinitionPath, true);
-        if (bytesOpt.isPresent()) {
-          return Option.of(HoodieIndexMetadata.fromJson(new String(bytesOpt.get())));
-        } else {
-          return Option.of(new HoodieIndexMetadata());
-        }
-      } catch (IOException e) {
-        throw new HoodieIOException("Could not load index definition at path: " + tableConfig.getRelativeIndexDefinitionPath().get(), e);
+      indexDefOption = loadIndexDefFromDisk(basePath, tableConfig.getRelativeIndexDefinitionPath().get(), storage);
+    }
+    populateIndexVersionIfMissing(tableConfig.getTableVersion(), indexDefOption);
+    return indexDefOption;
+  }
+
+  public static void populateIndexVersionIfMissing(HoodieTableVersion tableVersion, Option<HoodieIndexMetadata> indexDefOption) {

Review Comment:
   In other words, assume a default of 1, and throw an error in the writer code if the version is not set when writing the index def? i.e. the version has to be there.
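   That contract (read-side default of 1, write-side hard failure) could be sketched like this; the class and method names are hypothetical, not the PR's actual code:
   
   ```java
   // Hypothetical sketch of the contract discussed above: default a missing
   // index version to 1 when reading older tables, but refuse to persist an
   // index definition whose version was never set. Names are illustrative.
   class IndexVersionContract {
     static int resolveVersionForRead(Integer storedVersion) {
       // Older tables may have persisted index defs without a version field.
       return storedVersion == null ? 1 : storedVersion;
     }
   
     static void validateBeforeWrite(Integer version) {
       // Writers must always set the version explicitly before persisting.
       if (version == null) {
         throw new IllegalStateException("Index definition version must be set before writing");
       }
     }
   }
   ```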



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java:
##########
@@ -134,6 +140,8 @@ public void createOrUpdateColumnStatsIndexDefinition(HoodieTableMetaClient metaC
         .withIndexType(PARTITION_NAME_COLUMN_STATS)
         .withIndexFunction(PARTITION_NAME_COLUMN_STATS)
         .withSourceFields(columnsToIndex)
+        // Use the existing version if exists, otherwise fall back to the default version.
+        .withVersion(getExistingHoodieIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS, metaClient))

Review Comment:
   I don't think we want to hand control of which version is written to the user, right? The versions should be picked internally by the index def building code when writing?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -457,9 +457,17 @@ public static List<MetadataPartitionType> getEnabledPartitions(HoodieMetadataCon
         .collect(Collectors.toList());
   }
 
+  private static boolean isPartitionType(String partitionPath, String typePartition) {
+    return partitionPath.equals(typePartition) || partitionPath.startsWith(typePartition);
+  }
+
+  public boolean isPartitionType(String partitionPath) {
+    return isPartitionType(partitionPath, getPartitionPath());

Review Comment:
   this is kind of confusing to read ;/



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -93,9 +95,11 @@ public <R> HoodieData<HoodieRecord<R>> tagLocation(HoodieData<HoodieRecord<R>> r
     }
 
     // Partition the record keys to lookup such that each partition looks up one record index shard
+    String partitionPath = MetadataPartitionType.RECORD_INDEX.getPartitionPath();
     JavaRDD<String> partitionedKeyRDD = HoodieJavaRDD.getJavaRDD(records)
         .map(HoodieRecord::getRecordKey)
-        .keyBy(k -> HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(k, numFileGroups))
+        .keyBy(k -> HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(k, numFileGroups, partitionPath,
+            getExistingHoodieIndexVersionOrDefault(partitionPath, hoodieTable.getMetaClient())))

Review Comment:
   callout: we should ensure none of these calls from executors read the index defs json. it should be read once on the driver.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1360,15 +1363,32 @@ private static List<Tuple3<String, String, Boolean>> fetchPartitionFileInfoTripl
   }
 
   /**
-   * Map a record key to a file group in partition of interest.
+   * Maps a record key to a file group index in the specified partition.
    * <p>
-   * Note: For hashing, the algorithm is same as String.hashCode() but is being defined here as hashCode()
-   * implementation is not guaranteed by the JVM to be consistent across JVM versions and implementations.
+   * For secondary index partitions (version >= 2), if the record key contains the secondary index separator,
+   * the secondary key portion is used for hashing. Otherwise, the full record key is used.
+   * <p>
+   * Note: The hashing algorithm is same as String.hashCode() but is defined here explicitly since
+   * hashCode() implementation is not guaranteed by the JVM to be consistent across versions.
    *
-   * @param recordKey record key for which the file group index is looked up for.
-   * @return An integer hash of the given string
+   * @param recordKey record key for which the file group index is looked up
+   * @param numFileGroups number of file groups to map the key to
+   * @param partitionName name of the partition
+   * @param version index version to determine hashing behavior
+   * @return file group index for the given record key
    */
-  public static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileGroups) {
+  public static int mapRecordKeyToFileGroupIndex(

Review Comment:
   let's ensure UTs for this
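   A unit test for the hashing contract can pin the documented algorithm (same as `String.hashCode()`, i.e. `h = 31 * h + c`) against the JDK's result. The sketch below re-derives it and maps it onto file groups with a non-negative `floorMod`; the modulo handling is an assumption for illustration, not necessarily Hudi's exact implementation:
   
   ```java
   // Sketch of the documented hashing contract: same algorithm as
   // String.hashCode(), spelled out explicitly so it stays stable across JVMs.
   // The floorMod mapping onto file groups is an assumption for illustration.
   class StableKeyHashing {
     static int stableHash(String s) {
       int h = 0;
       for (int i = 0; i < s.length(); i++) {
         h = 31 * h + s.charAt(i);
       }
       return h;
     }
   
     static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileGroups) {
       // floorMod keeps the result in [0, numFileGroups) even for negative hashes.
       return Math.floorMod(stableHash(recordKey), numFileGroups);
     }
   }
   ```
   
   A UT then only needs to assert `stableHash(k) == k.hashCode()` for a few keys and that the mapped index stays within bounds.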



##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java:
##########
@@ -263,6 +263,17 @@ public static <K, V> HoodieListPairData<K, V> lazy(Map<K, List<V>> data) {
     return new HoodieListPairData<>(explode(data), true);
   }
 
+  public static <K, V> HoodieListPairData<K, V> eagerMapKV(Map<K, V> data) {

Review Comment:
   it's 4 places, and in one of them V is a set?
   
   alternatively, rename:
   
   eagerMapKV -> eager
   eager -> eagerValueList



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -848,13 +848,7 @@ class HoodieSparkSqlWriterInternal {
         TableInstantInfo(basePath, instantTime, executor.getCommitActionType, executor.getWriteOperationType), Option.empty)
       (writeSuccessful, HOption.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
     } finally {
-      // close the write client in all cases
-      val asyncCompactionEnabled = isAsyncCompactionEnabled(writeClient, tableConfig, parameters, jsc.hadoopConfiguration())
-      val asyncClusteringEnabled = isAsyncClusteringEnabled(writeClient, parameters)
-      if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
-        log.info("Closing write client")
-        writeClient.close()
-      }
+      handleWriteClientClosure(writeClient, tableConfig, parameters, jsc.hadoopConfiguration())

Review Comment:
   rename: closeWriteClient



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -145,20 +165,43 @@ public static Builder newBuilder() {
     return new Builder();
   }
 
+  /**
+   * Create a new Builder pre-populated with values from this instance.
+   */
+  public Builder toBuilder() {
+    Builder builder = new Builder();
+    builder.withIndexName(this.indexName)
+        .withIndexType(this.indexType)
+        .withIndexFunction(this.indexFunction)
+        .withSourceFields(new ArrayList<>(this.sourceFields))
+        .withIndexOptions(new HashMap<>(this.indexOptions));
+    if (this.version != null) {

Review Comment:
   why would this be `null`? avoid using null as a sentinel?



##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java:
##########
@@ -153,6 +154,23 @@ <L, W> HoodiePairData<L, W> mapToPair(
    */
   List<Pair<K, V>> collectAsList();
 
+  /**
+   * Collects the pairs of the underlying collection into a {@link Map} from K to V.
+   * If there are multiple pairs sharing the same key, the resulting map picks one among them arbitrarily.

Review Comment:
   is this something this PR is introducing? if so, I'd like such specific handling at the Data abstraction.
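   For a local, list-backed implementation, the "picks one among them" behavior typically falls out of a last-write-wins merge function. A sketch under that assumption (not the PR's actual code; the class and helper names are hypothetical):
   
   ```java
   import java.util.AbstractMap;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;
   
   // Sketch of the duplicate-key semantics discussed above: a last-write-wins
   // merge function. On an unordered distributed collection the surviving value
   // is effectively arbitrary; this local, ordered version is deterministic.
   class CollectAsMapSketch {
     static <K, V> Map<K, V> collectAsMap(List<Map.Entry<K, V>> pairs) {
       return pairs.stream()
           .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
               (first, second) -> second)); // duplicate key: keep the later value
     }
   
     static <K, V> Map.Entry<K, V> pair(K k, V v) {
       return new AbstractMap.SimpleEntry<>(k, v);
     }
   }
   ```
   
   This is why pinning the semantics at the Data abstraction matters: callers should not depend on which duplicate survives.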



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

