The CheckpointedFunction docs mention the following:

> The snapshotState(FunctionSnapshotContext)
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html#snapshotState-org.apache.flink.runtime.state.FunctionSnapshotContext->
> is called whenever a checkpoint takes a state snapshot of the
> transformation function. Inside this method, functions typically make sure
> that the checkpointed data structures (obtained in the initialization
> phase) are up to date for a snapshot to be taken. The given snapshot
> context gives access to the metadata of the checkpoint.
>
> *In addition, functions can use this method as a hook to
> flush/commit/synchronize with external systems.*
>
Is there further documentation/examples of this synchronization?
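For illustration, the flush-on-snapshot pattern the Javadoc describes might look roughly like the sketch below. `ExternalClient` and its `flush()` method are hypothetical stand-ins for the external system, and for exactly-once semantics the buffer would additionally need to live in Flink-managed state so it survives a restore:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/** Sketch: buffers records and flushes them to an external system per checkpoint. */
public class FlushingSink extends RichSinkFunction<String>
        implements CheckpointedFunction {

    private final List<String> buffer = new ArrayList<>();
    private transient ExternalClient client;

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value); // accumulate between checkpoints
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Called when a checkpoint snapshots this function: flush the buffered
        // records so the external system is consistent with the checkpoint
        // identified by context.getCheckpointId().
        client.flush(context.getCheckpointId(), buffer);
        buffer.clear();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Re-create the client (and restore any managed state) on start/restore.
        client = new ExternalClient();
    }

    /** Hypothetical stand-in for the external system's client. */
    static class ExternalClient {
        void flush(long checkpointId, List<String> records) { /* call external API */ }
    }
}
```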

On Mon, Feb 14, 2022 at 10:50 PM Gopi Krishna M <gopikrish...@gmail.com>
wrote:

>
> On Mon, Feb 14, 2022 at 10:00 PM Niklas Semmler <nik...@ververica.com>
> wrote:
>
>> So, you want to send basically the last message before the barrier?
>>
> Yes.
>
>
>>
>> Can you not instead send the first message after the barrier? From a
>> first glance this sounds easier.
>>
> I'm not sure if this will help me synchronize the sink with the same
> barrier.
>
>>
>> Can you share what you are trying to accomplish?
>>
> Here's the objective I'm trying to achieve:
> https://github.com/gopik/storage-reading-list/blob/main/RealtimeAnalytics.md#streaming-update-using-flink
>
> Basically, I want to capture DB changes via CDC and update a Parquet table
> (in Delta format) consistently at each checkpoint. The data is first
> partitioned by primary key, and each task writes new files for its set of
> keys, forwarding the file names to the sink. The sink waits for the
> barrier from all tasks (the file names arrive ahead of it), then updates
> the Delta table in a single transaction before consuming the barrier.
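One way to sketch the sink side of this, under the assumption of aligned barriers: stage the received file names per checkpoint in `snapshotState`, and commit the Delta transaction only once the checkpoint is globally complete, via `CheckpointListener`. `DeltaTableClient` is a hypothetical stand-in; for fault tolerance the staged map would also need to go into operator state:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/** Sketch: collects file names and commits one transaction per completed checkpoint. */
public class DeltaCommitSink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    private final List<String> pendingFiles = new ArrayList<>();
    private final Map<Long, List<String>> filesByCheckpoint = new HashMap<>();
    private transient DeltaTableClient delta;

    @Override
    public void invoke(String fileName, Context context) {
        pendingFiles.add(fileName); // file names emitted by the upstream tasks
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        // With aligned checkpointing, every upstream file name for this
        // checkpoint has arrived before the barrier; stage them under its id.
        filesByCheckpoint.put(ctx.getCheckpointId(), new ArrayList<>(pendingFiles));
        pendingFiles.clear();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // Commit only after the checkpoint completes globally, so a failed
        // checkpoint never produces a committed Delta transaction.
        List<String> files = filesByCheckpoint.remove(checkpointId);
        if (files != null) {
            delta.commitTransaction(checkpointId, files);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) throws Exception {
        delta = new DeltaTableClient();
    }

    /** Hypothetical stand-in for a Delta Lake transaction client. */
    static class DeltaTableClient {
        void commitTransaction(long checkpointId, List<String> files) { /* commit */ }
    }
}
```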
>
>
>>
>> Best regards,
>> Niklas
>>
>> > On 14. Feb 2022, at 17:04, Gopi Krishna M <gopikrish...@gmail.com>
>> wrote:
>> >
>> > Thanks Niklas! This helps with synchronizing uploads across partitioned
>> tasks. The next step is to pass the handle for this upload to the sink
>> as part of the same checkpoint. Is it possible to do the following:
>> >
>> > 1. Keep reducing the events to keyedStore.
>> > 2. On snapshotState: upload the events and get the handle. Generate
>> this handle as the output for the sink to consume.
>> > 3. Return from snapshotState.
>> >
>> > Basically I want to ensure that the handle output is received by the
>> next stage before this checkpoint barrier.
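Worth noting: `CheckpointedFunction#snapshotState` provides no collector, so a function cannot emit records from there. At the operator level, however, Flink exposes `StreamOperator#prepareSnapshotPreBarrier(long)`, which runs before the operator forwards the barrier downstream, so anything emitted there reaches the sink ahead of the barrier. A rough sketch, with `UploadClient` as a hypothetical stand-in:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** Sketch: buffers events and emits an upload handle just before each barrier. */
public class UploadOnBarrierOperator extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    private final List<String> events = new ArrayList<>();
    private transient UploadClient client;

    @Override
    public void open() throws Exception {
        super.open();
        client = new UploadClient();
    }

    @Override
    public void processElement(StreamRecord<String> element) {
        events.add(element.getValue());
    }

    @Override
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        // Runs before the checkpoint barrier is sent downstream, so the handle
        // is guaranteed to arrive at the sink ahead of the barrier.
        String handle = client.upload(checkpointId, events);
        events.clear();
        output.collect(new StreamRecord<>(handle));
    }

    /** Hypothetical stand-in for the upload service's client. */
    static class UploadClient {
        String upload(long checkpointId, List<String> events) {
            return "handle-" + checkpointId; // placeholder handle
        }
    }
}
```

Such an operator would be wired in with `DataStream#transform(...)` rather than a plain `map`/`flatMap`.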
>> >
>> > On Mon, Feb 14, 2022 at 8:11 PM Niklas Semmler <nik...@ververica.com>
>> wrote:
>> > Hi Gopi,
>> >
>> > You can implement CheckpointedFunction and use the method
>> snapshotState(FunctionSnapshotContext) to upload state on each checkpoint.
>> >
>> >
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html
>> >
>> > Make sure you don’t have unaligned checkpointing enabled.
>> >
>> > What it is:
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/#unaligned-checkpointing
>> > How to configure:
>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpointing_under_backpressure/#unaligned-checkpoints
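For quick reference, unaligned checkpoints can be disabled explicitly on the execution environment (the interval below is just an example value):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointSetup {
    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // checkpoint every 60 s (example value)
        // The flush-on-barrier pattern relies on aligned barriers:
        env.getCheckpointConfig().enableUnalignedCheckpoints(false);
    }
}
```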
>> >
>> > Note that delays introduced in CheckpointedFunction#snapshotState can
>> slow down the job.
>> >
>> > You can also get the snapshot id from the FunctionSnapshotContext.
>> Maybe that removes the need for the source logic?
>> >
>> > Does this help?
>> >
>> > Best regards,
>> > Niklas
>> >
>> >
>> > > On 14. Feb 2022, at 05:27, Gopi Krishna M <gopikrish...@gmail.com>
>> wrote:
>> > >
>> > > Hi,
>> > > In my flink operators, I need to connect to an external service to
>> update state. I was thinking that the updates to the external service can
>> be synchronized via checkpoint barriers.
>> > >
>> > > The topology of the stream is a source, then a single stage of
>> operator replicas handling different partitions, then all joining in a
>> single sink.
>> > >
>> > > Each operator will contact the external service when it receives a
>> checkpoint barrier and upload its local state (the service caches the
>> upload and returns a handle).
>> > >
>> > > After upload, it forwards the cache handle to the sink. Once sink
>> receives handles from all such operators, it calls the external service
>> with a list of handles received. This helps ensure that all handles are
>> from the same checkpoint barrier.
>> > >
>> > > Is it possible to achieve this in a flink application?
>> > >
>> > > Thanks,
>> > > Gopi
>> >
>>
>>
