Chen Zhang created FLINK-39899:
----------------------------------
Summary: Flink SQL Window TVF didn't remove rowtime attribute from
original rowtime field
Key: FLINK-39899
URL: https://issues.apache.org/jira/browse/FLINK-39899
Project: Flink
Issue Type: Bug
Reporter: Chen Zhang
h2. Summary
Window Table-Valued Functions (TUMBLE/HOP/CUMULATE) do not materialize the
original rowtime attribute column to a regular {{TIMESTAMP}} type in the output
schema. Per the FLIP-145 specification, the original time attribute should become a
regular timestamp after applying the window TVF, with only {{window_time}}
remaining as the rowtime attribute. Instead, both the original column and
{{window_time}} retain the {{*ROWTIME*}} indicator.
h2. Description
*FLIP-145 states:*
{quote}
The original row time attribute "timecol" will be a regular timestamp column
after applying window TVF.
{quote}
*Actual behavior:*
After applying {{TUMBLE(TABLE t, DESCRIPTOR(ts), INTERVAL '10' SECOND)}}, the
output schema shows:
{code}
`window_time` TIMESTAMP(3) NOT NULL *ROWTIME*
`ts` TIMESTAMP(3) *ROWTIME*   <-- should be regular TIMESTAMP(3)
{code}
Both {{ts}} and {{window_time}} are marked as {{*ROWTIME*}}, violating the
FLIP-145 design.
*Evidence from plan AST:*
{code}
LogicalTableFunctionScan(
  invocation=[TUMBLE(DESCRIPTOR($2), 10000:INTERVAL SECOND)],
  rowType=[RecordType(
    VARCHAR entity_id,
    VARCHAR payload,
    TIMESTAMP(3) *ROWTIME* ts,          <-- STILL ROWTIME
    TIMESTAMP(3) window_start,
    TIMESTAMP(3) window_end,
    TIMESTAMP(3) *ROWTIME* window_time
  )]
)
{code}
h2. Impact
h3. 1. Silent data loss in OVER aggregation after window TVF
Because {{ts}} retains {{*ROWTIME*}}, it passes the time-attribute validation
in {{StreamExecOverAggregate.translateToPlanInternal()}}. The planner accepts
queries like:
{code:sql}
SELECT *, COUNT(*) OVER (PARTITION BY id ORDER BY ts ROWS UNBOUNDED PRECEDING)
FROM TABLE(TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '10' SECOND))
{code}
At runtime, the window aggregate operator ({{SlicingWindowOperator}}) registers
timers at {{window_end - 1}} and forwards intermediate watermarks. When the
window fires and emits records, records with {{ts}} values early in the window
are *behind the downstream OVER operator's watermark* and are *silently dropped
as late*.
Example: For a window {{[12:00:00, 12:00:10)}}:
* Intermediate watermarks (e.g., {{12:00:05}}, {{12:00:08}}) are forwarded to
the OVER operator
* When the window fires, records with {{ts = 12:00:01}}, {{12:00:03}},
{{12:00:06}} are late (behind watermark {{12:00:08}})
* Only records near the end of the window survive
* If the OVER clause orders by {{window_time}} ({{12:00:09.999}}) instead, all
records of the window share the same timestamp, which is ahead of those
watermarks, so none are dropped
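The arithmetic in the example above can be replayed with a small, self-contained Java sketch. It is a toy model and not Flink code: the class {{LateRecordDemo}}, its methods, and the rule "a record is late when its timestamp is not after the current watermark" are simplifying assumptions made for illustration only.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.List;
import java.util.stream.Collectors;

/** Toy model of event-time lateness for the window [12:00:00, 12:00:10). Not Flink code. */
class LateRecordDemo {

    /** Assumed rule: a record is late when its timestamp is at or before the watermark. */
    static boolean isLate(LocalTime timestamp, LocalTime watermark) {
        return !timestamp.isAfter(watermark);
    }

    /** window_time as used in the report: one millisecond before window_end. */
    static LocalTime windowTime(LocalTime windowEnd) {
        return windowEnd.minus(Duration.ofMillis(1));
    }

    /** Returns the timestamps that are not late with respect to the given watermark. */
    static List<LocalTime> surviving(List<LocalTime> timestamps, LocalTime watermark) {
        return timestamps.stream()
                .filter(ts -> !isLate(ts, watermark))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Intermediate watermark already forwarded to the OVER operator.
        LocalTime watermark = LocalTime.parse("12:00:08");
        List<LocalTime> rows = List.of(
                LocalTime.parse("12:00:01"),
                LocalTime.parse("12:00:03"),
                LocalTime.parse("12:00:06"),
                LocalTime.parse("12:00:09"));

        // Ordering by the original ts: early records are behind the watermark.
        System.out.println("kept when ordering by ts: " + surviving(rows, watermark));

        // Ordering by window_time: every row carries window_end - 1 ms.
        LocalTime wt = windowTime(LocalTime.parse("12:00:10"));
        System.out.println("window_time " + wt + " late? " + isLate(wt, watermark));
    }
}
```

Under these assumptions, only the {{12:00:09}} record is kept when rows are ordered by {{ts}}, while {{window_time}} stays ahead of the {{12:00:08}} watermark for every row of the window.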
h3. 2. Multiple rowtime columns in output
Having two rowtime columns causes errors when writing to sinks:
{code}
TableException: The query contains more than one rowtime attribute column
[window_time, ts] for writing into table '*anonymous_datastream_sink*'.
{code}
This was partially worked around in FLINK-24186 by relaxing the check for
collect/print sinks, but the root cause was never fixed.
h2. Root Cause
The window TVF's output type derivation (in the planner's type inference for
{{LogicalTableFunctionScan}}) preserves the {{TimeIndicatorRelDataType}} on the
original time column. It should materialize the original time column to a
regular {{TIMESTAMP}} type, keeping only {{window_time}} as {{*ROWTIME*}}.
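The intended derivation can be illustrated with a minimal, self-contained Java sketch. Everything in it is hypothetical: {{MaterializeSketch}}, {{Column}}, and {{deriveOutput}} are stand-ins invented for this example and do not correspond to actual planner classes or to the eventual fix. The boolean flag models the {{*ROWTIME*}} indicator.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy schema model. These types are hypothetical and are not Flink planner classes. */
class MaterializeSketch {

    /** A column whose boolean flag stands in for the *ROWTIME* indicator. */
    static final class Column {
        final String name;
        final boolean rowtime;

        Column(String name, boolean rowtime) {
            this.name = name;
            this.rowtime = rowtime;
        }

        @Override
        public String toString() {
            return name + (rowtime ? " *ROWTIME*" : "");
        }
    }

    /**
     * Derives the window TVF output columns the way the report expects: the column
     * named in DESCRIPTOR(...) loses its indicator, and window_time is appended as
     * the only rowtime column.
     */
    static List<Column> deriveOutput(List<Column> input, String timeColumn) {
        List<Column> output = new ArrayList<>();
        for (Column c : input) {
            boolean keepIndicator = c.rowtime && !c.name.equals(timeColumn);
            output.add(new Column(c.name, keepIndicator));
        }
        output.add(new Column("window_start", false));
        output.add(new Column("window_end", false));
        output.add(new Column("window_time", true));
        return output;
    }

    /** Counts the columns that still carry the rowtime indicator. */
    static long rowtimeCount(List<Column> columns) {
        return columns.stream().filter(c -> c.rowtime).count();
    }

    public static void main(String[] args) {
        List<Column> input = List.of(
                new Column("entity_id", false),
                new Column("payload", false),
                new Column("ts", true));
        List<Column> output = deriveOutput(input, "ts");
        System.out.println(output);
        System.out.println("rowtime columns: " + rowtimeCount(output));
    }
}
```

In this model the output has exactly one rowtime column, {{window_time}}, which is the invariant that both the sink check and the OVER validation rely on.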
h2. Steps to Reproduce
{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class WindowTvfRowtimeRepro {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().inStreamingMode().build());

        // Create a source table whose ts column is a rowtime attribute
        tableEnv.executeSql(
                "CREATE TABLE source ("
                        + " id STRING, ts TIMESTAMP(3),"
                        + " WATERMARK FOR ts AS ts - INTERVAL '1' SECOND"
                        + ") WITH ('connector' = 'datagen')");

        // Query: TUMBLE followed by OVER, ordered by the original ts column
        Table result = tableEnv.sqlQuery(
                "SELECT *, COUNT(*) OVER (PARTITION BY id ORDER BY ts ROWS UNBOUNDED PRECEDING) AS cnt "
                        + "FROM TABLE(TUMBLE(TABLE source, DESCRIPTOR(ts), INTERVAL '10' SECOND))");

        // Inspect the schema: ts should NOT be *ROWTIME*
        result.printSchema();
        // Actual:   `ts` TIMESTAMP(3) *ROWTIME*
        // Expected: `ts` TIMESTAMP(3)
    }
}
{code}
h2. Expected Behavior
After window TVF, the output schema should be:
{code}
`ts` TIMESTAMP(3)   <-- regular timestamp, NOT rowtime
`window_time` TIMESTAMP(3) NOT NULL *ROWTIME* <-- sole rowtime attribute
{code}
The OVER aggregation {{ORDER BY ts}} should be *rejected* by the planner
because {{ts}} is no longer a time attribute.
h2. Related Issues
* FLINK-24186 - Worked around the "multiple rowtime columns" symptom for
collect/print sinks
* FLINK-38162 - Time attribute propagation issues with SQL functions after
window TVF
* FLINK-10211 - Broader issue with time indicator materialization
* [FLIP-145|https://cwiki.apache.org/confluence/display/FLINK/FLIP-145:+Support+SQL+windowing+table-valued+function] - Original design specification for window TVFs