snuyanzin commented on code in PR #26250:
URL: https://github.com/apache/flink/pull/26250#discussion_r1980901441


##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java:
##########
@@ -351,9 +439,158 @@ public interface Context {
         /**
          * Clears all state entries within the virtual partition once the eval() method returns.
          *
-         * <p>Semantically this is equal to calling {@link #clearState(String)} on all state
+         * <p>Semantically, this is equal to calling {@link #clearState(String)} on all state
          * entries.
          */
         void clearAllState();
+
+        /** Clears all timers within the virtual partition. */
+        void clearAllTimers();
+
+        /** Clears the virtual partition including timers and state. */
+        void clearAll();
+    }
+
+    /**
+     * A context that gives access to Flink's concepts of time and timers.
+     *
+     * <p>An event can have an event-time timestamp assigned. The timestamp can be accessed using
+     * the {@link #time()} method.
+     *
+     * <p>Timers allow for continuing the processing at a later point in time. This makes waiting,
+     * synchronization, or timeouts possible. A timer fires for the registered time when the
+     * watermark progresses the logical clock.
+     *
+     * <p>Flink takes care of storing and restoring timers during failures or restarts. Thus, timers
+     * are a special kind of state. Similarly, timers are scoped to a virtual processor defined by
+     * the PARTITION BY clause. A timer can only be registered and deleted in the current virtual
+     * processor.
+     *
+     * @param <TimeType> conversion class of timestamps, see {@link Context#timeContext(Class)}
+     */
+    @PublicEvolving
+    public interface TimeContext<TimeType> {
+
+        /**
+         * Returns the timestamp of the currently processed event.
+         *
+         * <p>An event can be either the row of a table or a firing timer:
+         *
+         * <h1>Row event timestamp</h1>
+         *
+         * <p>The timestamp of the row currently being processed within the {@code eval()} method.
+         *
+         * <p>Powered by the function call's {@code on_time} argument, this method will return the
+         * content of the referenced time attribute column. Returns {@code null} if the {@code
+         * on_time} argument doesn't reference a time attribute column in the currently processed
+         * table.
+         *
+         * <h1>Timer event timestamp</h1>
+         *
+         * <p>The timestamp of the firing timer currently being processed within the {@code
+         * onTimer()} method.
+         *
+         * @return the event-time timestamp, or {@code null} if no timestamp is present
+         */
+        TimeType time();
+
+        /**
+         * Returns the current event-time watermark.
+         *
+         * <p>Watermarks are generated in sources and sent through the topology for advancing the
+         * logical clock in each Flink subtask. The current watermark of a Flink subtask is the
+         * global minimum watermark of all inputs (i.e. across all parallel inputs and table
+         * partitions).
+         *
+         * <p>This method returns the current watermark of the Flink subtask that evaluates the PTF.
+         * Thus, the returned timestamp represents the entire Flink subtask, independent of the
+         * currently processed partition. This behavior is similar to a call to {@code SELECT
+         * CURRENT_WATERMARK(...)} in SQL.
+         *
+         * <p>If a watermark was not received from all inputs, the method returns {@code null}.
+         *
+         * <p>In case this method is called within the {@code onTimer()} method, the returned
+         * watermark is the triggering watermark that currently fires the timer.
+         *
+         * @return the current watermark of the Flink subtask, or {@code null} if no common logical
+         *     time could be determined from the inputs
+         */
+        TimeType currentWatermark();
+
+        /**
+         * Registers a timer under the given name.
+         *
+         * <p>The timer fires when the {@link #currentWatermark()} advances the logical clock of the
+         * Flink subtask to a timestamp later or equal to the desired timestamp. In other words: A
+         * timer only fires if a watermark was received from all inputs and the timestamp is smaller
+         * or equal to the minimum of all received watermarks.
+         *
+         * <p>Timers can be named for distinguishing them in the {@code onTimer()} method.
+         * Registering a timer under the same name twice will replace an existing timer.
+         *
+         * <p>Note: Because only PTFs taking set semantic tables support state, and timers are a
+         * special kind of state, at least one {@link ArgumentTrait#TABLE_AS_SET} table argument
+         * must be declared.
+         *
+         * @param name identifier of the timer
+         * @param time timestamp when the timer should fire
+         */
+        void registerOnTime(String name, TimeType time);
+
+        /**
+         * Registers a timer.
+         *
+         * <p>The timer fires when the {@link #currentWatermark()} advances the logical clock of the
+         * Flink subtask to a timestamp later or equal to the desired timestamp. In other words: A
+         * timer only fires if a watermark was received from all inputs and the timestamp is smaller
+         * or equal to the minimum of all received watermarks.
+         *
+         * <p>Only one timer can be registered for a given time.
+         *
+         * <p>Note: Because only PTFs taking set semantic tables support state, and timers are a
+         * special kind of state, at least one {@link ArgumentTrait#TABLE_AS_SET} table argument
+         * must be declared.
+         *
+         * @param time timestamp when the timer should fire
+         */
+        void registerOnTime(TimeType time);
+
+        /**
+         * Clears a timer that was previously registered under the given name.
+         *
+         * <p>The call is ignored if no timer can be found.
+         *
+         * @param name identifier of the timer
+         */
+        void clearTimer(String name);

Review Comment:
   Similar question here: should `clearTimer` return the value associated with
   `name`, as `Map#remove` does, for instance?
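
   A minimal sketch of what a `Map#remove`-style return could look like. This is
   an illustration only: `NamedTimers` is a hypothetical stand-in backed by a
   plain `HashMap`, not Flink code and not the implementation in this PR. It
   only mirrors the documented contract that registering under an existing name
   replaces the timer and that clearing an unknown name is ignored.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the named-timer methods of TimeContext; not Flink code. */
class NamedTimers<TimeType> {

    // Assumption for illustration: named timers are kept in a simple in-memory map.
    private final Map<String, TimeType> timers = new HashMap<>();

    /** Registers a timer; a second registration under the same name replaces the first. */
    void registerOnTime(String name, TimeType time) {
        timers.put(name, time);
    }

    /**
     * Clears the timer registered under the given name.
     *
     * @return the time of the removed timer, or null if no timer was found
     *     (the same convention as Map#remove)
     */
    TimeType clearTimer(String name) {
        return timers.remove(name);
    }
}
```

   With such a signature, a caller could tell whether a timer had actually been
   pending, and for which time, without tracking that separately in state.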



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
