Hi David,

You're starting two executions at the same time (in different threads). Here's why:
Execution no. 1: DataStreamUtils.collect(..) starts a thread which executes your job and collects the stream elements. It runs asynchronously; the collect(..) method returns right after starting the thread.

Execution no. 2: env.execute() also executes your job.

These two now race against each other, causing all kinds of strange behavior. In general, the use of DataStreamUtils is discouraged. You should rather define a sink that writes your data to a file, or verify the behavior directly as part of your Flink job (e.g. in a map function). A minimal sketch of the sink-based approach is appended below the quoted message.

Cheers,
Max

On Thu, Aug 11, 2016 at 5:47 PM, Ciar, David B. <dcia...@ceh.ac.uk> wrote:
> Hi everyone,
>
> I've been trying to write unit tests for my data stream bolts (map, flatMap,
> apply etc.), but the results I've been getting are strange. The code for
> testing is here (running with scalatest and sbt):
>
> https://gist.github.com/dbciar/7469adfea9e6442cdc9568aed07095ff
>
> It runs the stream processing environment once for each check. For one of
> the checks (output below) I get an "IllegalStateException: Factory has
> already been initialized", whose cause I'm not sure of, while for the rest
> I get an IndexOutOfBoundsException.
>
> The index exception is strange, as the index positions refer to the same
> number of input tuples to the stream, so it is as if some are being lost,
> or the assert is running before the stream has finished processing and
> adding objects to the rawStreamOutput: Iterator[RawObservation] object.
>
> Any pointers on what might be happening would be appreciated. I'd also
> welcome suggestions on how to incorporate the Redis server in this check,
> as I had to comment out the server definition here and run it separately
> to get to the current position.
>
> The first of the two types of exception is this:
>
> [info] - Do well-formed observations parse OK? *** FAILED ***
> [info]   java.lang.IllegalStateException: Factory has already been initialized
> [info]   at org.apache.flink.core.memory.MemorySegmentFactory.initializeFactory(MemorySegmentFactory.java:132)
> [info]   at org.apache.flink.runtime.taskmanager.TaskManager$.parseTaskManagerConfiguration(TaskManager.scala:2055)
> [info]   at org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1802)
> [info]   at org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster.startTaskManager(LocalFlinkMiniCluster.scala:142)
> [info]   at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:319)
> [info]   at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:312)
> [info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> [info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> [info]   at scala.collection.immutable.Range.foreach(Range.scala:160)
> [info]   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>
> The rest are as follows, with the index being the position I expect in the
> rawStreamOutput Iterator[RawObservation] object:
>
> [info]   java.lang.IndexOutOfBoundsException: 3
> [info]   at scala.collection.immutable.Vector.checkRangeConvert(Vector.scala:132)
> [info]   at scala.collection.immutable.Vector.apply(Vector.scala:122)
> [info]   at ProcessingBoltTest$$anonfun$5$$anon$7.<init>(ProcessingBoltTest.scala:93)
> [info]   at ProcessingBoltTest$$anonfun$5.apply(ProcessingBoltTest.scala:91)
> [info]   at ProcessingBoltTest$$anonfun$5.apply(ProcessingBoltTest.scala:91)
> [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
> [info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
>
> Thanks,
>
> David
>
> ________________________________
> This message (and any attachments) is for the recipient only. NERC is
> subject to the Freedom of Information Act 2000 and the contents of this
> email and any reply you make may be disclosed by NERC unless it is exempt
> from release under the Act. Any material supplied to NERC may be stored in
> an electronic records management system.
> ________________________________
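
For illustration, here is a minimal, self-contained sketch of the sink-based approach mentioned above (Scala, Flink 1.x DataStream API). CollectSink, MapFunctionTest and the toUpperCase map are made-up placeholder names, not taken from your code; the point is only that the elements land in a static buffer and that env.execute() blocks until the job has finished, so the assertions run against complete output instead of racing the job.

// Rough sketch only; replace the map with your own parsing function.
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.SinkFunction

import scala.collection.mutable

// Collects everything the job emits into a static buffer so the test can
// assert on it after env.execute() has returned.
class CollectSink extends SinkFunction[String] {
  override def invoke(value: String): Unit = CollectSink.values.synchronized {
    CollectSink.values += value
  }
}

object CollectSink {
  // static, so all (deserialized) sink instances write to the same buffer
  val values: mutable.Buffer[String] = mutable.Buffer.empty[String]
}

object MapFunctionTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) // keeps the local mini cluster small and the output deterministic

    env.fromElements("obs-1", "obs-2", "obs-3")
      .map(raw => raw.toUpperCase)   // <- your parsing/map function goes here
      .addSink(new CollectSink)

    // execute() blocks until the job is finished, so the buffer is complete here
    env.execute("map-function-test")

    assert(CollectSink.values.size == 3)
    assert(CollectSink.values.head == "OBS-1")
  }
}

The same assertions can of course live in a scalatest suite instead of a main method; the important part is that there is exactly one execution of the job (env.execute()) and no call to DataStreamUtils.collect(..) alongside it.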