cshuo commented on code in PR #12967: URL: https://github.com/apache/hudi/pull/12967#discussion_r1999967891
########## hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java: ##########
@@ -283,4 +283,30 @@ public static <T, I, K, O> DeltaCommitWriteHandleFactory<T, I, K, O> getInstance return new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr, contextSupplier); } } + + /** + * Write handle factory for delta commit. + */ + private static class DeltaCommitRowDataHandleFactory<T, I, K, O> implements Factory<T, I, K, O> {
Review Comment: This was added for validation only; it is no longer needed and will be removed.
########## hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkRowDataWriteClient.java: ##########
@@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hudi.io.v2.HandleRecords; +import org.apache.hudi.table.action.commit.BucketInfo; + +import java.util.List; + +/** + * Interface for flink write client that supports writing RowData directly into the underneath filesystem. + * <p> + * todo: add support for Insert/InsertOverwrite/InsertOverwriteTable, see HUDI-9075 + */ +public interface FlinkRowDataWriteClient { + + /** + * Upsert a batch of new records into Hoodie table at the supplied instantTime. + * + * @param records rowdatas needed to be written, in form of iterator + * @param bucketInfo the bucket info for the target bucket + * @param instantTime instant time to commit the data + * @return Collection of WriteStatus to inspect errors and counts + */ + List<WriteStatus> upsert(HandleRecords records, BucketInfo bucketInfo, String instantTime);
Review Comment: Discussed offline: `HandleRecords` was introduced to expose separate record iterators for the data block and the delete block. Since the iterator is single-pass, delete records and upsert records that are mixed in one iterator cannot be separated without materializing the iterator into a List, which defeats optimizations such as object reuse. As a trade-off between API design and performance, we'll remove `HandleRecords`, use a single iterator containing both upsert and delete records, and split them inside the log write handle.
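To make the agreed-upon approach concrete, here is a minimal, purely illustrative sketch of one way a single mixed iterator could be split inside the log write handle without materializing the upsert side. The splitter class, the `isDelete` predicate, and the buffered delete list are assumptions for illustration, not the PR's actual implementation.

import org.apache.hudi.common.model.HoodieRecord;

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

/**
 * Illustrative only: wraps one mixed upsert/delete iterator so that upsert records
 * stream through lazily (keeping object reuse possible) while the typically small
 * set of delete records is buffered for the delete block.
 */
final class MixedRecordSplitter {

  static Iterator<HoodieRecord> dataRecords(
      Iterator<HoodieRecord> mixedItr,
      Predicate<HoodieRecord> isDelete,        // how deletes are detected is left to the handle
      List<HoodieRecord> bufferedDeletes) {    // filled lazily while the data side is consumed
    return new Iterator<HoodieRecord>() {
      private HoodieRecord nextData;

      @Override
      public boolean hasNext() {
        while (nextData == null && mixedItr.hasNext()) {
          HoodieRecord record = mixedItr.next();
          if (isDelete.test(record)) {
            bufferedDeletes.add(record);       // only the delete side is materialized
          } else {
            nextData = record;
          }
        }
        return nextData != null;
      }

      @Override
      public HoodieRecord next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        HoodieRecord result = nextData;
        nextData = null;
        return result;
      }
    };
  }
}

Two practical notes on this sketch: because Flink may reuse record objects, the buffered delete entries would in practice be converted immediately into lightweight delete markers (key plus ordering value) rather than held as-is, and the delete block can only be written after the data block has fully drained the iterator, since deletes are discovered along the way.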
########## hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieFlinkIOFactory.java: ##########
@@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.storage.row; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.io.hadoop.HoodieHadoopIOFactory; +import org.apache.hudi.io.storage.HoodieFileWriterFactory; +import org.apache.hudi.storage.HoodieStorage; + +/** + * Creates readers and writers for Flink record payloads + */ +public class HoodieFlinkIOFactory extends HoodieHadoopIOFactory {
Review Comment: The current convention of using reflection to create the reader/writer is a workaround; see HUDI-7746.
########## hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/RowDataLogHandle.java: ##########
@@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.io.v2; + +import org.apache.hudi.avro.AvroSchemaCache; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.HoodieStorageConfig; +import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.DeleteRecord; +import org.apache.hudi.common.model.HoodieColumnRangeMetadata; +import org.apache.hudi.common.model.HoodieDeltaWriteStat; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.model.HoodiePartitionMetadata; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.table.log.AppendResult; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.block.HoodieDeleteBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieAppendException; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.io.HoodieWriteHandle; +import org.apache.hudi.io.MiniBatchHandle; +import org.apache.hudi.io.log.block.HoodieFlinkParquetDataBlock; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.storage.StorageConfiguration; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.util.Lazy; + +import org.apache.avro.Schema; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME; +import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION; +import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED; +import static org.apache.hudi.common.model.HoodieRecordLocation.INVALID_POSITION; + +/** + * A write handle that supports creating a log file and writing records based on record Iterator. + * The differences from {@code FlinkAppendHandle} are: + * + * <p> 1. {@code RowDataLogHandle} does not convert RowData into Avro record before writing. + * <p> 2. {@code RowDataLogHandle} writes Parquet data block by default. + * <p> 3. {@code RowDataLogHandle} does not buffer data internally, instead, it employs + * record iterator to write data blocks, thereby enhancing memory efficiency. + * + * <p>The back-up writer may roll over to a new log file if there already exists a log file for the + * given file group and instant. 
+ */ +public class RowDataLogHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> implements MiniBatchHandle { Review Comment: Will update to RowDataLogWriteHandle ########## hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriter.java: ########## @@ -50,4 +56,12 @@ public interface HoodieRowDataFileWriter { * Closes the {@link HoodieRowDataFileWriter} and may not take in any more writes. */ void close() throws IOException; + + default void writeWithMetadata(HoodieKey key, HoodieRecord record, Schema schema, Properties props) throws IOException { + writeRow(key.getRecordKey(), (RowData) record.getData()); Review Comment: yes ########## hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/log/block/HoodieFlinkParquetDataBlock.java: ########## @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io.log.block; + +import org.apache.hudi.avro.AvroSchemaCache; +import org.apache.hudi.common.model.HoodieColumnRangeMetadata; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; +import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.io.storage.HoodieIOFactory; +import org.apache.hudi.storage.HoodieStorage; + +import org.apache.avro.Schema; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME; +import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION; +import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED; +import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; + +/** + * HoodieFlinkParquetDataBlock employs an HoodieRecord iterator rather than a HoodieRecord list for + * parquet data block, aiming to better utilize the optimizations of {@code BinaryInMemorySortBuffer}, + * for example, object reusing to decrease GC costs. + * + * <p> todo: HoodieFlinkParquetDataBlock does not support record-position for update/delete currently, + * and it will be supported later, see HUDI-9192. 
+ */ +public class HoodieFlinkParquetDataBlock extends HoodieParquetDataBlock { + private final Iterator<HoodieRecord> recordIterator; + + public HoodieFlinkParquetDataBlock( + Iterator<HoodieRecord> recordIterator, + Map<HeaderMetadataType, String> header, + String keyField, + String compressionCodecName, + double expectedCompressionRatio, + boolean useDictionaryEncoding) { + super(Collections.emptyList(), header, keyField, compressionCodecName, expectedCompressionRatio, useDictionaryEncoding); + this.recordIterator = recordIterator; + } + + @Override + public byte[] getContentBytes(HoodieStorage storage) throws IOException { + Map<String, String> paramsMap = new HashMap<>(); + paramsMap.put(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get()); + paramsMap.put(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get())); + paramsMap.put(PARQUET_DICTIONARY_ENABLED.key(), String.valueOf(useDictionaryEncoding.get())); + Schema writerSchema = AvroSchemaCache.intern(new Schema.Parser().parse( + super.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.SCHEMA))); + + Pair<byte[], Map<String, HoodieColumnRangeMetadata<Comparable>>> result = + HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(PARQUET) + .serializeRecordsToLogBlock( + storage, + recordIterator, + HoodieRecord.HoodieRecordType.FLINK, + writerSchema, + getSchema(), + getKeyFieldName(), + paramsMap); + this.recordColumnStats = Option.of(result.getRight());
Review Comment: yes, will update.
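For readers following the block-serialization discussion in this hunk, here is a minimal, hedged sketch of how the iterator-backed block might be assembled and appended. It relies only on the constructor visible in the diff; the header values, codec string, compression ratio, and the pre-built `HoodieLogFormat.Writer` (obtained elsewhere, e.g. by the write handle) are illustrative assumptions, not the PR's actual wiring.

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType;
import org.apache.hudi.io.log.block.HoodieFlinkParquetDataBlock;

import java.util.Collections;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.Map;

public class FlinkParquetDataBlockExample {

  // Hypothetical helper: appends one iterator-backed parquet data block to an
  // already-built log writer. The schema JSON, instant time, codec, ratio, and
  // dictionary flag are illustrative values, not defaults taken from the PR.
  public static void appendDataBlock(
      HoodieLogFormat.Writer writer,
      Iterator<HoodieRecord> recordIterator,
      String avroSchemaJson,
      String instantTime) throws Exception {
    Map<HeaderMetadataType, String> header = new EnumMap<>(HeaderMetadataType.class);
    header.put(HeaderMetadataType.INSTANT_TIME, instantTime);
    header.put(HeaderMetadataType.SCHEMA, avroSchemaJson);

    HoodieLogBlock dataBlock = new HoodieFlinkParquetDataBlock(
        recordIterator,                          // records are streamed, never buffered as a List
        header,
        HoodieRecord.RECORD_KEY_METADATA_FIELD,  // key field for the block
        "gzip",                                  // compression codec (assumed)
        0.1,                                     // expected compression ratio (assumed)
        true);                                   // dictionary encoding (assumed)

    writer.appendBlocks(Collections.singletonList(dataBlock));
  }
}

Because the block holds only an iterator, the parquet bytes are produced when the writer asks for the block content, so memory usage stays bounded by the serializer rather than by the record count.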