You're starting two executions at the same time (in different
threads). Here's why:

Execution No 1
DataStreamUtils.collect(..) starts a Thread which executes your job
and collects stream elements. It runs asynchronously. The collect(..)
method returns after starting the thread.

Execution No 2
env.execute() also executes your job.

Now these two race against each other, which causes all kinds of strange behavior.

In general, the use of DataStreamUtils is discouraged. Instead,
define a sink that writes your data to a file, or verify the behavior
directly as part of your Flink job (e.g. in a map function).
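
For illustration, here is a minimal sketch of the sink-based approach in
Scala. It is not taken from your gist: CollectSink and SinkBasedCheck are
hypothetical names, and the sink collects into an in-memory list rather
than a file only to keep the example short. It assumes a Flink version that
ships the Scala DataStream API and SinkFunction, and a local environment in
which the job and the assertion run in the same JVM.

```scala
import java.util.Collections

import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._

// Hypothetical test sink: appends every element it receives to a shared list.
class CollectSink extends SinkFunction[String] {
  override def invoke(value: String): Unit = CollectSink.values.add(value)
}

object CollectSink {
  // Held in the companion object because Flink serializes the sink instance
  // before running it; this only works when the job runs in the test's JVM.
  val values: java.util.List[String] =
    Collections.synchronizedList(new java.util.ArrayList[String]())
}

object SinkBasedCheck {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // one sink instance, so element order is preserved

    CollectSink.values.clear()

    env
      .fromElements("a", "b", "c")
      .map(_.toUpperCase) // stand-in for the function under test
      .addSink(new CollectSink)

    // The only execution of the job. For this finite input, execute()
    // returns once the local job has finished.
    env.execute("sink-based check")

    // Safe to inspect the results now: nothing else is running the job.
    assert(CollectSink.values.size == 3)
    assert(CollectSink.values.get(0) == "A")
  }
}
```

The point of the sketch is that env.execute() is the only call that runs
the job, and the assertions run only after it has returned.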


On Thu, Aug 11, 2016 at 5:47 PM, Ciar, David B. <dcia...@ceh.ac.uk> wrote:
> Hi everyone,
> I've been trying to write unit tests for my data stream bolts (map, flatMap,
> apply etc.), however the results I've been getting are strange.  The code
> for testing is here (running with scalatest and sbt):
> https://gist.github.com/dbciar/7469adfea9e6442cdc9568aed07095ff
> It runs the stream process environment once for each check, and for one of
> the checks (output below) I get an "IllegalStateException: Factory has
> already been initialized", whose cause I'm not sure of, while for the
> rest I get an IndexOutOfBoundsException.
> The index exception is strange, as the index positions correspond to the
> number of input tuples sent to the stream, so it is as if some are being
> lost, or the assert is running before the stream has finished processing
> and adding objects to the rawStreamOutput: Iterator[RawObservation] object.
> Any pointers on what might be happening would be appreciated.  I would also
> welcome suggestions on how to incorporate the redis server in this check,
> as I had to comment out the server definition here and run it separately to
> get to the current position.
> The two types of exception are first this:
> [info] - Do well-formed observations parse OK? *** FAILED ***
> [info]   java.lang.IllegalStateException: Factory has already been initialized
> [info]   at org.apache.flink.core.memory.MemorySegmentFactory.initializeFactory(MemorySegmentFactory.java:132)
> [info]   at org.apache.flink.runtime.taskmanager.TaskManager$.parseTaskManagerConfiguration(TaskManager.scala:2055)
> [info]   at org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1802)
> [info]   at org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster.startTaskManager(LocalFlinkMiniCluster.scala:142)
> [info]   at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:319)
> [info]   at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:312)
> [info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> [info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> [info]   at scala.collection.immutable.Range.foreach(Range.scala:160)
> [info]   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> The rest are as follows, with the index being the position in the
> rawStreamOutput Iterator[RawObservation] object I expect:
> [info]   java.lang.IndexOutOfBoundsException: 3
> [info]   at scala.collection.immutable.Vector.checkRangeConvert(Vector.scala:132)
> [info]   at scala.collection.immutable.Vector.apply(Vector.scala:122)
> [info]   at ProcessingBoltTest$$anonfun$5$$anon$7.<init>(ProcessingBoltTest.scala:93)
> [info]   at ProcessingBoltTest$$anonfun$5.apply(ProcessingBoltTest.scala:91)
> [info]   at ProcessingBoltTest$$anonfun$5.apply(ProcessingBoltTest.scala:91)
> [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
> [info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
> Thanks,
> David
