Hi everyone,

I've been trying to write unit tests for my data stream bolts (map, flatMap,
apply, etc.), but the results I've been getting are strange. The code for the
tests is here (running with ScalaTest and sbt):


https://gist.github.com/dbciar/7469adfea9e6442cdc9568aed07095ff


The suite runs the stream processing environment once for each check. For one
of the checks (output below) I get an "IllegalStateException: Factory has
already been initialized", whose cause I'm not sure of, while for the rest I
get an IndexOutOfBoundsException.


The index exception is strange, as the index positions correspond to the number
of input tuples fed to the stream, so it is as if some tuples are being lost,
or the assert is running before the stream has finished processing and adding
objects to the rawStreamOutput: Iterator[RawObservation] object.
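
One thing I'm tempted to try, in case it is a timing issue, is collecting the
operator output into a static, thread-safe buffer via a sink and only asserting
after env.execute() returns. A minimal sketch of that idea is below;
RawObservation and the map are just stand-ins for the real parsing code in the
gist, not the actual implementation:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.scalatest.FunSuite

// Stand-in for the real type in the gist
case class RawObservation(raw: String)

object CollectSink {
  // static, thread-safe buffer shared between the test thread and the sink instances
  val values: java.util.List[RawObservation] =
    java.util.Collections.synchronizedList(new java.util.ArrayList[RawObservation]())
}

class CollectSink extends SinkFunction[RawObservation] {
  override def invoke(value: RawObservation): Unit = CollectSink.values.add(value)
}

class ProcessingBoltSketch extends FunSuite {
  test("well-formed observations parse OK") {
    CollectSink.values.clear()
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    env.fromElements("obs-1", "obs-2", "obs-3")
      .map(s => RawObservation(s))     // placeholder for the real parsing bolt
      .addSink(new CollectSink)

    env.execute("parse-check")         // blocks until the local job finishes

    // assertions only run once the job has completed
    assert(CollectSink.values.size === 3)
  }
}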


Any pointers on what might be happening would be appreciated. I'd also welcome
suggestions on how to incorporate the Redis server into these checks; I had to
comment out the server definition in the gist and run Redis separately to get
this far.
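
For the Redis part, what I had in mind was something like the sketch below,
starting an embedded server once per suite with ScalaTest's BeforeAndAfterAll.
This assumes the embedded-redis test library (redis.embedded.RedisServer); the
port and class names are placeholders rather than what is in the gist:

import org.scalatest.{BeforeAndAfterAll, FunSuite}
import redis.embedded.RedisServer   // from the embedded-redis test library

class RedisBackedBoltSketch extends FunSuite with BeforeAndAfterAll {
  private val redisPort = 6380              // arbitrary free port for the tests
  private var redis: RedisServer = _

  override def beforeAll(): Unit = {
    redis = new RedisServer(redisPort)      // one embedded server for the whole suite
    redis.start()
  }

  override def afterAll(): Unit = {
    redis.stop()                            // tear it down after all checks have run
  }

  test("bolt reads reference data from redis") {
    // point the stream job at localhost:redisPort here
  }
}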


The first of the two exception types is this:


[info] - Do well-formed observations parse OK? *** FAILED ***
[info]   java.lang.IllegalStateException: Factory has already been initialized
[info]   at org.apache.flink.core.memory.MemorySegmentFactory.initializeFactory(MemorySegmentFactory.java:132)
[info]   at org.apache.flink.runtime.taskmanager.TaskManager$.parseTaskManagerConfiguration(TaskManager.scala:2055)
[info]   at org.apache.flink.runtime.taskmanager.TaskManager$.startTaskManagerComponentsAndActor(TaskManager.scala:1802)
[info]   at org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster.startTaskManager(LocalFlinkMiniCluster.scala:142)
[info]   at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:319)
[info]   at org.apache.flink.runtime.minicluster.FlinkMiniCluster$$anonfun$3.apply(FlinkMiniCluster.scala:312)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
[info]   at scala.collection.immutable.Range.foreach(Range.scala:160)
[info]   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)


The rest look like the following, where the index is the position I expect to
access in the rawStreamOutput Iterator[RawObservation] object:


[info]   java.lang.IndexOutOfBoundsException: 3
[info]   at scala.collection.immutable.Vector.checkRangeConvert(Vector.scala:132)
[info]   at scala.collection.immutable.Vector.apply(Vector.scala:122)
[info]   at ProcessingBoltTest$$anonfun$5$$anon$7.<init>(ProcessingBoltTest.scala:93)
[info]   at ProcessingBoltTest$$anonfun$5.apply(ProcessingBoltTest.scala:91)
[info]   at ProcessingBoltTest$$anonfun$5.apply(ProcessingBoltTest.scala:91)
[info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
[info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
[info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
[info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)




Thanks,

David


