Just found that we can use prepareSnapshotPreBarrier <https://nightlies.apache.org/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.html#prepareSnapshotPreBarrier-long-> in AbstractStreamOperator to achieve this.
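Since prepareSnapshotPreBarrier() runs before the operator forwards the checkpoint barrier downstream, anything emitted from it reaches the sink ahead of that checkpoint's barrier. Roughly what I have in mind (untested sketch; ChangeEvent and ExternalUploadClient are placeholders for our own types, and checkpointed state handling is omitted for brevity):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Buffers change events and, in prepareSnapshotPreBarrier(), uploads the
 * buffer to the external service and emits the returned handle. Because the
 * hook runs before the operator forwards the checkpoint barrier, the handle
 * reaches the sink ahead of that checkpoint's barrier.
 */
public class UploadBeforeBarrierOperator
        extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<ChangeEvent, String> {

    // Placeholder client for the external storage service (not a Flink API).
    private transient ExternalUploadClient client;
    private transient List<ChangeEvent> buffer;

    @Override
    public void open() throws Exception {
        super.open();
        client = ExternalUploadClient.connect(); // placeholder
        buffer = new ArrayList<>();
    }

    @Override
    public void processElement(StreamRecord<ChangeEvent> element) {
        buffer.add(element.getValue());
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // Upload everything accumulated since the last checkpoint and forward
        // the handle; it is emitted before the barrier for checkpointId.
        String handle = client.upload(checkpointId, buffer);
        output.collect(new StreamRecord<>(handle));
        buffer.clear();
    }
}

The operator would be wired in with DataStream#transform(...), so the handles flow to the sink as ordinary records just before each barrier.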
On Tue, Feb 15, 2022 at 9:13 AM Gopi Krishna M <gopikrish...@gmail.com> wrote:

> CheckpointedFunction docs mention the following -
>
>> The snapshotState(FunctionSnapshotContext) <https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html#snapshotState-org.apache.flink.runtime.state.FunctionSnapshotContext-> is called whenever a checkpoint takes a state snapshot of the transformation function. Inside this method, functions typically make sure that the checkpointed data structures (obtained in the initialization phase) are up to date for a snapshot to be taken. The given snapshot context gives access to the metadata of the checkpoint.
>>
>> *In addition, functions can use this method as a hook to flush/commit/synchronize with external systems.*
>>
> Is there further documentation/examples of this synchronization?
>
> On Mon, Feb 14, 2022 at 10:50 PM Gopi Krishna M <gopikrish...@gmail.com> wrote:
>
>> On Mon, Feb 14, 2022 at 10:00 PM Niklas Semmler <nik...@ververica.com> wrote:
>>
>>> So, you want to send basically the last message before the barrier?
>>>
>> Yes.
>>
>>> Can you not instead send the first message after the barrier? From a first glance this sounds easier.
>>>
>> I'm not sure if this will help me synchronize the sink with the same barrier.
>>
>>> Can you share what you are trying to accomplish?
>>>
>> Here's the objective I'm trying to achieve: https://github.com/gopik/storage-reading-list/blob/main/RealtimeAnalytics.md#streaming-update-using-flink
>>
>> Basically, I want to capture DB changes via CDC and consistently update a Parquet table (in Delta format) at each checkpoint. The data is first partitioned by primary key, and each task, handling its own set of keys, writes new files. The sink waits for the barrier from all tasks, which arrives after the file names, then updates the Delta table in a transaction and only then consumes the barrier.
>>
>>> Best regards,
>>> Niklas
>>>
>>> > On 14. Feb 2022, at 17:04, Gopi Krishna M <gopikrish...@gmail.com> wrote:
>>> >
>>> > Thanks Niklas! This helps with synchronizing uploads across partitioned tasks. The next step is to pass the handle for this upload to the sink as part of the same checkpoint. Is it possible to do the following:
>>> >
>>> > 1. Keep reducing the events to keyedStore.
>>> > 2. On snapshotState: upload the events and get the handle. Generate this handle as the output for the sink to consume.
>>> > 3. Return from snapshotState.
>>> >
>>> > Basically I want to ensure that the handle output is received by the next stage before this checkpoint barrier.
>>> >
>>> > On Mon, Feb 14, 2022 at 8:11 PM Niklas Semmler <nik...@ververica.com> wrote:
>>> > Hi Gopi,
>>> >
>>> > You can implement CheckpointedFunction and use the method snapshotState(FunctionSnapshotContext) to upload state on each checkpoint.
>>> >
>>> > https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html
>>> >
>>> > Make sure you don’t have unaligned checkpointing enabled.
>>> >
>>> > What it is: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/#unaligned-checkpointing
>>> > How to configure: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpointing_under_backpressure/#unaligned-checkpoints
>>> >
>>> > Note that delays introduced in CheckpointedFunction#snapshotState can slow down the job.
>>> >
>>> > You can also get the snapshot id from the FunctionSnapshotContext. Maybe that removes the need for the source logic?
>>> >
>>> > Does this help?
>>> >
>>> > Best regards,
>>> > Niklas
>>> >
>>> > > On 14. Feb 2022, at 05:27, Gopi Krishna M <gopikrish...@gmail.com> wrote:
>>> > >
>>> > > Hi,
>>> > > In my Flink operators, I need to connect to an external service to update state. I was thinking that the updates to the external service can be synchronized via checkpoint barriers.
>>> > >
>>> > > The topology of the stream is a source, then a single stage of operator replicas handling different partitions, then all joining in a single sink.
>>> > >
>>> > > Each operator will contact the external service when it receives a checkpoint barrier and upload its local state (the service caches the upload and returns a handle).
>>> > >
>>> > > After the upload, it forwards the cache handle to the sink. Once the sink has received handles from all such operators, it calls the external service with the list of handles received. This helps ensure that all handles are from the same checkpoint barrier.
>>> > >
>>> > > Is it possible to achieve this in a Flink application?
>>> > >
>>> > > Thanks,
>>> > > Gopi
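
P.S. For the archives, this is roughly what the CheckpointedFunction variant Niklas described would look like for our case (again an untested sketch; ChangeEvent and ExternalUploadClient are placeholders). It synchronizes the upload with each checkpoint and tags it with the checkpoint id, but there is no Collector available inside snapshotState(), which is why prepareSnapshotPreBarrier() turned out to be the better fit for forwarding the handle downstream before the barrier:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.util.Collector;

/**
 * Buffers change events and flushes them to the external service whenever a
 * checkpoint takes a snapshot, using the checkpoint id from the
 * FunctionSnapshotContext to tag the upload.
 */
public class UploadOnCheckpointFunction
        extends RichFlatMapFunction<ChangeEvent, ChangeEvent>
        implements CheckpointedFunction {

    // Placeholder client for the external storage service (not a Flink API).
    private transient ExternalUploadClient client;
    private transient List<ChangeEvent> buffer;

    @Override
    public void open(Configuration parameters) {
        client = ExternalUploadClient.connect(); // placeholder
        buffer = new ArrayList<>();
    }

    @Override
    public void flatMap(ChangeEvent event, Collector<ChangeEvent> out) {
        buffer.add(event);
        out.collect(event); // pass the event through unchanged
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Runs as part of every (aligned) checkpoint: flush the local cache to
        // the external service, keyed by the checkpoint id. No records can be
        // emitted from here.
        client.upload(context.getCheckpointId(), buffer);
        buffer.clear();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // Operator/keyed state registration would go here; omitted in this sketch.
    }
}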