Thanks for the advice. I created this issue:
https://issues.apache.org/jira/browse/FLINK-24767

On Thu, Nov 4, 2021 at 2:44 PM Guowei Ma <guowei....@gmail.com> wrote:

> Hi Yan
> On second thought, I think you are right: the downstream operator
> should preserve the order of records with the same key from the same
> upstream. So feel free to open a jira.
> Best,
> Guowei
>
>
> On Wed, Nov 3, 2021 at 7:30 PM Yan Shen <leey...@gmail.com> wrote:
>
>> Hi,
>>
>> It will complicate things a lot if we cannot assume input order of any
>> operator after a keyBy. So far I only have the problem with countWindow
>> which I seem to be able to avoid by writing my own stateful KeyedProcess.
>> Are there other operators which might cause the same problem?
>>
>> The other alternative is not to use batch mode, but the problem is that I
>> won't know when a batch job finishes if I don't run it in batch mode, since
>> a streaming process will never end.
>>
>> Thanks.
>>
>> On Wed, Nov 3, 2021 at 4:38 PM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi, Yan
>>> I do not think it is a bug. Maybe we simply cannot assume the input
>>> order of an operator.
>>> Best,
>>> Guowei
>>>
>>>
>>> On Wed, Nov 3, 2021 at 3:10 PM Yan Shen <leey...@gmail.com> wrote:
>>>
>>>> Yes, it does not happen in streaming mode. Is this considered a bug or
>>>> is it by design?
>>>>
>>>> Thanks!
>>>>
>>>> On Wed, Nov 3, 2021 at 1:58 PM Guowei Ma <guowei....@gmail.com> wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> I did not run your program directly, but I see that you are using
>>>>> the Batch execution mode. I suspect the problem is related to this,
>>>>> because in Batch execution mode Flink will "sort" the records by key
>>>>> (this might be an unstable sort).
>>>>> Could you run the program in Streaming mode and see what happens?
>>>>>
>>>>> Best,
>>>>> Guowei
>>>>>
>>>>>
>>>>> On Wed, Nov 3, 2021 at 12:16 AM Yan Shen <leey...@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> Can anyone advise on this?
>>>>>>
>>>>>> I wrote a simple test of the countWindow method (in Kotlin) as below
>>>>>>
>>>>>> package aero.airlab.flinkjobs.headingreminder
>>>>>>
>>>>>> import org.apache.flink.api.common.RuntimeExecutionMode
>>>>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy
>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>>>>> import kotlin.random.Random
>>>>>>
>>>>>> object CountWindowTest {
>>>>>>     @JvmStatic
>>>>>>     fun main(args: Array<String>) {
>>>>>>         val env = StreamExecutionEnvironment.getExecutionEnvironment()
>>>>>>         env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>>>>>>
>>>>>>         val rand = Random(0)
>>>>>>         val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>>>>>>         env.fromCollection(data).assignTimestampsAndWatermarks(
>>>>>>             WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
>>>>>>                 .withTimestampAssigner { e, _ -> e.second.toLong() })
>>>>>>             .keyBy { it.first }
>>>>>>             .countWindow(3L, 1)
>>>>>>             .reduce { a, b -> b }
>>>>>>             .keyBy { it.first }
>>>>>>             .filter { it.first == 5 }
>>>>>>             .print()
>>>>>>
>>>>>>         env.execute()
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> The beginning of the output is as follows:
>>>>>>
>>>>>> 12> (5, 184)
>>>>>> 12> (5, 18)
>>>>>> 12> (5, 29)
>>>>>> 12> (5, 37)
>>>>>> 12> (5, 38)
>>>>>> 12> (5, 112)
>>>>>> 12> (5, 131)
>>>>>>
>>>>>> The first line (5, 184) is out of order relative to the rest.
>>>>>>
>>>>>> Is this a bug? The problem disappears if I remove the keyBy after the
>>>>>> reduce.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>
