Okay, I see. Thank you for your quick reply! I'll have a look at that file.

Do you have an idea of the interval at which I could expect the Dataflow
runner to initiate finalization? I'm thinking of the case where I have a
message ack deadline of, e.g., 30s and a continuous stream of messages that
keeps ProcessElement active. In that case I would want to stop processing
new messages and self-checkpoint before those 30s have passed, if the runner
hasn't initiated a checkpoint within that time frame.

Johanna

On Sun, Oct 15, 2023, 21:13 Robert Burke <rob...@frantil.com> wrote:

> Hi! Answers inline.
>
> On Sun, Oct 15, 2023, 11:48 AM Johanna Öjeling via dev <
> dev@beam.apache.org> wrote:
>
>> Hi,
>>
>> I'm working on a native streaming IO connector for the Go SDK to enable
>> reads and writes from/to NATS (#29000
>> <https://github.com/apache/beam/issues/29000>) and would like to better
>> understand how bundle finalization works.
>>
>> For this use case I need to register a callback function which
>> acknowledges the processed messages.
>>
>> More concretely, I wonder:
>>
>>    - When, and at what interval, is the callback function invoked? Is it
>>    supposed to happen at every runner- and SDF-initiated checkpoint?
>>
> A runner that supports it should be calling back to the SDK for
> finalization after a bundle is completed, regardless of how the bundle
> terminates successfully, e.g. via splits, completed data, or a process
> continuation resume/stop.
>
>
>>    - Is it correct that the registered callback is called and retried on
>>    a best-effort basis? What is the estimated success rate for this?
>>
> I don't believe the callback would be retried if it failed, since
> technically the point is to call back after the runner has successfully
> committed the bundle's output. A failure in finalization would
> essentially require retractions.
>
> It is best effort, since a callback could expire before bundle execution
> completes.
>
>>
>> When running a WIP example pipeline
>> <https://github.com/johannaojeling/beam/blob/cfa6babbd0c9b1196c186974165beb7559a226db/sdks/go/examples/natsread/main.go>
>> on Dataflow, the callback function is only ever called once and is
>> ignored after subsequent checkpoints, hence my questions.
>>
>
> That is very odd. My understanding is that it should be called after
> every completed bundle, if the stage contains a registered callback that
> hasn't reached its expiration time.
>
> The SDK code has a lot of time.Now() checks, so depending on whether the
> expiration time is set relative to processing time, it could be
> mistakenly dropping a valid callback.
>
> It does look like failed callbacks are re-queued after being called
> (exec/plan.go:188), but only if they haven't yet expired. Expired
> callbacks are never run.
>
> I'm not sure what the correct runner side behavior is though.
>
>
>> If anyone could help clarify the above or point me towards some
>> documentation that would be much appreciated!
>>
>> Thanks,
>> Johanna
>>
>>
