[ https://issues.apache.org/jira/browse/FLINK-23219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17373337#comment-17373337 ]
waywtdcc commented on FLINK-23219:
----------------------------------

I would like to have this issue assigned to me. I have solved it by updating the code, but I am not sure whether my fix is correct.

> temporal join TTL configuration does not take effect
> -----------------------------------------------------
>
>                 Key: FLINK-23219
>                 URL: https://issues.apache.org/jira/browse/FLINK-23219
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Runtime
>    Affects Versions: 1.12.2
>            Reporter: waywtdcc
>            Priority: Major
>              Labels: flink, sql
>         Attachments: image-2021-07-02-16-29-40-310.png
>
>
> * version: flink 1.12.2
> * problem: I run a job in which table A is temporal left joined with table B, and set
> the table.exec.state.ttl configuration to 3 hours, or to 2 seconds for testing.
> But the task state keeps growing for more than 7 days.
> * code
> ```
> tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(2));
> tableEnv.executeSql("drop table if exists persons_table_kafka2");
> String kafka_source_sql = "CREATE TABLE persons_table_kafka2 (\n" +
>     " `id` BIGINT,\n" +
>     " `name` STRING,\n" +
>     " `age` INT,\n" +
>     " proctime as PROCTIME(),\n" +
>     " `ts` TIMESTAMP(3),\n" +
>     " WATERMARK FOR ts AS ts\n" +
>     ") WITH (\n" +
>     " 'connector' = 'kafka',\n" +
>     " 'topic' = 'persons_test_auto',\n" +
>     " 'properties.bootstrap.servers' = 'node2:6667',\n" +
>     " 'properties.group.id' = 'testGrodsu1765',\n" +
>     " 'scan.startup.mode' = 'group-offsets',\n" +
>     " 'format' = 'json'\n" +
>     ")";
> tableEnv.executeSql(kafka_source_sql);
> tableEnv.executeSql("drop table if exists persons_message_table_kafka2");
> String kafka_source_sql2 = "CREATE TABLE persons_message_table_kafka2 (\n" +
>     " `id` BIGINT,\n" +
>     " `name` STRING,\n" +
>     " `message` STRING,\n" +
>     " `ts` TIMESTAMP(3) ," +
>     // " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
>     " WATERMARK FOR ts AS ts\n" +
>     ") WITH (\n" +
>     " 'connector' = 'kafka',\n" +
>     " 'topic' = 'persons_extra_message_auto',\n" +
>     " 'properties.bootstrap.servers' = 'node2:6667',\n" +
>     " 'properties.group.id' = 'testGroud125313',\n" +
>     " 'scan.startup.mode' = 'group-offsets',\n" +
>     " 'format' = 'json'\n" +
>     ")";
> tableEnv.executeSql(kafka_source_sql2);
> tableEnv.executeSql(
>     "CREATE TEMPORARY VIEW persons_message_table22 AS \n" +
>     "SELECT id, name, message,ts \n" +
>     " FROM (\n" +
>     " SELECT *,\n" +
>     " ROW_NUMBER() OVER (PARTITION BY name \n" +
>     " ORDER BY ts DESC) AS rowNum \n" +
>     " FROM persons_message_table_kafka2 " +
>     " )\n" +
>     "WHERE rowNum = 1");
> tableEnv.executeSql("" +
>     "CREATE TEMPORARY VIEW result_data_view " +
>     " as " +
>     " select " +
>     " t1.name, t1.age,t1.ts as ts1,t2.message, cast (t2.ts as string) as ts2 " +
>     " from persons_table_kafka2 t1 " +
>     " left join persons_message_table22 FOR SYSTEM_TIME AS OF t1.ts AS t2 on t1.name = t2.name "
> );
> ```
> * the result looks like this: !image-2021-07-02-16-29-40-310.png!

--
This message was sent by Atlassian Jira
(v8.3.4#803005)