Hi Caizhi,

Thanks for your reply. Much appreciated. I understood the difference now.
Also, I have a flow like Kafka Sink Datastream -> Process Function
(Separate Class) -> Cassandra Sink(Separate Class).

Process Function returns me the output as a string and now I want to create
a DataStream out of the string variable so that I can call something like
ds.addSink(new CassandraSink()). For that, I used the StreamExecution
variable as a global /method variable but I am not able to create it
properly. Could you please refer to my StackOverflow post mentioned in the
main thread?
What is happening is, if I don't create a data stream properly it doesn't
call the sink properly because it doesn't execute the methods under the
Cassandra Sink class.

What should I do?

On Thu, Jan 6, 2022 at 7:58 AM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> This is because ProcessFunction#processElement is a must while all methods
> in SinkFunction are not mandatory (for example you can create a sink which
> just discards all records by directly implementing SinkFunction). However
> if you want your sink to be more useful you'll have to see which methods in
> SinkFunction you need to implement. For example you can deal with the
> records fed to the sink in the invoke method or clean up the resources in
> the finish method.
>
> Siddhesh Kalgaonkar <kalgaonkarsiddh...@gmail.com> 于2022年1月6日周四 03:11写道:
>
>> I have implemented a Cassandra sink and when I am trying to call it from
>> another class via DataStream it is not calling any of the methods. I tried
>> extending other interfaces like ProcessFunction and it is forcing me to
>> implement its methods whereas. when it comes to RichSinkFunction it doesn't
>> force me to do it. Is my problem due to this? or there is something else to
>> it?
>>
>>
>> https://stackoverflow.com/questions/70592174/richsinkfunction-for-cassandra-in-flink/70593375
>>
>

Reply via email to