Hello,
I have a question, since I am observing quite weird behavior. In the
documentation [1], the example of FlinkMiniCluster usage implies that we can
expect the results to appear in the same order as they were injected into the
stream with fromElements(). I mean that the Java version of the code uses
assertEquals on a List, which is true only if both lists contain the same
elements in the same order. On the other hand, the Scala version of this code
uses a different matcher that only asserts that all the expected elements are
actually present in the list.
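To make the difference between the two assertions concrete, here is a small plain-Scala sketch. The helper names sameInOrder and containsAll are hypothetical and are not the code from the documentation; they only mimic an order-sensitive comparison (like Java's List.equals behind assertEquals) and an order-insensitive containment check.

```scala
// Hypothetical helpers contrasting the two styles of assertion.
object OrderChecks {
  // Order-sensitive: true only if both sequences hold the same elements
  // in the same positions (similar to Java's List.equals).
  def sameInOrder[A](expected: Seq[A], actual: Seq[A]): Boolean =
    expected == actual

  // Order-insensitive: true if every expected element appears somewhere
  // in the actual output, regardless of position.
  def containsAll[A](actual: Seq[A], expected: Seq[A]): Boolean =
    expected.forall(actual.contains)
}
```

With output that arrives as C, A, B, the first check fails while the second passes, which is why only the order-sensitive version would expose reordering.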
So I have two questions here:

1) For the code below, can we be sure that the output will have the same
order as the input?
For some reason the code returns the elements in quite random order in the
sink. I was actually sure that this is the expected behavior, but this piece
of documentation made me wonder.

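import java.util.concurrent.ConcurrentLinkedQueue

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// fromElements emits "A" to "F" in this order from a single source instance
env.fromElements("A", "B", "C", "D", "E", "F")
    .addSink(new TestSink)
env.execute()

// Collects every element the sink receives into a shared, static queue
class TestSink extends SinkFunction[String] {
  override def invoke(value: String): Unit =
    synchronized { TestSink.vals.add(value) }
}

object TestSink {
  // Shared by all parallel sink instances; the queue itself is thread-safe
  val vals = new ConcurrentLinkedQueue[String]()
}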
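One possible explanation, offered only as an assumption: fromElements runs as a single source instance, while the sink runs with the environment's default parallelism, so Flink may distribute the elements round-robin across several sink instances that then append to the shared queue concurrently. The sketch below is a simplified, Flink-free model of that distribution (RoundRobinModel and distribute are hypothetical names, and it always starts at subtask 0, which the real partitioner may not do).

```scala
// Simplified model of round-robin distribution from one source instance
// to n parallel sink instances. Assumption: distribution starts at
// subtask 0; this is not Flink's actual partitioner code.
object RoundRobinModel {
  def distribute[A](elements: Seq[A], n: Int): Vector[Vector[A]] = {
    require(n > 0, "need at least one subtask")
    // Element i goes to subtask i % n, keeping source order within each subtask
    elements.zipWithIndex.foldLeft(Vector.fill(n)(Vector.empty[A])) {
      case (acc, (e, i)) => acc.updated(i % n, acc(i % n) :+ e)
    }
  }
}
```

Each subtask sees its share in source order, but the subtasks run in separate threads, so the interleaving in the shared queue is not deterministic. With n = 1 the single partition keeps the source order, which is the intuition behind trying env.setParallelism(1) in such a test.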

2) Is there a way to enforce that the order is kept, e.g. by running the
environment with parallelism = 1? I want to test a function, or a set of
functions, that depends on the order of the events, for example detecting
the beginning and the end of a pattern. Can I somehow ensure the order for
testing purposes?
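A complementary approach, sketched here under the assumption that the order-dependent logic can be factored out of the Flink operators, is to unit-test it as a plain function over a Seq, where the order is fully under the test's control. PatternSpans, spans, and the "BEGIN"/"END" markers are hypothetical stand-ins for a "beginning and end of the pattern" detector.

```scala
// Hypothetical order-dependent logic: collect the events between each
// "BEGIN" marker and the next "END" marker. Events outside a span are ignored.
object PatternSpans {
  def spans(events: Seq[String]): List[List[String]] = {
    // State: (finished spans in reverse, Some(current span buffer, reversed) while inside a span)
    val (done, _) = events.foldLeft((List.empty[List[String]], Option.empty[List[String]])) {
      case ((acc, None), "BEGIN")    => (acc, Some(Nil))
      case ((acc, None), _)          => (acc, None)
      case ((acc, Some(buf)), "END") => (buf.reverse :: acc, None)
      case ((acc, Some(buf)), e)     => (acc, Some(e :: buf))
    }
    done.reverse
  }
}
```

Feeding the same events in a different order yields a different result, so a test on a Seq pins the expected order down without depending on how the job is scheduled.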


Best Regards,
Dom.
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html
