Thanks a lot for your answer, Chesnay. I have one more question:
`fromElements` creates the stream with parallelism 1, while the env created
on my local machine has a default parallelism of 12. So I assume that the
entries from the stream are distributed to the first operators according to
some scheme? Or am I missing something?

Thanks in advance,
Best Regards,
Dom.

On Mon, 7 Oct 2019 at 11:08, Chesnay Schepler <ches...@apache.org> wrote:

> Are you referring to "ExampleIntegrationTest"? If so, then I'm afraid
> this test is slightly misleading since the order isn't guaranteed in this
> case.
>
> 1) As long as the parallelism of the sink is 1 the elements should arrive
> in order.
>
> 2) The order is maintained if parallelism=1 since elements cannot overtake
> each other in a single stream.
>
> If the parallelism is increased by a subsequent operation O1, then the
> individual subtasks of O1 will still see a sorted stream.
> If an operation O2 after O1 has a lower parallelism than O1 then it will
> not see a sorted stream, since the outputs of O1-subtasks may interleave at
> will. This is the reason why the "ExampleIntegrationTest" is incorrect;
> while the 2 sink instances receive a sorted input they are adding them into
> a single collection, interleaving data.
>
> This is fine:
>
> env.fromElements(1L, 21L, 22L)
>    .map(x -> x * 2)
>    .setParallelism(2)
>    <apply order-dependent operation>
>    .setParallelism(2);
>
> This is not:
>
> env.fromElements(1L, 21L, 22L)
>    .map(x -> x * 2)
>    .setParallelism(2)
>    <apply order-dependent operation>
>    .setParallelism(1);
>
> In other words, if you never reduce the parallelism your functions should
> be fine.
> If you have to reduce the parallelism then you must re-sort the stream (or
> realistically, window) somehow.
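
To check my understanding of the point above, here is a minimal plain-Java sketch with no Flink involved. The class and method names are made up for illustration, and the round-robin distribution from parallelism 1 to 2 is only an assumption on my part. It shows that each subtask still sees a sorted stream, while a merge back to parallelism 1 may interleave the subtasks in any order:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java simulation only; nothing here is Flink API.
final class OrderingSketch {

    // Spread a sorted input over `parallelism` subtasks, round-robin (assumed scheme).
    static List<List<Long>> roundRobin(List<Long> input, int parallelism) {
        List<List<Long>> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            subtasks.get(i % parallelism).add(input.get(i));
        }
        return subtasks;
    }

    // One possible interleaving when going back to parallelism 1:
    // the last subtask happens to be drained completely before the others.
    static List<Long> mergeLastSubtaskFirst(List<List<Long>> subtasks) {
        List<Long> merged = new ArrayList<>();
        for (int i = subtasks.size() - 1; i >= 0; i--) {
            merged.addAll(subtasks.get(i));
        }
        return merged;
    }

    // True if the values are in non-decreasing order.
    static boolean isSorted(List<Long> values) {
        for (int i = 1; i < values.size(); i++) {
            if (values.get(i - 1) > values.get(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Long> input = Arrays.asList(1L, 21L, 22L, 30L);

        // Parallelism 1 -> 2: every subtask still sees a sorted stream.
        List<List<Long>> subtasks = roundRobin(input, 2);
        System.out.println("subtasks: " + subtasks);

        // Parallelism 2 -> 1: the merged stream is no longer sorted.
        List<Long> merged = mergeLastSubtaskFirst(subtasks);
        System.out.println("merged:   " + merged + " sorted=" + isSorted(merged));

        // Re-sorting after the merge restores the order.
        List<Long> resorted = new ArrayList<>(merged);
        Collections.sort(resorted);
        System.out.println("resorted: " + resorted + " sorted=" + isSorted(resorted));
    }
}
```

In a real job the interleaving is not deterministic, so a test that asserts on the order would presumably pass on some runs and fail on others.
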
>
> On 02/10/2019 23:32, Dominik Wosiński wrote:
>
> Hello,
> I have a question, since I am observing quite weird behavior. In the
> documentation [1], the example of FlinkMiniCluster usage suggests that we can
> expect the results to appear in the same order in which they were injected
> into the stream by fromElements(). I mean that the Java version of the code
> uses assertEquals on lists, which is true only if the ArrayLists have the
> same elements in the same order. The Scala version of this code, on the
> other hand, uses a different matcher that only asserts that all elements
> are actually present in the list.
> So I have two questions here:
>
> 1) For the code below, can we be sure that the output will have the same
> order as the input?
> For some reason the code returns the elements in quite random order in the
> sink. I was actually sure that this is the expected behavior, but this piece
> of documentation made me wonder.
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.fromElements("A", "B", "C", "D", "E", "F")
>     .addSink(new TestSink)
> env.execute()
>
> class TestSink extends SinkFunction[String] {
>   override def invoke(value: String): Unit =
>     synchronized { TestSink.vals.add(value) }
> }
>
> object TestSink {
>   val vals = new ConcurrentLinkedQueue[String]()
> }
>
> 2) Is there a way to enforce that the order is kept for an env with
> parallelism = 1? I want to test some function or set of functions that
> depend on the order of the events, for example detecting the beginning and
> the end of a pattern. Can I somehow ensure the order for testing
> purposes?
>
>
> Best Regards,
> Dom.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
>
>
>
