Hi Yun,

My job graph is: (A: 1) -(rebalance)-> (B: 32) -(hash)-> (C: 32). A lists files and forwards them to B as FileInputSplits. B parses those files and shuffles the data records to C as keyed streams. C is the slowest operator in the graph; A is the fastest.
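Yun's reasoning (in his reply quoted below) is that with a single upstream subtask A, each B_i should receive records in the same relative order A sent them. The stdlib-only simulation below illustrates that under stated assumptions: rebalance() is modelled as round-robin, as the Flink documentation describes it, and each A-to-B_i channel is modelled as a FIFO queue. It is not Flink code; the class and method names are hypothetical, the real partitioner starts at a random channel rather than channel 0, and nothing B does after receiving a record is modelled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified model of rebalance() from one upstream subtask; not Flink code.
public class RebalanceOrderDemo {

    // Assigns record k to channel k % parallelism (round-robin starting at 0),
    // then drains every channel in FIFO order, as a downstream subtask would.
    static List<List<Integer>> rebalance(List<Integer> records, int parallelism) {
        List<Queue<Integer>> channels = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            channels.add(new ArrayDeque<>());
        }
        for (int k = 0; k < records.size(); k++) {
            channels.get(k % parallelism).add(records.get(k));
        }
        List<List<Integer>> received = new ArrayList<>();
        for (Queue<Integer> channel : channels) {
            List<Integer> seen = new ArrayList<>();
            while (!channel.isEmpty()) {
                seen.add(channel.poll());
            }
            received.add(seen);
        }
        return received;
    }

    // True if the list is in non-decreasing order.
    static boolean isSorted(List<Integer> xs) {
        for (int i = 1; i < xs.size(); i++) {
            if (xs.get(i - 1) > xs.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> fromA = new ArrayList<>();
        for (int t = 0; t < 20; t++) {
            fromA.add(t); // A emits event times 0..19 in order
        }
        List<List<Integer>> perSubtask = rebalance(fromA, 4);
        for (int i = 0; i < perSubtask.size(); i++) {
            // First line printed: B_0 received [0, 4, 8, 12, 16] sorted=true
            System.out.println("B_" + i + " received " + perSubtask.get(i)
                    + " sorted=" + isSorted(perSubtask.get(i)));
        }
    }
}
```

Under this model every B_i sees a sorted subsequence, so if B_i still logs files out of dt order, the reordering would have to happen inside B's own processing rather than in the channel from A.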
I relied on the slf4j/logback logs to conclude that the order is lost in B. There is one log entry for each context.collect() call in A, and one log entry whenever B opens a new FileInputSplit (B is Flink's ContinuousFileReaderOperator). My logback configuration is:

<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>

The logs I got from A showed the messages in order (by dt in my case). However, the logs I got from B showed that the messages' order was lost (please refer to the logs below). I assume that each logback %thread corresponds to exactly one B_i.

Thanks and regards,
Averell

2019-10-30 05:30:43.548 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-12/part-00119-2dd7fe37-5e1b-4bc7-8bc4-fc632b419ac0
2019-10-30 05:30:51.239 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-13/part-00001-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:06.537 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-13/part-00083-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:13.611 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-13/part-00159-3ee818c2-c543-4744-957b-7fd0391e0143
2019-10-30 05:31:20.826 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-14/part-00041-c4b2a37e-066d-4adb-b610-a714e7b45b8b
2019-10-30 05:31:28.487 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-14/part-00121-c4b2a37e-066d-4adb-b610-a714e7b45b8b
2019-10-30 05:31:35.806 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-15/part-00001-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:31:42.739 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-15/part-00081-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:31:49.861 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-01/part-00045-1dc6388b-b72c-4bcd-a337-35c371b583f6
2019-10-30 05:31:55.834 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-01/part-00130-1dc6388b-b72c-4bcd-a337-35c371b583f6
2019-10-30 05:32:02.097 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-15/part-00161-3830100b-611e-455d-b6f9-9bce78ca5139
2019-10-30 05:32:06.452 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-02/part-00000-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:11.379 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-02/part-00077-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:16.103 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-02/part-00147-4ef9a43f-d0de-412c-9a3f-01f990cee55f
2019-10-30 05:32:21.025 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-16/part-00039-d12ed910-d58b-46b2-b607-784ebf1266d4
2019-10-30 05:32:25.758 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-03/part-00043-92a58007-0c35-479b-b9e5-6663fae4e71c
2019-10-30 05:32:30.156 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-03/part-00123-92a58007-0c35-479b-b9e5-6663fae4e71c
2019-10-30 05:32:34.169 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-16/part-00121-d12ed910-d58b-46b2-b607-784ebf1266d4
2019-10-30 05:32:39.462 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-04/part-00001-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:43.551 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-04/part-00085-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:48.100 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-04/part-00166-413d1982-21b8-4bfb-828e-8014c9dfdb16
2019-10-30 05:32:52.629 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-17/part-00001-491d8c85-7eb2-48c7-af06-501934f65a83
2019-10-30 05:32:57.834 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-05/part-00045-19080414-962a-455c-b342-fcf3e36f1cc5
2019-10-30 05:33:01.943 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-05/part-00113-19080414-962a-455c-b342-fcf3e36f1cc5
2019-10-30 05:33:06.871 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-17/part-00082-491d8c85-7eb2-48c7-af06-501934f65a83

On Fri, Nov 1, 2019 at 1:32 PM Yun Gao
<yungao...@aliyun.com> wrote:

> Hi Averell,
>
> If I understood right, the job graph is A (parallelism = 1) -->
> B (parallelism > 1), so I think the records sent into the subtask B_i
> should arrive in the same order as they were sent out from A. Therefore,
> could you provide more details on the topology? Are there only the two
> operators? And could you also describe how the message order is checked
> in B_i?
>
> Best,
> Yun
>
> ------------------------------------------------------------------
> From:Averell <lvhu...@gmail.com>
> Send Time:2019 Oct. 31 (Thu.) 12:55
> To:user <user@flink.apache.org>
> Subject:Preserving (best effort) messages order between operators
>
> Hi,
>
> I have a source function with parallelism = 1, sending out records ordered
> by event-time. These records are then re-balanced to the next operator, which
> has parallelism > 1. I observed that within each subtask of the 2nd
> operator, the order of the messages is not maintained. Is this behaviour
> expected? If it is, is there any way to avoid it, or at least reduce it?
> I have high back-pressure on that 2nd operator, as the one after it is
> slow. There is also high back-pressure on the 1st operator, which makes my
> problem more severe (the out-of-order effect is large). If I could
> throttle the 1st operator when back-pressure is high, I could mitigate
> the problem. But I could not find any guide on doing that.
>
> Could you please help?
>
> Thanks.
> Averell
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
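Yun asks how the message order in B_i is checked. Averell's check (group B's "Opening file" log lines by thread, then compare the dt partitions) can be made mechanical with a small stdlib-only scanner, sketched below. Assumptions: SplitOrderCheck and outOfOrderPerThread are hypothetical names; the regex targets lines produced by the logback pattern in the thread; it relies on Averell's supposition that each logback thread corresponds to exactly one B_i; and an entry counts as out of order when its dt is earlier than a dt the same thread has already opened.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: counts out-of-order file openings per logging thread.
public class SplitOrderCheck {

    // Captures the [%thread] value and the dt=yyyy-MM-dd partition of an
    // "Opening file" line; lines without that message are skipped.
    private static final Pattern OPENING =
            Pattern.compile("\\[([^\\]]+)\\].*?Opening file /dt=(\\d{4}-\\d{2}-\\d{2})/");

    // Per thread, counts files whose dt is earlier than the latest dt that
    // thread has already opened (assumes one thread == one B_i subtask).
    static Map<String, Integer> outOfOrderPerThread(List<String> lines) {
        Map<String, String> latestDt = new HashMap<>();
        Map<String, Integer> outOfOrder = new TreeMap<>();
        for (String line : lines) {
            Matcher m = OPENING.matcher(line);
            if (!m.find()) {
                continue;
            }
            String thread = m.group(1);
            String dt = m.group(2); // ISO dates compare correctly as strings
            outOfOrder.putIfAbsent(thread, 0);
            String latest = latestDt.get(thread);
            if (latest != null && dt.compareTo(latest) < 0) {
                outOfOrder.merge(thread, 1, Integer::sum);
            } else {
                latestDt.put(thread, dt);
            }
        }
        return outOfOrder;
    }

    public static void main(String[] args) {
        // Four consecutive entries copied from B's log above.
        List<String> sample = Arrays.asList(
                "2019-10-30 05:31:42.739 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-15/part-00081-3830100b-611e-455d-b6f9-9bce78ca5139",
                "2019-10-30 05:31:49.861 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-01/part-00045-1dc6388b-b72c-4bcd-a337-35c371b583f6",
                "2019-10-30 05:32:02.097 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2017-12-15/part-00161-3830100b-611e-455d-b6f9-9bce78ca5139",
                "2019-10-30 05:32:06.452 [Thread-34] INFO com.myco.myFIF - Opening file /dt=2018-01-02/part-00000-4ef9a43f-d0de-412c-9a3f-01f990cee55f");
        System.out.println(outOfOrderPerThread(sample)); // prints {Thread-34=1}
    }
}
```

Applied to all 25 Thread-34 entries in the log above, this definition flags 5 of them: the late 2017-12-15 file, both 2017-12-16 files and both 2017-12-17 files, each opened after a 2018-01 partition.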
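The original question asks whether the 1st operator (A) can be throttled. One generic option, sketched here as an assumption rather than as a Flink feature, is to cap A's emission rate: a hypothetical SimpleThrottle whose acquire() the source function would call before each context.collect(). It does not detect back-pressure; the rate has to be chosen by hand. Guava's RateLimiter (RateLimiter.create(permitsPerSecond), then acquire()) is an established library alternative with the same acquire-before-emit shape.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical fixed-rate pacing helper (not part of Flink): allows at most
// `permitsPerSecond` acquire() calls per second, spaced evenly.
public class SimpleThrottle {
    private final long intervalNanos;
    private long nextFreeNanos;

    public SimpleThrottle(double permitsPerSecond) {
        if (permitsPerSecond <= 0) {
            throw new IllegalArgumentException("permitsPerSecond must be > 0");
        }
        this.intervalNanos = (long) (TimeUnit.SECONDS.toNanos(1) / permitsPerSecond);
        this.nextFreeNanos = System.nanoTime();
    }

    // Blocks until the next slot is free; returns the nanoseconds spent waiting.
    public synchronized long acquire() throws InterruptedException {
        long now = System.nanoTime();
        long waitNanos = Math.max(0L, nextFreeNanos - now);
        if (waitNanos > 0) {
            TimeUnit.NANOSECONDS.sleep(waitNanos);
        }
        // Reserve the following slot one interval after whichever is later:
        // the current time or the slot that was just granted.
        nextFreeNanos = Math.max(now, nextFreeNanos) + intervalNanos;
        return waitNanos;
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleThrottle throttle = new SimpleThrottle(20.0); // about one emit per 50 ms
        long start = System.nanoTime();
        for (int i = 0; i < 10; i++) {
            throttle.acquire();
            // In A's source function, context.collect(record) would go here.
        }
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("10 permits took about " + elapsedMs + " ms");
    }
}
```

Following Averell's observation that the reordering grows with the back-pressure in front of B, a fixed cap trades some throughput for a smaller backlog of splits that A has handed out ahead of B.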