[ 
https://issues.apache.org/jira/browse/FLINK-23219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17373337#comment-17373337
 ] 

waywtdcc commented on FLINK-23219:
----------------------------------

I would like this issue to be assigned to me. I have fixed it by changing the 
code, but I am not sure whether the fix is correct.

> Temporal join TTL configuration does not take effect
> ----------------------------------------------------
>
>                 Key: FLINK-23219
>                 URL: https://issues.apache.org/jira/browse/FLINK-23219
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Runtime
>    Affects Versions: 1.12.2
>            Reporter: waywtdcc
>            Priority: Major
>              Labels: flink, sql
>         Attachments: image-2021-07-02-16-29-40-310.png
>
>
>  * Version: Flink 1.12.2
>  * Problem: I run a job in which table A is temporal left joined with table B, 
> and set the table.exec.state.ttl configuration to 3 hours, or to 2 seconds for 
> testing. However, the task state keeps growing for more than 7 days.
>  * Code:
>  ```
> tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(2));
> tableEnv.executeSql("drop table if exists persons_table_kafka2");
>  String kafka_source_sql = "CREATE TABLE persons_table_kafka2 (\n" +
>  " `id` BIGINT,\n" +
>  " `name` STRING,\n" +
>  " `age` INT,\n" +
>  " proctime as PROCTIME(),\n" +
>  " `ts` TIMESTAMP(3),\n" +
>  " WATERMARK FOR ts AS ts\n" +
>  ") WITH (\n" +
>  " 'connector' = 'kafka',\n" +
>  " 'topic' = 'persons_test_auto',\n" +
>  " 'properties.bootstrap.servers' = 'node2:6667',\n" +
>  " 'properties.group.id' = 'testGrodsu1765',\n" +
>  " 'scan.startup.mode' = 'group-offsets',\n" +
>  " 'format' = 'json'\n" +
>  ")";
>  tableEnv.executeSql(kafka_source_sql);
>  tableEnv.executeSql("drop table if exists persons_message_table_kafka2");
>  String kafka_source_sql2 = "CREATE TABLE persons_message_table_kafka2 (\n" +
>  " `id` BIGINT,\n" +
>  " `name` STRING,\n" +
>  " `message` STRING,\n" +
>  " `ts` TIMESTAMP(3),\n" +
> // " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
>  " WATERMARK FOR ts AS ts\n" +
>  ") WITH (\n" +
>  " 'connector' = 'kafka',\n" +
>  " 'topic' = 'persons_extra_message_auto',\n" +
>  " 'properties.bootstrap.servers' = 'node2:6667',\n" +
>  " 'properties.group.id' = 'testGroud125313',\n" +
>  " 'scan.startup.mode' = 'group-offsets',\n" +
>  " 'format' = 'json'\n" +
>  ")";
>  tableEnv.executeSql(kafka_source_sql2);
>  tableEnv.executeSql(
>  "CREATE TEMPORARY VIEW persons_message_table22 AS \n" +
>  "SELECT id, name, message,ts \n" +
>  " FROM (\n" +
>  " SELECT *,\n" +
>  " ROW_NUMBER() OVER (PARTITION BY name \n" +
>  " ORDER BY ts DESC) AS rowNum \n" +
>  " FROM persons_message_table_kafka2 " +
>  " )\n" +
>  "WHERE rowNum = 1");
>  tableEnv.executeSql("" +
>  "CREATE TEMPORARY VIEW result_data_view " +
>  " as " +
>  " select " +
>  " t1.name, t1.age,t1.ts as ts1,t2.message, cast (t2.ts as string) as ts2 " +
>  " from persons_table_kafka2 t1 " +
>  " left join persons_message_table22 FOR SYSTEM_TIME AS OF t1.ts AS t2 " +
>  " on t1.name = t2.name "
>  );
> ```
>  * Result: see !image-2021-07-02-16-29-40-310.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
