[ https://issues.apache.org/jira/browse/FLINK-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148662#comment-16148662 ]
ASF GitHub Bot commented on FLINK-7552:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r136279538

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java ---
    @@ -31,14 +31,24 @@

         private static final long serialVersionUID = 1L;

    +    private transient SimpleSinkContext sinkContext;
    +
         public StreamSink(SinkFunction<IN> sinkFunction) {
             super(sinkFunction);
             chainingStrategy = ChainingStrategy.ALWAYS;
         }

         @Override
    +    public void open() throws Exception {
    +        super.open();
    +
    +        this.sinkContext = new SimpleSinkContext<>();
    +    }
    +
    +    @Override
         public void processElement(StreamRecord<IN> element) throws Exception {
    -        userFunction.invoke(element.getValue());
    +        sinkContext.element = element;
    +        userFunction.invoke(sinkContext, element.getValue());
    --- End diff --

    `StreamRecord` is an internal concept that should not be exposed in any user-facing API. That's the reason for the extra context. Plus, the context allows us to extend the information that we pass to the `SinkFunction` in the future, similar to the context in `ProcessFunction` and `ProcessWindowFunction`.


> Extend SinkFunction interface with SinkContext
> ----------------------------------------------
>
>                 Key: FLINK-7552
>                 URL: https://issues.apache.org/jira/browse/FLINK-7552
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>             Fix For: 1.4.0
>
>
> Now that we require Java 8 we can extend the {{SinkFunction}} interface
> without breaking backwards compatibility. I'm proposing this:
> {code}
> /**
>  * Interface for implementing user defined sink functionality.
>  *
>  * @param <IN> Input type parameter.
>  */
> @Public
> public interface SinkFunction<IN> extends Function, Serializable {
>     /**
>      * Function for standard sink behaviour. This function is called for every record.
>      *
>      * @param value The input record.
>      * @throws Exception
>      * @deprecated Use {@link #invoke(SinkContext, Object)}.
>      */
>     @Deprecated
>     default void invoke(IN value) throws Exception {
>     }
>     /**
>      * Writes the given value to the sink. This function is called for every record.
>      *
>      * @param context Additional context about the input record.
>      * @param value The input record.
>      * @throws Exception
>      */
>     default void invoke(SinkContext context, IN value) throws Exception {
>         invoke(value);
>     }
>     /**
>      * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
>      * an input record.
>      *
>      * @param <T> The type of elements accepted by the sink.
>      */
>     @Public // Interface might be extended in the future with additional methods.
>     interface SinkContext<T> {
>         /**
>          * Returns the timestamp of the current input record.
>          */
>         long timestamp();
>     }
> }
> {code}
> For now, this only allows access to the element timestamp. This would allow
> us to fix the abomination that is {{FlinkKafkaProducer010}}, which is a
> hybrid {{SinkFunction}}/{{StreamOperator}} only because it needs access to
> timestamps.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
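
To illustrate why the default methods in the proposal keep existing sinks working, here is a minimal standalone sketch. It does not use Flink: the nested `SinkFunction` below only mirrors the proposed interface, and `SinkContextSketch`, `LegacySink`, `TimestampSink` and `run()` are hypothetical names made up for this example. One deliberate departure, labeled in the code: the context parameter is typed as `SinkContext<IN>` instead of the raw `SinkContext` used in the proposal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinkContextSketch {

    // Standalone mirror of the proposed interface. This is NOT Flink's class.
    interface SinkFunction<IN> {
        // Old single-argument method, kept so existing sinks still compile.
        @Deprecated
        default void invoke(IN value) throws Exception {
        }

        // New method. By default it delegates to the old one.
        // Assumption: parameterized with IN here to avoid a raw type.
        default void invoke(SinkContext<IN> context, IN value) throws Exception {
            invoke(value);
        }

        interface SinkContext<T> {
            long timestamp();
        }
    }

    // Hypothetical pre-existing sink that only knows the old method.
    static class LegacySink implements SinkFunction<String> {
        final List<String> seen = new ArrayList<>();

        @Override
        public void invoke(String value) {
            seen.add("legacy:" + value);
        }
    }

    // Hypothetical new sink that reads the record timestamp from the context.
    static class TimestampSink implements SinkFunction<String> {
        final List<String> seen = new ArrayList<>();

        @Override
        public void invoke(SinkContext<String> context, String value) {
            seen.add("new:" + value + "@" + context.timestamp());
        }
    }

    // Plays the role of the operator: it always calls the two-argument method.
    static List<String> run() throws Exception {
        SinkFunction.SinkContext<String> context = () -> 42L; // fixed timestamp for the demo
        LegacySink legacy = new LegacySink();
        TimestampSink timestamped = new TimestampSink();
        for (SinkFunction<String> sink
                : Arrays.<SinkFunction<String>>asList(legacy, timestamped)) {
            sink.invoke(context, "a");
        }
        List<String> out = new ArrayList<>(legacy.seen);
        out.addAll(timestamped.seen);
        return out;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

The caller only ever invokes the two-argument method, which is how the `StreamSink` diff above behaves. A sink that overrides just the old method is still reached through the default delegation, while a sink that overrides the new method gets the timestamp without ever seeing a `StreamRecord`.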