Hi FeatZhang,

Thanks for the update and the revised proposal! I don't have further questions. 
Looking forward to seeing this move forward.



--

    Best!
    Xuyang



At 2026-05-16 13:44:20, "FeatZhang" <[email protected]> wrote:
>Hi Xuyang,
>
>Thanks a lot for the careful follow-ups — these are exactly the right
>questions to ask before we move to a vote. After re-checking the code
>paths you pointed at, I want to revise several statements I made in my
>previous reply, and tighten the FLIP accordingly. I'll go through them
>one by one.
>
>1. Semantics at any position in the pipeline
>=============================================
>Fully agree. To make this concrete, I'll add a new sub-section
>"Semantics at Different Pipeline Positions" to the FLIP, covering:
>
>  - APPLY_WATERMARK on a base table (with or without an existing
>    DDL watermark)
>  - APPLY_WATERMARK on top of a non-materialized view / sub-query
>  - APPLY_WATERMARK applied multiple times in the same query
>  - Interaction with TUMBLE / HOP / SESSION / CUMULATE
>  - Interaction with joins (regular / interval / temporal)
>
>The mental model will be explicitly aligned with the DataStream API:
>each APPLY_WATERMARK in SQL corresponds to one
>`assignTimestampsAndWatermarks(...)` call in the DataStream pipeline,
>applied in the order they appear.
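>
>As a rough sketch of the "applied multiple times" case (assuming the
>syntax proposed in the FLIP; `raw_events`, `events_5s` and `ts` are
>hypothetical names, and whether a view body may itself contain
>APPLY_WATERMARK is an assumption made only for illustration):
>
>```sql
>-- First application: 5-second bounded out-of-orderness.
>CREATE TEMPORARY VIEW events_5s AS
>SELECT * FROM APPLY_WATERMARK(
>  raw_events, DESCRIPTOR(ts), ts - INTERVAL '5' SECOND);
>
>-- Second application, further downstream. Operators after this point
>-- would observe the 10-second strategy, much like a second
>-- assignTimestampsAndWatermarks(...) call in a DataStream pipeline.
>SELECT * FROM APPLY_WATERMARK(
>  events_5s, DESCRIPTOR(ts), ts - INTERVAL '10' SECOND);
>```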
>
>2. Monotonicity validation
>==========================
>You're right, and I want to correct my earlier reply. Today's
>`CREATE TABLE ... WATERMARK FOR ... AS ...` does NOT enforce
>monotonicity at the planner level — the planner only checks that:
>
>  - the rowtime column exists and is of TIMESTAMP / TIMESTAMP_LTZ,
>  - the watermark expression is a valid scalar expression over the
>    table's schema and resolves to a comparable type.
>
>Monotonicity is a runtime contract: the WatermarkAssignerOperator
>emits watermarks that are guaranteed to be non-decreasing.
>
>APPLY_WATERMARK will follow exactly the same contract — no stricter
>planner-level monotonicity check. I'll update the FLIP's "Planner
>Changes → Validation" section to reflect this.
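>
>For reference, a minimal sketch of the two forms that would share this
>contract (the table, its columns and the connector choice are
>hypothetical; the APPLY_WATERMARK call assumes the syntax proposed in
>the FLIP):
>
>```sql
>-- Existing DDL form: the planner checks the rowtime column type and
>-- the expression, but not monotonicity.
>CREATE TABLE orders (
>  order_id STRING,
>  ts TIMESTAMP(3),
>  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
>) WITH ('connector' = 'datagen');
>
>-- Proposed query-level form, intended to be validated the same way.
>SELECT * FROM APPLY_WATERMARK(
>  orders, DESCRIPTOR(ts), ts - INTERVAL '5' SECOND);
>```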
>
>3. Override timing — clarification
>==================================
>I think this was a wording issue on my side rather than a real
>design disagreement. Let me restate it:
>
>  - APPLY_WATERMARK introduces a dedicated WatermarkAssigner node in
>    the plan. Whether the input already carries a watermark or not,
>    the plan ends up with the new assigner positioned downstream of
>    the existing one.
>  - At runtime there is no "merge" or "reconciliation": each
>    WatermarkAssigner operator independently emits its own watermark
>    stream; downstream operators simply observe the watermark from
>    the most recent upstream assigner.
>
>This is the same model as calling `assignTimestampsAndWatermarks()`
>twice in DataStream — the second call wins because it sits later in
>the operator chain, not because of any planner-level magic.
>
>So "planner-level override" was a poor choice of words. The correct
>description is: **the planner decides the operator topology; the
>runtime emits watermarks according to that topology**, exactly like
>DataStream. I'll rephrase the FLIP accordingly and drop the
>"override" terminology.
>
>4. "Watermark expression evaluation: needs to support arbitrary
>   expressions"
>====================================================================
>Apologies, this statement was inaccurate. After re-checking,
>StreamExecWatermarkAssigner already evaluates the watermark
>expression through the standard `ExprCodeGenerator`, which supports
>the same scalar expressions as DDL today (arithmetic on TIMESTAMP /
>TIMESTAMP_LTZ, INTERVAL arithmetic, scalar UDFs, etc.).
>
>What APPLY_WATERMARK actually needs from the ExecNode is:
>
>  - resolving the rowtime column index from the DESCRIPTOR, since the
>    input may be a non-base-table (view / sub-query / projected
>    relation),
>  - wiring the watermark expression's input row to the upstream
>    operator's output row instead of a TableScan output.
>
>No new expression capability is required. I'll fix this in the FLIP.
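>
>A sketch of the non-base-table case, assuming the proposed syntax
>(`raw_logs`, `parsed_logs`, `user_id` and `log_time` are hypothetical
>names):
>
>```sql
>-- The rowtime column is computed inside the view, so it is not
>-- visible to any base-table DDL.
>CREATE TEMPORARY VIEW parsed_logs AS
>SELECT user_id, TO_TIMESTAMP(log_time) AS ts
>FROM raw_logs;
>
>-- DESCRIPTOR(ts) has to resolve against the view's output row, and
>-- the watermark expression reads that same row.
>SELECT * FROM APPLY_WATERMARK(
>  parsed_logs, DESCRIPTOR(ts), ts - INTERVAL '5' SECOND);
>```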
>
>5. State management
>===================
>You're right, this should be removed. In the scope of this FLIP the
>APPLY_WATERMARK ExecNode is **stateless**, identical to the existing
>`WatermarkAssignerOperator`. It does not buffer rows and does not
>evict late data; late-data handling remains the responsibility of
>the downstream window / join operators, exactly as it works today.
>
>The "state management" bullet in my previous reply was speculation
>about future watermark strategies (idle source detection, etc.) and
>does not belong in this FLIP. I'll drop it.
>
>------------------------------------------------------------
>Updated summary of the design after this round
>------------------------------------------------------------
>
>  - Scope: base tables, non-materialized views, sub-queries; any
>    relation position in the query.
>  - Semantics: aligned with DataStream API
>    `assignTimestampsAndWatermarks()`; multiple applications in the
>    same query are positional, not "overriding".
>  - Validation: same contract as today's DDL — scalar expression on
>    a TIMESTAMP / TIMESTAMP_LTZ rowtime column; no planner-level
>    monotonicity check.
>  - ExecNode: stateless, reuses StreamExecWatermarkAssigner with
>    minor wiring changes (rowtime column resolution + non-TableScan
>    input handling).
>  - Out of scope: WatermarkFunction interface, idle-source state,
>    runtime-level merge of multiple watermark strategies.
>
>I'll update the FLIP document and PR #27984 to match the points
>above, and post a diff summary here once it's done.
>
>Thanks again for pushing on these — the FLIP is much cleaner after
>this round.
>
>Best regards,
>FeatZhang
>
>On Mon, May 11, 2026 at 10:45 AM Xuyang <[email protected]> wrote:
>
>> Hi FeatZhang. Thanks for the detailed responses. I have a few follow-up
>> comments and questions:
>>
>>
>> 1. Support for APPLY_WATERMARK at any node/position
>> I generally agree with the direction that APPLY_WATERMARK should be
>> applicable at any node and any position — one or more times — similar to
>> how the DataStream API allows watermark assignment. However, I think we
>> need to clearly articulate the precise behavior/semantics in each scenario
>> to reduce user confusion. Aligning the mental model with the DataStream API
>> (where users can call assignTimestampsAndWatermarks() at arbitrary points
>> in the pipeline) would also help lower the learning curve.
>> 2. Monotonicity validation of watermark expressions
>> Why do we need to enforce monotonicity guarantees on the watermark
>> expression at the planner level? As far as I know, Flink SQL currently does
>> NOT perform such validation at the DDL level for CREATE TABLE ... WATERMARK
>> FOR ... AS .... What it actually does is ensure at runtime that emitted
>> watermarks are non-decreasing. If the existing DDL path does not validate
>> monotonicity at planning time, why should APPLY_WATERMARK introduce a
>> stricter contract?
>> 3. Planner-level watermark override
>> Could you elaborate more on why the watermark override must happen at the
>> planner level? If I understand correctly, in the DataStream API, users can
>> define different watermark strategies at different nodes in the same
>> pipeline, and the runtime handles watermark propagation naturally.
>> 4. "Watermark expression evaluation: Needs to support arbitrary
>> expressions"
>> You mentioned that StreamExecWatermarkAssigner currently has limitations
>> in watermark expression evaluation and "needs to support arbitrary
>> expressions." Could you clarify what the current limitations are exactly?
>> Today's CREATE TABLE ... WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
>> already supports scalar expressions, what additional expression types does
>> APPLY_WATERMARK require that are not already supported?
>> 5. State management
>> You mentioned "State management". Are we proposing to introduce a stateful
>> watermark assigner node that evicts late data? This sounds like a
>> significant change that goes well beyond the scope of this FLIP. The
>> current WatermarkAssigner is stateless, it simply computes and emits
>> watermarks without buffering data.
>> Looking forward to your clarification!
>>
>>
>>
>>
>>
>> --
>>
>>     Best!
>>     Xuyang
>>
>>
>>
>> At 2026-05-09 19:08:32, "熊饶饶" <[email protected]> wrote:
>> >Hi Feat Zhang,
>> >
>> >+1 (non-binding)
>> >
>> >This is a well-motivated proposal that addresses a real pain point in
>> >Flink SQL. The inability to define watermarks in views currently forces
>> >users to either:
>> >
>> >  - Reference underlying table watermarks directly, breaking
>> >    encapsulation
>> >  - Create separate physical tables for each watermark strategy, leading
>> >    to data duplication
>> >
>> >The proposed syntax CREATE VIEW ... WATERMARK FOR col AS expr is
>> >intuitive and aligns naturally with existing DDL watermark semantics. The
>> >backward-compatible design (storing watermark metadata in catalog options)
>> >is also a smart choice: it avoids breaking existing views while enabling
>> >the new capability.
>> >
>> >The Data Lakehouse / medallion architecture use case is particularly
>> >compelling. Being able to define watermark strategies at the Silver/Gold
>> >layer while keeping Bronze as raw data would significantly simplify
>> >pipeline design for many teams.
>> >
>> >Looking forward to seeing this move to a formal FLIP!
>> >
>> >Best regards,
>> >Raorao Xiong
>> >
>> >> On May 7, 2026, at 20:56, FeatZhang <[email protected]> wrote:
>> >>
>> >> Hi Xuyang,
>> >>
>> >> Thank you for the thorough review and thoughtful questions.
>> >>
>> >> *Problem this FLIP aims to solve*: The core goal of this FLIP is to
>> >> allow watermarks to be defined on *computed columns (and, more
>> >> generally, on any column produced inside a SQL query) directly in SQL
>> >> statements*. Today, watermarks in Flink SQL can only be declared at the
>> >> CREATE TABLE level via WATERMARK FOR ... AS ..., which means the time
>> >> attribute must be a column visible at the base table DDL. This makes it
>> >> impossible to attach a watermark to a timestamp derived inside a query
>> >> (for example, one computed by string parsing, JSON extraction, or any
>> >> expression inside a view or subquery) without pushing that computation
>> >> back down into the source table DDL. By introducing APPLY_WATERMARK as
>> >> an explicit relational operator that can be applied to *base tables,
>> >> non-materialized views, and subqueries*, users can assign watermark
>> >> semantics to any (computed) column produced by a query, which also
>> >> addresses the broader motivations listed in the FLIP: broken
>> >> layered-pipeline abstractions, lack of per-query / multi-tenant
>> >> watermark strategies, and the current gap between SQL and the
>> >> DataStream API.
>> >>
>> >> Now let me address each of your points:
>> >> Support for Non-materialized Views
>> >>
>> >> You raised an excellent point about the scope of APPLY_WATERMARK in
>> layered
>> >> architectures.
>> >>
>> >> *My position*: APPLY_WATERMARK should support base tables,
>> non-materialized
>> >> views, *and subqueries* (this matches the Goals section of the FLIP).
>> >> Here's why:
>> >>
>> >>   - Non-materialized views dissolve into the surrounding plan during
>> >>   optimization (inline expansion)
>> >>   - There's no physical "view" node in the execution plan—just a logical
>> >>   alias
>> >>   - The watermark becomes a relational transformation applied on top of
>> >>   the view's / subquery's output
>> >>
>> >> The key design principle is: *watermark definition is an explicit
>> >> relational operator, not attached metadata*.
>> >>
>> >> I'd also like to clarify the positions in the prior thread to avoid
>> >> confusion:
>> >>
>> >>   - *Lincoln* originally proposed APPLY_WATERMARK(table,
>> DESCRIPTOR(col),
>> >>   expr) scoped to base tables only.
>> >>   - *Timo* raised the concern about blurring the view abstraction ("*A
>> >>   view usually dissolves into the plan … would a watermark definition
>> >>   suddenly introduce an optimization barrier? If this is an optimization
>> >>   barrier, is this still a view or a new concept?*"). This is exactly
>> why
>> >>   this FLIP does *not* attach watermarks to views via CREATE VIEW /
>> ALTER
>> >>   VIEW, and keeps views as pure logical aliases.
>> >>   - *Gyula* emphasized that watermark assignment should be available on
>> >>   views and subqueries too, consistent with the DataStream API.
>> >>
>> >> To address Timo's concern concretely:
>> >>
>> >>   - Watermark semantics are applied at query planning time via an
>> explicit
>> >>   relational operator (APPLY_WATERMARK), not hidden in view/catalog
>> metadata.
>> >>   - No watermark information is persisted into the catalog for views —
>> the
>> >>   catalog stays unchanged (see FLIP "Catalog Changes": *No catalog
>> changes
>> >>   are required*).
>> >>   - Views continue to dissolve transparently into the plan; the
>> >>   optimization barrier only appears where APPLY_WATERMARK is explicitly
>> used.
>> >>
>> >> Monotonicity Validation
>> >>
>> >> Great question! Monotonicity guarantees are essential for watermark
>> >> correctness:
>> >>
>> >>   - Watermarks define the boundary of "late" data
>> >>   - If the watermark expression is not monotonically non-decreasing, the
>> >>   watermark could move backward
>> >>   - This would cause data that was previously considered "on-time" to be
>> >>   treated as late (or vice-versa), breaking event-time semantics
>> >>
>> >> *Validation requirement*: In line with the FLIP's Planner Changes
>> section,
>> >> the planner validates that watermark_expression is a valid *scalar*
>> expression
>> >> over columns of the input schema, and (as with today's CREATE TABLE ...
>> >> WATERMARK FOR ... AS ...) the expression must produce a monotonically
>> >> non-decreasing value relative to the designated rowtime column.
>> >>
>> >> Typical valid forms are the same as what's allowed in DDL today, for
>> >> example:
>> >>
>> >> -- Bounded out-of-orderness (most common)
>> >> APPLY_WATERMARK(t, DESCRIPTOR(ts), ts - INTERVAL '5' SECOND)
>> >> -- Strictly ascending
>> >> APPLY_WATERMARK(t, DESCRIPTOR(ts), ts)
>> >>
>> >> Note: watermark_expression is a *scalar expression* per the FLIP (not an
>> >> aggregate / window function). Richer forms such as user-defined
>> watermark
>> >> strategies are explicitly out of scope for this FLIP and are tracked as
>> a
>> >> future WatermarkFunction interface, which also depends on the Calcite
>> >> lambda upgrade mentioned by Timo.
>> >> Override Timing (Planner vs Runtime)
>> >>
>> >> You raised a valid concern. Let me clarify the design, which aligns with
>> >> the FLIP's "Planner Changes → Interaction with Existing Table
>> Watermarks"
>> >> section:
>> >>
>> >> *Current proposal*: Planner-level override
>> >>
>> >>   - During query compilation, when the input to APPLY_WATERMARK already
>> >>   carries a watermark (e.g., from CREATE TABLE ... WATERMARK), the
>> >>   LogicalWatermarkAssigner node produced by APPLY_WATERMARK *overrides*
>> the
>> >>   upstream watermark strategy.
>> >>   - When the input has no watermark (e.g., a view or a subquery),
>> >>   APPLY_WATERMARK introduces a new one.
>> >>   - This makes override behavior explicit in the plan and keeps room for
>> >>   standard optimizations.
>> >>
>> >> *Why not runtime-level override*:
>> >>
>> >>   1. Planner-level override keeps watermark semantics a first-class,
>> >>   visible part of the plan (consistent with how VECTOR_SEARCH /
>> ML_PREDICT
>> >>   are modeled as specialized ExecNodes in the FLIP).
>> >>   2. The override point is deterministic and inspectable via EXPLAIN.
>> >>   3. Simpler execution model — no dual-watermark reconciliation at
>> runtime.
>> >>
>> >> If concrete use cases for a runtime-level override surface later, we
>> could
>> >> revisit this via a hint (e.g., /*+ RUNTIME_OVERRIDE */), but it's not
>> part
>> >> of this FLIP.
>> >> Relationship with BuiltInProcessTableFunction
>> >>
>> >> Good observation. APPLY_WATERMARK is declared as a built-in PTF at the
>> SQL
>> >> surface, similar in spirit to TO_CHANGELOG / FROM_CHANGELOG, but the
>> FLIP
>> >> intentionally maps it to a *specialized ExecNode* rather than a generic
>> PTF
>> >> runtime — the same pattern used by VECTOR_SEARCH and ML_PREDICT.
>> >>
>> >> *Option A: Reuse the generic BuiltInProcessTableFunction runtime*
>> >>
>> >>   - Pros: Consistent with other built-in PTFs at the runtime layer.
>> >>   - Cons: Watermark assignment is not a row-transforming PTF — it
>> changes
>> >>   stream metadata (time attribute + watermark strategy). Forcing it
>> through
>> >>   the generic PTF runtime would require extending the PTF contract with
>> >>   watermark semantics.
>> >>
>> >> *Option B: Dedicated LogicalWatermarkAssigner + specialized ExecNode
>> (the
>> >> FLIP's choice)*
>> >>
>> >>   - Pros: Keeps watermark semantics a first-class citizen in the
>> planner;
>> >>   cleanly integrates with watermark propagation rules; no need to
>> overload
>> >>   the PTF contract; same pattern as VECTOR_SEARCH / ML_PREDICT already
>> >>   established in Flink.
>> >>   - Cons: A new dedicated node, though that cost is small compared to
>> the
>> >>   semantic clarity.
>> >>
>> >> *Current decision*: Option B, as stated in the FLIP ("*APPLY_WATERMARK
>> >> compiles to a specialized ExecNode --- similar to how VECTOR_SEARCH and
>> >> ML_PREDICT are handled*"). Open to revisiting based on community
>> feedback.
>> >> StreamExecWatermarkAssigner Sufficiency
>> >>
>> >> For the physical implementation:
>> >>
>> >> *Yes, StreamExecWatermarkAssigner should be sufficient*, with some
>> >> modifications:
>> >>
>> >>   1. *Input handling*: Currently assumes direct table scan; needs to
>> >>   handle APPLY_WATERMARK's column mapping
>> >>   2. *Watermark expression evaluation*: Needs to support arbitrary
>> >>   expressions (currently limited)
>> >>   3. *State management*: May need additional state for handling
>> >>   out-of-order events
>> >>
>> >> The key insight is that APPLY_WATERMARK conceptually translates to:
>> >>
>> >> TableScan -> Calc (expression evaluation) -> WatermarkAssigner
>> >>
>> >> StreamExecWatermarkAssigner handles the last step; the Calc step handles
>> >> the expression.
>> >> ------------------------------
>> >> Summary of Proposed Responses
>> >>
>> >>   - *Scope of input*: Support base tables, non-materialized views *and
>> >>   subqueries* (per FLIP Goals); views/catalog semantics stay unchanged.
>> >>   - *Monotonicity*: Validate watermark_expression as a scalar
>> expression;
>> >>   same monotonicity contract as today's DDL watermarks.
>> >>   - *Override timing*: Planner-level override at
>> LogicalWatermarkAssigner;
>> >>   potential /*+ RUNTIME_OVERRIDE */ hint as future work.
>> >>   - *PTF reuse*: Dedicated LogicalWatermarkAssigner + specialized
>> ExecNode
>> >>   (same pattern as VECTOR_SEARCH / ML_PREDICT).
>> >>   - *ExecNode sufficiency*: StreamExecWatermarkAssigner is sufficient
>> with
>> >>   minor modifications (input handling + expression evaluation).
>> >>
>> >> ------------------------------
>> >>
>> >> Looking forward to further discussion!
>> >>
>> >> Best regards,
>> >> FeatZhang
>> >>
>> >>
>> >> On Wed, May 6, 2026 at 4:39 PM Xuyang <[email protected]> wrote:
>> >>
>> >>> Hi, FeatZhang. Thanks for driving this discussion. I've read through
>> the
>> >>> full FLIP and the mailing list context, and I have a few questions:
>> >>> 1. If I understand correctly, in a Layered Data Architecture,
>> >>> silver_events would typically be a table, a materialized view, or a
>> >>> materialized table. From the mailing list discussion, it seems like no
>> >>> consensus was reached on this point. I think we still need to consider
>> >>> whether APPLY_WATERMARK should be allowed on (non-materialized) views.
>> >>> 2. In the Planner Changes section under Logical Plan, could you
>> elaborate
>> >>> on why monotonicity guarantees need to be ensured for the watermark
>> >>> expression validation?
>> >>> 3. (nit) In the Watermark Override part under Planner Changes,
>> shouldn't
>> >>> the override of the upstream watermark happen at runtime rather than
>> at the
>> >>> planner level?
>> >>> 4. I feel that APPLY_WATERMARK is quite similar to TO_CHANGELOG and
>> >>> FROM_CHANGELOG. Is what we actually need just a
>> >>> BuiltInProcessTableFunction? That way, we would only need to further
>> extend
>> >>> ProcessTableFunction to support this.
>> >>> 5. If we choose to translate APPLY_WATERMARK into a specialized
>> ExecNode
>> >>> (similar to VECTOR_SEARCH and ML_PREDICT), would the existing
>> >>> StreamExecWatermarkAssigner be sufficient for this purpose?
>> >>>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> --
>> >>>
>> >>>    Best!
>> >>>    Xuyang
>> >>>
>> >>>
>> >>>
>> >>> At 2026-04-21 22:22:33, "FeatZhang" <[email protected]> wrote:
>> >>>> Hi everyone,
>> >>>>
>> >>>> Thank you for the feedback and discussions on the initial proposal.
>> I've
>> >>>> revised the FLIP based on the community's input and would like to
>> share
>> >>>> the updated version.
>> >>>>
>> >>>>
>> >>>> FLIP-XXX: Support Flexible Watermark Assignment via Built-in Function
>> >>>> <https://drive.google.com/open?id=17PXYAi6Pb91OqFhVVK7tRULiaHAb6wiX79jjC0NaaDA>
>> >>>>
>> >>>>
>> >>>> KEY UPDATES
>> >>>> ===========
>> >>>>
>> >>>> The proposal has evolved from "Support Watermark Definition in SQL
>> Views"
>> >>>> to a more flexible and powerful approach: FLIP-XXX: Support Flexible
>> >>>> Watermark Assignment via APPLY_WATERMARK Function.
>> >>>>
>> >>>> What's Changed:
>> >>>>
>> >>>> 1. Broader Scope: Instead of limiting watermark definitions to SQL
>> views
>> >>>>  only, the new proposal introduces a built-in table function
>> >>>>  APPLY_WATERMARK that works with:
>> >>>>  - Base tables
>> >>>>  - Views (both regular and materialized)
>> >>>>  - Subqueries
>> >>>>  - Any table-valued expressions
>> >>>>
>> >>>> 2. More Flexible Design: The function-based approach provides:
>> >>>>  - Dynamic watermark assignment at query time without modifying
>> catalog
>> >>>>    metadata
>> >>>>  - Override capability for existing watermark strategies
>> >>>>  - Composability with other SQL operations
>> >>>>  - No need for DDL changes or catalog write permissions
>> >>>>
>> >>>> 3. Better SQL Semantics: Using a table function follows SQL standard
>> >>>>  patterns and integrates naturally with Flink's existing function
>> >>>>  ecosystem.
>> >>>>
>> >>>> UPDATED FLIP DOCUMENT
>> >>>> =====================
>> >>>>
>> >>>> The revised FLIP is now available at:
>> >>>> https://iwiki.woa.com/p/4019879693
>> >>>>
>> >>>> Key sections include:
>> >>>> - Motivation and use cases
>> >>>> - Public interfaces and SQL syntax
>> >>>> - Implementation plan
>> >>>> - Compatibility analysis
>> >>>> - Test plan
>> >>>>
>> >>>> EXAMPLE USAGE
>> >>>> =============
>> >>>>
>> >>>> -- Apply watermark to a view
>> >>>> SELECT * FROM APPLY_WATERMARK(my_view, DESCRIPTOR(event_time),
>> >>>>   event_time - INTERVAL '5' SECOND);
>> >>>>
>> >>>> -- Override existing watermark strategy
>> >>>> SELECT * FROM APPLY_WATERMARK(my_table_with_watermark, DESCRIPTOR(ts),
>> >>>>   ts - INTERVAL '10' SECOND -- Different from DDL watermark
>> >>>> );
>> >>>>
>> >>>> -- Use in complex queries
>> >>>> SELECT window_start,
>> >>>>        window_end,
>> >>>>        COUNT(*)
>> >>>> FROM TABLE(TUMBLE(
>> >>>>   TABLE APPLY_WATERMARK(orders, DESCRIPTOR(order_time),
>> >>>>     order_time - INTERVAL '5' SECOND),
>> >>>>   DESCRIPTOR(order_time), INTERVAL '1' HOUR))
>> >>>> GROUP BY window_start,
>> >>>>          window_end;
>> >>>>
>> >>>>
>> >>>> IMPLEMENTATION PROGRESS
>> >>>> =======================
>> >>>>
>> >>>> I've also opened a draft PR #27984 with the initial implementation:
>> >>>> - Core built-in function definition
>> >>>> - SQL-to-RelNode conversion rules
>> >>>> - Physical plan integration
>> >>>> - Unit tests and documentation (English + Chinese)
>> >>>>
>> >>>> The PR is available at:
>> >>>> https://github.com/apache/flink/pull/27984
>> >>>>
>> >>>> REQUEST FOR FEEDBACK
>> >>>> ====================
>> >>>>
>> >>>> I would appreciate your thoughts on:
>> >>>>
>> >>>> 1. Function naming: Is APPLY_WATERMARK clear and intuitive?
>> >>>>  (Alternative considered: WITH_WATERMARK, SET_WATERMARK)
>> >>>>
>> >>>> 2. DESCRIPTOR syntax: Using DESCRIPTOR(column_name) to specify the
>> >>>>  rowtime column—does this align well with Flink's existing patterns?
>> >>>>
>> >>>> 3. Override behavior: Should APPLY_WATERMARK always override existing
>> >>>>  watermarks, or should we provide a mode parameter
>> >>>>  (e.g., OVERRIDE, MERGE)?
>> >>>>
>> >>>> 4. Performance considerations: Any concerns about the function-based
>> >>>>  approach vs. catalog-level watermark definitions?
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>>
>> >>>> Looking forward to your valuable feedback!
>> >>>>
>> >>>> Best regards,
>> >>>> FeatZhang
>> >>>>
>> >>>> On Thu, Feb 12, 2026 at 6:24 PM Timo Walther <[email protected]>
>> wrote:
>> >>>>
>> >>>>> Hi everyone,
>> >>>>>
>> >>>>> I think we all agree that we clearly want this functionality, just
>> the
>> >>>>> "how" needs to be discussed. I also like Lincoln’s suggestion of
>> >>>>> introducing a built-in PTF for this, I had similar ideas in mind.
>> >>>>>
>> >>>>> There are two issues with an APPLY_WATERMARK function, but both on the
>> >>>>> short-term roadmap:
>> >>>>>
>> >>>>> 1) This function would need to be a function that takes an
>> expression.
>> >>>>> Ideally as a lambda function. Newer Calcite versions have already
>> lambda
>> >>>>> expression support. At Confluent we were planning to work on a
>> Calcite
>> >>>>> upgrade this quarter especially to get lambda support in and improve
>> >>>>> built-in functions that work on collections.
>> >>>>>
>> >>>>> 2) User-defined PTFs are currently not able to emit watermarks. We
>> could
>> >>>>> introduce a new interface WatermarkFunction (similar to
>> >>>>> ChangelogFunction) that would offer this to everyone. Alternatively,
>> we
>> >>>>> could only use the PTF signature, but translate to a specialized
>> >>>>> ExecNode similar to how we do it for VECTOR_SEARCH and ML_PREDICT.
>> >>>>>
>> >>>>> In any case, even if we go with the function approach, we definitely
>> >>>>> need a full FLIP on this.
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Timo
>> >>>>>
>> >>>>> On 12.02.26 08:25, Gyula Fóra wrote:
>> >>>>>> Hi All!
>> >>>>>> I would like to chime in here quickly from a slightly different
>> angle.
>> >>>>>> While I am the first to admit that I cannot grasp all the planning /
>> >>>>>> conceptual implications, I also feel the need for more flexible
>> >>> watermark
>> >>>>>> handling as suggested by Feat.
>> >>>>>>
>> >>>>>> Anything that can only be applied to base/catalog tables is very
>> >>>>>> limiting from a usability perspective. Watermarks feel like they
>> >>>>>> should be a simple function that you can apply on a column/table as
>> >>>>>> part of a query/view. For example: extract a timestamp from a string,
>> >>>>>> convert it to TS, apply a watermark, etc.
>> >>>>>>
>> >>>>>> Users often receive the tables/catalogs as given and can only write
>> >>>>>> queries.
>> >>>>>>
>> >>>>>> Fixing this would eliminate a long-standing disconnect between the
>> >>>>>> DataStream API's flexible watermark handling and the currently very
>> >>>>>> restrictive SQL approach.
>> >>>>>>
>> >>>>>> Cheers
>> >>>>>> Gyula
>> >>>>>>
>> >>>>>> On Thu, Feb 12, 2026 at 7:54 AM FeatZhang <[email protected]>
>> >>> wrote:
>> >>>>>>
>> >>>>>>> Hi Timo, Lincoln,
>> >>>>>>>
>> >>>>>>> Thank you both for the detailed feedback.
>> >>>>>>>
>> >>>>>>> I agree with the concern that non-materialized SQL views should
>> >>> remain a
>> >>>>>>> pure logical abstraction. Introducing watermark definitions
>> directly
>> >>>>>>> into CREATE
>> >>>>>>> VIEW or ALTER VIEW could blur the boundary between logical aliasing
>> >>> and
>> >>>>>>> physical planning semantics, especially considering optimization
>> >>>>> barriers
>> >>>>>>> and watermark propagation behavior.
>> >>>>>>>
>> >>>>>>> Lincoln’s suggestion of introducing a built-in function such as:
>> >>>>>>>
>> >>>>>>> APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column),
>> >>> watermark_expression)
>> >>>>>>>
>> >>>>>>> is a cleaner direction. It keeps watermark definition as an
>> explicit
>> >>>>>>> relational transformation rather than attaching additional
>> semantics
>> >>> to
>> >>>>>>> views.
>> >>>>>>>
>> >>>>>>> However, to fully address the original use cases (especially
>> logical
>> >>>>> reuse
>> >>>>>>> and layered lakehouse architectures), I propose that the table
>> >>> parameter
>> >>>>>>> should support:
>> >>>>>>>
>> >>>>>>>    - Base tables
>> >>>>>>>    - Non-materialized views
>> >>>>>>>
>> >>>>>>> If APPLY_WATERMARK can accept both, we can:
>> >>>>>>>
>> >>>>>>>    - Preserve the conceptual purity of SQL views
>> >>>>>>>    - Avoid redefining view semantics
>> >>>>>>>    - Still enable logical reuse via views
>> >>>>>>>    - Allow different watermark strategies over the same logical
>> >>>>> relation
>> >>>>>>>
>> >>>>>>> In other words, watermark definition becomes an explicit relational
>> >>>>>>> operator applied on top of any logical relation, instead of being
>> >>>>> embedded
>> >>>>>>> into the view definition itself.
>> >>>>>>>
>> >>>>>>> From a planner perspective, this keeps the model consistent:
>> >>>>>>>
>> >>>>>>>    - The function expands into a relational node
>> >>>>>>>    - No optimization barrier is introduced by views
>> >>>>>>>    - Watermark handling remains part of the logical plan
>> >>> transformation
>> >>>>>>>
>> >>>>>>> I will prepare a PR to prototype APPLY_WATERMARK with support for
>> >>> both
>> >>>>> base
>> >>>>>>> tables and non-materialized views, and share it for further
>> >>> discussion.
>> >>>>>>>
>> >>>>>>> Looking forward to your thoughts.
>> >>>>>>>
>> >>>>>>> Best,
>> >>>>>>> Feat
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Thu, Feb 12, 2026 at 12:19 PM Lincoln Lee <[email protected]> wrote:
>> >>>>>>>
>> >>>>>>>> Agree with Timo’s point regarding the conceptual semantics. We
>> >>> should
>> >>>>> not
>> >>>>>>>> directly extend non-materialized views with additional watermark
>> >>>>>>>> definitions.
>> >>>>>>>>
>> >>>>>>>> Regarding the use case mentioned by Feat, defining different
>> >>> watermark
>> >>>>>>>> strategies
>> >>>>>>>> for the same data source, especially in the case of catalog
>> tables,
>> >>> we
>> >>>>>>> are
>> >>>>>>>> exploring a possible solution introducing a built-in function:
>> >>>>>>>> ```sql
>> >>>>>>>> APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column),
>> >>>>> watermark_expression)
>> >>>>>>>> ```
>> >>>>>>>> This function only supports base tables as input and does not
>> >>>>>>>> support views, subqueries, or derived relations.
>> >>>>>>>>
>> >>>>>>>> This would address a meaningful subset of the identified use cases
>> >>>>>>> without
>> >>>>>>>> redefining the role of SQL views.
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Best,
>> >>>>>>>> Lincoln Lee
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Wed, Feb 11, 2026 at 11:29 PM Timo Walther <[email protected]> wrote:
>> >>>>>>>>
>> >>>>>>>>> Hi Feat,
>> >>>>>>>>>
>> >>>>>>>>> thanks for proposing this FLIP. We had similar discussions in the
>> >>>>> past,
>> >>>>>>>>> but so far could never reach consensus.
>> >>>>>>>>>
>> >>>>>>>>> SQL views are actually a very simple concept, they just give SQL
>> >>> text
>> >>>>>>> an
>> >>>>>>>>> alias. A view has no other properties except for the view
>> >>> definition.
>> >>>>>>>>> Everything else is dynamically computed when the SQL text is
>> >>> inserted
>> >>>>>>>>> into the larger plan. A view is never evaluated without the
>> >>>>> surrounding
>> >>>>>>>>> plan.
>> >>>>>>>>>
>> >>>>>>>>> Watermarks in the middle of a pipeline raise a couple of tricky
>> >>>>>>>>> issues:
>> >>>>>>>>>
>> >>>>>>>>> - What if the upstream table is updating? How would you deal with
>> >>>>>>>>> watermarks in the downstream view?
>> >>>>>>>>> - What if the upstream table already emits watermarks? Would the
>> >>>>>>>>> view catch them and discard this information?
>> >>>>>>>>> - A view usually dissolves into the plan (e.g. via projection or
>> >>>>>>>>> filter pushdown). Would a watermark definition suddenly introduce an
>> >>>>>>>>> optimization barrier? If this is an optimization barrier, is this
>> >>>>>>>>> still a view or a new concept? E.g. a "materialized view" or
>> >>>>>>>>> "pre-planned view"?
>> >>>>>>>>>
>> >>>>>>>>> Cheers,
>> >>>>>>>>> Timo
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> On 11.02.26 03:51, FeatZhang wrote:
>> >>>>>>>>>> Hi Flink Community,
>> >>>>>>>>>>
>> >>>>>>>>>> I'd like to propose adding watermark support for SQL Views in Flink
>> >>>>>>>>>> to better support event-time processing scenarios.
>> >>>>>>>>>>
>> >>>>>>>>>> Problem Statement
>> >>>>>>>>>>
>> >>>>>>>>>> Currently, Flink SQL views cannot define watermarks. This limitation
>> >>>>>>>>>> creates several challenges:
>> >>>>>>>>>>
>> >>>>>>>>>>     - *Broken Abstraction*: Users must reference underlying table
>> >>>>>>>>>>     watermarks directly, exposing implementation details
>> >>>>>>>>>>     - *No Flexibility*: Cannot define different watermark strategies
>> >>>>>>>>>>     for different use cases on the same data source
>> >>>>>>>>>>     - *Limited Architecture Support*: Incompatible with modern layered
>> >>>>>>>>>>     data architectures (Bronze/Silver/Gold medallion pattern)
>> >>>>>>>>>>
>> >>>>>>>>>> For example, in multi-tenant scenarios, different tenants may require
>> >>>>>>>>>> different lateness tolerance, but currently we cannot create views
>> >>>>>>>>>> with different watermark strategies on the same source table.
>> >>>>>>>>>>
>> >>>>>>>>>> Proposed Solution
>> >>>>>>>>>>
>> >>>>>>>>>> I propose adding two SQL syntax options to support watermark
>> >>>>>>>>>> definitions in views:
>> >>>>>>>>>>
>> >>>>>>>>>> *Option 1: CREATE VIEW with WATERMARK*
>> >>>>>>>>>>
>> >>>>>>>>>> CREATE VIEW user_activity
>> >>>>>>>>>> WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
>> >>>>>>>>>> AS SELECT user_id, event_time, action FROM raw_events;
>> >>>>>>>>>>
>> >>>>>>>>>> *Option 2: ALTER VIEW SET WATERMARK*
>> >>>>>>>>>>
>> >>>>>>>>>> ALTER VIEW user_activity SET WATERMARK FOR event_time AS
>> >>>>>>>>>> event_time - INTERVAL '5' SECOND;
>> >>>>>>>>>>
>> >>>>>>>>>> Key Design Aspects
>> >>>>>>>>>>
>> >>>>>>>>>>     - *Backward Compatibility*: Watermark stored as optional metadata
>> >>>>>>>>>>     in view options; existing views continue to work unchanged
>> >>>>>>>>>>     - *Validation*: Watermark column must exist in view schema and be
>> >>>>>>>>>>     of TIMESTAMP/TIMESTAMP_LTZ type
>> >>>>>>>>>>     - *Storage*: Watermark metadata persists in catalog options map
>> >>>>>>>>>>     (works with all catalog implementations)
>> >>>>>>>>>>     - *Propagation*: Follows existing Flink watermark propagation
>> >>>>>>>>>>     rules in joins and nested views
>> >>>>>>>>>>
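The *Validation* rule in the list above can be sketched in a few lines of Python. This is an illustration under stated assumptions, not the planner's implementation: the schema is modelled as a plain dict from column name to SQL type string, and `validate_watermark_column` is a hypothetical helper name.

```python
ALLOWED_TYPES = ("TIMESTAMP", "TIMESTAMP_LTZ")


def validate_watermark_column(schema, column):
    """Return the base type name if `column` may carry a watermark.

    Hypothetical helper: `schema` maps column names to type strings such
    as "TIMESTAMP(3)". Raises ValueError when the column is missing or
    its type is not TIMESTAMP / TIMESTAMP_LTZ.
    """
    if column not in schema:
        raise ValueError(f"Watermark column '{column}' does not exist in the view schema")
    # Strip the precision, e.g. "TIMESTAMP_LTZ(3)" -> "TIMESTAMP_LTZ".
    base_type = schema[column].split("(")[0].strip().upper()
    if base_type not in ALLOWED_TYPES:
        raise ValueError(
            f"Watermark column '{column}' has type {schema[column]}, "
            "expected TIMESTAMP or TIMESTAMP_LTZ"
        )
    return base_type


# Assumed schema for illustration; column names follow the thread's examples.
view_schema = {"user_id": "STRING", "event_time": "TIMESTAMP(3)", "action": "STRING"}
print(validate_watermark_column(view_schema, "event_time"))
```

A call with `"user_id"` (wrong type) or with a column that is not in the schema raises `ValueError`, which mirrors the two checks named in the bullet.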
>> >>>>>>>>>> Use Case Example: Data Lakehouse Architecture
>> >>>>>>>>>>
>> >>>>>>>>>> -- Bronze: Raw data (no watermark)
>> >>>>>>>>>> CREATE TABLE bronze_events (raw_data STRING, ingestion_time
>> >>>>>>>>>> TIMESTAMP(3)) WITH (...);
>> >>>>>>>>>>
>> >>>>>>>>>> -- Silver: Cleaned data with watermark
>> >>>>>>>>>> CREATE VIEW silver_events
>> >>>>>>>>>> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
>> >>>>>>>>>> AS SELECT
>> >>>>>>>>>>      CAST(JSON_VALUE(raw_data, '$.event_time') AS TIMESTAMP(3)) AS event_time,
>> >>>>>>>>>>      JSON_VALUE(raw_data, '$.user_id') AS user_id
>> >>>>>>>>>> FROM bronze_events
>> >>>>>>>>>> WHERE JSON_VALUE(raw_data, '$.event_time') IS NOT NULL;
>> >>>>>>>>>>
>> >>>>>>>>>> -- Gold: Aggregations
>> >>>>>>>>>> SELECT TUMBLE_START(event_time, INTERVAL '1' HOUR), COUNT(*)
>> >>>>>>>>>> FROM silver_events
>> >>>>>>>>>> GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
>> >>>>>>>>>>
>> >>>>>>>>>> Reference Materials
>> >>>>>>>>>>
>> >>>>>>>>>>     - FLIP Document: FLIP-XXX: Support Watermark in Flink SQL View
>> >>>>>>>>>>     <https://docs.google.com/document/d/1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y/edit?usp=sharing>
>> >>>>>>>>>>     - JIRA Issue: https://issues.apache.org/jira/browse/FLINK-39062
>> >>>>>>>>>>     - Implementation POC:
>> >>>>>>>>>>     - [FLINK-39062][table] Support WATERMARK clause in CREATE VIEW
>> >>>>>>>>>>     statement
>> >>>>>>>>>>     <https://github.com/apache/flink/pull/27571>
>> >>>>>>>>>>     - [FLINK-39062][table] Support ALTER VIEW SET WATERMARK syntax
>> >>>>>>>>>>     <https://github.com/apache/flink/pull/27570>
>> >>>>>>>>>>
>> >>>>>>>>>> Implementation Timeline
>> >>>>>>>>>>
>> >>>>>>>>>> Estimated 6-8 weeks covering parser layer, planner layer, catalog
>> >>>>>>>>>> integration, and comprehensive testing.
>> >>>>>>>>>>
>> >>>>>>>>>> Request for Feedback
>> >>>>>>>>>>
>> >>>>>>>>>> This enhancement would significantly improve Flink's support for
>> >>>>>>>>>> layered data architectures and flexible event-time processing. I'm
>> >>>>>>>>>> happy to provide more details or start a formal FLIP process if the
>> >>>>>>>>>> community sees value in this proposal.
>> >>>>>>>>>>
>> >>>>>>>>>> Looking forward to the community's feedback!
>> >>>>>>>>>>
>> >>>>>>>>>> Best regards,
>> >>>>>>>>>>
>> >>>>>>>>>> Feat Zhang
>> >>>>>>>>>>
>> >>>>>>>>>>   FLIP-XXX: Support Watermark in Flink SQL View
>> >>>>>>>>>> <https://drive.google.com/open?id=1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y>
>> >>>>>>>>>>
