Sources don't need to support two-phase commits; that's something for
sinks. I think the example of exactly-once processing at
https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/
is still a good explainer on this topic, although the interfaces have
changed since.
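
To make the source side concrete, here is a minimal, framework-free sketch
of the usual pattern: the source acknowledges a message to the broker only
after the checkpoint that covers it has completed, so anything read after
the last completed checkpoint is redelivered on restart instead of lost.
This is plain Python, not Flink or Solace API. FakeBroker,
AckOnCheckpointSource and their methods are hypothetical names; only the
two callbacks are modelled on Flink's snapshot and
notifyCheckpointComplete hooks.

```python
class FakeBroker:
    """Hypothetical stand-in for a queue that redelivers unacked messages."""

    def __init__(self, payloads):
        self.unacked = dict(enumerate(payloads))  # message id -> payload
        self.delivered = set()                    # ids handed out on this connection

    def poll(self):
        # Return the next unacked message not yet delivered on this connection.
        for msg_id, payload in self.unacked.items():
            if msg_id not in self.delivered:
                self.delivered.add(msg_id)
                return msg_id, payload
        return None

    def ack(self, msg_ids):
        for msg_id in msg_ids:
            self.unacked.pop(msg_id, None)

    def reconnect(self):
        # After a crash, everything still unacked becomes deliverable again.
        self.delivered.clear()


class AckOnCheckpointSource:
    """Hypothetical source that defers acks until a checkpoint completes."""

    def __init__(self, broker):
        self.broker = broker
        self.since_last_snapshot = []  # ids read since the last snapshot
        self.pending = {}              # checkpoint id -> ids covered by it

    def read(self):
        msg = self.broker.poll()
        if msg is None:
            return None
        msg_id, payload = msg
        self.since_last_snapshot.append(msg_id)  # remember it, but do NOT ack yet
        return payload

    def snapshot_state(self, checkpoint_id):
        # Tie every message read so far to this checkpoint.
        self.pending[checkpoint_id] = self.since_last_snapshot
        self.since_last_snapshot = []

    def notify_checkpoint_complete(self, checkpoint_id):
        # Only now is it safe to ack: the checkpoint covering these ids is durable.
        for cid in sorted(c for c in self.pending if c <= checkpoint_id):
            self.broker.ack(self.pending.pop(cid))


broker = FakeBroker(["a", "b", "c"])
source = AckOnCheckpointSource(broker)
source.read()                           # "a"
source.read()                           # "b"
source.snapshot_state(1)
source.notify_checkpoint_complete(1)    # "a" and "b" are acked here
source.read()                           # "c" is read but never checkpointed

broker.reconnect()                      # simulated crash and restart
restarted = AckOnCheckpointSource(broker)
redelivered = restarted.read()          # the broker hands "c" out again
```

Note that a redelivered message may already have reached the sink before
the failure, so this alone only gives at-least-once delivery; the sink
still has to be transactional or idempotent to avoid duplicates downstream.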

Best regards,

Martijn

On Mon, Feb 12, 2024 at 4:02 PM Kartik Kushwaha
<kushwaha.karti...@gmail.com> wrote:
>
> Let me put the question in other words.
>
> What happens if a source does not support two-phase commit and the Flink job
> has to guarantee exactly-once delivery downstream? Checkpointing, as I
> understand it, works on an interval basis. New events that the checkpoint
> barrier has not yet reached will get dropped or missed. What would be the
> best way to save the state of this non-checkpointed data and recover it on
> task crash or job restart, taking into account that regular checkpointing is
> also enabled and that restart and recovery should not lead to duplicates
> between the user-managed state and the checkpointed state?
>
>
> Regards
> Kartik
>
>
> On Mon, Feb 12, 2024, 9:50 AM Martijn Visser <martijnvis...@apache.org> wrote:
>>
>> Hi Kartik,
>>
>> I don't think there's much that the Flink community can do here to
>> help you. The Solace source and sink aren't owned by the Flink
>> project, and based on the source code they haven't been touched for
>> the last 7 years [1] and I'm actually not aware of anyone who uses
>> Solace at all.
>>
>> Best regards,
>>
>> Martijn
>>
>> [1] 
>> https://github.com/SolaceLabs/solace-integration-guides/tree/master/src/flink-jms-connector
>>
>> On Mon, Feb 12, 2024 at 2:58 PM Kartik Kushwaha
>> <kushwaha.karti...@gmail.com> wrote:
>> >
>> > Any help here please.
>> >
>> > Regards,
>> > Kartik
>> >
>> >
>> > On Fri, Feb 9, 2024, 8:33 AM Kartik Kushwaha <kushwaha.karti...@gmail.com> 
>> > wrote:
>> >>
>> >> I am using Flink checkpointing to restore the state of my job. I am using
>> >> unaligned checkpointing with 100 ms as the checkpointing interval. I see
>> >> a few events getting dropped that were either successfully processed by
>> >> the operators or still in flight, and had not yet been captured by a
>> >> checkpoint. That is, these were new events that came into the pipeline
>> >> between the last captured checkpoint and the failure.
>> >>
>> >> My project acknowledges (commits) back to the topic after the event read
>> >> and Mongo ingestion. But the pipeline has transformation, enrichment and
>> >> sink operators after that. These missing events were read, ack'd back to
>> >> the topic and transformed successfully before the failure, but because
>> >> they were not yet checkpointed (within the 100 ms interval between
>> >> checkpoints) they were dropped.
>> >>
>> >> Pipeline: Source (solace topic, queue reader) --> [MongoWrite + 
>> >> sourcecommit] --> transform --> enrich --> sink (solace topic)
>> >>
>> >> Checkpoint: Unaligned, exactly-once, 100 ms interval, 10 ms minimum
>> >> pause between checkpoints
>> >>
>> >> I see that whenever a runtime exception is thrown, it triggers the close
>> >> method in each of the functions one by one. Do we have to store the
>> >> state that was not yet captured by the checkpoint before the failure?
>> >> What happens on network failures, a task manager crash, or any other
>> >> abrupt failure?
>> >>
>> >> Or do we have to shift the source topic acknowledgment to the end (but
>> >> then we will have to chain all these operators to run in a single thread
>> >> and carry the byte array message object from the Solace queue to do the
>> >> ack at the end)?
>> >>
>> >> Is there anything else I am missing here?
>> >>
>> >>
>> >> Note: Sources and sinks are fully Solace-based in all the Flink pipelines
>> >> (queues and topics).
>> >>
>> >>
