JingsongLi commented on pull request #18412:
URL: https://github.com/apache/flink/pull/18412#issuecomment-1019656485


   > Isn't this approach very similar to option 2 you have outlined? You 
probably still need a serializable consumer that you can pass via the context 
to the sink writer. How is the consumer set in the init context?
   
   The `TableStoreSink` wraps a `KafkaSink`, so it can create a new context when 
`createWriter` is called. The code would look roughly like this:
   ```
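   // Pseudocode: generics and the remaining constructor arguments are omitted.
   class TableStoreSink implements Sink {
       Sink kafkaSink;

       TableStoreSink(Sink kafkaSink) {
           this.kafkaSink = kafkaSink;
       }

       SinkWriter createWriter(InitContext context) {
           // The subscriber is created here, at writer-creation time.
           Consumer metaSubscriber = new Consumer();
           // Wrap the original context so the Kafka writer can reach the subscriber.
           InitContext contextWithSubscriber = new WrappedInitContext(context, metaSubscriber);
           SinkWriter kafkaWriter = kafkaSink.createWriter(contextWithSubscriber);
           return new TableStoreSinkWriter(kafkaWriter, metaSubscriber, .....);
       }
   }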
   ```
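
To make the wrapping idea concrete, here is a minimal self-contained sketch. It does not use the real Flink interfaces: `InitContext`, `Sink`, `SinkWriter`, `metadataConsumer()`, `FakeLogSink`, and `TableStoreSinkSketch` are all simplified, hypothetical stand-ins invented for illustration, and the metadata type is assumed to be a plain `String`. The point it tries to show is that the subscriber is created inside `createWriter`, so under these assumptions only the outer sink has to be shipped to the task, not the consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

// Simplified stand-ins for the sink interfaces; NOT the real Flink API.
interface InitContext {
    int getSubtaskId();

    // Hypothetical hook: a writer reports metadata to this consumer if one is present.
    default Optional<Consumer<String>> metadataConsumer() {
        return Optional.empty();
    }
}

interface SinkWriter {
    void write(String element);
}

interface Sink {
    SinkWriter createWriter(InitContext context);
}

// Delegates everything to the original context and adds the subscriber.
final class WrappedInitContext implements InitContext {
    private final InitContext delegate;
    private final Consumer<String> subscriber;

    WrappedInitContext(InitContext delegate, Consumer<String> subscriber) {
        this.delegate = delegate;
        this.subscriber = subscriber;
    }

    @Override
    public int getSubtaskId() {
        return delegate.getSubtaskId();
    }

    @Override
    public Optional<Consumer<String>> metadataConsumer() {
        return Optional.of(subscriber);
    }
}

// Stand-in for the wrapped KafkaSink: emits one fake metadata string per record.
final class FakeLogSink implements Sink {
    @Override
    public SinkWriter createWriter(InitContext context) {
        Optional<Consumer<String>> consumer = context.metadataConsumer();
        return element -> consumer.ifPresent(c -> c.accept("meta:" + element));
    }
}

// Stand-in for TableStoreSink: builds the subscriber when the writer is created.
final class TableStoreSinkSketch implements Sink {
    private final Sink logSink;
    final List<String> receivedMetadata = new ArrayList<>();

    TableStoreSinkSketch(Sink logSink) {
        this.logSink = logSink;
    }

    @Override
    public SinkWriter createWriter(InitContext context) {
        Consumer<String> metaSubscriber = receivedMetadata::add;
        InitContext contextWithSubscriber = new WrappedInitContext(context, metaSubscriber);
        SinkWriter logWriter = logSink.createWriter(contextWithSubscriber);
        return logWriter::write;
    }
}
```

In this sketch the inner sink never learns who is listening; it only checks whether the context it was handed offers a consumer, which is why the wrapper can be introduced without changing the inner sink's constructor.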



