Rui Fan created FLINK-33315:
-------------------------------

             Summary: Optimize memory usage of large StreamOperator
                 Key: FLINK-33315
                 URL: https://issues.apache.org/jira/browse/FLINK-33315
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Configuration
    Affects Versions: 1.17.0, 1.18.0
            Reporter: Rui Fan
            Assignee: Rui Fan
         Attachments: image-2023-10-19-16-28-16-077.png

Some of our batch jobs were upgraded from flink-1.15 to flink-1.17, and their TMs 
now always fail with java.lang.OutOfMemoryError: Java heap space.

 

Here is an example: a Hive table with a lot of data, where 
HiveSource#partitionBytes is 281MB.

After analysis, the root cause is that the TM keeps 3 replicas of this large 
object:
 * Replica_1: SourceOperatorFactory (it's necessary for running task)
 * Replica_2: A duplicate SourceOperatorFactory object that is generated temporarily.
 ** It was introduced in FLINK-30536 (1.17) and is not necessary. ([code link|https://github.com/apache/flink/blob/c2e14ff411e806f9ccf176c85eb8249b8ff12e56/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L646])
 ** When creating a successor operator to a SourceOperator, the call stack is:

 *** OperatorChain#createOperatorChain -> 
 *** wrapOperatorIntoOutput ->
 *** getOperatorRecordsOutCounter ->
 *** operatorConfig.getStreamOperatorFactory(userCodeClassloader)
 ** It generates the SourceOperatorFactory temporarily only to check whether it 
is a SinkWriterOperatorFactory.
 * Replica_3: The value of StreamConfig#SERIALIZEDUDF
 ** It is used to generate SourceOperatorFactory.
 ** Currently, the value is always kept in heap memory.
 ** However, after the SourceOperatorFactory is generated, we can release the 
value, or store it on disk in case it is needed again.
 *** We can define a threshold: when the value size is less than the threshold, 
the release strategy does not take effect.
 ** This way, we can save a lot of heap memory.
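To make the threshold idea for Replica_3 concrete, here is a minimal Java sketch. It does not use Flink's StreamConfig: the SpillableBytes class, its readAndMaybeRelease method, and the choice to spill to a temporary file (rather than just dropping the bytes) are all hypothetical. Values below the threshold stay on the heap; larger values are written to disk on the first read, and the holder drops its heap reference.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical holder for serialized UDF bytes; this is not Flink's StreamConfig.
final class SpillableBytes {
    private final long thresholdBytes;
    private byte[] inMemory; // null once the value has been spilled
    private Path spillFile;  // set once the value has been spilled

    SpillableBytes(byte[] value, long thresholdBytes) {
        this.inMemory = value;
        this.thresholdBytes = thresholdBytes;
    }

    // Returns the value. On the first read, a value at or above the threshold is written
    // to a temp file and this holder drops its heap reference, so the array can be
    // garbage-collected once the caller is done with it.
    synchronized byte[] readAndMaybeRelease() throws IOException {
        if (inMemory == null) {
            // Needed again after spilling: read it back from disk.
            return Files.readAllBytes(spillFile);
        }
        byte[] value = inMemory;
        if (value.length >= thresholdBytes) {
            spillFile = Files.createTempFile("serialized-udf-", ".bin");
            spillFile.toFile().deleteOnExit();
            Files.write(spillFile, value);
            inMemory = null;
        }
        // Values below the threshold are kept on the heap unchanged.
        return value;
    }

    synchronized boolean isOnHeap() {
        return inMemory != null;
    }
}
```

Spilling rather than simply releasing keeps the value readable if it is needed again, at the cost of some disk I/O.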

These three replicas use about 800MB of memory, and that is just one subtask. 
Since each TM has 4 slots, it runs 4 HiveSources at the same time, so 12 
replicas, about 3.3 GB in total, are kept in the TM's memory.
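A rough check of these numbers, assuming (not measured) that each replica is about the size of the 281MB partitionBytes:

```latex
\begin{aligned}
3 \times 281\,\text{MB} &= 843\,\text{MB} \quad (\text{about } 800\,\text{MB per subtask}) \\
4 \times 843\,\text{MB} &= 3372\,\text{MB} \approx 3.3\,\text{GB per TM} \quad (1\,\text{GB} = 1024\,\text{MB})
\end{aligned}
```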

These large objects stay reachable in the JVM and cannot be garbage-collected, 
so the TM frequently fails with OOM.

This JIRA is focused on optimizing Replica_2 and Replica_3.
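For Replica_2, the type check needs only the factory's class, not the deserialized factory. The Java sketch below illustrates that idea under stated assumptions: OperatorConfigSketch, OperatorFactory, SinkWriterFactory and the other types are hypothetical stand-ins, not Flink classes, and storing the class name next to the serialized bytes is one possible approach rather than the actual fix. isSinkWriterFactory loads the class without initializing it and never deserializes the large payload.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-ins for operator factory types; these are not Flink's classes.
interface OperatorFactory extends Serializable {}

interface SinkWriterFactory extends OperatorFactory {}

// Simulates a source factory that carries a large payload (e.g. partition metadata).
final class LargeSourceFactory implements OperatorFactory {
    final byte[] payload;

    LargeSourceFactory(int size) {
        this.payload = new byte[size];
    }
}

final class SimpleSinkWriterFactory implements SinkWriterFactory {}

// Hypothetical operator config: keeps the factory's class name next to its serialized bytes.
final class OperatorConfigSketch {
    private final byte[] serializedFactory;
    private final String factoryClassName;

    OperatorConfigSketch(OperatorFactory factory) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(factory);
        }
        this.serializedFactory = bytes.toByteArray();
        // Recorded at write time so readers can check the type cheaply.
        this.factoryClassName = factory.getClass().getName();
    }

    // Loads (without initializing) only the class; the serialized payload is never touched.
    boolean isSinkWriterFactory(ClassLoader loader) throws ClassNotFoundException {
        Class<?> factoryClass = Class.forName(factoryClassName, false, loader);
        return SinkWriterFactory.class.isAssignableFrom(factoryClass);
    }

    // Full deserialization creates a new copy of the factory; only the running task needs it.
    OperatorFactory deserializeFactory() throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(serializedFactory))) {
            return (OperatorFactory) in.readObject();
        }
    }
}
```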

 

!image-2023-10-19-16-28-16-077.png!

 

!https://f.haiserve.com/download/5366d5f07c07a00116b148c6fa1ebff00b010200001cc3da0438a0860702016976849360726a?userid=146850&token=d4a7e7d617dc71ea28bf02977333e1a8|width=1935,height=1127!

 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
