Hi Tran Nam-Luc!

That is a problem we will look into.

In the meantime, can you try to modify your object such that it is a "Flink
POJO"? Then we will serialize it ourselves, without involving Kryo. To do
that, make sure that
 - The class is public
 - It has a public no-argument constructor
 - All fields are either public, or have public getters and setters
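
As a rough illustration of those three rules, here is a small sketch in
plain Java. It does not use Flink at all: "PojoRules", "Point2", "BadPoint"
and "followsRules" are hypothetical names made up for this example, and the
reflection check only approximates the rules listed above. It is not the
logic Flink itself uses to analyze types.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Hypothetical helper, for illustration only (not part of Flink).
class PojoRules {

    /** Example type that follows all three rules (two fields only, for brevity). */
    public static class Point2 {
        public int id;      // public field: no accessors needed
        private double x;   // private field: needs a public getter and setter

        public Point2() {}  // public no-argument constructor

        public double getX() { return x; }
        public void setX(double x) { this.x = x; }
    }

    /** Counter-example: no no-argument constructor, private field without accessors. */
    public static class BadPoint {
        private final double x;
        public BadPoint(double x) { this.x = x; }
    }

    /** Approximate check of the three rules above via reflection. */
    static boolean followsRules(Class<?> type) {
        // Rule 1: the class is public.
        if (!Modifier.isPublic(type.getModifiers())) {
            return false;
        }
        // Rule 2: a public no-argument constructor exists
        // (getConstructor only finds public constructors).
        try {
            type.getConstructor();
        } catch (NoSuchMethodException e) {
            return false;
        }
        // Rule 3: every field is public or has a public getter and setter.
        // Static and synthetic fields are skipped (a simplification of this sketch).
        for (Field f : type.getDeclaredFields()) {
            int mods = f.getModifiers();
            if (Modifier.isStatic(mods) || f.isSynthetic() || Modifier.isPublic(mods)) {
                continue;
            }
            String name = f.getName();
            String suffix = Character.toUpperCase(name.charAt(0)) + name.substring(1);
            try {
                type.getMethod("get" + suffix);              // public getter
                type.getMethod("set" + suffix, f.getType()); // public setter
            } catch (NoSuchMethodException e) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("Point2:   " + followsRules(Point2.class));
        System.out.println("BadPoint: " + followsRules(BadPoint.class));
    }
}
```

Running the same kind of check against your Centroid25 class might be a
quick way to see which of the three rules, if any, it currently misses.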

Here are some minor pointers for the program:
 - If you include all CSV fields, you do not need the
'.includeFields("1111111111111111111111111")' function call. The
"includeFields" function is only necessary if you want to skip over some
fields.
 - If the lambda map function returns a simple class without generic
parameters, you do not need the 'returns("eu.euranova.flink.Centroid25")'
call. It should work without it.

Greetings,
Stephan




On Wed, Feb 11, 2015 at 3:02 PM, Nam-Luc Tran <namluc.t...@euranova.eu>
wrote:

> Hello,
>
> I came across an error for which I am unable to retrace the exact cause.
> Starting from flink-java-examples module, I have extended the KMeans
> example
> to a case where points have 25 coordinates. It follows the exact same
> structure and transformations as the original example, only with points
> having 25 coordinates instead of 2.
>
> When creating the centroids dataset within the code as follows the job
> iterates and executes well:
>
> Centroid25 cent1 = new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
>     -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0);
> Centroid25 cent2 = new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
>     -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0);
> DataSet<Centroid25> centroids = env.fromCollection(Arrays.asList(cent1, cent2));
>
> When reading from a csv file containing the following:
>
> -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0
>
> -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
>
> with the following code:
> DataSet<Centroid25> centroids = env
>     .readCsvFile("file:///home/nltran/res3.csv")
>     .fieldDelimiter(",")
>     .includeFields("1111111111111111111111111")
>     .types(Double.class, Double.class, Double.class, Double.class,
>            Double.class, Double.class, Double.class, Double.class,
>            Double.class, Double.class, Double.class, Double.class,
>            Double.class, Double.class, Double.class, Double.class,
>            Double.class, Double.class, Double.class, Double.class,
>            Double.class, Double.class, Double.class, Double.class,
>            Double.class)
>     .map(p -> {
>         return new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
>             p.f0, p.f1, p.f2, p.f3, p.f4, p.f5, p.f6, p.f7, p.f8, p.f9,
>             p.f10, p.f11, p.f12, p.f13, p.f14, p.f15, p.f16, p.f17, p.f18,
>             p.f19, p.f20, p.f21, p.f22, p.f23, p.f24);
>     })
>     .returns("eu.euranova.flink.Centroid25");
>
>
> I hit the following exception:
>
> 02/11/2015 14:58:27     PartialSolution (BulkIteration (Bulk Iteration))(1/1) switched to FAILED
> com.esotericsoftware.kryo.KryoException: Buffer underflow
>         at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:76)
>         at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>         at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:205)
>         at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:210)
>         at org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
>         at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
>         at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>         at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:138)
>         at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:324)
>         at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>         at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
>         at java.lang.Thread.run(Thread.java:745)
>
> 02/11/2015 14:58:27     Job execution switched to status FAILING.
> 02/11/2015 14:58:27     CHAIN Map (Map at main(DoTheKMeans.java:64)) -> Map (Map at main(DoTheKMeans.java:65))(1/1) switched to CANCELING
> 02/11/2015 14:58:27     Combine (Reduce at main(DoTheKMeans.java:68))(1/1) switched to CANCELING
> 02/11/2015 14:58:27     CHAIN Reduce(Reduce at main(DoTheKMeans.java:68)) -> Map (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELING
> 02/11/2015 14:58:27     DataSink(Print to System.out)(1/1) switched to CANCELED
> 02/11/2015 14:58:27     Sync(BulkIteration (Bulk Iteration))(1/1) switched to CANCELING
> 02/11/2015 14:58:27     Sync(BulkIteration (Bulk Iteration))(1/1) switched to CANCELED
> 02/11/2015 14:58:27     CHAIN Map (Map at main(DoTheKMeans.java:64)) -> Map (Map at main(DoTheKMeans.java:65))(1/1) switched to CANCELED
> 02/11/2015 14:58:27     Combine (Reduce at main(DoTheKMeans.java:68))(1/1) switched to CANCELED
> 02/11/2015 14:58:27     CHAIN Reduce(Reduce at main(DoTheKMeans.java:68)) -> Map (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELED
> 02/11/2015 14:58:27     Job execution switched to status FAILED.
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: com.esotericsoftware.kryo.KryoException: Buffer underflow
>         at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:76)
>         at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
>         at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
>         at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>         at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>         at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:205)
>         at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:210)
>         at org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
>         at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
>         at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
>         at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:138)
>         at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:324)
>         at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>         at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
>         at java.lang.Thread.run(Thread.java:745)
>
>         at org.apache.flink.runtime.client.JobClientListener$$anonfun$receiveWithLogMessages$2.applyOrElse(JobClient.scala:88)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>         at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>         at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
>         at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>         at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at org.apache.flink.runtime.client.JobClientListener.aroundReceive(JobClient.scala:74)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> The centroid25 data is exactly the same in both cases. Could you help me
> retrace what is wrong?
>
> Thanks and best regards,
>
> Tran Nam-Luc
>
>
>
> --
> View this message in context:
> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/kryoException-Buffer-underflow-tp3760.html
> Sent from the Apache Flink (Incubator) Mailing List archive at Nabble.com.
>
