Hi everyone,

Thank you for the feedback and discussions on the initial proposal. I've revised the FLIP based on the community's input and would like to share the updated version.
FLIP-XXX: Support Flexible Watermark Assignment via Built-in Function
<https://drive.google.com/open?id=17PXYAi6Pb91OqFhVVK7tRULiaHAb6wiX79jjC0NaaDA>

KEY UPDATES
===========

The proposal has evolved from "Support Watermark Definition in SQL Views" to a more flexible approach: FLIP-XXX: Support Flexible Watermark Assignment via APPLY_WATERMARK Function.

What's Changed:

1. Broader Scope: Instead of limiting watermark definitions to SQL views, the new proposal introduces a built-in table function, APPLY_WATERMARK, that works with:
   - Base tables
   - Views (both regular and materialized)
   - Subqueries
   - Any table-valued expressions

2. More Flexible Design: The function-based approach provides:
   - Dynamic watermark assignment at query time without modifying catalog metadata
   - Override capability for existing watermark strategies
   - Composability with other SQL operations
   - No need for DDL changes or catalog write permissions

3. Better SQL Semantics: A table function that takes a table argument and a DESCRIPTOR follows the SQL:2016 polymorphic table function pattern that Flink's windowing TVFs (TUMBLE, HOP, CUMULATE) already use. It therefore fits naturally into Flink's existing function ecosystem.
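To make the "override capability" point more concrete, here is a minimal, hypothetical Java sketch of two checks the function would plausibly perform at planning time. First, it validates that the DESCRIPTOR column exists and has type TIMESTAMP or TIMESTAMP_LTZ. This rule is borrowed from the original view proposal in this thread. Second, it replaces any watermark already declared upstream. Several parts are assumptions: `ApplyWatermarkSketch`, `TypeRoot`, `WatermarkSpec`, `Relation` and `applyWatermark` are illustrative names, not Flink planner classes; the watermark is modeled as a fixed delay rather than an arbitrary expression; and OVERRIDE is assumed as the default even though the override/merge question is still open. It requires Java 17 because it uses records.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch, not Flink planner code: all type and method names are illustrative.
public class ApplyWatermarkSketch {

    // Simplified stand-in for a column's logical type root.
    public enum TypeRoot { STRING, BIGINT, TIMESTAMP, TIMESTAMP_LTZ }

    // A watermark modeled as "rowtime column minus a fixed delay" (assumption).
    public record WatermarkSpec(String rowtimeColumn, long delayMillis) {}

    // A relation: its schema plus an optional watermark (e.g. one declared in DDL).
    public record Relation(Map<String, TypeRoot> schema, Optional<WatermarkSpec> watermark) {}

    // Validates the DESCRIPTOR column and returns a relation carrying the new watermark.
    public static Relation applyWatermark(Relation input, String rowtimeColumn, long delayMillis) {
        TypeRoot type = input.schema().get(rowtimeColumn);
        if (type == null) {
            throw new IllegalArgumentException("Unknown rowtime column: " + rowtimeColumn);
        }
        if (type != TypeRoot.TIMESTAMP && type != TypeRoot.TIMESTAMP_LTZ) {
            throw new IllegalArgumentException(
                    "Rowtime column '" + rowtimeColumn + "' must be TIMESTAMP or TIMESTAMP_LTZ, but is " + type);
        }
        // Assumed OVERRIDE semantics: an upstream watermark is replaced, never merged.
        return new Relation(input.schema(), Optional.of(new WatermarkSpec(rowtimeColumn, delayMillis)));
    }

    public static void main(String[] args) {
        Map<String, TypeRoot> schema = new LinkedHashMap<>();
        schema.put("user_id", TypeRoot.STRING);
        schema.put("ts", TypeRoot.TIMESTAMP);

        // Table whose DDL declares WATERMARK FOR ts AS ts - INTERVAL '5' SECOND.
        Relation table = new Relation(schema, Optional.of(new WatermarkSpec("ts", 5_000L)));

        // Query-time override with a 10-second delay.
        Relation overridden = applyWatermark(table, "ts", 10_000L);
        System.out.println(overridden.watermark().orElseThrow());

        // Rejected: the DESCRIPTOR column is not a timestamp.
        try {
            applyWatermark(table, "user_id", 10_000L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A MERGE mode would need a rule for combining the upstream and new watermarks, and this sketch deliberately leaves that rule out.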
UPDATED FLIP DOCUMENT
=====================

The revised FLIP is now available at: https://iwiki.woa.com/p/4019879693

Key sections include:
- Motivation and use cases
- Public interfaces and SQL syntax
- Implementation plan
- Compatibility analysis
- Test plan

EXAMPLE USAGE
=============

-- Apply a watermark to a view
SELECT * FROM APPLY_WATERMARK(
  my_view,
  DESCRIPTOR(event_time),
  event_time - INTERVAL '5' SECOND);

-- Override an existing watermark strategy
SELECT * FROM APPLY_WATERMARK(
  my_table_with_watermark,
  DESCRIPTOR(ts),
  ts - INTERVAL '10' SECOND  -- differs from the DDL watermark
);

-- Use in complex queries
SELECT window_start, window_end, COUNT(*)
FROM TABLE(
  TUMBLE(
    TABLE APPLY_WATERMARK(orders, DESCRIPTOR(order_time), order_time - INTERVAL '5' SECOND),
    DESCRIPTOR(order_time),
    INTERVAL '1' HOUR))
GROUP BY window_start, window_end;

IMPLEMENTATION PROGRESS
=======================

I've also opened draft PR #27984 with the initial implementation:
- Core built-in function definition
- SQL-to-RelNode conversion rules
- Physical plan integration
- Unit tests and documentation (English + Chinese)

The PR is available at: https://github.com/apache/flink/pull/27984

REQUEST FOR FEEDBACK
====================

I would appreciate your thoughts on:

1. Function naming: Is APPLY_WATERMARK clear and intuitive? (Alternatives considered: WITH_WATERMARK, SET_WATERMARK)
2. DESCRIPTOR syntax: Does using DESCRIPTOR(column_name) to specify the rowtime column align well with Flink's existing patterns?
3. Override behavior: Should APPLY_WATERMARK always override existing watermarks, or should we provide a mode parameter (e.g., OVERRIDE, MERGE)?
4. Performance considerations: Are there any concerns about the function-based approach compared with catalog-level watermark definitions?

Looking forward to your valuable feedback!
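The example queries above differ only in their delays: 5 seconds in one, 10 seconds in the other. The following hypothetical Java simulation shows what that choice means for a consumer. It roughly models how a Flink SQL watermark expression such as `ts - INTERVAL '5' SECOND` behaves: the expression is evaluated for each record, and a new watermark is emitted only if it is larger than the previous one. A record whose timestamp is at or below the current watermark counts as late. The class and method names are illustrative, not Flink API. The sketch simplifies in one respect: it updates the watermark after every record, whereas Flink emits watermarks periodically (see `pipeline.auto-watermark-interval`).

```java
import java.util.List;

// Hypothetical simulation, not Flink code: models a bounded-delay watermark
// expression such as `ts - INTERVAL '5' SECOND` over epoch-millisecond timestamps.
public class BoundedDelayWatermarkSketch {

    private final long delayMillis;
    private long currentWatermark = Long.MIN_VALUE;
    private int lateRecords = 0;

    public BoundedDelayWatermarkSketch(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    // Processes one record and reports whether it arrived behind the watermark.
    public boolean onRecord(long timestampMillis) {
        // Watermark(t) promises no more records with timestamp <= t, so such records are late.
        boolean late = timestampMillis <= currentWatermark;
        if (late) {
            lateRecords++;
        }
        // Evaluate the expression (ts - delay) and never let the watermark move backwards.
        currentWatermark = Math.max(currentWatermark, timestampMillis - delayMillis);
        return late;
    }

    public long currentWatermark() {
        return currentWatermark;
    }

    public int lateRecords() {
        return lateRecords;
    }

    // Replays the same out-of-order timestamps under a given delay.
    public static BoundedDelayWatermarkSketch replay(List<Long> timestamps, long delayMillis) {
        BoundedDelayWatermarkSketch assigner = new BoundedDelayWatermarkSketch(delayMillis);
        for (long ts : timestamps) {
            assigner.onRecord(ts);
        }
        return assigner;
    }

    public static void main(String[] args) {
        List<Long> timestamps = List.of(1_000L, 7_000L, 3_000L, 12_000L, 4_000L, 6_500L);
        for (long delay : new long[] {5_000L, 10_000L}) {
            BoundedDelayWatermarkSketch result = replay(timestamps, delay);
            System.out.printf("delay=%dms watermark=%d late=%d%n",
                    delay, result.currentWatermark(), result.lateRecords());
        }
    }
}
```

On this input, the 5-second delay reaches watermark 7000 and produces two late records (4000 and 6500). The 10-second delay stops at watermark 2000 and produces none. This latency-versus-completeness trade-off is what a per-query override would let different consumers of the same source choose.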
Best regards,
FeatZhang

On Thu, Feb 12, 2026 at 6:24 PM Timo Walther <[email protected]> wrote:

> Hi everyone,
>
> I think we all agree that we clearly want this functionality; just the "how" needs to be discussed. I also like Lincoln’s suggestion of introducing a built-in PTF for this; I had similar ideas in mind.
>
> There are two issues with an APPLY_WATERMARK function, but both are on the short-term roadmap:
>
> 1) This function would need to be a function that takes an expression, ideally as a lambda function. Newer Calcite versions already have lambda expression support. At Confluent we were planning to work on a Calcite upgrade this quarter, especially to get lambda support in and to improve built-in functions that work on collections.
>
> 2) User-defined PTFs are currently not able to emit watermarks. We could introduce a new interface WatermarkFunction (similar to ChangelogFunction) that would offer this to everyone. Alternatively, we could only use the PTF signature but translate to a specialized ExecNode, similar to how we do it for VECTOR_SEARCH and ML_PREDICT.
>
> In any case, even if we go with the function approach, we definitely need a full FLIP on this.
>
> Thanks,
> Timo
>
> On 12.02.26 08:25, Gyula Fóra wrote:
> > Hi All!
> > I would like to chime in here quickly from a slightly different angle. While I am the first to admit that I cannot grasp all the planning / conceptual implications, I also feel the need for more flexible watermark handling as suggested by Feat.
> >
> > Anything that can only be applied to base/catalog tables is very limiting from a usability perspective. Watermarks feel like they should be a simple function that you can apply on a column/table as part of a query/view. For example: extract a timestamp from a string -> convert to TS -> apply watermark, etc.
> >
> > Users often receive the tables/catalogs as given and can only write queries.
> >
> > Fixing this would eliminate a long-standing disconnect between the flexible watermark handling of the DataStream API and the currently very restrictive SQL approach.
> >
> > Cheers
> > Gyula
> >
> > On Thu, Feb 12, 2026 at 7:54 AM FeatZhang <[email protected]> wrote:
> >
> >> Hi Timo, Lincoln,
> >>
> >> Thank you both for the detailed feedback.
> >>
> >> I agree with the concern that non-materialized SQL views should remain a pure logical abstraction. Introducing watermark definitions directly into CREATE VIEW or ALTER VIEW could blur the boundary between logical aliasing and physical planning semantics, especially considering optimization barriers and watermark propagation behavior.
> >>
> >> Lincoln’s suggestion of introducing a built-in function such as:
> >>
> >> APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column), watermark_expression)
> >>
> >> is a cleaner direction. It keeps watermark definition as an explicit relational transformation rather than attaching additional semantics to views.
> >>
> >> However, to fully address the original use cases (especially logical reuse and layered lakehouse architectures), I propose that the table parameter should support:
> >>
> >> - Base tables
> >> - Non-materialized views
> >>
> >> If APPLY_WATERMARK can accept both, we can:
> >>
> >> - Preserve the conceptual purity of SQL views
> >> - Avoid redefining view semantics
> >> - Still enable logical reuse via views
> >> - Allow different watermark strategies over the same logical relation
> >>
> >> In other words, watermark definition becomes an explicit relational operator applied on top of any logical relation, instead of being embedded into the view definition itself.
> >>
> >> From a planner perspective, this keeps the model consistent:
> >>
> >> - The function expands into a relational node
> >> - No optimization barrier is introduced by views
> >> - Watermark handling remains part of the logical plan transformation
> >>
> >> I will prepare a PR to prototype APPLY_WATERMARK with support for both base tables and non-materialized views, and share it for further discussion.
> >>
> >> Looking forward to your thoughts.
> >>
> >> Best,
> >> Feat
> >>
> >>
> >> On Thu, Feb 12, 2026 at 12:19 PM Lincoln Lee <[email protected]> wrote:
> >>
> >>> Agree with Timo’s point regarding the conceptual semantics. We should not directly extend non-materialized views with additional watermark definitions.
> >>>
> >>> Regarding the use case mentioned by Feat, defining different watermark strategies for the same data source, especially in the case of catalog tables, we are exploring a possible solution that introduces a built-in function:
> >>> ```sql
> >>> APPLY_WATERMARK(table, DESCRIPTOR(rowtime_column), watermark_expression)
> >>> ```
> >>> This function only supports base tables as input; it does not support views, subqueries or derived relations.
> >>>
> >>> This would address a meaningful subset of the identified use cases without redefining the role of SQL views.
> >>>
> >>>
> >>> Best,
> >>> Lincoln Lee
> >>>
> >>>
> >>> On Wed, Feb 11, 2026 at 11:29 PM Timo Walther <[email protected]> wrote:
> >>>
> >>>> Hi Feat,
> >>>>
> >>>> thanks for proposing this FLIP. We had similar discussions in the past, but so far could never reach consensus.
> >>>>
> >>>> SQL views are actually a very simple concept: they just give SQL text an alias. A view has no other properties except for the view definition. Everything else is dynamically computed when the SQL text is inserted into the larger plan. A view is never evaluated without the surrounding plan.
> >>>>
> >>>> Watermarks in the middle of a pipeline raise a couple of tricky issues:
> >>>>
> >>>> - What if the upstream table is updating? How would you deal with watermarks in the downstream view?
> >>>> - What if the upstream table already emits watermarks? Would the view catch them and discard this information?
> >>>> - A view usually dissolves into the plan (e.g. via projection or filter pushdown). Would a watermark definition suddenly introduce an optimization barrier? If this is an optimization barrier, is this still a view or a new concept? E.g. a "materialized view" or "pre-planned view"?
> >>>>
> >>>> Cheers,
> >>>> Timo
> >>>>
> >>>>
> >>>>
> >>>> On 11.02.26 03:51, FeatZhang wrote:
> >>>>> Hi Flink Community,
> >>>>>
> >>>>> I'd like to propose adding watermark support for SQL Views in Flink to better support event-time processing scenarios.
> >>>>>
> >>>>> Problem Statement
> >>>>>
> >>>>> Currently, Flink SQL views cannot define watermarks. This limitation creates several challenges:
> >>>>>
> >>>>> - *Broken Abstraction*: Users must reference underlying table watermarks directly, exposing implementation details
> >>>>> - *No Flexibility*: Cannot define different watermark strategies for different use cases on the same data source
> >>>>> - *Limited Architecture Support*: Incompatible with modern layered data architectures (Bronze/Silver/Gold medallion pattern)
> >>>>>
> >>>>> For example, in multi-tenant scenarios, different tenants may require different lateness tolerances, but currently we cannot create views with different watermark strategies on the same source table.
> >>>>>
> >>>>> Proposed Solution
> >>>>>
> >>>>> I propose adding two SQL syntax options to support watermark definitions in views:
> >>>>>
> >>>>> *Option 1: CREATE VIEW with WATERMARK*
> >>>>>
> >>>>> CREATE VIEW user_activity
> >>>>>   WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
> >>>>> AS SELECT user_id, event_time, action FROM raw_events;
> >>>>>
> >>>>> *Option 2: ALTER VIEW SET WATERMARK*
> >>>>>
> >>>>> ALTER VIEW user_activity SET WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND;
> >>>>>
> >>>>> Key Design Aspects
> >>>>>
> >>>>> - *Backward Compatibility*: Watermark stored as optional metadata in view options; existing views continue to work unchanged
> >>>>> - *Validation*: Watermark column must exist in view schema and be of TIMESTAMP/TIMESTAMP_LTZ type
> >>>>> - *Storage*: Watermark metadata persists in catalog options map (works with all catalog implementations)
> >>>>> - *Propagation*: Follows existing Flink watermark propagation rules in joins and nested views
> >>>>>
> >>>>> Use Case Example: Data Lakehouse Architecture
> >>>>>
> >>>>> -- Bronze: Raw data (no watermark)
> >>>>> CREATE TABLE bronze_events (raw_data STRING, ingestion_time TIMESTAMP(3)) WITH (...);
> >>>>>
> >>>>> -- Silver: Cleaned data with watermark
> >>>>> CREATE VIEW silver_events
> >>>>>   WATERMARK FOR event_time AS event_time - INTERVAL '10' SECOND
> >>>>> AS SELECT
> >>>>>   CAST(JSON_VALUE(raw_data, '$.event_time') AS TIMESTAMP(3)) AS event_time,
> >>>>>   JSON_VALUE(raw_data, '$.user_id') AS user_id
> >>>>> FROM bronze_events
> >>>>> WHERE JSON_VALUE(raw_data, '$.event_time') IS NOT NULL;
> >>>>>
> >>>>> -- Gold: Aggregations
> >>>>> SELECT TUMBLE_START(event_time, INTERVAL '1' HOUR), COUNT(*)
> >>>>> FROM silver_events
> >>>>> GROUP BY TUMBLE(event_time, INTERVAL '1' HOUR);
> >>>>>
> >>>>> Reference Materials
> >>>>>
> >>>>> - FLIP Document: FLIP-XXX: Support Watermark in Flink SQL View
> >>>>>   <https://docs.google.com/document/d/1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y/edit?usp=sharing>
> >>>>> - JIRA Issue: https://issues.apache.org/jira/browse/FLINK-39062
> >>>>> - Implementation POC:
> >>>>>   - [FLINK-39062][table] Support WATERMARK clause in CREATE VIEW statement <https://github.com/apache/flink/pull/27571>
> >>>>>   - [FLINK-39062][table] Support ALTER VIEW SET WATERMARK syntax <https://github.com/apache/flink/pull/27570>
> >>>>>
> >>>>> Implementation Timeline
> >>>>>
> >>>>> Estimated 6-8 weeks covering parser layer, planner layer, catalog integration, and comprehensive testing.
> >>>>>
> >>>>> Request for Feedback
> >>>>>
> >>>>> This enhancement would significantly improve Flink's support for layered data architectures and flexible event-time processing. I'm happy to provide more details or start a formal FLIP process if the community sees value in this proposal.
> >>>>>
> >>>>> Looking forward to the community's feedback!
> >>>>>
> >>>>> Best regards,
> >>>>>
> >>>>> Feat Zhang
> >>>>>
> >>>>> FLIP-XXX: Support Watermark in Flink SQL View
> >>>>> <https://drive.google.com/open?id=1OBGTi3Xb-Kpcf_nHeKA30XiRPEKV4PU7FpieXQhWn7Y>
