This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bbda2428bfd1 feat(flink): Support write buffer based on flink managed memory (#18319)
bbda2428bfd1 is described below

commit bbda2428bfd1ef80759eaae5bdc20fa078a5e95c
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Mar 19 09:55:22 2026 +0800

    feat(flink): Support write buffer based on flink managed memory (#18319)
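    
    A minimal usage sketch (the values are illustrative, not part of this
    patch): the buffer memory type is selected per table through the new
    option added below, e.g.
    
        Configuration conf = new Configuration();
        conf.set(FlinkOptions.WRITE_BUFFER_MEMORY_TYPE, BufferMemoryType.MANAGED.name());
        // ON_HEAP (the default) keeps the previous heap-based buffer behavior.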
---
 .../apache/hudi/configuration/FlinkOptions.java    |  12 +++
 .../apache/hudi/configuration/OptionsResolver.java |  27 ++++++
 .../org/apache/hudi/sink/StreamWriteFunction.java  |  15 ++-
 .../AppendWriteFunctionWithBIMBufferSort.java      |  17 +++-
 ...AppendWriteFunctionWithDisruptorBufferSort.java |   8 +-
 .../apache/hudi/sink/buffer/BufferMemoryType.java  |  43 ++++++++
 .../hudi/sink/buffer/MemorySegmentPoolFactory.java |  65 +++++++++----
 .../hudi/sink/bulk/BulkInsertWriteFunction.java    |   8 ++
 .../sink/common/AbstractStreamWriteFunction.java   |   6 ++
 .../hudi/sink/common/AbstractWriteFunction.java    |  12 +++
 .../hudi/sink/common/AbstractWriteOperator.java    |  18 ++++
 .../sink/partitioner/index/IndexWriteFunction.java |  10 +-
 .../java/org/apache/hudi/sink/utils/Pipelines.java |  31 +++++-
 .../org/apache/hudi/table/HoodieTableFactory.java  |   5 +
 .../sink/buffer/TestMemorySegmentPoolFactory.java  | 108 +++++++++++++++++++++
 .../utils/BucketStreamWriteFunctionWrapper.java    |   4 +-
 .../sink/utils/StreamWriteFunctionWrapper.java     |   5 +-
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  50 ++++++++++
 .../main/java/org/apache/hudi/adapter/Utils.java   |  25 +++++
 .../main/java/org/apache/hudi/adapter/Utils.java   |  25 +++++
 .../main/java/org/apache/hudi/adapter/Utils.java   |  26 +++++
 .../main/java/org/apache/hudi/adapter/Utils.java   |  26 +++++
 .../main/java/org/apache/hudi/adapter/Utils.java   |  26 +++++
 .../main/java/org/apache/hudi/adapter/Utils.java   |  26 +++++
 24 files changed, 562 insertions(+), 36 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 461f00475b20..93f11df956f9 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -40,6 +40,7 @@ import org.apache.hudi.hive.ddl.HiveSyncMode;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
+import org.apache.hudi.sink.buffer.BufferMemoryType;
 import org.apache.hudi.sink.overwrite.PartitionOverwriteMode;
 import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
 import org.apache.hudi.util.ClientIds;
@@ -707,6 +708,17 @@ public class FlinkOptions extends HoodieConfig {
       .withDescription("Maximum memory in MB for a write task, when the 
threshold hits,\n"
           + "it flushes the max size data bucket to avoid OOM, default 1GB");
 
+  @AdvancedConfig
+  public static final ConfigOption<String> WRITE_BUFFER_MEMORY_TYPE = 
ConfigOptions
+      .key("write.buffer.memory.type")
+      .stringType()
+      .defaultValue(BufferMemoryType.ON_HEAP.name())
+      .withDescription("The memory type used for the write buffer. "
+          + "Supported values are ON_HEAP (default) and MANAGED. "
+          + "ON_HEAP uses JVM heap memory, while MANAGED uses Flink managed 
memory "
+          + "which is accounted for in the task manager's memory budget "
+          + "and helps avoid OOM errors in containerized environments.");
+
   @AdvancedConfig
   public static final ConfigOption<Boolean> WRITE_BUFFER_SORT_ENABLED = 
ConfigOptions
       .key("write.buffer.sort.enabled")
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 7e455fa48015..57f031bbd85e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -41,6 +41,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.bucket.partition.PartitionBucketIndexUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.sink.buffer.BufferMemoryType;
 import org.apache.hudi.sink.overwrite.PartitionOverwriteMode;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.table.format.HoodieFlinkIOFactory;
@@ -604,4 +605,30 @@ public class OptionsResolver {
   public static int indexWriteParallelism(Configuration conf) {
     return OptionsResolver.isStreamingIndexWriteEnabled(conf) ? conf.get(FlinkOptions.INDEX_WRITE_TASKS) : 0;
   }
+
+  /**
+   * Returns the write buffer size in bytes.
+   *
+   * @param conf the Flink configuration containing write memory settings
+   * @return the calculated write buffer size in bytes
+   */
+  public static long getWriteBufferSizeInBytes(Configuration conf) {
+    long mergeReaderMem = 100; // constant 100MB
+    long mergeMapMaxMem = conf.get(FlinkOptions.WRITE_MERGE_MAX_MEMORY);
+    long maxBufferSize = (long) ((conf.get(FlinkOptions.WRITE_TASK_MAX_SIZE) - mergeReaderMem - mergeMapMaxMem) * 1024 * 1024);
+    final String errMsg = String.format("'%s' should be at least greater than '%s' plus merge reader memory(constant 100MB now)",
+        FlinkOptions.WRITE_TASK_MAX_SIZE.key(), FlinkOptions.WRITE_MERGE_MAX_MEMORY.key());
+    ValidationUtils.checkState(maxBufferSize > 0, errMsg);
+    return maxBufferSize;
+  }
+
+  /**
+   * Whether the flink managed memory is used for the write buffer.
+   *
+   * @param conf the Flink configuration
+   * @return true if the flink managed memory is used for the write buffer.
+   */
+  public static boolean isManagedMemoryBufferEnabled(Configuration conf) {
+    return BufferMemoryType.MANAGED.name().equalsIgnoreCase(conf.get(FlinkOptions.WRITE_BUFFER_MEMORY_TYPE));
+  }
 }
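
For intuition, a worked sizing sketch for getWriteBufferSizeInBytes; the option
values below are assumptions for illustration, not asserted defaults:

    Configuration conf = new Configuration();
    conf.setString(FlinkOptions.WRITE_TASK_MAX_SIZE.key(), "1024");   // MB per write task (assumed)
    conf.setString(FlinkOptions.WRITE_MERGE_MAX_MEMORY.key(), "100"); // MB for the merge map (assumed)
    // buffer = (1024 - 100 merge reader - 100 merge map) MB = 824 MB
    long bufferBytes = OptionsResolver.getWriteBufferSizeInBytes(conf); // 864026624 bytes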
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 5459b5ccf764..4dd8f785ac6b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -34,7 +34,6 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metrics.FlinkStreamWriteMetrics;
-import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
 import org.apache.hudi.sink.buffer.RowDataBucket;
 import org.apache.hudi.sink.buffer.TotalSizeTracer;
 import org.apache.hudi.sink.bulk.RowDataKeyGen;
@@ -60,6 +59,7 @@ import org.apache.flink.table.runtime.util.MemorySegmentPool;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Collector;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -200,7 +200,7 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction<HoodieFlink
 
   private void initBuffer() {
     this.buckets = new LinkedHashMap<>();
-    this.memorySegmentPool = MemorySegmentPoolFactory.createMemorySegmentPool(config);
+    this.memorySegmentPool = this.memorySegmentPoolFactory.createMemorySegmentPool(config, OptionsResolver.getWriteBufferSizeInBytes(config));
   }
 
   private void initWriteFunction() {
@@ -466,6 +466,17 @@ public class StreamWriteFunction extends AbstractStreamWriteFunction<HoodieFlink
     writeMetrics.registerMetrics();
   }
 
+  @Override
+  public void close() throws Exception {
+    try {
+      if (this.memorySegmentPool instanceof Closeable) {
+        ((Closeable) this.memorySegmentPool).close();
+      }
+    } finally {
+      super.close();
+    }
+  }
+
   // -------------------------------------------------------------------------
   //  Getter/Setter
   // -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java
index 9d7cc69c9691..89d4fd2c8f12 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithBIMBufferSort.java
@@ -19,11 +19,11 @@
 package org.apache.hudi.sink.append;
 
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.buffer.BufferType;
-import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
 import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
 import org.apache.hudi.sink.utils.BufferUtils;
 import org.apache.hudi.util.MutableIteratorWrapperIterator;
@@ -41,6 +41,7 @@ import org.apache.flink.table.runtime.util.MemorySegmentPool;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Collector;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
@@ -70,6 +71,7 @@ public class AppendWriteFunctionWithBIMBufferSort<T> extends AppendWriteFunction
   private final long writeBufferSize;
   private transient BinaryInMemorySortBuffer activeBuffer;
   private transient BinaryInMemorySortBuffer backgroundBuffer;
+  private transient MemorySegmentPool[] memorySegmentPools;
   private transient ExecutorService asyncWriteExecutor;
   private transient AtomicReference<CompletableFuture<Void>> asyncWriteTask;
   private transient AtomicBoolean isBackgroundBufferBeingProcessed;
@@ -89,14 +91,14 @@ public class AppendWriteFunctionWithBIMBufferSort<T> extends AppendWriteFunction
     SortCodeGenerator codeGenerator = sortOperatorGen.createSortCodeGenerator();
     GeneratedNormalizedKeyComputer keyComputer = codeGenerator.generateNormalizedKeyComputer("SortComputer");
     GeneratedRecordComparator recordComparator = codeGenerator.generateRecordComparator("SortComparator");
-    MemorySegmentPool[] pools = MemorySegmentPoolFactory.createMemorySegmentPools(config, 2);
+    this.memorySegmentPools = this.memorySegmentPoolFactory.createMemorySegmentPools(config, 2, OptionsResolver.getWriteBufferSizeInBytes(config));
 
     this.activeBuffer = BufferUtils.createBuffer(rowType,
-        pools[0],
+        memorySegmentPools[0],
         keyComputer.newInstance(Thread.currentThread().getContextClassLoader()),
         recordComparator.newInstance(Thread.currentThread().getContextClassLoader()));
     this.backgroundBuffer = BufferUtils.createBuffer(rowType,
-        pools[1],
+        memorySegmentPools[1],
         keyComputer.newInstance(Thread.currentThread().getContextClassLoader()),
         recordComparator.newInstance(Thread.currentThread().getContextClassLoader()));
 
@@ -229,6 +231,13 @@ public class AppendWriteFunctionWithBIMBufferSort<T> extends AppendWriteFunction
       if (asyncWriteExecutor != null && !asyncWriteExecutor.isShutdown()) {
         asyncWriteExecutor.shutdown();
       }
+      if (memorySegmentPools != null) {
+        for (MemorySegmentPool memorySegmentPool : memorySegmentPools) {
+          if (memorySegmentPool instanceof Closeable) {
+            ((Closeable) memorySegmentPool).close();
+          }
+        }
+      }
     } finally {
       super.close();
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithDisruptorBufferSort.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithDisruptorBufferSort.java
index 8154f23899cd..dffa8bf8b91f 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithDisruptorBufferSort.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunctionWithDisruptorBufferSort.java
@@ -21,11 +21,11 @@ package org.apache.hudi.sink.append;
 import org.apache.hudi.common.util.queue.DisruptorMessageQueue;
 import org.apache.hudi.common.util.queue.HoodieConsumer;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.buffer.BufferType;
-import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
 import org.apache.hudi.sink.bulk.sort.SortOperatorGen;
 import org.apache.hudi.sink.utils.BufferUtils;
 import org.apache.hudi.util.MutableIteratorWrapperIterator;
@@ -43,6 +43,7 @@ import org.apache.flink.table.runtime.util.MemorySegmentPool;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Collector;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
@@ -97,7 +98,7 @@ public class AppendWriteFunctionWithDisruptorBufferSort<T> extends AppendWriteFu
     SortCodeGenerator codeGenerator = sortOperatorGen.createSortCodeGenerator();
     this.keyComputer = codeGenerator.generateNormalizedKeyComputer("SortComputer");
     this.recordComparator = codeGenerator.generateRecordComparator("SortComparator");
-    this.memorySegmentPool = MemorySegmentPoolFactory.createMemorySegmentPool(config);
+    this.memorySegmentPool = this.memorySegmentPoolFactory.createMemorySegmentPool(config, OptionsResolver.getWriteBufferSizeInBytes(config));
 
     initDisruptorBuffer();
 
@@ -196,6 +197,9 @@ public class AppendWriteFunctionWithDisruptorBufferSort<T> extends AppendWriteFu
       if (disruptorQueue != null) {
         disruptorQueue.close();
       }
+      if (this.memorySegmentPool instanceof Closeable) {
+        ((Closeable) this.memorySegmentPool).close();
+      }
     } finally {
       super.close();
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/BufferMemoryType.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/BufferMemoryType.java
new file mode 100644
index 000000000000..b911052ce7d9
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/BufferMemoryType.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.buffer;
+
+/**
+ * Enum representing the memory type used for the Flink write buffer.
+ *
+ * <p>This determines how memory is allocated for buffering records
+ * before they are flushed to storage during write operations.
+ *
+ * @see org.apache.hudi.configuration.FlinkOptions#WRITE_BUFFER_MEMORY_TYPE
+ */
+public enum BufferMemoryType {
+  /**
+   * Uses JVM heap memory for the write buffer.
+   * This is the default memory type.
+   */
+  ON_HEAP,
+
+  /**
+   * Uses Flink managed memory for the write buffer.
+   * Managed memory is controlled by the Flink framework and is accounted
+   * for in the task manager's memory budget, which helps avoid OOM errors
+   * in containerized environments.
+   */
+  MANAGED
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/MemorySegmentPoolFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/MemorySegmentPoolFactory.java
index ae262810515d..6b38b7697d03 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/MemorySegmentPoolFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/buffer/MemorySegmentPoolFactory.java
@@ -18,21 +18,39 @@
 
 package org.apache.hudi.sink.buffer;
 
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.table.runtime.util.LazyMemorySegmentPool;
 import org.apache.flink.table.runtime.util.MemorySegmentPool;
 
+import javax.annotation.Nullable;
+
 /**
- * Factory to create {@code MemorySegmentPool}, currently only heap based memory pool {@code HeapMemorySegmentPool}
- * is supported.
- *
- * <p> todo support memory segment pool based on flink managed memory, currently support heap pool only, see HUDI-9189.
+ * Factory to create {@code MemorySegmentPool} instances.
+ * <p>
+ * Supports two types of memory pools:
+ * <ul>
+ *   <li>Heap-based memory pool ({@code HeapMemorySegmentPool}) - uses JVM heap memory</li>
+ *   <li>Flink managed memory pool ({@code LazyMemorySegmentPool}) - uses Flink's managed memory</li>
+ * </ul>
  */
 public class MemorySegmentPoolFactory {
-  public static MemorySegmentPool createMemorySegmentPool(Configuration conf) {
-    return createMemorySegmentPools(conf, 1)[0];
+  private final Option<Object> owner;
+  private final Option<MemoryManager> memoryManager;
+  private final long managedMemorySize;
+
+  public MemorySegmentPoolFactory(@Nullable Object owner, @Nullable MemoryManager memoryManager, long managedMemorySize) {
+    this.owner = Option.ofNullable(owner);
+    this.memoryManager = Option.ofNullable(memoryManager);
+    this.managedMemorySize = managedMemorySize;
+  }
+
+  public MemorySegmentPool createMemorySegmentPool(Configuration conf, long heapMemorySize) {
+    return createMemorySegmentPools(conf, 1, heapMemorySize)[0];
   }
 
   /**
@@ -42,26 +60,37 @@ public class MemorySegmentPoolFactory {
    * @param numPools number of pools to create
    * @return array of memory segment pools, each with size = totalSize / numPools
    */
-  public static MemorySegmentPool[] createMemorySegmentPools(Configuration conf, int numPools) {
-    ValidationUtils.checkArgument(numPools > 0, "numPools must be positive");
-    long mergeReaderMem = 100; // constant 100MB
-    long mergeMapMaxMem = conf.get(FlinkOptions.WRITE_MERGE_MAX_MEMORY);
-    long maxBufferSize = (long) ((conf.get(FlinkOptions.WRITE_TASK_MAX_SIZE) - mergeReaderMem - mergeMapMaxMem) * 1024 * 1024);
-    final String errMsg = String.format("'%s' should be at least greater than '%s' plus merge reader memory(constant 100MB now)",
-        FlinkOptions.WRITE_TASK_MAX_SIZE.key(), FlinkOptions.WRITE_MERGE_MAX_MEMORY.key());
-    ValidationUtils.checkState(maxBufferSize > 0, errMsg);
+  public MemorySegmentPool[] createMemorySegmentPools(Configuration conf, int numPools, long heapMemorySize) {
+    if (memoryManager.isEmpty()) {
+      return createHeapMemoryPools(conf, numPools, heapMemorySize);
+    } else {
+      return createManagedMemoryPools(numPools);
+    }
+  }
 
+  private MemorySegmentPool[] createHeapMemoryPools(Configuration conf, int numPools, long heapMemorySize) {
+    ValidationUtils.checkArgument(numPools > 0, "numPools must be positive");
+    ValidationUtils.checkArgument(heapMemorySize > 0, "Buffer size must be positive");
     int pageSize = conf.get(FlinkOptions.WRITE_MEMORY_SEGMENT_PAGE_SIZE);
-    long poolSize = maxBufferSize / numPools;
+    long poolSize = heapMemorySize / numPools;
     MemorySegmentPool[] pools = new MemorySegmentPool[numPools];
+    ValidationUtils.checkArgument(poolSize >= pageSize,
+        String.format("The total size %s of memory pool should not be less 
than page size %s.", poolSize, pageSize));
     for (int i = 0; i < numPools; i++) {
       pools[i] = new HeapMemorySegmentPool(pageSize, poolSize);
     }
     return pools;
   }
 
-  public static MemorySegmentPool createMemorySegmentPool(Configuration conf, long maxBufferSize) {
-    ValidationUtils.checkArgument(maxBufferSize > 0, "Buffer size should be a positive number.");
-    return new HeapMemorySegmentPool(conf.get(FlinkOptions.WRITE_MEMORY_SEGMENT_PAGE_SIZE), maxBufferSize * 1024 * 1024);
+  private MemorySegmentPool[] createManagedMemoryPools(int numPools) {
+    long poolSize = managedMemorySize / numPools;
+    MemorySegmentPool[] pools = new MemorySegmentPool[numPools];
+    int pageSize = memoryManager.get().getPageSize();
+    ValidationUtils.checkArgument(poolSize >= pageSize,
+        String.format("The total size %s of memory pool should not be less 
than page size %s.", poolSize, pageSize));
+    for (int i = 0; i < numPools; i++) {
+      pools[i] = new LazyMemorySegmentPool(owner.get(), memoryManager.get(), (int) (poolSize / pageSize));
+    }
+    return pools;
   }
 }
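
A condensed sketch of the two construction paths, mirroring the unit tests added
later in this patch (the owner/MemoryManager/size variables are placeholders):

    // Heap path: no MemoryManager supplied, so the per-call heap size is used.
    MemorySegmentPoolFactory heapFactory = new MemorySegmentPoolFactory(null, null, -1);
    MemorySegmentPool heapPool = heapFactory.createMemorySegmentPool(conf, 64 * 1024 * 1024L);

    // Managed path: owner + MemoryManager + total managed size, normally wired
    // in by AbstractWriteOperator#setup at runtime; the heap size argument is
    // not used on this branch.
    MemorySegmentPoolFactory managedFactory = new MemorySegmentPoolFactory(ownerTask, memoryManager, managedMemorySize);
    MemorySegmentPool[] managedPools = managedFactory.createMemorySegmentPools(conf, 2, 64 * 1024 * 1024L);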
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
index 89453464392a..593b00333dcf 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java
@@ -22,12 +22,14 @@ import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
 import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.Correspondent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.utils.RuntimeContextUtils;
 
+import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.configuration.Configuration;
@@ -64,6 +66,7 @@ public class BulkInsertWriteFunction<I>
   /**
    * Config options.
    */
+  @Getter
   private final Configuration config;
 
   /**
@@ -122,6 +125,11 @@ public class BulkInsertWriteFunction<I>
     }
   }
 
+  @Override
+  public void setMemorySegmentPoolFactory(MemorySegmentPoolFactory memorySegmentPoolFactory) {
+    // do nothing
+  }
+
   /**
    * End input action for batch source.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index db99f95f0d0f..68bb2a7dc546 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
+import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
 import org.apache.hudi.sink.event.CommitAckEvent;
 import org.apache.hudi.sink.event.Correspondent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
@@ -64,6 +65,7 @@ public abstract class AbstractStreamWriteFunction<I>
   /**
    * Config options.
    */
+  @Getter
   protected final Configuration config;
 
   /**
@@ -98,6 +100,10 @@ public abstract class AbstractStreamWriteFunction<I>
   @Setter
   protected transient Correspondent correspondent;
 
+  @Getter
+  @Setter
+  protected transient MemorySegmentPoolFactory memorySegmentPoolFactory;
+
   /**
    * Gateway to send operator events to the operator coordinator.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java
index 437e1517eabb..399cd87db884 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteFunction.java
@@ -19,8 +19,10 @@
 package org.apache.hudi.sink.common;
 
 import org.apache.hudi.adapter.ProcessFunctionAdapter;
+import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
 import org.apache.hudi.sink.event.Correspondent;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
@@ -37,6 +39,16 @@ public abstract class AbstractWriteFunction<I> extends ProcessFunctionAdapter<I,
    */
   public abstract void setCorrespondent(Correspondent correspondent);
 
+  /**
+   * Sets up the {@code MemorySegmentPoolFactory} for creating write buffer.
+   */
+  public abstract void setMemorySegmentPoolFactory(MemorySegmentPoolFactory memorySegmentPoolFactory);
+
+  /**
+   * Get the flink write configuration.
+   */
+  public abstract Configuration getConfig();
+
   /**
    * Sets up the event gateway.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java
index 0513ab6c5632..3f74002df575 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractWriteOperator.java
@@ -18,13 +18,20 @@
 
 package org.apache.hudi.sink.common;
 
+import org.apache.hudi.adapter.Utils;
+import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
 import org.apache.hudi.sink.event.Correspondent;
 
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.table.data.RowData;
 
 /**
@@ -42,6 +49,17 @@ public abstract class AbstractWriteOperator<I>
     this.function = function;
   }
 
+  @Override
+  public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<RowData>> output) {
+    super.setup(containingTask, config, output);
+
+    MemorySegmentPoolFactory memoryPoolFactory =
+        OptionsResolver.isManagedMemoryBufferEnabled(this.function.getConfig())
+            ? new MemorySegmentPoolFactory(containingTask, containingTask.getEnvironment().getMemoryManager(), Utils.computeManagedMemory(this))
+            : new MemorySegmentPoolFactory(null, null, -1);
+    this.function.setMemorySegmentPoolFactory(memoryPoolFactory);
+  }
+
   public void setCorrespondent(Correspondent correspondent) {
     this.function.setCorrespondent(correspondent);
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
index 63ca8229bc79..ee06ac12ed5a 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/index/IndexWriteFunction.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
-import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
 import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.exception.MemoryPagesExhaustedException;
@@ -43,6 +42,7 @@ import org.apache.flink.table.runtime.operators.sort.BinaryInMemorySortBuffer;
 import org.apache.flink.table.runtime.util.MemorySegmentPool;
 import org.apache.flink.util.Collector;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -91,7 +91,9 @@ public class IndexWriteFunction extends AbstractStreamWriteFunction<RowData> {
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     this.flinkTable = this.writeClient.getHoodieTable();
-    this.memorySegmentPool = MemorySegmentPoolFactory.createMemorySegmentPool(config, config.get(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE));
+    this.memorySegmentPool = this.memorySegmentPoolFactory.createMemorySegmentPool(
+        config,
+        config.get(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE) * 1024 * 1024);
     this.indexDataBuffer = BufferUtils.createBuffer(IndexRowUtils.INDEX_ROW_TYPE, memorySegmentPool);
   }
 
@@ -196,7 +198,9 @@ public class IndexWriteFunction extends AbstractStreamWriteFunction<RowData> {
   @Override
   public void close() throws Exception {
     this.indexDataBuffer.dispose();
-    this.memorySegmentPool.freePages();
+    if (this.memorySegmentPool instanceof Closeable) {
+      ((Closeable) this.memorySegmentPool).close();
+    }
     super.close();
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 5390c482ac5a..34d016f94960 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -30,12 +30,14 @@ import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.sink.StreamWriteOperator;
+import org.apache.hudi.sink.append.AppendWriteFunctions;
 import org.apache.hudi.sink.append.AppendWriteOperator;
 import org.apache.hudi.sink.bootstrap.BootstrapOperator;
 import org.apache.hudi.sink.bootstrap.batch.BatchBootstrapOperator;
 import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
 import org.apache.hudi.sink.bucket.BucketStreamWriteOperator;
 import org.apache.hudi.sink.bucket.ConsistentBucketAssignFunction;
+import org.apache.hudi.sink.buffer.BufferType;
 import org.apache.hudi.sink.bulk.BulkInsertWriteOperator;
 import org.apache.hudi.sink.bulk.RowDataKeyGen;
 import org.apache.hudi.sink.bulk.RowDataKeyGens;
@@ -67,6 +69,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.table.data.RowData;
@@ -78,6 +81,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
 import java.util.stream.Stream;
 
 /**
@@ -220,10 +224,14 @@ public class Pipelines {
 
     WriteOperatorFactory<RowData> operatorFactory = AppendWriteOperator.getFactory(conf, rowType);
 
-    return dataStream
+    SingleOutputStreamOperator<RowData> appendWriteDataStream = dataStream
         .transform(opName("hoodie_append_write", conf), 
TypeInformation.of(RowData.class), operatorFactory)
         .uid(opUID("hoodie_stream_write", conf))
         .setParallelism(conf.get(FlinkOptions.WRITE_TASKS));
+    if (!BufferType.NONE.name().equalsIgnoreCase(AppendWriteFunctions.resolveBufferType(conf))) {
+      declareManagedMemoryIfNecessary(conf, appendWriteDataStream, () -> OptionsResolver.getWriteBufferSizeInBytes(conf));
+    }
+    return appendWriteDataStream;
   }
 
   /**
@@ -353,7 +361,7 @@ public class Pipelines {
           // [HUDI-9036] BucketIndexPartitioner is also used in bulk insert mode,
           // keep use of HoodieKey here in partitionCustom for now
           BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(conf, indexKeyFields);
-          return dataStream
+          SingleOutputStreamOperator<RowData> bucketWriteStream = dataStream
               .partitionCustom(
                   partitioner,
                   record -> new HoodieKey(record.getRecordKey(), record.getPartitionPath()))
@@ -363,12 +371,14 @@ public class Pipelines {
                   BucketStreamWriteOperator.getFactory(conf, rowType))
               .uid(opUID("bucket_write", conf))
               .setParallelism(conf.get(FlinkOptions.WRITE_TASKS));
+          declareManagedMemoryIfNecessary(conf, bucketWriteStream, () -> OptionsResolver.getWriteBufferSizeInBytes(conf));
+          return bucketWriteStream;
         case CONSISTENT_HASHING:
           if (OptionsResolver.isInsertOverwrite(conf)) {
             // TODO support insert overwrite for consistent bucket index
             throw new HoodieException("Consistent hashing bucket index does 
not work with insert overwrite using FLINK engine. Use simple bucket index or 
Spark engine.");
           }
-          return dataStream
+          SingleOutputStreamOperator<RowData> consistentBucketWriteStream = dataStream
               .transform(
                   opName("consistent_bucket_assigner", conf),
                   new HoodieFlinkInternalRowTypeInfo(rowType),
@@ -382,6 +392,8 @@ public class Pipelines {
                   BucketStreamWriteOperator.getFactory(conf, rowType))
               .uid(opUID("consistent_bucket_write", conf))
               .setParallelism(conf.get(FlinkOptions.WRITE_TASKS));
+          declareManagedMemoryIfNecessary(conf, consistentBucketWriteStream, () -> OptionsResolver.getWriteBufferSizeInBytes(conf));
+          return consistentBucketWriteStream;
         default:
           throw new HoodieNotSupportedException("Unknown bucket index engine 
type: " + bucketIndexEngineType);
       }
@@ -392,7 +404,7 @@ public class Pipelines {
       String writeOperatorUid = opUID("stream_write", conf);
       DataStream<HoodieFlinkInternalRow> bucketAssignStream = createBucketAssignStream(dataStream, conf, rowType, writeOperatorUid);
       boolean isStreamingIndexWriteEnabled = OptionsResolver.isStreamingIndexWriteEnabled(conf);
-      DataStream<RowData> writeDatastream =
+      SingleOutputStreamOperator<RowData> writeDatastream =
           bucketAssignStream
               // shuffle by fileId(bucket id)
               .keyBy(HoodieFlinkInternalRow::getFileId)
@@ -402,9 +414,10 @@ public class Pipelines {
                   StreamWriteOperator.getFactory(conf, rowType))
               .uid(writeOperatorUid)
               .setParallelism(conf.get(FlinkOptions.WRITE_TASKS));
+      declareManagedMemoryIfNecessary(conf, writeDatastream, () -> OptionsResolver.getWriteBufferSizeInBytes(conf));
       if (isStreamingIndexWriteEnabled) {
         // index writing pipeline
-        return writeDatastream
+        SingleOutputStreamOperator<RowData> indexWriteDatastream = writeDatastream
             .partitionCustom(new RecordIndexPartitioner(conf), IndexRowUtils::getHoodieKey)
             .transform(
                 opName("index_write", conf),
@@ -412,6 +425,8 @@ public class Pipelines {
                 new IndexWriteOperator(conf, OperatorIDGenerator.fromUid(writeOperatorUid)))
             .uid(opUID("index_write", conf))
             .setParallelism(conf.get(FlinkOptions.INDEX_WRITE_TASKS));
+        declareManagedMemoryIfNecessary(conf, indexWriteDatastream, () -> conf.get(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE) * 1024L * 1024L);
+        return indexWriteDatastream;
       } else {
         return writeDatastream;
       }
@@ -563,6 +578,12 @@ public class Pipelines {
         : databaseName + "." + conf.get(FlinkOptions.TABLE_NAME);
   }
 
+  public static void declareManagedMemoryIfNecessary(Configuration conf, DataStream<?> dataStream, Supplier<Long> bufferSizeSupplier) {
+    if (OptionsResolver.isManagedMemoryBufferEnabled(conf)) {
+      ExecNodeUtil.setManagedMemoryWeight(dataStream.getTransformation(), bufferSizeSupplier.get());
+    }
+  }
+
   /**
    * Dummy sink that does nothing.
    */
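
The managed-memory plumbing has two halves. At graph build time,
declareManagedMemoryIfNecessary above declares an operator-scoped weight on the
transformation through ExecNodeUtil.setManagedMemoryWeight; at task runtime the
scheduler turns the declared weights in a slot into fractions, which
Utils.computeManagedMemory (added in the version adapters below) converts back
into a byte budget. A condensed sketch of the declaration side (writeStream is
a placeholder for any of the write streams above):

    long bufferBytes = OptionsResolver.getWriteBufferSizeInBytes(conf);
    // Declares the weight the runtime later resolves into a memory fraction.
    ExecNodeUtil.setManagedMemoryWeight(writeStream.getTransformation(), bufferBytes);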
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 3cc72934e88d..7580fd80ce56 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -413,6 +413,11 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
       conf.setString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), "true");
       conf.set(FlinkOptions.INDEX_GLOBAL_ENABLED, true);
       conf.setString(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(), "true");
+      // generally size of index data is much smaller than data record, so set the buffer size of
+      // the index writer as 1/4 of that for data writer if it's not set by user explicitly.
+      if (!conf.contains(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE)) {
+        conf.set(FlinkOptions.INDEX_RLI_WRITE_BUFFER_SIZE, OptionsResolver.getWriteBufferSizeInBytes(conf) / 1024 / 1024 / 4);
+      }
     } else {
       conf.setString(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(), "false");
     }
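
Continuing the earlier worked sketch (same assumed option values): an 824 MB
data-writer buffer yields a derived RLI index-writer default of 824 / 4 = 206 MB:

    // Applied only when the user has not set the index buffer size explicitly.
    long indexBufferMb = OptionsResolver.getWriteBufferSizeInBytes(conf) / 1024 / 1024 / 4; // = 206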
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/buffer/TestMemorySegmentPoolFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/buffer/TestMemorySegmentPoolFactory.java
new file mode 100644
index 000000000000..29dc0225f5b3
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/buffer/TestMemorySegmentPoolFactory.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.sink.buffer;
+
+import org.apache.hudi.configuration.FlinkOptions;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.table.runtime.util.LazyMemorySegmentPool;
+import org.apache.flink.table.runtime.util.MemorySegmentPool;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test cases for {@link MemorySegmentPoolFactory}.
+ */
+public class TestMemorySegmentPoolFactory {
+
+  @Test
+  public void testCreateHeapMemoryPool() {
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.WRITE_MEMORY_SEGMENT_PAGE_SIZE, 32 * 1024);
+
+    MemorySegmentPoolFactory factory = new MemorySegmentPoolFactory(null, null, 0);
+    MemorySegmentPool pool = factory.createMemorySegmentPool(conf, 1024 * 1024L);
+
+    assertNotNull(pool);
+    assertTrue(pool instanceof HeapMemorySegmentPool);
+  }
+
+  @Test
+  public void testCreateMultipleHeapMemoryPools() {
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.WRITE_MEMORY_SEGMENT_PAGE_SIZE, 32 * 1024);
+
+    MemorySegmentPoolFactory factory = new MemorySegmentPoolFactory(null, null, 0);
+    MemorySegmentPool[] pools = factory.createMemorySegmentPools(conf, 4, 4 * 1024 * 1024L);
+
+    assertNotNull(pools);
+    assertEquals(4, pools.length);
+    for (MemorySegmentPool pool : pools) {
+      assertNotNull(pool);
+      assertTrue(pool instanceof HeapMemorySegmentPool);
+    }
+  }
+
+  @Test
+  public void testCreateManagedMemoryPool() {
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.WRITE_MEMORY_SEGMENT_PAGE_SIZE, 32 * 1024);
+
+    MemoryManager memoryManager = mock(MemoryManager.class);
+    when(memoryManager.getPageSize()).thenReturn(32 * 1024);
+
+    Object owner = new Object();
+    long managedMemorySize = 2 * 1024 * 1024L;
+
+    MemorySegmentPoolFactory factory = new MemorySegmentPoolFactory(owner, memoryManager, managedMemorySize);
+    MemorySegmentPool pool = factory.createMemorySegmentPool(conf, 1024 * 1024L);
+
+    assertNotNull(pool);
+    assertTrue(pool instanceof LazyMemorySegmentPool);
+  }
+
+  @Test
+  public void testCreateMultipleManagedMemoryPools() {
+    Configuration conf = new Configuration();
+    conf.set(FlinkOptions.WRITE_MEMORY_SEGMENT_PAGE_SIZE, 32 * 1024);
+
+    MemoryManager memoryManager = mock(MemoryManager.class);
+    when(memoryManager.getPageSize()).thenReturn(32 * 1024);
+
+    Object owner = new Object();
+    long managedMemorySize = 4 * 1024 * 1024L;
+
+    MemorySegmentPoolFactory factory = new MemorySegmentPoolFactory(owner, memoryManager, managedMemorySize);
+    MemorySegmentPool[] pools = factory.createMemorySegmentPools(conf, 4, 1024 * 1024L);
+
+    assertNotNull(pools);
+    assertEquals(4, pools.length);
+    for (MemorySegmentPool pool : pools) {
+      assertNotNull(pool);
+      assertTrue(pool instanceof LazyMemorySegmentPool);
+    }
+  }
+}
+
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
index 4181e72bf206..80f6e7e33471 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BucketStreamWriteFunctionWrapper.java
@@ -26,6 +26,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.bucket.BucketStreamWriteFunction;
+import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
 import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.transform.RowDataToHoodieFunction;
@@ -215,8 +216,9 @@ public class BucketStreamWriteFunctionWrapper<I> implements TestFunctionWrapper<
     writeFunction.setRuntimeContext(runtimeContext);
     writeFunction.setOperatorEventGateway(gateway);
     writeFunction.initializeState(this.stateInitializationContext);
-    writeFunction.open(conf);
     writeFunction.setCorrespondent(new MockCorrespondent(this.coordinator));
+    writeFunction.setMemorySegmentPoolFactory(new MemorySegmentPoolFactory(null, null, -1));
+    writeFunction.open(conf);
   }
 
   protected StreamWriteFunction createWriteFunction() {
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index 5d0bd8cd6157..254592556669 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -29,6 +29,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.sink.StreamWriteFunction;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.bootstrap.BootstrapOperator;
+import org.apache.hudi.sink.buffer.MemorySegmentPoolFactory;
 import org.apache.hudi.sink.common.AbstractWriteFunction;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.partitioner.BucketAssignFunction;
@@ -395,14 +396,16 @@ public class StreamWriteFunctionWrapper<I> implements TestFunctionWrapper<I> {
     writeFunction.setRuntimeContext(runtimeContext);
     writeFunction.setOperatorEventGateway(gateway);
     writeFunction.initializeState(this.stateInitializationContext);
-    writeFunction.open(conf);
+    writeFunction.setMemorySegmentPoolFactory(new MemorySegmentPoolFactory(null, null, -1));
     writeFunction.setCorrespondent(correspondent);
+    writeFunction.open(conf);
   }
 
   private void setupIndexWriteFunction() throws Exception {
     indexWriteFunction = new IndexWriteFunction(conf);
     indexWriteFunction.setRuntimeContext(runtimeContext);
     indexWriteFunction.setCorrespondent(correspondent);
+    indexWriteFunction.setMemorySegmentPoolFactory(new MemorySegmentPoolFactory(null, null, -1));
     indexWriteFunction.initializeState(this.indexStateInitializationContext);
     indexWriteFunction.open(conf);
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index fb096511f0a8..4a240490f7d4 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -35,6 +35,8 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.bucket.partition.PartitionBucketIndexUtils;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.sink.buffer.BufferMemoryType;
+import org.apache.hudi.sink.buffer.BufferType;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.catalog.HoodieCatalogTestUtils;
 import org.apache.hudi.table.catalog.HoodieHiveCatalog;
@@ -3074,6 +3076,54 @@ public class ITTestHoodieDataSource {
     assertRowsEquals(result1, TestData.DATA_SET_SOURCE_INSERT);
   }
 
+  @ParameterizedTest
+  @EnumSource(value = HoodieTableType.class)
+  void testStreamWriteWithManagedMemory(HoodieTableType tableType) throws Exception {
+    // create filesystem table named source
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    streamTableEnv.executeSql(createSource);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
+        .option(FlinkOptions.TABLE_TYPE, tableType)
+        .option(FlinkOptions.WRITE_BUFFER_MEMORY_TYPE, BufferMemoryType.MANAGED)
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    List<Row> result = CollectionUtil.iteratorToList(
+        streamTableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result, TestData.DATA_SET_SOURCE_INSERT);
+  }
+
+  @ParameterizedTest
+  @EnumSource(value = BufferType.class,  names = {"BOUNDED_IN_MEMORY", "DISRUPTOR"})
+  void testAppendWriteWithManagedMemory(BufferType bufferType) {
+    // create filesystem table named source
+    String createSource = TestConfigurations.getFileSourceDDL("source");
+    streamTableEnv.executeSql(createSource);
+
+    String hoodieTableDDL = sql("t1")
+        .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
+        .options(getDefaultKeys())
+        .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
+        .option(FlinkOptions.OPERATION, "insert")
+        .option(FlinkOptions.WRITE_BUFFER_MEMORY_TYPE, BufferMemoryType.MANAGED)
+        .option(FlinkOptions.WRITE_BUFFER_TYPE, bufferType.name())
+        .end();
+    streamTableEnv.executeSql(hoodieTableDDL);
+
+    String insertInto = "insert into t1 select * from source";
+    execInsertSql(streamTableEnv, insertInto);
+
+    List<Row> result = CollectionUtil.iteratorToList(
+        streamTableEnv.sqlQuery("select * from t1").execute().collect());
+    assertRowsEquals(result, TestData.DATA_SET_SOURCE_INSERT);
+  }
+
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/Utils.java b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/Utils.java
index 61c6d0838415..51a58713dc72 100644
--- a/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/Utils.java
+++ b/hudi-flink-datasource/hudi-flink1.17.x/src/main/java/org/apache/hudi/adapter/Utils.java
@@ -26,8 +26,11 @@ import org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -90,6 +93,28 @@ public class Utils {
     return newSchema;
   }
 
+  /**
+   * Computes the managed memory size available for the given stream operator.
+   * <p>
+   * This method retrieves the memory manager from the operator's execution environment
+   * and calculates the memory size allocated for the operator based on the configured
+   * managed memory fraction and the task manager's memory settings.
+   *
+   * @param operator the stream operator for which to compute managed memory
+   * @return the computed managed memory size in bytes
+   */
+  public static long computeManagedMemory(AbstractStreamOperator<?> operator) {
+    final Environment environment = operator.getContainingTask().getEnvironment();
+    return environment
+        .getMemoryManager()
+        .computeMemorySize(
+            operator.getOperatorConfig()
+                .getManagedMemoryFractionOperatorUseCaseOfSlot(
+                    ManagedMemoryUseCase.OPERATOR,
+                    environment.getJobConfiguration(),
+                    environment.getUserCodeClassLoader().asClassLoader()));
+  }
+
   private static InternalSchema applyTableChange(InternalSchema oldSchema, TableChange change, Function<LogicalType, Type> convertFunc) {
     InternalSchemaChangeApplier changeApplier = new InternalSchemaChangeApplier(oldSchema);
     if (change instanceof TableChange.AddColumn) {
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/Utils.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/Utils.java
index e05407972130..f8cb836c0895 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/Utils.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/adapter/Utils.java
@@ -26,8 +26,11 @@ import org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -90,6 +93,28 @@ public class Utils {
     return newSchema;
   }
 
+  /**
+   * Computes the managed memory size available for the given stream operator.
+   * <p>
+   * This method retrieves the memory manager from the operator's execution environment
+   * and calculates the memory size allocated for the operator based on the configured
+   * managed memory fraction and the task manager's memory settings.
+   *
+   * @param operator the stream operator for which to compute managed memory
+   * @return the computed managed memory size in bytes
+   */
+  public static long computeManagedMemory(AbstractStreamOperator<?> operator) {
+    final Environment environment = operator.getContainingTask().getEnvironment();
+    return environment
+        .getMemoryManager()
+        .computeMemorySize(
+            operator.getOperatorConfig()
+                .getManagedMemoryFractionOperatorUseCaseOfSlot(
+                    ManagedMemoryUseCase.OPERATOR,
+                    environment.getJobConfiguration(),
+                    environment.getUserCodeClassLoader().asClassLoader()));
+  }
+
   private static InternalSchema applyTableChange(InternalSchema oldSchema, TableChange change, Function<LogicalType, Type> convertFunc) {
     InternalSchemaChangeApplier changeApplier = new InternalSchemaChangeApplier(oldSchema);
     if (change instanceof TableChange.AddColumn) {
diff --git a/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/adapter/Utils.java b/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/adapter/Utils.java
index e05407972130..5454e29c1920 100644
--- a/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/adapter/Utils.java
+++ b/hudi-flink-datasource/hudi-flink1.19.x/src/main/java/org/apache/hudi/adapter/Utils.java
@@ -26,8 +26,11 @@ import org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -90,6 +93,29 @@ public class Utils {
     return newSchema;
   }
 
+  /**
+   * Computes the managed memory size available for the given stream operator.
+   * <p>
+   * This method retrieves the memory manager from the operator's execution environment
+   * and calculates the memory size allocated for the operator based on the configured
+   * managed memory fraction and the task manager's memory settings.
+   *
+   * @param operator the stream operator for which to compute managed memory
+   * @return the computed managed memory size in bytes
+   */
+  public static long computeManagedMemory(AbstractStreamOperator<?> operator) {
+    final Environment environment = operator.getContainingTask().getEnvironment();
+    return environment
+        .getMemoryManager()
+        .computeMemorySize(
+            operator.getOperatorConfig()
+                .getManagedMemoryFractionOperatorUseCaseOfSlot(
+                    ManagedMemoryUseCase.OPERATOR,
+                    environment.getJobConfiguration(),
+                    environment.getTaskManagerInfo().getConfiguration(),
+                    environment.getUserCodeClassLoader().asClassLoader()));
+  }
+
   private static InternalSchema applyTableChange(InternalSchema oldSchema, TableChange change, Function<LogicalType, Type> convertFunc) {
     InternalSchemaChangeApplier changeApplier = new InternalSchemaChangeApplier(oldSchema);
     if (change instanceof TableChange.AddColumn) {
diff --git a/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/adapter/Utils.java b/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/adapter/Utils.java
index e05407972130..5454e29c1920 100644
--- a/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/adapter/Utils.java
+++ b/hudi-flink-datasource/hudi-flink1.20.x/src/main/java/org/apache/hudi/adapter/Utils.java
@@ -26,8 +26,11 @@ import org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -90,6 +93,29 @@ public class Utils {
     return newSchema;
   }
 
+  /**
+   * Computes the managed memory size available for the given stream operator.
+   * <p>
+   * This method retrieves the memory manager from the operator's execution environment
+   * and calculates the memory size allocated for the operator based on the configured
+   * managed memory fraction and the task manager's memory settings.
+   *
+   * @param operator the stream operator for which to compute managed memory
+   * @return the computed managed memory size in bytes
+   */
+  public static long computeManagedMemory(AbstractStreamOperator<?> operator) {
+    final Environment environment = operator.getContainingTask().getEnvironment();
+    return environment
+        .getMemoryManager()
+        .computeMemorySize(
+            operator.getOperatorConfig()
+                .getManagedMemoryFractionOperatorUseCaseOfSlot(
+                    ManagedMemoryUseCase.OPERATOR,
+                    environment.getJobConfiguration(),
+                    environment.getTaskManagerInfo().getConfiguration(),
+                    environment.getUserCodeClassLoader().asClassLoader()));
+  }
+
   private static InternalSchema applyTableChange(InternalSchema oldSchema, TableChange change, Function<LogicalType, Type> convertFunc) {
     InternalSchemaChangeApplier changeApplier = new InternalSchemaChangeApplier(oldSchema);
     if (change instanceof TableChange.AddColumn) {
diff --git a/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/adapter/Utils.java b/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/adapter/Utils.java
index e05407972130..5454e29c1920 100644
--- a/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/adapter/Utils.java
+++ b/hudi-flink-datasource/hudi-flink2.0.x/src/main/java/org/apache/hudi/adapter/Utils.java
@@ -26,8 +26,11 @@ import org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -90,6 +93,29 @@ public class Utils {
     return newSchema;
   }
 
+  /**
+   * Computes the managed memory size available for the given stream operator.
+   * <p>
+   * This method retrieves the memory manager from the operator's execution environment
+   * and calculates the memory size allocated for the operator based on the configured
+   * managed memory fraction and the task manager's memory settings.
+   *
+   * @param operator the stream operator for which to compute managed memory
+   * @return the computed managed memory size in bytes
+   */
+  public static long computeManagedMemory(AbstractStreamOperator<?> operator) {
+    final Environment environment = operator.getContainingTask().getEnvironment();
+    return environment
+        .getMemoryManager()
+        .computeMemorySize(
+            operator.getOperatorConfig()
+                .getManagedMemoryFractionOperatorUseCaseOfSlot(
+                    ManagedMemoryUseCase.OPERATOR,
+                    environment.getJobConfiguration(),
+                    environment.getTaskManagerInfo().getConfiguration(),
+                    environment.getUserCodeClassLoader().asClassLoader()));
+  }
+
   private static InternalSchema applyTableChange(InternalSchema oldSchema, TableChange change, Function<LogicalType, Type> convertFunc) {
     InternalSchemaChangeApplier changeApplier = new InternalSchemaChangeApplier(oldSchema);
     if (change instanceof TableChange.AddColumn) {
diff --git a/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/adapter/Utils.java b/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/adapter/Utils.java
index e05407972130..516ff9073e1b 100644
--- a/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/adapter/Utils.java
+++ b/hudi-flink-datasource/hudi-flink2.1.x/src/main/java/org/apache/hudi/adapter/Utils.java
@@ -27,7 +27,10 @@ import org.apache.hudi.internal.schema.action.InternalSchemaChangeApplier;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.core.memory.ManagedMemoryUseCase;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -90,6 +93,29 @@ public class Utils {
     return newSchema;
   }
 
+  /**
+   * Computes the managed memory size available for the given stream operator.
+   * <p>
+   * This method retrieves the memory manager from the operator's execution environment
+   * and calculates the memory size allocated for the operator based on the configured
+   * managed memory fraction and the task manager's memory settings.
+   *
+   * @param operator the stream operator for which to compute managed memory
+   * @return the computed managed memory size in bytes
+   */
+  public static long computeManagedMemory(AbstractStreamOperator<?> operator) {
+    final Environment environment = operator.getContainingTask().getEnvironment();
+    return environment
+        .getMemoryManager()
+        .computeMemorySize(
+            operator.getOperatorConfig()
+                .getManagedMemoryFractionOperatorUseCaseOfSlot(
+                    ManagedMemoryUseCase.OPERATOR,
+                    environment.getJobConfiguration(),
+                    environment.getTaskManagerInfo().getConfiguration(),
+                    environment.getUserCodeClassLoader().asClassLoader()));
+  }
+
   private static InternalSchema applyTableChange(InternalSchema oldSchema, TableChange change, Function<LogicalType, Type> convertFunc) {
     InternalSchemaChangeApplier changeApplier = new InternalSchemaChangeApplier(oldSchema);
     if (change instanceof TableChange.AddColumn) {

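For context, a minimal sketch (not part of this patch) of how the new adapter utility might be wired into a write operator that sizes its buffer from Flink managed memory. The class ManagedBufferWriteOperator and the helper createWriteBufferPool are hypothetical illustration names; only Utils.computeManagedMemory comes from this commit. Note also that the hudi-flink1.18.x variant above passes one argument fewer to getManagedMemoryFractionOperatorUseCaseOfSlot, reflecting that method's differing signature across the supported Flink releases.

    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.hudi.adapter.Utils;

    // Hypothetical operator showing the intended call site of the utility.
    public class ManagedBufferWriteOperator extends AbstractStreamOperator<Object> {

      @Override
      public void open() throws Exception {
        super.open();
        // Managed memory (in bytes) granted to this operator for the
        // OPERATOR use case, derived from the slot's managed memory and
        // the fraction declared for this operator in its stream config.
        long managedMemoryBytes = Utils.computeManagedMemory(this);
        createWriteBufferPool(managedMemoryBytes);
      }

      // Hypothetical helper: size a memory segment pool for the write
      // buffer, e.g. via a MemorySegmentPool factory.
      private void createWriteBufferPool(long sizeInBytes) {
        // allocation elided in this sketch
      }
    }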