1996fanrui opened a new pull request, #23599: URL: https://github.com/apache/flink/pull/23599
## What is the purpose of the change

The background is similar to [FLINK-33315](https://issues.apache.org/jira/browse/FLINK-33315). Consider a Hive table with a lot of data whose `HiveSource#partitionBytes` is 281 MB. When slotPerTM = 4, one TM runs 4 HiveSources at the same time.

How does the TaskExecutor submit a large task?

1. `TaskExecutor#loadBigData` reads all bytes from the file into a `SerializedValue<TaskInformation>`.
   - The `SerializedValue<TaskInformation>` holds a `byte[]`.
   - This array occupies heap memory.
   - It is larger than 281 MB, because it stores not only `HiveSource#partitionBytes` but also the other fields of `TaskInformation`.
2. The `TaskInformation` is generated from the `SerializedValue<TaskInformation>`.
   - `TaskExecutor#submitTask` calls `tdd.getSerializedTaskInformation().deserializeValue()`.
   - `tdd.getSerializedTaskInformation()` is the `SerializedValue<TaskInformation>`.
   - Deserializing it produces the `TaskInformation`.
   - The `TaskInformation` includes the `Configuration taskConfiguration`.
   - The `taskConfiguration` includes `StreamConfig#SERIALIZEDUDF`.

Based on the above process, the TM heap holds two big byte arrays for each task:

- The one in the `SerializedValue<TaskInformation>`
- The one in the `TaskInformation`

When one TM runs 4 HiveSources at the same time, it holds 8 big byte arrays. In our production environment, this situation often leads to TM OOM.
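The memory arithmetic above can be made explicit as a rough lower bound. This is a minimal sketch, assuming each of the two copies per task is at least as large as `partitionBytes` (281 MB, treated here as MiB); `TaskPayloadEstimate` and `lowerBoundMiB` are hypothetical names, not part of Flink.

```java
/** Rough lower-bound estimate of heap held by large task payloads on one TaskManager (hypothetical helper). */
final class TaskPayloadEstimate {
    private TaskPayloadEstimate() {}

    /**
     * @param tasksPerTm    tasks running concurrently on the TM (slotPerTM in the example)
     * @param copiesPerTask large arrays held per task before the change: the one in
     *                      {@code SerializedValue<TaskInformation>} and the one in the
     *                      deserialized {@code TaskInformation}
     * @param payloadMiB    size of the big field, e.g. HiveSource#partitionBytes
     * @return a lower bound in MiB, since each copy is at least payloadMiB
     */
    static long lowerBoundMiB(int tasksPerTm, int copiesPerTask, long payloadMiB) {
        return (long) tasksPerTm * copiesPerTask * payloadMiB;
    }

    public static void main(String[] args) {
        int tasksPerTm = 4;     // slotPerTM = 4, one HiveSource per slot
        int copiesPerTask = 2;  // serialized + deserialized copy
        long minMiB = lowerBoundMiB(tasksPerTm, copiesPerTask, 281);
        System.out.println(tasksPerTm * copiesPerTask + " big arrays, at least " + minMiB + " MiB");
    }
}
```

Running `main` prints `8 big arrays, at least 2248 MiB`, i.e. more than 2 GiB of heap before any record is processed.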
## Brief change log

- [FLINK-33354][runtime][refactor] Refactor ShuffleDescriptorsCache into a generic GroupCache
- [FLINK-33354][runtime][refactor] serializedJobInformation and taskInfo are never null
- [FLINK-33354][runtime] Cache TaskInformation and JobInformation to avoid deserializing duplicate big objects

## Verifying this change

Improved the existing tests:

- `DefaultGroupCacheTest`
- `TaskDeploymentDescriptorTest`

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
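The third change-log entry above, caching `TaskInformation` and `JobInformation`, can be illustrated with a small group-scoped cache. This is a hypothetical, simplified sketch: `SimpleGroupCache`, `getOrLoad`, `clearGroup`, and `TaskInfo` are illustrative names, not Flink's actual `GroupCache` API, and it assumes the group is a job ID and the key is the blob key of the offloaded task information. Four subtasks of the same vertex resolve to one deserialized instance, so the big payload is deserialized once rather than once per task.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class GroupCacheDemo {
    /** Hypothetical group-scoped cache: entries are grouped (e.g. by job ID) so a whole group can be evicted at once. */
    static final class SimpleGroupCache<G, K, V> {
        private final Map<G, Map<K, V>> groups = new ConcurrentHashMap<>();

        /** Returns the cached value, running the loader at most once per (group, key). */
        V getOrLoad(G group, K key, Supplier<? extends V> loader) {
            return groups
                    .computeIfAbsent(group, g -> new ConcurrentHashMap<>())
                    .computeIfAbsent(key, k -> loader.get());
        }

        /** Drops every entry of a group, e.g. when the job no longer has tasks on this TM. */
        void clearGroup(G group) {
            groups.remove(group);
        }

        int size(G group) {
            Map<K, V> entries = groups.get(group);
            return entries == null ? 0 : entries.size();
        }
    }

    /** Stand-in for the deserialized TaskInformation (hypothetical). */
    static final class TaskInfo {
        final String vertexName;
        final byte[] serializedUdf;

        TaskInfo(String vertexName, byte[] serializedUdf) {
            this.vertexName = vertexName;
            this.serializedUdf = serializedUdf;
        }
    }

    /** Counts how often the expensive read + deserialize step actually runs. */
    static final AtomicInteger DESERIALIZATIONS = new AtomicInteger();

    static TaskInfo loadTaskInfo(SimpleGroupCache<String, String, TaskInfo> cache, String jobId, String blobKey) {
        return cache.getOrLoad(jobId, blobKey, () -> {
            DESERIALIZATIONS.incrementAndGet();
            return new TaskInfo("HiveSource", new byte[1024]); // small array stands in for the big payload
        });
    }

    public static void main(String[] args) {
        SimpleGroupCache<String, String, TaskInfo> cache = new SimpleGroupCache<>();
        TaskInfo first = null;
        for (int subtask = 0; subtask < 4; subtask++) {
            TaskInfo info = loadTaskInfo(cache, "job-1", "blob-hive-source");
            if (first == null) {
                first = info;
            }
            System.out.println("subtask " + subtask + " shares instance: " + (info == first));
        }
        System.out.println("deserializations: " + DESERIALIZATIONS.get());
    }
}
```

Running `main` prints `shares instance: true` for all four subtasks and `deserializations: 1`. Unlike a production cache, this sketch has no size bound or expiry, so entries stay alive until their group is cleared.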