Replies inline.
On 5/20/26 18:05, vamsikrishna korada wrote:
Thanks for the reply, Jan.
The job is similar to an ETL job. We continuously read events from a
Kafka topic and write them to files every 5 minutes. We are
considering using a |Timer| to close the current file and open a new
writer every 5 minutes.
If you don't use state (which implies a shuffle under the current Flink
Runner implementation), this approach will lose data on failures (and
restarts) of the Pipeline.
You might try using bundles and the checkpointing interval to drive
flushing of the files (you would have to flush in @FinishBundle, set the
checkpointing interval to about 5 minutes and make the bundle size large
enough).
This _could_ work, I suppose, but it will not produce exact 5-minute
"windows", will likely produce duplicate records in the output (anything
flushed after the last completed checkpoint is written again when the
Pipeline recovers), and might have consequences for other parts of your
Pipeline (if you do some more complex processing).
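Here is a minimal plain-Java model (no Beam or Flink dependency) of the bundle-based suggestion and the duplicate-records caveat. The class `BundleFlushModel` and its methods are hypothetical. They only mirror the DoFn lifecycle (`processElement`, `finishBundle`). The model also assumes, as the suggestion above implies, that the runner ends a bundle at each checkpoint. It flushes one bundle after the last completed checkpoint, lets the job fail, and shows the replay writing that bundle a second time.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model (no Beam or Flink dependency) of flushing files at the end
 * of each bundle. Method names only mirror the DoFn lifecycle; this is not the
 * Beam API. Bundles are assumed to end at checkpoints.
 */
final class BundleFlushModel {
    // Records of the current bundle, held in memory only.
    private final List<String> buffer = new ArrayList<>();
    // Stand-in for durable storage: each entry is one flushed "file".
    final List<List<String>> files = new ArrayList<>();

    void processElement(String record) {
        buffer.add(record);
    }

    /** In Beam this logic would live in the @FinishBundle method. */
    void finishBundle() {
        if (!buffer.isEmpty()) {
            files.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        List<String> source = List.of("a", "b", "c", "d");
        BundleFlushModel sink = new BundleFlushModel();

        // Bundle 1 covers offsets 0-1, is flushed, and its checkpoint completes.
        sink.processElement(source.get(0));
        sink.processElement(source.get(1));
        sink.finishBundle();
        int checkpointedOffset = 2;

        // Bundle 2 is flushed, but the job fails before its checkpoint completes.
        sink.processElement(source.get(2));
        sink.processElement(source.get(3));
        sink.finishBundle();

        // Recovery rewinds the source to the last completed checkpoint.
        for (int i = checkpointedOffset; i < source.size(); i++) {
            sink.processElement(source.get(i));
        }
        sink.finishBundle();

        System.out.println(sink.files); // [[a, b], [c, d], [c, d]]
    }
}
```

In a real job the in-memory buffer would be an open file writer. The point is only that a flush not covered by a completed checkpoint gets repeated.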
You can also have a look at the @RequiresStableInput DoFn annotation,
which buffers input data until a checkpoint completes. This works
somewhat similarly to the case above, but you would have to make sure
that all elements you process are persisted (if written to an external
system).
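The buffering behaviour of @RequiresStableInput can be modelled the same way. This simplified plain-Java model assumes that the runner releases buffered elements to the DoFn only once the checkpoint covering them has completed. `StableInputBuffer` and its callbacks are hypothetical names, not Beam or Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Simplified model of stable-input buffering: elements are held back until a
 * checkpoint completes and only then handed to the processing logic.
 */
final class StableInputBuffer<T> {
    private final List<T> pending = new ArrayList<>();
    private final Consumer<T> process;

    StableInputBuffer(Consumer<T> process) {
        this.process = process;
    }

    /** Called for each incoming element; nothing is processed yet. */
    void add(T element) {
        pending.add(element);
    }

    /** Called when a checkpoint completes: release everything buffered so far. */
    void onCheckpointComplete() {
        for (T element : pending) {
            process.accept(element);
        }
        pending.clear();
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        List<String> persisted = new ArrayList<>();
        StableInputBuffer<String> buffer = new StableInputBuffer<>(persisted::add);
        buffer.add("e1");
        buffer.add("e2");
        System.out.println(persisted); // [] (nothing processed before the checkpoint)
        buffer.onCheckpointComplete();
        System.out.println(persisted); // [e1, e2]
    }
}
```

A failure after the release can cause the same elements to be processed again. The write to the external system should therefore tolerate being repeated.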
In general, avoiding the shuffle in this case seems like a harsh
requirement. I would probably try to relax it, because allowing a shuffle
enables more "Beam native" processing (e.g. you can then use
GroupIntoBatches, FileIO, etc.).
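For comparison, here is roughly what a keyed batching transform such as GroupIntoBatches does once a shuffle is allowed. In Beam Java it is applied to a keyed `PCollection` as `GroupIntoBatches.ofSize(...)`, optionally with `withMaxBufferingDuration(...)`. The sketch below only imitates those semantics in plain Java, using a hypothetical `KeyedBatcher` class and caller-supplied timestamps. It is not the transform's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java imitation of per-key batching: a key's batch is emitted when it
 * reaches batchSize elements or when its oldest element has been buffered for
 * at least maxBufferingMillis. The check runs only when an element arrives.
 */
final class KeyedBatcher {
    private final int batchSize;
    private final long maxBufferingMillis;
    // Per-key buffers and first-arrival times: the "keyed state" of this model.
    private final Map<String, List<String>> buffers = new HashMap<>();
    private final Map<String, Long> firstArrival = new HashMap<>();
    private final List<List<String>> emitted = new ArrayList<>();

    KeyedBatcher(int batchSize, long maxBufferingMillis) {
        this.batchSize = batchSize;
        this.maxBufferingMillis = maxBufferingMillis;
    }

    void add(String key, String value, long nowMillis) {
        List<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        firstArrival.putIfAbsent(key, nowMillis);
        buffer.add(value);

        boolean full = buffer.size() >= batchSize;
        boolean expired = nowMillis - firstArrival.get(key) >= maxBufferingMillis;
        if (full || expired) {
            emitted.add(buffer);      // hand the batch downstream
            buffers.remove(key);      // the next element starts a fresh buffer
            firstArrival.remove(key);
        }
    }

    List<List<String>> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        KeyedBatcher batcher = new KeyedBatcher(3, 5 * 60 * 1000L);
        batcher.add("k1", "a", 0L);
        batcher.add("k2", "x", 0L);
        batcher.add("k1", "b", 1_000L);
        batcher.add("k1", "c", 2_000L);   // k1 reaches batchSize
        batcher.add("k2", "y", 400_000L); // k2's first element is older than 5 minutes
        System.out.println(batcher.emitted()); // [[a, b, c], [x, y]]
    }
}
```

The per-key buffers and first-arrival times are the keyed state that forces the shuffle. The real transform also uses timers, so it can flush a key even when no further elements arrive, which this model cannot do.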
> If you don't worry about consistency, but only need "rough inexact
> estimates", then using a pure stateless DoFn without @Timer can do the
> trick.
Could you please elaborate on how this approach would work?
My understanding is that using a timer would require keyed state,
which would introduce a shuffle of the data. Is that correct? We are
trying to avoid the shuffle if possible.
Also, could you please help me understand why the output would be
considered rough/inexact when using a stateless |DoFn|? Is there a
possibility of records being dropped, or is the concern mainly around
consistency guarantees?
Using Beam timers requires a shuffle (to group by key), but nothing stops
you from using processing time (the system clock) directly in the DoFn.
As mentioned above, without additional care, stateless processing is not
supposed to buffer data anywhere (because that would be stateful
processing), and doing so will lose data on failures.
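Here is a minimal sketch of the processing-time approach. A hypothetical `ClockRolledWriter` stands in for the file-writing logic of a stateless DoFn. Instead of using a Beam timer, each element checks the system clock, and the writer rolls to a new file once the current 5-minute interval has passed. The clock is injected as a `LongSupplier` only so the example runs deterministically. In a job it would be `System::currentTimeMillis`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/**
 * Plain-Java model of a stateless writer that rolls to a new "file" based on
 * the clock instead of a Beam timer. The roll check runs only when an element
 * arrives, and the open file lives in memory, so it is lost on failure.
 */
final class ClockRolledWriter {
    static final long ROLL_INTERVAL_MILLIS = 5 * 60 * 1000L;

    private final LongSupplier clock; // System::currentTimeMillis in a real job
    private final List<List<String>> closedFiles = new ArrayList<>();
    private List<String> openFile = new ArrayList<>();
    private long windowStart;

    ClockRolledWriter(LongSupplier clock) {
        this.clock = clock;
        this.windowStart = alignToInterval(clock.getAsLong());
    }

    /** Round a timestamp down to the start of its 5-minute interval. */
    private static long alignToInterval(long millis) {
        return millis - Math.floorMod(millis, ROLL_INTERVAL_MILLIS);
    }

    void processElement(String record) {
        long now = clock.getAsLong();
        if (now >= windowStart + ROLL_INTERVAL_MILLIS) {
            // Close the current file (in a real job: flush and close the writer).
            if (!openFile.isEmpty()) {
                closedFiles.add(openFile);
            }
            openFile = new ArrayList<>();
            windowStart = alignToInterval(now);
        }
        openFile.add(record);
    }

    List<List<String>> closedFiles() {
        return closedFiles;
    }

    List<String> openFile() {
        return openFile;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0L};
        ClockRolledWriter writer = new ClockRolledWriter(() -> fakeNow[0]);
        writer.processElement("a");
        fakeNow[0] = 60_000L;  // 1 minute later: same file
        writer.processElement("b");
        fakeNow[0] = 301_000L; // past the 5-minute boundary: roll
        writer.processElement("c");
        System.out.println(writer.closedFiles()); // [[a, b]]
        System.out.println(writer.openFile());    // [c]
    }
}
```

Two caveats follow from the code. First, the roll check runs only when an element arrives, so with sparse traffic a file can stay open well past the 5-minute mark. Second, anything in the open file that has not been flushed is lost if the worker fails.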
On Wed, 20 May 2026 at 21:15, Jan Lukavský wrote:
Hi Vamsi,
the short answer is: it depends. :)
There are many unknowns in your question. First of all, what kind
of logic are you referring to? Does it need to modify (i.e. join with)
the incoming data? Or is it just some (volatile) monitoring?
If you need timers for output data consistency, then yes, under
the current Flink Runner implementation there will necessarily be a
shuffle.
If you don't worry about consistency, but only need "rough
inexact estimates", then using a pure stateless DoFn without @Timer
can do the trick.
Can you provide more details on your use case?
Jan
On 5/20/26 14:29, vamsikrishna korada wrote:
Hi Beam Community,
I’m reaching out for some guidance on a Beam Flink streaming job
I’m working on.
We are reading from a Kafka topic, where the traffic can be
either sparse or high-volume, and we need to run a piece of logic
periodically, roughly every 5 minutes.
We considered using |@Timer|, but based on the Beam docs, timers
require keyed state, which introduces a shuffle. We would like to
avoid this shuffle if possible.
Is there a way to trigger periodic logic in a Beam pipeline
without causing a data shuffle?
Thanks,
Vamsi