[ 
https://issues.apache.org/jira/browse/FLINK-23219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17377821#comment-17377821
 ] 

Caizhi Weng commented on FLINK-23219:
-------------------------------------

[~waywtdcc] If I understand correctly, some keys on the right side of the join 
appear only a few times and then never again, and you want to remove their 
state by setting an expiration time?

If that is the case, Flink currently does not have an option for this 
behavior. I would treat this as a feature request or an "improvement".

Shall we add an option for this? [~Leonard Xu]
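
To make the requested behavior concrete, here is a minimal sketch in plain 
Java, independent of Flink, of what expiring right-side keys after a fixed 
time could look like. It is an illustration under assumptions only: 
ExpiringRightSideState, its methods, and the explicit nowMillis parameter are 
hypothetical names, not part of any Flink API, and this is not how Flink 
manages join state.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical illustration only; this is not a Flink class or Flink's state backend.
class ExpiringRightSideState {

    // Latest right-side row for a key, plus the time the key was last updated.
    private static final class TimestampedRow {
        final String row;
        final long lastUpdatedMillis;

        TimestampedRow(String row, long lastUpdatedMillis) {
            this.row = row;
            this.lastUpdatedMillis = lastUpdatedMillis;
        }
    }

    private final Map<String, TimestampedRow> latestByKey = new HashMap<>();
    private final long ttlMillis;

    ExpiringRightSideState(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Store the newest row for a key and refresh its timestamp.
    void put(String key, String row, long nowMillis) {
        latestByKey.put(key, new TimestampedRow(row, nowMillis));
    }

    // Return the row for a key, or null if the key is unknown or has expired.
    String lookup(String key, long nowMillis) {
        TimestampedRow entry = latestByKey.get(key);
        if (entry == null) {
            return null;
        }
        if (isExpired(entry, nowMillis)) {
            latestByKey.remove(key);
            return null;
        }
        return entry.row;
    }

    // Drop every key not updated within the TTL; returns the number of keys removed.
    int evictExpired(long nowMillis) {
        int removed = 0;
        Iterator<Map.Entry<String, TimestampedRow>> it = latestByKey.entrySet().iterator();
        while (it.hasNext()) {
            if (isExpired(it.next().getValue(), nowMillis)) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    int size() {
        return latestByKey.size();
    }

    private boolean isExpired(TimestampedRow entry, long nowMillis) {
        return nowMillis - entry.lastUpdatedMillis >= ttlMillis;
    }

    public static void main(String[] args) {
        ExpiringRightSideState state = new ExpiringRightSideState(2000L);
        state.put("alice", "hello", 0L);
        state.put("bob", "hi", 1500L);
        // At t = 2500 ms only "alice" is older than the 2 s TTL.
        System.out.println("removed=" + state.evictExpired(2500L) + " size=" + state.size());
    }
}
```

Passing the clock in as a parameter keeps the sketch deterministic; keys that 
stop receiving updates are dropped once the TTL has elapsed, which is the 
behavior being asked about.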

> temporal join ttl configuration does not take effect
> ----------------------------------------------------
>
>                 Key: FLINK-23219
>                 URL: https://issues.apache.org/jira/browse/FLINK-23219
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Runtime
>    Affects Versions: 1.12.2
>            Reporter: waywtdcc
>            Priority: Major
>              Labels: flink, pull-request-available, sql
>         Attachments: image-2021-07-02-16-29-40-310.png
>
>
> * version: Flink 1.12.2
>  *  problem: I run a job in which table A is temporal left joined with 
> table B, and set the table.exec.state.ttl configuration to 3 hours, or to 
> 2 seconds for testing. But the task state keeps growing for more than 
> 7 days.
>  *  code
> {code:java}
> tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(2));
> tableEnv.executeSql("drop table if exists persons_table_kafka2");
>  String kafka_source_sql = "CREATE TABLE persons_table_kafka2 (\n" +
>  " `id` BIGINT,\n" +
>  " `name` STRING,\n" +
>  " `age` INT,\n" +
>  " proctime as PROCTIME(),\n" +
>  " `ts` TIMESTAMP(3),\n" +
>  " WATERMARK FOR ts AS ts\n" +
>  ") WITH (\n" +
>  " 'connector' = 'kafka',\n" +
>  " 'topic' = 'persons_test_auto',\n" +
>  " 'properties.bootstrap.servers' = 'node2:6667',\n" +
>  " 'properties.group.id' = 'testGrodsu1765',\n" +
>  " 'scan.startup.mode' = 'group-offsets',\n" +
>  " 'format' = 'json'\n" +
>  ")";
>  tableEnv.executeSql(kafka_source_sql);
>  tableEnv.executeSql("drop table if exists persons_message_table_kafka2");
>  String kafka_source_sql2 = "CREATE TABLE persons_message_table_kafka2 (\n" +
>  " `id` BIGINT,\n" +
>  " `name` STRING,\n" +
>  " `message` STRING,\n" +
>  " `ts` TIMESTAMP(3) ," +
> // " WATERMARK FOR ts AS ts - INTERVAL '5' SECOND\n" +
>  " WATERMARK FOR ts AS ts\n" +
>  ") WITH (\n" +
>  " 'connector' = 'kafka',\n" +
>  " 'topic' = 'persons_extra_message_auto',\n" +
>  " 'properties.bootstrap.servers' = 'node2:6667',\n" +
>  " 'properties.group.id' = 'testGroud125313',\n" +
>  " 'scan.startup.mode' = 'group-offsets',\n" +
>  " 'format' = 'json'\n" +
>  ")";
>  tableEnv.executeSql(kafka_source_sql2);
>  tableEnv.executeSql(
>  "CREATE TEMPORARY VIEW persons_message_table22 AS \n" +
>  "SELECT id, name, message,ts \n" +
>  " FROM (\n" +
>  " SELECT *,\n" +
>  " ROW_NUMBER() OVER (PARTITION BY name \n" +
>  " ORDER BY ts DESC) AS rowNum \n" +
>  " FROM persons_message_table_kafka2 " +
>  " )\n" +
>  "WHERE rowNum = 1");
>  tableEnv.executeSql("" +
>  "CREATE TEMPORARY VIEW result_data_view " +
>  " as " +
>  " select " +
>  " t1.name, t1.age,t1.ts as ts1,t2.message, cast (t2.ts as string) as ts2 " +
>  " from persons_table_kafka2 t1 " +
>  " left join persons_message_table22 FOR SYSTEM_TIME AS OF t1.ts AS t2 " +
>  " on t1.name = t2.name "
>  );
> Table resultTable = tableEnv.from("result_data_view");
> DataStream<RowData> rowDataDataStream = tableEnv.toAppendStream(resultTable, 
> RowData.class);
> rowDataDataStream.print();
> env.execute("test_it");
> {code}
>  * the result looks like this: !image-2021-07-02-16-29-40-310.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
