Got it. The ack deadline is configurable on the consumer, but this gave me some new ideas. Thanks!
Johanna

On Sun, Oct 15, 2023, 22:51 Robert Burke <rob...@frantil.com> wrote:

> I would recommend avoiding overfitting to a specific runner, but given the constraints, I'd suggest being eager about self-checkpointing and ensuring fine-grained splits. This will allow runners to schedule more bundles in parallel if they are able, and provide independence between them.
>
> Part of the issue is that the downstream transforms will eat into that ack deadline as well. The 30s has to cover pulling the message, processing it and any children downstream of the Read, and so on.
>
> Dataflow biases towards small bundles during streaming execution, but setting short ProcessContinuation resume suggestions should allow for low latency.
>
> All that said, 30s sounds fairly short for an ack timeout (knowing little about the specific source you're adding). I know that Google Cloud Pub/Sub auto-extends ack deadlines as long as the client connection remains open. This is done automatically by the client itself. That's an alternative possibility as well if the data source supports it: manually extending the ack deadline until the bundle completes normally, and then allowing finalization to happen. (Balanced with how much state stays in memory and so on.)
>
>
> On Sun, Oct 15, 2023, 1:30 PM Johanna Öjeling <joha...@ojeling.net> wrote:
>
>> Okay I see, thank you for your quick reply! I'll have a look into that file.
>>
>> Do you have an idea of at what interval I could expect the Dataflow runner to initiate the finalization? I'm thinking of the case where I have a message ack deadline of e.g. 30s and a continuous stream of messages that keeps the ProcessElement active. Then I will want to interrupt processing of new messages and self-checkpoint before those 30s have passed, if the runner hasn't initiated it within that time frame.
>>
>> Johanna
>>
>> On Sun, Oct 15, 2023, 21:13 Robert Burke <rob...@frantil.com> wrote:
>>
>>> Hi! Answers inline.
>>>
>>> On Sun, Oct 15, 2023, 11:48 AM Johanna Öjeling via dev <dev@beam.apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm working on a native streaming IO connector for the Go SDK to enable reads and writes from/to NATS (#29000 <https://github.com/apache/beam/issues/29000>) and would like to better understand how bundle finalization works.
>>>>
>>>> For this use case I need to register a callback function which acknowledges the processed messages.
>>>>
>>>> More concretely, I wonder:
>>>>
>>>> - When, and at what interval, is the callback function invoked? Is it supposed to happen at every runner-initiated and SDF-initiated checkpoint?
>>>>
>>> A runner that supports it should be calling back to the SDK for finalization after a bundle is completed, regardless of how the bundle terminates successfully, e.g. via splits, completed data, or process continuation resume/stop.
>>>
>>>
>>>> - Is it correct that the registered callback is called and retried on a best-effort basis? What is the estimated success rate for this best effort?
>>>>
>>> I don't believe the callback would be retried if it failed, since technically the point is to call back after the runner has committed the bundle's output successfully. A failure in finalization would essentially require retractions.
>>>
>>> It is best effort, since a callback could expire before bundle execution completes.
>>>
>>>>
>>>> When running a WIP example pipeline <https://github.com/johannaojeling/beam/blob/cfa6babbd0c9b1196c186974165beb7559a226db/sdks/go/examples/natsread/main.go> on Dataflow, the callback function is only ever called once and is ignored after subsequent checkpoints, hence my questions.
>>>>
>>>
>>> That is very odd. My understanding is that it should be called after every complete bundle, if the stage contains a registered callback, prior to its expiration time.
>>>
>>> The SDK code has a lot of time.Now() checks, so depending on whether the expiration time is set relative to processing time, it could be mistakenly dropping the valid callback.
>>>
>>> It does look like failed callbacks are re-queued after being called (exec/plan.go:188), but only if they haven't yet expired. Expired callbacks are never run.
>>>
>>> I'm not sure what the correct runner-side behavior is, though.
>>>
>>>
>>>> If anyone could help clarify the above or point me towards some documentation, that would be much appreciated!
>>>>
>>>> Thanks,
>>>> Johanna