snuyanzin commented on code in PR #26250: URL: https://github.com/apache/flink/pull/26250#discussion_r1980897961
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java:
##########
@@ -351,9 +439,158 @@ public interface Context {
         /**
          * Clears all state entries within the virtual partition once the eval() method returns.
          *
-         * <p>Semantically this is equal to calling {@link #clearState(String)} on all state
+         * <p>Semantically, this is equal to calling {@link #clearState(String)} on all state
          * entries.
          */
         void clearAllState();
+
+        /** Clears all timers within the virtual partition. */
+        void clearAllTimers();
+
+        /** Clears the virtual partition including timers and state. */
+        void clearAll();
+    }
+
+    /**
+     * A context that gives access to Flink's concepts of time and timers.
+     *
+     * <p>An event can have an event-time timestamp assigned. The timestamp can be accessed using
+     * the {@link #time()} method.
+     *
+     * <p>Timers allow for continuing the processing at a later point in time. This makes waiting,
+     * synchronization, or timeouts possible. A timer fires for the registered time when the
+     * watermark progresses the logical clock.
+     *
+     * <p>Flink takes care of storing and restoring timers during failures or restarts. Thus, timers
+     * are a special kind of state. Similarly, timers are scoped to a virtual processor defined by
+     * the PARTITION BY clause. A timer can only be registered and deleted in the current virtual
+     * processor.
+     *
+     * @param <TimeType> conversion class of timestamps, see {@link Context#timeContext(Class)}
+     */
+    @PublicEvolving
+    public interface TimeContext<TimeType> {
+
+        /**
+         * Returns the timestamp of the currently processed event.
+         *
+         * <p>An event can be either the row of a table or a firing timer:
+         *
+         * <h1>Row event timestamp</h1>
+         *
+         * <p>The timestamp of the row currently being processed within the {@code eval()} method.
+         *
+         * <p>Powered by the function call's {@code on_time} argument, this method will return the
+         * content of the referenced time attribute column. Returns {@code null} if the {@code
+         * on_time} argument doesn't reference a time attribute column in the currently processed
+         * table.
+         *
+         * <h1>Timer event timestamp</h1>
+         *
+         * <p>The timestamp of the firing timer currently being processed within the {@code
+         * onTimer()} method.
+         *
+         * @return the event-time timestamp, or {@code null} if no timestamp is present
+         */
+        TimeType time();
+
+        /**
+         * Returns the current event-time watermark.
+         *
+         * <p>Watermarks are generated in sources and sent through the topology for advancing the
+         * logical clock in each Flink subtask. The current watermark of a Flink subtask is the
+         * global minimum watermark of all inputs (i.e. across all parallel inputs and table
+         * partitions).
+         *
+         * <p>This method returns the current watermark of the Flink subtask that evaluates the PTF.
+         * Thus, the returned timestamp represents the entire Flink subtask, independent of the
+         * currently processed partition. This behavior is similar to a call to {@code SELECT
+         * CURRENT_WATERMARK(...)} in SQL.
+         *
+         * <p>If a watermark was not received from all inputs, the method returns {@code null}.
+         *
+         * <p>In case this method is called within the {@code onTimer()} method, the returned
+         * watermark is the triggering watermark that currently fires the timer.
+         *
+         * @return the current watermark of the Flink subtask, or {@code null} if no common logical
+         * time could be determined from the inputs
+         */
+        TimeType currentWatermark();
+
+        /**
+         * Registers a timer under the given name.
+         *
+         * <p>The timer fires when the {@link #currentWatermark()} advances the logical clock of the
+         * Flink subtask to a timestamp later or equal to the desired timestamp. In other words: A
+         * timer only fires if a watermark was received from all inputs and the timestamp is smaller
+         * or equal to the minimum of all received watermarks.
+         *
+         * <p>Timers can be named for distinguishing them in the {@code onTimer()} method.
+         * Registering a timer under the same name twice will replace an existing timer.

Review Comment:
Should it then return, for instance, a `boolean` or a timer object indicating whether an existing timer was replaced? Similar to, for example, the `put` method in `java.util.Map`?
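
To make the suggestion concrete, here is a minimal sketch of the `Map.put`-style contract. `NamedTimers` and `register` are hypothetical names used only for illustration; they are not part of this PR or of Flink's `TimeContext`, and the `long` timestamp is an assumption standing in for `TimeType`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a named-timer registry; not Flink's TimeContext. */
public class NamedTimers {
    // Assumption: one timestamp per timer name, mirroring "same name replaces".
    private final Map<String, Long> timers = new HashMap<>();

    /**
     * Registers a timer under the given name, replacing any existing one.
     *
     * @return true if a timer with this name already existed and was replaced
     */
    public boolean register(String name, long timestamp) {
        // Map.put returns the previous value, or null if the key was absent.
        return timers.put(name, timestamp) != null;
    }

    public static void main(String[] args) {
        NamedTimers timers = new NamedTimers();
        System.out.println(timers.register("timeout", 1000L)); // first registration
        System.out.println(timers.register("timeout", 2000L)); // replaces the first
    }
}
```

Returning the previous timestamp instead of a `boolean` would be the closer analogue to `Map.put`, at the cost of a nullable return value.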