Thanks for the advice. I created this issue: https://issues.apache.org/jira/browse/FLINK-24767
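A toy model of the ordering question discussed in the thread, in plain Kotlin. This is not Flink's batch sorter. Guowei's hypothesis in the thread is that batch execution sorts records by key, possibly with an unstable sort. The sketch reuses the input generator from the CountWindowTest program and checks whether each key's records keep their arrival order after grouping. `sampleData`, `stableGroupByKey`, `reversedWithinKey` and `perKeyOrderPreserved` are hypothetical names. `reversedWithinKey` is only an illustrative stand-in for a grouping step that does not preserve per-key order; it is not a description of what Flink does.

```kotlin
import kotlin.random.Random

// Toy model, NOT Flink's batch sorter. All function names are hypothetical.

// Same input as CountWindowTest: (key, sequence number) pairs.
fun sampleData(): List<Pair<Int, Int>> {
    val rand = Random(0)
    return (0..1000).map { Pair(rand.nextInt(10), it) }
}

// Group by key with a stable sort: Kotlin's sortedBy keeps equal keys in input order.
fun stableGroupByKey(input: List<Pair<Int, Int>>): List<Pair<Int, Int>> =
    input.sortedBy { it.first }

// Illustrative stand-in for an order-destroying grouping: sorts by key, then
// puts each key's records in reverse arrival order.
fun reversedWithinKey(input: List<Pair<Int, Int>>): List<Pair<Int, Int>> =
    input.sortedWith(compareBy<Pair<Int, Int>> { it.first }.thenByDescending { it.second })

// True if, for every key, sequence numbers appear in ascending order.
fun perKeyOrderPreserved(records: List<Pair<Int, Int>>): Boolean =
    records.groupBy { it.first }.values.all { group ->
        group.zipWithNext().all { (a, b) -> a.second < b.second }
    }

fun main() {
    val data = sampleData()
    println(perKeyOrderPreserved(stableGroupByKey(data)))  // prints true
    println(perKeyOrderPreserved(reversedWithinKey(data))) // prints false
}
```

Kotlin's `sortedBy` is documented as stable, which is the property the thread asks the downstream side to keep: records with the same key from the same upstream stay in their original relative order.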
On Thu, Nov 4, 2021 at 2:44 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi Yan,
> After a second thought, I think you are right: the downstream operator
> should keep the order of records with the same key from the same upstream.
> So feel free to open a JIRA.
> Best,
> Guowei
>
>
> On Wed, Nov 3, 2021 at 7:30 PM Yan Shen <leey...@gmail.com> wrote:
>
>> Hi,
>>
>> It will complicate things a lot if we cannot assume the input order of any
>> operator after a keyBy. So far I only have the problem with countWindow,
>> which I seem to be able to avoid by writing my own stateful
>> KeyedProcessFunction. Are there other operators which might cause the same
>> problem?
>>
>> The other alternative is not to use batch mode, but the problem is that I
>> won't know when a batch job finishes if I don't run it in batch mode, since
>> a streaming process never ends.
>>
>> Thanks.
>>
>> On Wed, Nov 3, 2021 at 4:38 PM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi, Yan
>>> I do not think it is a bug. Maybe we simply cannot assume the input order
>>> of an operator.
>>> Best,
>>> Guowei
>>>
>>>
>>> On Wed, Nov 3, 2021 at 3:10 PM Yan Shen <leey...@gmail.com> wrote:
>>>
>>>> Yes, it does not happen in streaming mode. Is this considered a bug, or
>>>> is it by design?
>>>>
>>>> Thanks!
>>>>
>>>> On Wed, Nov 3, 2021 at 1:58 PM Guowei Ma <guowei....@gmail.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I did not run your program directly, but I see that you are now using
>>>>> the batch execution mode. I suspect it is related to this, because in
>>>>> batch execution mode Flink will "sort" the records by key (this might be
>>>>> an unstable sort).
>>>>> Would you like to try running it in streaming mode and see what
>>>>> happens?
>>>>>
>>>>> Best,
>>>>> Guowei
>>>>>
>>>>>
>>>>> On Wed, Nov 3, 2021 at 12:16 AM Yan Shen <leey...@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> Can anyone advise on this?
>>>>>>
>>>>>> I wrote a simple test of the countWindow method (in Kotlin), as below:
>>>>>>
>>>>>> package aero.airlab.flinkjobs.headingreminder
>>>>>>
>>>>>> import org.apache.flink.api.common.RuntimeExecutionMode
>>>>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy
>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>>>>> import kotlin.random.Random
>>>>>>
>>>>>> object CountWindowTest {
>>>>>>     @JvmStatic
>>>>>>     fun main(args: Array<String>) {
>>>>>>         val env = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>>         env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>>>>>>
>>>>>>         val rand = Random(0)
>>>>>>         val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>>>>>>         env.fromCollection(data).assignTimestampsAndWatermarks(
>>>>>>             WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
>>>>>>                 .withTimestampAssigner { e, _ -> e.second.toLong() })
>>>>>>             .keyBy { it.first }
>>>>>>             .countWindow(3L, 1)
>>>>>>             .reduce { a, b -> b }
>>>>>>             .keyBy { it.first }
>>>>>>             .filter { it.first == 5 }
>>>>>>             .print()
>>>>>>
>>>>>>         env.execute()
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> The beginning of the output is as follows:
>>>>>>
>>>>>> 12> (5, 184)
>>>>>> 12> (5, 18)
>>>>>> 12> (5, 29)
>>>>>> 12> (5, 37)
>>>>>> 12> (5, 38)
>>>>>> 12> (5, 112)
>>>>>> 12> (5, 131)
>>>>>>
>>>>>> The first line, (5, 184), is out of order with respect to the rest.
>>>>>>
>>>>>> Is this a bug? The problem disappears if I remove the keyBy after the
>>>>>> reduce.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>
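For reference, a plain-Kotlin model of what `countWindow(3L, 1).reduce { a, b -> b }` in the test above computes per key, for one parallel instance that sees records in arrival order. As I understand Flink, `countWindow(size, slide)` is built from `GlobalWindows`, `CountEvictor.of(size)` and `CountTrigger.of(slide)`; the model mirrors that by keeping the last `size` records per key and firing every `slide` records per key. It is not Flink code: `slidingCountReduce` is a hypothetical helper, and in a hand-written KeyedProcessFunction (the workaround mentioned in the thread) the per-key buffer and counter would presumably live in keyed state such as `ListState` and `ValueState`.

```kotlin
import kotlin.random.Random

// Plain-Kotlin model (NOT Flink code) of countWindow(size, slide).reduce(combine)
// for ONE parallel instance that sees records in arrival order.
// slidingCountReduce is a hypothetical helper name.
fun <K, T> slidingCountReduce(
    input: List<T>,
    size: Int,
    slide: Int,
    key: (T) -> K,
    combine: (T, T) -> T
): List<T> {
    require(size > 0 && slide > 0) { "size and slide must be positive" }
    val buffers = HashMap<K, ArrayDeque<T>>() // per-key window contents
    val counts = HashMap<K, Int>()            // per-key records since the last fire
    val out = ArrayList<T>()
    for (record in input) {
        val k = key(record)
        val buf = buffers.getOrPut(k) { ArrayDeque() }
        buf.addLast(record)
        // Records older than the last `size` can never be in a future window,
        // so evicting eagerly keeps the buffer bounded without changing output.
        while (buf.size > size) buf.removeFirst()
        val c = (counts[k] ?: 0) + 1
        if (c == slide) {
            counts[k] = 0
            // Reduce oldest-to-newest over the current window contents.
            out.add(buf.reduce(combine))
        } else {
            counts[k] = c
        }
    }
    return out
}

fun main() {
    // Same input as CountWindowTest in the thread.
    val rand = Random(0)
    val data = (0..1000).map { Pair(rand.nextInt(10), it) }
    val result = slidingCountReduce(data, size = 3, slide = 1, key = { it.first }, combine = { _, b -> b })
    // With slide = 1 and "keep the newer element", each record is re-emitted as-is.
    println(result == data) // prints true
    println(result.filter { it.first == 5 }.take(5))
}
```

In this model the window step re-emits every record unchanged and in per-key arrival order, which suggests that an out-of-order line such as (5, 184) more likely comes from how records are delivered to or from the operator in batch mode than from the count-window arithmetic itself.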