Hello squirrels, I'm writing a few graph algorithms to test the performance of different iteration models and I am quite stuck with an error. While my sssp example works fine, I get the following in my connected components job (local execution inside eclipse):
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmanager.JobManager$anonfun$handleMessage$1$anonfun$applyOrElse$5.apply$mcV$sp(JobManager.scala:563) at org.apache.flink.runtime.jobmanager.JobManager$anonfun$handleMessage$1$anonfun$applyOrElse$5.apply(JobManager.scala:509) at org.apache.flink.runtime.jobmanager.JobManager$anonfun$handleMessage$1$anonfun$applyOrElse$5.apply(JobManager.scala:509) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to org.apache.flink.types.CopyableValue at org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer.serialize(CopyableValueSerializer.java:1) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:116) at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:1) at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56) at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:79) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:84) at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:97) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584) at java.lang.Thread.run(Thread.java:745) The 2 jobs are almost identical, expect that in connected components I have Long values instead of Double and edges have no weights (NullValue types). With this exception message, I'm not really sure where to look for the error. What is the CopyableValueSerializer and when is it used? I tried turning the object reuse mode on/off, but no luck. I would really appreciate any hint or idea on where to look :) Cheers, -Vasia.