Thanks a lot for your answer, Chesnay. I have one more question: `fromElements` creates the stream with parallelism 1, while the environment created on my local machine has a default parallelism of 12. So I assume the entries from the stream are distributed to the first operators according to some scheme? Or am I missing something?
Thanks in advance,
Best Regards,
Dom.

On Mon, 7 Oct 2019 at 11:08, Chesnay Schepler <ches...@apache.org> wrote:

> Are you referring to "ExampleIntegrationTest"? If so, then I'm afraid
> this test is slightly misleading since the order isn't guaranteed in this
> case.
>
> 1) As long as the parallelism of the sink is 1 the elements should arrive
> in order.
>
> 2) The order is maintained if parallelism=1 since elements cannot overtake
> each other in a single stream.
>
> If the parallelism is increased by a subsequent operation O1, then the
> individual subtasks of O1 will still see a sorted stream.
> If an operation O2 after O1 has a lower parallelism than O1 then it will
> not see a sorted stream, since the outputs of O1-subtasks may interleave at
> will. This is the reason why the "ExampleIntegrationTest" is incorrect;
> while the 2 sink instances receive a sorted input they are adding them into
> a single collection, interleaving data.
>
> This is fine:
>
> env.fromElements(1L, 21L, 22L)
>     .map(x -> x * 2)
>     .setParallelism(2)
>     <apply order-dependent operation>
>     .setParallelism(2);
>
> This is not:
>
> env.fromElements(1L, 21L, 22L)
>     .map(x -> x * 2)
>     .setParallelism(2)
>     <apply order-dependent operation>
>     .setParallelism(1);
>
> In other words, if you never reduce the parallelism your functions should
> be fine.
> If you have to reduce the parallelism then you must re-sort the stream (or
> realistically, window) somehow.
>
> On 02/10/2019 23:32, Dominik Wosiński wrote:
>
> Hello,
> I have a question, since I am observing quite weird behavior. In the
> documentation [1], the example of FlinkMiniCluster usage shows that we can
> expect the results to appear in the same order as they were injected into
> the stream by *fromElements()*. I mean that the Java version of the code
> uses assertEquals on a list, which will be true only if the ArrayLists have
> the same elements in the same order. On the other hand, the Scala version
> of this code uses a different matcher that only asserts that all elements
> are actually present in the list.
> So I have two questions here:
>
> 1) For the code below, can we be sure that the output will have the same
> order as the input?
> For some reason the code returns the elements in quite random order in the
> sink. I was actually sure that this is the expected behavior, but this
> piece of documentation made me wonder.
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.fromElements("A", "B", "C", "D", "E", "F")
>   .addSink(new TestSink)
> env.execute()
>
> class TestSink extends SinkFunction[String] {
>   override def invoke(value: String): Unit =
>     synchronized { TestSink.vals.add(value) }
> }
>
> object TestSink {
>   val vals = new ConcurrentLinkedQueue[String]()
> }
>
> 2) Is there a reason to enforce order to be kept for env with parallelism
> = 1? If I want to test some function or set of functions that depend on
> the order of the events, for example detecting the beginning and the
> end of a pattern, can I somehow ensure the order for testing purposes?
>
> Best Regards,
> Dom.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
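
The ordering argument quoted above can be illustrated without Flink at all. The following is only a minimal sketch in plain Java, not Flink code: the class and method names (OrderingSketch, roundRobin, mergeLastSubtaskFirst, isSorted) are made up for illustration, and it assumes the parallelism-1 source hands its elements to the parallel subtasks round-robin. It shows that each subtask still sees a sorted stream, while a single downstream consumer can receive the merged output in an unsorted order. The merge below picks just one of the many interleavings that are possible; in a real job the interleaving is nondeterministic.

```java
import java.util.ArrayList;
import java.util.List;

public class OrderingSketch {

    // Distribute one ordered input across `parallelism` subtasks, round-robin.
    // Assumption: this stands in for how a parallelism-1 source feeds a
    // parallel operator; it is a simulation, not a Flink API call.
    static List<List<Long>> roundRobin(List<Long> input, int parallelism) {
        List<List<Long>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            subtasks.get(i % parallelism).add(input.get(i));
        }
        return subtasks;
    }

    // One possible arrival order at a single downstream consumer:
    // everything from the last subtask arrives before anything from the first.
    static List<Long> mergeLastSubtaskFirst(List<List<Long>> subtasks) {
        List<Long> merged = new ArrayList<>();
        for (int i = subtasks.size() - 1; i >= 0; i--) {
            merged.addAll(subtasks.get(i));
        }
        return merged;
    }

    // True if the values are in non-decreasing order.
    static boolean isSorted(List<Long> values) {
        for (int i = 1; i < values.size(); i++) {
            if (values.get(i - 1) > values.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Long> input = List.of(1L, 21L, 22L, 30L);

        // Raising parallelism from 1 to 2: each subtask keeps a sorted view.
        List<List<Long>> subtasks = roundRobin(input, 2);
        for (int i = 0; i < subtasks.size(); i++) {
            System.out.println("subtask " + i + ": " + subtasks.get(i)
                    + " sorted=" + isSorted(subtasks.get(i)));
        }

        // Lowering parallelism from 2 to 1: the merged view may be unsorted.
        List<Long> merged = mergeLastSubtaskFirst(subtasks);
        System.out.println("merged: " + merged + " sorted=" + isSorted(merged));
    }
}
```

For a test that depends on ordering, this suggests either keeping the parallelism constant along the order-dependent path or asserting on the collected results in an order-insensitive way (for example, sorting them before comparing), in line with the advice in the quoted reply.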