Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r137206502

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java ---
    @@ -35,6 +35,39 @@
          *
          * @param value The input record.
          * @throws Exception
    +     * @deprecated Use {@link #invoke(SinkContext, Object)}.
          */
    -    void invoke(IN value) throws Exception;
    +    @Deprecated
    +    default void invoke(IN value) throws Exception {
    +    }
    +
    +    /**
    +     * Writes the given value to the sink. This function is called for every record.
    +     *
    +     * @param context Additional context about the input record.
    +     * @param value The input record.
    +     * @throws Exception
    +     */
    +    default void invoke(SinkContext context, IN value) throws Exception {
    +        invoke(value);
    +    }
    +
    +    /**
    +     * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
    +     * an input record.
    +     *
    +     * <p>The context is only valid for the duration of a
    +     * {@link SinkFunction#invoke(SinkContext, Object)} call. Do not store the context and use
    +     * afterwards!
    +     *
    +     * @param <T> The type of elements accepted by the sink.
    +     */
    +    @Public // Interface might be extended in the future with additional methods.
    +    interface SinkContext<T> {
    +
    +        /**
    +         * Returns the timestamp of the current input record.
    +         */
    +        long timestamp();
    --- End diff --

    @StephanEwen What do you think about this? You introduced the code in `StreamRecord` that returns `Long.MIN_VALUE` when there is no timestamp instead of throwing an exception.
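
    To make the question concrete, here is a minimal sketch of the two behaviors being weighed for `timestamp()`. It does not use Flink's classes: `TimestampSketch`, `SketchRecord`, `timestampOrSentinel()` and `timestampOrThrow()` are hypothetical names made up for illustration, and the exception type and message are assumptions.

```java
public class TimestampSketch {

    /** Hypothetical stand-in for a record that may or may not carry a timestamp. */
    static final class SketchRecord {
        private final long timestamp;
        private final boolean hasTimestamp;

        /** Creates a record without a timestamp. */
        SketchRecord() {
            this.timestamp = 0L;
            this.hasTimestamp = false;
        }

        /** Creates a record with the given timestamp. */
        SketchRecord(long timestamp) {
            this.timestamp = timestamp;
            this.hasTimestamp = true;
        }

        /** Option A: return a sentinel (Long.MIN_VALUE) when no timestamp is set. */
        long timestampOrSentinel() {
            return hasTimestamp ? timestamp : Long.MIN_VALUE;
        }

        /** Option B: fail fast when no timestamp is set (exception type is an assumption). */
        long timestampOrThrow() {
            if (!hasTimestamp) {
                throw new IllegalStateException("Record has no timestamp.");
            }
            return timestamp;
        }
    }

    public static void main(String[] args) {
        SketchRecord withTs = new SketchRecord(42L);
        SketchRecord withoutTs = new SketchRecord();

        // Both options agree when a timestamp is present.
        System.out.println(withTs.timestampOrSentinel() == withTs.timestampOrThrow());

        // Option A: the caller has to know about and check for the sentinel.
        System.out.println(withoutTs.timestampOrSentinel() == Long.MIN_VALUE);

        // Option B: the missing timestamp surfaces immediately as an exception.
        try {
            withoutTs.timestampOrThrow();
        } catch (IllegalStateException e) {
            System.out.println("threw: " + e.getMessage());
        }
    }
}
```

    With the sentinel, a sink that forgets the check would silently work with `Long.MIN_VALUE`; with the exception, sinks that never assigned timestamps fail as soon as they call the method.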
---