zhengyuan created FLINK-36787:
---------------------------------

             Summary: Flink SQL streaming mode: Kafka join with JDBC dimension 
table causes YARN node OOM
                 Key: FLINK-36787
                 URL: https://issues.apache.org/jira/browse/FLINK-36787
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.17.2
         Environment: Flink 1.17.2

Hadoop 3.2.3

JDK 1.8
            Reporter: zhengyuan
         Attachments: flink-join-oom.log

In Flink SQL streaming mode, I joined a Kafka source with a JDBC dimension 
table using a regular inner join. After the join operator had processed over 
10 million records, the task was repeatedly restarted until an 
OutOfMemoryError (OOM) occurred. After changing the join to a temporal join, 
the task's garbage collection (GC) behavior returned to normal.

 

Regular inner join:

CREATE TEMPORARY VIEW transform_tableJoin_effq515l0_ AS
SELECT `time`, `value`, `id`, `eng_name`, PROCTIME() AS proc_time
FROM (
  SELECT source_kafka_rpPDIjxy5x.`time` AS `time`,
         source_kafka_rpPDIjxy5x.`value` AS `value`,
         source_kafka_rpPDIjxy5x.`id` AS `id`,
         source_meta_9rzxhLIL4w.`eng_name` AS `eng_name`
  FROM source_kafka_rpPDIjxy5x
  INNER JOIN source_meta_9rzxhLIL4w
    ON source_kafka_rpPDIjxy5x.id = source_meta_9rzxhLIL4w.attr_id
);
Changed to a temporal join:

CREATE TEMPORARY VIEW transform_tableJoin_effq515l0_ AS
SELECT `time`, `value`, `id`, `attr_eng_name`, instance_eng_name, PROCTIME() AS proc_time
FROM (
  SELECT source_kafka_rpPDIjxy5x.`time` AS `time`,
         source_kafka_rpPDIjxy5x.`value` AS `value`,
         source_kafka_rpPDIjxy5x.`id` AS `id`,
         v_meta_attr_processing.`attr_eng_name` AS `attr_eng_name`,
         v_meta_attr_processing.instance_eng_name AS instance_eng_name
  FROM source_kafka_rpPDIjxy5x
  INNER JOIN v_meta_attr_processing
    FOR SYSTEM_TIME AS OF source_kafka_rpPDIjxy5x.auto_row_time
    ON source_kafka_rpPDIjxy5x.id = v_meta_attr_processing.attr_id
);
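For context, a regular (non-temporal) join on an unbounded stream keeps rows from both inputs in operator state indefinitely, so state grows without bound on a Kafka source, while a processing-time temporal join is typically executed as a per-record lookup against the dimension table and holds no join state. If a regular join were required, one possible mitigation (a sketch; the TTL value is illustrative, not a recommendation) is to bound state retention:

```sql
-- Expire idle join state after a chosen retention period (value is illustrative).
-- Note: expiring state can change join results for late-arriving rows.
SET 'table.exec.state.ttl' = '1 h';
```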



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
