[ https://issues.apache.org/jira/browse/FLINK-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16154126#comment-16154126 ]
ASF GitHub Bot commented on FLINK-7552: --------------------------------------- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/4616#discussion_r137082133 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java --- @@ -35,6 +35,39 @@ * * @param value The input record. * @throws Exception + * @deprecated Use {@link #invoke(SinkContext, Object)}. */ - void invoke(IN value) throws Exception; + @Deprecated + default void invoke(IN value) throws Exception { + } + + /** + * Writes the given value to the sink. This function is called for every record. + * + * @param context Additional context about the input record. + * @param value The input record. + * @throws Exception + */ + default void invoke(SinkContext context, IN value) throws Exception { + invoke(value); + } + + /** + * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about + * an input record. + * + * <p>The context is only valid for the duration of a + * {@link SinkFunction#invoke(SinkContext, Object)} call. Do not store the context and use + * afterwards! + * + * @param <T> The type of elements accepted by the sink. + */ + @Public // Interface might be extended in the future with additional methods. + interface SinkContext<T> { + + /** + * Returns the timestamp of the current input record. + */ + long timestamp(); --- End diff -- @pnowojski let us not throw an exception here, given the commented-out section of `StreamRecord::getTimestamp` which suggests a problem with that approach. It would be good to know why `ProcessFunction` used a `Long` rather than `long`. I have an almost unhealthy desire for consistency. > Extend SinkFunction interface with SinkContext > ---------------------------------------------- > > Key: FLINK-7552 > URL: https://issues.apache.org/jira/browse/FLINK-7552 > Project: Flink > Issue Type: Bug > Components: DataStream API > Reporter: Aljoscha Krettek > Assignee: Aljoscha Krettek > Fix For: 1.4.0 > > > Now that we require Java 8 we can extend the {{SinkFunction}} interface > without breaking backwards compatibility. I'm proposing this: > {code} > /** > * Interface for implementing user defined sink functionality. > * > * @param <IN> Input type parameter. > */ > @Public > public interface SinkFunction<IN> extends Function, Serializable { > /** > * Function for standard sink behaviour. This function is called for > every record. > * > * @param value The input record. > * @throws Exception > * @deprecated Use {@link #invoke(SinkContext, Object)}. > */ > @Deprecated > default void invoke(IN value) throws Exception { > } > /** > * Writes the given value to the sink. This function is called for > every record. > * > * @param context Additional context about the input record. > * @param value The input record. > * @throws Exception > */ > default void invoke(SinkContext context, IN value) throws Exception { > invoke(value); > } > /** > * Context that {@link SinkFunction SinkFunctions } can use for getting > additional data about > * an input record. > * > * @param <T> The type of elements accepted by the sink. > */ > @Public // Interface might be extended in the future with additional > methods. > interface SinkContext<T> { > /** > * Returns the timestamp of the current input record. > */ > long timestamp(); > } > } > {code} > For now, this only allows access to the element timestamp. This would allow > us to fix the abomination that is {{FlinkKafkaProducer010}}, which is a > hybrid {{SinkFunction}}/{{StreamOperator}} only because it needs access to > timestamps. -- This message was sent by Atlassian JIRA (v6.4.14#64029)