Hi Kartik,

It should be the other way around: the connector should use the proper
Source and Sink interfaces, and therefore get the right guarantees and
integration with mechanisms like checkpoints and savepoints. I would
say there's no other way to achieve your desired result, because of
all the edge cases there exist.

Best regards,

Martijn

On Tue, Feb 13, 2024 at 2:08 AM Kartik Kushwaha
<kushwaha.karti...@gmail.com> wrote:
>
> Thank you Martijn, the article you provided had detailed explanation on the 
> exactly once two phase commit.
>
>
> Returning to the best way to handle commits/acknowledgments on sources like 
> JMS Queues or Solace topics to  support guaranteed delivery, when they are 
> not supported out of the box by Flink (especially when there is no concept of 
> offset), will the CheckpointListner API and its Checkpoint Subsuming Contract 
> make it possible to perform the JMS message consumption commits by overriding 
> "notifyCheckpointCommit" and "notifyCheckpointAbort" methods. In other words 
> is there a way to find when a Checkpoint is complete in an Operator and 
> perform specific actions when it is complete? Any articles around these would 
> help.
>
> Regards,
> Kartik
>
>
> On Mon, Feb 12, 2024, 10:24 AM Martijn Visser <martijnvis...@apache.org> 
> wrote:
>>
>> Sources don't need to support two phase commits, that's something for
>> sinks. I think the example of exactly-once-processing (although the
>> interfaces have changed since) at
>> https://flink.apache.org/2018/02/28/an-overview-of-end-to-end-exactly-once-processing-in-apache-flink-with-apache-kafka-too/
>> is still a good explainer on this topic.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Mon, Feb 12, 2024 at 4:02 PM Kartik Kushwaha
>> <kushwaha.karti...@gmail.com> wrote:
>> >
>> > Let me put the question in other words.
>> >
>> > What happens if a source does not support two phase commit and the Flink 
>> > job has to guarantee exactly once delivery to downstream? Checkpointing as 
>> > I understand, works on interval basis. New events for which the checkpoint 
>> > barrier has not yet reached will get dropped or missed. What would be the 
>> > best way to save the state of these non checkpointed data and recover them 
>> > on task crash or job restarts, taking into account that regular checkpoint 
>> > is also enabled and restart and recovery should not lead to duplicates 
>> > from the user managed state vs the checkpointed state.
>> >
>> >
>> > Regards
>> > Kartik
>> >
>> >
>> > On Mon, Feb 12, 2024, 9:50 AM Martijn Visser <martijnvis...@apache.org> 
>> > wrote:
>> >>
>> >> Hi Kartik,
>> >>
>> >> I don't think there's much that the Flink community can do here to
>> >> help you. The Solace source and sink aren't owned by the Flink
>> >> project, and based on the source code they haven't been touched for
>> >> the last 7 years [1] and I'm actually not aware of anyone who uses
>> >> Solace at all.
>> >>
>> >> Best regards,
>> >>
>> >> Martijn
>> >>
>> >> [1] 
>> >> https://github.com/SolaceLabs/solace-integration-guides/tree/master/src/flink-jms-connector
>> >>
>> >> On Mon, Feb 12, 2024 at 2:58 PM Kartik Kushwaha
>> >> <kushwaha.karti...@gmail.com> wrote:
>> >> >
>> >> > Any help here please.
>> >> >
>> >> > Regards,
>> >> > Kartik
>> >> >
>> >> >
>> >> > On Fri, Feb 9, 2024, 8:33 AM Kartik Kushwaha 
>> >> > <kushwaha.karti...@gmail.com> wrote:
>> >> >>
>> >> >> I am using flink checkpointing to restore states of my job. I am using 
>> >> >> unaligned checkpointing with 100 ms as the checkpointing interval. I 
>> >> >> see few events getting dropped that were sucessfully processed by the 
>> >> >> operators or were in-flight that were yet to be captured by 
>> >> >> checkpoint. That is these were new events which came into the pipeline 
>> >> >> between the previously captured checkpoint state and the failure.
>> >> >>
>> >> >> My project acknowledges(commits) back to the topic after the event 
>> >> >> read and mongo ingestion. But the pipeline has transformation, 
>> >> >> enrichment and sink operators after that. These missing events were 
>> >> >> read, ack'd back to the topic and transformed successfully before 
>> >> >> failure and were not yet checkpointed (withing the 100 ms interval 
>> >> >> between checkpoints) were dropped.
>> >> >>
>> >> >> Pipeline: Source (solace topic, queue reader) --> [MongoWrite + 
>> >> >> sourcecommit] --> transform --> enrich --> sink (solace topic)
>> >> >>
>> >> >> Checkpoint: Unaligned, Exactly-once, 100ms interval, 10ms Min pause 
>> >> >> between checkpoint
>> >> >>
>> >> >> I see that whenever runtime exception is thrown it triggers the close 
>> >> >> method in each of the functions one by one. Do we have to store the 
>> >> >> states which were not yet captured by the checkpoint before failure? 
>> >> >> What happens Network failures or task manager crash or any other 
>> >> >> abrupt failure?
>> >> >>
>> >> >> Or do we have to shift the source topic acknowledgment to the last ( 
>> >> >> but we will have to chain all this operators to run in a single thread 
>> >> >> and carry the bytearray message object from solace queue to do ack at 
>> >> >> the end).
>> >> >>
>> >> >> Is there anything else Iam missing here?
>> >> >>
>> >> >>
>> >> >> Note: Sources and Sinks are fully Solace based in all the Flink 
>> >> >> pipelines ( queues and topics)
>> >> >>
>> >> >>

Reply via email to