Re: Preserving (best effort) messages order between operators

2019-11-01 Thread Averell
Hi Yun, I found the cause of the issue. The ContinuousFileReaderOperator (my operator B) uses a PriorityQueue, which keeps its buffer sorted by modTime, so my records were re-ordered. I don't understand the reason for using a PriorityQueue instead of an ordinary Queue, though. Thanks. Ave
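
The re-ordering described above can be reproduced outside Flink. The following is only a minimal sketch, not the operator's actual code: `Split`, `arrivals()`, and the sample modTime values are hypothetical stand-ins. It assumes the buffer is ordered by modTime alone, as the message describes, and compares it with a plain FIFO queue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

public class SplitOrderSketch {

    // Hypothetical stand-in for a file split; this is not Flink's class.
    static final class Split {
        final String name;
        final long modTime;

        Split(String name, long modTime) {
            this.name = name;
            this.modTime = modTime;
        }
    }

    // Splits in the order the upstream operator emits them (made-up values).
    static List<Split> arrivals() {
        List<Split> list = new ArrayList<>();
        list.add(new Split("a", 300L));
        list.add(new Split("b", 100L));
        list.add(new Split("c", 200L));
        return list;
    }

    // Buffer every arrival in the given queue, then read the queue back out.
    static List<String> drain(Queue<Split> queue) {
        for (Split s : arrivals()) {
            queue.add(s);
        }
        List<String> out = new ArrayList<>();
        while (!queue.isEmpty()) {
            out.add(queue.poll().name);
        }
        return out;
    }

    // An ordinary FIFO queue keeps the arrival order.
    static List<String> fifoOrder() {
        return drain(new ArrayDeque<>());
    }

    // A PriorityQueue keyed on modTime hands out the oldest split first.
    static List<String> priorityOrder() {
        return drain(new PriorityQueue<>(Comparator.comparingLong((Split s) -> s.modTime)));
    }

    public static void main(String[] args) {
        System.out.println("FIFO:     " + fifoOrder());     // [a, b, c]
        System.out.println("Priority: " + priorityOrder()); // [b, c, a]
    }
}
```

Whenever more than one split is buffered at the same time, the priority queue returns them by modTime rather than by arrival, which would match the re-ordering observed in operator B.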

Re: Preserving (best effort) messages order between operators

2019-10-31 Thread Huyen Levan
Hi Yun, My job graph is: (A: 1) -(rebalance)-> (B: 32) -(hash)-> (C: 32). A lists files and forwards them to B as FileInputSplits. B parses those files and shuffles the data records to C as keyed streams. C is the slowest in the graph, A is the fastest. I relied on the slf4j/logback logs to derive that co

Re: Preserving (best effort) messages order between operators

2019-10-31 Thread Yun Gao
Hi Averell, If I understood right, the job graph is A (parallelism = 1) --> B (parallelism > 1), so I think the order of the records arriving at subtask B_i should be the same as the order in which they were sent out from A. Therefore, could you also provide more details on the topology? Is there on

Preserving (best effort) messages order between operators

2019-10-30 Thread Averell
Hi, I have a source function with parallelism = 1, sending out records ordered by event-time. These records are then re-balanced to the next operator which has parallelism > 1. I observed that within each subtask of the 2nd operator, the order of the messages is not maintained. Is this behaviour