Hi Stephan,

 

I confirm that I was using custom types in the collect(), and that the bug is 
not present in the master.

 

Thanks

Flavio

 

 

From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan 
Ewen
Sent: Monday, May 04, 2015 2:33 PM
To: user@flink.apache.org
Subject: Re: Crash on DataSet.collect()

 

Hi Flavio!

 

This issue is known and has been fixed already. It occurs when you use custom 
types in collect, because it uses the wrong classloader/serializer to transfer 
them.

 

The current master should not have this issue any more.

 

Greetings,
Stephan

 

 

On Mon, May 4, 2015 at 2:09 PM, Flavio Baronti <f.baro...@list-group.com 
<mailto:f.baro...@list-group.com> > wrote:

Hello,

I'm testing the new DataSet.collect() method on version 0.9-milestone-1, but
I obtain the following error on cluster execution (no problem with local
execution), which also causes the job manager to crash:

14:05:41,145 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
- Deploying CHAIN Cross(Cross at main(Test01.java:53)) -> Map (Map at
main(Test01.java:54)) -> F
latMap (FlatMap at collect(DataSet.java:413)) (1/1) (attempt #0) to india3
14:05:41,211 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
- Deploying DataSink
(org.apache.flink.api.java.io.DiscardingOutputFormat@4386f16 
<mailto:org.apache.flink.api.java.io.DiscardingOutputFormat@4386f16> ) (1/1) 
(attemp
t #0) to india3
14:05:41,269 INFO  org.apache.flink.runtime.jobmanager.JobManager
- Status of job 254ba2f06f7a9c4d454ca7288dae4092 (Flink Java Job at Mon May
04 14:05:39 CEST 201
5) changed to FINISHED .
14:05:41,284 ERROR akka.actor.OneForOneStrategy
- java.io.StreamCorruptedException: invalid type code: 00
org.apache.commons.lang3.SerializationException:
java.io.StreamCorruptedException: invalid type code: 00
        at
org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.j
ava:232)
        at
org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.j
ava:268)
        at
org.apache.flink.api.common.accumulators.ListAccumulator.getLocalValue(ListA
ccumulator.java:51)
        at
org.apache.flink.api.common.accumulators.ListAccumulator.getLocalValue(ListA
ccumulator.java:35)
        at
org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager.getJobAc
cumulatorResults(AccumulatorManager.java:77)
        at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessag
es$1.applyOrElse(JobManager.scala:300)
        at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialF
unction.scala:33)
        at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.
scala:33)
        at
scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.
scala:25)
        at
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.sca
la:37)
        at
org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.sca
la:30)
        at
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at
org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessag
es.scala:30)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scal
a:91)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool
.java:1253)
        at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1
346)
        at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java
:107)
Caused by: java.io.StreamCorruptedException: invalid type code: 00
        at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1379)
        at
java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
        at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at
java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
        at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at
org.apache.commons.lang3.SerializationUtils.deserialize(SerializationUtils.j
ava:224)
        ... 24 more
14:05:41,290 INFO  org.apache.flink.runtime.jobmanager.JobManager
- Stopping JobManager akka://flink/user/jobmanager#-828467473.
14:05:41,297 ERROR org.apache.flink.runtime.jobmanager.JobManager
- Actor akka://flink/user/jobmanager#-828467473 terminated, stopping
process...

Is this a known issue? Am I doing something wrong?

Thanks
Flavio



 

Reply via email to