So, you basically want to send the last message before the barrier? Could you instead send the first message after the barrier? At first glance, that sounds easier.
Can you share what you are trying to accomplish?

Best regards,
Niklas

> On 14. Feb 2022, at 17:04, Gopi Krishna M <gopikrish...@gmail.com> wrote:
>
> Thanks Niklas! This helps with synchronizing uploads across partitioned
> tasks. The next step is to pass the handle to this upload to the sink which
> should be part of the same checkpoint. Is it possible to do the following:
>
> 1. Keep reducing the events to keyedStore.
> 2. On snapshotState: upload the events and get the handle. Generate this
> handle as the output for the sink to consume.
> 3. Return from snapshotState.
>
> Basically I want to ensure that the handle output is received by the next
> stage before this checkpoint barrier.
>
> On Mon, Feb 14, 2022 at 8:11 PM Niklas Semmler <nik...@ververica.com> wrote:
> Hi Gopi,
>
> You can implement CheckpointedFunction and use the method
> snapshotState(FunctionSnapshotContext) to upload state on each checkpoint.
>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html
>
> Make sure, you don’t have unaligned checkpointing enabled.
>
> What it is:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/#unaligned-checkpointing
> How to configure:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpointing_under_backpressure/#unaligned-checkpoints
>
> Note that delays introduced in CheckpointedFunction#snapshotState can slow
> down the job.
>
> You can also get the snapshot id from the FunctionSnapshotContext. Maybe that
> removes the need for the source logic?
>
> Does this help?
>
> Best regards,
> Niklas
>
> > On 14. Feb 2022, at 05:27, Gopi Krishna M <gopikrish...@gmail.com> wrote:
> >
> > Hi,
> > In my flink operators, I need to connect to an external service to update
> > state. I was thinking that the updates to the external service can be
> > synchronized via checkpoint barriers.
> >
> > The topology of the stream is a source, then a single stage of operator
> > replicas handling different partitions, then all joining in a single sink.
> >
> > Each operator will contact the external service when it receives a
> > checkpoint barrier and uploads local state (which caches the uploads and
> > returns a handle).
> >
> > After upload, it forwards the cache handle to the sink. Once sink receives
> > handles from all such operators, it calls the external service with a list
> > of handles received. This helps ensure that all handles are from the same
> > checkpoint barrier.
> >
> > Is it possible to achieve this in a flink application?
> >
> > Thanks,
> > Gopi
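
To illustrate the sink-side step from the original question (the sink waits for a handle from every operator before calling the external service once), here is a minimal plain-Java sketch. It is not Flink code and not something proposed in the thread: `HandleCollector` and its `add` method are hypothetical names, and it assumes each operator emits exactly one handle per checkpoint, tagged with the checkpoint id (which in Flink could be read from `FunctionSnapshotContext#getCheckpointId()`). It does not address how the handle is emitted relative to the barrier, which is the open question above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: groups upload handles by checkpoint id on the sink side. */
class HandleCollector {
    // Assumed to equal the parallelism of the upstream operator stage.
    private final int expectedOperators;
    // Handles received so far, keyed by the checkpoint id they belong to.
    private final Map<Long, List<String>> pending = new HashMap<>();

    HandleCollector(int expectedOperators) {
        this.expectedOperators = expectedOperators;
    }

    /**
     * Records one handle for the given checkpoint. Returns the complete list of
     * handles once every upstream operator has reported for that checkpoint,
     * otherwise an empty Optional.
     */
    Optional<List<String>> add(long checkpointId, String handle) {
        List<String> handles =
                pending.computeIfAbsent(checkpointId, id -> new ArrayList<>());
        handles.add(handle);
        if (handles.size() < expectedOperators) {
            return Optional.empty();
        }
        // All operators reported: hand the batch over and forget the checkpoint.
        pending.remove(checkpointId);
        return Optional.of(handles);
    }
}
```

With two upstream operators, `add(7, "a")` returns an empty Optional, and a later `add(7, "b")` returns both handles, even if a handle for checkpoint 8 arrives in between. Keying by checkpoint id means the sink does not need to rely on arrival order to tell checkpoints apart.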