[ https://issues.apache.org/jira/browse/FLINK-33315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17787823#comment-17787823 ]
Rui Fan commented on FLINK-33315:
---------------------------------

The 3 subtasks have been merged for some time, and after these improvements our Flink batch job with a large operator runs very well, so I am closing this JIRA.

Many thanks to everyone who helped with the review.

> Optimize memory usage of large StreamOperator
> ---------------------------------------------
>
>                 Key: FLINK-33315
>                 URL: https://issues.apache.org/jira/browse/FLINK-33315
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Configuration, Runtime / Task
>    Affects Versions: 1.17.0, 1.18.0
>            Reporter: Rui Fan
>            Assignee: Rui Fan
>            Priority: Major
>         Attachments: 130f436613b52b321bd9bd0211dd109f0b0102000020e860f292a13c0702016976850466192b.png,
>                      image-2023-10-19-16-28-16-077.png
>
>
> Some of our batch jobs were upgraded from flink-1.15 to flink-1.17, and the TM
> always fails with java.lang.OutOfMemoryError: Java heap space.
>
> Here is an example: a Hive table with a lot of data, where
> HiveSource#partitionBytes is 281MB.
> After analysis, the root cause is that the TM holds this big object in 3
> replicas:
> * Replica_1: SourceOperatorFactory (necessary for running the task)
> * Replica_2: A temporarily generated duplicate SourceOperatorFactory object.
> ** It was introduced in FLINK-30536 (1.17) and is not necessary.
> ([code link|https://github.com/apache/flink/blob/c2e14ff411e806f9ccf176c85eb8249b8ff12e56/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java#L646])
> ** When creating a successor operator to a SourceOperator, the call stack is:
> *** OperatorChain#createOperatorChain ->
> *** wrapOperatorIntoOutput ->
> *** getOperatorRecordsOutCounter ->
> *** operatorConfig.getStreamOperatorFactory(userCodeClassloader)
> ** It generates the SourceOperatorFactory temporarily, just to check
> whether it is a SinkWriterOperatorFactory.
> * Replica_3: The value of StreamConfig#SERIALIZEDUDF
> ** It is used to generate the SourceOperatorFactory.
> ** Currently the value is always kept in heap memory.
> ** However, once the factory has been generated, we can release the value or
> store it on disk if needed.
> *** We can define a threshold: when the value size is less than the threshold,
> the release strategy does not take effect.
> ** This would save a lot of heap memory.
> These three replicas use about 800MB of memory. Please note that this is just
> one subtask. Since each TM has 4 slots, it runs 4 HiveSources at the same
> time, so 12 replicas are held in TM memory, about 3.3 GB in total.
> These large objects cannot be reclaimed by the JVM, causing the TM to OOM
> frequently.
> This JIRA focuses on optimizing Replica_2 and Replica_3.
>
> !image-2023-10-19-16-28-16-077.png!
>
> !https://f.haiserve.com/download/130f436613b52b321bd9bd0211dd109f0b0102000020e860f292a13c0702016976850466192b?userid=146850&token=4e7b7352b30d6e5d2dd2bb7a7479fc93!
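
To illustrate the threshold-based release idea proposed for Replica_3, here is a minimal Java sketch. It is not the code that was merged for this issue and it does not use Flink's StreamConfig API: the class ReleasableSerializedValue, its methods, and the threshold parameter are hypothetical names chosen for this example. It only covers the release path (dropping the byte array after deserialization), not spilling to disk.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical holder for a serialized value; not part of Flink. */
final class ReleasableSerializedValue<T extends Serializable> {

    // Assumption: payloads at or above this size are released after use.
    private final long releaseThresholdBytes;

    // Serialized form of the value; set to null once released.
    private byte[] serialized;

    ReleasableSerializedValue(T value, long releaseThresholdBytes) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        this.serialized = bytes.toByteArray();
        this.releaseThresholdBytes = releaseThresholdBytes;
    }

    /**
     * Deserializes the value. If the serialized form is at least as large as
     * the threshold, the byte array is dropped afterwards so that the JVM can
     * reclaim it. Smaller payloads are kept, i.e. the release strategy does
     * not take effect for them.
     */
    @SuppressWarnings("unchecked")
    T deserializeAndMaybeRelease() throws IOException, ClassNotFoundException {
        if (serialized == null) {
            throw new IllegalStateException("Serialized bytes were already released");
        }
        T value;
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(serialized))) {
            value = (T) in.readObject();
        }
        if (serialized.length >= releaseThresholdBytes) {
            serialized = null; // drop the only reference held by this object
        }
        return value;
    }

    boolean isReleased() {
        return serialized == null;
    }
}
```

With a threshold of, say, 1024 bytes, a payload the size of the 281MB example above would be released after the first deserialization, while a small configuration value would stay on the heap and could be deserialized again. A released payload cannot be deserialized a second time in this sketch, which is the trade-off a threshold is meant to limit to large values.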