I would recommend avoiding overfitting to a specific runner, but given the
constraints, I'd say be eager about self-checkpointing and ensure
fine-grained splits. This will allow runners to schedule more bundles in
parallel if they are able, and provides independence between them.

Part of the issue is that the downstream transforms will eat into that ack
deadline time as well. Those 30s have to cover pulling the message,
processing it in the Read and in any children downstream of the Read, and
so on, before finalization can ack it.

Dataflow biases towards small bundles during streaming execution, and
setting short Process Continuation resume suggestions should also help keep
latency low.

All that said, 30s sounds fairly short for an ack timeout (knowing little
about the specific source you're adding). I know that Google Cloud PubSub
auto-extends ack deadlines as long as the client connection remains open.
This is done automatically by the client itself. That's an alternative
possibility as well, if the data source supports it: manually extend the
ack deadline until the bundle completes normally, and then allow
finalization to happen (balanced against how much state stays in memory and
so on).


On Sun, Oct 15, 2023, 1:30 PM Johanna Öjeling <joha...@ojeling.net> wrote:

> Okay I see, thank you for your quick reply! I'll have a look into that
> file.
>
> Do you have an idea of on which interval I could expect the Dataflow
> runner to initiate the finalization? Thinking of the case where I have a
> message ack deadline of e.g. 30s and a continuous stream of messages that
> keeps the ProcessElement active. Then I will want to interrupt processing
> of new messages and self-checkpoint before those 30s have passed, if the
> runner hasn't initiated it within that time frame.
>
> Johanna
>
> On Sun, Oct 15, 2023, 21:13 Robert Burke <rob...@frantil.com> wrote:
>
>> Hi! Answers inline.
>>
>> On Sun, Oct 15, 2023, 11:48 AM Johanna Öjeling via dev <
>> dev@beam.apache.org> wrote:
>>
>>> Hi,
>>>
>>> I'm working on a native streaming IO connector for the Go SDK to enable
>>> reads and writes from/to NATS (#29000
>>> <https://github.com/apache/beam/issues/29000>) and would like to better
>>> understand how bundle finalization works.
>>>
>>> For this use case I need to register a callback function which
>>> acknowledges the processed messages.
>>>
>>> More concretely, I wonder:
>>>
>>>    - When/with which interval is the callback function invoked? Is it
>>>    supposed to happen at every runner and SDF initiated checkpoint?
>>>
>> A runner that supports it should be calling back to the SDK for
>> finalization after a bundle is completed, regardless of how the bundle
>> terminates successfully, e.g. via splits, or completed data, or process
>> continuation resume/stop.
>>
>>
>>>    - Is it correct that the registered callback is called and retried
>>>    on a best effort? What is the estimated success rate for this best
>>>    effort?
>>>
>> I don't believe the callback would be retried if it failed, since
>> technically the point is to call back after the runner has committed the
>> bundle's output successfully. A failure in finalization would require
>> retractions, essentially.
>>
>> It is best effort, since a callback could expire before bundle execution
>> completes.
>>
>>>
>>> When running a WIP example pipeline
>>> <https://github.com/johannaojeling/beam/blob/cfa6babbd0c9b1196c186974165beb7559a226db/sdks/go/examples/natsread/main.go>
>>> on Dataflow, the callback function is only ever called once and is
>>> ignored after subsequent checkpoints, hence my questions.
>>>
>>
>> That is very odd. My understanding is that it should be called after
>> every completed bundle, if the stage contains a registered callback,
>> prior to its expiration time.
>>
>> The SDK code has a lot of time.Now() checks, so depending on whether the
>> expiration time is set relative to processing time, it could be mistakenly
>> dropping the valid callback.
>>
>> It does look like failed callbacks are re-queued after being called
>> (exec/plan.go:188), but only if they haven't yet expired. Expired callbacks
>> are never run.
>>
>> I'm not sure what the correct runner side behavior is though.
>>
>>
>>> If anyone could help clarify the above or point me towards some
>>> documentation that would be much appreciated!
>>>
>>> Thanks,
>>> Johanna
>>>
>>>
