CheckpointedFunction docs mention the following:

> The snapshotState(FunctionSnapshotContext)
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html#snapshotState-org.apache.flink.runtime.state.FunctionSnapshotContext->
> is called whenever a checkpoint takes a state snapshot of the
> transformation function. Inside this method, functions typically make sure
> that the checkpointed data structures (obtained in the initialization
> phase) are up to date for a snapshot to be taken. The given snapshot
> context gives access to the metadata of the checkpoint.
>
> *In addition, functions can use this method as a hook to
> flush/commit/synchronize with external systems.*

Is there further documentation/examples of this synchronization?
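For concreteness, here is a minimal sketch of the flush-on-snapshot pattern that the JavaDoc alludes to: a buffering SinkFunction that uploads its buffer once per checkpoint, inside snapshotState. The external client and its upload call are hypothetical placeholders, not a Flink or real-library API.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// Sketch: buffer elements and flush them to an external service inside
// snapshotState(), so the flush happens once per checkpoint, aligned with
// the barrier. "externalServiceClient" is a hypothetical placeholder.
public class BufferingUploadSink implements SinkFunction<String>, CheckpointedFunction {

    private transient ListState<String> checkpointedBuffer;
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // The checkpoint id tells the external system which barrier this
        // flush belongs to.
        long checkpointId = context.getCheckpointId();

        // Flush/commit/synchronize with the external system here, e.g.:
        // externalServiceClient.upload(checkpointId, buffer);

        // Keep the Flink-managed operator state in sync with the local buffer.
        checkpointedBuffer.clear();
        checkpointedBuffer.addAll(buffer);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        checkpointedBuffer = context.getOperatorStateStore().getListState(
                new ListStateDescriptor<>("buffered-elements", Types.STRING));

        if (context.isRestored()) {
            for (String element : checkpointedBuffer.get()) {
                buffer.add(element);
            }
        }
    }
}

Throwing from snapshotState fails the checkpoint, which is usually the behaviour you want for this kind of synchronization; note the caveat further down the thread that long-running work in snapshotState delays the checkpoint.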
On Mon, Feb 14, 2022 at 10:50 PM Gopi Krishna M <gopikrish...@gmail.com> wrote:

> On Mon, Feb 14, 2022 at 10:00 PM Niklas Semmler <nik...@ververica.com> wrote:
>
>> So, you want to send basically the last message before the barrier?
>>
> Yes.
>
>> Can you not instead send the first message after the barrier? From a first glance this sounds easier.
>>
> I'm not sure if this will help me synchronize the sink with the same barrier.
>
>> Can you share what you are trying to accomplish?
>>
> Here's the objective I'm trying to achieve:
> https://github.com/gopik/storage-reading-list/blob/main/RealtimeAnalytics.md#streaming-update-using-flink
>
> Basically, I want to capture DB changes via CDC and update a parquet table (in Delta format) consistently at each checkpoint. The data is first partitioned by primary key, and each task, handling its own set of keys, writes new files. The sink waits for the barrier from all tasks, which arrives after the file names, then updates the Delta table via a transaction and consumes the barrier.
>
>> Best regards,
>> Niklas
>>
>> > On 14. Feb 2022, at 17:04, Gopi Krishna M <gopikrish...@gmail.com> wrote:
>> >
>> > Thanks Niklas! This helps with synchronizing uploads across partitioned tasks. The next step is to pass the handle of this upload to the sink as part of the same checkpoint. Is it possible to do the following:
>> >
>> > 1. Keep reducing the events to keyedStore.
>> > 2. On snapshotState: upload the events and get the handle. Emit this handle as output for the sink to consume.
>> > 3. Return from snapshotState.
>> >
>> > Basically, I want to ensure that the handle output is received by the next stage before this checkpoint barrier.
>> >
>> > On Mon, Feb 14, 2022 at 8:11 PM Niklas Semmler <nik...@ververica.com> wrote:
>> > Hi Gopi,
>> >
>> > You can implement CheckpointedFunction and use the method snapshotState(FunctionSnapshotContext) to upload state on each checkpoint.
>> >
>> > https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html
>> >
>> > Make sure you don't have unaligned checkpointing enabled.
>> >
>> > What it is: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/#unaligned-checkpointing
>> > How to configure: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpointing_under_backpressure/#unaligned-checkpoints
>> >
>> > Note that delays introduced in CheckpointedFunction#snapshotState can slow down the job.
>> >
>> > You can also get the snapshot id from the FunctionSnapshotContext. Maybe that removes the need for the source logic?
>> >
>> > Does this help?
>> >
>> > Best regards,
>> > Niklas
>> >
>> > > On 14. Feb 2022, at 05:27, Gopi Krishna M <gopikrish...@gmail.com> wrote:
>> > >
>> > > Hi,
>> > > In my Flink operators, I need to connect to an external service to update state. I was thinking that the updates to the external service could be synchronized via checkpoint barriers.
>> > >
>> > > The topology of the stream is a source, then a single stage of operator replicas handling different partitions, all joining into a single sink.
>> > >
>> > > Each operator will contact the external service when it receives a checkpoint barrier and upload its local state (the service caches the uploads and returns a handle).
>> > >
>> > > After upload, it forwards the cache handle to the sink. Once the sink receives handles from all such operators, it calls the external service with the list of handles received. This helps ensure that all handles are from the same checkpoint barrier.
>> > >
>> > > Is it possible to achieve this in a Flink application?
>> > >
>> > > Thanks,
>> > > Gopi
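As for emitting the upload handle so that the sink sees it before the same checkpoint barrier: a plain CheckpointedFunction cannot emit records from snapshotState, but a custom StreamOperator can override prepareSnapshotPreBarrier, which with aligned checkpoints runs before the operator forwards the barrier downstream. A rough sketch under those assumptions; the upload call and handle format are placeholders, and a real implementation would also need to put the buffer into checkpointed operator state.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

// Sketch: buffer incoming records, and when a checkpoint starts, upload
// them and emit the resulting handle downstream before the barrier, so the
// sink collects one handle per upstream task for the same checkpoint.
// The upload call is a hypothetical placeholder.
public class UploadOnCheckpointOperator
        extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    private final List<String> buffer = new ArrayList<>();

    @Override
    public void processElement(StreamRecord<String> element) {
        buffer.add(element.getValue());
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // Upload the buffered data and obtain a handle, e.g.:
        // String handle = externalServiceClient.upload(checkpointId, buffer);
        String handle = "handle-" + checkpointId;  // placeholder

        // Emit the handle; with aligned checkpoints it travels to the sink
        // ahead of the barrier for this checkpoint.
        output.collect(new StreamRecord<>(handle));
        buffer.clear();
    }
}

This would be wired in with something like stream.transform("upload", Types.STRING, new UploadOnCheckpointOperator()), and the sink would commit once it has received the handles and the barrier from all upstream tasks. Unaligned checkpoints would break the ordering assumption, which matches the advice earlier in the thread to keep them disabled.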