1996fanrui opened a new pull request, #23599:
URL: https://github.com/apache/flink/pull/23599

   ## What is the purpose of the change
   
   The background is similar to 
[FLINK-33315](https://issues.apache.org/jira/browse/FLINK-33315).
   
   Consider a Hive table with a lot of data, where HiveSource#partitionBytes is 
281 MB. When slotPerTM = 4, one TM runs 4 HiveSources at the same time.
   
   How does the TaskExecutor submit a large task?
   
   1. TaskExecutor#loadBigData reads all bytes from the file into a 
SerializedValue<TaskInformation>
     - The SerializedValue<TaskInformation> holds a byte[]
     - This byte[] occupies heap memory
     - It is larger than 281 MB, because it stores not only 
HiveSource#partitionBytes but also the other information in TaskInformation.
   2. Generate the TaskInformation from the SerializedValue<TaskInformation>
     - TaskExecutor#submitTask calls 
tdd.getSerializedTaskInformation().deserializeValue()
     - tdd.getSerializedTaskInformation() is the SerializedValue<TaskInformation>
     - Deserializing it generates the TaskInformation
     - The TaskInformation includes the Configuration taskConfiguration
     - The taskConfiguration includes StreamConfig#SERIALIZEDUDF
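
The two steps above can be mimicked with plain Java serialization. This is only a minimal sketch, not Flink code: `FakeTaskInfo`, `serialize`, and `deserialize` are hypothetical stand-ins for TaskInformation and SerializedValue, and a 1 MB payload stands in for the 281 MB of partition bytes. It shows that the serialized form is larger than the payload alone, and that deserializing allocates a second, independent byte array.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TwoCopiesSketch {

    /** Hypothetical stand-in for TaskInformation: an object carrying a large byte[]. */
    static final class FakeTaskInfo implements Serializable {
        private static final long serialVersionUID = 1L;
        final String taskName;
        final byte[] payload;

        FakeTaskInfo(String taskName, byte[] payload) {
            this.taskName = taskName;
            this.payload = payload;
        }
    }

    /** Stand-in for step 1: the serialized form is one big byte[] on the heap. */
    static byte[] serialize(FakeTaskInfo info) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(info);
        }
        return bos.toByteArray();
    }

    /** Stand-in for step 2: deserializing allocates a fresh copy of the payload. */
    static FakeTaskInfo deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (FakeTaskInfo) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] payload = new byte[1024 * 1024]; // 1 MB stands in for 281 MB
        byte[] serialized = serialize(new FakeTaskInfo("source", payload));
        FakeTaskInfo restored = deserialize(serialized);

        // Both arrays are reachable at the same time while the task is being submitted.
        System.out.println("serialized bytes:       " + serialized.length);
        System.out.println("restored payload bytes: " + restored.payload.length);
    }
}
```

As long as both `serialized` and `restored` stay reachable, the heap holds the payload twice.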
   
   Based on the above process, the TM memory holds 2 big byte arrays for each 
task:
   
   - The SerializedValue<TaskInformation>
   - The TaskInformation
   
   When one TM runs 4 HiveSources at the same time, it holds 8 big byte 
arrays.
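
As a rough lower bound on the heap this implies, the arithmetic can be written out as below. The helper name `lowerBoundMb` is hypothetical, and the estimate assumes each of the two copies per task is at least as large as the 281 MB of partition bytes; the real footprint is higher because the serialized value also carries the rest of the TaskInformation.

```java
public class HeapEstimateSketch {

    /** Lower bound in MB: every task holds copiesPerTask arrays of at least payloadMb each. */
    static long lowerBoundMb(int slotsPerTm, int copiesPerTask, long payloadMb) {
        return (long) slotsPerTm * copiesPerTask * payloadMb;
    }

    public static void main(String[] args) {
        // 4 slots x 2 copies x 281 MB = 2248 MB
        System.out.println(lowerBoundMb(4, 2, 281) + " MB");
    }
}
```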
   
   In our production environment, this is another situation that often leads to 
TM OOM.
   
   ## Brief change log
   
   - [FLINK-33354][runtime][refactor] Refactor ShuffleDescriptorsCache into a 
generic GroupCache
   - [FLINK-33354][runtime][refactor] serializedJobInformation and taskInfo are 
never null
   - [FLINK-33354][runtime] Cache TaskInformation and JobInformation to avoid 
deserializing duplicate big objects
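
The idea behind the last item can be sketched as a cache keyed by a group and a key, so that tasks sharing the same key reuse one deserialized object instead of each deserializing its own copy. This is an illustrative sketch only and not the GroupCache from this pull request: the class `GroupCacheSketch` and its methods `getOrCompute`, `clearGroup`, and `size` are hypothetical names, and the sketch ignores concerns such as size limits and expiry that a production cache would likely need.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class GroupCacheSketch<G, K, V> {

    // group (e.g. a job) -> key (e.g. a vertex) -> cached value
    private final Map<G, Map<K, V>> groups = new HashMap<>();

    /** Returns the cached value, running the loader only on the first request for (group, key). */
    public synchronized V getOrCompute(G group, K key, Supplier<V> loader) {
        return groups
                .computeIfAbsent(group, g -> new HashMap<>())
                .computeIfAbsent(key, k -> loader.get());
    }

    /** Drops every entry of one group, e.g. when that job is no longer running on this TM. */
    public synchronized void clearGroup(G group) {
        groups.remove(group);
    }

    /** Total number of cached values across all groups. */
    public synchronized int size() {
        int total = 0;
        for (Map<K, V> entries : groups.values()) {
            total += entries.size();
        }
        return total;
    }

    public static void main(String[] args) {
        GroupCacheSketch<String, String, byte[]> cache = new GroupCacheSketch<>();
        AtomicInteger deserializations = new AtomicInteger();

        // Simulated expensive deserialization producing a big object.
        Supplier<byte[]> loader = () -> {
            deserializations.incrementAndGet();
            return new byte[1024];
        };

        // Four tasks of the same job and vertex ask for the same object.
        for (int i = 0; i < 4; i++) {
            cache.getOrCompute("job-1", "vertex-1", loader);
        }
        System.out.println("deserializations: " + deserializations.get());
    }
}
```

With four requests for the same group and key, the loader runs once and all four callers share the same instance.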
   
   ## Verifying this change
   
   Improved the existing tests:
   
   - DefaultGroupCacheTest
   - TaskDeploymentDescriptorTest
   
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency):  no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org