zhangyue19921010 commented on code in PR #13017: URL: https://github.com/apache/hudi/pull/13017#discussion_r2015965356
########## hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/bucket/TestPartitionBucketIndexCalculator.java: ##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bucket;
+
+import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.bucket.partition.PartitionBucketIndexCalculator;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class TestPartitionBucketIndexCalculator extends HoodieCommonTestHarness {
+
+  private static final String DEFAULT_RULE = "regex";
+  private static final String DEFAULT_EXPRESSIONS = "\\d{4}-(06-(01|17|18)|11-(01|10|11)),256";
+  private static final int DEFAULT_BUCKET_NUMBER = 10;
+  private PartitionBucketIndexCalculator calc;
+
+  void setUp(String expression, String rule, int defaultBucketNumber) throws IOException {
+    initMetaClient();
+    PartitionBucketIndexHashingConfig.saveHashingConfig(metaClient, expression, rule, defaultBucketNumber, null);
+    this.calc = PartitionBucketIndexCalculator.getInstance(expression, rule, defaultBucketNumber);
+  }
+
+  /**
+   * Test basic regex rule matching.
+   */
+  @ParameterizedTest(name = "Partition {0} should map to bucket {1}")
+  @MethodSource("providePartitionsAndExpectedBuckets")
+  void testBasicRegexRuleMatching(String partition, int expectedBucket) throws IOException {
+    setUp(DEFAULT_EXPRESSIONS, DEFAULT_RULE, DEFAULT_BUCKET_NUMBER);
+    assertEquals(expectedBucket, calc.computeNumBuckets(partition));
+  }
+
+  /**
+   * Provide test data for parameterized tests.
+   */
+  private static Stream<Arguments> providePartitionsAndExpectedBuckets() {
+    return Stream.of(
+        // Matching the regex pattern - should get bucket 256
+        Arguments.of("2023-06-01", 256),
+        Arguments.of("2023-06-17", 256),
+        Arguments.of("2023-06-18", 256),
+        Arguments.of("2023-11-01", 256),
+        Arguments.of("2023-11-10", 256),
+        Arguments.of("2023-11-11", 256),
+        Arguments.of("2022-06-01", 256),
+        Arguments.of("2021-11-10", 256),
+
+        // Not matching the regex pattern - should get default bucket 10
+        Arguments.of("2023-06-02", 10),
+        Arguments.of("2023-06-19", 10),
+        Arguments.of("2023-07-01", 10),
+        Arguments.of("2023-11-12", 10),
+        Arguments.of("2023-11-02", 10),
+        Arguments.of("2023-05-01", 10),
+        Arguments.of("not-a-date", 10),
+        Arguments.of("", 10)
+    );
+  }
+
+  @Test
+  void testCaching() throws IOException {
+    setUp(DEFAULT_EXPRESSIONS, DEFAULT_RULE, DEFAULT_BUCKET_NUMBER);
+    // Calculate bucket number for the same partition multiple times
+    String partition = "2023-06-01";
+
+    // First call should calculate and cache
+    int bucket1 = calc.computeNumBuckets(partition);
+    assertEquals(256, bucket1);
+    assertEquals(1, calc.getCacheSize());
+
+    // Second call should use the cache
+    int bucket2 = calc.computeNumBuckets(partition);
+    assertEquals(256, bucket2);
+    assertEquals(1, calc.getCacheSize());
+
+    // Different partition should increase cache size
+    calc.computeNumBuckets("2023-11-10");
+    assertEquals(2, calc.getCacheSize());
+  }
+
+  /**
+   * Test multiple regex rules with priority.
+   */
+  @Test
+  void testMultipleRegexRulesWithPriority() throws IOException {
+    // Setup configuration with multiple rules
+    String expressions = "\\d{4}-06-01,256;\\d{4}-06-\\d{2},128;\\d{4}-11-\\d{2},64";
+    setUp(expressions, DEFAULT_RULE, DEFAULT_BUCKET_NUMBER);
+
+    // First rule should take priority
+    assertEquals(256, calc.computeNumBuckets("2023-06-01"));
+
+    // Second rule should match when first doesn't
+    assertEquals(128, calc.computeNumBuckets("2023-06-02"));
+    assertEquals(128, calc.computeNumBuckets("2023-06-30"));
+
+    // Third rule should match for November dates
+    assertEquals(64, calc.computeNumBuckets("2023-11-01"));

Review Comment:
   all added.
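For context on the expression format exercised above: the expression string is a semicolon-separated list of <regex>,<bucketNumber> pairs, evaluated in order, with the first matching rule winning and unmatched partitions falling back to the default bucket number. Below is a minimal standalone sketch of that resolution order; the class and method names (BucketRuleSketch, resolveBucketNumber) are illustrative only and are not the PR's API:

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;

    public class BucketRuleSketch {

      // Parse "regex,bucketNumber;regex,bucketNumber;..." into ordered (pattern, buckets) pairs.
      static List<Map.Entry<Pattern, Integer>> parse(String expressions) {
        return Arrays.stream(expressions.split(";"))
            .map(expr -> {
              // The regex itself may contain commas (e.g. {1,2}), so split on the last one.
              int comma = expr.lastIndexOf(',');
              Map.Entry<Pattern, Integer> rule = new SimpleEntry<>(
                  Pattern.compile(expr.substring(0, comma)),
                  Integer.parseInt(expr.substring(comma + 1)));
              return rule;
            })
            .collect(Collectors.toList());
      }

      // First matching rule wins; unmatched partitions fall back to the default bucket number.
      static int resolveBucketNumber(List<Map.Entry<Pattern, Integer>> rules, String partition, int defaultBuckets) {
        return rules.stream()
            .filter(rule -> rule.getKey().matcher(partition).matches())
            .map(Map.Entry::getValue)
            .findFirst()
            .orElse(defaultBuckets);
      }

      public static void main(String[] args) {
        List<Map.Entry<Pattern, Integer>> rules =
            parse("\\d{4}-06-01,256;\\d{4}-06-\\d{2},128;\\d{4}-11-\\d{2},64");
        System.out.println(resolveBucketNumber(rules, "2023-06-01", 10)); // 256 (first rule wins)
        System.out.println(resolveBucketNumber(rules, "2023-06-02", 10)); // 128 (second rule)
        System.out.println(resolveBucketNumber(rules, "2023-05-01", 10)); // 10 (default)
      }
    }

Running main prints 256, 128 and 10, mirroring the expectations in testMultipleRegexRulesWithPriority.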
########## hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java: ##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.JsonUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.HoodieInstantWriter;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PartitionBucketIndexHashingConfig implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexHashingConfig.class);
+  public static final String INITIAL_HASHING_CONFIG_INSTANT = HoodieTimeline.INIT_INSTANT_TS;
+  public static final String HASHING_CONFIG_FILE_SUFFIX = ".hashing_config";
+  public static final Integer CURRENT_VERSION = 1;
+  private final String expressions;
+  private final int defaultBucketNumber;
+  private final String rule;
+  private final int version;
+  private final String instant;
+
+  @JsonCreator
+  public PartitionBucketIndexHashingConfig(@JsonProperty("expressions") String expressions,
+                                           @JsonProperty("defaultBucketNumber") int defaultBucketNumber,
+                                           @JsonProperty("rule") String rule,
+                                           @JsonProperty("version") int version,
+                                           @JsonProperty("instant") String instant) {
+    this.expressions = expressions;
+    this.defaultBucketNumber = defaultBucketNumber;
+    this.rule = rule;
+    this.version = version;
+    this.instant = instant;
+  }
+
+  public String getFilename() {
+    return instant + HASHING_CONFIG_FILE_SUFFIX;
+  }
+
+  public String toJsonString() throws IOException {
+    return JsonUtils.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this);
+  }
+
+  public String getInstant() {
+    return this.instant;
+  }
+
+  public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Exception {
+    if (jsonStr == null || jsonStr.isEmpty()) {
+      // For an empty commit file (no data, or something bad happened).
+      return clazz.newInstance();
+    }
+    return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
+  }
+
+  public static PartitionBucketIndexHashingConfig fromBytes(byte[] bytes) throws IOException {
+    try {
+      return fromJsonString(new String(bytes, StandardCharsets.UTF_8), PartitionBucketIndexHashingConfig.class);
+    } catch (Exception e) {
+      throw new IOException("unable to load hashing config", e);
+    }
+  }
+
+  public int getVersion() {
+    return version;
+  }
+
+  public String getRule() {
+    return rule;
+  }
+
+  public int getDefaultBucketNumber() {
+    return defaultBucketNumber;
+  }
+
+  public String getExpressions() {
+    return expressions;
+  }
+
+  /**
+   * Get the absolute path of the hashing config meta folder.
+   */
+  public static StoragePath getHashingConfigStorageFolder(String basePath) {
+    StoragePath metaPath = new StoragePath(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
+    return new StoragePath(metaPath, HoodieTableMetaClient.BUCKET_INDEX_METAFOLDER_CONFIG_FOLDER);
+  }
+
+  public static StoragePath getArchiveHashingConfigStorageFolder(String basePath) {
+    StoragePath metaPath = new StoragePath(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
+    return new StoragePath(metaPath, HoodieTableMetaClient.BUCKET_INDEX_METAFOLDER_CONFIG_ARCHIVE_FOLDER);
+  }
+
+  /**
+   * Get the absolute path of the specific <instant>.hashing_config file.
+   */
+  public static StoragePath getHashingConfigPath(String basePath, String instant) {
+    StoragePath hashingBase = getHashingConfigStorageFolder(basePath);
+    return new StoragePath(hashingBase, instant + PartitionBucketIndexHashingConfig.HASHING_CONFIG_FILE_SUFFIX);
+  }
+
+  public static StoragePath getArchiveHashingConfigPath(String basePath, String instant) {
+    StoragePath hashingBase = getArchiveHashingConfigStorageFolder(basePath);
+    return new StoragePath(hashingBase, instant + PartitionBucketIndexHashingConfig.HASHING_CONFIG_FILE_SUFFIX);
+  }
+
+  /**
+   * Create and save <instant>.hashing_config based on the given expressions, rule, defaultBucketNumber and instant.
+   * If the given instant is null or empty, use INITIAL_HASHING_CONFIG_INSTANT.
+   */
+  public static boolean saveHashingConfig(HoodieTableMetaClient metaClient,
+                                          String expressions,
+                                          String rule,
+                                          int defaultBucketNumber,
+                                          String instant) {
+    if (StringUtils.isNullOrEmpty(expressions)) {
+      return false;
+    }
+    String hashingInstant = StringUtils.isNullOrEmpty(instant) ? INITIAL_HASHING_CONFIG_INSTANT : instant;
+    PartitionBucketIndexHashingConfig hashingConfig =
+        new PartitionBucketIndexHashingConfig(expressions, defaultBucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, hashingInstant);
+    return saveHashingConfig(hashingConfig, metaClient);
+  }
+
+  /**
+   * Save the given hashing config.
+   */
+  public static boolean saveHashingConfig(PartitionBucketIndexHashingConfig hashingConfig, HoodieTableMetaClient metaClient) {
+    StoragePath hashingConfigPath = new StoragePath(metaClient.getHashingMetadataConfigPath(), hashingConfig.getFilename());
+    HoodieStorage storage = metaClient.getStorage();
+    try {
+      Option<byte[]> content = Option.of(hashingConfig.toJsonString().getBytes(StandardCharsets.UTF_8));
+      storage.createImmutableFileInPath(hashingConfigPath, content.map(HoodieInstantWriter::convertByteArrayToWriter));
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to save hashing config ", ioe);
+    }
+    return true;
+  }
+
+  public static Option<PartitionBucketIndexHashingConfig> loadHashingConfig(HoodieStorage storage, StoragePathInfo hashingConfig) {
+    return loadHashingConfig(storage, hashingConfig.getPath());
+  }
+
+  public static Option<PartitionBucketIndexHashingConfig> loadHashingConfig(HoodieStorage storage, StoragePath hashingConfig) {
+    if (hashingConfig == null) {
+      return Option.empty();
+    }
+    try (InputStream is = storage.open(hashingConfig)) {
+      byte[] content = FileIOUtils.readAsByteArray(is);
+      return Option.of(PartitionBucketIndexHashingConfig.fromBytes(content));
+    } catch (IOException e) {
+      LOG.error("Error when loading hashing config, for path: " + hashingConfig.getName(), e);
+      throw new HoodieIOException("Error while loading hashing config", e);
+    }
+  }
+
+  /**
+   * Get the latest committed hashing config instant to load.
+   */
+  public static String getLatestHashingConfigInstantToLoad(HoodieTableMetaClient metaClient) {
+    try {
+      List<String> allCommittedHashingConfig = getCommittedHashingConfig(metaClient);
+      return allCommittedHashingConfig.get(allCommittedHashingConfig.size() - 1);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to get hashing config instant to load.", e);
+    }
+  }
+
+  public static PartitionBucketIndexHashingConfig loadingLatestHashingConfig(HoodieTableMetaClient metaClient) {
+    String instantToLoad = getLatestHashingConfigInstantToLoad(metaClient);
+    Option<PartitionBucketIndexHashingConfig> latestHashingConfig = loadHashingConfig(metaClient.getStorage(), getHashingConfigPath(metaClient.getBasePath().toString(), instantToLoad));
+    ValidationUtils.checkArgument(latestHashingConfig.isPresent(), "Cannot load latest hashing config " + instantToLoad);
+
+    return latestHashingConfig.get();
+  }
+
+  /**
+   * Archive hashing config if necessary.
+   */
+  public static boolean archiveHashingConfigIfNecessary(HoodieTableMetaClient metaClient) throws IOException {
+    List<String> hashingConfigToArchive = getHashingConfigToArchive(metaClient);
+    if (hashingConfigToArchive.isEmpty()) {
+      LOG.info("No hashing config to archive.");
+      return false;
+    }
+
+    LOG.info("Start to archive hashing config " + hashingConfigToArchive);
+    return archiveHashingConfig(hashingConfigToArchive, metaClient);
+  }
+
+  // for now we simply move the active hashing config into the archive folder
+  private static boolean archiveHashingConfig(List<String> hashingConfigToArchive, HoodieTableMetaClient metaClient) {
+    hashingConfigToArchive.forEach(instant -> {
+      StoragePath activeHashingPath = getHashingConfigPath(metaClient.getBasePath().toString(), instant);
+      StoragePath archiveHashingPath = getArchiveHashingConfigPath(metaClient.getBasePath().toString(), instant);
+      try {
+        metaClient.getStorage().rename(activeHashingPath, archiveHashingPath);
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+    });
+    return true;
+  }
+
+  public static List<String> getArchiveHashingConfig(HoodieTableMetaClient metaClient) throws IOException {

Review Comment:
   done
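For a concrete picture of what saveHashingConfig persists: the file body is toJsonString(), i.e. the pretty-printed JSON of this class using the field names from the @JsonProperty annotations. Assuming the defaults from the tests above and Jackson's default pretty printer, a stored <instant>.hashing_config would look roughly like this (the instant value is illustrative; an initial config uses HoodieTimeline.INIT_INSTANT_TS instead):

    {
      "expressions" : "\\d{4}-(06-(01|17|18)|11-(01|10|11)),256",
      "defaultBucketNumber" : 10,
      "rule" : "regex",
      "version" : 1,
      "instant" : "20250325091919474"
    }

The doubled backslashes are JSON escaping of the regex and match the Java string literal in the test.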
########## hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java: ##########
@@ -0,0 +1,342 @@
+  public static List<String> getArchiveHashingConfig(HoodieTableMetaClient metaClient) throws IOException {
+    return metaClient.getStorage()
+        .listDirectEntries(new StoragePath(metaClient.getArchiveHashingMetadataConfigPath())).stream().map(info -> {
+          String instant = getHashingConfigInstant(info.getPath().getName());
+          if (StringUtils.isNullOrEmpty(instant)) {
+            throw new HoodieException("Failed to parse hashing config instant from " + info.getPath().getName());
+          }
+          return instant;
+        }).sorted().collect(Collectors.toList());
+  }
+
+  /**
+   * Get all committed hashing configs.
+   * During rollback we delete the hashing config first and only then remove the related pending instant,
+   * so any uncommitted hashing config that gets listed still has its instant in the active timeline.
+   * Returned in **ascending order**, e.g. 20250325091919474, 20250325091923956, 20250325091927529.
+   */
+  public static List<String> getCommittedHashingConfig(HoodieTableMetaClient metaClient) throws IOException {
+    List<String> allActiveHashingConfig = metaClient.getStorage()

Review Comment:
   done
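One note on the archive flow for reviewers: getHashingConfigToArchive is not quoted in this thread, but since getCommittedHashingConfig returns instants in ascending order and the latest config must remain loadable, the selection presumably amounts to everything but the newest committed config. A hedged sketch of that idea, with illustrative names only:

    import java.util.Collections;
    import java.util.List;

    class ArchiveSelectionSketch {
      // Given committed hashing config instants sorted ascending, keep the latest one
      // active (it is what loadingLatestHashingConfig reads) and archive the rest.
      static List<String> configsToArchive(List<String> committedAscending) {
        if (committedAscending.size() <= 1) {
          return Collections.emptyList(); // nothing older than the latest config
        }
        return committedAscending.subList(0, committedAscending.size() - 1);
      }
    }

With the three example instants from the javadoc above, this would archive the first two and keep 20250325091927529 active.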
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org