garcia created FLINK-35218:
------------------------------

             Summary: Duplicated values caused by expired state TTL
                 Key: FLINK-35218
                 URL: https://issues.apache.org/jira/browse/FLINK-35218
             Project: Flink
          Issue Type: Bug
            Reporter: garcia
         Attachments: image-2024-04-23-15-34-32-860.png
Hi,

We use the state TTL to clean up our Flink input tables via the `table.exec.state.ttl` configuration. However, we ran into an issue once the TTL expires, as illustrated in the following scenario.

Given this input_table:
{code:java}
{
  "$schema": "http://json-schema.org/draft-07/schema",
  "$id": "http://example.com/example.json",
  "type": "object",
  "title": "bet placed schema",
  "required": [
    "placement_date"
  ],
  "properties": {
    "bet_id": {
      "$id": "#/properties/bet_id",
      "type": "string"
    },
    "regulator": {
      "$id": "#/properties/regulator",
      "type": "string"
    },
    "match_id": {
      "$id": "#/properties/match_id",
      "type": "integer"
    },
    "combo_id": {
      "$id": "#/properties/combo_id",
      "type": "integer"
    },
    "is_live": {
      "$id": "#/properties/is_live",
      "type": "boolean"
    },
    "offer_catalog": {
      "$id": "#/properties/offer_catalog",
      "type": "string"
    },
    "combo_selection_nbr": {
      "$id": "#/properties/combo_selection_nbr",
      "type": "integer"
    }
  },
  "additionalProperties": true
}
{code}

This configuration:
{code:java}
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

val streamEnv = new StreamExecutionEnvironment(JavaStreamExecutionEnvironment.getExecutionEnvironment(conf))
val tableEnv = StreamTableEnvironment.create(streamEnv)

tableEnv.getConfig.getConfiguration
  .setString("table.local-time-zone", "UTC")
tableEnv.getConfig.getConfiguration
  .setString("table.exec.mini-batch.enabled", "true")
tableEnv.getConfig.getConfiguration
  .setString("table.exec.mini-batch.allow-latency", "5 s")
tableEnv.getConfig.getConfiguration
  .setString("table.exec.mini-batch.size", "5000")
tableEnv.getConfig.getConfiguration
  .setString("table.exec.state.ttl", TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES).toString)
{code}

And this (simplified) query:
{code:java}
WITH exploded_combos AS (
  SELECT
    event_timestamp,
    CAST(JSON_VALUE(combos.combo, '$.match_id') AS BIGINT) AS match_id,
    CAST(JSON_VALUE(combos.combo, '$.combo_selection_id') AS BIGINT) AS combo_id,
    CAST(JSON_VALUE(combos.combo, '$.is_live') AS BOOLEAN) AS is_live,
    CAST(RegulatorToCatalog(regulator) AS VARCHAR) AS offer_catalog,
    CARDINALITY(JsonToArray(JSON_QUERY(combos.combo, '$.bet_selections'))) AS combo_selections_nbr,
    combo_bet_selections
  FROM bet_placed_view
  CROSS JOIN UNNEST(JsonToArray(combo_bet_selections)) AS combos(combo)
),
agg_match AS (
  SELECT
    match_id,
    LOWER(offer_catalog) AS offer_catalog,
    MAX(event_timestamp) AS last_event_time_utc,
    COUNT(*) AS bet_count
  FROM exploded_combos
  WHERE match_id IS NOT NULL
    AND combo_id IS NOT NULL
    AND offer_catalog IS NOT NULL
    AND combo_bet_selections IS NOT NULL
  GROUP BY match_id, LOWER(offer_catalog)
),
agg_combo AS (
  SELECT
    match_id,
    combo_id,
    combo_selections_nbr,
    is_live,
    LOWER(offer_catalog) AS offer_catalog,
    MAX(event_timestamp) AS last_event_time_utc,
    COUNT(*) AS bet_count
  FROM exploded_combos
  WHERE match_id IS NOT NULL
    AND combo_id IS NOT NULL
    AND (combo_selections_nbr = 3 OR combo_selections_nbr = 2)
    AND offer_catalog IS NOT NULL
  GROUP BY match_id, combo_id, combo_selections_nbr, is_live, LOWER(offer_catalog)
),
ranked_filtered_agg_combo_main_page AS (
  SELECT
    match_id,
    combo_id,
    offer_catalog,
    bet_count,
    ROW_NUMBER() OVER (
      PARTITION BY match_id, offer_catalog
      ORDER BY bet_count DESC, combo_id DESC
    ) AS rank_combo
  FROM agg_combo
  WHERE combo_selections_nbr = 3
),
joined_filtered_agg_match_main_page AS (
  SELECT
    ranked_filtered_agg_combo_main_page.match_id,
    ranked_filtered_agg_combo_main_page.offer_catalog,
    ranked_filtered_agg_combo_main_page.bet_count,
    ranked_filtered_agg_combo_main_page.combo_id,
    ROW_NUMBER() OVER (
      PARTITION BY agg_match.offer_catalog
      ORDER BY agg_match.bet_count DESC, agg_match.match_id DESC
    ) AS rank_match
  FROM agg_match
  INNER JOIN ranked_filtered_agg_combo_main_page
    ON ranked_filtered_agg_combo_main_page.match_id = agg_match.match_id
    AND ranked_filtered_agg_combo_main_page.offer_catalog = agg_match.offer_catalog
  WHERE ranked_filtered_agg_combo_main_page.rank_combo = 1
)
SELECT
  partition_key,
  match_id,
  offer_catalog,
  false AS live,
  LAST_VALUE(last_event_utc) AS last_event_utc,
  LAST_VALUE(last_event_utc) AS max_last_event_utc,
  LAST_VALUE(top) AS top
FROM (
  SELECT
    '<all>' AS match_id,
    offer_catalog,
    '<all>' || '#openmatch#' || offer_catalog AS partition_key,
    CAST('MAX(very_last_event_time_utc)' AS VARCHAR) AS last_event_utc,
    '[' || LISTAGG(
      '{"match_id":' || CAST(match_id AS VARCHAR)
        || ',"rank":' || CAST(rank_match AS VARCHAR)
        || ',"count":' || CAST(bet_count AS VARCHAR)
        || ',"combo_id":"' || CAST(combo_id AS VARCHAR)
        || '","offer_catalog":"' || CAST(offer_catalog AS VARCHAR)
        || '","live": ' || '}'
    ) || ']' AS top
  FROM joined_filtered_agg_match_main_page
  WHERE rank_match <= 5
  GROUP BY offer_catalog
)
GROUP BY partition_key, match_id, offer_catalog
{code}

As you can see in the result below, once the TTL is reached we get duplicate values in our output.

{code:java}
+I[<all>#openmatch#betclic_fr, <all>, betclic_fr, false, MAX(very_last_event_time_utc), MAX(very_last_event_time_utc), [
  { "match_id": 2, "rank": 1, "count": 2, "combo_id": "123", "offer_catalog": "betclic_fr", "live": },
  { "match_id": 5, "rank": 2, "count": 1, "combo_id": "123", "offer_catalog": "betclic_fr", "live": }
] ]
+U[<all>#openmatch#betclic_fr, <all>, betclic_fr, false, MAX(very_last_event_time_utc), MAX(very_last_event_time_utc), [
  { "match_id": 5, "rank": 2, "count": 1, "combo_id": "123", "offer_catalog": "betclic_fr", "live": },
  { "match_id": 2, "rank": 1, "count": 3, "combo_id": "123", "offer_catalog": "betclic_fr", "live": },
  { "match_id": 3, "rank": 3, "count": 2, "combo_id": "123", "offer_catalog": "betclic_fr", "live": },
  { "match_id": 1, "rank": 4, "count": 3, "combo_id": "123", "offer_catalog": "betclic_fr", "live": },
  { "match_id": 4, "rank": 5, "count": 1, "combo_id": "123", "offer_catalog": "betclic_fr", "live": }
] ]
...
// DUPLICATED VALUES (the same rank appears multiple times):
+U[<all>#openmatch#betclic_fr, <all>, betclic_fr, false, MAX(very_last_event_time_utc), MAX(very_last_event_time_utc), [
  { "match_id": 3, "rank": 1, "count": 16, "combo_id": "123", "offer_catalog": "betclic_fr", "live": },
  { "match_id": 5, "rank": 3, "count": 14, "combo_id": "123", "offer_catalog": "betclic_fr", "live": },
  { "match_id": 2, "rank": 2, "count": 15, "combo_id": "123", "offer_catalog": "betclic_fr", "live": },
  { "match_id": 4, "rank": 4, "count": 14, "combo_id": "123", "offer_catalog": "betclic_fr", "live": },
  { "match_id": 1, "rank": 5, "count": 12, "combo_id": "123", "offer_catalog": "betclic_fr", "live": },
  { "match_id": 2, "rank": 2, "count": 15, "combo_id": "123", "offer_catalog": "betclic_fr", "live": },
  { "match_id": 1, "rank": 5, "count": 12, "combo_id": "123", "offer_catalog": "betclic_fr", "live": },
  { "match_id": 5, "rank": 3, "count": 16, "combo_id": "123", "offer_catalog": "betclic_fr", "live": },
  { "match_id": 3, "rank": 1, "count": 18, "combo_id": "123", "offer_catalog": "betclic_fr", "live": },
  { "match_id": 4, "rank": 4, "count": 15, "combo_id": "123", "offer_catalog": "betclic_fr", "live": }
] ]
...
{code}

*Is this a bug, or did we misunderstand how to use the TTL?*

Flink also emits this warning:

!image-2024-04-23-15-34-32-860.png!

We found two workarounds (sketched below):
* Set a very large TTL (1 year). This eliminates most of the problems thanks to our business logic.
* Use a sliding window (but in that case we would need a window of 50 days sliding every minute).

We're afraid both solutions would be very expensive. Can you tell us whether this is normal behavior, or whether we misunderstood something about the TTL?
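For clarity, here is a rough sketch of what we mean by the two workarounds. The one-year `Duration` value and the `HOP` rewrite are illustrative only (they assume `exploded_combos` is registered as a view and that `event_timestamp` is declared as an event-time attribute); this is not our production code:

{code:java}
import java.time.Duration

// Workaround 1 (sketch): set a very large TTL (1 year) so state is kept far
// longer than any realistic gap between events for a key.
tableEnv.getConfig.getConfiguration
  .setString("table.exec.state.ttl", Duration.ofDays(365).toMillis.toString)

// Workaround 2 (sketch): aggregate over a hopping window (size 50 days,
// slide 1 minute) so rows age out of the window instead of being dropped by
// the state TTL. Assumes exploded_combos is registered as a view and
// event_timestamp is an event-time attribute.
tableEnv.executeSql(
  """
    |SELECT
    |  match_id,
    |  LOWER(offer_catalog) AS offer_catalog,
    |  COUNT(*) AS bet_count
    |FROM exploded_combos
    |GROUP BY
    |  HOP(event_timestamp, INTERVAL '1' MINUTE, INTERVAL '50' DAY),
    |  match_id,
    |  LOWER(offer_catalog)
    |""".stripMargin)
{code}

With a 1-minute slide over a 50-day window, each row would fall into roughly 72,000 overlapping windows, which is why we fear this variant would be very expensive.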