Thanks for the writeup! I'm generally +1 to the smart bucketing workstream.
This should probably leverage the stateful batch elements which already
exists though (
https://github.com/apache/beam/blob/112685de58d7b24b70b8d8eb7171f0e28b35a2f1/sdks/python/apache_beam/transforms/util.py#L1305)
and/or work without state (sorting/batching by size is still doable in the
context of a single bundle). More design here will help regardless!

The value of the interactive pipeline is a bit less clear to me, and much
harder to do well; I'd be curious to see more of the mechanics of what
you're proposing. An example use case would be helpful as well. I will note
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/ml/inference/vllm_inference.py
which uses vLLM to do some of what you describe.

Thanks,
Danny

On Sun, Feb 1, 2026 at 4:19 AM Elia LIU <[email protected]> wrote:

> Apologies regarding the formatting in the previous message. My email
> client stripped the structure. Please find the readable version below:
>
> Hi Beam community,
>
> Following the recent merge of my PR on dynamic batching parameters (
> https://github.com/apache/beam/pull/37428). I’ve been profiling Beam’s
> RunInference on LLM-style workloads to identify opportunities where we can
> adapt efficiency patterns from dedicated serving engines.
>
> Beam’s unified processing model is a big advantage, but for production
> GenAI pipelines there might be two bottlenecks that show up quickly in cost
> and user-perceived latency:
>
> 1. Padding waste (Throughput / GPU cost): Current RunInference batching
> relies on arrival order. In mixed-workload scenarios, a single long prompt
> forces the entire batch to be padded to its length, wasting significant
> compute. "Smart Bucketing" is needed to group similar-length inputs and
> minimize this waste.
>
> 2. No incremental output (TTFT / Latency): The current one-shot generation
> pattern blocks until completion, which makes interactive GenAI flows feel
> unresponsive compared to streaming gRPC/HTTP endpoints.
>
> I’d like to propose a phased roadmap to make production-ready GenAI
> workloads first-class on Beam ML.
>
>
> Phase 1 — Efficiency Core: Smart Bucketing
>
> Goal: Reduce padding overhead significantly (and improve throughput) for
> variable-length inputs without changing existing behavior.
>
> Approach (building on the element_size_fn infrastructure):
>
> 1. Use the element_size_fn mechanism to bucket elements by length/size
> before they reach the batching transform.
>
> 2. Implement a stateful bucketing transform using BagState + Timers:
> maintain per-bucket buffers (e.g., token-length ranges), emit when a
> specific bucket reaches its target batch size, and use timer-based flushing
> to bound waiting time.
>
> Engineering Principles:
>
> 1. Opt-in transform / feature flag so existing pipelines remain unchanged.
>
> 2. Clear semantics around flushing (latency bounds, watermark
> considerations).
>
> 3. Benchmarks + tests as part of the initial PR series (padding ratio,
> throughput, p95 latency).
>
>
> Phase 2 — Latency Breakthrough: Iterative / Streaming Inference
>
> Goal: Improve Time-To-First-Token and enable real-time GenAI patterns
> downstream.
>
> Approach:
>
> 1. Extend ModelHandler/RunInference with an experimental iterative output
> mode (token- or chunk-level), where the DoFn can emit partial results as
> they’re produced.
>
> 2. Start behind a feature flag with a conservative compatibility story.
>
> (Note: This isn’t trying to turn Beam into a serving engine; it’s about
> making Beam viable for interactive GenAI pipelines where incremental
> outputs are part of the dataflow.)
>
>
> Next Steps & Process
>
> I’m currently drafting a design doc for Phase 1 (focusing on the state
> machine, timer policies, and portability notes). If the direction
> resonates, I’d love to share it early for feedback and iterate before
> coding.
>
> A note on process: I may package this roadmap as a GSoC proposal, but I’m
> happy to drive it as incremental upstream improvements either way — Phase 1
> in particular should land as a sequence of reviewable and measurable PRs.
>
>
> Feedback I’d value:
>
> 1. Does “bucketing first, streaming next” match Beam ML’s direction for
> GenAI workloads?
>
> 2. Any runner/portability constraints you’d want explicitly addressed in
> the bucketing design (state/timers semantics, watermark behavior, etc.)?
>
> 3. Would you prefer Phase 2 to target chunk-level streaming first (lower
> risk) before token-level?
>
>
> Best regards,
>
> Elia
>
> https://github.com/Eliaaazzz
>
> On 2026/02/01 08:07:37 Elia LIU wrote:
> > Hi Beam community (and Danny),
> >
> > Following the recent merge of my PR on dynamic batching parameters (
> > https://github.com/apache/beam/pull/37428), I’ve been profiling Beam’s
> > RunInference on LLM-style workloads to identify opportunities where we
> can
> > adapt efficiency patterns from dedicated serving engines.
> >
> > Beam’s unified processing model is a big advantage, but for production
> > GenAI pipelines there are two bottlenecks that show up quickly in cost
> and
> > user-perceived latency:
> >
> >    -
> >
> >    *Padding waste (Throughput / GPU cost):* Current RunInference
> batching
> >    relies on arrival order. In mixed-workload scenarios, a single long
> prompt
> >    forces the entire batch to be padded to its length, wasting
> significant
> >    compute. While my previous PR optimized batch timing, it implies
> nothing
> >    about batch composition. "Smart Bucketing" is needed to group
> >    similar-length inputs and minimize this waste.
> >    -
> >
> >    *No incremental output (TTFT / Latency):* The current one-shot
> >    generation pattern blocks until completion, which makes interactive
> GenAI
> >    flows feel unresponsive compared to streaming gRPC/HTTP endpoints.
> >
> > I’d like to propose a phased roadmap to make production-ready GenAI
> > workloads first-class on Beam ML.
> > ------------------------------
> >
> > *Phase 1 — Efficiency Core: Smart Bucketing* *Goal: Reduce padding
> overhead
> > significantly (and improve throughput) for variable-length inputs without
> > changing existing behavior.*
> >
> > *Approach* (building on the element_size_fn infrastructure):
> >
> >    -
> >
> >    Use the element_size_fn mechanism to bucket elements by length/size
> >    before they reach the batching transform.
> >    -
> >
> >    Implement a stateful bucketing transform using BagState + Timers:
> >    -
> >
> >       Maintain per-bucket buffers (e.g., token-length ranges /
> configurable
> >       boundaries).
> >       -
> >
> >       Emit when a specific bucket reaches its target batch size.
> >       -
> >
> >       Use timer-based flushing to bound waiting time and avoid starving
> >       smaller buckets (handling the "straggler" problem).
> >
> > *Engineering Principles:*
> >
> >    -
> >
> >    Opt-in transform / feature flag so existing pipelines remain
> unchanged.
> >    -
> >
> >    Clear semantics around flushing (latency bounds, watermark
> >    considerations).
> >    -
> >
> >    Benchmarks + tests as part of the initial PR series (padding ratio,
> >    throughput, p95 latency).
> >
> > ------------------------------
> >
> > *Phase 2 — Latency Breakthrough: Iterative / Streaming Inference* *Goal:
> > Improve Time-To-First-Token and enable real-time GenAI patterns
> downstream.*
> >
> > *Approach:*
> >
> >    -
> >
> >    Extend ModelHandler/RunInference with an experimental iterative
> output
> >    mode (token- or chunk-level), where the DoFn can emit partial
> results as
> >    they’re produced.
> >    -
> >
> >    Start behind a feature flag with a conservative compatibility story.
> >
> > (Note: This isn’t trying to turn Beam into a serving engine; it’s about
> > making Beam viable for interactive GenAI pipelines where incremental
> > outputs are part of the dataflow.)
> > ------------------------------
> >
> > *Next Steps & Process*
> >
> > I’m currently drafting a design doc for Phase 1 (focusing on the state
> > machine, timer policies, and portability notes). If the direction
> > resonates, I’d love to share it early for feedback and iterate before
> > coding.
> >
> > A note on process: I may package this roadmap as a GSoC proposal, but I’m
> > happy to drive it as incremental upstream improvements either way —
> Phase 1
> > in particular should land as a sequence of reviewable, measurable PRs.
> >
> > *Feedback I’d value:*
> >
> >    1.
> >
> >    Does “bucketing first, streaming next” match Beam ML’s direction for
> >    GenAI workloads?
> >    2.
> >
> >    Any runner/portability constraints you’d want explicitly addressed in
> >    the bucketing design (state/timers semantics, watermark behavior,
> etc.)?
> >    3.
> >
> >    Would you prefer Phase 2 to target chunk-level streaming first (lower
> >    risk) before token-level?
> >
> > Best regards,
> >
> > *Elia* https://github.com/Eliaaazzz
> >
>

Reply via email to