Davis-Zhang-Onehouse commented on code in PR #13414:
URL: https://github.com/apache/hudi/pull/13414#discussion_r2167686364
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -146,7 +146,9 @@ public class HoodieWriteConfig extends HoodieConfig {
.defaultValue(HoodieTableVersion.current().versionCode())
.withValidValues(
String.valueOf(HoodieTableVersion.SIX.versionCode()),
- String.valueOf(HoodieTableVersion.current().versionCode())
+ String.valueOf(HoodieTableVersion.current().versionCode()),
Review Comment:
done
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1236,7 +1237,7 @@ protected Pair<List<HoodieFileGroupId>,
HoodieData<HoodieRecord>> tagRecordsWith
String mdtPartition = mdtRecord.getPartitionPath();
List<FileSlice> latestFileSlices =
partitionToLatestFileSlices.get(mdtPartition);
FileSlice slice =
latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(mdtRecord.getRecordKey(),
- latestFileSlices.size()));
+ latestFileSlices.size(), mdtPartition,
getExistingHoodieIndexVersionOrDefault(mdtPartition, metadataMetaClient)));
Review Comment:
done
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToNineUpgradeHandler.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class EightToNineUpgradeHandler implements UpgradeHandler {
+
+ @Override
+ public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config,
HoodieEngineContext context,
+ String instantTime,
SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+ HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+ SecondaryIndexUpgradeDowngradeHelper.dropSecondaryIndexPartitions(config,
context, table, "upgrading");
Review Comment:
The user needs to explicitly recreate the index; we will make sure this is
captured in the doc update.
If we want the first write after an upgrade to recreate it automatically,
let's plan the priority and delivery time. I did not include this in the MVP.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1515,7 +1514,7 @@ protected void tryUpgrade(HoodieTableMetaClient
metaClient, Option<String> insta
}
new UpgradeDowngrade(metaClient, config, context, upgradeDowngradeHelper)
- .run(HoodieTableVersion.current(), instantTime.orElse(null));
+ .run(config.getWriteVersion(), instantTime.orElse(null));
metaClient.reloadTableConfig();
metaClient.reloadActiveTimeline();
Review Comment:
No tests failed, so it should be good.
##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java:
##########
@@ -153,6 +154,23 @@ <L, W> HoodiePairData<L, W> mapToPair(
*/
List<Pair<K, V>> collectAsList();
+ /**
+ * Collects results of the underlying collection into a {@link Map<Pair<K,
V>>}
+ * If there are multiple pairs sharing the same key, the resulting map
randomly picks one among them.
Review Comment:
> what comes last? fix docs to be exact
It is whatever comes last, but there is no control over the order, so the
user cannot know what "last" will be. I don't want callers to make a false
assumption that the code does not guarantee.
Guaranteeing it would require the underlying data structure to be ordered and
the caller to know that order, which reveals implementation details and
defeats the purpose of the abstraction.
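To illustrate the concern, here is a minimal sketch of "last one wins"
collection. `CollectAsMapSketch` and `collectWithOverwrite` are hypothetical
names used only for this example, not the API added in the PR; the point is
that the surviving value depends entirely on the order in which the pairs
happen to arrive.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for collecting key/value pairs into a map.
final class CollectAsMapSketch {

  // Later pairs overwrite earlier ones, so the winner is decided by arrival order.
  static <K, V> Map<K, V> collectWithOverwrite(List<Map.Entry<K, V>> pairs) {
    Map<K, V> result = new HashMap<>();
    for (Map.Entry<K, V> pair : pairs) {
      result.put(pair.getKey(), pair.getValue());
    }
    return result;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
    pairs.add(new SimpleEntry<>("k1", 1));
    pairs.add(new SimpleEntry<>("k1", 2));
    pairs.add(new SimpleEntry<>("k2", 3));
    // With this arrival order "k1" keeps 2; reversing the list would keep 1 instead.
    System.out.println(collectWithOverwrite(pairs));
  }
}
```

In a distributed collection the arrival order is not something the caller
controls, which is why the doc avoids promising a specific winner.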
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/NineToEightDowngradeHandler.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class NineToEightDowngradeHandler implements DowngradeHandler {
+
+ @Override
+ public Pair<Map<ConfigProperty, String>, List<ConfigProperty>>
downgrade(HoodieWriteConfig config, HoodieEngineContext context, String
instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+ HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+ SecondaryIndexUpgradeDowngradeHelper.dropSecondaryIndexPartitions(config,
context, table, "downgrading");
Review Comment:
done
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -543,6 +545,10 @@ static HoodieIndexDefinition
getSecondaryOrExpressionIndexDefinition(HoodieTable
String fullIndexName = indexType.equals(PARTITION_NAME_SECONDARY_INDEX)
? PARTITION_NAME_SECONDARY_INDEX_PREFIX + userIndexName
: PARTITION_NAME_EXPRESSION_INDEX_PREFIX + userIndexName;
+ HoodieTableVersion tableVersion =
metaClient.getTableConfig().getTableVersion();
+ HoodieIndexVersion version =
indexType.equals(PARTITION_NAME_SECONDARY_INDEX)
Review Comment:
done
##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/upgrade/TestEightToNineUpgradeHandler.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.ConfigProperty;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class TestEightToNineUpgradeHandler extends BaseUpgradeDowngradeHandlerTest {
+ private EightToNineUpgradeHandler upgradeHandler;
+
+ @BeforeEach
+ void setUp() {
+ upgradeHandler = new EightToNineUpgradeHandler();
+ }
+
+ @Test
+ void testUpgradeWithSecondaryIndexPartitions() {
+ // Setup test data
Review Comment:
done
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java:
##########
@@ -543,6 +545,10 @@ static HoodieIndexDefinition
getSecondaryOrExpressionIndexDefinition(HoodieTable
String fullIndexName = indexType.equals(PARTITION_NAME_SECONDARY_INDEX)
? PARTITION_NAME_SECONDARY_INDEX_PREFIX + userIndexName
: PARTITION_NAME_EXPRESSION_INDEX_PREFIX + userIndexName;
+ HoodieTableVersion tableVersion =
metaClient.getTableConfig().getTableVersion();
Review Comment:
> should we use tableVersion in storage
metaClient.getTableConfig().getTableVersion() reads the hoodie.properties
file on storage. Is there anything I missed here?
> do we need to tie tableVersion and indexVersion.
For table version < 9, the table version matters because there is no
indexVersion.
For table version >= 9, the default version is controlled by
```
public static HoodieIndexVersion getCurrentVersion(HoodieTableVersion
tableVersion, MetadataPartitionType partitionType) {
```
and this function's implementation can be altered as needed. For example, if
in the future we have a better index layout for table version >= 9 and want
everyone to use it, we can change the hard-coded value v2 to v3/v4 as we like.
We don't want to make it a writer config anyway, so we only expect this value
to change as we do releases.
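A rough sketch of the idea, not the actual `getCurrentVersion` body:
`IndexVersionDefaultSketch`, its `IndexVersion` enum, and the boolean flag are
hypothetical, and mapping everything other than a secondary index on table
version >= 9 to V1 is an assumption made for the example.

```java
// Hypothetical sketch: the default index version is a function of the table version.
final class IndexVersionDefaultSketch {

  enum IndexVersion { V1, V2 }

  static IndexVersion currentVersion(int tableVersionCode, boolean isSecondaryIndex) {
    if (tableVersionCode < 9) {
      // Assumption: older tables carry no index version, so treat them as V1.
      return IndexVersion.V1;
    }
    // Hard-coded default for table version >= 9; a later release could return V3/V4 here.
    return isSecondaryIndex ? IndexVersion.V2 : IndexVersion.V1;
  }

  public static void main(String[] args) {
    System.out.println(currentVersion(8, true));
    System.out.println(currentVersion(9, true));
  }
}
```

Because the default lives in one function rather than in a writer config, it
only changes when a release changes that function.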
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieIndexingConfig.java:
##########
@@ -286,4 +286,5 @@ public boolean isIndexUsingColumnStats() {
public boolean isIndexUsingRecordIndex() {
return
getIndexType().equalsIgnoreCase(MetadataPartitionType.RECORD_INDEX.name());
}
+
Review Comment:
done
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SecondaryIndexUpgradeDowngradeHelper.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX_PREFIX;
+
+/**
+ * Helper class to handle secondary index operations during upgrade/downgrade.
+ */
+public class SecondaryIndexUpgradeDowngradeHelper {
+ private static final Logger LOG =
LoggerFactory.getLogger(SecondaryIndexUpgradeDowngradeHelper.class);
+
+ /**
+ * Drops secondary index partitions from metadata table.
+ *
+ * @param config Write config
+ * @param context Engine context
+ * @param table Hoodie table
+ * @param operationType Type of operation (upgrade/downgrade)
+ */
+ public static void dropSecondaryIndexPartitions(HoodieWriteConfig config,
HoodieEngineContext context,
Review Comment:
Done. Extracted a util function dropMDTPartitions in HoodieIndexUtils.
##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java:
##########
@@ -263,6 +263,17 @@ public static <K, V> HoodieListPairData<K, V> lazy(Map<K,
List<V>> data) {
return new HoodieListPairData<>(explode(data), true);
}
+ public static <K, V> HoodieListPairData<K, V> eagerMapKV(Map<K, V> data) {
Review Comment:
I will keep the rename as the last thing to address, once everything else is
good. We might need to rebase, and there would be a lot of conflicts. Let's
keep this open and resolve it in the final iteration.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -145,20 +165,43 @@ public static Builder newBuilder() {
return new Builder();
}
+ /**
+ * Create a new Builder pre-populated with values from this instance.
+ */
+ public Builder toBuilder() {
+ Builder builder = new Builder();
+ builder.withIndexName(this.indexName)
+ .withIndexType(this.indexType)
+ .withIndexFunction(this.indexFunction)
+ .withSourceFields(new ArrayList<>(this.sourceFields))
+ .withIndexOptions(new HashMap<>(this.indexOptions));
+ if (this.version != null) {
+ builder.withVersion(this.version);
+ }
+ return builder;
+ }
+
public static class Builder {
+ // e.g. create index <user index name> on myTable using column_stats(ts)
options(expr='from_unixtime', format='yyyy-MM-dd')
Review Comment:
makes sense
##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java:
##########
@@ -153,6 +154,23 @@ <L, W> HoodiePairData<L, W> mapToPair(
*/
List<Pair<K, V>> collectAsList();
+ /**
+ * Collects results of the underlying collection into a {@link Map<Pair<K,
V>>}
+ * If there are multiple pairs sharing the same key, the resulting map
randomly picks one among them.
Review Comment:
> is this sth this PR is introducing?
As you can see, the PR introduced this API
> if so, I'd like such specific handling at the Data abstraction.
Can you elaborate? This is already the top class in the inheritance
hierarchy.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -145,20 +165,43 @@ public static Builder newBuilder() {
return new Builder();
}
+ /**
+ * Create a new Builder pre-populated with values from this instance.
+ */
+ public Builder toBuilder() {
+ Builder builder = new Builder();
+ builder.withIndexName(this.indexName)
+ .withIndexType(this.indexType)
+ .withIndexFunction(this.indexFunction)
+ .withSourceFields(new ArrayList<>(this.sourceFields))
+ .withIndexOptions(new HashMap<>(this.indexOptions));
+ if (this.version != null) {
Review Comment:
done
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -287,21 +290,44 @@ public Option<HoodieIndexMetadata> getIndexMetadata() {
if (indexMetadataOpt.isPresent() &&
!indexMetadataOpt.get().getIndexDefinitions().isEmpty()) {
return indexMetadataOpt;
}
+ Option<HoodieIndexMetadata> indexDefOption = Option.empty();
if (tableConfig.getRelativeIndexDefinitionPath().isPresent() &&
StringUtils.nonEmpty(tableConfig.getRelativeIndexDefinitionPath().get())) {
- StoragePath indexDefinitionPath =
- new StoragePath(basePath,
tableConfig.getRelativeIndexDefinitionPath().get());
- try {
- Option<byte[]> bytesOpt = FileIOUtils.readDataFromPath(storage,
indexDefinitionPath, true);
- if (bytesOpt.isPresent()) {
- return Option.of(HoodieIndexMetadata.fromJson(new
String(bytesOpt.get())));
- } else {
- return Option.of(new HoodieIndexMetadata());
- }
- } catch (IOException e) {
- throw new HoodieIOException("Could not load index definition at path:
" + tableConfig.getRelativeIndexDefinitionPath().get(), e);
+ indexDefOption = loadIndexDefFromDisk(basePath,
tableConfig.getRelativeIndexDefinitionPath().get(), storage);
+ }
+ populateIndexVersionIfMissing(tableConfig.getTableVersion(),
indexDefOption);
+ return indexDefOption;
+ }
+
+ public static void populateIndexVersionIfMissing(HoodieTableVersion
tableVersion, Option<HoodieIndexMetadata> indexDefOption) {
+ indexDefOption.ifPresent(idxDefs ->
+ idxDefs.getIndexDefinitions().replaceAll((indexName, idxDef) -> {
+
ValidationUtils.checkArgument(HoodieIndexVersion.isValidIndexDefinition(tableVersion,
idxDef),
+ String.format("Table version %s, index definition %s",
tableVersion, idxDef));
+ if (idxDef.getVersion() == null) {
+ // If version field is missing, it implies either of the cases
(validated by isValidIndexDefinition):
+ // - It is table version 8, because we don't write version
attributes in some hudi releases
+ // - It is table version 9, and it is not secondary index. Since
we drop SI on upgrade and we always write version attributes.
+ return
idxDef.toBuilder().withVersion(getCurrentVersion(tableVersion,
idxDef.getIndexName())).build();
+ } else {
+ return idxDef;
+ }
+ }));
+ }
+
+ public static Option<HoodieIndexMetadata> loadIndexDefFromDisk(
Review Comment:
done
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieEngineContext.java:
##########
@@ -129,4 +130,6 @@ public abstract <I, K, V> List<V> reduceByKey(
public abstract <I, O> O aggregate(HoodieData<I> data, O zeroValue,
Functions.Function2<O, I, O> seqOp, Functions.Function2<O, O, O> combOp);
public abstract <T> ReaderContextFactory<T>
getReaderContextFactory(HoodieTableMetaClient metaClient);
+
+ public abstract void dropIndex(HoodieConfig config, List<String>
metadataPartitions);
Review Comment:
I need some guidance here.
## Why it is done this way
The upgrade/downgrade interface only gives access to the engine context, and
it needs to be engine neutral.
We need to run something like a transaction to drop the index, which means we
must not just delete things but also keep metadata such as the timeline, the
index definitions, and the table config consistent.
The only approach that does not incur a massive interface-level change is what
the PR does. If we need to move this elsewhere, I would prefer that we chat
face to face tomorrow to agree on an implementation plan so I don't make the
wrong guess.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -67,15 +70,25 @@ public class HoodieIndexDefinition implements Serializable {
// Any other configuration or properties specific to the index
private Map<String, String> indexOptions;
+ // Version of the index
+ private HoodieIndexVersion version;
+
public HoodieIndexDefinition() {
Review Comment:
done
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -183,8 +226,25 @@ public Builder withIndexOptions(Map<String, String>
indexOptions) {
return this;
}
+ public Builder withVersion(HoodieIndexVersion version) {
+ if (indexName == null) {
+ throw new IllegalStateException("Please set index name first before
setting version");
+ }
+ // Make sure the version enum matching the metadata partition is used.
+
version.ensureVersionCanBeAssignedToIndexType(MetadataPartitionType.fromPartitionPath(indexName));
Review Comment:
No, index type is very confusing and misleading. I previously revised the PR
to rely entirely on the index name instead of the index type.
The index type of an expression index can be column stats. The actual "index
type" can only be inferred from the MDT partition name by checking the prefix.
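As a minimal sketch of prefix-based inference: `PartitionTypeSketch` and its
enum are hypothetical, and the prefix strings are illustrative values rather
than a claim about every MDT partition name.

```java
// Hypothetical sketch: resolve the metadata partition type from the partition name prefix.
final class PartitionTypeSketch {

  enum PartitionType {
    COLUMN_STATS("column_stats"),
    EXPRESSION_INDEX("expr_index_"),
    SECONDARY_INDEX("secondary_index_");

    private final String partitionPath;

    PartitionType(String partitionPath) {
      this.partitionPath = partitionPath;
    }

    // startsWith already covers the exact-match case, so no separate equals check is needed.
    boolean matchesPartitionPath(String path) {
      return path.startsWith(partitionPath);
    }

    static PartitionType fromPartitionPath(String path) {
      for (PartitionType type : values()) {
        if (type.matchesPartitionPath(path)) {
          return type;
        }
      }
      throw new IllegalArgumentException("No partition type matches: " + path);
    }
  }

  public static void main(String[] args) {
    // An expression index may be declared "using column_stats", yet its partition
    // name still identifies it as an expression index.
    System.out.println(PartitionType.fromPartitionPath("expr_index_idx_ts"));
  }
}
```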
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -262,7 +267,9 @@ public Map<Pair<String, String>,
List<HoodieMetadataColumnStats>> getColumnStats
* @param recordKeys The list of record keys to read
*/
@Override
- public Map<String, HoodieRecordGlobalLocation> readRecordIndex(List<String>
recordKeys) {
+ public HoodiePairData<String, HoodieRecordGlobalLocation>
readRecordIndex(HoodieData<String> recordKeys) {
+ ValidationUtils.checkState(recordKeys instanceof HoodieListData,
"readRecordIndex only support HoodieListData at the moment");
Review Comment:
Yes, that makes total sense, and it comes as part of the index join
productionization PR. I just want to split things into small PRs.
Here it is safe to add this validation because:
- before the change, all the code used Java collections
- after the change, we just wrap them with HoodieData.eager
The goal of this PR is nothing but an "interface-level" change.
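A simplified sketch of that pattern, using hypothetical stand-ins (`Data`,
`ListData`, `readIndex`) rather than the real HoodieData classes: callers wrap
their existing list eagerly, and the method rejects anything that is not
list-backed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for the data abstraction; not the real Hudi types.
final class EagerWrapSketch {

  interface Data<T> {
    List<T> collectAsList();
  }

  static final class ListData<T> implements Data<T> {
    private final List<T> items;

    private ListData(List<T> items) {
      this.items = items;
    }

    // Eager wrapper: copies the list up front, so behaviour matches a plain Java list.
    static <T> ListData<T> eager(List<T> items) {
      return new ListData<>(new ArrayList<>(items));
    }

    @Override
    public List<T> collectAsList() {
      return items;
    }
  }

  // New signature takes the abstraction, but only the list-backed form is accepted for now.
  static List<String> readIndex(Data<String> keys) {
    if (!(keys instanceof ListData)) {
      throw new IllegalStateException("readIndex only supports ListData at the moment");
    }
    List<String> locations = new ArrayList<>();
    for (String key : keys.collectAsList()) {
      locations.add("location-of-" + key);
    }
    return locations;
  }

  public static void main(String[] args) {
    System.out.println(readIndex(ListData.eager(Arrays.asList("a", "b"))));
  }
}
```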
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -202,11 +207,12 @@ public List<String>
getPartitionPathWithPathPrefixes(List<String> relativePathPr
}
@Override
- public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(List<String> keyPrefixes,
+ public HoodieData<HoodieRecord<HoodieMetadataPayload>>
getRecordsByKeyPrefixes(HoodieData<String> keyPrefixes,
String partitionName,
boolean shouldLoadInMemory) {
+ ValidationUtils.checkState(keyPrefixes instanceof HoodieListData,
"getRecordsByKeyPrefixes only support HoodieListData at the moment");
Review Comment:
Explained above; let's chat more if your concern is not addressed.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieIndexDefinition.java:
##########
@@ -183,8 +226,25 @@ public Builder withIndexOptions(Map<String, String>
indexOptions) {
return this;
}
+ public Builder withVersion(HoodieIndexVersion version) {
+ if (indexName == null) {
+ throw new IllegalStateException("Please set index name first before
setting version");
+ }
+ // Make sure the version enum matching the metadata partition is used.
+
version.ensureVersionCanBeAssignedToIndexType(MetadataPartitionType.fromPartitionPath(indexName));
+ this.version = version;
+ return this;
+ }
+
public HoodieIndexDefinition build() {
- return new HoodieIndexDefinition(indexName, indexType, indexFunction,
sourceFields, indexOptions);
+ return new HoodieIndexDefinition(
Review Comment:
added null check in the constructor
##########
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java:
##########
@@ -876,7 +902,7 @@ public static class Builder {
private ConsistencyGuardConfig consistencyGuardConfig =
ConsistencyGuardConfig.newBuilder().build();
private FileSystemRetryConfig fileSystemRetryConfig =
FileSystemRetryConfig.newBuilder().build();
private HoodieMetaserverConfig metaserverConfig =
HoodieMetaserverConfig.newBuilder().build();
- private Option<TimelineLayoutVersion> layoutVersion = Option.empty();
+ private Option<TimelineLayoutVersion> layoutVersion =
org.apache.hudi.common.util.Option.empty();
Review Comment:
removed
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -162,7 +166,8 @@ private void initIfNeeded() {
@Override
protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String
key, String partitionName) {
- Map<String, HoodieRecord<HoodieMetadataPayload>> recordsByKeys =
getRecordsByKeys(Collections.singletonList(key), partitionName);
+ Map<String, HoodieRecord<HoodieMetadataPayload>> recordsByKeys =
getRecordsByKeys(
+ HoodieListData.eager(Collections.singletonList(key)),
partitionName).collectAsMapWithOverwriteStrategy();
Review Comment:
Same as before.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -206,7 +210,8 @@ public Map<Pair<String, String>, BloomFilter>
getBloomFilters(final List<Pair<St
List<String> partitionIDFileIDStringsList = new
ArrayList<>(partitionIDFileIDStrings);
Map<String, HoodieRecord<HoodieMetadataPayload>> hoodieRecords =
- getRecordsByKeys(partitionIDFileIDStringsList, metadataPartitionName);
+ getRecordsByKeys(HoodieListData.eager(partitionIDFileIDStringsList),
metadataPartitionName)
Review Comment:
All code refactors like this are to make sure that:
- we shift away from plain Java collections
- there is no behavioral change (Java list <=> HoodieData.eager)
I don't have a strong opinion on whether we should use lazy or eager.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -457,9 +457,17 @@ public static List<MetadataPartitionType>
getEnabledPartitions(HoodieMetadataCon
.collect(Collectors.toList());
}
+ private static boolean isPartitionType(String partitionPath, String
typePartition) {
Review Comment:
done
##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -457,9 +457,17 @@ public static List<MetadataPartitionType>
getEnabledPartitions(HoodieMetadataCon
.collect(Collectors.toList());
}
+ private static boolean isPartitionType(String partitionPath, String
typePartition) {
+ return partitionPath.equals(typePartition) ||
partitionPath.startsWith(typePartition);
+ }
+
+ public boolean isPartitionType(String partitionPath) {
+ return isPartitionType(partitionPath, getPartitionPath());
Review Comment:
renamed
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieIndexVersion.java:
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.hudi.common.model.HoodieIndexDefinition;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import java.util.Arrays;
+import java.util.List;
+
+public enum HoodieIndexVersion {
+ ALL_PARTITIONS_ONE(MetadataPartitionType.ALL_PARTITIONS, 1,
Arrays.asList("0.14.0")),
Review Comment:
done in a separate commit
##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -457,9 +457,17 @@ public static List<MetadataPartitionType>
getEnabledPartitions(HoodieMetadataCon
.collect(Collectors.toList());
}
+ private static boolean isPartitionType(String partitionPath, String
typePartition) {
+ return partitionPath.equals(typePartition) ||
partitionPath.startsWith(typePartition);
+ }
+
+ public boolean isPartitionType(String partitionPath) {
+ return isPartitionType(partitionPath, getPartitionPath());
+ }
+
public static MetadataPartitionType fromPartitionPath(String partitionPath) {
for (MetadataPartitionType partitionType : getValidValues()) {
- if (partitionPath.equals(partitionType.getPartitionPath()) ||
partitionPath.startsWith(partitionType.getPartitionPath())) {
+ if (partitionType.isPartitionType(partitionPath)) {
Review Comment:
rename to matchesPartitionPath
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkMetadataTableRecordIndex.java:
##########
@@ -93,9 +95,11 @@ public <R> HoodieData<HoodieRecord<R>>
tagLocation(HoodieData<HoodieRecord<R>> r
}
// Partition the record keys to lookup such that each partition looks up
one record index shard
+ String partitionPath =
MetadataPartitionType.RECORD_INDEX.getPartitionPath();
JavaRDD<String> partitionedKeyRDD = HoodieJavaRDD.getJavaRDD(records)
.map(HoodieRecord::getRecordKey)
- .keyBy(k -> HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(k,
numFileGroups))
+ .keyBy(k -> HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(k,
numFileGroups, partitionPath,
+ getExistingHoodieIndexVersionOrDefault(partitionPath,
hoodieTable.getMetaClient())))
Review Comment:
That's an excellent callout. I reviewed all call sites and revised them as
appropriate.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java:
##########
@@ -317,7 +319,10 @@ public int getPartition(Object key) {
// NOTE: It's crucial that [[targetPartitions]] be congruent w/ the
number of
// actual file-groups in the Bloom Index in MT
- return mapRecordKeyToFileGroupIndex(bloomIndexEncodedKey,
targetPartitions);
+ String bloomPartitionPath = BLOOM_FILTERS.getPartitionPath();
+ return mapRecordKeyToFileGroupIndex(bloomIndexEncodedKey,
targetPartitions, bloomPartitionPath,
+ // TODO[HUDI-9530] The version should come from the index def json
file instead of a hard code value.
Review Comment:
Nope. It does not qualify for the MVP because:
- For the bloom filter we only have one index version as of today anyway.
- Getting the index version requires a non-trivial code change; we would need
to drill some deep holes to pass the config.
I agree it is something we need to fix, but its priority relative to other
items matters given that we don't have abundant time.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/HoodieSparkIndexClient.java:
##########
@@ -134,6 +140,8 @@ public void
createOrUpdateColumnStatsIndexDefinition(HoodieTableMetaClient metaC
.withIndexType(PARTITION_NAME_COLUMN_STATS)
.withIndexFunction(PARTITION_NAME_COLUMN_STATS)
.withSourceFields(columnsToIndex)
+ // Use the existing version if exists, otherwise fall back to the
default version.
+
.withVersion(getExistingHoodieIndexVersionOrDefault(PARTITION_NAME_COLUMN_STATS,
metaClient))
Review Comment:
No, the user cannot control it.
Then why do we have "existing" or "default"?
- Existing: if the index has already been created, write using the existing
version.
- Default: if the index has not been created yet (first time creating the
index), use the index version based on the table version.
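The two cases can be sketched as a simple lookup with a fallback. The class,
method, and map below are hypothetical and only illustrate the rule; they are
not the body of getExistingHoodieIndexVersionOrDefault.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of "existing version if present, otherwise table-version default".
final class ExistingOrDefaultVersionSketch {

  static int existingOrDefault(Map<String, Integer> existingVersions,
                               String partition,
                               int defaultForTableVersion) {
    Integer existing = existingVersions.get(partition);
    // Existing index: keep writing with the version it was created with.
    // New index: fall back to the default derived from the table version.
    return existing != null ? existing : defaultForTableVersion;
  }

  public static void main(String[] args) {
    Map<String, Integer> existingVersions = new HashMap<>();
    existingVersions.put("column_stats", 1);
    System.out.println(existingOrDefault(existingVersions, "column_stats", 2));
    System.out.println(existingOrDefault(existingVersions, "secondary_index_idx", 2));
  }
}
```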
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1360,15 +1363,32 @@ private static List<Tuple3<String, String, Boolean>>
fetchPartitionFileInfoTripl
}
/**
- * Map a record key to a file group in partition of interest.
+ * Maps a record key to a file group index in the specified partition.
* <p>
- * Note: For hashing, the algorithm is same as String.hashCode() but is
being defined here as hashCode()
- * implementation is not guaranteed by the JVM to be consistent across JVM
versions and implementations.
+ * For secondary index partitions (version >= 2), if the record key contains
the secondary index separator,
+ * the secondary key portion is used for hashing. Otherwise, the full record
key is used.
+ * <p>
+ * Note: The hashing algorithm is same as String.hashCode() but is defined
here explicitly since
+ * hashCode() implementation is not guaranteed by the JVM to be consistent
across versions.
*
- * @param recordKey record key for which the file group index is looked up
for.
- * @return An integer hash of the given string
+ * @param recordKey record key for which the file group index is looked up
+ * @param numFileGroups number of file groups to map the key to
+ * @param partitionName name of the partition
+ * @param version index version to determine hashing behavior
+ * @return file group index for the given record key
*/
- public static int mapRecordKeyToFileGroupIndex(String recordKey, int
numFileGroups) {
+ public static int mapRecordKeyToFileGroupIndex(
Review Comment:
we have it in this PR.
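For readers following the Javadoc above, a minimal sketch of the described
behaviour. This is not the PR's implementation: the class name, the boolean
flag, the "$" separator, the assumption that the secondary key precedes the
separator, and the use of Math.floorMod to keep the result non-negative are
all choices made for this example.

```java
// Hypothetical sketch of mapping a record key to a file group index.
final class FileGroupHashSketch {

  // Assumed separator between the secondary key and the record key.
  static final String SEPARATOR = "$";

  // Same recurrence as String.hashCode(), written out so it does not depend on the JVM.
  static int stableHash(String value) {
    int h = 0;
    for (int i = 0; i < value.length(); i++) {
      h = 31 * h + value.charAt(i);
    }
    return h;
  }

  static int mapKeyToFileGroupIndex(String recordKey, int numFileGroups, boolean secondaryIndexV2) {
    String keyToHash = recordKey;
    if (secondaryIndexV2) {
      int separatorAt = recordKey.indexOf(SEPARATOR);
      if (separatorAt >= 0) {
        // Hash only the secondary key portion so equal secondary keys share a file group.
        keyToHash = recordKey.substring(0, separatorAt);
      }
    }
    return Math.floorMod(stableHash(keyToHash), numFileGroups);
  }

  public static void main(String[] args) {
    System.out.println(mapKeyToFileGroupIndex("city$rec1", 8, true));
    System.out.println(mapKeyToFileGroupIndex("city$rec2", 8, true));
  }
}
```

With the flag set, both keys in `main` hash only "city" and therefore land in
the same file group; with it unset, the full key is hashed.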
##########
hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieLocalEngineContext.java:
##########
@@ -185,6 +186,12 @@ public <I, O> O aggregate(HoodieData<I> data, O zeroValue,
Functions.Function2<O
return data.collectAsList().stream().reduce(zeroValue, seqOp::apply,
combOp::apply);
}
+ @Override
+ public void dropIndex(HoodieConfig config, List<String> metadataPartitions) {
Review Comment:
Will revise once we align on the implementation plan tomorrow.
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala:
##########
@@ -848,13 +848,7 @@ class HoodieSparkSqlWriterInternal {
TableInstantInfo(basePath, instantTime, executor.getCommitActionType,
executor.getWriteOperationType), Option.empty)
(writeSuccessful, HOption.ofNullable(instantTime), compactionInstant,
clusteringInstant, writeClient, tableConfig)
} finally {
- // close the write client in all cases
- val asyncCompactionEnabled = isAsyncCompactionEnabled(writeClient,
tableConfig, parameters, jsc.hadoopConfiguration())
- val asyncClusteringEnabled = isAsyncClusteringEnabled(writeClient,
parameters)
- if (!asyncCompactionEnabled && !asyncClusteringEnabled) {
- log.info("Closing write client")
- writeClient.close()
- }
+ handleWriteClientClosure(writeClient, tableConfig, parameters,
jsc.hadoopConfiguration())
Review Comment:
done
##########
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java:
##########
@@ -294,18 +303,20 @@ public Map<String, HoodieRecordGlobalLocation>
readRecordIndex(List<String> reco
* @param secondaryKeys The list of secondary keys to read
*/
@Override
- public Map<String, HoodieRecordGlobalLocation>
readSecondaryIndex(List<String> secondaryKeys, String partitionName) {
+ public HoodiePairData<String, HoodieRecordGlobalLocation>
readSecondaryIndex(HoodieData<String> secondaryKeys, String partitionName) {
Review Comment:
I didn't add that change in this PR, and it does not break anything.
Write path: it does the proper hashing based on the index, so it is good.
Read path: it reads all index files and does prefix matching, which is
compatible with both v1 and v2.
We have the code change you want in the productionization PR, which is
separate from the purpose of this PR: the minimum code change to unblock
Hudi 1.x.
The read path needs a larger code change, and we should not do everything in
one giant PR.