cshuo commented on code in PR #13409: URL: https://github.com/apache/hudi/pull/13409#discussion_r2204041938
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.sink.append; + +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator; +import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer; +import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; +import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer; +import org.apache.flink.table.runtime.util.MemorySegmentPool; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory; +import org.apache.hudi.sink.bulk.sort.SortOperatorGen; +import org.apache.hudi.sink.utils.BufferUtils; +import 
org.apache.flink.runtime.operators.sort.QuickSort; +import org.apache.hudi.util.MutableIteratorWrapperIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Sink function to write the data to the underneath filesystem with buffer sort + * to improve the parquet compression rate. + * + * <p>The function writes base files directly for each checkpoint, + * the file may roll over when it’s size hits the configured threshold. + * + * @param <T> Type of the input record + * @see StreamWriteOperatorCoordinator + */ +public class AppendWriteFunctionWithBufferSort<T> extends AppendWriteFunction<T> { + private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunctionWithBufferSort.class); + private final long writeBufferSize; + private final List<String> sortKeyList; Review Comment: can be a local variable ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.append; + +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator; +import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer; +import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; +import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer; +import org.apache.flink.table.runtime.util.MemorySegmentPool; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory; +import org.apache.hudi.sink.bulk.sort.SortOperatorGen; +import org.apache.hudi.sink.utils.BufferUtils; +import org.apache.flink.runtime.operators.sort.QuickSort; +import org.apache.hudi.util.MutableIteratorWrapperIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Sink function to write the data to the underneath filesystem with buffer sort + * to improve the parquet compression rate. + * + * <p>The function writes base files directly for each checkpoint, + * the file may roll over when it’s size hits the configured threshold. 
+ * + * @param <T> Type of the input record + * @see StreamWriteOperatorCoordinator + */ +public class AppendWriteFunctionWithBufferSort<T> extends AppendWriteFunction<T> { + private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunctionWithBufferSort.class); + private final long writeBufferSize; + private final List<String> sortKeyList; + private final GeneratedNormalizedKeyComputer keyComputer; + private final GeneratedRecordComparator recordComparator; + private transient BinaryInMemorySortBuffer buffer; + private transient MemorySegmentPool memorySegmentPool; Review Comment: can be a local variable ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.append; + +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator; +import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer; +import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; +import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer; +import org.apache.flink.table.runtime.util.MemorySegmentPool; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory; +import org.apache.hudi.sink.bulk.sort.SortOperatorGen; +import org.apache.hudi.sink.utils.BufferUtils; +import org.apache.flink.runtime.operators.sort.QuickSort; +import org.apache.hudi.util.MutableIteratorWrapperIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Sink function to write the data to the underneath filesystem with buffer sort + * to improve the parquet compression rate. + * + * <p>The function writes base files directly for each checkpoint, + * the file may roll over when it’s size hits the configured threshold. 
+ * + * @param <T> Type of the input record + * @see StreamWriteOperatorCoordinator + */ +public class AppendWriteFunctionWithBufferSort<T> extends AppendWriteFunction<T> { + private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunctionWithBufferSort.class); + private final long writeBufferSize; + private final List<String> sortKeyList; + private final GeneratedNormalizedKeyComputer keyComputer; + private final GeneratedRecordComparator recordComparator; Review Comment: `keyComputer` and `recordComparator` can be local variables, and initialized in `open(..)` ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.append; + +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator; +import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer; +import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; +import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer; +import org.apache.flink.table.runtime.util.MemorySegmentPool; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory; +import org.apache.hudi.sink.bulk.sort.SortOperatorGen; +import org.apache.hudi.sink.utils.BufferUtils; +import org.apache.flink.runtime.operators.sort.QuickSort; +import org.apache.hudi.util.MutableIteratorWrapperIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Sink function to write the data to the underneath filesystem with buffer sort + * to improve the parquet compression rate. + * + * <p>The function writes base files directly for each checkpoint, + * the file may roll over when it’s size hits the configured threshold. 
+ * + * @param <T> Type of the input record + * @see StreamWriteOperatorCoordinator + */ +public class AppendWriteFunctionWithBufferSort<T> extends AppendWriteFunction<T> { + private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunctionWithBufferSort.class); + private final long writeBufferSize; + private final List<String> sortKeyList; + private final GeneratedNormalizedKeyComputer keyComputer; + private final GeneratedRecordComparator recordComparator; + private transient BinaryInMemorySortBuffer buffer; + private transient MemorySegmentPool memorySegmentPool; + private transient SortOperatorGen sortOperatorGen; + + public AppendWriteFunctionWithBufferSort(Configuration config, RowType rowType) { + super(config, rowType); + this.writeBufferSize = config.get(FlinkOptions.WRITE_BUFFER_SIZE); + String sortKeys = config.get(FlinkOptions.WRITE_BUFFER_SORT_KEYS); + if (sortKeys == null) { + throw new IllegalArgumentException("Sort keys can't be null for append write with buffer sort."); + } + this.sortKeyList = Arrays.stream(sortKeys.split(",")).map(key -> key.trim()).collect(Collectors.toList()); + this.sortOperatorGen = new SortOperatorGen(rowType, sortKeyList.toArray(new String[0])); + SortCodeGenerator codeGenerator = sortOperatorGen.createSortCodeGenerator(); + this.keyComputer = codeGenerator.generateNormalizedKeyComputer("SortComputer"); + this.recordComparator = codeGenerator.generateRecordComparator("SortComparator"); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + this.memorySegmentPool = MemorySegmentPoolFactory.createMemorySegmentPool(config); + this.buffer = BufferUtils.createBuffer(rowType, + memorySegmentPool, + this.keyComputer.newInstance(Thread.currentThread().getContextClassLoader()), + this.recordComparator.newInstance(Thread.currentThread().getContextClassLoader())); + } + + @Override + public void processElement(T value, Context ctx, Collector<RowData> out) throws 
Exception { + RowData data = (RowData) value; + boolean result = buffer.write(data); + // If write result is false or buffer hit size limit + if (!result || buffer.size() >= writeBufferSize) { Review Comment: When `result` is false, it means the record is not inserted into the buffer successfully, we should retry inserting it again. See similar logic here: https://github.com/apache/hudi/blob/60576bcccf08d07abb9d1ee41cf52656fc491bbf/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java#L294 ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.append; + +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator; +import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer; +import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; +import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer; +import org.apache.flink.table.runtime.util.MemorySegmentPool; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory; +import org.apache.hudi.sink.bulk.sort.SortOperatorGen; +import org.apache.hudi.sink.utils.BufferUtils; +import org.apache.flink.runtime.operators.sort.QuickSort; +import org.apache.hudi.util.MutableIteratorWrapperIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Sink function to write the data to the underneath filesystem with buffer sort + * to improve the parquet compression rate. + * + * <p>The function writes base files directly for each checkpoint, + * the file may roll over when it’s size hits the configured threshold. 
+ * + * @param <T> Type of the input record + * @see StreamWriteOperatorCoordinator + */ +public class AppendWriteFunctionWithBufferSort<T> extends AppendWriteFunction<T> { + private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunctionWithBufferSort.class); + private final long writeBufferSize; + private final List<String> sortKeyList; + private final GeneratedNormalizedKeyComputer keyComputer; + private final GeneratedRecordComparator recordComparator; + private transient BinaryInMemorySortBuffer buffer; + private transient MemorySegmentPool memorySegmentPool; + private transient SortOperatorGen sortOperatorGen; + + public AppendWriteFunctionWithBufferSort(Configuration config, RowType rowType) { + super(config, rowType); + this.writeBufferSize = config.get(FlinkOptions.WRITE_BUFFER_SIZE); Review Comment: Do we still need `WRITE_BUFFER_SIZE` (records count) to control the flushing of buffer? Since we can already flush the buffer by checking whether the buffer is full. ########## hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/append/ITTestAppendWriteFunctionWithBufferSort.java: ########## @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.append; + +import org.apache.flink.table.types.logical.IntType; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.table.types.logical.TimestampType; +import org.apache.flink.table.types.logical.VarCharType; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.utils.TestWriteBase; +import org.apache.hudi.utils.TestConfigurations; +import org.apache.hudi.utils.TestData; + +import org.apache.avro.generic.GenericRecord; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.data.TimestampData; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Test cases for {@link AppendWriteFunctionWithBufferSort}. 
+ */ +public class ITTestAppendWriteFunctionWithBufferSort extends TestWriteBase { + + private Configuration conf; + private RowType rowType; + + @BeforeEach + public void before(@TempDir File tempDir) throws Exception { + super.before(); + this.conf = TestConfigurations.getDefaultConf(tempDir.getAbsolutePath()); + this.conf.setBoolean(FlinkOptions.WRITE_BUFFER_SORT_ENABLED, true); + this.conf.setString(FlinkOptions.OPERATION, "insert"); + this.conf.setString(FlinkOptions.WRITE_BUFFER_SORT_KEYS, "name,age"); + this.conf.setLong(FlinkOptions.WRITE_BUFFER_SIZE, 100); + + // Define the row type with fields: name (STRING), age (INT), partition (STRING) + List<RowType.RowField> fields = new ArrayList<>(); + fields.add(new RowType.RowField("uuid", VarCharType.STRING_TYPE)); + fields.add(new RowType.RowField("name", VarCharType.STRING_TYPE)); + fields.add(new RowType.RowField("age", new IntType())); + fields.add(new RowType.RowField("ts", new TimestampType())); + fields.add(new RowType.RowField("partition", VarCharType.STRING_TYPE)); + this.rowType = new RowType(fields); + } + + @Test + public void testBufferFlushOnSize() throws Exception { + // Create test data that exceeds buffer size + List<RowData> inputData = new ArrayList<>(); + for (int i = 0; i < 150; i++) { + inputData.add(createRowData("uuid" + i, "Name" + i, i, "1970-01-01 00:00:01.123", "p1")); + } + + // Write the data + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData) + .endInput(); + + // Verify all data was written + List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(100, actualData.size()); + } + + @Test + public void testBufferFlushOnCheckpoint() throws Exception { + // Create test data + List<RowData> inputData = Arrays.asList( + createRowData("uuid1", "Bob", 30, "1970-01-01 00:00:01.123", "p1"), + createRowData("uuid1", "Alice", 25, "1970-01-01 00:00:01.124", "p1") + ); + + // Write the 
data and wait for timer + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData) + .checkpoint(1) + .endInput(); + + // Verify data was written + List<GenericRecord> actualData = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(2, actualData.size()); + } + + @Test + public void testSortedResult() throws Exception { + // Create test data + List<RowData> inputData = Arrays.asList( + createRowData("uuid1", "Bob", 30, "1970-01-01 00:00:01.123", "p1"), + createRowData("uuid1", "Alice", 25, "1970-01-01 00:00:01.124", "p1"), + createRowData("uuid1", "Bob", 21, "1970-01-01 00:00:31.124", "p1") + ); + + List<String> expected = Arrays.asList( + convert(createRowData("uuid1", "Alice", 25, "1970-01-01 00:00:01.124", "p1")), + convert(createRowData("uuid1", "Bob", 21, "1970-01-01 00:00:31.124", "p1")), + convert(createRowData("uuid1", "Bob", 30, "1970-01-01 00:00:01.123", "p1")) + ); + + // Write the data and wait for timer + TestWriteBase.TestHarness.instance() + .preparePipeline(tempFile, conf) + .consume(inputData) + .checkpoint(1) + .endInput(); + + // Verify data was written + List<GenericRecord> result = TestData.readAllData(new File(conf.get(FlinkOptions.PATH)), rowType, 1); + assertEquals(3, result.size()); + + List<String> filteredResult = Review Comment: Can we reuse the existing utilities in `TestData`, e.g., `filterOutVariables`. 
########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/BufferUtils.java: ########## @@ -36,13 +36,18 @@ public class BufferUtils { private static final int MIN_REQUIRED_BUFFERS = 3; public static BinaryInMemorySortBuffer createBuffer(RowType rowType, MemorySegmentPool memorySegmentPool) { + return createBuffer(rowType, memorySegmentPool, new NaturalOrderRecordComparator()); + } + + public static BinaryInMemorySortBuffer createBuffer(RowType rowType, MemorySegmentPool memorySegmentPool, RecordComparator recordComparator) { Review Comment: This method seems unnecessary. ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.append; + +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator; +import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer; +import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; +import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer; +import org.apache.flink.table.runtime.util.MemorySegmentPool; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory; +import org.apache.hudi.sink.bulk.sort.SortOperatorGen; +import org.apache.hudi.sink.utils.BufferUtils; +import org.apache.flink.runtime.operators.sort.QuickSort; +import org.apache.hudi.util.MutableIteratorWrapperIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Sink function to write the data to the underneath filesystem with buffer sort + * to improve the parquet compression rate. + * + * <p>The function writes base files directly for each checkpoint, + * the file may roll over when it’s size hits the configured threshold. 
+ * + * @param <T> Type of the input record + * @see StreamWriteOperatorCoordinator + */ +public class AppendWriteFunctionWithBufferSort<T> extends AppendWriteFunction<T> { + private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunctionWithBufferSort.class); + private final long writeBufferSize; + private final List<String> sortKeyList; + private final GeneratedNormalizedKeyComputer keyComputer; + private final GeneratedRecordComparator recordComparator; + private transient BinaryInMemorySortBuffer buffer; + private transient MemorySegmentPool memorySegmentPool; + private transient SortOperatorGen sortOperatorGen; Review Comment: can be a local variable ########## hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBufferSort.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.sink.append; + +import org.apache.flink.table.data.binary.BinaryRowData; +import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator; +import org.apache.flink.table.runtime.generated.GeneratedNormalizedKeyComputer; +import org.apache.flink.table.runtime.generated.GeneratedRecordComparator; +import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer; +import org.apache.flink.table.runtime.util.MemorySegmentPool; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.sink.StreamWriteOperatorCoordinator; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.Collector; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory; +import org.apache.hudi.sink.bulk.sort.SortOperatorGen; +import org.apache.hudi.sink.utils.BufferUtils; +import org.apache.flink.runtime.operators.sort.QuickSort; +import org.apache.hudi.util.MutableIteratorWrapperIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; + +/** + * Sink function to write the data to the underneath filesystem with buffer sort + * to improve the parquet compression rate. + * + * <p>The function writes base files directly for each checkpoint, + * the file may roll over when it’s size hits the configured threshold. 
+ * + * @param <T> Type of the input record + * @see StreamWriteOperatorCoordinator + */ +public class AppendWriteFunctionWithBufferSort<T> extends AppendWriteFunction<T> { + private static final Logger LOG = LoggerFactory.getLogger(AppendWriteFunctionWithBufferSort.class); + private final long writeBufferSize; + private final List<String> sortKeyList; + private final GeneratedNormalizedKeyComputer keyComputer; + private final GeneratedRecordComparator recordComparator; + private transient BinaryInMemorySortBuffer buffer; + private transient MemorySegmentPool memorySegmentPool; + private transient SortOperatorGen sortOperatorGen; + + public AppendWriteFunctionWithBufferSort(Configuration config, RowType rowType) { + super(config, rowType); + this.writeBufferSize = config.get(FlinkOptions.WRITE_BUFFER_SIZE); + String sortKeys = config.get(FlinkOptions.WRITE_BUFFER_SORT_KEYS); + if (sortKeys == null) { + throw new IllegalArgumentException("Sort keys can't be null for append write with buffer sort."); + } + this.sortKeyList = Arrays.stream(sortKeys.split(",")).map(key -> key.trim()).collect(Collectors.toList()); + this.sortOperatorGen = new SortOperatorGen(rowType, sortKeyList.toArray(new String[0])); + SortCodeGenerator codeGenerator = sortOperatorGen.createSortCodeGenerator(); + this.keyComputer = codeGenerator.generateNormalizedKeyComputer("SortComputer"); + this.recordComparator = codeGenerator.generateRecordComparator("SortComparator"); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + this.memorySegmentPool = MemorySegmentPoolFactory.createMemorySegmentPool(config); + this.buffer = BufferUtils.createBuffer(rowType, + memorySegmentPool, + this.keyComputer.newInstance(Thread.currentThread().getContextClassLoader()), + this.recordComparator.newInstance(Thread.currentThread().getContextClassLoader())); + } + + @Override + public void processElement(T value, Context ctx, Collector<RowData> out) throws 
Exception { + RowData data = (RowData) value; + boolean result = buffer.write(data); + // If write result is false or buffer hit size limit + if (!result || buffer.size() >= writeBufferSize) { + sortAndSend(); + } + } + + @Override + public void snapshotState() { + try { + sortAndSend(); + } catch (IOException e) { + LOG.error("Fail to sort and flush data in buffer during snapshot state."); + throw new FlinkRuntimeException(e); + } + super.snapshotState(); + } + + /** + * For append writing, the flushing can be triggered with two conditions: + * 1. Checkpoint trigger. in which current remaining data in buffer are flushed and committed. + * 2. Binary buffer is full. set the size of buffer as the max size of parquet file, Review Comment: Nit: newline for "Set the size" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
