[ https://issues.apache.org/jira/browse/FLINK-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147231#comment-16147231 ]
ASF GitHub Bot commented on FLINK-7552:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r136063488

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java ---
    @@ -31,14 +31,24 @@
         private static final long serialVersionUID = 1L;
    +    private transient SimpleSinkContext sinkContext;
    +
         public StreamSink(SinkFunction<IN> sinkFunction) {
             super(sinkFunction);
             chainingStrategy = ChainingStrategy.ALWAYS;
         }
         @Override
    +    public void open() throws Exception {
    +        super.open();
    +
    +        this.sinkContext = new SimpleSinkContext<>();
    +    }
    +
    +    @Override
         public void processElement(StreamRecord<IN> element) throws Exception {
    -        userFunction.invoke(element.getValue());
    +        sinkContext.element = element;
    +        userFunction.invoke(sinkContext, element.getValue());
    --- End diff --

    wouldn't it be better/simpler to just pass `StreamRecord` to the `userFunction`? `userFunction.invoke(element)`? and instead of adding `SinkContext` as a first argument of the `invoke` method in the sink interface, just change the element type from `IN` to `StreamRecord<IN>`?


> Extend SinkFunction interface with SinkContext
> ----------------------------------------------
>
>                 Key: FLINK-7552
>                 URL: https://issues.apache.org/jira/browse/FLINK-7552
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>             Fix For: 1.4.0
>
>
> Now that we require Java 8 we can extend the {{SinkFunction}} interface
> without breaking backwards compatibility. I'm proposing this:
> {code}
> /**
>  * Interface for implementing user defined sink functionality.
>  *
>  * @param <IN> Input type parameter.
>  */
> @Public
> public interface SinkFunction<IN> extends Function, Serializable {
>     /**
>      * Function for standard sink behaviour. This function is called for every record.
>      *
>      * @param value The input record.
>      * @throws Exception
>      * @deprecated Use {@link #invoke(SinkContext, Object)}.
>      */
>     @Deprecated
>     default void invoke(IN value) throws Exception {
>     }
>     /**
>      * Writes the given value to the sink. This function is called for every record.
>      *
>      * @param context Additional context about the input record.
>      * @param value The input record.
>      * @throws Exception
>      */
>     default void invoke(SinkContext context, IN value) throws Exception {
>         invoke(value);
>     }
>     /**
>      * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
>      * an input record.
>      *
>      * @param <T> The type of elements accepted by the sink.
>      */
>     @Public // Interface might be extended in the future with additional methods.
>     interface SinkContext<T> {
>         /**
>          * Returns the timestamp of the current input record.
>          */
>         long timestamp();
>     }
> }
> {code}
> For now, this only allows access to the element timestamp. This would allow
> us to fix the abomination that is {{FlinkKafkaProducer010}}, which is a
> hybrid {{SinkFunction}}/{{StreamOperator}} only because it needs access to
> timestamps.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
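
To illustrate the backwards-compatibility argument in the issue description quoted above, here is a minimal, self-contained sketch. It does not use Flink: `SinkFunction` and `SinkContext` below are simplified stand-ins for the proposed interface (the `Function`/`Serializable` supertypes, the `@Public` annotation and the `SinkContext` type parameter are left out), and `LegacySink`, `TimestampedSink`, `SinkContextDemo` and `emit` are hypothetical names introduced only for this example. It shows a sink written against the old single-argument `invoke` still receiving records when the caller uses the new two-argument `invoke`, while a new sink can read the timestamp.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for the proposed interface; not the real Flink type. */
interface SinkFunction<IN> {

    /** Legacy method, kept as a no-op default so that new sinks need not implement it. */
    @Deprecated
    default void invoke(IN value) throws Exception {
    }

    /** New entry point; by default it delegates to the legacy method. */
    default void invoke(SinkContext context, IN value) throws Exception {
        invoke(value);
    }

    /** Stand-in for the proposed context; only exposes the record timestamp. */
    interface SinkContext {
        long timestamp();
    }
}

/** Hypothetical pre-existing sink that only knows the single-argument method. */
class LegacySink implements SinkFunction<String> {
    final List<String> seen = new ArrayList<>();

    @Override
    public void invoke(String value) {
        seen.add(value);
    }
}

/** Hypothetical new sink that overrides the two-argument method to read the timestamp. */
class TimestampedSink implements SinkFunction<String> {
    final List<String> seen = new ArrayList<>();

    @Override
    public void invoke(SinkContext context, String value) {
        seen.add(context.timestamp() + ":" + value);
    }
}

class SinkContextDemo {

    /** Plays the role of the operator: always calls the new two-argument method. */
    static <T> void emit(SinkFunction<T> sink, T value, long timestamp) throws Exception {
        sink.invoke(() -> timestamp, value);
    }

    public static void main(String[] args) throws Exception {
        LegacySink legacy = new LegacySink();
        emit(legacy, "a", 42L);
        System.out.println(legacy.seen); // prints [a]

        TimestampedSink timestamped = new TimestampedSink();
        emit(timestamped, "b", 42L);
        System.out.println(timestamped.seen); // prints [42:b]
    }
}
```

Because both methods are defaults, each sink overrides only the one it cares about; the caller never needs to know which kind of sink it is driving.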