Hi! Answers inline.

On Sun, Oct 15, 2023, 11:48 AM Johanna Öjeling via dev <dev@beam.apache.org> wrote:
> Hi,
>
> I'm working on a native streaming IO connector for the Go SDK to enable
> reads and writes from/to NATS (#29000
> <https://github.com/apache/beam/issues/29000>) and would like to better
> understand how bundle finalization works.
>
> For this use case I need to register a callback function which
> acknowledges the processed messages.
>
> More concretely, I wonder:
>
> - When/with which interval is the callback function invoked? Is it
> supposed to happen at every runner and SDF initiated checkpoint?
>

A runner that supports it should call back to the SDK for finalization after a bundle is completed, regardless of how the bundle terminates successfully, e.g. via splits, completed data, or a process continuation resume/stop.

> - Is it correct that the registered callback is called and retried on
> a best effort? What is the estimated success rate for this best effort?
>

I don't believe the callback would be retried if it failed, since technically the point is to call back after the runner has committed the bundle's output successfully. A failure in finalization would require retractions, essentially. It is best effort, since a callback could expire before bundle execution completes.

> When running a WIP example pipeline
> <https://github.com/johannaojeling/beam/blob/cfa6babbd0c9b1196c186974165beb7559a226db/sdks/go/examples/natsread/main.go>
> on Dataflow the callback function is only ever called once but ignored
> after subsequent checkpoints so thereby my questions.
>

That is very odd. My understanding is that the callback should run after every completed bundle, as long as the stage contains a registered callback that has not yet reached its expiration time. The SDK code has a lot of time.Now() checks, so depending on whether the expiration time is set relative to processing time, it could be mistakenly dropping a valid callback.

It does look like failed callbacks are re-queued after being called (exec/plan.go:188), but only if they haven't yet expired. Expired callbacks are never run. I'm not sure what the correct runner-side behavior is, though.

> If anyone could help clarify the above or point me towards some
> documentation that would be much appreciated!
>
> Thanks,
> Johanna
>
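
To make the behavior I described concrete, here is a small self-contained Go sketch of how I read it: callbacks are registered with an expiration, finalization runs the ones that are still valid, failed ones are kept for a later attempt, and expired ones are dropped without being run. This is a toy model under those assumptions, not the SDK's code. The names finalizer, register, finalize and simulate are hypothetical, and the clock is passed in explicitly instead of calling time.Now().

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// callback is a hypothetical stand-in for a registered finalization callback.
type callback struct {
	name       string
	validUntil time.Time
	fn         func() error
}

// finalizer is a simplified model of a per-bundle callback queue.
// It is NOT the Beam Go SDK's real type.
type finalizer struct {
	pending []callback
}

// register queues fn; it stays valid for validFor, measured from now.
func (f *finalizer) register(name string, now time.Time, validFor time.Duration, fn func() error) {
	f.pending = append(f.pending, callback{name: name, validUntil: now.Add(validFor), fn: fn})
}

// finalize runs every pending callback that has not expired at time now.
// Failed callbacks are kept for a later attempt; expired ones are dropped
// without ever being run.
func (f *finalizer) finalize(now time.Time) (log []string) {
	var retry []callback
	for _, cb := range f.pending {
		if now.After(cb.validUntil) {
			log = append(log, cb.name+": expired, dropped")
			continue
		}
		if err := cb.fn(); err != nil {
			log = append(log, cb.name+": failed, re-queued")
			retry = append(retry, cb)
			continue
		}
		log = append(log, cb.name+": ok")
	}
	f.pending = retry
	return log
}

// simulate registers three callbacks and finalizes twice.
func simulate() []string {
	start := time.Date(2023, 10, 15, 12, 0, 0, 0, time.UTC)
	f := &finalizer{}
	attempts := 0

	f.register("ack-a", start, time.Minute, func() error { return nil })
	f.register("ack-b", start, time.Minute, func() error {
		attempts++
		if attempts == 1 {
			return errors.New("transient ack failure")
		}
		return nil
	})
	// ack-c is only valid for one second, so it expires before finalization.
	f.register("ack-c", start, time.Second, func() error { return nil })

	var log []string
	log = append(log, f.finalize(start.Add(10*time.Second))...)
	log = append(log, f.finalize(start.Add(20*time.Second))...)
	return log
}

func main() {
	for _, line := range simulate() {
		fmt.Println(line)
	}
}
```

In this model a short validity window means the callback is silently dropped if finalization arrives late, which is the kind of effect I would check for first when a callback appears to be ignored.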