Hi Kartik,

It should be the other way around: the connector should implement the proper Source and Sink interfaces, and thereby get the right guarantees and integration with mechanisms like checkpoints and savepoints. I would say there's no other way to achieve your desired result, given all the edge cases that exist.
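To make that a bit more concrete for the JMS/Solace case you describe below: the CheckpointListener hooks you mention are indeed how an operator or source finds out that a checkpoint has completed (or was aborted). What follows is only a rough sketch under a few assumptions (made-up class name, JMS consumer wiring omitted, legacy SourceFunction API used for brevity; a real connector should implement the FLIP-27 Source/SourceReader interfaces) of deferring the JMS acknowledgement until notifyCheckpointComplete fires:

// Rough sketch only, not a production connector: the class name is made up,
// the JMS consumer wiring (connection factory, destination, open()/close())
// is omitted, and a real connector should be built on the FLIP-27
// Source/SourceReader interfaces rather than this legacy SourceFunction.
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class AckOnCheckpointJmsSource extends RichSourceFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    // consumer on a CLIENT_ACKNOWLEDGE session; creation in open() omitted
    private transient MessageConsumer consumer;
    private volatile boolean running = true;

    // messages emitted since the last snapshot, not yet tied to a checkpoint
    private transient List<Message> inFlight;
    // last emitted message per checkpoint id; in CLIENT_ACKNOWLEDGE mode,
    // acknowledging it also acknowledges everything consumed before it
    private transient NavigableMap<Long, Message> pendingAcks;

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // no offset-style state to restore: after a restart the broker simply
        // redelivers every message that was never acknowledged
        inFlight = new ArrayList<>();
        pendingAcks = new TreeMap<>();
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            Message msg = consumer.receive(100);
            if (msg == null) {
                continue;
            }
            // emit and remember the message under the checkpoint lock so that
            // snapshotState() sees a consistent view
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(((TextMessage) msg).getText());
                inFlight.add(msg);
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // everything emitted so far belongs to this checkpoint; hold the ack
        // back until notifyCheckpointComplete() confirms the checkpoint
        if (!inFlight.isEmpty()) {
            pendingAcks.put(
                context.getCheckpointId(), inFlight.get(inFlight.size() - 1));
            inFlight.clear();
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // subsuming contract: a completed checkpoint also covers all earlier,
        // not yet acknowledged checkpoints
        NavigableMap<Long, Message> done = pendingAcks.headMap(checkpointId, true);
        if (!done.isEmpty()) {
            done.lastEntry().getValue().acknowledge();
            done.clear();
        }
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) {
        // nothing to do: the messages stay unacknowledged and will either be
        // covered by a later checkpoint or redelivered after a restart
    }

    @Override
    public void cancel() {
        running = false;
    }
}

With this pattern, anything that was emitted but not yet covered by a completed checkpoint is simply redelivered by the broker after a restart, so the source side on its own gives you at-least-once; exactly-once end to end still depends on the sink committing transactionally or deduplicating, which is why a proper Source/Sink implementation is the way to go.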
Best regards,

Martijn

On Tue, Feb 13, 2024 at 2:08 AM Kartik Kushwaha
<kushwaha.karti...@gmail.com> wrote:
>
> Thank you Martijn, the article you provided had a detailed explanation
> of the exactly-once two-phase commit.
>
> Returning to the best way to handle commits/acknowledgments on sources
> like JMS queues or Solace topics to support guaranteed delivery when
> they are not supported out of the box by Flink (especially when there
> is no concept of an offset): will the CheckpointListener API and its
> checkpoint subsuming contract make it possible to perform the JMS
> message consumption commits by overriding the "notifyCheckpointComplete"
> and "notifyCheckpointAborted" methods? In other words, is there a way to
> find out in an operator when a checkpoint is complete and perform
> specific actions at that point? Any articles around this would help.
>
> Regards,
> Kartik
>
> On Mon, Feb 12, 2024, 10:24 AM Martijn Visser <martijnvis...@apache.org>
> wrote:
>>
>> Sources don't need to support two-phase commits; that's something for
>> sinks. I think the example of exactly-once processing (although the
>> interfaces have changed since) at
>> https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/
>> is still a good explainer on this topic.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Mon, Feb 12, 2024 at 4:02 PM Kartik Kushwaha
>> <kushwaha.karti...@gmail.com> wrote:
>> >
>> > Let me put the question in other words.
>> >
>> > What happens if a source does not support two-phase commit and the
>> > Flink job has to guarantee exactly-once delivery downstream?
>> > Checkpointing, as I understand it, works on an interval basis. New
>> > events that the checkpoint barrier has not yet reached will get
>> > dropped or missed. What would be the best way to save the state of
>> > this non-checkpointed data and recover it on a task crash or job
>> > restart, taking into account that regular checkpointing is also
>> > enabled and that restart and recovery should not lead to duplicates
>> > between the user-managed state and the checkpointed state?
>> >
>> > Regards,
>> > Kartik
>> >
>> > On Mon, Feb 12, 2024, 9:50 AM Martijn Visser <martijnvis...@apache.org>
>> > wrote:
>> >>
>> >> Hi Kartik,
>> >>
>> >> I don't think there's much that the Flink community can do here to
>> >> help you. The Solace source and sink aren't owned by the Flink
>> >> project, and based on the source code they haven't been touched for
>> >> the last 7 years [1], and I'm actually not aware of anyone who uses
>> >> Solace at all.
>> >>
>> >> Best regards,
>> >>
>> >> Martijn
>> >>
>> >> [1] https://github.com/SolaceLabs/solace-integration-guides/tree/master/src/flink-jms-connector
>> >>
>> >> On Mon, Feb 12, 2024 at 2:58 PM Kartik Kushwaha
>> >> <kushwaha.karti...@gmail.com> wrote:
>> >> >
>> >> > Any help here please.
>> >> >
>> >> > Regards,
>> >> > Kartik
>> >> >
>> >> > On Fri, Feb 9, 2024, 8:33 AM Kartik Kushwaha
>> >> > <kushwaha.karti...@gmail.com> wrote:
>> >> >>
>> >> >> I am using Flink checkpointing to restore the state of my job. I am
>> >> >> using unaligned checkpointing with a 100 ms checkpointing interval.
>> >> >> I see a few events getting dropped that were successfully processed
>> >> >> by the operators, or were in flight, but had not yet been captured
>> >> >> by a checkpoint. That is, these were new events that came into the
>> >> >> pipeline between the previously captured checkpoint state and the
>> >> >> failure.
>> >> >>
>> >> >> My project acknowledges (commits) back to the topic after the event
>> >> >> is read and ingested into Mongo, but the pipeline has
>> >> >> transformation, enrichment, and sink operators after that. The
>> >> >> missing events were read, ack'd back to the topic, and transformed
>> >> >> successfully before the failure, but had not yet been checkpointed
>> >> >> (within the 100 ms interval between checkpoints) and were dropped.
>> >> >>
>> >> >> Pipeline: Source (Solace topic, queue reader) --> [MongoWrite +
>> >> >> source commit] --> transform --> enrich --> sink (Solace topic)
>> >> >>
>> >> >> Checkpoint: unaligned, exactly-once, 100 ms interval, 10 ms minimum
>> >> >> pause between checkpoints
>> >> >>
>> >> >> I see that whenever a runtime exception is thrown, it triggers the
>> >> >> close method in each of the functions one by one. Do we have to
>> >> >> store the state that was not yet captured by a checkpoint before
>> >> >> the failure? What happens on network failures, a task manager
>> >> >> crash, or any other abrupt failure?
>> >> >>
>> >> >> Or do we have to shift the source topic acknowledgment to the end
>> >> >> (but then we would have to chain all these operators to run in a
>> >> >> single thread and carry the byte-array message object from the
>> >> >> Solace queue to do the ack at the end)?
>> >> >>
>> >> >> Is there anything else I am missing here?
>> >> >>
>> >> >> Note: Sources and sinks are fully Solace-based in all the Flink
>> >> >> pipelines (queues and topics).