Apologies for the formatting of the previous message; my email client 
stripped the structure. Please find the readable version below:

Hi Beam community,

Following the recent merge of my PR on dynamic batching parameters 
(https://github.com/apache/beam/pull/37428), I’ve been profiling Beam’s 
RunInference on LLM-style workloads to identify opportunities where we can 
adapt efficiency patterns from dedicated serving engines.

Beam’s unified processing model is a big advantage, but for production GenAI 
pipelines there are two bottlenecks that show up quickly in cost and 
user-perceived latency:

1. Padding waste (Throughput / GPU cost): Current RunInference batching relies 
on arrival order. In mixed-workload scenarios, a single long prompt forces the 
entire batch to be padded to its length, wasting significant compute. While my 
previous PR optimized batch timing, it does not address batch composition. 
"Smart Bucketing" is needed to group similar-length inputs and minimize this 
waste (a back-of-the-envelope illustration follows this list).

2. No incremental output (TTFT / Latency): The current one-shot generation 
pattern blocks until completion, which makes interactive GenAI flows feel 
unresponsive compared to streaming gRPC/HTTP endpoints.
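
To make the padding point concrete, here is a toy calculation (the token 
lengths are made up purely for illustration): when a batch is padded to its 
longest element, a single long prompt dominates the compute, while 
length-bucketed batches keep the waste small.

    def padding_waste(token_lengths):
        # Fraction of compute spent on pad tokens when the batch is
        # padded to its longest element.
        padded = max(token_lengths) * len(token_lengths)
        return 1 - sum(token_lengths) / padded

    # Arrival-order batch: one long prompt dominates.
    print(padding_waste([12, 18, 25, 900]))  # ~0.73 -> roughly 73% padding
    # Length-bucketed batches keep the waste low.
    print(padding_waste([12, 18, 25, 31]))   # ~0.31
    print(padding_waste([900]))              # 0.0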

I’d like to propose a phased roadmap to make production-ready GenAI workloads 
first-class on Beam ML.


Phase 1 — Efficiency Core: Smart Bucketing

Goal: Reduce padding overhead significantly (and improve throughput) for 
variable-length inputs without changing existing behavior.

Approach (building on the element_size_fn infrastructure):

1. Use the element_size_fn mechanism to bucket elements by length/size before 
they reach the batching transform.

2. Implement a stateful bucketing transform using BagState + Timers: maintain 
per-bucket buffers (e.g., token-length ranges), emit when a specific bucket 
reaches its target batch size, and use timer-based flushing to bound waiting 
time (a rough sketch follows this list).
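
To give a feel for the shape of this, below is a minimal, untested sketch of 
the stateful core in the Python SDK. The batch size, wait bound, and string 
coder are illustrative placeholders, and it assumes elements arrive keyed by a 
bucket id derived from a length estimate (the piece element_size_fn is meant 
to provide). The real transform would sit behind the opt-in flag discussed 
under Engineering Principles.

    import time

    import apache_beam as beam
    from apache_beam.coders import coders
    from apache_beam.transforms.timeutil import TimeDomain
    from apache_beam.transforms.userstate import (
        BagStateSpec, TimerSpec, on_timer)


    class BucketedBatchFn(beam.DoFn):
        """Buffers elements per bucket key; emits a batch when the bucket
        is full or when a processing-time flush deadline fires."""

        BUFFER = BagStateSpec('buffer', coders.StrUtf8Coder())
        FLUSH = TimerSpec('flush', TimeDomain.REAL_TIME)

        def __init__(self, target_batch_size=8, max_wait_secs=5.0):
            self._target = target_batch_size
            self._max_wait = max_wait_secs

        def process(self,
                    element,  # (bucket_key, prompt)
                    buffer=beam.DoFn.StateParam(BUFFER),
                    flush=beam.DoFn.TimerParam(FLUSH)):
            _, prompt = element
            current = list(buffer.read())
            if not current:
                # First element in this bucket: bound how long it may
                # wait, so smaller buckets are never starved.
                flush.set(time.time() + self._max_wait)
            current.append(prompt)
            if len(current) >= self._target:
                buffer.clear()
                yield current
            else:
                buffer.add(prompt)

        @on_timer(FLUSH)
        def flush_bucket(self, buffer=beam.DoFn.StateParam(BUFFER)):
            # Deadline reached: emit whatever the bucket has accumulated.
            leftover = list(buffer.read())
            if leftover:
                buffer.clear()
                yield leftover

Feeding it would look roughly like 
prompts | beam.Map(lambda p: (bucket_for(estimate_tokens(p)), p)) 
| beam.ParDo(BucketedBatchFn()), where bucket_for and estimate_tokens are 
placeholders for the length estimate element_size_fn already supplies.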

Engineering Principles:

1. Opt-in transform / feature flag so existing pipelines remain unchanged.

2. Clear semantics around flushing (latency bounds, watermark considerations).

3. Benchmarks + tests as part of the initial PR series (padding ratio, 
throughput, p95 latency).


Phase 2 — Latency Breakthrough: Iterative / Streaming Inference

Goal: Improve Time-To-First-Token and enable real-time GenAI patterns 
downstream.

Approach:

1. Extend ModelHandler/RunInference with an experimental iterative output mode 
(token- or chunk-level), where the DoFn can emit partial results as they’re 
produced (a rough sketch of the emission pattern follows the note below).

2. Start behind a feature flag with a conservative compatibility story.

(Note: This isn’t trying to turn Beam into a serving engine; it’s about making 
Beam viable for interactive GenAI pipelines where incremental outputs are part 
of the dataflow.)
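
To illustrate the target behavior rather than the API (which is exactly what 
the design doc needs to pin down), here is a rough, hypothetical sketch of 
chunk-level emission at the DoFn level. stream_generate is a stub standing in 
for a framework's streaming generate call, not an existing Beam or framework 
API.

    import apache_beam as beam


    def stream_generate(prompt):
        # Stub for a framework's streaming call; a real handler would
        # yield tokens or chunks from the model here.
        for word in ('echo: ' + prompt).split():
            yield word + ' '


    class StreamingGenerateFn(beam.DoFn):
        """Sketch only: emits partial generations on a side output so
        downstream transforms see text before generation completes."""

        def process(self, element):
            request_id, prompt = element
            pieces = []
            for chunk in stream_generate(prompt):
                pieces.append(chunk)
                yield beam.pvalue.TaggedOutput(
                    'partial', (request_id, ''.join(pieces)))
            # Complete result on the main output, matching today's
            # one-shot shape.
            yield (request_id, ''.join(pieces))

Downstream wiring would look like 
beam.ParDo(StreamingGenerateFn()).with_outputs('partial', main='final'), so 
existing consumers keep the final output while new ones can tap the partials.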


Next Steps & Process

I’m currently drafting a design doc for Phase 1 (focusing on the state machine, 
timer policies, and portability notes). If the direction resonates, I’d love to 
share it early for feedback and iterate before coding.

A note on process: I may package this roadmap as a GSoC proposal, but I’m happy 
to drive it as incremental upstream improvements either way — Phase 1 in 
particular should land as a sequence of reviewable and measurable PRs.


Feedback I’d value:

1. Does “bucketing first, streaming next” match Beam ML’s direction for GenAI 
workloads?

2. Any runner/portability constraints you’d want explicitly addressed in the 
bucketing design (state/timers semantics, watermark behavior, etc.)?

3. Would you prefer Phase 2 to target chunk-level streaming first (lower risk) 
before token-level?


Best regards,

Elia

https://github.com/Eliaaazzz
