Hi Team,

I am trying to implement a Flink SQL API join between two tables, and it is throwing
`OutOfMemoryError: Java heap space`. Please find below a screenshot of the Flink cluster
memory details.
[Flink Memory Model][1]


  [1]: https://i.stack.imgur.com/AOnQI.png
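
(For reference, the heap this error refers to is the TaskManager JVM heap from the memory model shown above. On our cluster those sizes are set in flink-conf.yaml through options like the ones below; the values here are only placeholders for illustration, not the actual settings from the screenshot.)

```
# Illustrative flink-conf.yaml memory settings (placeholder values,
# not the real cluster configuration shown in the screenshot)
taskmanager.memory.process.size: 4096m    # total TaskManager process memory
taskmanager.memory.task.heap.size: 2048m  # JVM heap available to tasks/operators
jobmanager.memory.process.size: 1600m     # total JobManager process memory
```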

**Here is the code snippet I am using:**
```
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

tableEnv.getConfig().getConfiguration().setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
tableEnv.getConfig().getConfiguration().setString("table.optimizer.join-reorder-enabled", "true");
tableEnv.getConfig().getConfiguration().setString("table.exec.resource.default-parallelism", "16");

tableEnv.executeSql("CREATE TEMPORARY TABLE ccmversionsumapTable (\r\n"
        + "  suname STRING\r\n"
        + "  ,ccmversion STRING\r\n"
        + "  )\r\n"
        + "  WITH (\r\n"
        + "  'connector' = 'jdbc'\r\n"
        + "  ,'url' = 'jdbc:mysql://****:3306/ccucdb'\r\n"
        + "  ,'table-name' = 'ccmversionsumap'\r\n"
        + "  ,'username' = '*****'\r\n"
        + "  ,'password' = '****'\r\n"
        + "  )");

tableEnv.executeSql("CREATE TEMPORARY TABLE cdrTable (\r\n"
        + "  org_id STRING\r\n"
        + "  ,cluster_id STRING\r\n"
        + "  ,cluster_name STRING\r\n"
        + "  ,version STRING\r\n"
        + "  ,ip_address STRING\r\n"
        + "  ,pkid STRING\r\n"
        + "  ,globalcallid_callid INT\r\n"
        // ... multiple columns can be added
        + "  )\r\n"
        + "  WITH (\r\n"
        + "  'connector' = 'kafka'\r\n"
        + "  ,'topic' = 'cdr'\r\n"
        + "  ,'properties.bootstrap.servers' = '****:9092'\r\n"
        + "  ,'scan.startup.mode' = 'earliest-offset'\r\n"
        //+ "  ,'value.fields-include' = 'EXCEPT_KEY'\r\n"
        + "  ,'format' = 'json'\r\n"
        + "  )");

String sql = "SELECT cdr.org_id orgid,\r\n"
        + "  cdr.cluster_name clustername,\r\n"
        + "  cdr.cluster_id clusterid,\r\n"
        + "  cdr.ip_address clusteripaddr,\r\n"
        + "  cdr.version clusterversion,\r\n"
        + "  cvsm.suname clustersuname,\r\n"
        + "  cdr.pkid cdrpkid,\r\n"
        // ... multiple columns can be added
        + "  FROM cdrTable cdr\r\n"
        + "  LEFT JOIN ccmversionsumapTable cvsm ON (cdr.version = cvsm.ccmversion)\r\n"
        + "  GROUP BY TUMBLE(PROCTIME(), INTERVAL '1' MINUTE), cdr.org_id, cdr.cluster_name,"
        + " cdr.cluster_id, cdr.ip_address, cdr.version, cdr.pkid, cdr.globalcallid_callid, ...";

Table order20 = tableEnv.sqlQuery(sql);
order20.executeInsert("outputCdrTable");
```
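
(For completeness, the sink table referenced by `executeInsert("outputCdrTable")` is created elsewhere in the job and its real DDL is not shown above. A minimal placeholder definition could look like the sketch below; the column names mirror the SELECT aliases, while the connector and topic are only assumptions for illustration.)

```
// Hypothetical sink definition -- the real outputCdrTable DDL is omitted above;
// columns mirror the SELECT aliases, connector/topic are assumptions.
tableEnv.executeSql("CREATE TEMPORARY TABLE outputCdrTable (\r\n"
        + "  orgid STRING\r\n"
        + "  ,clustername STRING\r\n"
        + "  ,clusterid STRING\r\n"
        + "  ,clusteripaddr STRING\r\n"
        + "  ,clusterversion STRING\r\n"
        + "  ,clustersuname STRING\r\n"
        + "  ,cdrpkid STRING\r\n"
        + "  )\r\n"
        + "  WITH (\r\n"
        + "  'connector' = 'kafka'\r\n"
        + "  ,'topic' = 'outputcdr'\r\n"
        + "  ,'properties.bootstrap.servers' = '****:9092'\r\n"
        + "  ,'format' = 'json'\r\n"
        + "  )");
```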

**Scenario / use case:**

We are pushing 2.5 million JSON records into a Kafka topic and reading them via the Kafka
connector as the temporary table cdrTable, as shown in the code above. We are also reading
23 records from a static/reference table via the JDBC connector as the temporary table
ccmversionsumapTable, and doing a left join over a 1-minute tumbling window.

While processing this join we get `OutOfMemoryError: Java heap space`.

However, in a similar use case we did a left join between two tables, cdr (2.5 million
records) and cmr (5 million records), with the same tumbling window, and that was processed
without any issue. Both of those tables are read from Kafka, declared the same way as
cdrTable in the snippet above.
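
(For clarity, a sketch of how the cmr table is declared in that working case follows; the table name and column list here are assumptions, only the Kafka connector setup mirrors the real job.)

```
// cmr is also read from Kafka with a DDL analogous to cdrTable above.
// Table name and columns here are assumptions for illustration.
tableEnv.executeSql("CREATE TEMPORARY TABLE cmrTable (\r\n"
        + "  org_id STRING\r\n"
        + "  ,cluster_id STRING\r\n"
        + "  ,pkid STRING\r\n"
        // ... multiple columns can be added
        + "  )\r\n"
        + "  WITH (\r\n"
        + "  'connector' = 'kafka'\r\n"
        + "  ,'topic' = 'cmr'\r\n"
        + "  ,'properties.bootstrap.servers' = '****:9092'\r\n"
        + "  ,'scan.startup.mode' = 'earliest-offset'\r\n"
        + "  ,'format' = 'json'\r\n"
        + "  )");
```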

Thanks
Ronak Beejawat
