Just found that we can use prepareSnapshotPreBarrier
<https://nightlies.apache.org/flink/flink-docs-release-1.7/api/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.html#prepareSnapshotPreBarrier-long->
in AbstractStreamOperator to achieve this.

On Tue, Feb 15, 2022 at 9:13 AM Gopi Krishna M <gopikrish...@gmail.com>
wrote:

> CheckpointedFunction docs mention the following -
>
>> The snapshotState(FunctionSnapshotContext)
>> <https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html#snapshotState-org.apache.flink.runtime.state.FunctionSnapshotContext->
>> is called whenever a checkpoint takes a state snapshot of the
>> transformation function. Inside this method, functions typically make sure
>> that the checkpointed data structures (obtained in the initialization
>> phase) are up to date for a snapshot to be taken. The given snapshot
>> context gives access to the metadata of the checkpoint.
>>
>> *In addition, functions can use this method as a hook to
>> flush/commit/synchronize with external systems. *
>>
> Is there further documentation/examples of this synchronization?
>
> On Mon, Feb 14, 2022 at 10:50 PM Gopi Krishna M <gopikrish...@gmail.com>
> wrote:
>
>>
>> On Mon, Feb 14, 2022 at 10:00 PM Niklas Semmler <nik...@ververica.com>
>> wrote:
>>
>>> So, you want to send basically the last message before the barrier?
>>>
>> Yes.
>>
>>
>>>
>>> Can you not instead send the first message after the barrier? From a
>>> first glance this sounds easier.
>>>
>> I'm not sure if this will help me synchronize the sink with the same
>> barrier.
>>
>>>
>>> Can you share what you are trying to accomplish?
>>>
>> Here's the objective I'm trying to achieve:
>> https://github.com/gopik/storage-reading-list/blob/main/RealtimeAnalytics.md#streaming-update-using-flink
>>
>> Basically, I want to capture DB changes via CDC and update a parquet
>> table (in delta format) consistently at each checkpoint. So, the data is
>> first partitioned by primary key, each task handling a set of keys causes
>> new files to be written, then when sink waits for barrier from all tasks
>> which will follow the file names. Then the sink updates the delta table via
>> a transaction and then consumes the barrier.
>>
>>
>>>
>>> Best regards,
>>> Niklas
>>>
>>> > On 14. Feb 2022, at 17:04, Gopi Krishna M <gopikrish...@gmail.com>
>>> wrote:
>>> >
>>> > Thanks Niklas! This helps with synchronizing uploads across
>>> partitioned tasks. The next step is to pass the handle to this upload to
>>> the sink which should be part of the same checkpoint. Is it possible to do
>>> the following:
>>> >
>>> > 1. Keep reducing the events to keyedStore.
>>> > 2. On snapshotState: upload the events and get the handle. Generate
>>> this handle as the output for the sink to consume.
>>> > 3. Return from snapshotState.
>>> >
>>> > Basically I want to ensure that the handle output is received by the
>>> next stage before this checkpoint barrier.
>>> >
>>> > On Mon, Feb 14, 2022 at 8:11 PM Niklas Semmler <nik...@ververica.com>
>>> wrote:
>>> > Hi Gopi,
>>> >
>>> > You can implement CheckpointedFunction and use the method
>>> snapshotState(FunctionSnapshotContext) to upload state on each checkpoint.
>>> >
>>> >
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/streaming/api/checkpoint/CheckpointedFunction.html
>>> >
>>> > Make sure, you don’t have unaligned checkpointing enabled.
>>> >
>>> > What it is:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/concepts/stateful-stream-processing/#unaligned-checkpointing
>>> > How to configure:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpointing_under_backpressure/#unaligned-checkpoints
>>> >
>>> > Note that delays introduced in CheckpointedFunction#snapshotState can
>>> slow down the job.
>>> >
>>> > You can also get the snapshot id from the FunctionSnapshotContext.
>>> Maybe that removes the need for the source logic?
>>> >
>>> > Does this help?
>>> >
>>> > Best regards,
>>> > Niklas
>>> >
>>> >
>>> > > On 14. Feb 2022, at 05:27, Gopi Krishna M <gopikrish...@gmail.com>
>>> wrote:
>>> > >
>>> > > Hi,
>>> > > In my flink operators, I need to connect to an external service to
>>> update state. I was thinking that the updates to the external service can
>>> be synchronized via checkpoint barriers.
>>> > >
>>> > > The topology of the stream is a source, then a single stage of
>>> operator replicas handling different partitions, then all joining in a
>>> single sink.
>>> > >
>>> > > Each operator will contact the external service when it receives a
>>> checkpoint barrier and uploads local state (which caches the uploads and
>>> returns a handle).
>>> > >
>>> > > After upload, it forwards the cache handle to the sink. Once sink
>>> receives handles from all such operators, it calls the external service
>>> with a list of handles received. This helps ensure that all handles are
>>> from the same checkpoint barrier.
>>> > >
>>> > > Is it possible to achieve this in a flink application?
>>> > >
>>> > > Thanks,
>>> > > Gopi
>>> >
>>>
>>>

Reply via email to