Hi Lec,

In short, the KeyedProcessFunction will see all elements of a key
back-to-back, but the order among those elements is unspecified and not
guaranteed to be stable. It can change across runs, so building any
feature on top of it is discouraged.

One way to work around this is to assign event timestamps and rely on
event-time processing in BATCH execution mode.
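
A rough sketch of that approach, assuming MyEvent carries a numeric ts
field to sort by (for brevity the function just re-emits the events
themselves): buffer the records per key, register an event-time timer,
and sort once the timer fires at the end of the input.

env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStream<MyEvent> input = env.fromSource(...); // as in your sketch

input
  // Attach event timestamps; in BATCH mode the watermarks themselves are
  // ignored and a single MAX_WATERMARK is emitted at the end of input.
  .assignTimestampsAndWatermarks(
      WatermarkStrategy.<MyEvent>forMonotonousTimestamps()
          .withTimestampAssigner((e, recordTs) -> e.ts))
  .keyBy(e -> e.userId)
  .process(new KeyedProcessFunction<String, MyEvent, MyEvent>() {

    private transient ListState<MyEvent> buffer;

    @Override
    public void open(Configuration parameters) {
      buffer = getRuntimeContext().getListState(
          new ListStateDescriptor<>("buffer", MyEvent.class));
    }

    @Override
    public void processElement(MyEvent value, Context ctx, Collector<MyEvent> out)
        throws Exception {
      // Arrival order within a key is unspecified, so just buffer.
      buffer.add(value);
      // In BATCH mode this event-time timer only fires once all input for
      // the current key has been consumed.
      ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<MyEvent> out)
        throws Exception {
      List<MyEvent> sorted = new ArrayList<>();
      buffer.get().forEach(sorted::add);
      sorted.sort(Comparator.comparingLong(e -> e.ts));
      for (MyEvent e : sorted) {
        out.collect(e); // per-event logic goes here, now in ascending ts order
      }
      buffer.clear();
    }
  });

Since batch execution processes one key at a time and only emits
MAX_WATERMARK at the end of the input, the timer effectively fires after
all records of the current key have been seen, so the onTimer sort gives
you a deterministic, ascending ts order regardless of arrival order.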

BR,
G


On Tue, Aug 26, 2025 at 5:18 PM lec ssmi <shicheng31...@gmail.com> wrote:

> Hello Flink community,
>
> When using a KeyedProcessFunction<K, IN, OUT>, what is the *processing
> order* of the records *belonging to the same key* within a single batch?
>
> Concretely:
>
>    1. With *no event-time* defined, and running in *batch mode*, does
>    KeyedProcessFunction process all elements of the same key in a 
> *deterministic
>    order*?
>    2. Is that order tied to the *upstream partition/file order*, or to
>    any *internal shuffle/sort* guarantees in batch execution (e.g.,
>    sort-shuffle / hash-shuffle), or is it *unspecified*?
>    3. Is the per-key order *stable across runs* (same input &
>    parallelism), or can it vary?
>    4. If a stable/ascending order is required (e.g., by a field like ts
>    or seq), what is the *recommended way* to enforce it before/inside
>    KeyedProcessFunction in batch mode? (e.g., explicit sortPartition,
>    pre-aggregation, or other operator/setting)
>
> *Minimal sketch:*
>
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> DataStream<MyEvent> input = env.fromSource(...); // no timestamps/watermarks
>
> input
>   .keyBy(e -> e.userId)
>   .process(new KeyedProcessFunction<String, MyEvent, MyOut>() {
>     @Override
>     public void processElement(MyEvent value, Context ctx, Collector<MyOut> 
> out) {
>       // Question: in what order will events with the same userId arrive here?
>     }
>   });
>
> Any pointers to *docs / source code locations* (for 1.19 batch) that
> describe the per-key ordering semantics would be highly appreciated. Thanks!
>
> Best!
>
