garcia created FLINK-35218:
------------------------------

             Summary: Duplicated values caused by expired state TTL   
                 Key: FLINK-35218
                 URL: https://issues.apache.org/jira/browse/FLINK-35218
             Project: Flink
          Issue Type: Bug
            Reporter: garcia
         Attachments: image-2024-04-23-15-34-32-860.png

Hi,

We use state TTL to clean up the state of our Flink input tables via the
`table.exec.state.ttl` configuration.
However, we run into an issue when the TTL expires, as illustrated in the
following scenario:

Given this input_table, described by the following JSON schema:
{code:java}
{
  "$schema": "http://json-schema.org/draft-07/schema",
  "$id": "http://example.com/example.json",
  "type": "object",
  "title": "bet placed schema",
  "required": [
    "placement_date"
  ],
  "properties": {
    "bet_id": {
      "$id": "#/properties/bet_id",
      "type": "string"
    },
    "regulator": {
      "$id": "#/properties/regulator",
      "type": "string"
    },
    "match_id": {
      "$id": "#/properties/match_id",
      "type": "integer"
    },
    "combo_id": {
      "$id": "#/properties/combo_id",
      "type": "integer"
    },
    "is_live": {
      "$id": "#/properties/is_live",
      "type": "boolean"
    },
    "offer_catalog": {
      "$id": "#/properties/offer_catalog",
      "type": "string"
    },
    "combo_selection_nbr": {
      "$id": "#/properties/combo_selection_nbr",
      "type": "integer"
    }
  },
  "additionalProperties": true
} {code}
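For context, the source the query below reads from could be registered roughly as follows, using the `tableEnv` created in the configuration shown next. This is a minimal sketch: the connector, topic, and watermark are assumptions for illustration, and only the columns the query actually references are declared (the per-combo JSON inside combo_bet_selections roughly mirrors the schema above):
{code:java}
// Hypothetical registration of the source the query reads from.
// Connector, topic, and watermark are assumptions, not our actual setup.
tableEnv.executeSql(
  """CREATE TABLE bet_placed_view (
    |  event_timestamp TIMESTAMP(3),
    |  regulator STRING,
    |  combo_bet_selections STRING, -- JSON array, exploded with JsonToArray
    |  WATERMARK FOR event_timestamp AS event_timestamp - INTERVAL '5' SECOND
    |) WITH (
    |  'connector' = 'kafka',  -- assumption
    |  'topic' = 'bet_placed', -- assumption
    |  'properties.bootstrap.servers' = '...',
    |  'format' = 'json'
    |)""".stripMargin)
{code}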
This configuration: 
{code:java}
import java.util.concurrent.TimeUnit

import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamExecutionEnvironment}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// `conf` is an org.apache.flink.configuration.Configuration defined elsewhere.
val streamEnv = new StreamExecutionEnvironment(
  JavaStreamExecutionEnvironment.getExecutionEnvironment(conf))
val tableEnv = StreamTableEnvironment.create(streamEnv)

tableEnv.getConfig.getConfiguration
  .setString("table.local-time-zone", "UTC")
tableEnv.getConfig.getConfiguration
  .setString("table.exec.mini-batch.enabled", "true")
tableEnv.getConfig.getConfiguration
  .setString("table.exec.mini-batch.allow-latency", "5 s")
tableEnv.getConfig.getConfiguration
  .setString("table.exec.mini-batch.size", "5000")
tableEnv.getConfig.getConfiguration
  .setString("table.exec.state.ttl",
    TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES).toString) // "60000"
{code}
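The value computed above is the string "60000", which Flink's duration parsing reads as milliseconds, i.e. a one-minute TTL. An equivalent, more readable way to express the same setting (a sketch, assuming a Flink version whose TableConfig accepts typed options):
{code:java}
import java.time.Duration
import org.apache.flink.table.api.config.ExecutionConfigOptions

// One-minute TTL via the typed option backing "table.exec.state.ttl".
tableEnv.getConfig.set(ExecutionConfigOptions.IDLE_STATE_RETENTION, Duration.ofMinutes(1))
{code}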
And this (simplified) query:
{code:java}
WITH exploded_combos AS (
    select
        event_timestamp,
        CAST(JSON_VALUE(combos.combo, '$.match_id') AS BIGINT) as match_id,
        CAST(
            JSON_VALUE(combos.combo, '$.combo_selection_id') AS BIGINT
        ) as combo_id,
        CAST(JSON_VALUE(combos.combo, '$.is_live') AS BOOLEAN) as is_live,
        CAST(RegulatorToCatalog(regulator) AS VARCHAR) as offer_catalog,
        CARDINALITY(
            JsonToArray(JSON_QUERY(combos.combo, '$.bet_selections'))
        ) as combo_selections_nbr,
        combo_bet_selections
    from
        bet_placed_view
        CROSS JOIN UNNEST(JsonToArray(combo_bet_selections)) AS combos(combo)
),
agg_match AS (
    SELECT
        match_id,
        LOWER(offer_catalog) as offer_catalog,
        MAX(event_timestamp) AS last_event_time_utc,
        COUNT(*) AS bet_count
    FROM
        exploded_combos
    WHERE
        match_id IS NOT NULL
        AND combo_id IS NOT NULL
        AND offer_catalog IS NOT NULL
        AND combo_bet_selections IS NOT NULL
    GROUP BY
        match_id,
        LOWER(offer_catalog)
),
agg_combo AS (
    SELECT
        match_id,
        combo_id,
        combo_selections_nbr,
        is_live,
        LOWER(offer_catalog) AS offer_catalog,
        MAX(event_timestamp) AS last_event_time_utc,
        COUNT(*) as bet_count
    FROM
        exploded_combos
    WHERE
        match_id IS NOT NULL
        AND combo_id IS NOT NULL
        AND (
            combo_selections_nbr = 3
            OR combo_selections_nbr = 2
        )
        AND offer_catalog IS NOT NULL
    GROUP BY
        match_id,
        combo_id,
        combo_selections_nbr,
        is_live,
        LOWER(offer_catalog)
),
ranked_filtered_agg_combo_main_page AS (
    SELECT
        match_id,
        combo_id,
        offer_catalog,
        bet_count,
        ROW_NUMBER() OVER (
            PARTITION BY match_id,
            offer_catalog
            ORDER BY
                bet_count DESC,
                combo_id DESC
        ) AS rank_combo
    FROM
        agg_combo
    WHERE
        combo_selections_nbr = 3
),
joined_filtered_agg_match_main_page AS (
    SELECT
        ranked_filtered_agg_combo_main_page.match_id,
        ranked_filtered_agg_combo_main_page.offer_catalog,
        ranked_filtered_agg_combo_main_page.bet_count,
        ranked_filtered_agg_combo_main_page.combo_id,
        ROW_NUMBER() OVER (
            PARTITION BY agg_match.offer_catalog
            ORDER BY
                agg_match.bet_count DESC,
                agg_match.match_id DESC
        ) AS rank_match
    FROM
        agg_match
        INNER JOIN ranked_filtered_agg_combo_main_page ON 
ranked_filtered_agg_combo_main_page.match_id = agg_match.match_id
        AND ranked_filtered_agg_combo_main_page.offer_catalog = 
agg_match.offer_catalog
    WHERE
        ranked_filtered_agg_combo_main_page.rank_combo = 1
)
SELECT
    partition_key,
    match_id,
    offer_catalog,
    false as live,
    LAST_VALUE(last_event_utc) AS last_event_utc,
    LAST_VALUE(last_event_utc) AS max_last_event_utc,
    LAST_VALUE(top) AS top
FROM (
        SELECT
            '<all>' as match_id,
            offer_catalog,
            '<all>' || '#openmatch#' || offer_catalog as partition_key,
            CAST('MAX(very_last_event_time_utc)' AS VARCHAR) AS last_event_utc,
            '[' || LISTAGG(
                '{"match_id":' || CAST(match_id AS VARCHAR) || ',"rank":' || 
CAST(rank_match AS VARCHAR) || ',"count":' || CAST(bet_count AS VARCHAR) || 
',"combo_id":"' || CAST(combo_id AS VARCHAR) || '","offer_catalog":"' || 
CAST(offer_catalog AS VARCHAR) || '","live": ' || '}'
            ) || ']' AS top
        FROM
            joined_filtered_agg_match_main_page
        WHERE
            rank_match <= 5
        GROUP BY
            offer_catalog
    )
GROUP BY
    partition_key,
    match_id,
    offer_catalog{code}


As you can see in the result below, once the TTL is reached we get duplicated
entries in our output (the same rank appears multiple times):
{code:java}
+I[<all>#openmatch#betclic_fr, <all>, betclic_fr,
    false, MAX(very_last_event_time_utc), MAX(very_last_event_time_utc),
    [
        {
            "match_id": 2,
            "rank": 1,
            "count": 2,
            "combo_id": "123",
            "offer_catalog": "betclic_fr",
            "live":
        },
        {
            "match_id": 5,
            "rank": 2,
            "count": 1,
            "combo_id": "123",
            "offer_catalog": "betclic_fr",
            "live":
        }
    ]
]
+U[<all>#openmatch#betclic_fr, <all>, betclic_fr,
    false, MAX(very_last_event_time_utc), MAX(very_last_event_time_utc),
    [
        {
            "match_id": 5,
            "rank": 2,
            "count": 1,
            "combo_id": "123",
            "offer_catalog": "betclic_fr",
            "live":
        },
        {
            "match_id": 2,
            "rank": 1,
            "count": 3,
            "combo_id": "123",
            "offer_catalog": "betclic_fr",
            "live":
        },
        {
            "match_id": 3,
            "rank": 3,
            "count": 2,
            "combo_id": "123",
            "offer_catalog": "betclic_fr",
            "live":
        },
        {
            "match_id": 1,
            "rank": 4,
            "count": 3,
            "combo_id": "123",
            "offer_catalog": "betclic_fr",
            "live":
        },
        {
            "match_id": 4,
            "rank": 5,
            "count": 1,
            "combo_id": "123",
            "offer_catalog": "betclic_fr",
            "live":
        }
    ]
]

...

// DUPLICATED VALUES (multiple times the same rank): 
+U[<all>#openmatch#betclic_fr, <all>, betclic_fr,
    false, MAX(very_last_event_time_utc), MAX(very_last_event_time_utc),
    [
        {
            "match_id": 3,
            "rank": 1,
            "count": 16,
            "combo_id": "123",
            "offer_catalog": "betclic_fr",
            "live":
        },
        {
            "match_id": 5,
            "rank": 3,
            "count": 14,
            "combo_id": "123",
            "offer_catalog": "betclic_fr",
            "live":
        },
        {
            "match_id": 2,
            "rank": 2,
            "count": 15,
            "combo_id": "123",
            "offer_catalog": "betclic_fr",
            "live":
        },
        {
            "match_id": 4,
            "rank": 4,
            "count": 14,
            "combo_id": "123",
            "offer_catalog": "betclic_fr",
            "live":
        },
        {
            "match_id": 1,
            "rank": 5,
            "count": 12,
            "combo_id": "123",
            "offer_catalog": "betclic_fr",
            "live":
        },
        {
            "match_id": 2,
            "rank": 2,
            "count": 15,
            "combo_id": "123",
            "offer_catalog": "betclic_fr",
            "live":
        },
        {
            "match_id": 1,
            "rank": 5,
            "count": 12,
            "combo_id": "123",
            "offer_catalog": "betclic_fr",
            "live":
        },
        {
            "match_id": 5,
            "rank": 3,
            "count": 16,
            "combo_id": "123",
            "offer_catalog": "betclic_fr",
            "live":
        },
        {
            "match_id": 3,
            "rank": 1,
            "count": 18,
            "combo_id": "123",
            "offer_catalog": "betclic_fr",
            "live":
        },
        {
            "match_id": 4,
            "rank": 4,
            "count": 15,
            "combo_id": "123",
            "offer_catalog": "betclic_fr",
            "live":
        }
    ]
]
...  {code}
 

*Is this a bug, or did we misunderstand how the TTL should be used?*

Flink also emits the following warning:

!image-2024-04-23-15-34-32-860.png!

 

We actually found two workarounds:
* Set a very large TTL (one year). Thanks to our business logic, this eliminates most of the problem; a sketch of this setting follows the list.
* Use a sliding window (but in our case that would require a 50-day window sliding every minute).
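A minimal sketch of the first workaround, using the same setter style as our configuration above ("365 d" is a standard Flink duration string; the one-year value is our own choice, not a recommended default):
{code:java}
// Workaround 1: a TTL so large that state for any live match never
// expires in practice, at the cost of retaining much more state.
tableEnv.getConfig.getConfiguration
  .setString("table.exec.state.ttl", "365 d")
{code}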

We're afraid both of these solutions would be very expensive.

Can you tell us whether this is normal behavior, or whether we have
misunderstood something about the TTL?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
