Hi! Answers inline.

On Sun, Oct 15, 2023, 11:48 AM Johanna Öjeling via dev <dev@beam.apache.org>
wrote:

> Hi,
>
> I'm working on a native streaming IO connector for the Go SDK to enable
> reads and writes from/to NATS (#29000
> <https://github.com/apache/beam/issues/29000>) and would like to better
> understand how bundle finalization works.
>
> For this use case I need to register a callback function which
> acknowledges the processed messages.
>
> More concretely, I wonder:
>
>    - When/with which interval is the callback function invoked? Is it
>    supposed to happen at every runner and SDF initiated checkpoint?
>

A runner that supports it should call back to the SDK for finalization
after a bundle is completed, regardless of how the bundle terminates
successfully, e.g. via splits, completed data, or process continuation
resume/stop.


>    - Is it correct that the registered callback is called and retried on
>    a best effort? What is the estimated success rate for this best effort?
>

I don't believe the callback would be retried if it failed, since
technically the point is to call back after the runner has committed the
bundle's output successfully. A failure in finalization would require
retractions, essentially.

It is best effort, since a callback could expire before bundle execution
completes.

>
> When running a WIP example pipeline
> <https://github.com/johannaojeling/beam/blob/cfa6babbd0c9b1196c186974165beb7559a226db/sdks/go/examples/natsread/main.go>
> on Dataflow the callback function is only ever called once but ignored
> after subsequent checkpoints so thereby my questions.
>

That is very odd. My understanding is that it should be called after every
completed bundle, as long as the stage has a registered callback that has
not yet reached its expiration time.

The SDK code has a lot of time.Now() checks, so depending on whether the
expiration time is set relative to processing time, it could be mistakenly
dropping a valid callback.

It does look like failed callbacks are re-queued after being called
(exec/plan.go:188), but only if they haven't yet expired. Expired callbacks
are never run.
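
To make that concrete, here is a rough model of the behavior as I read it.
This is not the SDK code: the finalizer and callback types, register,
finalize and simulate are all names I made up for illustration, and I'm
assuming the expiration is checked against the wall clock when finalization
is requested.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// callback is a hypothetical stand-in for a registered finalization callback.
type callback struct {
	validUntil time.Time
	fn         func() error
}

// finalizer is a hypothetical per-bundle queue of callbacks.
type finalizer struct {
	callbacks []callback
}

// register stores fn with an expiration time relative to now.
func (f *finalizer) register(now time.Time, validFor time.Duration, fn func() error) {
	f.callbacks = append(f.callbacks, callback{validUntil: now.Add(validFor), fn: fn})
}

// finalize runs every callback that has not expired and returns how many ran.
// Failed callbacks are kept for a later attempt; expired ones are dropped
// without ever being run.
func (f *finalizer) finalize(now time.Time) int {
	ran := 0
	var retry []callback
	for _, cb := range f.callbacks {
		if now.After(cb.validUntil) {
			continue // expired: never run
		}
		ran++
		if err := cb.fn(); err != nil {
			retry = append(retry, cb) // re-queue the failed callback
		}
	}
	f.callbacks = retry
	return ran
}

// simulate registers two callbacks and requests finalization three times,
// returning how many callbacks ran in each round.
func simulate() [3]int {
	start := time.Date(2023, 10, 15, 12, 0, 0, 0, time.UTC)
	f := &finalizer{}

	attempts := 0
	// Valid for a minute; fails on the first attempt, succeeds on the second.
	f.register(start, time.Minute, func() error {
		attempts++
		if attempts == 1 {
			return errors.New("ack failed")
		}
		return nil
	})
	// Valid for one second only, so it has expired by the first round.
	f.register(start, time.Second, func() error { return nil })

	return [3]int{
		f.finalize(start.Add(10 * time.Second)),
		f.finalize(start.Add(20 * time.Second)),
		f.finalize(start.Add(2 * time.Minute)),
	}
}

func main() {
	fmt.Println(simulate()) // prints [1 1 0]
}
```

In this model the short-lived callback is silently dropped, which is the
kind of thing I'd check first if the expiration passed to the real
registration call is small compared to how long the bundle takes.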

I'm not sure what the correct runner-side behavior is, though.


> If anyone could help clarify the above or point me towards some
> documentation that would be much appreciated!
>
> Thanks,
> Johanna
>
>
