Hi Caizhi,

Thanks for your reply. Much appreciated. I understand the difference now.

Also, I have a flow like Kafka Sink Datastream -> Process Function (separate class) -> Cassandra Sink (separate class).
The process function returns its output as a string, and I now want to create a DataStream from that string variable so that I can call something like ds.addSink(new CassandraSink()). To do that, I used the StreamExecution variable as a global/method variable, but I am not able to create the stream properly. Could you please refer to my StackOverflow post mentioned in the main thread?

What is happening is that if I don't create the data stream properly, the sink is not called properly: none of the methods in the Cassandra sink class are executed. What should I do?

On Thu, Jan 6, 2022 at 7:58 AM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> This is because ProcessFunction#processElement is a must while all methods
> in SinkFunction are not mandatory (for example, you can create a sink which
> just discards all records by directly implementing SinkFunction). However,
> if you want your sink to be more useful, you'll have to see which methods in
> SinkFunction you need to implement. For example, you can deal with the
> records fed to the sink in the invoke method or clean up the resources in
> the finish method.
>
> Siddhesh Kalgaonkar <kalgaonkarsiddh...@gmail.com> wrote on Thu, Jan 6, 2022 at 03:11:
>
>> I have implemented a Cassandra sink, and when I try to call it from
>> another class via DataStream, it does not call any of the methods. I tried
>> extending other interfaces like ProcessFunction, and it forces me to
>> implement its methods, whereas RichSinkFunction doesn't force me to do so.
>> Is my problem due to this, or is there something else to it?
>>
>> https://stackoverflow.com/questions/70592174/richsinkfunction-for-cassandra-in-flink/70593375
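
For reference, the distinction described in the quoted reply (one mandatory method on the process function, only optional methods on the sink) can be illustrated without Flink at all. The following is a minimal plain-Java sketch, not Flink code: SketchSink, SketchProcessFunction, SketchPipeline and the other names are hypothetical stand-ins invented for this illustration. It assumes that the sink's methods behave like Java default methods, so a sink that overrides nothing still compiles but discards every record.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a sink interface: every method has a default body,
// so the compiler does not force an implementation to override anything.
interface SketchSink<T> {
    default void invoke(T value) { /* default: discard the record */ }
    default void finish() { /* default: nothing to clean up */ }
}

// Hypothetical stand-in for a process function: the abstract method is mandatory.
abstract class SketchProcessFunction<I, O> {
    // Output is emitted through 'out', not returned from the method.
    abstract void processElement(I value, Consumer<O> out);
}

// Compiles without overriding anything, but silently drops all records.
class DiscardingSink implements SketchSink<String> { }

// A more useful sink: handles records in invoke and cleans up in finish.
class CollectingSink implements SketchSink<String> {
    final List<String> written = new ArrayList<>();
    boolean finished = false;

    @Override
    public void invoke(String value) { written.add(value); }

    @Override
    public void finish() { finished = true; }
}

// Must implement processElement, otherwise the class does not compile.
class UpperCase extends SketchProcessFunction<String, String> {
    @Override
    void processElement(String value, Consumer<String> out) {
        out.accept(value.toUpperCase());
    }
}

// Hypothetical driver: the sink only sees records because it is wired
// to the output of the process function and the pipeline is actually run.
class SketchPipeline {
    static <I, O> void run(List<I> source, SketchProcessFunction<I, O> fn, SketchSink<O> sink) {
        for (I record : source) {
            fn.processElement(record, sink::invoke);
        }
        sink.finish();
    }
}

class SinkSketchDemo {
    public static void main(String[] args) {
        CollectingSink sink = new CollectingSink();
        SketchPipeline.run(Arrays.asList("a", "b"), new UpperCase(), sink);
        System.out.println(sink.written); // prints [A, B]
    }
}
```

In Flink itself the wiring is done by the framework rather than by hand: DataStream#process returns a stream on which addSink can be called, the process function emits records through its Collector instead of returning a value, and nothing runs until execute() is called on the environment. Whether that matches the setup in the StackOverflow post is an assumption, since the actual code is not shown in this thread.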