Okay, I see. Thank you for your quick reply! I'll have a look at that file.
Do you have an idea of the interval at which I could expect the Dataflow runner to initiate the finalization? I'm thinking of the case where I have a message ack deadline of e.g. 30s and a continuous stream of messages that keeps ProcessElement active. In that case I would want to interrupt processing of new messages and self-checkpoint before those 30s have passed, if the runner hasn't initiated it within that time frame.

Johanna

On Sun, Oct 15, 2023, 21:13 Robert Burke <rob...@frantil.com> wrote:

> Hi! Answers inline.
>
> On Sun, Oct 15, 2023, 11:48 AM Johanna Öjeling via dev <
> dev@beam.apache.org> wrote:
>
>> Hi,
>>
>> I'm working on a native streaming IO connector for the Go SDK to enable
>> reads and writes from/to NATS (#29000
>> <https://github.com/apache/beam/issues/29000>) and would like to better
>> understand how bundle finalization works.
>>
>> For this use case I need to register a callback function which
>> acknowledges the processed messages.
>>
>> More concretely, I wonder:
>>
>> - When/with which interval is the callback function invoked? Is it
>> supposed to happen at every runner and SDF initiated checkpoint?
>>
> A runner that supports it should be calling back to the SDK for
> finalization after a bundle is completed, regardless of how the bundle
> terminates successfully, e.g. via splits, or completed data, or process
> continuation resume/stop.
>
>> - Is it correct that the registered callback is called and retried on
>> a best effort? What is the estimated success rate for this best effort?
>>
> I don't believe the callback would be retried if it failed, since
> technically the point is to call back after the runner has committed the
> bundle's output successfully. A failure in finalization would require
> retractions, essentially.
>
> It is best effort, since a callback could expire before bundle execution
> completes.
>
>> When running a WIP example pipeline
>> <https://github.com/johannaojeling/beam/blob/cfa6babbd0c9b1196c186974165beb7559a226db/sdks/go/examples/natsread/main.go>
>> on Dataflow the callback function is only ever called once but ignored
>> after subsequent checkpoints, hence my questions.
>>
> That is very odd. My understanding is that it should be after every
> complete bundle, if the stage contains a registered callback prior to its
> expiration time.
>
> The SDK code has a lot of time.Now() checks, so depending on whether the
> expiration time is set relative to processing time, it could be mistakenly
> dropping the valid callback.
>
> It does look like failed callbacks are re-queued after being called
> (exec/plan.go:188), but only if they haven't yet expired. Expired callbacks
> are never run.
>
> I'm not sure what the correct runner-side behavior is though.
>
>> If anyone could help clarify the above or point me towards some
>> documentation that would be much appreciated!
>>
>> Thanks,
>> Johanna