This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new bbda2428bfd1 feat(flink): Support write buffer based on flink managed
memory (#18319)
bbda2428bfd1 is described below
commit bbda2428bfd1ef80759eaae5bdc20fa078a5e95c
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Mar 19 09:55:22 2026 +0800
feat(flink): Support write buffer based on flink managed memory (#18319)
---
.../apache/hudi/configuration/FlinkOptions.java | 12 +++
.../apache/hudi/configuration/OptionsResolver.java | 27 ++++++
.../org/apache/hudi/sink/StreamWriteFunction.java | 15 ++-
.../AppendWriteFunctionWithBIMBufferSort.java | 17 +++-
...AppendWriteFunctionWithDisruptorBufferSort.java | 8 +-
.../apache/hudi/sink/buffer/BufferMemoryType.java | 43 ++++++++
.../hudi/sink/buffer/MemorySegmentPoolFactory.java | 65 +++++++++----
.../hudi/sink/bulk/BulkInsertWriteFunction.java | 8 ++
.../sink/common/AbstractStreamWriteFunction.java | 6 ++
.../hudi/sink/common/AbstractWriteFunction.java | 12 +++
.../hudi/sink/common/AbstractWriteOperator.java | 18 ++++
.../sink/partitioner/index/IndexWriteFunction.java | 10 +-
.../java/org/apache/hudi/sink/utils/Pipelines.java | 31 +++++-
.../org/apache/hudi/table/HoodieTableFactory.java | 5 +
.../sink/buffer/TestMemorySegmentPoolFactory.java | 108 +++++++++++++++++++++
.../utils/BucketStreamWriteFunctionWrapper.java | 4 +-
.../sink/utils/StreamWriteFunctionWrapper.java | 5 +-
.../apache/hudi/table/ITTestHoodieDataSource.java | 50 ++++++++++
.../main/java/org/apache/hudi/adapter/Utils.java | 25 +++++
.../main/java/org/apache/hudi/adapter/Utils.java | 25 +++++
.../main/java/org/apache/hudi/adapter/Utils.java | 26 +++++
.../main/java/org/apache/hudi/adapter/Utils.java | 26 +++++
.../main/java/org/apache/hudi/adapter/Utils.java | 26 +++++
.../main/java/org/apache/hudi/adapter/Utils.java | 26 +++++
24 files changed, 562 insertions(+), 36 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 461f00475b20..93f11df956f9 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -40,6 +40,7 @@ import org.apache.hudi.hive.ddl.HiveSyncMode;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.keygen.constant.KeyGeneratorType;
+import org.apache.hudi.sink.buffer.BufferMemoryType;
import org.apache.hudi.sink.overwrite.PartitionOverwriteMode;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
import org.apache.hudi.util.ClientIds;
@@ -707,6 +708,17 @@ public class FlinkOptions extends HoodieConfig {
.withDescription("Maximum memory in MB for a write task, when the
threshold hits,\n"
+ "it flushes the max size data bucket to avoid OOM, default 1GB");
+ @AdvancedConfig
+ public static final ConfigOption<String> WRITE_BUFFER_MEMORY_TYPE =
ConfigOptions
+ .key("write.buffer.memory.type")
+ .stringType()
+ .defaultValue(BufferMemoryType.ON_HEAP.name())
+ .withDescription("The memory type used for the write buffer. "
+ + "Supported values are ON_HEAP (default) and MANAGED. "
+ + "ON_HEAP uses JVM heap memory, while MANAGED uses Flink managed
memory "
+ + "which is accounted for in the task manager's memory budget "
+ + "and helps avoid OOM errors in containerized environments.");
+
@AdvancedConfig
public static final ConfigOption<Boolean> WRITE_BUFFER_SORT_ENABLED =
ConfigOptions
.key("write.buffer.sort.enabled")
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 7e455fa48015..57f031bbd85e 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -41,6 +41,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.bucket.partition.PartitionBucketIndexUtils;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.sink.buffer.BufferMemoryType;
import org.apache.hudi.sink.overwrite.PartitionOverwriteMode;
import org.apache.hudi.table.format.FilePathUtils;
import org.apache.hudi.table.format.HoodieFlinkIOFactory;
@@ -604,4 +605,30 @@ public class OptionsResolver {
public static int indexWriteParallelism(Configuration conf) {
return OptionsResolver.isStreamingIndexWriteEnabled(conf) ?
conf.get(FlinkOptions.INDEX_WRITE_TASKS) : 0;
}
+
+ /**
+ * Returns the write buffer size in bytes.
+ *
+ * @param conf the Flink configuration containing write memory settings
+ * @return the calculated write buffer size in bytes
+ */
+ public static long getWriteBufferSizeInBytes(Configuration conf) {
+ long mergeReaderMem = 100; // constant 100MB
+ long mergeMapMaxMem = conf.get(FlinkOptions.WRITE_MERGE_MAX_MEMORY);
+ long maxBufferSize = (long) ((conf.get(FlinkOptions.WRITE_TASK_MAX_SIZE) -
mergeReaderMem - mergeMapMaxMem) * 1024 * 1024);
+ final String errMsg = String.format("'%s' should be at least greater than
'%s' plus merge reader memory(constant 100MB now)",
+ FlinkOptions.WRITE_TASK_MAX_SIZE.key(),
FlinkOptions.WRITE_MERGE_MAX_MEMORY.key());
+ ValidationUtils.checkState(maxBufferSize > 0, errMsg);
+ return maxBufferSize;
+ }
+
+ /**
+ * Whether the flink managed memory is used for the write buffer.
+ *
+ * @param conf the Flink configuration
+ * @return true if the flink managed memory is used for the write buffer.
+ */
+ public static boolean isManagedMemoryBufferEnabled(Configuration conf) {
+ return
BufferMemoryType.MANAGED.name().equalsIgnoreCase(conf.get(FlinkOptions.WRITE_BUFFER_MEMORY_TYPE));
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 5459b5ccf764..4dd8f785ac6b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -34,7 +34,6 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
-import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
import org.apache.hudi.sink.buffer.RowDataBucket;
import org.apache.hudi.sink.buffer.TotalSizeTracer;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
@@ -60,6 +59,7 @@ import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
+import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
@@ -200,7 +200,7 @@ public class StreamWriteFunction extends
AbstractStreamWriteFunction<HoodieFlink
private void initBuffer() {
this.buckets = new LinkedHashMap<>();
- this.memorySegmentPool =
MemorySegmentPoolFactory.createMemorySegmentPool(config);
+ this.memorySegmentPool =
this.memorySegmentPoolFactory.createMemorySegmentPool(config,
OptionsResolver.getWriteBufferSizeInBytes(config));
}
private void initWriteFunction() {
@@ -466,6 +466,17 @@ public class StreamWriteFunction extends
AbstractStreamWriteFunction<HoodieFlink
writeMetrics.registerMetrics();
}
+ @Override
+ public void close() throws Exception {
+ try {
+ if (this.memorySegmentPool instanceof Closeable) {
+ ((Closeable) this.memorySegmentPool).close();
+ }
+ } finally {
+ super.close();
+ }
+ }
+
// -------------------------------------------------------------------------
// Getter/Setter
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java
index 9d7cc69c9691..89d4fd2c8f12 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java
@@ -19,11 +19,11 @@
package org.apache.hudi.sink.append;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.buffer.BufferType;
-import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.sink.utils.BufferUtils;
import org.apache.hudi.util.MutableIteratorWrapperIterator;
@@ -41,6 +41,7 @@ import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
+import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@@ -70,6 +71,7 @@ public class AppendWriteFunctionWithBIMBufferSort<T> extends
AppendWriteFunction
private final long writeBufferSize;
private transient BinaryInMemorySortBuffer activeBuffer;
private transient BinaryInMemorySortBuffer backgroundBuffer;
+ private transient MemorySegmentPool[] memorySegmentPools;
private transient ExecutorService asyncWriteExecutor;
private transient AtomicReference<CompletableFuture<Void>> asyncWriteTask;
private transient AtomicBoolean isBackgroundBufferBeingProcessed;
@@ -89,14 +91,14 @@ public class AppendWriteFunctionWithBIMBufferSort<T>
extends AppendWriteFunction
SortCodeGenerator codeGenerator =
sortOperatorGen.createSortCodeGenerator();
GeneratedNormalizedKeyComputer keyComputer =
codeGenerator.generateNormalizedKeyComputer("SortComputer");
GeneratedRecordComparator recordComparator =
codeGenerator.generateRecordComparator("SortComparator");
- MemorySegmentPool[] pools =
MemorySegmentPoolFactory.createMemorySegmentPools(config, 2);
+ this.memorySegmentPools =
this.memorySegmentPoolFactory.createMemorySegmentPools(config, 2,
OptionsResolver.getWriteBufferSizeInBytes(config));
this.activeBuffer = BufferUtils.createBuffer(rowType,
- pools[0],
+ memorySegmentPools[0],
keyComputer.newInstance(Thread.currentThread().getContextClassLoader()),
recordComparator.newInstance(Thread.currentThread().getContextClassLoader()));
this.backgroundBuffer = BufferUtils.createBuffer(rowType,
- pools[1],
+ memorySegmentPools[1],
keyComputer.newInstance(Thread.currentThread().getContextClassLoader()),
recordComparator.newInstance(Thread.currentThread().getContextClassLoader()));
@@ -229,6 +231,13 @@ public class AppendWriteFunctionWithBIMBufferSort<T>
extends AppendWriteFunction
if (asyncWriteExecutor != null && !asyncWriteExecutor.isShutdown()) {
asyncWriteExecutor.shutdown();
}
+ if (memorySegmentPools != null) {
+ for (MemorySegmentPool memorySegmentPool : memorySegmentPools) {
+ if (memorySegmentPool instanceof Closeable) {
+ ((Closeable) memorySegmentPool).close();
+ }
+ }
+ }
} finally {
super.close();
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithDisruptorBufferSort.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithDisruptorBufferSort.java
index 8154f23899cd..dffa8bf8b91f 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithDisruptorBufferSort.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithDisruptorBufferSort.java
@@ -21,11 +21,11 @@ package org.apache.hudi.sink.append;
import org.apache.hudi.common.util.queue.DisruptorMessageQueue;
import org.apache.hudi.common.util.queue.HoodieConsumer;
import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.buffer.BufferType;
-import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
import org.apache.hudi.sink.utils.BufferUtils;
import org.apache.hudi.util.MutableIteratorWrapperIterator;
@@ -43,6 +43,7 @@ import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;
+import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@@ -97,7 +98,7 @@ public class AppendWriteFunctionWithDisruptorBufferSort<T>
extends AppendWriteFu
SortCodeGenerator codeGenerator =
sortOperatorGen.createSortCodeGenerator();
this.keyComputer =
codeGenerator.generateNormalizedKeyComputer("SortComputer");
this.recordComparator =
codeGenerator.generateRecordComparator("SortComparator");
- this.memorySegmentPool =
MemorySegmentPoolFactory.createMemorySegmentPool(config);
+ this.memorySegmentPool =
this.memorySegmentPoolFactory.createMemorySegmentPool(config,
OptionsResolver.getWriteBufferSizeInBytes(config));
initDisruptorBuffer();
@@ -196,6 +197,9 @@ public class AppendWriteFunctionWithDisruptorBufferSort<T>
extends AppendWriteFu
if (disruptorQueue != null) {
disruptorQueue.close();
}
+ if (this.memorySegmentPool instanceof Closeable) {
+ ((Closeable) this.memorySegmentPool).close();
+ }
} finally {
super.close();
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/BufferMemoryType.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/BufferMemoryType.java
new file mode 100644
index 000000000000..b911052ce7d9
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/BufferMemoryType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.buffer;
+
+/**
+ * Enum representing the memory type used for the Flink write buffer.
+ *
+ * <p>This determines how memory is allocated for buffering records
+ * before they are flushed to storage during write operations.
+ *
+ * @see org.apache.hudi.configuration.FlinkOptions#WRITE_BUFFER_MEMORY_TYPE
+ */
+public enum BufferMemoryType {
+ /**
+ * Uses JVM heap memory for the write buffer.
+ * This is the default memory type.
+ */
+ ON_HEAP,
+
+ /**
+ * Uses Flink managed memory for the write buffer.
+ * Managed memory is controlled by the Flink framework and is accounted
+ * for in the task manager's memory budget, which helps avoid OOM errors
+ * in containerized environments.
+ */
+ MANAGED
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/MemorySegmentPoolFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/MemorySegmentPoolFactory.java
index ae262810515d..6b38b7697d03 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/MemorySegmentPoolFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/MemorySegmentPoolFactory.java
@@ -18,21 +18,39 @@
package org.apache.hudi.sink.buffer;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.table.runtime.util.LazyMemorySegmentPool;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import javax.annotation.Nullable;
+
/**
- * Factory to create {@code MemorySegmentPool}, currently only heap based
memory pool {@code HeapMemorySegmentPool}
- * is supported.
- *
- * <p> todo support memory segment pool based on flink managed memory,
currently support heap pool only, see HUDI-9189.
+ * Factory to create {@code MemorySegmentPool} instances.
+ * <p>
+ * Supports two types of memory pools:
+ * <ul>
+ * <li>Heap-based memory pool ({@code HeapMemorySegmentPool}) - uses JVM
heap memory</li>
+ * <li>Flink managed memory pool ({@code LazyMemorySegmentPool}) - uses
Flink's managed memory</li>
+ * </ul>
*/
public class MemorySegmentPoolFactory {
- public static MemorySegmentPool createMemorySegmentPool(Configuration conf) {
- return createMemorySegmentPools(conf, 1)[0];
+ private final Option<Object> owner;
+ private final Option<MemoryManager> memoryManager;
+ private final long managedMemorySize;
+
+ public MemorySegmentPoolFactory(@Nullable Object owner, @Nullable
MemoryManager memoryManager, long managedMemorySize) {
+ this.owner = Option.ofNullable(owner);
+ this.memoryManager = Option.ofNullable(memoryManager);
+ this.managedMemorySize = managedMemorySize;
+ }
+
+ public MemorySegmentPool createMemorySegmentPool(Configuration conf, long
heapMemorySize) {
+ return createMemorySegmentPools(conf, 1, heapMemorySize)[0];
}
/**
@@ -42,26 +60,37 @@ public class MemorySegmentPoolFactory {
* @param numPools number of pools to create
* @return array of memory segment pools, each with size = totalSize /
numPools
*/
- public static MemorySegmentPool[] createMemorySegmentPools(Configuration
conf, int numPools) {
- ValidationUtils.checkArgument(numPools > 0, "numPools must be positive");
- long mergeReaderMem = 100; // constant 100MB
- long mergeMapMaxMem = conf.get(FlinkOptions.WRITE_MERGE_MAX_MEMORY);
- long maxBufferSize = (long) ((conf.get(FlinkOptions.WRITE_TASK_MAX_SIZE) -
mergeReaderMem - mergeMapMaxMem) * 1024 * 1024);
- final String errMsg = String.format("'%s' should be at least greater than
'%s' plus merge reader memory(constant 100MB now)",
- FlinkOptions.WRITE_TASK_MAX_SIZE.key(),
FlinkOptions.WRITE_MERGE_MAX_MEMORY.key());
- ValidationUtils.checkState(maxBufferSize > 0, errMsg);
+ public MemorySegmentPool[] createMemorySegmentPools(Configuration conf, int
numPools, long heapMemorySize) {
+ if (memoryManager.isEmpty()) {
+ return createHeapMemoryPools(conf, numPools, heapMemorySize);
+ } else {
+ return createManagedMemoryPools(numPools);
+ }
+ }
+ private MemorySegmentPool[] createHeapMemoryPools(Configuration conf, int
numPools, long heapMemorySize) {
+ ValidationUtils.checkArgument(numPools > 0, "numPools must be positive");
+ ValidationUtils.checkArgument(heapMemorySize > 0, "Buffer size must be
positive");
int pageSize = conf.get(FlinkOptions.WRITE_MEMORY_SEGMENT_PAGE_SIZE);
- long poolSize = maxBufferSize / numPools;
+ long poolSize = heapMemorySize / numPools;
MemorySegmentPool[] pools = new MemorySegmentPool[numPools];
+ ValidationUtils.checkArgument(poolSize >= pageSize,
+ String.format("The total size %s of memory pool should not be less
than page size %s.", poolSize, pageSize));
for (int i = 0; i < numPools; i++) {
pools[i] = new HeapMemorySegmentPool(pageSize, poolSize);
}
return pools;
}
- public static MemorySegmentPool createMemorySegmentPool(Configuration conf,
long maxBufferSize) {
- ValidationUtils.checkArgument(maxBufferSize > 0, "Buffer size should be a
positive number.");
- return new
HeapMemorySegmentPool(conf.get(FlinkOptions.WRITE_MEMORY_SEGMENT_PAGE_SIZE),
maxBufferSize * 1024 * 1024);
+ private MemorySegmentPool[] createManagedMemoryPools(int numPools) {
+ long poolSize = managedMemorySize / numPools;
+ MemorySegmentPool[] pools = new MemorySegmentPool[numPools];
+ int pageSize = memoryManager.get().getPageSize();
+ ValidationUtils.checkArgument(poolSize >= pageSize,
+ String.format("The total size %s of memory pool should not be less
than page size %s.", poolSize, pageSize));
+ for (int i = 0; i < numPools; i++) {
+ pools[i] = new LazyMemorySegmentPool(owner.get(), memoryManager.get(),
(int) (poolSize / pageSize));
+ }
+ return pools;
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index 89453464392a..593b00333dcf 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -22,12 +22,14 @@ import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.Correspondent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.util.FlinkWriteClients;
import org.apache.hudi.utils.RuntimeContextUtils;
+import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
@@ -64,6 +66,7 @@ public class BulkInsertWriteFunction<I>
/**
* Config options.
*/
+ @Getter
private final Configuration config;
/**
@@ -122,6 +125,11 @@ public class BulkInsertWriteFunction<I>
}
}
+ @Override
+ public void setMemorySegmentPoolFactory(MemorySegmentPoolFactory
memorySegmentPoolFactory) {
+ // do nothing
+ }
+
/**
* End input action for batch source.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index db99f95f0d0f..68bb2a7dc546 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
import org.apache.hudi.sink.event.CommitAckEvent;
import org.apache.hudi.sink.event.Correspondent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
@@ -64,6 +65,7 @@ public abstract class AbstractStreamWriteFunction<I>
/**
* Config options.
*/
+ @Getter
protected final Configuration config;
/**
@@ -98,6 +100,10 @@ public abstract class AbstractStreamWriteFunction<I>
@Setter
protected transient Correspondent correspondent;
+ @Getter
+ @Setter
+ protected transient MemorySegmentPoolFactory memorySegmentPoolFactory;
+
/**
* Gateway to send operator events to the operator coordinator.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java
index 437e1517eabb..399cd87db884 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java
@@ -19,8 +19,10 @@
package org.apache.hudi.sink.common;
import org.apache.hudi.adapter.ProcessFunctionAdapter;
+import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
import org.apache.hudi.sink.event.Correspondent;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
@@ -37,6 +39,16 @@ public abstract class AbstractWriteFunction<I> extends
ProcessFunctionAdapter<I,
*/
public abstract void setCorrespondent(Correspondent correspondent);
+ /**
+ * Sets up the {@code MemorySegmentPoolFactory} for creating write buffer.
+ */
+ public abstract void setMemorySegmentPoolFactory(MemorySegmentPoolFactory
memorySegmentPoolFactory);
+
+ /**
+ * Get the flink write configuration.
+ */
+ public abstract Configuration getConfig();
+
/**
* Sets up the event gateway.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java
index 0513ab6c5632..3f74002df575 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java
@@ -18,13 +18,20 @@
package org.apache.hudi.sink.common;
+import org.apache.hudi.adapter.Utils;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
import org.apache.hudi.sink.event.Correspondent;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.table.data.RowData;
/**
@@ -42,6 +49,17 @@ public abstract class AbstractWriteOperator<I>
this.function = function;
}
+ @Override
+ public void setup(StreamTask<?, ?> containingTask, StreamConfig config,
Output<StreamRecord<RowData>> output) {
+ super.setup(containingTask, config, output);
+
+ MemorySegmentPoolFactory memoryPoolFactory =
+ OptionsResolver.isManagedMemoryBufferEnabled(this.function.getConfig())
+ ? new MemorySegmentPoolFactory(containingTask,
containingTask.getEnvironment().getMemoryManager(),
Utils.computeManagedMemory(this))
+ : new MemorySegmentPoolFactory(null, null, -1);
+ this.function.setMemorySegmentPoolFactory(memoryPoolFactory);
+ }
+
public void setCorrespondent(Correspondent correspondent) {
this.function.setCorrespondent(correspondent);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
index 63ca8229bc79..ee06ac12ed5a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.exception.MemoryPagesExhaustedException;
@@ -43,6 +42,7 @@ import
org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.util.Collector;
+import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -91,7 +91,9 @@ public class IndexWriteFunction extends
AbstractStreamWriteFunction<RowData> {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.flinkTable = this.writeClient.getHoodieTable();
- this.memorySegmentPool =
MemorySegmentPoolFactory.createMemorySegmentPool(config,
config.get(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE));
+ this.memorySegmentPool =
this.memorySegmentPoolFactory.createMemorySegmentPool(
+ config,
+ config.get(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE) * 1024 * 1024);
this.indexDataBuffer =
BufferUtils.createBuffer(IndexRowUtils.INDEX_ROW_TYPE, memorySegmentPool);
}
@@ -196,7 +198,9 @@ public class IndexWriteFunction extends
AbstractStreamWriteFunction<RowData> {
@Override
public void close() throws Exception {
this.indexDataBuffer.dispose();
- this.memorySegmentPool.freePages();
+ if (this.memorySegmentPool instanceof Closeable) {
+ ((Closeable) this.memorySegmentPool).close();
+ }
super.close();
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 5390c482ac5a..34d016f94960 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -30,12 +30,14 @@ import
org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.sink.CleanFunction;
import org.apache.hudi.sink.StreamWriteOperator;
+import org.apache.hudi.sink.append.AppendWriteFunctions;
import org.apache.hudi.sink.append.AppendWriteOperator;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
import org.apache.hudi.sink.bucket.ConsistentBucketAssignFunction;
+import org.apache.hudi.sink.buffer.BufferType;
import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
import org.apache.hudi.sink.bulk.RowDataKeyGen;
import org.apache.hudi.sink.bulk.RowDataKeyGens;
@@ -67,6 +69,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.table.data.RowData;
@@ -78,6 +81,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
import java.util.stream.Stream;
/**
@@ -220,10 +224,14 @@ public class Pipelines {
WriteOperatorFactory<RowData> operatorFactory =
AppendWriteOperator.getFactory(conf, rowType);
- return dataStream
+ SingleOutputStreamOperator<RowData> appendWriteDataStream = dataStream
.transform(opName("hoodie_append_write", conf),
TypeInformation.of(RowData.class), operatorFactory)
.uid(opUID("hoodie_stream_write", conf))
.setParallelism(conf.get(FlinkOptions.WRITE_TASKS));
+ if
(!BufferType.NONE.name().equalsIgnoreCase(AppendWriteFunctions.resolveBufferType(conf)))
{
+ declareManagedMemoryIfNecessary(conf, appendWriteDataStream, () ->
OptionsResolver.getWriteBufferSizeInBytes(conf));
+ }
+ return appendWriteDataStream;
}
/**
@@ -353,7 +361,7 @@ public class Pipelines {
// [HUDI-9036] BucketIndexPartitioner is also used in bulk insert
mode,
// keep use of HoodieKey here in partitionCustom for now
BucketIndexPartitioner<HoodieKey> partitioner = new
BucketIndexPartitioner<>(conf, indexKeyFields);
- return dataStream
+ SingleOutputStreamOperator<RowData> bucketWriteStream = dataStream
.partitionCustom(
partitioner,
record -> new HoodieKey(record.getRecordKey(),
record.getPartitionPath()))
@@ -363,12 +371,14 @@ public class Pipelines {
BucketStreamWriteOperator.getFactory(conf, rowType))
.uid(opUID("bucket_write", conf))
.setParallelism(conf.get(FlinkOptions.WRITE_TASKS));
+ declareManagedMemoryIfNecessary(conf, bucketWriteStream, () ->
OptionsResolver.getWriteBufferSizeInBytes(conf));
+ return bucketWriteStream;
case CONSISTENT_HASHING:
if (OptionsResolver.isInsertOverwrite(conf)) {
// TODO support insert overwrite for consistent bucket index
throw new HoodieException("Consistent hashing bucket index does
not work with insert overwrite using FLINK engine. Use simple bucket index or
Spark engine.");
}
- return dataStream
+ SingleOutputStreamOperator<RowData> consistentBucketWriteStream =
dataStream
.transform(
opName("consistent_bucket_assigner", conf),
new HoodieFlinkInternalRowTypeInfo(rowType),
@@ -382,6 +392,8 @@ public class Pipelines {
BucketStreamWriteOperator.getFactory(conf, rowType))
.uid(opUID("consistent_bucket_write", conf))
.setParallelism(conf.get(FlinkOptions.WRITE_TASKS));
+ declareManagedMemoryIfNecessary(conf, consistentBucketWriteStream,
() -> OptionsResolver.getWriteBufferSizeInBytes(conf));
+ return consistentBucketWriteStream;
default:
throw new HoodieNotSupportedException("Unknown bucket index engine
type: " + bucketIndexEngineType);
}
@@ -392,7 +404,7 @@ public class Pipelines {
String writeOperatorUid = opUID("stream_write", conf);
DataStream<HoodieFlinkInternalRow> bucketAssignStream =
createBucketAssignStream(dataStream, conf, rowType, writeOperatorUid);
boolean isStreamingIndexWriteEnabled =
OptionsResolver.isStreamingIndexWriteEnabled(conf);
- DataStream<RowData> writeDatastream =
+ SingleOutputStreamOperator<RowData> writeDatastream =
bucketAssignStream
// shuffle by fileId(bucket id)
.keyBy(HoodieFlinkInternalRow::getFileId)
@@ -402,9 +414,10 @@ public class Pipelines {
StreamWriteOperator.getFactory(conf, rowType))
.uid(writeOperatorUid)
.setParallelism(conf.get(FlinkOptions.WRITE_TASKS));
+ declareManagedMemoryIfNecessary(conf, writeDatastream, () ->
OptionsResolver.getWriteBufferSizeInBytes(conf));
if (isStreamingIndexWriteEnabled) {
// index writing pipeline
- return writeDatastream
+ SingleOutputStreamOperator<RowData> indexWriteDatastream =
writeDatastream
.partitionCustom(new RecordIndexPartitioner(conf),
IndexRowUtils::getHoodieKey)
.transform(
opName("index_write", conf),
@@ -412,6 +425,8 @@ public class Pipelines {
new IndexWriteOperator(conf,
OperatorIDGenerator.fromUid(writeOperatorUid)))
.uid(opUID("index_write", conf))
.setParallelism(conf.get(FlinkOptions.INDEX_WRITE_TASKS));
+ declareManagedMemoryIfNecessary(conf, indexWriteDatastream, () ->
conf.get(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE) * 1024L * 1024L);
+ return indexWriteDatastream;
} else {
return writeDatastream;
}
@@ -563,6 +578,12 @@ public class Pipelines {
: databaseName + "." + conf.get(FlinkOptions.TABLE_NAME);
}
  /**
   * Declares a managed memory weight on the given data stream's transformation when the
   * write buffer is configured to use Flink managed memory.
   *
   * @param conf               the flink configuration
   * @param dataStream         the data stream whose transformation declares the memory weight
   * @param bufferSizeSupplier supplier of the write buffer size in bytes; evaluated lazily,
   *                           only when managed-memory buffering is enabled
   */
  public static void declareManagedMemoryIfNecessary(Configuration conf, DataStream<?> dataStream, Supplier<Long> bufferSizeSupplier) {
    if (OptionsResolver.isManagedMemoryBufferEnabled(conf)) {
      ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(), bufferSizeSupplier.get());
    }
  }
+
/**
* Dummy sink that does nothing.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 3cc72934e88d..7580fd80ce56 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -413,6 +413,11 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
"true");
conf.set(FlinkOptions.INDEX_GLOBAL_ENABLED, true);
conf.setString(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(),
"true");
+ // generally size of index data is much smaller than data record, so set
the buffer size of
+ // the index writer as 1/4 of that for data writer if it's not set by
user explicitly.
+ if (!conf.contains(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE)) {
+ conf.set(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE,
OptionsResolver.getWriteBufferSizeInBytes(conf) / 1024 / 1024 / 4);
+ }
} else {
conf.setString(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(),
"false");
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/buffer/TestMemorySegmentPoolFactory.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/buffer/TestMemorySegmentPoolFactory.java
new file mode 100644
index 000000000000..29dc0225f5b3
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/buffer/TestMemorySegmentPoolFactory.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.buffer;
+
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.table.runtime.util.LazyMemorySegmentPool;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link MemorySegmentPoolFactory}.
+ */
+public class TestMemorySegmentPoolFactory {
+
+ @Test
+ public void testCreateHeapMemoryPool() {
+ Configuration conf = new Configuration();
+ conf.set(FlinkOptions.WRITE_MEMORY_SEGMENT_PAGE_SIZE, 32 * 1024);
+
+ MemorySegmentPoolFactory factory = new MemorySegmentPoolFactory(null,
null, 0);
+ MemorySegmentPool pool = factory.createMemorySegmentPool(conf, 1024 *
1024L);
+
+ assertNotNull(pool);
+ assertTrue(pool instanceof HeapMemorySegmentPool);
+ }
+
+ @Test
+ public void testCreateMultipleHeapMemoryPools() {
+ Configuration conf = new Configuration();
+ conf.set(FlinkOptions.WRITE_MEMORY_SEGMENT_PAGE_SIZE, 32 * 1024);
+
+ MemorySegmentPoolFactory factory = new MemorySegmentPoolFactory(null,
null, 0);
+ MemorySegmentPool[] pools = factory.createMemorySegmentPools(conf, 4, 4 *
1024 * 1024L);
+
+ assertNotNull(pools);
+ assertEquals(4, pools.length);
+ for (MemorySegmentPool pool : pools) {
+ assertNotNull(pool);
+ assertTrue(pool instanceof HeapMemorySegmentPool);
+ }
+ }
+
+ @Test
+ public void testCreateManagedMemoryPool() {
+ Configuration conf = new Configuration();
+ conf.set(FlinkOptions.WRITE_MEMORY_SEGMENT_PAGE_SIZE, 32 * 1024);
+
+ MemoryManager memoryManager = mock(MemoryManager.class);
+ when(memoryManager.getPageSize()).thenReturn(32 * 1024);
+
+ Object owner = new Object();
+ long managedMemorySize = 2 * 1024 * 1024L;
+
+ MemorySegmentPoolFactory factory = new MemorySegmentPoolFactory(owner,
memoryManager, managedMemorySize);
+ MemorySegmentPool pool = factory.createMemorySegmentPool(conf, 1024 *
1024L);
+
+ assertNotNull(pool);
+ assertTrue(pool instanceof LazyMemorySegmentPool);
+ }
+
+ @Test
+ public void testCreateMultipleManagedMemoryPools() {
+ Configuration conf = new Configuration();
+ conf.set(FlinkOptions.WRITE_MEMORY_SEGMENT_PAGE_SIZE, 32 * 1024);
+
+ MemoryManager memoryManager = mock(MemoryManager.class);
+ when(memoryManager.getPageSize()).thenReturn(32 * 1024);
+
+ Object owner = new Object();
+ long managedMemorySize = 4 * 1024 * 1024L;
+
+ MemorySegmentPoolFactory factory = new MemorySegmentPoolFactory(owner,
memoryManager, managedMemorySize);
+ MemorySegmentPool[] pools = factory.createMemorySegmentPools(conf, 4, 1024
* 1024L);
+
+ assertNotNull(pools);
+ assertEquals(4, pools.length);
+ for (MemorySegmentPool pool : pools) {
+ assertNotNull(pool);
+ assertTrue(pool instanceof LazyMemorySegmentPool);
+ }
+ }
+}
+
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
index 4181e72bf206..80f6e7e33471 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
@@ -26,6 +26,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.bucket.BucketStreamWriteFunction;
+import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
@@ -215,8 +216,9 @@ public class BucketStreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<
writeFunction.setRuntimeContext(runtimeContext);
writeFunction.setOperatorEventGateway(gateway);
writeFunction.initializeState(this.stateInitializationContext);
- writeFunction.open(conf);
writeFunction.setCorrespondent(new MockCorrespondent(this.coordinator));
+ writeFunction.setMemorySegmentPoolFactory(new
MemorySegmentPoolFactory(null, null, -1));
+ writeFunction.open(conf);
}
protected StreamWriteFunction createWriteFunction() {
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 5d0bd8cd6157..254592556669 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -29,6 +29,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.sink.StreamWriteFunction;
import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
import org.apache.hudi.sink.bootstrap.BootstrapOperator;
+import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
import org.apache.hudi.sink.common.AbstractWriteFunction;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.partitioner.BucketAssignFunction;
@@ -395,14 +396,16 @@ public class StreamWriteFunctionWrapper<I> implements
TestFunctionWrapper<I> {
writeFunction.setRuntimeContext(runtimeContext);
writeFunction.setOperatorEventGateway(gateway);
writeFunction.initializeState(this.stateInitializationContext);
- writeFunction.open(conf);
+ writeFunction.setMemorySegmentPoolFactory(new
MemorySegmentPoolFactory(null, null, -1));
writeFunction.setCorrespondent(correspondent);
+ writeFunction.open(conf);
}
  /** Creates and opens the index write function with the test collaborators wired in. */
  private void setupIndexWriteFunction() throws Exception {
    indexWriteFunction = new IndexWriteFunction(conf);
    indexWriteFunction.setRuntimeContext(runtimeContext);
    indexWriteFunction.setCorrespondent(correspondent);
    // heap-based pool factory for tests: no managed memory manager is available here
    indexWriteFunction.setMemorySegmentPoolFactory(new MemorySegmentPoolFactory(null, null, -1));
    indexWriteFunction.initializeState(this.indexStateInitializationContext);
    indexWriteFunction.open(conf);
  }
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index fb096511f0a8..4a240490f7d4 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -35,6 +35,8 @@ import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.index.bucket.partition.PartitionBucketIndexUtils;
import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.sink.buffer.BufferMemoryType;
+import org.apache.hudi.sink.buffer.BufferType;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
import org.apache.hudi.table.catalog.HoodieHiveCatalog;
@@ -3074,6 +3076,54 @@ public class ITTestHoodieDataSource {
assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
}
  @ParameterizedTest
  @EnumSource(value = HoodieTableType.class)
  void testStreamWriteWithManagedMemory(HoodieTableType tableType) throws Exception {
    // create filesystem table named source
    String createSource = TestConfigurations.getFileSourceDDL("source");
    streamTableEnv.executeSql(createSource);

    // hoodie sink table whose write buffer is backed by flink managed memory
    String hoodieTableDDL = sql("t1")
        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
        .options(getDefaultKeys())
        .option(FlinkOptions.TABLE_TYPE, tableType)
        .option(FlinkOptions.WRITE_BUFFER_MEMORY_TYPE, BufferMemoryType.MANAGED)
        .end();
    streamTableEnv.executeSql(hoodieTableDDL);

    String insertInto = "insert into t1 select * from source";
    execInsertSql(streamTableEnv, insertInto);

    // the result must match the expected dataset regardless of the buffer memory type
    List<Row> result = CollectionUtil.iteratorToList(
        streamTableEnv.sqlQuery("select * from t1").execute().collect());
    assertRowsEquals(result, TestData.DATA_SET_SOURCE_INSERT);
  }
+
  @ParameterizedTest
  @EnumSource(value = BufferType.class, names = {"BOUNDED_IN_MEMORY", "DISRUPTOR"})
  void testAppendWriteWithManagedMemory(BufferType bufferType) {
    // create filesystem table named source
    String createSource = TestConfigurations.getFileSourceDDL("source");
    streamTableEnv.executeSql(createSource);

    // append-only (insert) MOR table combining managed-memory buffering with the
    // given buffer type
    String hoodieTableDDL = sql("t1")
        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
        .options(getDefaultKeys())
        .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
        .option(FlinkOptions.OPERATION, "insert")
        .option(FlinkOptions.WRITE_BUFFER_MEMORY_TYPE, BufferMemoryType.MANAGED)
        .option(FlinkOptions.WRITE_BUFFER_TYPE, bufferType.name())
        .end();
    streamTableEnv.executeSql(hoodieTableDDL);

    String insertInto = "insert into t1 select * from source";
    execInsertSql(streamTableEnv, insertInto);

    // the result must match the expected dataset regardless of the buffer type
    List<Row> result = CollectionUtil.iteratorToList(
        streamTableEnv.sqlQuery("select * from t1").execute().collect());
    assertRowsEquals(result, TestData.DATA_SET_SOURCE_INSERT);
  }
+
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
diff --git
a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/Utils.java
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/Utils.java
index 61c6d0838415..51a58713dc72 100644
---
a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/Utils.java
+++
b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/Utils.java
@@ -26,8 +26,11 @@ import
org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -90,6 +93,28 @@ public class Utils {
return newSchema;
}
+ /**
+ * Computes the managed memory size available for the given stream operator.
+ * <p>
+ * This method retrieves the memory manager from the operator's execution
environment
+ * and calculates the memory size allocated for the operator based on the
configured
+ * managed memory fraction and the task manager's memory settings.
+ *
+ * @param operator the stream operator for which to compute managed memory
+ * @return the computed managed memory size in bytes
+ */
+ public static long computeManagedMemory(AbstractStreamOperator<?> operator) {
+ final Environment environment =
operator.getContainingTask().getEnvironment();
+ return environment
+ .getMemoryManager()
+ .computeMemorySize(
+ operator.getOperatorConfig()
+ .getManagedMemoryFractionOperatorUseCaseOfSlot(
+ ManagedMemoryUseCase.OPERATOR,
+ environment.getJobConfiguration(),
+ environment.getUserCodeClassLoader().asClassLoader()));
+ }
+
private static InternalSchema applyTableChange(InternalSchema oldSchema,
TableChange change, Function<LogicalType, Type> convertFunc) {
InternalSchemaChangeApplier changeApplier = new
InternalSchemaChangeApplier(oldSchema);
if (change instanceof TableChange.AddColumn) {
diff --git
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/Utils.java
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/Utils.java
index e05407972130..f8cb836c0895 100644
---
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/Utils.java
+++
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/Utils.java
@@ -26,8 +26,11 @@ import
org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -90,6 +93,28 @@ public class Utils {
return newSchema;
}
+ /**
+ * Computes the managed memory size available for the given stream operator.
+ * <p>
+ * This method retrieves the memory manager from the operator's execution
environment
+ * and calculates the memory size allocated for the operator based on the
configured
+ * managed memory fraction and the task manager's memory settings.
+ *
+ * @param operator the stream operator for which to compute managed memory
+ * @return the computed managed memory size in bytes
+ */
+ public static long computeManagedMemory(AbstractStreamOperator<?> operator) {
+ final Environment environment =
operator.getContainingTask().getEnvironment();
+ return environment
+ .getMemoryManager()
+ .computeMemorySize(
+ operator.getOperatorConfig()
+ .getManagedMemoryFractionOperatorUseCaseOfSlot(
+ ManagedMemoryUseCase.OPERATOR,
+ environment.getJobConfiguration(),
+ environment.getUserCodeClassLoader().asClassLoader()));
+ }
+
private static InternalSchema applyTableChange(InternalSchema oldSchema,
TableChange change, Function<LogicalType, Type> convertFunc) {
InternalSchemaChangeApplier changeApplier = new
InternalSchemaChangeApplier(oldSchema);
if (change instanceof TableChange.AddColumn) {
diff --git
a/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/adapter/Utils.java
b/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/adapter/Utils.java
index e05407972130..5454e29c1920 100644
---
a/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/adapter/Utils.java
+++
b/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/adapter/Utils.java
@@ -26,8 +26,11 @@ import
org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -90,6 +93,29 @@ public class Utils {
return newSchema;
}
+ /**
+ * Computes the managed memory size available for the given stream operator.
+ * <p>
+ * This method retrieves the memory manager from the operator's execution
environment
+ * and calculates the memory size allocated for the operator based on the
configured
+ * managed memory fraction and the task manager's memory settings.
+ *
+ * @param operator the stream operator for which to compute managed memory
+ * @return the computed managed memory size in bytes
+ */
+ public static long computeManagedMemory(AbstractStreamOperator<?> operator) {
+ final Environment environment =
operator.getContainingTask().getEnvironment();
+ return environment
+ .getMemoryManager()
+ .computeMemorySize(
+ operator.getOperatorConfig()
+ .getManagedMemoryFractionOperatorUseCaseOfSlot(
+ ManagedMemoryUseCase.OPERATOR,
+ environment.getJobConfiguration(),
+ environment.getTaskManagerInfo().getConfiguration(),
+ environment.getUserCodeClassLoader().asClassLoader()));
+ }
+
private static InternalSchema applyTableChange(InternalSchema oldSchema,
TableChange change, Function<LogicalType, Type> convertFunc) {
InternalSchemaChangeApplier changeApplier = new
InternalSchemaChangeApplier(oldSchema);
if (change instanceof TableChange.AddColumn) {
diff --git
a/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/adapter/Utils.java
b/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/adapter/Utils.java
index e05407972130..5454e29c1920 100644
---
a/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/adapter/Utils.java
+++
b/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/adapter/Utils.java
@@ -26,8 +26,11 @@ import
org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -90,6 +93,29 @@ public class Utils {
return newSchema;
}
+ /**
+ * Computes the managed memory size available for the given stream operator.
+ * <p>
+ * This method retrieves the memory manager from the operator's execution
environment
+ * and calculates the memory size allocated for the operator based on the
configured
+ * managed memory fraction and the task manager's memory settings.
+ *
+ * @param operator the stream operator for which to compute managed memory
+ * @return the computed managed memory size in bytes
+ */
+ public static long computeManagedMemory(AbstractStreamOperator<?> operator) {
+ final Environment environment =
operator.getContainingTask().getEnvironment();
+ return environment
+ .getMemoryManager()
+ .computeMemorySize(
+ operator.getOperatorConfig()
+ .getManagedMemoryFractionOperatorUseCaseOfSlot(
+ ManagedMemoryUseCase.OPERATOR,
+ environment.getJobConfiguration(),
+ environment.getTaskManagerInfo().getConfiguration(),
+ environment.getUserCodeClassLoader().asClassLoader()));
+ }
+
private static InternalSchema applyTableChange(InternalSchema oldSchema,
TableChange change, Function<LogicalType, Type> convertFunc) {
InternalSchemaChangeApplier changeApplier = new
InternalSchemaChangeApplier(oldSchema);
if (change instanceof TableChange.AddColumn) {
diff --git
a/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/adapter/Utils.java
b/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/adapter/Utils.java
index e05407972130..5454e29c1920 100644
---
a/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/adapter/Utils.java
+++
b/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/adapter/Utils.java
@@ -26,8 +26,11 @@ import
org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -90,6 +93,29 @@ public class Utils {
return newSchema;
}
+ /**
+ * Computes the managed memory size available for the given stream operator.
+ * <p>
+ * This method retrieves the memory manager from the operator's execution
environment
+ * and calculates the memory size allocated for the operator based on the
configured
+ * managed memory fraction and the task manager's memory settings.
+ *
+ * @param operator the stream operator for which to compute managed memory
+ * @return the computed managed memory size in bytes
+ */
+ public static long computeManagedMemory(AbstractStreamOperator<?> operator) {
+ final Environment environment =
operator.getContainingTask().getEnvironment();
+ return environment
+ .getMemoryManager()
+ .computeMemorySize(
+ operator.getOperatorConfig()
+ .getManagedMemoryFractionOperatorUseCaseOfSlot(
+ ManagedMemoryUseCase.OPERATOR,
+ environment.getJobConfiguration(),
+ environment.getTaskManagerInfo().getConfiguration(),
+ environment.getUserCodeClassLoader().asClassLoader()));
+ }
+
private static InternalSchema applyTableChange(InternalSchema oldSchema,
TableChange change, Function<LogicalType, Type> convertFunc) {
InternalSchemaChangeApplier changeApplier = new
InternalSchemaChangeApplier(oldSchema);
if (change instanceof TableChange.AddColumn) {
diff --git
a/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/adapter/Utils.java
b/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/adapter/Utils.java
index e05407972130..516ff9073e1b 100644
---
a/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/adapter/Utils.java
+++
b/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/adapter/Utils.java
@@ -27,7 +27,10 @@ import
org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -90,6 +93,29 @@ public class Utils {
return newSchema;
}
+ /**
+ * Computes the managed memory size available for the given stream operator.
+ * <p>
+ * This method retrieves the memory manager from the operator's execution
environment
+ * and calculates the memory size allocated for the operator based on the
configured
+ * managed memory fraction and the task manager's memory settings.
+ *
+ * @param operator the stream operator for which to compute managed memory
+ * @return the computed managed memory size in bytes
+ */
+ public static long computeManagedMemory(AbstractStreamOperator<?> operator) {
+ final Environment environment =
operator.getContainingTask().getEnvironment();
+ return environment
+ .getMemoryManager()
+ .computeMemorySize(
+ operator.getOperatorConfig()
+ .getManagedMemoryFractionOperatorUseCaseOfSlot(
+ ManagedMemoryUseCase.OPERATOR,
+ environment.getJobConfiguration(),
+ environment.getTaskManagerInfo().getConfiguration(),
+ environment.getUserCodeClassLoader().asClassLoader()));
+ }
+
private static InternalSchema applyTableChange(InternalSchema oldSchema,
TableChange change, Function<LogicalType, Type> convertFunc) {
InternalSchemaChangeApplier changeApplier = new
InternalSchemaChangeApplier(oldSchema);
if (change instanceof TableChange.AddColumn) {