So, you want to send basically the last message before the barrier?

Can you not instead send the first message after the barrier? From a first 
glance this sounds easier.

Can you share what you are trying to accomplish? 

Best regards,
Niklas

> On 14. Feb 2022, at 17:04, Gopi Krishna M <gopikrish...@gmail.com> wrote:
> 
> Thanks Niklas! This helps with synchronizing uploads across partitioned 
> tasks. The next step is to pass the handle to this upload to the sink which 
> should be part of the same checkpoint. Is it possible to do the following:
> 
> 1. Keep reducing the events to keyedStore.
> 2. On snapshotState: upload the events and get the handle. Generate this 
> handle as the output for the sink to consume.
> 3. Return from snapshotState.
> 
> Basically I want to ensure that the handle output is received by the next 
> stage before this checkpoint barrier.
> 
> On Mon, Feb 14, 2022 at 8:11 PM Niklas Semmler <nik...@ververica.com> wrote:
> Hi Gopi,
> 
> You can implement CheckpointedFunction and use the method 
> snapshotState(FunctionSnapshotContext) to upload state on each checkpoint.
> 
> https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html
> 
> Make sure, you don’t have unaligned checkpointing enabled.
> 
> What it is: 
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/#unaligned-checkpointing
> How to configure: 
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpointing_under_backpressure/#unaligned-checkpoints
> 
> Note that delays introduced in CheckpointedFunction#snapshotState can slow 
> down the job. 
> 
> You can also get the snapshot id from the FunctionSnapshotContext. Maybe that 
> removes the need for the source logic?
> 
> Does this help?
> 
> Best regards,
> Niklas
> 
> 
> > On 14. Feb 2022, at 05:27, Gopi Krishna M <gopikrish...@gmail.com> wrote:
> > 
> > Hi,
> > In my flink operators, I need to connect to an external service to update 
> > state. I was thinking that the updates to the external service can be 
> > synchronized via checkpoint barriers. 
> > 
> > The topology of the stream is a source, then a single stage of operator 
> > replicas handling different partitions, then all joining in a single sink.
> > 
> > Each operator will contact the external service when it receives a 
> > checkpoint barrier and uploads local state (which caches the uploads and 
> > returns a handle).
> > 
> > After upload, it forwards the cache handle to the sink. Once sink receives 
> > handles from all such operators, it calls the external service with a list 
> > of handles received. This helps ensure that all handles are from the same 
> > checkpoint barrier.
> > 
> > Is it possible to achieve this in a flink application?
> > 
> > Thanks,
> > Gopi
> 

Reply via email to