waywtdcc created FLINK-23219:
--------------------------------

             Summary: temporal join TTL configuration does not take effect
                 Key: FLINK-23219
                 URL: https://issues.apache.org/jira/browse/FLINK-23219
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API, Table SQL / Runtime
    Affects Versions: 1.12.2
            Reporter: waywtdcc
         Attachments: image-2021-07-02-16-29-40-310.png

* version: flink 1.12.2
* problem: I run a job in which table A is temporal left joined with table B, and set the table.exec.state.ttl configuration to 3 hours, or to 2 seconds for testing. However, the state of the task has kept growing for more than 7 days.
* code

```
// Idle state retention (TTL); set to 2 seconds here for testing.
tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(2));

// Table A: the left (probe) side of the join.
tableEnv.executeSql("drop table if exists persons_table_kafka2");
String kafka_source_sql = "CREATE TABLE persons_table_kafka2 (\n" +
        " `id` BIGINT,\n" +
        " `name` STRING,\n" +
        " `age` INT,\n" +
        " proctime as PROCTIME(),\n" +
        " `ts` TIMESTAMP(3),\n" +
        " WATERMARK FOR ts AS ts\n" +
        ") WITH (\n" +
        " 'connector' = 'kafka',\n" +
        " 'topic' = 'persons_test_auto',\n" +
        " 'properties.bootstrap.servers' = 'node2:6667',\n" +
        " 'properties.group.id' = 'testGrodsu1765',\n" +
        " 'scan.startup.mode' = 'group-offsets',\n" +
        " 'format' = 'json'\n" +
        ")";
tableEnv.executeSql(kafka_source_sql);

// Table B: the source behind the right side of the join.
tableEnv.executeSql("drop table if exists persons_message_table_kafka2");
String kafka_source_sql2 = "CREATE TABLE persons_message_table_kafka2 (\n" +
        " `id` BIGINT,\n" +
        " `name` STRING,\n" +
        " `message` STRING,\n" +
        " `ts` TIMESTAMP(3) ," +
        // " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
        " WATERMARK FOR ts AS ts\n" +
        ") WITH (\n" +
        " 'connector' = 'kafka',\n" +
        " 'topic' = 'persons_extra_message_auto',\n" +
        " 'properties.bootstrap.servers' = 'node2:6667',\n" +
        " 'properties.group.id' = 'testGroud125313',\n" +
        " 'scan.startup.mode' = 'group-offsets',\n" +
        " 'format' = 'json'\n" +
        ")";
tableEnv.executeSql(kafka_source_sql2);

// Deduplicate table B: keep only the latest row per name.
tableEnv.executeSql(
        "CREATE TEMPORARY VIEW persons_message_table22 AS \n" +
        "SELECT id, name, message,ts \n" +
        " FROM (\n" +
        " SELECT *,\n" +
        " ROW_NUMBER() OVER (PARTITION BY name \n" +
        " ORDER BY ts DESC) AS rowNum \n" +
        " FROM persons_message_table_kafka2 " +
        " )\n" +
        "WHERE rowNum = 1");

// Left join of table A against the deduplicated view of table B, as of t1.ts.
tableEnv.executeSql("" +
        "CREATE TEMPORARY VIEW result_data_view " +
        " as " +
        " select " +
        " t1.name, t1.age,t1.ts as ts1,t2.message, cast (t2.ts as string) as ts2 " +
        " from persons_table_kafka2 t1 " +
        " left join persons_message_table22 FOR SYSTEM_TIME AS OF t1.ts AS t2 on t1.name = t2.name "
);
```

* the result looks like this:

!image-2021-07-02-16-29-40-310.png!

--
This message was sent by Atlassian Jira
(v8.3.4#803005)