fapaul commented on pull request #18412: URL: https://github.com/apache/flink/pull/18412#issuecomment-1018451463
> Thanks @fapaul for the review. I've rethought it, and it's probably not a good idea either way.
>
> * approach 1: `SinkWriter` implements `MetadataPublisher`. Actually, the `subscribe` method should be similar to `open` and be called before any other calls; otherwise, there are thread-safety issues. But the thing is, `SinkWriter` does not have `open`. It is opened in `Sink.createWriter`.

I think here we have two different thread-safety issues. One is that the consumer is invoked from the KafkaProducer's callback thread, which you probably need to fix by triggering the callback via the Mailbox. The other issue is around adding subscribers concurrently.

> * approach 2: `Sink` implements `MetadataPublisher`. The question is when to call `subscribe`: on the client side or the server side? And what about serialization on the client side: does the `Consumer` still work properly after serialization?

I agree the `Consumer` needs to be serializable, but if we pass an unmodifiable list to the sink writer, it solves a lot of the thread-safety issues, because it is not possible to add new subscribers after the sink translation. Whether that works probably depends on the implementation you have in mind for using the metadata.

> So I am thinking maybe we can move forward in [JingsongLi#18 (comment)](https://github.com/JingsongLi/flink/pull/18#discussion_r765610007) and add a method to `InitContext`:
>
> ```java
> /**
>  * Returns a metadata subscriber; the {@link SinkWriter} can publish metadata events of type
>  * {@code MetaT} to the subscriber.
>  */
> <MetaT> Optional<Consumer<MetaT>> metadataSubscriber();
> ```
>
> The sink can decide for itself whether to publish metadata to the subscriber.
>
> What do you think?

Isn't this approach very similar to option 2 you outlined? You probably still need a serializable consumer that you can pass via the context to the sink writer. How is the consumer set in the init context?
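The two thread-safety issues separated in the reply (the callback thread invoking the consumer, and subscribers being added concurrently) can be illustrated with a small, self-contained sketch. All names here are hypothetical: `SimpleMailbox` is a plain concurrent queue standing in for the task mailbox and is not Flink's `MailboxExecutor` API, and `MetadataForwardingWriter` is not a real Flink or Kafka class. The sketch assumes the writer is the only place metadata is published and that the task thread drains the mailbox between records.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

/** Hypothetical stand-in for a task mailbox: any thread may enqueue, only the task thread runs mails. */
final class SimpleMailbox {
    private final Queue<Runnable> mails = new ConcurrentLinkedQueue<>();

    /** Thread-safe: may be called from a producer I/O thread. */
    void execute(Runnable mail) {
        mails.add(mail);
    }

    /** Must only be called from the task thread, e.g. between processing records. */
    void drain() {
        Runnable mail;
        while ((mail = mails.poll()) != null) {
            mail.run();
        }
    }
}

/** Hypothetical writer that forwards metadata from an async send callback to its subscribers. */
final class MetadataForwardingWriter<MetaT> {
    private final List<Consumer<MetaT>> subscribers;
    private final SimpleMailbox mailbox;

    MetadataForwardingWriter(List<Consumer<MetaT>> subscribers, SimpleMailbox mailbox) {
        // Copy and freeze the list: subscribers are fixed once the writer exists,
        // so nothing can be added concurrently later on.
        this.subscribers = Collections.unmodifiableList(new ArrayList<>(subscribers));
        this.mailbox = mailbox;
    }

    /** Runs on a foreign thread, like a KafkaProducer send callback. */
    void onSendCompleted(MetaT metadata) {
        // Do not call subscribers on this thread; defer the work to the task thread.
        mailbox.execute(() -> {
            for (Consumer<MetaT> subscriber : subscribers) {
                subscriber.accept(metadata);
            }
        });
    }
}
```

The callback thread only enqueues a mail; subscribers run later on whichever thread calls `drain()`. Because the list is copied and wrapped as unmodifiable in the constructor, there is no `subscribe` call left that could race with publishing.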
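On the serialization question, a minimal sketch under stated assumptions may help. `MetadataInitContext` only mirrors the `metadataSubscriber()` signature proposed in the thread and is not Flink's `InitContext`; `SerializableConsumer`, `FixedSubscriberContext`, `CountingSubscriber`, `ExampleWriter` and `SerializationUtil` are hypothetical names. Java serialization stands in for shipping the sink from the client to the cluster. The point it shows: the writer receives a deserialized copy of the consumer, so it still works, but any state it records lives in that copy, not in the client-side instance.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Optional;
import java.util.function.Consumer;

/** Hypothetical: a Consumer that can travel with the sink from client to cluster. */
interface SerializableConsumer<T> extends Consumer<T>, Serializable {}

/** Hypothetical slice of an init context carrying the method proposed in the thread. */
interface MetadataInitContext {
    <MetaT> Optional<Consumer<MetaT>> metadataSubscriber();
}

/** Hypothetical context holding one subscriber fixed at construction (may be null). */
final class FixedSubscriberContext implements MetadataInitContext {
    private final Consumer<?> subscriber;

    FixedSubscriberContext(Consumer<?> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    @SuppressWarnings("unchecked")
    public <MetaT> Optional<Consumer<MetaT>> metadataSubscriber() {
        // MetaT is chosen by the caller, so the compiler cannot verify it: unchecked cast.
        return Optional.ofNullable((Consumer<MetaT>) subscriber);
    }
}

/** Hypothetical subscriber that records the offsets it receives. */
final class CountingSubscriber implements SerializableConsumer<Long> {
    private static final long serialVersionUID = 1L;
    long count;
    long lastOffset = -1;

    @Override
    public void accept(Long offset) {
        count++;
        lastOffset = offset;
    }
}

/** Hypothetical writer that publishes only if the context provides a subscriber. */
final class ExampleWriter {
    private final Optional<Consumer<Long>> subscriber;

    ExampleWriter(MetadataInitContext context) {
        this.subscriber = context.metadataSubscriber();
    }

    void onRecordWritten(long offset) {
        subscriber.ifPresent(s -> s.accept(offset));
    }
}

final class SerializationUtil {
    /** Serializes and deserializes obj, simulating the client-to-cluster hop. */
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }
}
```

Because `MetaT` is a method-level type parameter in the proposed signature, the caller picks it and every implementation needs an unchecked cast like the one above. A type parameter on the sink or the context itself would let the compiler check the metadata type.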