vinothchandar commented on a change in pull request #3173: URL: https://github.com/apache/hudi/pull/3173#discussion_r764911822
########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndex.java ########## @@ -122,13 +125,40 @@ public O updateLocation(O writeStatuses, HoodieEngineContext context, @PublicAPIMethod(maturity = ApiMaturityLevel.STABLE) public abstract boolean isImplicitWithStorage(); + /** + * An index might need customized partitioner other than general upsert and insert partitioner. + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) + public Option<Partitioner> getCustomizedPartitioner(WorkloadProfile profile, + HoodieEngineContext context, + HoodieTable table, + HoodieWriteConfig writeConfig) { + return Option.empty(); + } + + /** + * If the `getCustomizedPartitioner` returns a partitioner, it has to be true. + */ + @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) Review comment: instead of thinking of it as custom partitioner, I would prefer we introduce a notion of "storage layout". bucketing is not just an attribute of writing but storage itself. once bucketed, any writer/reader needs to respect that. ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java ########## @@ -200,6 +209,48 @@ .defaultValue("true") .withDocumentation("Similar to " + BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE + ", but for simple index."); + /** + * ***** Bucket Index Configs ***** + * Bucket Index is targeted to locate the record fast by hash in big data scenarios. + * The current implementation is a basic version, so there are some constraints: + * 1. Unsupported operation: bulk insert, cluster and so on. + * 2. Bucket num change requires rewriting the partition. + * 3. Predict the table size and future data growth well to set a reasonable bucket num. + * 4. A bucket size is recommended less than 3GB and avoid bing too small. + */ + // Bucket num equals file groups num in each partition. + // Bucket num can be set according to partition size and file group size. + public static final ConfigProperty<Integer> BUCKET_INDEX_NUM_BUCKETS = ConfigProperty + .key("hoodie.bucket.index.num.buckets") + .defaultValue(256) + .withDocumentation("Only applies if index type is BUCKET_INDEX. Determine the bucket num of the hudi table, " + + "and each partition is divided to N buckets."); + + public static final ConfigProperty<String> BUCKET_INDEX_HASH_FIELD = ConfigProperty + .key("hoodie.bucket.index.hash.field") + .noDefaultValue() + .withDocumentation("Index key. It is used to index the record and find its file group. " + + "If not set, use record key field as default"); + + public static final ConfigProperty<String> BUCKET_INDEX_HASH_FUNCTION = ConfigProperty + .key("hoodie.bucket.index.hash.function") + .defaultValue("JVMHash") Review comment: we have standard utils for hashing now, that we intend to use broadly across. Can we reuse `HashID`. Do we need the HiveHash per se? I feel we should default to something other than JVMHash. wdyt ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java ########## @@ -200,6 +209,48 @@ .defaultValue("true") .withDocumentation("Similar to " + BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE + ", but for simple index."); + /** + * ***** Bucket Index Configs ***** + * Bucket Index is targeted to locate the record fast by hash in big data scenarios. + * The current implementation is a basic version, so there are some constraints: + * 1. Unsupported operation: bulk insert, cluster and so on. 
Review comment: Right, while bucketing helps write perf and also join performance (e.g., for UUID joins), it goes against clustering and other layout optimizations that can be useful for query performance. This is one of the reasons I did not prefer baking bucketing into the storage design. ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.utils.HashFunction; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class BucketIdentifier { + // compatible with the spark bucket name + private static final Pattern BUCKET_NAME = Pattern.compile(".*_(\\d+)(?:\\..*)?$"); + + public static int getBucketId(HoodieRecord record, String indexKeyFields, int numBuckets, String hashFunction) { + return getBucketId(record.getKey(), indexKeyFields, numBuckets, hashFunction); + } + + public static int getBucketId(HoodieKey hoodieKey, String indexKeyFields, int numBuckets, String hashFunction) { + List<String> hashKeyFields; + if (!hoodieKey.getRecordKey().contains(":")) { + hashKeyFields = Arrays.asList(hoodieKey.getRecordKey()); + } else { + Map<String, String> recordKeyPairs = Arrays.stream(hoodieKey.getRecordKey().split(",")) + .map(p -> p.split(":")) + .collect(Collectors.toMap(p -> p[0], p -> p[1])); + hashKeyFields = Arrays.stream(indexKeyFields.split(",")) + .map(f -> recordKeyPairs.get(f)) + .collect(Collectors.toList()); + } + return mod(HashFunction.getHashFunction(hashFunction).hash(hashKeyFields) + & Integer.MAX_VALUE, numBuckets); + } + + public static int bucketIdFromFileId(String fileId) { + return Integer.parseInt(fileId.substring(0, 8)); + } + + public static String bucketIdStr(int n) { + return String.format("%08d", n); + } + + public static String bucketIdStr(int n, int m) { + return bucketIdStr(mod(n, m)); + } + + public static String newBucketFileIdPrefix(String bucketId) { + return FSUtils.createNewFileIdPfx().replaceFirst(".{8}", bucketId); + } + + public static boolean isBucketFileName(String name) { + return BUCKET_NAME.matcher(name).matches(); + } + + private static int mod(int x, int y) { + int r = x % y; + if (r < 0) { Review comment: why not reject a negative x? and keep this simple?
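To make the hashing/modulo discussion above concrete, here is a minimal plain-Java sketch (not the PR's code; `String.hashCode()` stands in for the configurable hash function) of computing the bucket id as a single non-negative expression per record, which also speaks to the `& Integer.MAX_VALUE` question later in this thread:

```java
public class BucketIdSketch {
  // Clearing the sign bit before the modulo keeps the result non-negative,
  // so neither a separate mod(x, y) helper nor a negative-input check is needed.
  static int bucketId(int hash, int numBuckets) {
    return (hash & Integer.MAX_VALUE) % numBuckets;
  }

  public static void main(String[] args) {
    // String.hashCode() stands in for the pluggable hash function in the PR.
    System.out.println(bucketId("some-record-key".hashCode(), 256));
    System.out.println(bucketId(Integer.MIN_VALUE, 256)); // prints 0, never negative
  }
}
```

Since `hash & Integer.MAX_VALUE` is always non-negative, the result of `%` can never be negative, so the helper reduces to a one-line expression evaluated per record.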
########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.utils.HashFunction; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class BucketIdentifier { + // compatible with the spark bucket name + private static final Pattern BUCKET_NAME = Pattern.compile(".*_(\\d+)(?:\\..*)?$"); + + public static int getBucketId(HoodieRecord record, String indexKeyFields, int numBuckets, String hashFunction) { + return getBucketId(record.getKey(), indexKeyFields, numBuckets, hashFunction); + } + + public static int getBucketId(HoodieKey hoodieKey, String indexKeyFields, int numBuckets, String hashFunction) { + List<String> hashKeyFields; + if (!hoodieKey.getRecordKey().contains(":")) { Review comment: this assumes the composite key generator? Not sure we need this complexity; bucketing is anyway not that useful once you have many key fields, right? It's only useful when the query has all the same fields as well ########## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala ########## @@ -219,17 +226,31 @@ class MergeOnReadSnapshotRelation(val sqlContext: SQLContext, } else { Option.empty } - val logPaths = fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala .map(logFile => MergeOnReadSnapshotRelation.getFilePath(logFile.getPath)).toList val logPathsOptional = if (logPaths.isEmpty) Option.empty else Option(logPaths) - HoodieMergeOnReadFileSplit(partitionedFile, logPathsOptional, queryInstant, metaClient.getBasePath, - maxCompactionMemoryInBytes, mergeType) + (fileSlice.getBaseFile, HoodieMergeOnReadFileSplit(partitionedFile, logPathsOptional, queryInstant, metaClient.getBasePath, + maxCompactionMemoryInBytes, mergeType)) }).toList - fileSplits + groupFilesWithBucket(fileSplits) } } } + + private def groupFilesWithBucket( Review comment: can you explain why this needs to happen? At a high level, I would expect bucketing not to alter how we group files? we have 1 file group per bucket, right? ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.utils.HashFunction; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class BucketIdentifier { + // compatible with the spark bucket name + private static final Pattern BUCKET_NAME = Pattern.compile(".*_(\\d+)(?:\\..*)?$"); + + public static int getBucketId(HoodieRecord record, String indexKeyFields, int numBuckets, String hashFunction) { + return getBucketId(record.getKey(), indexKeyFields, numBuckets, hashFunction); + } + + public static int getBucketId(HoodieKey hoodieKey, String indexKeyFields, int numBuckets, String hashFunction) { + List<String> hashKeyFields; + if (!hoodieKey.getRecordKey().contains(":")) { Review comment: I am asking if we can just hash the recordKey as-is and keep it simple? ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.utils.HashFunction; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class BucketIdentifier { Review comment: Lets add a UT for this? ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java ########## @@ -200,6 +209,48 @@ .defaultValue("true") .withDocumentation("Similar to " + BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE + ", but for simple index."); + /** + * ***** Bucket Index Configs ***** + * Bucket Index is targeted to locate the record fast by hash in big data scenarios. 
+ * The current implementation is a basic version, so there are some constraints: + * 1. Unsupported operation: bulk insert, cluster and so on. + * 2. Bucket num change requires rewriting the partition. + * 3. Predict the table size and future data growth well to set a reasonable bucket num. + * 4. A bucket size is recommended less than 3GB and avoid bing too small. + */ + // Bucket num equals file groups num in each partition. + // Bucket num can be set according to partition size and file group size. + public static final ConfigProperty<Integer> BUCKET_INDEX_NUM_BUCKETS = ConfigProperty + .key("hoodie.bucket.index.num.buckets") + .defaultValue(256) + .withDocumentation("Only applies if index type is BUCKET_INDEX. Determine the bucket num of the hudi table, " + + "and each partition is divided to N buckets."); + + public static final ConfigProperty<String> BUCKET_INDEX_HASH_FIELD = ConfigProperty + .key("hoodie.bucket.index.hash.field") + .noDefaultValue() + .withDocumentation("Index key. It is used to index the record and find its file group. " + + "If not set, use record key field as default"); + + public static final ConfigProperty<String> BUCKET_INDEX_HASH_FUNCTION = ConfigProperty + .key("hoodie.bucket.index.hash.function") + .defaultValue("JVMHash") + .withDocumentation("Hash function. It is used to compute the index key hash value " + + "Possible options are [JVMHash | HiveHash]. "); + + public static final Set<WriteOperationType> BUCKET_INDEX_SUPPORTED_OPERATIONS = new HashSet<WriteOperationType>() {{ + add(WriteOperationType.INSERT); + add(WriteOperationType.INSERT_PREPPED); + add(WriteOperationType.UPSERT); + add(WriteOperationType.UPSERT_PREPPED); + add(WriteOperationType.INSERT_OVERWRITE); + add(WriteOperationType.DELETE); + add(WriteOperationType.COMPACT); + add(WriteOperationType.DELETE_PARTITION); + // TODO: HUDI-2155 bulk insert support bucket index. + // TODO: HUDI-2156 cluster the table with bucket index. Review comment: I guess we can only sort within buckets. ########## File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala ########## @@ -526,6 +526,12 @@ object DataSourceWriteOptions { .noDefaultValue() .withDocumentation("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql.") + // bucketSpec: CLUSTERED BY (trace_id) SORTED BY (trace_id ASC) INTO 65536 BUCKETS Review comment: add this to the doc? is this an example bucket spec? ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java ########## @@ -85,7 +85,9 @@ public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) { handles.put(partitionPath, handle); } - if (!handle.canWrite(payload.record)) { + // If index requires a customized partitioner, cannot open a new file here since it might violate + // the distribution required by index. + if (!handle.canWrite(payload.record) && !hoodieTable.getIndex().needCustomizedPartitioner()) { Review comment: this is a little opaque, but I get why you are doing this. I'll think of better ways if possible ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java ########## @@ -54,7 +63,7 @@ .key("hoodie.index.type") .noDefaultValue() .withDocumentation("Type of index to use. Default is Bloom filter. " - + "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE]. 
" + + "Possible options are [BLOOM | GLOBAL_BLOOM |SIMPLE | GLOBAL_SIMPLE | INMEMORY | HBASE | BUCKET]. " Review comment: I think `BUCKET` is ok. All these refer to members of the IndexType enum ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java ########## @@ -540,5 +602,28 @@ private String getDefaultIndexType(EngineType engineType) { public EngineType getEngineType() { return engineType; } + + private void validateBucketIndexConfig() { + if (hoodieIndexConfig.getString(INDEX_TYPE) + .equalsIgnoreCase(HoodieIndex.IndexType.BUCKET_INDEX.toString())) { + // check the bucket index hash field + if (StringUtils.isNullOrEmpty(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD))) { Review comment: is `KeyGeneratorOptions.RECORDKEY_FIELD_NAME` the thing we set across all write paths (spark sql/datasource, deltastreamer, flink sink, flink streamer, java ...)? If not, we can just do this resolution of the hash field within the index? ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java ########## @@ -200,6 +209,48 @@ .defaultValue("true") .withDocumentation("Similar to " + BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE + ", but for simple index."); + /** + * ***** Bucket Index Configs ***** + * Bucket Index is targeted to locate the record fast by hash in big data scenarios. + * The current implementation is a basic version, so there are some constraints: Review comment: Let's capture these as follow-ups in an umbrella JIRA? ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java ########## @@ -214,7 +214,11 @@ private void initKeyGenIfNeeded(boolean populateMetaFields) { } protected Partitioner getPartitioner(WorkloadProfile profile) { - if (WriteOperationType.isChangingRecords(operationType)) { + Option<org.apache.hudi.table.action.commit.Partitioner> customizedPartitioner = table.getIndex() Review comment: here's where I think we can do something like `getStorageLayout().getPartitioner()` instead of special-casing. We can circle back to this. ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.utils.HashFunction; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class BucketIdentifier { + // compatible with the spark bucket name + private static final Pattern BUCKET_NAME = Pattern.compile(".*_(\\d+)(?:\\..*)?$"); + + public static int getBucketId(HoodieRecord record, String indexKeyFields, int numBuckets, String hashFunction) { + return getBucketId(record.getKey(), indexKeyFields, numBuckets, hashFunction); + } + + public static int getBucketId(HoodieKey hoodieKey, String indexKeyFields, int numBuckets, String hashFunction) { + List<String> hashKeyFields; + if (!hoodieKey.getRecordKey().contains(":")) { + hashKeyFields = Arrays.asList(hoodieKey.getRecordKey()); + } else { + Map<String, String> recordKeyPairs = Arrays.stream(hoodieKey.getRecordKey().split(",")) + .map(p -> p.split(":")) + .collect(Collectors.toMap(p -> p[0], p -> p[1])); + hashKeyFields = Arrays.stream(indexKeyFields.split(",")) + .map(f -> recordKeyPairs.get(f)) + .collect(Collectors.toList()); + } + return mod(HashFunction.getHashFunction(hashFunction).hash(hashKeyFields) Review comment: why the `&`? ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java ########## @@ -540,5 +602,28 @@ private String getDefaultIndexType(EngineType engineType) { public EngineType getEngineType() { return engineType; } + + private void validateBucketIndexConfig() { + if (hoodieIndexConfig.getString(INDEX_TYPE) + .equalsIgnoreCase(HoodieIndex.IndexType.BUCKET_INDEX.toString())) { + // check the bucket index hash field + if (StringUtils.isNullOrEmpty(hoodieIndexConfig.getString(BUCKET_INDEX_HASH_FIELD))) { Review comment: i.e., we delay this until we instantiate the index, instead of building this dependency between configs here? ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieIndexConfig.java ########## @@ -200,6 +209,48 @@ .defaultValue("true") .withDocumentation("Similar to " + BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE + ", but for simple index."); + /** + * ***** Bucket Index Configs ***** + * Bucket Index is targeted to locate the record fast by hash in big data scenarios. + * The current implementation is a basic version, so there are some constraints: + * 1. Unsupported operation: bulk insert, cluster and so on. + * 2. Bucket num change requires rewriting the partition. + * 3. Predict the table size and future data growth well to set a reasonable bucket num. + * 4. A bucket size is recommended less than 3GB and avoid bing too small. + */ + // Bucket num equals file groups num in each partition. + // Bucket num can be set according to partition size and file group size. + public static final ConfigProperty<Integer> BUCKET_INDEX_NUM_BUCKETS = ConfigProperty + .key("hoodie.bucket.index.num.buckets") + .defaultValue(256) + .withDocumentation("Only applies if index type is BUCKET_INDEX. Determine the bucket num of the hudi table, " + + "and each partition is divided to N buckets."); + + public static final ConfigProperty<String> BUCKET_INDEX_HASH_FIELD = ConfigProperty + .key("hoodie.bucket.index.hash.field") + .noDefaultValue() + .withDocumentation("Index key. It is used to index the record and find its file group. 
" + + "If not set, use record key field as default"); + + public static final ConfigProperty<String> BUCKET_INDEX_HASH_FUNCTION = ConfigProperty + .key("hoodie.bucket.index.hash.function") + .defaultValue("JVMHash") + .withDocumentation("Hash function. It is used to compute the index key hash value " + + "Possible options are [JVMHash | HiveHash]. "); + + public static final Set<WriteOperationType> BUCKET_INDEX_SUPPORTED_OPERATIONS = new HashSet<WriteOperationType>() {{ Review comment: this is not a config per se. Can we move this elsewhere as a constant? ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/AbstractSparkDeltaCommitActionExecutor.java ########## @@ -74,8 +74,8 @@ public Partitioner getUpsertPartitioner(WorkloadProfile profile) { public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) throws IOException { LOG.info("Merging updates for commit " + instantTime + " for file " + fileId); - - if (!table.getIndex().canIndexLogFiles() && mergeOnReadUpsertPartitioner.getSmallFileIds().contains(fileId)) { + if (!table.getIndex().canIndexLogFiles() && mergeOnReadUpsertPartitioner != null Review comment: why'd this be null? ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractWriteHelper.java ########## @@ -46,7 +46,7 @@ Instant lookupBegin = Instant.now(); I taggedRecords = dedupedRecords; - if (performTagging) { + if (performTagging || table.getIndex().needTaggingIfInsert()) { Review comment: Might be good to eventually let the index know of the operation type and have it decide whether to tag or not. But this is ok for now ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/SparkHoodieIndexFactory.java ########## @@ -63,6 +64,8 @@ public static HoodieIndex createIndex(HoodieWriteConfig config) { return new HoodieSimpleIndex<>(config, getKeyGeneratorForSimpleIndex(config)); case GLOBAL_SIMPLE: return new HoodieGlobalSimpleIndex<>(config, getKeyGeneratorForSimpleIndex(config)); + case BUCKET_INDEX: Review comment: add entry down below too to keep it consistent? ########## File path: hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bucket/SparkBucketIndex.java ########## @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index.bucket; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.hudi.client.utils.LazyIterableIterator; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.index.HoodieIndex; +import org.apache.hudi.table.WorkloadProfile; +import org.apache.hudi.table.action.commit.Partitioner; +import org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordLocation; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieIndexException; +import org.apache.hudi.index.HoodieIndexUtils; +import org.apache.hudi.table.HoodieTable; +import org.apache.spark.api.java.JavaRDD; + +public class SparkBucketIndex<T extends HoodieRecordPayload> + extends HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> { + + private static final Logger LOG = LogManager.getLogger(SparkBucketIndex.class); + + private final int numBuckets; + + public SparkBucketIndex(HoodieWriteConfig config) { + super(config); + numBuckets = config.getBucketIndexNumBuckets(); + LOG.info("use bucket index, numBuckets=" + numBuckets); + } + + @Override + public HoodieData<WriteStatus> updateLocation(HoodieData<WriteStatus> writeStatuses, + HoodieEngineContext context, + HoodieTable hoodieTable) + throws HoodieIndexException { + return writeStatuses; + } + + @Override + public HoodieData<HoodieRecord<T>> tagLocation(HoodieData<HoodieRecord<T>> records, + HoodieEngineContext context, + HoodieTable hoodieTable) + throws HoodieIndexException { + HoodieData<HoodieRecord<T>> taggedRecords = records.mapPartitions(recordIter -> { + // partitionPath -> bucketId -> fileInfo + Map<String, Map<Integer, Pair<String, String>>> partitionPathFileIDList = new HashMap<>(); + return new LazyIterableIterator<HoodieRecord<T>, HoodieRecord<T>>(recordIter) { + + @Override + protected void start() { + + } + + @Override + protected HoodieRecord<T> computeNext() { + HoodieRecord record = recordIter.next(); + int bucketId = BucketIdentifier.getBucketId(record, config.getBucketIndexHashField(), + numBuckets, config.getBucketIndexHashFunction()); + String partitionPath = record.getPartitionPath(); + if (!partitionPathFileIDList.containsKey(partitionPath)) { + partitionPathFileIDList.put(partitionPath, loadPartitionBucketIdFileIdMapping(hoodieTable, partitionPath)); Review comment: rename: partitionPath ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/CopyOnWriteInsertHandler.java ########## @@ -85,7 +85,9 @@ public void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> payload) { handles.put(partitionPath, handle); } - if (!handle.canWrite(payload.record)) { + // If index requires a customized partitioner, cannot open a new file here since it might violate + // the distribution required by index. + if (!handle.canWrite(payload.record) && !hoodieTable.getIndex().needCustomizedPartitioner()) { Review comment: how do we handle this for MOR/AppendHandle? 
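For readers following the tagging discussion above, here is a minimal sketch of the bucket-to-file-group lookup performed by the quoted SparkBucketIndex#tagLocation diff; the class name, the single-field key, and the sample file id are illustrative assumptions, not the actual Hudi implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the lookup in the quoted tagLocation logic:
// compute the record's bucket, then find the file group whose id carries
// that bucket's 8-digit prefix.
public class BucketTaggingSketch {

  // Roughly what BucketIdentifier.getBucketId does for a single-field key:
  // hash the key, clear the sign bit, mod by the bucket count.
  static int bucketId(String recordKey, int numBuckets) {
    return (recordKey.hashCode() & Integer.MAX_VALUE) % numBuckets;
  }

  // Roughly BucketIdentifier.bucketIdStr: zero-pad to 8 characters so the
  // bucket id can form the first 8 characters of the file id.
  static String bucketIdStr(int bucketId) {
    return String.format("%08d", bucketId);
  }

  public static void main(String[] args) {
    int numBuckets = 256;
    String key = "uuid-123"; // hypothetical record key
    int bucket = bucketId(key, numBuckets);

    // Pretend this map was loaded from the partition's existing file groups,
    // keyed by bucket id parsed from the first 8 characters of each file id.
    Map<Integer, String> bucketToFileId = new HashMap<>();
    bucketToFileId.put(bucket, bucketIdStr(bucket) + "-0000-aaaa");

    String existingFileId = bucketToFileId.get(bucket);
    // A file group already existing for this bucket means the record is tagged
    // as an update to it; otherwise it becomes an insert into a new file group
    // whose id starts with the bucket's 8-digit prefix.
    System.out.println(existingFileId != null
        ? "tag as update -> " + existingFileId
        : "tag as insert -> " + bucketIdStr(bucket) + "-<new file id suffix>");
  }
}
```

This is also why the canWrite change quoted just above cannot simply roll over to a new file when the current one fills up: with a bucket index, a record's bucket pins it to one file group per partition.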
########## File path: hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala ########## @@ -39,7 +39,13 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Try -case class HoodieMergeOnReadPartition(index: Int, split: HoodieMergeOnReadFileSplit) extends Partition +/** + * Files a input task reads. Usually a file spilt corresponds to a task. For partitioned hudi table with bucket index, Review comment: IIUC we just replace the first 8 characters of the fileID prefix, right? So even though the bucket numbers are the same, the file names will be different/unique across the table? ########## File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala ########## @@ -526,6 +526,12 @@ object DataSourceWriteOptions { .noDefaultValue() .withDocumentation("Mode to choose for Hive ops. Valid values are hms, jdbc and hiveql.") + // bucketSpec: CLUSTERED BY (trace_id) SORTED BY (trace_id ASC) INTO 65536 BUCKETS Review comment: clustered by has very different meanings in Hudi or even in cloud DWHs. It's unfortunate what Hive has done with it :) ########## File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java ########## @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.index.bucket; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.utils.HashFunction; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +public class BucketIdentifier { + // compatible with the spark bucket name + private static final Pattern BUCKET_NAME = Pattern.compile(".*_(\\d+)(?:\\..*)?$"); + + public static int getBucketId(HoodieRecord record, String indexKeyFields, int numBuckets, String hashFunction) { + return getBucketId(record.getKey(), indexKeyFields, numBuckets, hashFunction); + } + + public static int getBucketId(HoodieKey hoodieKey, String indexKeyFields, int numBuckets, String hashFunction) { + List<String> hashKeyFields; + if (!hoodieKey.getRecordKey().contains(":")) { + hashKeyFields = Arrays.asList(hoodieKey.getRecordKey()); + } else { + Map<String, String> recordKeyPairs = Arrays.stream(hoodieKey.getRecordKey().split(",")) + .map(p -> p.split(":")) + .collect(Collectors.toMap(p -> p[0], p -> p[1])); + hashKeyFields = Arrays.stream(indexKeyFields.split(",")) + .map(f -> recordKeyPairs.get(f)) + .collect(Collectors.toList()); + } + return mod(HashFunction.getHashFunction(hashFunction).hash(hashKeyFields) + & Integer.MAX_VALUE, numBuckets); + } + + public static int bucketIdFromFileId(String fileId) { + return Integer.parseInt(fileId.substring(0, 8)); + } + + public static String bucketIdStr(int n) { + return String.format("%08d", n); + } + + public static String bucketIdStr(int n, int m) { + return bucketIdStr(mod(n, m)); + } + + public static String newBucketFileIdPrefix(String bucketId) { + return FSUtils.createNewFileIdPfx().replaceFirst(".{8}", bucketId); + } + + public static boolean isBucketFileName(String name) { + return BUCKET_NAME.matcher(name).matches(); + } + + private static int mod(int x, int y) { + int r = x % y; + if (r < 0) { Review comment: then we can avoid the method call - might be good for something that is done for every record. So it will be a simple expression? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org