Sources don't need to support two-phase commits; that's something for sinks. I think the example of exactly-once processing (although the interfaces have changed since) at https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/ is still a good explainer on this topic.
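For illustration, the sink-side contract from that post looks roughly like the sketch below. The TwoPhaseCommitSinkFunction interface is real (though superseded by the newer Sink API in current Flink); the Txn/TxnClient types are hypothetical stand-ins for whatever transactional system the sink talks to:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

public class TxnSink extends TwoPhaseCommitSinkFunction<String, TxnSink.Txn, Void> {

    public TxnSink() {
        super(new KryoSerializer<>(Txn.class, new ExecutionConfig()),
              VoidSerializer.INSTANCE);
    }

    @Override
    protected Txn beginTransaction() {
        // Open a fresh transaction on the external system; all records
        // emitted between two checkpoints go into this transaction.
        return TxnClient.begin(); // hypothetical client call
    }

    @Override
    protected void invoke(Txn txn, String value, Context context) {
        txn.write(value); // staged in the transaction, not yet visible
    }

    @Override
    protected void preCommit(Txn txn) {
        // Phase 1, called as part of the checkpoint: flush so the
        // transaction can still be committed after a failover.
        txn.flush();
    }

    @Override
    protected void commit(Txn txn) {
        // Phase 2, called once the checkpoint has completed everywhere:
        // make the staged records visible to consumers.
        txn.commit();
    }

    @Override
    protected void abort(Txn txn) {
        txn.rollback(); // checkpoint failed or was cancelled
    }

    /** Hypothetical transactional client; replace with the real system's API. */
    public static class Txn {
        void write(String v) {}
        void flush() {}
        void commit() {}
        void rollback() {}
    }

    static class TxnClient {
        static Txn begin() { return new Txn(); }
    }
}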
Best regards,

Martijn

On Mon, Feb 12, 2024 at 4:02 PM Kartik Kushwaha <kushwaha.karti...@gmail.com> wrote:
>
> Let me put the question in other words.
>
> What happens if a source does not support two-phase commit and the Flink
> job has to guarantee exactly-once delivery downstream? Checkpointing, as I
> understand it, works on an interval basis. New events that the checkpoint
> barrier has not yet reached will get dropped or missed. What would be the
> best way to save the state of this non-checkpointed data and recover it on
> a task crash or job restart, taking into account that regular
> checkpointing is also enabled and that restart and recovery should not
> lead to duplicates between the user-managed state and the checkpointed
> state?
>
> Regards,
> Kartik
>
> On Mon, Feb 12, 2024, 9:50 AM Martijn Visser <martijnvis...@apache.org> wrote:
>>
>> Hi Kartik,
>>
>> I don't think there's much that the Flink community can do here to
>> help you. The Solace source and sink aren't owned by the Flink
>> project, and based on the source code they haven't been touched for
>> the last 7 years [1], and I'm actually not aware of anyone who uses
>> Solace at all.
>>
>> Best regards,
>>
>> Martijn
>>
>> [1] https://github.com/SolaceLabs/solace-integration-guides/tree/master/src/flink-jms-connector
>>
>> On Mon, Feb 12, 2024 at 2:58 PM Kartik Kushwaha
>> <kushwaha.karti...@gmail.com> wrote:
>> >
>> > Any help here, please.
>> >
>> > Regards,
>> > Kartik
>> >
>> > On Fri, Feb 9, 2024, 8:33 AM Kartik Kushwaha <kushwaha.karti...@gmail.com> wrote:
>> >>
>> >> I am using Flink checkpointing to restore the state of my job. I am
>> >> using unaligned checkpointing with a 100 ms checkpointing interval. I
>> >> see a few events getting dropped that were successfully processed by
>> >> the operators, or that were in flight and not yet captured by a
>> >> checkpoint. That is, these were new events that came into the pipeline
>> >> between the previously captured checkpoint state and the failure.
>> >>
>> >> My job acknowledges (commits) back to the topic after the event is
>> >> read and ingested into Mongo, but the pipeline has transformation,
>> >> enrichment, and sink operators after that. The missing events were
>> >> read, acknowledged back to the topic, and transformed successfully
>> >> before the failure, but had not yet been checkpointed (within the
>> >> 100 ms interval between checkpoints), so they were dropped.
>> >>
>> >> Pipeline: Source (Solace topic, queue reader) --> [MongoWrite +
>> >> source commit] --> transform --> enrich --> sink (Solace topic)
>> >>
>> >> Checkpoint: unaligned, exactly-once, 100 ms interval, 10 ms minimum
>> >> pause between checkpoints
>> >>
>> >> I see that whenever a runtime exception is thrown, it triggers the
>> >> close() method in each of the functions one by one. Do we have to
>> >> store the state that was not yet captured by the checkpoint before the
>> >> failure? What happens on network failures, a task manager crash, or
>> >> any other abrupt failure?
>> >>
>> >> Or do we have to shift the source topic acknowledgment to the end (but
>> >> then we would have to chain all these operators to run in a single
>> >> thread and carry the byte-array message object from the Solace queue
>> >> along, to do the ack at the end)?
>> >>
>> >> Is there anything else I am missing here?
>> >>
>> >> Note: Sources and sinks are fully Solace-based in all the Flink
>> >> pipelines (queues and topics).
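P.S. On the question above about shifting the source acknowledgment: the usual pattern for a source that can't participate in a two-phase commit is to defer acks to the broker until a checkpoint covering those messages has completed, via the CheckpointListener callback (this is how the Kafka consumer commits offsets). A rough sketch, where only the Flink interfaces are real and the SolaceSession/Message types are hypothetical stand-ins for the actual client:

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class AckAfterCheckpointSource extends RichSourceFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    private transient SolaceSession session; // wired up in open(), omitted here
    private transient List<Message> pendingAcks;
    private transient TreeMap<Long, List<Message>> acksByCheckpoint;
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            Message msg = session.receive(); // hypothetical blocking read
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(msg.payload());
                pendingAcks.add(msg); // do NOT ack the broker yet
            }
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Runs under the checkpoint lock: everything emitted so far is part
        // of this checkpoint, so tie the pending acks to its id.
        acksByCheckpoint.put(context.getCheckpointId(), new ArrayList<>(pendingAcks));
        pendingAcks.clear();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Only now is it safe to let the broker discard the messages: on a
        // failure, Flink restores the last completed checkpoint and the
        // broker redelivers everything that was never acknowledged.
        NavigableMap<Long, List<Message>> done =
                acksByCheckpoint.headMap(checkpointId, true);
        done.values().forEach(msgs -> msgs.forEach(Message::ack)); // hypothetical ack
        done.clear();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        pendingAcks = new ArrayList<>();
        acksByCheckpoint = new TreeMap<>();
        // After a restore, the broker redelivers every un-acked message,
        // so there is no ack bookkeeping to recover here.
    }

    @Override
    public void cancel() {
        running = false;
    }

    /** Hypothetical stand-ins for the Solace client API. */
    interface SolaceSession { Message receive() throws Exception; }
    interface Message { String payload(); void ack(); }
}

Note that this gives at-least-once delivery into the pipeline (redelivered messages show up again after a restore), so the sink or the downstream system still has to deduplicate or commit transactionally for end-to-end exactly-once.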