Chen Zhang created FLINK-39899:
----------------------------------

             Summary: Flink SQL Window TVF didn't remove rowtime attribute from 
original rowtime field
                 Key: FLINK-39899
                 URL: https://issues.apache.org/jira/browse/FLINK-39899
             Project: Flink
          Issue Type: Bug
            Reporter: Chen Zhang


h2. Summary

Window Table-Valued Functions (TUMBLE/HOP/CUMULATE) do not materialize the 
original rowtime attribute column to a regular {{TIMESTAMP}} type in the output 
schema. Per the FLIP-145 specification, the original time attribute should 
become a regular timestamp after the window TVF is applied, with only 
{{window_time}} remaining as a rowtime attribute. Instead, both the original 
column and {{window_time}} retain the {{*ROWTIME*}} indicator.

h2. Description

*FLIP-145 states:*
{quote}
The original row time attribute "timecol" will be a regular timestamp column 
after applying window TVF.
{quote}

*Actual behavior:*
After applying {{TUMBLE(TABLE t, DESCRIPTOR(ts), INTERVAL '10' SECOND)}}, the 
output schema shows:
{code}
`window_time` TIMESTAMP(3) NOT NULL *ROWTIME*
`ts`          TIMESTAMP(3) *ROWTIME*            <-- should be regular TIMESTAMP(3)
{code}

Both {{ts}} and {{window_time}} are marked as {{*ROWTIME*}}, violating the 
FLIP-145 design.

*Evidence from plan AST:*
{code}
LogicalTableFunctionScan(
  invocation=[TUMBLE(DESCRIPTOR($2), 10000:INTERVAL SECOND)],
  rowType=[RecordType(
    VARCHAR entity_id,
    VARCHAR payload,
    TIMESTAMP(3) *ROWTIME* ts,           <-- STILL ROWTIME
    TIMESTAMP(3) window_start,
    TIMESTAMP(3) window_end,
    TIMESTAMP(3) *ROWTIME* window_time
  )]
)
{code}

h2. Impact

h3. 1. Silent data loss in OVER aggregation after window TVF

Because {{ts}} retains {{*ROWTIME*}}, it passes the time-attribute validation 
in {{StreamExecOverAggregate.translateToPlanInternal()}}. The planner accepts 
queries like:

{code:sql}
SELECT *, COUNT(*) OVER (PARTITION BY id ORDER BY ts ROWS UNBOUNDED PRECEDING)
FROM TABLE(TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '10' SECOND))
{code}

At runtime, the window aggregate operator ({{SlicingWindowOperator}}) registers 
timers at {{window_end - 1}} and forwards intermediate watermarks. By the time 
the window fires and emits its records, those whose {{ts}} falls early in the 
window are already *behind the downstream OVER operator's watermark* and are 
*silently dropped as late*.

Example: For a window {{[12:00:00, 12:00:10)}}:
* Intermediate watermarks (e.g., {{12:00:05}}, {{12:00:08}}) are forwarded to 
the OVER operator
* When the window fires, records with {{ts = 12:00:01}}, {{12:00:03}}, 
{{12:00:06}} are late (behind watermark {{12:00:08}})
* Only records near the end of the window survive
* If the OVER clause ordered by {{window_time}} instead, every record of this 
window would carry the same timestamp ({{12:00:09.999}}) and none would be 
dropped
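
The arithmetic in the example above can be replayed with a small, self-contained sketch. It is a simplified model and not Flink's operator code: it assumes that a record is late when its timestamp is not strictly after the current watermark, and the class name {{LateRecordSketch}}, the helper {{survivors}} and the extra record at {{12:00:09}} are hypothetical names and values introduced only for illustration.

```java
import java.time.LocalTime;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class LateRecordSketch {

    /**
     * Simplified lateness rule (an assumption, not Flink code): a record is
     * kept only if its timestamp is strictly after the current watermark.
     */
    static List<LocalTime> survivors(List<LocalTime> timestamps, LocalTime watermark) {
        return timestamps.stream()
                .filter(t -> t.isAfter(watermark))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Watermark already forwarded to the OVER operator before the window fires.
        LocalTime watermark = LocalTime.parse("12:00:08");

        // Records of window [12:00:00, 12:00:10), ordered by the original ts.
        // The record at 12:00:09 is a hypothetical addition near the window end.
        List<LocalTime> byTs = List.of(
                LocalTime.parse("12:00:01"),
                LocalTime.parse("12:00:03"),
                LocalTime.parse("12:00:06"),
                LocalTime.parse("12:00:09"));

        // The same records, ordered by window_time = window_end - 1 ms.
        List<LocalTime> byWindowTime =
                Collections.nCopies(byTs.size(), LocalTime.parse("12:00:09.999"));

        System.out.println("kept when ordering by ts:          "
                + survivors(byTs, watermark).size() + " of " + byTs.size());
        System.out.println("kept when ordering by window_time: "
                + survivors(byWindowTime, watermark).size() + " of " + byWindowTime.size());
    }
}
```

Under this model only the record near the end of the window is kept when ordering by {{ts}}, while all four are kept when ordering by {{window_time}}.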

h3. 2. Multiple rowtime columns in output

Having two rowtime columns causes errors when writing to sinks:
{code}
TableException: The query contains more than one rowtime attribute column [window_time, ts] for writing into table '*anonymous_datastream_sink*'.
{code}

This was partially worked around in FLINK-24186 by relaxing the check for 
collect/print sinks, but the root cause was never fixed.

h2. Root Cause

The window TVF's output type derivation (in the planner's type inference for 
{{LogicalTableFunctionScan}}) preserves the {{TimeIndicatorRelDataType}} on the 
original time column. It should materialize the original time column to a 
regular {{TIMESTAMP}} type, keeping only {{window_time}} as {{*ROWTIME*}}.

h2. Steps to Reproduce

{code:java}
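import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();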
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,
    EnvironmentSettings.newInstance().inStreamingMode().build());

// Create source with rowtime
tableEnv.executeSql(
    "CREATE TABLE source (" +
    "  id STRING, ts TIMESTAMP(3)," +
    "  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND" +
    ") WITH ('connector' = 'datagen')");

// Query: TUMBLE + OVER using original ts
Table result = tableEnv.sqlQuery(
    "SELECT *, COUNT(*) OVER (" +
    "  PARTITION BY id ORDER BY ts ROWS UNBOUNDED PRECEDING) AS cnt " +
    "FROM TABLE(TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '10' SECOND))");

// Inspect schema - ts should NOT be *ROWTIME*
result.printSchema();
// Actual:   `ts` TIMESTAMP(3) *ROWTIME*
// Expected: `ts` TIMESTAMP(3)
{code}

h2. Expected Behavior

After window TVF, the output schema should be:
{code}
`ts`          TIMESTAMP(3)                      <-- regular timestamp, NOT rowtime
`window_time` TIMESTAMP(3) NOT NULL *ROWTIME*   <-- sole rowtime attribute
{code}

The OVER aggregation {{ORDER BY ts}} should be *rejected* by the planner 
because {{ts}} is no longer a time attribute.
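
The expected derivation can be stated as a toy model. This is only a sketch of the rule described in this section, not the planner's type inference: {{WindowTvfSchemaSketch}}, {{Column}}, {{deriveOutput}} and {{rowtimeCount}} are hypothetical names, and the model assumes that only the column named in {{DESCRIPTOR(...)}} is demoted to a regular timestamp.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowTvfSchemaSketch {

    /** Toy column: name, SQL type, and whether it carries the *ROWTIME* marker. */
    static final class Column {
        final String name;
        final String type;
        final boolean rowtime;

        Column(String name, String type, boolean rowtime) {
            this.name = name;
            this.type = type;
            this.rowtime = rowtime;
        }

        @Override
        public String toString() {
            return "`" + name + "` " + type + (rowtime ? " *ROWTIME*" : "");
        }
    }

    /** Expected rule: demote the descriptor column, then append the window columns. */
    static List<Column> deriveOutput(List<Column> input, String timeCol) {
        List<Column> out = new ArrayList<>();
        for (Column c : input) {
            // The column named in DESCRIPTOR(...) becomes a regular timestamp.
            out.add(c.name.equals(timeCol) ? new Column(c.name, c.type, false) : c);
        }
        out.add(new Column("window_start", "TIMESTAMP(3)", false));
        out.add(new Column("window_end", "TIMESTAMP(3)", false));
        // window_time is the only column that keeps the rowtime marker.
        out.add(new Column("window_time", "TIMESTAMP(3) NOT NULL", true));
        return out;
    }

    /** Counts the columns still marked as rowtime attributes. */
    static long rowtimeCount(List<Column> columns) {
        return columns.stream().filter(c -> c.rowtime).count();
    }

    public static void main(String[] args) {
        List<Column> source = List.of(
                new Column("id", "STRING", false),
                new Column("ts", "TIMESTAMP(3)", true));
        List<Column> output = deriveOutput(source, "ts");
        output.forEach(System.out::println);
        System.out.println("rowtime columns: " + rowtimeCount(output));
    }
}
```

In this model the output carries exactly one rowtime column, which is the property that the sink check quoted under Impact enforces.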

h2. Related Issues

* FLINK-24186 - Worked around the "multiple rowtime columns" symptom for 
collect/print sinks
* FLINK-38162 - Time attribute propagation issues with SQL functions after 
window TVF
* FLINK-10211 - Broader issue with time indicator materialization
* [FLIP-145|https://cwiki.apache.org/confluence/display/FLINK/FLIP-145:+Support+SQL+windowing+table-valued+function] - Original design specification for window TVFs


