Replies inline.

On 5/20/26 18:05, vamsikrishna korada wrote:

Thanks for the reply, Jan.

The job is similar to an ETL job. We continuously read events from a Kafka topic and write them to files every 5 minutes. We are considering using a |Timer| to close the current file and open a new writer every 5 minutes.

If you don't use state (which, under the current Flink Runner implementation, implies a shuffle), this approach will lose data on failures (and restarts) of the Pipeline.

You might try using bundles and the checkpointing interval to drive flushing of the files (you would have to flush in @FinishBundle, set the checkpointing interval to about 5 minutes, and make the bundle size large enough). This _could_ work, I suppose, but it will not produce exactly 5-minute-long "windows", it will likely produce duplicate records in the output, and it might have consequences for other parts of your Pipeline (if you do some more complex processing).
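A minimal, Beam-free sketch of the flush-on-@FinishBundle idea, showing where records live and when they reach a file. The names BundleFlushingWriter and FileSink are hypothetical. In a real DoFn the three methods would be called from @StartBundle, @ProcessElement and @FinishBundle, and the bundle length would be decided by the runner (with the Flink runner, via pipeline options such as --checkpointingInterval, --maxBundleSize and --maxBundleTimeMills; check these against your Beam version).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical abstraction for where a finished bundle's records end up. */
interface FileSink {
  void writeFile(String name, List<String> lines);

  /** A sink that writes each flushed bundle as a text file under dir. */
  static FileSink inDirectory(Path dir) {
    return (name, lines) -> {
      try {
        Files.write(dir.resolve(name), lines, StandardCharsets.UTF_8);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    };
  }
}

/**
 * Hypothetical, Beam-free model of "flush in @FinishBundle": one output file
 * per bundle, nothing kept between bundles (i.e. no Beam state).
 */
class BundleFlushingWriter {
  private final FileSink sink;
  private final String prefix;
  private final List<String> buffer = new ArrayList<>();
  private int filesWritten = 0;

  BundleFlushingWriter(FileSink sink, String prefix) {
    this.sink = sink;
    this.prefix = prefix;
  }

  /** Would be called from @StartBundle. */
  void startBundle() {
    buffer.clear();
  }

  /** Would be called from @ProcessElement; the record is only in memory. */
  void processElement(String record) {
    buffer.add(record);
  }

  /** Would be called from @FinishBundle; writes the bundle as one file. */
  void finishBundle() {
    if (!buffer.isEmpty()) {
      // Copy, because the buffer is cleared right after.
      sink.writeFile(prefix + "-" + filesWritten + ".txt", new ArrayList<>(buffer));
      filesWritten++;
    }
    buffer.clear();
  }
}
```

If the pipeline fails after a file was written but before the checkpoint covering those records completes, the records are read again from Kafka and end up in a second file, which is where the duplicates come from. Real file names would also have to be unique per parallel instance.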

You can also have a look at the @RequiresStableInput DoFn annotation, which buffers input data until a checkpoint completes. That works somewhat similarly to the case above, but you would have to make sure that all elements you process are persisted (if written to an external system).

In general, trying to avoid the shuffle in this case seems like a harsh requirement. I would probably try to relax it, as that enables more "Beam native" processing (e.g. you can then use GroupIntoBatches, FileIO, etc.).

> If you don't worry about consistency, but only need "rough inexact estimates", then using a pure stateless DoFn without @Timer can do the trick.

Could you please elaborate on how this approach would work?

My understanding is that using a timer would require keyed state, which would introduce a shuffle of the data. Is that correct? We are trying to avoid the shuffle if possible.

Also, could you please help me understand why the output would be considered rough/inexact when using a stateless |DoFn|? Is there a possibility of records being dropped, or is the concern mainly around consistency guarantees?

Using Beam timers requires a shuffle (to group by key), but nothing stops you from using processing time (the system clock) directly. As mentioned above, without additional care, stateless processing is not supposed to buffer data anywhere (because that would be stateful processing), and doing so will lose data on failures.
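A minimal, Beam-free sketch of what using processing time (the system clock) instead of a Beam timer could look like: the rollover check simply runs whenever an element arrives. ProcessingTimeRoller is a hypothetical name, the clock is injected as a LongSupplier (System::currentTimeMillis in practice) so the logic can be tested, and the in-memory lists stand in for real file writers.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.LongSupplier;

/**
 * Hypothetical, Beam-free model of a stateless writer that rolls over to a new
 * "file" based on processing time rather than a Beam timer. In a DoFn,
 * onElement would be called from @ProcessElement.
 */
class ProcessingTimeRoller {
  private final long rollEveryMillis;
  private final LongSupplier clockMillis; // System::currentTimeMillis in practice
  private final List<List<String>> closedFiles = new ArrayList<>();
  private List<String> openFile; // null until the first element arrives
  private long openedAtMillis;

  ProcessingTimeRoller(Duration rollEvery, LongSupplier clockMillis) {
    this.rollEveryMillis = rollEvery.toMillis();
    this.clockMillis = clockMillis;
  }

  /** Appends a record, first rolling over if the open file is old enough. */
  void onElement(String record) {
    long now = clockMillis.getAsLong();
    if (openFile != null && now - openedAtMillis >= rollEveryMillis) {
      // Stand-in for flushing and closing the real writer.
      closedFiles.add(openFile);
      openFile = null;
    }
    if (openFile == null) {
      // Stand-in for opening a new writer.
      openFile = new ArrayList<>();
      openedAtMillis = now;
    }
    // Held only in worker memory: lost if the worker fails before rollover.
    openFile.add(record);
  }

  List<List<String>> closedFiles() {
    return Collections.unmodifiableList(closedFiles);
  }

  List<String> openFile() {
    return openFile == null
        ? Collections.<String>emptyList()
        : Collections.unmodifiableList(openFile);
  }
}
```

Two properties of this design match the "rough, inexact" caveat: with sparse traffic a file can stay open well past 5 minutes, because nothing runs until the next element arrives, and the open file's contents are not held in Beam state, so they are lost on failure.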

On Wed, 20 May 2026 at 21:15, Jan Lukavský wrote:

    Hi Vamsi,

    Short answer: it depends. :)

    There are many unknowns in your question. First of all: what kind
    of logic are you referring to? Does it need to modify (i.e. join
    with) the incoming data? Or is it just some (volatile) monitoring?

    If you need timers for output data consistency, then yes, under
    the current Flink Runner implementation there will necessarily be
    a shuffle.
    If you don't worry about consistency, but only need "rough,
    inexact estimates", then using a pure stateless DoFn without
    @Timer can do the trick.

    Can you provide more details on your use case?

     Jan

    On 5/20/26 14:29, vamsikrishna korada wrote:

    Hi Beam Community,

    I’m reaching out for some guidance on a Beam Flink streaming job
    I’m working on.

    We are reading from a Kafka topic, where the traffic can be
    either sparse or high-volume, and we need to run a piece of logic
    periodically, roughly every 5 minutes.

    We considered using |@Timer|, but based on the Beam docs, timers
    require keyed state, which introduces a shuffle. We would like to
    avoid this shuffle if possible.

    Is there a way to trigger periodic logic in a Beam pipeline
    without causing a data shuffle?



    Thanks,

    Vamsi
