@Stephan: Yes, you are correct. Both omitting the "returns(...)" statement and changing it to "returns(Centroid25.class)" would help.

The returns(TypeInformation) and returns(String) methods do no type extraction at all; the user has to know what they are doing. The method's description reads:

Pojo types such as org.my.MyPojo<myFieldName=TYPE0,myFieldName2=TYPE1>
Generic types such as java.lang.Class

With the returns(String) method you can create all types of type information we currently support.

For returns(Class), the description is as follows:

This method takes a class that will be analyzed by Flink's type extraction capabilities.

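To see why the bare class name cannot yield a POJO type, consider what a string hint has to encode: with no "<field=TYPE,...>" part there is simply no field information to work with. The following is a toy stand-in, not Flink's actual parser; the class and method names are made up for illustration:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy sketch of a "Name<field=Type,...>" hint string; NOT Flink's real parser.
// A bare class name carries no field info, so a parser can only fall back to
// a generic (Kryo-serialized) type; the bracketed form spells the fields out.
public class TypeHintSketch {

    public static Map<String, String> parseFields(String hint) {
        Map<String, String> fields = new LinkedHashMap<>();
        int open = hint.indexOf('<');
        if (open < 0) {
            return fields; // bare class name: nothing to extract
        }
        String body = hint.substring(open + 1, hint.lastIndexOf('>'));
        for (String pair : body.split(",")) {
            String[] kv = pair.split("=", 2);
            fields.put(kv[0].trim(), kv[1].trim());
        }
        return fields;
    }
}
```

So "eu.euranova.flink.Centroid25" yields no fields at all, while "org.my.MyPojo<f0=java.lang.Double,id=int>" yields two named fields with their types.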

On 11.02.2015 21:42, Stephan Ewen wrote:
But in this case, there are no type parameters, correct? Centroid25 is not
a generic class...

On Wed, Feb 11, 2015 at 9:40 PM, Robert Metzger <rmetz...@apache.org> wrote:

I think the issue is that the returns("eu.euranova.flink.Centroid25")
variant only passes a string, so the system does not know the type parameters.
So we have to put GenericTypeInfo there, because we basically see Objects.

On Wed, Feb 11, 2015 at 9:37 PM, Stephan Ewen <se...@apache.org> wrote:

@Timo If I understand it correctly, both omitting the "returns(...)"
statement, or changing it to "returns(Centroid25.class)" would help?

I think that the behavior between "returns(Centroid25.class)" and
"returns("eu.euranova.flink.Centroid25")" should be consistent in that
they both handle the type as a POJO.

Stephan


On Wed, Feb 11, 2015 at 9:28 PM, Timo Walther <twal...@apache.org> wrote:
Hey Nam-Luc,

I think your problem lies in the following line:

.returns("eu.euranova.flink.Centroid25")

If you do not specify the fields of the class in the String by using
"<myfield=String,otherField=int>", the underlying parser will create a
"GenericTypeInfo" type information, which then uses Kryo for serialization.

In general, lambda expressions are a very new feature that currently causes
a lot of problems due to missing type information from compilers. Maybe it
is better to use (anonymous) classes instead.
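The advantage of (anonymous) classes over lambdas can be seen with plain Java reflection: an anonymous class records its generic interface in the class file, while the synthetic class generated for a lambda does not (observed HotSpot behavior; the class below is illustrative, not Flink code):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// Plain-Java demo of why lambdas defeat return-type inference while
// anonymous classes do not: only the anonymous class keeps a generic
// signature (Function<String, Integer>) that reflection can read back.
public class LambdaTypeDemo {

    // true if the object's class exposes Function<...> with concrete type arguments
    public static boolean hasGenericInfo(Function<?, ?> f) {
        for (Type t : f.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) t;
                if (p.getRawType() == Function.class
                        && !(p.getActualTypeArguments()[0]
                                instanceof java.lang.reflect.TypeVariable)) {
                    return true;
                }
            }
        }
        return false; // lambda classes report only the raw interface
    }

    public static final Function<String, Integer> LAMBDA = s -> s.length();

    public static final Function<String, Integer> ANON =
            new Function<String, Integer>() {
                @Override
                public Integer apply(String s) { return s.length(); }
            };
}
```

hasGenericInfo returns false for LAMBDA and true for ANON, which is exactly the information a type extractor would need.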

In case of "map()" functions you don't need to provide type hints through
the "returns()" method.

For other operators you need to either specify all fields of the class in
the String (which makes no sense in your case) or change the method to

.returns(Centroid25.class)

I hope that helps.

Regards,
Timo


On 11.02.2015 17:38, Nam-Luc Tran wrote:

Hello Stephan,

Thank you for your help.

I ensured all the POJO classes used comply with what you previously said,
and the same exception occurs. Here is the listing of classes Centroid25
and Point25:

public class Centroid25 extends Point25 {

    public int id;

    public Centroid25() {}

    public Centroid25(int id, Double value0, Double value1, Double value2,
            Double value3, Double value4, Double value5, Double value6,
            Double value7, Double value8, Double value9, Double value10,
            Double value11, Double value12, Double value13, Double value14,
            Double value15, Double value16, Double value17, Double value18,
            Double value19, Double value20, Double value21, Double value22,
            Double value23, Double value24) {
        super(value0, value1, value2, value3, value4, value5, value6, value7,
                value8, value9, value10, value11, value12, value13, value14,
                value15, value16, value17, value18, value19, value20, value21,
                value22, value23, value24);
        this.id = id;
    }

    public Centroid25(int id, Point25 p) {
        super(p.f0, p.f1, p.f2, p.f3, p.f4, p.f5, p.f6, p.f7, p.f8, p.f9,
                p.f10, p.f11, p.f12, p.f13, p.f14, p.f15, p.f16, p.f17, p.f18,
                p.f19, p.f20, p.f21, p.f22, p.f23, p.f24);
        this.id = id;
    }

    // generic parameters of Tuple25 were stripped by the mail archive;
    // restored here as 25 Doubles to match the super call
    public Centroid25(int id, Tuple25<Double, Double, Double, Double, Double,
            Double, Double, Double, Double, Double, Double, Double, Double,
            Double, Double, Double, Double, Double, Double, Double, Double,
            Double, Double, Double, Double> p) {
        super(p.f0, p.f1, p.f2, p.f3, p.f4, p.f5, p.f6, p.f7, p.f8, p.f9,
                p.f10, p.f11, p.f12, p.f13, p.f14, p.f15, p.f16, p.f17, p.f18,
                p.f19, p.f20, p.f21, p.f22, p.f23, p.f24);
        this.id = id;
    }

    @Override
    public String toString() {
        return id + " " + super.toString();
    }
}

public class Point25 {

    public Double f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12,
            f13, f14, f15, f16, f17, f18, f19, f20, f21, f22, f23, f24 = 0.0;

    public Point25() {
    }

    public Point25(Double value0, Double value1, Double value2, Double value3,
            Double value4, Double value5, Double value6, Double value7,
            Double value8, Double value9, Double value10, Double value11,
            Double value12, Double value13, Double value14, Double value15,
            Double value16, Double value17, Double value18, Double value19,
            Double value20, Double value21, Double value22, Double value23,
            Double value24) {
        f0 = value0;
        f1 = value1;
        f2 = value2;
        f3 = value3;
        f4 = value4;
        f5 = value5;
        f6 = value6;
        f7 = value7;
        f8 = value8;
        f9 = value9;
        f10 = value10;
        f11 = value11;
        f12 = value12;
        f13 = value13;
        f14 = value14;
        f15 = value15;
        f16 = value16;
        f17 = value17;
        f18 = value18;
        f19 = value19;
        f20 = value20;
        f21 = value21;
        f22 = value22;
        f23 = value23;
        f24 = value24;
    }

    public List<Double> getFieldsAsList() {
        return Arrays.asList(f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11,
                f12, f13, f14, f15, f16, f17, f18, f19, f20, f21, f22, f23,
                f24);
    }

    public Point25 add(Point25 other) {
        f0 += other.f0;
        f1 += other.f1;
        f2 += other.f2;
        f3 += other.f3;
        f4 += other.f4;
        f5 += other.f5;
        f6 += other.f6;
        f7 += other.f7;
        f8 += other.f8;
        f9 += other.f9;
        f10 += other.f10;
        f11 += other.f11;
        f12 += other.f12;
        f13 += other.f13;
        f14 += other.f14;
        f15 += other.f15;
        f16 += other.f16;
        f17 += other.f17;
        f18 += other.f18;
        f19 += other.f19;
        f20 += other.f20;
        f21 += other.f21;
        f22 += other.f22;
        f23 += other.f23;
        f24 += other.f24;
        return this;
    }

    // the original listing used "+=" from f5 onward; assuming that was a
    // typo, all fields are divided here
    public Point25 div(long val) {
        f0 /= val;
        f1 /= val;
        f2 /= val;
        f3 /= val;
        f4 /= val;
        f5 /= val;
        f6 /= val;
        f7 /= val;
        f8 /= val;
        f9 /= val;
        f10 /= val;
        f11 /= val;
        f12 /= val;
        f13 /= val;
        f14 /= val;
        f15 /= val;
        f16 /= val;
        f17 /= val;
        f18 /= val;
        f19 /= val;
        f20 /= val;
        f21 /= val;
        f22 /= val;
        f23 /= val;
        f24 /= val;
        return this;
    }

    public double euclideanDistance(Point25 other) {
        List<Double> l = this.getFieldsAsList();
        List<Double> ol = other.getFieldsAsList();
        double res = 0;
        // loop body truncated in the original message; reconstructed as the
        // standard sum of squared differences
        for (int i = 0; i < l.size(); i++) {
            double d = l.get(i) - ol.get(i);
            res += d * d;
        }
        return Math.sqrt(res);
    }
}
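Flink's type extraction treats a class as a POJO only if it is public, has a public no-argument constructor, and its fields are public or reachable via getters/setters. A simplified, Flink-independent reflective sketch of the first two rules (the nested stand-in classes are hypothetical; the real TypeExtractor checks more, e.g. getters/setters for private fields):

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

// Simplified reflective check of the core POJO rules Flink applies:
// public class, public no-arg constructor, and public non-static fields.
public class PojoCheck {

    public static boolean looksLikePojo(Class<?> clazz) {
        if (!Modifier.isPublic(clazz.getModifiers())) {
            return false;
        }
        try {
            clazz.getConstructor(); // public no-arg constructor must exist
        } catch (NoSuchMethodException e) {
            return false;
        }
        for (Field f : clazz.getDeclaredFields()) {
            int m = f.getModifiers();
            if (!Modifier.isStatic(m) && !Modifier.isPublic(m)) {
                return false; // simplified: no getter/setter fallback here
            }
        }
        return true;
    }

    // Stand-ins mirroring the classes from this thread (names hypothetical):
    public static class GoodPoint {      // public, no-arg ctor, public fields
        public double f0, f1;
        public GoodPoint() {}
    }

    public static class NoDefaultCtor {  // missing the no-arg constructor
        public double f0;
        public NoDefaultCtor(double f0) { this.f0 = f0; }
    }
}
```

By these rules Point25 and Centroid25 above do qualify as POJOs, which is why returns(Centroid25.class) can produce a POJO type while the bare string hint cannot.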

I came across an error for which I am unable to retrace the exact cause.

Starting from the flink-java-examples module, I have extended the KMeans
example to a case where points have 25 coordinates. It follows the exact
same structure and transformations as the original example, only with
points having 25 coordinates instead of 2.

When creating the centroids dataset within the code as follows, the job
iterates and executes well:

Centroid25 cent1 = new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
    -10.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, -0.0,
    0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 10.0);

Centroid25 cent2 = new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
    -1.0, -1.0, 1.0, 1.0, -1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, -1.0, 1.0, 1.0,
    1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0);

DataSet<Centroid25> centroids = env.fromCollection(Arrays.asList(cent1, cent2));

When reading from a csv file containing the following:

-10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0
-1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0

with the following code:

DataSet<Centroid25> centroids = env
    .readCsvFile("file:///home/nltran/res3.csv")
    .fieldDelimiter(",")
    .includeFields("1111111111111111111111111")
    .types(Double.class, Double.class, Double.class, Double.class, Double.class,
           Double.class, Double.class, Double.class, Double.class, Double.class,
           Double.class, Double.class, Double.class, Double.class, Double.class,
           Double.class, Double.class, Double.class, Double.class, Double.class,
           Double.class, Double.class, Double.class, Double.class, Double.class)
    .map(p -> {
        return new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
            p.f0, p.f1, p.f2, p.f3, p.f4, p.f5, p.f6, p.f7, p.f8, p.f9, p.f10,
            p.f11, p.f12, p.f13, p.f14, p.f15, p.f16, p.f17, p.f18, p.f19,
            p.f20, p.f21, p.f22, p.f23, p.f24);
    })
    .returns("eu.euranova.flink.Centroid25");

I hit the following exception:

02/11/2015 14:58:27     PartialSolution (BulkIteration (Bulk Iteration))(1/1) switched to FAILED
com.esotericsoftware.kryo.KryoException: Buffer underflow
        at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:76)
        at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
        at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:205)
        at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:210)
        at org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
        at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
        at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
        at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:138)
        at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:324)
        at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
        at java.lang.Thread.run(Thread.java:745)

02/11/2015 14:58:27     Job execution switched to status FAILING.
02/11/2015 14:58:27     CHAIN Map (Map at main(DoTheKMeans.java:64)) -> Map (Map at main(DoTheKMeans.java:65))(1/1) switched to CANCELING
02/11/2015 14:58:27     Combine (Reduce at main(DoTheKMeans.java:68))(1/1) switched to CANCELING
02/11/2015 14:58:27     CHAIN Reduce(Reduce at main(DoTheKMeans.java:68)) -> Map (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELING
02/11/2015 14:58:27     DataSink(Print to System.out)(1/1) switched to CANCELED
02/11/2015 14:58:27     Sync(BulkIteration (Bulk Iteration))(1/1) switched to CANCELING
02/11/2015 14:58:27     Sync(BulkIteration (Bulk Iteration))(1/1) switched to CANCELED
02/11/2015 14:58:27     CHAIN Map (Map at main(DoTheKMeans.java:64)) -> Map (Map at main(DoTheKMeans.java:65))(1/1) switched to CANCELED
02/11/2015 14:58:27     Combine (Reduce at main(DoTheKMeans.java:68))(1/1) switched to CANCELED
02/11/2015 14:58:27     CHAIN Reduce(Reduce at main(DoTheKMeans.java:68)) -> Map (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELED
02/11/2015 14:58:27     Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException:
com.esotericsoftware.kryo.KryoException: Buffer underflow
        at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:76)
        at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
        at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:205)
        at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:210)
        at org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
        at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
        at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
        at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:138)
        at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:324)
        at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
        at java.lang.Thread.run(Thread.java:745)
        at org.apache.flink.runtime.client.JobClientListener$$anonfun$receiveWithLogMessages$2.applyOrElse(JobClient.scala:88)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.client.JobClientListener.aroundReceive(JobClient.scala:74)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The centroid25 data is exactly the same in both cases. Could you help me
retrace what is wrong?

Thanks and best regards,

Tran Nam-Luc



--
View this message in context: http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/kryoException-Buffer-underflow-tp3760.html
Sent from the Apache Flink (Incubator) Mailing List archive. mailing list archive at Nabble.com.



