zhangyue19921010 commented on code in PR #13017:
URL: https://github.com/apache/hudi/pull/13017#discussion_r2015965829
########## hudi-common/src/main/java/org/apache/hudi/common/model/PartitionBucketIndexHashingConfig.java: ##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.JsonUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.HoodieInstantWriter;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PartitionBucketIndexHashingConfig implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(PartitionBucketIndexHashingConfig.class);
+  public static final String INITIAL_HASHING_CONFIG_INSTANT = HoodieTimeline.INIT_INSTANT_TS;
+  public static final String HASHING_CONFIG_FILE_SUFFIX = ".hashing_config";
+  public static final Integer CURRENT_VERSION = 1;
+  private final String expressions;
+  private final int defaultBucketNumber;
+  private final String rule;
+  private final int version;
+  private final String instant;
+
+  @JsonCreator
+  public PartitionBucketIndexHashingConfig(@JsonProperty("expressions") String expressions,
+                                           @JsonProperty("defaultBucketNumber") int defaultBucketNumber,
+                                           @JsonProperty("rule") String rule,
+                                           @JsonProperty("version") int version,
+                                           @JsonProperty("instant") String instant) {
+    this.expressions = expressions;
+    this.defaultBucketNumber = defaultBucketNumber;
+    this.rule = rule;
+    this.version = version;
+    this.instant = instant;
+  }
+
+  public String getFilename() {
+    return instant + HASHING_CONFIG_FILE_SUFFIX;
+  }
+
+  public String toJsonString() throws IOException {
+    return JsonUtils.getObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(this);
+  }
+
+  public String getInstant() {
+    return this.instant;
+  }
+
+  public static <T> T fromJsonString(String jsonStr, Class<T> clazz) throws Exception {
+    if (jsonStr == null || jsonStr.isEmpty()) {
+      // For an empty commit file (no data, or something bad happened).
+      return clazz.newInstance();
+    }
+    return JsonUtils.getObjectMapper().readValue(jsonStr, clazz);
+  }
+
+  public static PartitionBucketIndexHashingConfig fromBytes(byte[] bytes) throws IOException {
+    try {
+      return fromJsonString(new String(bytes, StandardCharsets.UTF_8), PartitionBucketIndexHashingConfig.class);
+    } catch (Exception e) {
+      throw new IOException("unable to load hashing config", e);
+    }
+  }
+
+  public int getVersion() {
+    return version;
+  }
+
+  public String getRule() {
+    return rule;
+  }
+
+  public int getDefaultBucketNumber() {
+    return defaultBucketNumber;
+  }
+
+  public String getExpressions() {
+    return expressions;
+  }
+
+  /**
+   * Get the absolute path of the hashing config meta folder.
+   */
+  public static StoragePath getHashingConfigStorageFolder(String basePath) {
+    StoragePath metaPath = new StoragePath(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
+    return new StoragePath(metaPath, HoodieTableMetaClient.BUCKET_INDEX_METAFOLDER_CONFIG_FOLDER);
+  }
+
+  public static StoragePath getArchiveHashingConfigStorageFolder(String basePath) {
+    StoragePath metaPath = new StoragePath(basePath, HoodieTableMetaClient.METAFOLDER_NAME);
+    return new StoragePath(metaPath, HoodieTableMetaClient.BUCKET_INDEX_METAFOLDER_CONFIG_ARCHIVE_FOLDER);
+  }
+
+  /**
+   * Get the absolute path of the specific <instant>.hashing_config file.
+   */
+  public static StoragePath getHashingConfigPath(String basePath, String instant) {
+    StoragePath hashingBase = getHashingConfigStorageFolder(basePath);
+    return new StoragePath(hashingBase, instant + PartitionBucketIndexHashingConfig.HASHING_CONFIG_FILE_SUFFIX);
+  }
+
+  public static StoragePath getArchiveHashingConfigPath(String basePath, String instant) {
+    StoragePath hashingBase = getArchiveHashingConfigStorageFolder(basePath);
+    return new StoragePath(hashingBase, instant + PartitionBucketIndexHashingConfig.HASHING_CONFIG_FILE_SUFFIX);
+  }
+
+  /**
+   * Create and save <instant>.hashing_config based on the given expressions, rule, defaultBucketNumber and instant.
+   * If the given instant is null or empty, use INITIAL_HASHING_CONFIG_INSTANT.
+   */
+  public static boolean saveHashingConfig(HoodieTableMetaClient metaClient,
+                                          String expressions,
+                                          String rule,
+                                          int defaultBucketNumber,
+                                          String instant) {
+    if (StringUtils.isNullOrEmpty(expressions)) {
+      return false;
+    }
+    String hashingInstant = StringUtils.isNullOrEmpty(instant) ? INITIAL_HASHING_CONFIG_INSTANT : instant;
+    PartitionBucketIndexHashingConfig hashingConfig =
+        new PartitionBucketIndexHashingConfig(expressions, defaultBucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, hashingInstant);
+    return saveHashingConfig(hashingConfig, metaClient);
+  }
+
+  /**
+   * Save the given hashing config.
+   */
+
+  public static boolean saveHashingConfig(PartitionBucketIndexHashingConfig hashingConfig, HoodieTableMetaClient metaClient) {
+    StoragePath hashingConfigPath = new StoragePath(metaClient.getHashingMetadataConfigPath(), hashingConfig.getFilename());
+    HoodieStorage storage = metaClient.getStorage();
+    try {
+      Option<byte[]> content = Option.of(hashingConfig.toJsonString().getBytes(StandardCharsets.UTF_8));
+      storage.createImmutableFileInPath(hashingConfigPath, content.map(HoodieInstantWriter::convertByteArrayToWriter));
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to initHashingConfig ", ioe);
+    }
+    return true;
+  }
+
+  public static Option<PartitionBucketIndexHashingConfig> loadHashingConfig(HoodieStorage storage, StoragePathInfo hashingConfig) {
+    return loadHashingConfig(storage, hashingConfig.getPath());
+  }
+
+  public static Option<PartitionBucketIndexHashingConfig> loadHashingConfig(HoodieStorage storage, StoragePath hashingConfig) {
+    if (hashingConfig == null) {
+      return Option.empty();
+    }
+    try (InputStream is = storage.open(hashingConfig)) {
+      byte[] content = FileIOUtils.readAsByteArray(is);
+      return Option.of(PartitionBucketIndexHashingConfig.fromBytes(content));
+    } catch (IOException e) {
+      LOG.error("Error when loading hashing config, for path: " + hashingConfig.getName(), e);
+      throw new HoodieIOException("Error while loading hashing config", e);
+    }
+  }
+
+  /**
+   * Get the latest committed hashing config instant to load.
+   */
+  public static String getLatestHashingConfigInstantToLoad(HoodieTableMetaClient metaClient) {
+    try {
+      List<String> allCommittedHashingConfig = getCommittedHashingConfig(metaClient);
+      return allCommittedHashingConfig.get(allCommittedHashingConfig.size() - 1);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to get hashing config instant to load.", e);
+    }
+  }
+
+  public static PartitionBucketIndexHashingConfig loadingLatestHashingConfig(HoodieTableMetaClient metaClient) {
+    String instantToLoad = getLatestHashingConfigInstantToLoad(metaClient);
+    Option<PartitionBucketIndexHashingConfig> latestHashingConfig = loadHashingConfig(metaClient.getStorage(), getHashingConfigPath(metaClient.getBasePath().toString(), instantToLoad));
+    ValidationUtils.checkArgument(latestHashingConfig.isPresent(), "Can not load latest hashing config " + instantToLoad);
+
+    return latestHashingConfig.get();
+  }
+
+  /**
+   * Archive the hashing config.
+   */
+  public static boolean archiveHashingConfigIfNecessary(HoodieTableMetaClient metaClient) throws IOException {
+    List<String> hashingConfigToArchive = getHashingConfigToArchive(metaClient);
+    if (hashingConfigToArchive.size() == 0) {
+      LOG.info("Nothing to archive " + hashingConfigToArchive);
+      return false;
+    }
+
+    LOG.info("Start to archive hashing config " + hashingConfigToArchive);
+    return archiveHashingConfig(hashingConfigToArchive, metaClient);
+  }
+
+  // for now we simply move the active hashing config into the archive folder
+  private static boolean archiveHashingConfig(List<String> hashingConfigToArchive, HoodieTableMetaClient metaClient) {
+    hashingConfigToArchive.forEach(instant -> {
+      StoragePath activeHashingPath = getHashingConfigPath(metaClient.getBasePath().toString(), instant);
+      StoragePath archiveHashingPath = getArchiveHashingConfigPath(metaClient.getBasePath().toString(), instant);
+      try {
+        metaClient.getStorage().rename(activeHashingPath, archiveHashingPath);
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+    });
+    return true;
+  }
+
+  public static List<String> getArchiveHashingConfig(HoodieTableMetaClient metaClient) throws IOException {
+    return metaClient.getStorage()
+        .listDirectEntries(new StoragePath(metaClient.getArchiveHashingMetadataConfigPath())).stream().map(info -> {
+          String instant = getHashingConfigInstant(info.getPath().getName());
+          if (StringUtils.isNullOrEmpty(instant)) {
+            throw new HoodieException("Failed to get hashing config instant to load.");
+          }
+          return instant;
+        }).sorted().collect(Collectors.toList());
+  }
+
+  /**
+   * Get all committed hashing configs.
+   * During rollback we delete the hashing config first, then remove the related pending instant,
+   * so any uncommitted hashing config that gets listed always has its instant present in the active timeline.
+   * Returned in **ascending order**, e.g. 20250325091919474, 20250325091923956, 20250325091927529.
+   */
+  public static List<String> getCommittedHashingConfig(HoodieTableMetaClient metaClient) throws IOException {

Review Comment:
   done
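For orientation, here is a minimal usage sketch of the API quoted above (not part of the PR). It assumes an existing HoodieTableMetaClient; the helper name writeAndReloadHashingConfig is hypothetical, and the expression, rule, and default bucket number are borrowed from the Flink test below. Only methods that appear in this diff are called.

    // Illustrative sketch only: write a hashing config, reload the latest committed one, then archive if needed.
    static PartitionBucketIndexHashingConfig writeAndReloadHashingConfig(HoodieTableMetaClient metaClient) throws IOException {
      // "regex" rule: partitions matching par1 or par2 get 2 buckets; others fall back to the default bucket number (1).
      // A null/empty instant means the config is written under INITIAL_HASHING_CONFIG_INSTANT.
      PartitionBucketIndexHashingConfig.saveHashingConfig(metaClient, "partition=(par1|par2),2", "regex", 1, null);
      // Load whichever hashing config instant is currently the latest committed one.
      PartitionBucketIndexHashingConfig latest = PartitionBucketIndexHashingConfig.loadingLatestHashingConfig(metaClient);
      // Move hashing configs that are no longer needed from the active folder into the archive folder, if any.
      PartitionBucketIndexHashingConfig.archiveHashingConfigIfNecessary(metaClient);
      return latest;
    }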
########## hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java: ##########
@@ -1258,6 +1266,75 @@ void testBulkInsert(String indexType, boolean hiveStylePartitioning) {
         + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
   }
 
+  @ParameterizedTest
+  @MethodSource("testBulkInsertWithPartitionBucketIndex")
+  void testBulkInsertWithPartitionBucketIndex(String operationType, String tableType) throws IOException {
+    TableEnvironment tableEnv = batchTableEnv;
+    // csv source
+    String csvSourceDDL = TestConfigurations.getCsvSourceDDL("csv_source", "test_source_5.data");
+    tableEnv.executeSql(csvSourceDDL);
+    String catalogName = "hudi_" + operationType;
+    String hudiCatalogDDL = catalog(catalogName)
+        .catalogPath(tempFile.getAbsolutePath())
+        .end();
+
+    tableEnv.executeSql(hudiCatalogDDL);
+    String dbName = "hudi";
+    tableEnv.executeSql("create database " + catalogName + "." + dbName);
+    String basePath = tempFile.getAbsolutePath() + "/hudi/hoodie_sink";
+
+    String hoodieTableDDL = sql(catalogName + ".hudi.hoodie_sink")
+        .option(FlinkOptions.PATH, basePath)
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.OPERATION, operationType)
+        .option(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_INPUT, true)
+        .option(FlinkOptions.INDEX_TYPE, "BUCKET")
+        .option(FlinkOptions.HIVE_STYLE_PARTITIONING, "true")
+        .option(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS, "1")
+        .option(FlinkOptions.BUCKET_INDEX_PARTITION_RULE, "regex")
+        .option(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS, "partition=(par1|par2),2")
+        .end();
+    tableEnv.executeSql(hoodieTableDDL);
+
+    String insertInto = "insert into " + catalogName + ".hudi.hoodie_sink select * from csv_source";
+    execInsertSql(tableEnv, insertInto);
+
+    List<Row> result1 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from " + catalogName + ".hudi.hoodie_sink").execute().collect());
+    assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
+    // apply filters
+    List<Row> result2 = CollectionUtil.iterableToList(
+        () -> tableEnv.sqlQuery("select * from " + catalogName + ".hudi.hoodie_sink where uuid > 'id5'").execute().collect());
+    assertRowsEquals(result2, "["
+        + "+I[id6, Emma, 20, 1970-01-01T00:00:06, par3], "
+        + "+I[id7, Bob, 44, 1970-01-01T00:00:07, par4], "
+        + "+I[id8, Han, 56, 1970-01-01T00:00:08, par4]]");
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(basePath, new org.apache.hadoop.conf.Configuration());
+    List<String> actual = PartitionBucketIndexUtils.getAllFileIDWithPartition(metaClient);
+
+    // based on expression partition=(par1|par2),2 and default bucket number 1:
+    // par1 and par2 have two buckets.
+    // par3 and par4 have one bucket.
+    ArrayList<String> expected = new ArrayList<>();
+    expected.add("partition=par1" + "00000000");
+    expected.add("partition=par1" + "00000001");
+    expected.add("partition=par2" + "00000000");
+    expected.add("partition=par2" + "00000001");
+    expected.add("partition=par3" + "00000000");
+    expected.add("partition=par4" + "00000000");
+
+    assertEquals(expected.stream().sorted().collect(Collectors.toList()), actual.stream().sorted().collect(Collectors.toList()));
+  }
+
+  public static List<Arguments> testBulkInsertWithPartitionBucketIndex() {

Review Comment:
   done

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org