Got it. The ack deadline is configurable on the consumer, but this gave me
some new ideas. Thanks!

Johanna

On Sun, Oct 15, 2023, 22:51 Robert Burke <rob...@frantil.com> wrote:

> I would recommend avoiding overfitting to a specific runner, but given
> the constraints, I'd say be eager about self-checkpointing and ensure
> fine-grained splits. This will allow runners to schedule more bundles in
> parallel if they are able, and provide independence between them.
>
> Part of the issue is that the downstream transforms will eat into that ack
> deadline time as well. The 30s has to cover pulling the message, processing
> it, and running any transforms downstream of the Read.
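
The self-checkpointing idea can be sketched without the Beam SDK as a loop that stops once a time budget is spent. This is only an illustration: `processUntil`, `fakeClock`, and the 25s budget are hypothetical names and numbers, and the budget is assumed to be set well below the ack deadline to leave room for downstream transforms. In the Go SDK the "resume" result would roughly correspond to returning a process continuation such as `sdf.ResumeProcessingIn`.

```go
package main

import (
	"fmt"
	"time"
)

// fakeClock returns a clock that starts at the Unix epoch and advances by
// step on every call, so the sketch can run without real waiting.
func fakeClock(step time.Duration) func() time.Time {
	t := time.Unix(0, 0)
	return func() time.Time {
		cur := t
		t = t.Add(step)
		return cur
	}
}

// processUntil handles messages until the queue is empty or the time budget
// is spent. It returns how many messages were handled and whether the caller
// should self-checkpoint and resume later.
func processUntil(pending []string, budget time.Duration, now func() time.Time, handle func(string)) (done int, resume bool) {
	deadline := now().Add(budget)
	for _, msg := range pending {
		if !now().Before(deadline) {
			return done, true // budget spent: stop here and ask to be resumed
		}
		handle(msg)
		done++
	}
	return done, false // everything handled within the budget
}

func main() {
	msgs := []string{"a", "b", "c", "d"}
	// Hypothetical numbers: every clock reading advances 10s, the budget is 25s.
	done, resume := processUntil(msgs, 25*time.Second, fakeClock(10*time.Second), func(m string) {
		fmt.Println("handled", m)
	})
	fmt.Println("handled:", done, "resume:", resume)
}
```

Injecting the clock keeps the cut-off logic testable; a real DoFn would pass `time.Now` and choose the budget from the consumer's configured ack deadline.
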
>
> Dataflow biases towards small bundles during streaming execution, but
> setting short Process Continuation suggestions should allow for low latency.
>
> All that said, 30s sounds fairly short for an ack timeout (knowing little
> about the specific source you're adding). I know that Google Cloud PubSub
> auto-extends ack deadlines as long as the client connection remains open.
> This is done automatically by the client itself. That's an alternative
> possibility as well, if the data source supports it: manually extending the
> ack deadline until the bundle completes normally, and then allowing
> finalization to happen (balanced against how much state stays in memory, and
> so on).
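
A minimal sketch of the manual-extension alternative, assuming the source exposes some way to push the ack deadline out. `keepAlive`, `runBundle`, and the `extend` callback are hypothetical stand-ins, not part of the Beam or NATS APIs; the sleep stands in for processing a bundle.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// keepAlive calls extend once per interval until ctx is cancelled. The
// returned channel is closed when the background goroutine has exited.
func keepAlive(ctx context.Context, interval time.Duration, extend func()) <-chan struct{} {
	stopped := make(chan struct{})
	go func() {
		defer close(stopped)
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				extend() // hypothetical: ask the source for more time
			}
		}
	}()
	return stopped
}

// runBundle simulates a bundle that takes work to complete and returns how
// many deadline extensions were sent while it was running.
func runBundle(work, interval time.Duration) int {
	ctx, cancel := context.WithCancel(context.Background())
	extensions := 0
	stopped := keepAlive(ctx, interval, func() { extensions++ })

	time.Sleep(work) // stand-in for processing the bundle

	cancel()
	<-stopped // wait, so no extension is sent after the bundle is done
	return extensions
}

func main() {
	n := runBundle(60*time.Millisecond, 10*time.Millisecond)
	fmt.Println("extensions sent:", n)
}
```

The interval would need to be comfortably shorter than the ack deadline, and every in-flight message kept alive this way is state held in memory until finalization.
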
>
>
> On Sun, Oct 15, 2023, 1:30 PM Johanna Öjeling <joha...@ojeling.net> wrote:
>
>> Okay I see, thank you for your quick reply! I'll have a look into that
>> file.
>>
>> Do you have an idea of the interval at which I could expect the Dataflow
>> runner to initiate the finalization? Thinking of the case where I have a
>> message ack deadline of e.g. 30s and a continuous stream of messages that
>> keeps the ProcessElement active. Then I will want to interrupt processing
>> of new messages and self-checkpoint before those 30s have passed, if the
>> runner hasn't initiated it within that time frame.
>>
>> Johanna
>>
>> On Sun, Oct 15, 2023, 21:13 Robert Burke <rob...@frantil.com> wrote:
>>
>>> Hi! Answers inline.
>>>
>>> On Sun, Oct 15, 2023, 11:48 AM Johanna Öjeling via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm working on a native streaming IO connector for the Go SDK to enable
>>>> reads and writes from/to NATS (#29000
>>>> <https://github.com/apache/beam/issues/29000>) and would like to
>>>> better understand how bundle finalization works.
>>>>
>>>> For this use case I need to register a callback function which
>>>> acknowledges the processed messages.
>>>>
>>>> More concretely, I wonder:
>>>>
>>>>    - When, or at what interval, is the callback function invoked? Is it
>>>>    supposed to happen at every runner- and SDF-initiated checkpoint?
>>>>
>>> A runner that supports it should be calling back to the SDK for
>>> finalization after a bundle is completed, regardless of how the bundle
>>> terminates successfully, e.g. via splits, completed data, or process
>>> continuation resume/stop.
>>>
>>>
>>>>    - Is it correct that the registered callback is called and retried
>>>>    on a best-effort basis? What is the estimated success rate for this
>>>>    best effort?
>>>>
>>> I don't believe the callback would be retried if it failed, since
>>> technically the point is to call back after the runner has committed the
>>> bundle's output successfully. A failure in finalization would require
>>> retractions, essentially.
>>>
>>> It is best effort, since a callback could expire before bundle execution
>>> completes.
>>>
>>>>
>>>> When running a WIP example pipeline
>>>> <https://github.com/johannaojeling/beam/blob/cfa6babbd0c9b1196c186974165beb7559a226db/sdks/go/examples/natsread/main.go>
>>>> on Dataflow, the callback function is only ever called once and is
>>>> ignored after subsequent checkpoints, hence my questions.
>>>>
>>>
>>> That is very odd. My understanding is that it should be after every
>>> complete bundle, if the stage contains a registered callback prior to its
>>> expiration time.
>>>
>>> The SDK code has a lot of time.Now() checks, so depending on whether the
>>> expiration time is set relative to processing time, it could be mistakenly
>>> dropping a valid callback.
>>>
>>> It does look like failed callbacks are re-queued after being called
>>> (exec/plan.go:188), but only if they haven't yet expired. Expired callbacks
>>> are never run.
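
The re-queue behaviour described here can be sketched roughly as follows. This is not the SDK's code from exec/plan.go: the `callback` type, `runCallbacks`, and `demo` are hypothetical names that only mirror the rule as stated (run unexpired callbacks, keep the failed ones for another attempt, never run expired ones).

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// callback pairs a finalization function with the time after which it must
// no longer be run.
type callback struct {
	validUntil time.Time
	fn         func() error
}

// runCallbacks invokes every callback that has not expired at time now and
// returns the ones that failed, so the caller can queue them again. Expired
// callbacks are dropped without being run.
func runCallbacks(now time.Time, cbs []callback) (retry []callback) {
	for _, cb := range cbs {
		if now.After(cb.validUntil) {
			continue // expired: never run
		}
		if err := cb.fn(); err != nil {
			retry = append(retry, cb) // failed: keep for another attempt
		}
	}
	return retry
}

// demo runs one succeeding, one failing, and one expired callback. It returns
// how many callbacks were actually invoked and how many were re-queued.
func demo() (ran, requeued int) {
	now := time.Unix(100, 0)
	count := func(err error) func() error {
		return func() error { ran++; return err }
	}
	cbs := []callback{
		{validUntil: now.Add(time.Minute), fn: count(nil)},
		{validUntil: now.Add(time.Minute), fn: count(errors.New("ack failed"))},
		{validUntil: now.Add(-time.Second), fn: count(nil)},
	}
	retry := runCallbacks(now, cbs)
	return ran, len(retry)
}

func main() {
	ran, requeued := demo()
	fmt.Println("ran:", ran, "requeued:", requeued)
}
```

Under this rule, an expiration that is too short relative to bundle duration means the ack callback is silently skipped, which is one way a callback could appear to run only once.
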
>>>
>>> I'm not sure what the correct runner side behavior is though.
>>>
>>>
>>>> If anyone could help clarify the above or point me towards some
>>>> documentation that would be much appreciated!
>>>>
>>>> Thanks,
>>>> Johanna
>>>>
>>>>
