[ https://issues.apache.org/jira/browse/FLINK-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16173936#comment-16173936 ]
ASF GitHub Bot commented on FLINK-7552:
---------------------------------------

Github user EronWright commented on the issue:

    https://github.com/apache/flink/pull/4616

    +1

> Extend SinkFunction interface with SinkContext
> ----------------------------------------------
>
>                 Key: FLINK-7552
>                 URL: https://issues.apache.org/jira/browse/FLINK-7552
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>             Fix For: 1.4.0
>
>
> Now that we require Java 8 we can extend the {{SinkFunction}} interface
> without breaking backwards compatibility. I'm proposing this:
> {code}
> /**
>  * Interface for implementing user defined sink functionality.
>  *
>  * @param <IN> Input type parameter.
>  */
> @Public
> public interface SinkFunction<IN> extends Function, Serializable {
>     /**
>      * Function for standard sink behaviour. This function is called for every record.
>      *
>      * @param value The input record.
>      * @throws Exception
>      * @deprecated Use {@link #invoke(SinkContext, Object)}.
>      */
>     @Deprecated
>     default void invoke(IN value) throws Exception {
>     }
>     /**
>      * Writes the given value to the sink. This function is called for every record.
>      *
>      * @param context Additional context about the input record.
>      * @param value The input record.
>      * @throws Exception
>      */
>     default void invoke(SinkContext context, IN value) throws Exception {
>         invoke(value);
>     }
>     /**
>      * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
>      * an input record.
>      *
>      * @param <T> The type of elements accepted by the sink.
>      */
>     @Public // Interface might be extended in the future with additional methods.
>     interface SinkContext<T> {
>         /**
>          * Returns the timestamp of the current input record.
>          */
>         long timestamp();
>     }
> }
> {code}
> For now, this only allows access to the element timestamp. This would allow
> us to fix the abomination that is {{FlinkKafkaProducer010}}, which is a
> hybrid {{SinkFunction}}/{{StreamOperator}} only because it needs access to
> timestamps.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
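
For illustration, here is a minimal sketch of how the two default methods in the quoted proposal might interact. It does not use Flink: the nested SinkFunction and SinkContext types are local stand-ins that mirror the proposal, with the context parameter typed as SinkContext<IN> rather than the raw SinkContext, and without the Function supertype and @Public annotation. The class names SinkContextSketch, TimestampedSink and LegacySink are hypothetical.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class SinkContextSketch {

    // Local stand-in mirroring the interface proposed in the issue (assumption: not Flink's own class).
    interface SinkFunction<IN> extends Serializable {
        @Deprecated
        default void invoke(IN value) throws Exception {
        }

        // Falls back to the old single-argument method unless a sink overrides it.
        default void invoke(SinkContext<IN> context, IN value) throws Exception {
            invoke(value);
        }

        interface SinkContext<T> {
            long timestamp();
        }
    }

    // Hypothetical new-style sink: overrides the context-aware method to read the element timestamp.
    static class TimestampedSink implements SinkFunction<String> {
        final List<String> written = new ArrayList<>();

        @Override
        public void invoke(SinkFunction.SinkContext<String> context, String value) {
            written.add(context.timestamp() + ":" + value);
        }
    }

    // Hypothetical pre-existing sink: overrides only the old method and is left unchanged.
    static class LegacySink implements SinkFunction<String> {
        final List<String> written = new ArrayList<>();

        @Override
        public void invoke(String value) {
            written.add(value);
        }
    }

    // Drives both sinks through the context-aware entry point, as a runtime might.
    static List<String> demo() {
        TimestampedSink modern = new TimestampedSink();
        SinkFunction<String> legacy = new LegacySink();
        SinkFunction.SinkContext<String> context = () -> 1500L; // fixed timestamp for the sketch

        try {
            modern.invoke(context, "a");
            legacy.invoke(context, "b"); // default method delegates to invoke(value)
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        List<String> all = new ArrayList<>(modern.written);
        all.addAll(((LegacySink) legacy).written);
        return all;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the caller only ever uses the two-argument invoke; a sink that still overrides just the old method keeps working because the default implementation delegates to it, which is the backwards-compatibility property the proposal relies on.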