Hi,

It will complicate things a lot if we cannot assume the input order of any
operator after a keyBy. So far I have only hit the problem with countWindow,
which I seem to be able to avoid by writing my own stateful
KeyedProcessFunction (see the sketch below). Are there other operators that
might cause the same problem?
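
For reference, here is a rough sketch of the kind of stateful replacement I
mean. The class and state names are just placeholders and it only reproduces
the trivial reduce from the test further down, so treat it as an illustration
rather than the actual job:

import org.apache.flink.api.common.state.ListState
import org.apache.flink.api.common.state.ListStateDescriptor
import org.apache.flink.api.common.typeinfo.TypeHint
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.Collector

// Keeps the last `size` elements per key and emits a result on every input,
// mimicking countWindow(size, 1) followed by a reduce.
class SlidingCountReduce(private val size: Int) :
    KeyedProcessFunction<Int, Pair<Int, Int>, Pair<Int, Int>>() {

    private lateinit var buffer: ListState<Pair<Int, Int>>

    override fun open(parameters: Configuration) {
        buffer = runtimeContext.getListState(
            ListStateDescriptor(
                "buffer",
                TypeInformation.of(object : TypeHint<Pair<Int, Int>>() {})
            )
        )
    }

    override fun processElement(
        value: Pair<Int, Int>,
        ctx: Context,
        out: Collector<Pair<Int, Int>>
    ) {
        // Append the new element and keep only the last `size` elements.
        val window = ((buffer.get()?.toList() ?: emptyList()) + value).takeLast(size)
        buffer.update(window)
        // A real job would apply its reduce over `window` here; this just
        // keeps the latest element, like reduce { a, b -> b } in the test.
        out.collect(window.last())
    }
}

It replaces .countWindow(3L, 1).reduce { a, b -> b } with
.keyBy { it.first }.process(SlidingCountReduce(3)).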

The other alternative is not to use batch mode, but the problem is that I
won't know when a batch job finishes if I don't run it in batch mode, since a
streaming job never ends.

Thanks.

On Wed, Nov 3, 2021 at 4:38 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi, Yan
> I do not think it is a bug. Perhaps we simply cannot assume the input
> order of an operator.
> Best,
> Guowei
>
>
> On Wed, Nov 3, 2021 at 3:10 PM Yan Shen <leey...@gmail.com> wrote:
>
>> Yes, it does not happen in streaming mode. Is this considered a bug or is
>> it by design?
>>
>> Thanks!
>>
>> On Wed, Nov 3, 2021 at 1:58 PM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I did not run your program directly, but I see that you are now using
>>> the BATCH execution mode. I suspect it is related to this, because in
>>> BATCH execution mode Flink will "sort" the records by key (this might be
>>> an unstable sort).
>>> Could you try running in STREAMING mode and see what happens?
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Wed, Nov 3, 2021 at 12:16 AM Yan Shen <leey...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Can anyone advise on this?
>>>>
>>>> I wrote a simple test of the countWindow method (in Kotlin) as below
>>>>
>>>> package aero.airlab.flinkjobs.headingreminder
>>>>
>>>> import org.apache.flink.api.common.RuntimeExecutionMode
>>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>>> import kotlin.random.Random
>>>>
>>>> object CountWindowTest {
>>>>     @JvmStatic
>>>>     fun main(args: Array<String>) {
>>>>         val env = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>         env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>>>>
>>>>         val rand = Random(0)
>>>>         val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>>>>         env.fromCollection(data).assignTimestampsAndWatermarks(
>>>>             WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
>>>>                 .withTimestampAssigner { e, _ -> e.second.toLong() })
>>>>             .keyBy { it.first }
>>>>             .countWindow(3L, 1) // sliding count window: size 3, slide 1
>>>>             .reduce { a, b -> b } // keep only the latest element
>>>>             .keyBy { it.first }
>>>>             .filter { it.first == 5 }
>>>>             .print()
>>>>
>>>>         env.execute()
>>>>     }
>>>> }
>>>>
>>>>
>>>> The beginning of the output is as such
>>>>
>>>> 12> (5, 184)
>>>> 12> (5, 18)
>>>> 12> (5, 29)
>>>> 12> (5, 37)
>>>> 12> (5, 38)
>>>> 12> (5, 112)
>>>> 12> (5, 131)
>>>>
>>>> The first line, (5, 184), is out of order relative to the rest.
>>>>
>>>> Is this a bug? The problem disappears if I remove the keyBy after the
>>>> reduce.
>>>>
>>>> Thanks.
>>>>
>>>
