cshuo commented on code in PR #12967:
URL: https://github.com/apache/hudi/pull/12967#discussion_r1999967891


##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/FlinkWriteHandleFactory.java:
##########
@@ -283,4 +283,30 @@ public static <T, I, K, O> DeltaCommitWriteHandleFactory<T, I, K, O> getInstance
       return new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr, contextSupplier);
     }
   }
+
+  /**
+   * Write handle factory for delta commit.
+   */
+  private static class DeltaCommitRowDataHandleFactory<T, I, K, O> implements Factory<T, I, K, O> {

Review Comment:
   This was added for validation and is no longer needed; I'll remove it.



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkRowDataWriteClient.java:
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.io.v2.HandleRecords;
+import org.apache.hudi.table.action.commit.BucketInfo;
+
+import java.util.List;
+
+/**
+ * Interface for a Flink write client that supports writing RowData directly into the underlying filesystem.
+ * <p>
+ * todo: add support for Insert/InsertOverwrite/InsertOverwriteTable, see HUDI-9075
+ */
+public interface FlinkRowDataWriteClient {
+
+  /**
+   * Upsert a batch of new records into a Hoodie table at the supplied instantTime.
+   *
+   * @param records RowData records to be written, in the form of an iterator
+   * @param bucketInfo the bucket info for the target bucket
+   * @param instantTime instant time to commit the data
+   * @return Collection of WriteStatus to inspect errors and counts
+   */
+  List<WriteStatus> upsert(HandleRecords records, BucketInfo bucketInfo, String instantTime);

Review Comment:
   Discussed offline:
   `HandleRecords` was introduced to expose separate record iterators for the data block and the delete block. Since the iterator can only be consumed once, we cannot split mixed delete and upsert records without materializing the iterator into a List, which would defeat optimizations such as object reuse.
   
   As a trade-off between API design and performance, we'll remove `HandleRecords`, pass a single iterator containing both upsert and delete records, and split them inside the log write handle, as sketched below.
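   A minimal sketch of what "split them inside the log write handle" could look like, assuming a hypothetical `isDeleteRecord` predicate; the class and method names are illustrative, not the PR's actual implementation:
   
   ```java
   // Illustrative sketch only: consume a single one-pass iterator inside the log
   // write handle, streaming upsert records to the data block while buffering the
   // (typically few) delete records for the delete block. `isDeleteRecord` is a
   // hypothetical predicate, e.g. based on the record's HoodieOperation.
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;
   import java.util.NoSuchElementException;
   import java.util.function.Predicate;
   
   final class MixedRecordSplitter<R> {
     private final List<R> deleteRecords = new ArrayList<>();
   
     /** Returns an iterator of data records; deletes are captured as a side effect. */
     Iterator<R> dataRecords(Iterator<R> mixed, Predicate<R> isDeleteRecord) {
       return new Iterator<R>() {
         private R next;
   
         @Override
         public boolean hasNext() {
           while (next == null && mixed.hasNext()) {
             R candidate = mixed.next();
             if (isDeleteRecord.test(candidate)) {
               deleteRecords.add(candidate); // buffered for the delete block
             } else {
               next = candidate;             // streamed into the data block
             }
           }
           return next != null;
         }
   
         @Override
         public R next() {
           if (!hasNext()) {
             throw new NoSuchElementException();
           }
           R result = next;
           next = null;
           return result;
         }
       };
     }
   
     /** Valid only after the data-record iterator has been fully drained. */
     List<R> getDeleteRecords() {
       return deleteRecords;
     }
   }
   ```
   
   Buffering only the delete records keeps the memory cost low (deletes are usually a small fraction of a batch), while data records stay streamable so object reuse in the downstream parquet writer remains possible.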



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieFlinkIOFactory.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.io.hadoop.HoodieHadoopIOFactory;
+import org.apache.hudi.io.storage.HoodieFileWriterFactory;
+import org.apache.hudi.storage.HoodieStorage;
+
+/**
+ * Creates readers and writers for Flink record payloads
+ */
+public class HoodieFlinkIOFactory extends HoodieHadoopIOFactory {

Review Comment:
   The current convention of using reflection to create readers/writers is a workaround, see HUDI-7746.
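   For context, a rough sketch of the kind of reflection-based instantiation this refers to, using only plain `java.lang.reflect`; the class name, constructor shape, and usage are illustrative assumptions, not Hudi's actual wiring:
   
   ```java
   // Illustrative only: instantiate an IO factory implementation whose class name
   // comes from configuration. Constructor matching is simplified; HUDI-7746
   // tracks replacing this kind of reflective wiring with an explicit factory API.
   import java.lang.reflect.Constructor;
   
   final class ReflectiveFactoryLoader {
     static <T> T instantiate(String className, Class<T> expectedType, Object... ctorArgs) {
       try {
         Class<?> clazz = Class.forName(className);
         // Simplification: assume a single declared constructor; real code would
         // match parameter types explicitly.
         Constructor<?> ctor = clazz.getDeclaredConstructors()[0];
         return expectedType.cast(ctor.newInstance(ctorArgs));
       } catch (ReflectiveOperationException e) {
         throw new IllegalStateException("Failed to instantiate " + className, e);
       }
     }
   }
   
   // Hypothetical usage: the write path resolves the factory class name from config
   // and instantiates it reflectively.
   // HoodieFlinkIOFactory ioFactory = ReflectiveFactoryLoader.instantiate(
   //     "org.apache.hudi.io.storage.row.HoodieFlinkIOFactory",
   //     HoodieFlinkIOFactory.class, storage);
   ```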



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/RowDataLogHandle.java:
##########
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.v2;
+
+import org.apache.hudi.avro.AvroSchemaCache;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieDeltaWriteStat;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodiePartitionMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.IOType;
+import org.apache.hudi.common.table.log.AppendResult;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieAppendException;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.io.HoodieWriteHandle;
+import org.apache.hudi.io.MiniBatchHandle;
+import org.apache.hudi.io.log.block.HoodieFlinkParquetDataBlock;
+import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.util.Lazy;
+
+import org.apache.avro.Schema;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED;
+import static org.apache.hudi.common.model.HoodieRecordLocation.INVALID_POSITION;
+
+/**
+ * A write handle that supports creating a log file and writing records based on a record iterator.
+ * The differences from {@code FlinkAppendHandle} are:
+ *
+ * <p> 1. {@code RowDataLogHandle} does not convert RowData into Avro records before writing.
+ * <p> 2. {@code RowDataLogHandle} writes Parquet data blocks by default.
+ * <p> 3. {@code RowDataLogHandle} does not buffer data internally; instead, it employs
+ *        a record iterator to write data blocks, thereby enhancing memory efficiency.
+ *
+ * <p>The back-up writer may roll over to a new log file if there already exists a log file for the
+ * given file group and instant.
+ */
+public class RowDataLogHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> implements MiniBatchHandle {

Review Comment:
   Will rename it to `RowDataLogWriteHandle`.



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriter.java:
##########
@@ -50,4 +56,12 @@ public interface HoodieRowDataFileWriter {
   * Closes the {@link HoodieRowDataFileWriter} and may not take in any more writes.
    */
   void close() throws IOException;
+
+  default void writeWithMetadata(HoodieKey key, HoodieRecord record, Schema schema, Properties props) throws IOException {
+    writeRow(key.getRecordKey(), (RowData) record.getData());

Review Comment:
   yes



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/log/block/HoodieFlinkParquetDataBlock.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.log.block;
+
+import org.apache.hudi.avro.AvroSchemaCache;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
+import org.apache.hudi.common.table.log.block.HoodieParquetDataBlock;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.storage.HoodieStorage;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION;
+import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED;
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+
+/**
+ * HoodieFlinkParquetDataBlock employs a HoodieRecord iterator rather than a HoodieRecord list for
+ * the parquet data block, aiming to better utilize the optimizations of {@code BinaryInMemorySortBuffer},
+ * for example, object reuse to decrease GC costs.
+ *
+ * <p> todo: HoodieFlinkParquetDataBlock does not support record positions for update/delete currently;
+ * this will be supported later, see HUDI-9192.
+ */
+public class HoodieFlinkParquetDataBlock extends HoodieParquetDataBlock {
+  private final Iterator<HoodieRecord> recordIterator;
+
+  public HoodieFlinkParquetDataBlock(
+      Iterator<HoodieRecord> recordIterator,
+      Map<HeaderMetadataType, String> header,
+      String keyField,
+      String compressionCodecName,
+      double expectedCompressionRatio,
+      boolean useDictionaryEncoding) {
+    super(Collections.emptyList(), header, keyField, compressionCodecName, expectedCompressionRatio, useDictionaryEncoding);
+    this.recordIterator = recordIterator;
+  }
+
+  @Override
+  public byte[] getContentBytes(HoodieStorage storage) throws IOException {
+    Map<String, String> paramsMap = new HashMap<>();
+    paramsMap.put(PARQUET_COMPRESSION_CODEC_NAME.key(), compressionCodecName.get());
+    paramsMap.put(PARQUET_COMPRESSION_RATIO_FRACTION.key(), String.valueOf(expectedCompressionRatio.get()));
+    paramsMap.put(PARQUET_DICTIONARY_ENABLED.key(), String.valueOf(useDictionaryEncoding.get()));
+    Schema writerSchema = AvroSchemaCache.intern(new Schema.Parser().parse(
+        super.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.SCHEMA)));
+
+    Pair<byte[], Map<String, HoodieColumnRangeMetadata<Comparable>>> result =
+        HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(PARQUET)
+            .serializeRecordsToLogBlock(
+                storage,
+                recordIterator,
+                HoodieRecord.HoodieRecordType.FLINK,
+                writerSchema,
+                getSchema(),
+                getKeyFieldName(),
+                paramsMap);
+    this.recordColumnStats = Option.of(result.getRight());

Review Comment:
   yes, will update.
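   For reference, a rough sketch of how a write handle might assemble and append this block, based on the constructor shown above; the method parameters and config accessor calls are assumptions standing in for state the surrounding handle would already hold, not the PR's actual wiring:
   
   ```java
   // Illustrative sketch: build a parquet data block directly from a one-pass
   // record iterator and append it through a HoodieLogFormat.Writer. Header keys
   // mirror the usage in getContentBytes() above.
   import org.apache.hudi.common.model.HoodieRecord;
   import org.apache.hudi.common.table.log.AppendResult;
   import org.apache.hudi.common.table.log.HoodieLogFormat;
   import org.apache.hudi.common.table.log.block.HoodieLogBlock;
   import org.apache.hudi.config.HoodieWriteConfig;
   import org.apache.hudi.io.log.block.HoodieFlinkParquetDataBlock;
   
   import org.apache.avro.Schema;
   
   import java.io.IOException;
   import java.util.HashMap;
   import java.util.Iterator;
   import java.util.Map;
   
   import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME;
   import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION;
   import static org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED;
   
   final class ParquetDataBlockAppendSketch {
     static AppendResult appendDataBlock(
         HoodieLogFormat.Writer writer,
         Iterator<HoodieRecord> recordIterator,
         Schema writerSchema,
         String instantTime,
         HoodieWriteConfig config) throws IOException, InterruptedException {
       Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
       header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, writerSchema.toString());
   
       HoodieLogBlock dataBlock = new HoodieFlinkParquetDataBlock(
           recordIterator,                          // streamed, never materialized into a List
           header,
           HoodieRecord.RECORD_KEY_METADATA_FIELD,  // key field name (assumption)
           config.getString(PARQUET_COMPRESSION_CODEC_NAME),
           config.getDouble(PARQUET_COMPRESSION_RATIO_FRACTION),
           config.getBoolean(PARQUET_DICTIONARY_ENABLED));
   
       return writer.appendBlock(dataBlock);       // serialization happens in getContentBytes()
     }
   }
   ```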


