JingsongLi commented on pull request #18412: URL: https://github.com/apache/flink/pull/18412#issuecomment-1019656485
> Isn't this approach very similar to option 2 you have outlined? You probably still need a serializable consumer that you can pass via the context to the sink writer. How is the consumer set in the init context?

The `TableStoreSink` will hold a `KafkaSink`, and it can create a new context when `createWriter` is called. The code would look something like this:

```
class TableStoreSink implements Sink {

    Sink kafkaSink;

    TableStoreSink(Sink kafkaSink) {
        this.kafkaSink = kafkaSink;
    }

    SinkWriter createWriter(InitContext context) {
        // The subscriber is created here, when the writer is created.
        Consumer metaSubscriber = new Consumer();
        // New context that is passed to the Kafka writer instead of the original one.
        InitContext contextWithSubscriber = new WrappedInitContext();
        SinkWriter kafkaWriter = kafkaSink.createWriter(contextWithSubscriber);
        return new TableStoreSinkWriter(kafkaWriter, metaSubscriber, .....);
    }
}
```
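To make the wrapping idea concrete, here is a minimal, self-contained sketch. It is not Flink's real API: `InitContext`, `SinkWriter`, and `Sink` are simplified stand-ins, and `metadataConsumer()`, `BaseInitContext`, `FakeLogSink`, and `WrappedContextDemo` are hypothetical names used only for illustration. Under these assumptions, the subscriber is a plain `java.util.function.Consumer` created inside `createWriter`, so in this sketch it never has to be serialized with the sink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical, simplified stand-ins for the sink interfaces; NOT Flink's real API.
interface InitContext {
    int subtaskId();
    // Assumed hook: a writer that produces metadata reports it here when present.
    Optional<Consumer<String>> metadataConsumer();
}

interface SinkWriter {
    void write(String element);
}

interface Sink {
    SinkWriter createWriter(InitContext context);
}

// Context handed in by the runtime in this sketch: no subscriber registered.
final class BaseInitContext implements InitContext {
    @Override public int subtaskId() { return 0; }
    @Override public Optional<Consumer<String>> metadataConsumer() { return Optional.empty(); }
}

// Delegates to the original context and adds the locally created subscriber.
final class WrappedInitContext implements InitContext {
    private final InitContext delegate;
    private final Consumer<String> subscriber;

    WrappedInitContext(InitContext delegate, Consumer<String> subscriber) {
        this.delegate = delegate;
        this.subscriber = subscriber;
    }

    @Override public int subtaskId() { return delegate.subtaskId(); }
    @Override public Optional<Consumer<String>> metadataConsumer() { return Optional.of(subscriber); }
}

// Stand-in for the Kafka sink: reports one metadata string per written element.
final class FakeLogSink implements Sink {
    @Override
    public SinkWriter createWriter(InitContext context) {
        Optional<Consumer<String>> consumer = context.metadataConsumer();
        return element -> consumer.ifPresent(c -> c.accept("meta:" + element));
    }
}

// Forwards elements to the inner writer and keeps what the subscriber received.
final class TableStoreSinkWriter implements SinkWriter {
    private final SinkWriter logWriter;
    final List<String> metadata; // filled by the subscriber; exposed for the demo only

    TableStoreSinkWriter(SinkWriter logWriter, List<String> metadata) {
        this.logWriter = logWriter;
        this.metadata = metadata;
    }

    @Override public void write(String element) { logWriter.write(element); }
}

final class TableStoreSink implements Sink {
    private final Sink logSink;

    TableStoreSink(Sink logSink) { this.logSink = logSink; }

    @Override
    public TableStoreSinkWriter createWriter(InitContext context) {
        List<String> metadata = new ArrayList<>();
        // Created at writer-creation time, so it is not part of the serialized sink.
        Consumer<String> metaSubscriber = metadata::add;
        InitContext contextWithSubscriber = new WrappedInitContext(context, metaSubscriber);
        SinkWriter logWriter = logSink.createWriter(contextWithSubscriber);
        return new TableStoreSinkWriter(logWriter, metadata);
    }
}

final class WrappedContextDemo {
    // Writes two elements and returns the metadata the outer writer collected.
    static List<String> run() {
        TableStoreSinkWriter writer =
                new TableStoreSink(new FakeLogSink()).createWriter(new BaseInitContext());
        writer.write("a");
        writer.write("b");
        return writer.metadata;
    }
}
```

In this sketch the inner sink only ever sees the `InitContext` interface, so it does not need to know that the context was wrapped or who is subscribing.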