snuyanzin commented on code in PR #26250: URL: https://github.com/apache/flink/pull/26250#discussion_r1980888148
########## flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ProcessTableFunction.java: ##########
@@ -285,6 +288,76 @@
  * }
  * }</pre>
  *
+ * <h1>Time and Timers</h1>
+ *
+ * <p>A PTF supports event time natively. Time-based services are available via {@link
+ * Context#timeContext(Class)}.
+ *
+ * <h2>Time</h2>
+ *
+ * <p>Every PTF takes an optional {@code on_time} argument. The {@code on_time} argument in the function call specifies the time attribute column for which
+ * a watermark has been declared. When processing a table's row, this timestamp can be accessed via
+ * {@link TimeContext#time()} and the watermark via {@link TimeContext#currentWatermark()},
+ * respectively.
+ *
+ * <p>Specifying an {@code on_time} argument in the function call instructs the framework to return
+ * a {@code rowtime} column in the function's output for subsequent time-based operations.
+ *
+ * <p>The {@link ArgumentTrait#REQUIRE_ON_TIME} trait makes the {@code on_time} argument mandatory
+ * for functions that need it.
+ *
+ * <h2>Timers</h2>
+ *
+ * <p>A PTF that takes table arguments with set semantics can support timers. Timers allow for continuing the
+ * processing at a later point in time. This makes waiting, synchronization, or timeouts possible. A timer
+ * fires when the watermark advances the logical clock to the timer's registered time.
+ *
+ * <p>Timers can be named ({@link TimeContext#registerOnTime(String, Object)}) or unnamed ({@link
+ * TimeContext#registerOnTime(Object)}). A timer's name is useful for replacing or deleting
+ * an existing timer, or for telling multiple timers apart via {@link OnTimerContext#currentTimer()} when they fire.
+ *
+ * <p>An {@code onTimer()} method must be declared next to the {@code eval()} method for reacting to timer
+ * events. The signature of {@code onTimer()} consists of an optional {@link OnTimerContext} followed by all
+ * state entries as declared in {@code eval()}.
+ *
+ * <p>Flink takes care of storing and restoring timers during failures or restarts. Thus, timers are
+ * a special kind of state. Like other state, timers are scoped to a virtual processor defined by the
+ * {@code PARTITION BY} clause. A timer can only be registered and deleted in the current virtual processor.
+ *
+ * <pre>{@code
+ * // a function that waits for a second event or times out after 60 seconds
+ * class TimerFunction extends ProcessTableFunction<String> {
+ *   public static class SeenState {
+ *     public String seen = null;
+ *   }
+ *
+ *   public void eval(
+ *       Context ctx,
+ *       @StateHint SeenState memory,
+ *       @ArgumentHint({TABLE_AS_SET, REQUIRE_ON_TIME}) Row input) {
+ *     TimeContext<Instant> timeCtx = ctx.timeContext(Instant.class);
+ *     if (memory.seen == null) {
+ *       memory.seen = input.getField(0).toString();
+ *       timeCtx.registerOnTime("timeout", timeCtx.time().plusSeconds(60));
+ *     } else {
+ *       collect("Second event arrived for: " + memory.seen);
+ *       ctx.clearAll();
+ *     }
+ *   }
+ *
+ *   public void onTimer(SeenState memory) {
+ *     collect("Timeout for: " + memory.seen);
+ *   }
+ * }
+ * }</pre>
+ *
+ * <h2>Efficiency and Design Principles</h2>
+ *
+ * <p>Registering too many timers might affect performance. An ever-growing timer state can be caused
+ * by an unlimited number of partitions (i.e. an open keyspace) or even by a single partition. Thus,

Review Comment:
Out of curiosity: is there any exported metric showing the number of timers, or are there plans to add one?
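The growth concern behind the timer-count question can be made concrete with a toy model of the semantics described in the Javadoc. What follows is a minimal sketch under stated assumptions. It is a hypothetical, single-partition, in-memory model and not Flink's implementation. `TimerModel`, `Timer`, `delete` and `pendingTimers` are illustrative names that are not Flink API, and only `registerOnTime` mirrors the name used in the Javadoc. It assumes a timer fires once the watermark reaches its timestamp. It shows that a named timer replaces its predecessor, which keeps the pending count bounded, while unnamed timers accumulate.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Toy, single-partition model of event-time timers (hypothetical; NOT Flink's implementation).
// Named timers replace an earlier timer with the same name; unnamed timers accumulate.
final class TimerModel {
    // A pending timer; name is null for unnamed timers, seq breaks ties between equal times.
    record Timer(String name, long time, long seq) {}

    private final Map<String, Timer> named = new HashMap<>();
    private final PriorityQueue<Timer> queue = new PriorityQueue<>(
            Comparator.comparingLong(Timer::time).thenComparingLong(Timer::seq));
    private long nextSeq = 0;

    // Named timer: replaces any pending timer registered under the same name.
    void registerOnTime(String name, long time) {
        delete(name);
        Timer timer = new Timer(name, time, nextSeq++);
        named.put(name, timer);
        queue.add(timer);
    }

    // Unnamed timer: every call adds another pending entry.
    void registerOnTime(long time) {
        queue.add(new Timer(null, time, nextSeq++));
    }

    // Deletes a pending named timer, if there is one.
    void delete(String name) {
        Timer previous = named.remove(name);
        if (previous != null) {
            queue.remove(previous);
        }
    }

    // Number of pending timers in this partition (what a hypothetical timer-count gauge would report).
    int pendingTimers() {
        return queue.size();
    }

    // Advances the watermark and returns, in time order, every timer whose time it has reached.
    List<Timer> advanceWatermark(long watermark) {
        List<Timer> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().time() <= watermark) {
            Timer timer = queue.poll();
            if (timer.name() != null) {
                named.remove(timer.name());
            }
            fired.add(timer);
        }
        return fired;
    }
}
```

In this model, registering `"timeout"` twice leaves a single pending timer, while two unnamed registrations leave two. Across an open keyspace, each new partition adds its own pending timers on top of that. This is why the per-partition count and the number of partitions together determine how large timer state becomes.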