Hi everyone,

Thank you for the feedback and discussions on the initial proposal. I've
revised the FLIP based on the community's input and would like to share
the updated version.


 FLIP-XXX: Support Flexible Watermark Assignment via Built-in Function
<https://drive.google.com/open?id=17PXYAi6Pb91OqFhVVK7tRULiaHAb6wiX79jjC0NaaDA>


KEY UPDATES
===========

The proposal has evolved from "Support Watermark Definition in SQL Views"
to a more flexible and powerful approach: FLIP-XXX: Support Flexible
Watermark Assignment via APPLY_WATERMARK Function.

What's Changed:

1. Broader Scope: Instead of limiting watermark definitions to SQL views
   only, the new proposal introduces a built-in table function
   APPLY_WATERMARK that works with:
   - Base tables
   - Views (both regular and materialized)
   - Subqueries
   - Any table-valued expressions

2. More Flexible Design: The function-based approach provides:
   - Dynamic watermark assignment at query time without modifying catalog
     metadata
   - Override capability for existing watermark strategies
   - Composability with other SQL operations
   - No need for DDL changes or catalog write permissions

3. Better SQL Semantics: Using a table function follows SQL standard
   patterns and integrates naturally with Flink's existing function
   ecosystem.

UPDATED FLIP DOCUMENT
=====================

The revised FLIP is now available at:
https://iwiki.woa.com/p/4019879693

Key sections include:
- Motivation and use cases
- Public interfaces and SQL syntax
- Implementation plan
- Compatibility analysis
- Test plan

EXAMPLE USAGE
=============

-- Apply watermark to a view
SELECT *
FROM APPLY_WATERMARK(
    my_view,
    DESCRIPTOR(event_time),
    event_time - INTERVAL '5' SECOND);

-- Override existing watermark strategy
SELECT *
FROM APPLY_WATERMARK(
    my_table_with_watermark,
    DESCRIPTOR(ts),
    ts - INTERVAL '10' SECOND  -- Different from DDL watermark
);

-- Use in complex queries
SELECT window_start,
       window_end,
       COUNT(*)
FROM TABLE(
    TUMBLE(
        TABLE APPLY_WATERMARK(
            orders,
            DESCRIPTOR(order_time),
            order_time - INTERVAL '5' SECOND),
        DESCRIPTOR(order_time),
        INTERVAL '1' HOUR))
GROUP BY window_start,
         window_end;
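
To illustrate what a watermark expression such as
event_time - INTERVAL '5' SECOND means for late data, here is a rough toy
model in Python. It is not Flink code and not part of the PR. The class and
function names are made up for illustration, timestamps are plain seconds,
and the periodic emission that a real watermark assigner performs is
ignored. It only assumes that the watermark is the largest value of the
expression seen so far and that a row is late when its timestamp is not
greater than the current watermark.

```python
from dataclasses import dataclass


@dataclass
class BoundedDelayWatermark:
    """Toy model of `event_time - INTERVAL '<delay_s>' SECOND` (hypothetical helper)."""

    delay_s: int
    current: float = float("-inf")  # no watermark has been produced yet

    def observe(self, event_time_s: int) -> bool:
        """Feed one row; return True if it is late w.r.t. the watermark so far."""
        late = event_time_s <= self.current
        # The watermark never moves backwards: keep the largest candidate seen.
        self.current = max(self.current, event_time_s - self.delay_s)
        return late


def simulate(event_times, delay_s):
    """Return (event_time, is_late, watermark_after_row) for each row in order."""
    wm = BoundedDelayWatermark(delay_s)
    return [(t, wm.observe(t), wm.current) for t in event_times]


if __name__ == "__main__":
    # Out-of-order input: the row with time 14 arrives after the row with time 20.
    for row in simulate([10, 12, 20, 14, 16], delay_s=5):
        print(row)
```

With a 5 second delay the row at time 14 is flagged as late, while a 10
second delay on the same input tolerates it. That difference is the kind of
per-query choice the override example above is meant to allow.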


IMPLEMENTATION PROGRESS
=======================

I've also opened a draft PR #27984 with the initial implementation:
- Core built-in function definition
- SQL-to-RelNode conversion rules
- Physical plan integration
- Unit tests and documentation (English + Chinese)

The PR is available at:
https://github.com/apache/flink/pull/27984

REQUEST FOR FEEDBACK
====================

I would appreciate your thoughts on:

1. Function naming: Is APPLY_WATERMARK clear and intuitive?
   (Alternatives considered: WITH_WATERMARK, SET_WATERMARK)

2. DESCRIPTOR syntax: Does using DESCRIPTOR(column_name) to specify the
   rowtime column align well with Flink's existing patterns?

3. Override behavior: Should APPLY_WATERMARK always override existing
   watermarks, or should we provide a mode parameter
   (e.g., OVERRIDE, MERGE)?

4. Performance considerations: Any concerns about the function-based
   approach vs. catalog-level watermark definitions?
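
To make question 3 a bit more concrete, here is one possible reading of
the two modes as a small Python sketch. The semantics of MERGE are not
defined in the FLIP yet. Treating it as "keep the more conservative
(smaller) of the upstream and the newly applied watermark" is purely an
assumption for discussion, and the function name is hypothetical.

```python
def combine_watermark(upstream, applied, mode="OVERRIDE"):
    """Combine an upstream watermark with the one produced by APPLY_WATERMARK.

    `upstream` is None when the input relation carries no watermark.
    Both values are plain numbers (e.g. epoch milliseconds) in this sketch.
    """
    if mode not in ("OVERRIDE", "MERGE"):
        raise ValueError(f"unknown mode: {mode}")
    if mode == "OVERRIDE" or upstream is None:
        # OVERRIDE: the upstream watermark is discarded entirely.
        return applied
    # MERGE (assumed semantics): never advance past either watermark.
    return min(upstream, applied)


if __name__ == "__main__":
    print(combine_watermark(100, 105))           # OVERRIDE keeps the applied value
    print(combine_watermark(100, 105, "MERGE"))  # assumed MERGE keeps the smaller one
```

Under this reading OVERRIDE can move the watermark ahead of what the
source declared, while MERGE cannot. Other definitions of MERGE are
equally possible.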





Looking forward to your valuable feedback!

Best regards,
FeatZhang

On Thu, Feb 12, 2026 at 6:24 PM Timo Walther <[email protected]> wrote:

> Hi everyone,
>
> I think we all agree that we clearly want this functionality, just the
> "how" needs to be discussed. I also like Lincoln’s suggestion of
> introducing a built-in PTF for this, I had similar ideas in mind.
>
> There are two issues with an APPLY_WATERMARK function, but both are on the
> short-term roadmap:
>
> 1) This function would need to be a function that takes an expression,
> ideally as a lambda function. Newer Calcite versions already have lambda
> expression support. At Confluent we were planning to work on a Calcite
> upgrade this quarter, especially to get lambda support in and improve
> built-in functions that work on collections.
>
> 2) User-defined PTFs are currently not able to emit watermarks. We could
> introduce a new interface WatermarkFunction (similar to
> ChangelogFunction) that would offer this to everyone. Alternatively, we
> could only use the PTF signature, but translate to a specialized
> ExecNode, similar to how we do it for VECTOR_SEARCH and ML_PREDICT.
>
> In any case, even if we go with the function approach, we definitely
> need a full FLIP on this.
>
> Thanks,
> Timo
>
> On 12.02.26 08:25, Gyula Fóra wrote:
> > Hi All!
> > I would like to chime in here quickly from a slightly different angle.
> > While I am the first to admit that I cannot grasp all the planning /
> > conceptual implications, I also feel the need for more flexible watermark
> > handling as suggested by Feat.
> >
> > Anything that can only be applied to base/catalog tables is very limiting
> > from a usability perspective. Watermarks feel like they should be a
> > simple function that you can apply on a column/table as part of a
> > query/view. For example: extract a timestamp from a string, convert it
> > to TS -> apply watermark, etc.
> >
> > Users often receive the tables/catalogs as given and can only write
> > queries.
> >
> > Fixing this would eliminate a long-standing disconnect between the
> > DataStream API's flexible watermark handling and the currently very
> > restrictive SQL approach.
> >
> > Cheers
> > Gyula
> >
> > On Thu, Feb 12, 2026 at 7:54 AM FeatZhang <[email protected]> wrote:
> >
> >> Hi Timo, Lincoln,
> >>
> >> Thank you both for the detailed feedback.
> >>
> >> I agree with the concern that non-materialized SQL views should remain a
> >> pure logical abstraction. Introducing watermark definitions directly
> >> into CREATE VIEW or ALTER VIEW could blur the boundary between logical
> >> aliasing and physical planning semantics, especially considering
> >> optimization barriers and watermark propagation behavior.
> >>
> >> Lincoln’s suggestion of introducing a built-in function such as:
> >>
> >> APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column), watermark_expression)
> >>
> >> is a cleaner direction. It keeps watermark definition as an explicit
> >> relational transformation rather than attaching additional semantics to
> >> views.
> >>
> >> However, to fully address the original use cases (especially logical
> >> reuse and layered lakehouse architectures), I propose that the table
> >> parameter should support:
> >>
> >>     - Base tables
> >>     - Non-materialized views
> >>
> >> If APPLY_WATERMARK can accept both, we can:
> >>
> >>     - Preserve the conceptual purity of SQL views
> >>     - Avoid redefining view semantics
> >>     - Still enable logical reuse via views
> >>     - Allow different watermark strategies over the same logical
> >>       relation
> >>
> >> In other words, watermark definition becomes an explicit relational
> >> operator applied on top of any logical relation, instead of being
> >> embedded into the view definition itself.
> >>
> >>  From a planner perspective, this keeps the model consistent:
> >>
> >>     - The function expands into a relational node
> >>     - No optimization barrier is introduced by views
> >>     - Watermark handling remains part of the logical plan transformation
> >>
> >> I will prepare a PR to prototype APPLY_WATERMARK with support for both
> >> base tables and non-materialized views, and share it for further
> >> discussion.
> >>
> >> Looking forward to your thoughts.
> >>
> >> Best,
> >> Feat
> >>
> >>
> >> On Thu, Feb 12, 2026 at 12:19, Lincoln Lee <[email protected]> wrote:
> >>
> >>> Agree with Timo’s point regarding the conceptual semantics. We should
> >>> not directly extend non-materialized views with additional watermark
> >>> definitions.
> >>>
> >>> Regarding the use case mentioned by Feat, defining different watermark
> >>> strategies for the same data source, especially in the case of catalog
> >>> tables, we are exploring a possible solution introducing a built-in
> >>> function:
> >>> ```sql
> >>> APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column), watermark_expression)
> >>> ```
> >>> This function would only support base tables as input; it would not
> >>> support views, subqueries, or derived relations.
> >>>
> >>> This would address a meaningful subset of the identified use cases
> >>> without redefining the role of SQL views.
> >>>
> >>>
> >>> Best,
> >>> Lincoln Lee
> >>>
> >>>
> >>> On Wed, Feb 11, 2026 at 23:29, Timo Walther <[email protected]> wrote:
> >>>
> >>>> Hi Feat,
> >>>>
> >>>> thanks for proposing this FLIP. We had similar discussions in the
> >>>> past, but so far could never reach consensus.
> >>>>
> >>>> SQL views are actually a very simple concept, they just give SQL text
> >>>> an alias. A view has no other properties except for the view
> >>>> definition. Everything else is dynamically computed when the SQL text
> >>>> is inserted into the larger plan. A view is never evaluated without
> >>>> the surrounding plan.
> >>>>
> >>>> Watermarks in the middle of a pipeline raise a couple of tricky
> >>>> issues:
> >>>>
> >>>> - What if the upstream table is updating, how would you deal with
> >>>> watermarks in the downstream view?
> >>>> - What if the upstream table already emits watermarks? Would the view
> >>>> catch them and discard this information?
> >>>> - A view usually dissolves into the plan (e.g. via projection or
> >>>> filter pushdown). Would a watermark definition suddenly introduce an
> >>>> optimization barrier? If this is an optimization barrier, is this
> >>>> still a view or a new concept? E.g. a "materialized view" or
> >>>> "pre-planned view"?
> >>>>
> >>>> Cheers,
> >>>> Timo
> >>>>
> >>>>
> >>>>
> >>>> On 11.02.26 03:51, FeatZhang wrote:
> >>>>> Hi Flink Community,
> >>>>>
> >>>>> I'd like to propose adding watermark support for SQL Views in Flink
> >>>>> to better support event-time processing scenarios.
> >>>>>
> >>>>> Problem Statement
> >>>>>
> >>>>> Currently, Flink SQL views cannot define watermarks. This limitation
> >>>>> creates several challenges:
> >>>>>
> >>>>>      - *Broken Abstraction*: Users must reference underlying table
> >>>>>      watermarks directly, exposing implementation details
> >>>>>      - *No Flexibility*: Cannot define different watermark strategies
> >>>>>      for different use cases on the same data source
> >>>>>      - *Limited Architecture Support*: Incompatible with modern
> >>>>>      layered data architectures (Bronze/Silver/Gold medallion pattern)
> >>>>>
> >>>>> For example, in multi-tenant scenarios, different tenants may require
> >>>>> different lateness tolerance, but currently we cannot create views
> >>>>> with different watermark strategies on the same source table.
> >>>>>
> >>>>> Proposed Solution
> >>>>>
> >>>>> I propose adding two SQL syntax options to support watermark
> >>>>> definitions in views:
> >>>>>
> >>>>> *Option 1: CREATE VIEW with WATERMARK*
> >>>>>
> >>>>> CREATE VIEW user_activity
> >>>>> WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> >>>>> AS SELECT user_id, event_time, action FROM raw_events;
> >>>>>
> >>>>> *Option 2: ALTER VIEW SET WATERMARK*
> >>>>>
> >>>>> ALTER VIEW user_activity SET WATERMARK FOR event_time AS event_time -
> >>>>> INTERVAL '5' SECOND;
> >>>>>
> >>>>> Key Design Aspects
> >>>>>
> >>>>>      - *Backward Compatibility*: Watermark stored as optional
> >>>>>      metadata in view options; existing views continue to work
> >>>>>      unchanged
> >>>>>      - *Validation*: Watermark column must exist in view schema and
> >>>>>      be of TIMESTAMP/TIMESTAMP_LTZ type
> >>>>>      - *Storage*: Watermark metadata persists in catalog options map
> >>>>>      (works with all catalog implementations)
> >>>>>      - *Propagation*: Follows existing Flink watermark propagation
> >>>>>      rules in joins and nested views
> >>>>>
> >>>>> Use Case Example: Data Lakehouse Architecture
> >>>>>
> >>>>> -- Bronze: Raw data (no watermark)
> >>>>> CREATE TABLE bronze_events (
> >>>>>     raw_data STRING,
> >>>>>     ingestion_time TIMESTAMP(3)
> >>>>> ) WITH (...);
> >>>>>
> >>>>> -- Silver: Cleaned data with watermark
> >>>>> CREATE VIEW silver_events
> >>>>> WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> >>>>> AS SELECT
> >>>>>     CAST(JSON_VALUE(raw_data, '$.event_time') AS TIMESTAMP(3))
> >>>>>         AS event_time,
> >>>>>     JSON_VALUE(raw_data, '$.user_id') AS user_id
> >>>>> FROM bronze_events
> >>>>> WHERE JSON_VALUE(raw_data, '$.event_time') IS NOT NULL;
> >>>>>
> >>>>> -- Gold: Aggregations
> >>>>> SELECT TUMBLE_START(event_time, INTERVAL '1' HOUR), COUNT(*)
> >>>>> FROM silver_events
> >>>>> GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
> >>>>>
> >>>>> Reference Materials
> >>>>>
> >>>>>      - FLIP Document: FLIP-XXX: Support Watermark in Flink SQL View
> >>>>>      <https://docs.google.com/document/d/1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y/edit?usp=sharing>
> >>>>>      - JIRA Issue: https://issues.apache.org/jira/browse/FLINK-39062
> >>>>>      - Implementation POC:
> >>>>>      - [FLINK-39062][table] Support WATERMARK clause in CREATE VIEW
> >>>>>      statement
> >>>>>      <https://github.com/apache/flink/pull/27571>
> >>>>>      - [FLINK-39062][table] Support ALTER VIEW SET WATERMARK syntax
> >>>>>      <https://github.com/apache/flink/pull/27570>
> >>>>>
> >>>>> Implementation Timeline
> >>>>>
> >>>>> Estimated 6-8 weeks covering parser layer, planner layer, catalog
> >>>>> integration, and comprehensive testing.
> >>>>>
> >>>>> Request for Feedback
> >>>>>
> >>>>> This enhancement would significantly improve Flink's support for
> >>>>> layered data architectures and flexible event-time processing. I'm
> >>>>> happy to provide more details or start a formal FLIP process if the
> >>>>> community sees value in this proposal.
> >>>>>
> >>>>> Looking forward to the community's feedback!
> >>>>>
> >>>>> Best regards,
> >>>>>
> >>>>> Feat Zhang
> >>>>>
> >>>>>    FLIP-XXX: Support Watermark in Flink SQL View
> >>>>> <https://drive.google.com/open?id=1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >
>
>
