zhengyuan created FLINK-36787:
---------------------------------

             Summary: Flink SQL mode Kafka join JDBC dimension table YARN node OOM
                 Key: FLINK-36787
                 URL: https://issues.apache.org/jira/browse/FLINK-36787
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.17.2
         Environment: Flink 1.17.2
hadoop 3.2.3
jdk: 1.8
            Reporter: zhengyuan
         Attachments: flink-join-oom.log

In Flink SQL streaming mode, I joined a Kafka source with a JDBC dimension table. After the join operator had processed more than 10 million records, the task was repeatedly re-initialized until an OutOfMemoryError (OOM) occurred on the YARN node. After changing the join to a temporal join, the task node's garbage collection (GC) behavior returned to normal.

Inner join:

CREATE TEMPORARY VIEW transform_tableJoin_effq515l0_ AS
SELECT `time`, `value`, `id`, `eng_name`, PROCTIME() AS proc_time
FROM (
  SELECT
    source_kafka_rpPDIjxy5x.`time`    AS `time`,
    source_kafka_rpPDIjxy5x.`value`   AS `value`,
    source_kafka_rpPDIjxy5x.`id`      AS `id`,
    source_meta_9rzxhLIL4w.`eng_name` AS `eng_name`
  FROM source_kafka_rpPDIjxy5x
  INNER JOIN source_meta_9rzxhLIL4w
    ON source_kafka_rpPDIjxy5x.id = source_meta_9rzxhLIL4w.attr_id
);

Changed to a temporal join:

CREATE TEMPORARY VIEW transform_tableJoin_effq515l0_ AS
SELECT `time`, `value`, `id`, `attr_eng_name`, instance_eng_name, PROCTIME() AS proc_time
FROM (
  SELECT
    source_kafka_rpPDIjxy5x.`time`  AS `time`,
    source_kafka_rpPDIjxy5x.`value` AS `value`,
    source_kafka_rpPDIjxy5x.`id`    AS `id`,
    v_meta_attr_processing.`attr_eng_name`   AS `attr_eng_name`,
    v_meta_attr_processing.instance_eng_name AS instance_eng_name
  FROM source_kafka_rpPDIjxy5x
  INNER JOIN v_meta_attr_processing
    FOR SYSTEM_TIME AS OF source_kafka_rpPDIjxy5x.auto_row_time
    ON source_kafka_rpPDIjxy5x.id = v_meta_attr_processing.attr_id
);

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
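A likely explanation, offered here as an assumption rather than a confirmed diagnosis: with a plain INNER JOIN, the planner treats the JDBC table as a bounded scan source and runs a regular streaming join, whose operator buffers records from both inputs in state indefinitely, so state grows with the 10M+ Kafka records until the container runs out of memory. A processing-time temporal join against the JDBC connector table instead performs a per-record lookup and keeps no join state. A minimal sketch, assuming the Kafka source DDL declares a processing-time column `proc_time AS PROCTIME()` (that column is hypothetical; the other table and column names follow the report):

```sql
-- Sketch (assumption): processing-time lookup join directly against the
-- JDBC-backed dimension table. FOR SYSTEM_TIME AS OF a processing-time
-- attribute makes the planner issue a lookup per incoming Kafka record
-- instead of materializing both sides in join state.
SELECT
  k.`time`, k.`value`, k.`id`, m.`eng_name`
FROM source_kafka_rpPDIjxy5x AS k
INNER JOIN source_meta_9rzxhLIL4w
  FOR SYSTEM_TIME AS OF k.proc_time AS m  -- proc_time assumed defined as PROCTIME() on the source
  ON k.id = m.attr_id;
```

If this reading is right, the OOM would be expected behavior for a regular join on an unbounded stream rather than a bug in the join operator itself.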