But in this case, there are no type parameters, correct? Centroid25 is not a generic class...
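To make that point concrete, here is a minimal sketch in plain Java reflection. It is not Flink code: the class TypeParameterCheck and its method are made-up names for illustration, and java.lang.String stands in for Centroid25, which is not on the classpath of this sketch. It only shows that a non-generic class looked up by name declares no type parameters, while a generic one such as java.util.ArrayList does.

```java
// Hypothetical helper, not part of Flink: counts the declared type
// parameters of a class that is known only by its fully qualified name.
class TypeParameterCheck {

    static int typeParameterCount(String className) {
        try {
            // Resolve the name to a Class object, as a string-based hint would have to.
            Class<?> clazz = Class.forName(className);
            return clazz.getTypeParameters().length;
        } catch (ClassNotFoundException e) {
            // Wrap the checked exception so callers do not need a throws clause.
            throw new IllegalArgumentException("Unknown class: " + className, e);
        }
    }

    public static void main(String[] args) {
        // Non-generic class (stand-in for Centroid25): no type parameters.
        System.out.println(typeParameterCount("java.lang.String"));
        // Generic class: one type parameter (E).
        System.out.println(typeParameterCount("java.util.ArrayList"));
    }
}
```

If that holds for Centroid25 as well, the string variant should in principle carry as much information as the class literal, which is the reason for my question.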
On Wed, Feb 11, 2015 at 9:40 PM, Robert Metzger <rmetz...@apache.org> wrote:

> I think the issue is that the returns("eu.euranova.flink.Centroid25")
> variant only passes a string and the system does not know the
> type parameters.
> So we have to put GenericTypeInfo there, because we basically see Objects.
>
> On Wed, Feb 11, 2015 at 9:37 PM, Stephan Ewen <se...@apache.org> wrote:
>
> > @Timo If I understand it correctly, both omitting the "returns(...)"
> > statement, or changing it to "returns(Centroid25.class)" would help?
> >
> > I think that the behavior between "returns(Centroid25.class)" and
> > "returns("eu.euranova.flink.Centroid25")" should be consistent in that
> > they both handle the type as a POJO.
> >
> > Stephan
> >
> > On Wed, Feb 11, 2015 at 9:28 PM, Timo Walther <twal...@apache.org> wrote:
> >
> > > Hey Nam-Luc,
> > >
> > > I think your problem lies in the following line:
> > >
> > > .returns("eu.euranova.flink.Centroid25")
> > >
> > > If you do not specify the fields of the class in the String by using
> > > "<myfield=String,otherField=int>", the underlying parser will create a
> > > "GenericTypeInfo" type information, which then uses Kryo for
> > > serialization.
> > >
> > > In general, lambda expressions are a very new feature that currently
> > > causes a lot of problems because compilers omit type information.
> > > Maybe it is better to use (anonymous) classes instead.
> > >
> > > In case of "map()" functions you don't need to provide type hints
> > > through the "returns()" method.
> > >
> > > For other operators you need to either specify all fields of the class
> > > in the String (makes no sense in your case) or you change the method to
> > >
> > > .returns(Centroid25.class)
> > >
> > > I hope that helps.
> > >
> > > Regards,
> > > Timo
> > >
> > > On 11.02.2015 17:38, Nam-Luc Tran wrote:
> > >
> > >> Hello Stephan,
> > >>
> > >> Thank you for your help.
> > >>
> > >> I ensured all the POJO classes used comply with what you previously said
> > >> and the same exception occurs. Here is the listing of classes
> > >> Centroid25 and Point25:
> > >>
> > >> public class Centroid25 extends Point25 {
> > >>
> > >>     public int id;
> > >>
> > >>     public Centroid25() {}
> > >>
> > >>     public Centroid25(int id, Double value0, Double value1, Double value2,
> > >>             Double value3, Double value4, Double value5,
> > >>             Double value6, Double value7, Double value8, Double value9,
> > >>             Double value10, Double value11, Double value12,
> > >>             Double value13, Double value14, Double value15, Double value16,
> > >>             Double value17, Double value18,
> > >>             Double value19, Double value20, Double value21, Double value22,
> > >>             Double value23, Double value24) {
> > >>         super(value0, value1, value2, value3, value4, value5, value6, value7,
> > >>                 value8, value9, value10, value11,
> > >>                 value12, value13, value14, value15, value16, value17, value18,
> > >>                 value19, value20, value21, value22,
> > >>                 value23, value24);
> > >>         this.id = id;
> > >>     }
> > >>
> > >>     public Centroid25(int id, Point25 p) {
> > >>         super(p.f0,
> > >>                 p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,
> > >>                 p.f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,
> > >>                 p.f22,p.f23,p.f24);
> > >>         this.id = id;
> > >>     }
> > >>
> > >>     public Centroid25(int id, Tuple25 p) {
> > >>         super(p.f0,
> > >>                 p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,
> > >>                 p.f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,
> > >>                 p.f22,p.f23,p.f24);
> > >>         this.id = id;
> > >>     }
> > >>
> > >>     @Override
> > >>     public String toString() {
> > >>         return id + " " + super.toString();
> > >>     }
> > >> }
> > >>
> > >> public class Point25{
> > >>
> > >>     public Double
> > >>             f0,f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12,f13,f14,f15,f16,
> > >>             f17,f18,f19,f20,f21,f22,f23,f24
> > >>             = 0.0;
> > >>
> > >>     public Point25() {
> > >>     }
> > >>
> > >>     public Point25(Double value0, Double value1, Double value2,
> > >>             Double value3, Double value4, Double value5,
> > >>             Double value6, Double value7, Double value8, Double value9,
> > >>             Double value10, Double value11, Double value12,
> > >>             Double value13, Double value14, Double value15, Double value16,
> > >>             Double value17, Double value18,
> > >>             Double value19, Double value20, Double value21, Double value22,
> > >>             Double value23, Double value24) {
> > >>         f0=value0;
> > >>         f1=value1;
> > >>         f2=value2;
> > >>         f3=value3;
> > >>         f4=value4;
> > >>         f5=value5;
> > >>         f6=value6;
> > >>         f7=value7;
> > >>         f8=value8;
> > >>         f9=value9;
> > >>         f10=value10;
> > >>         f11=value11;
> > >>         f12=value12;
> > >>         f13=value13;
> > >>         f14=value14;
> > >>         f15=value15;
> > >>         f16=value16;
> > >>         f17=value17;
> > >>         f18=value18;
> > >>         f19=value19;
> > >>         f20=value20;
> > >>         f21=value21;
> > >>         f22=value22;
> > >>         f23=value23;
> > >>         f24=value24;
> > >>
> > >>     }
> > >>
> > >>     public List getFieldsAsList() {
> > >>         return Arrays.asList(f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11,
> > >>                 f12, f13, f14, f15, f16, f17, f18, f19,
> > >>                 f20, f21, f22, f23, f24);
> > >>     }
> > >>
> > >>     public Point25 add(Point25 other) {
> > >>         f0 += other.f0;
> > >>         f1 += other.f1;
> > >>         f2 += other.f2;
> > >>         f3 += other.f3;
> > >>         f4 += other.f4;
> > >>         f5 += other.f5;
> > >>         f6 += other.f6;
> > >>         f7 += other.f7;
> > >>         f8 += other.f8;
> > >>         f9 += other.f9;
> > >>         f10 += other.f10;
> > >>         f11 += other.f11;
> > >>         f12 += other.f12;
> > >>         f13 += other.f13;
> > >>         f14 += other.f14;
> > >>         f15 += other.f15;
> > >>         f16 += other.f16;
> > >>         f17 += other.f17;
> > >>         f18 += other.f18;
> > >>         f19 += other.f19;
> > >>         f20 += other.f20;
> > >>         f21 += other.f21;
> > >>         f22 += other.f22;
> > >>         f23 += other.f23;
> > >>         f24 += other.f24;
> > >>         return this;
> > >>     }
> > >>
> > >>     public Point25 div(long val) {
> > >>         f0 /= val;
> > >>         f1 /= val;
> > >>         f2 /= val;
> > >>         f3 /= val;
> > >>         f4 /= val;
> > >>         f5 /= val;
> > >>         f6 /= val;
> > >>         f7 /= val;
> > >>         f8 /= val;
> > >>         f9 /= val;
> > >>         f10 /= val;
> > >>         f11 /= val;
> > >>         f12 /= val;
> > >>         f13 /= val;
> > >>         f14 /= val;
> > >>         f15 /= val;
> > >>         f16 /= val;
> > >>         f17 /= val;
> > >>         f18 /= val;
> > >>         f19 /= val;
> > >>         f20 /= val;
> > >>         f21 /= val;
> > >>         f22 /= val;
> > >>         f23 /= val;
> > >>         f24 /= val;
> > >>         return this;
> > >>     }
> > >>
> > >>     public double euclideanDistance(Point25 other) {
> > >>         List l = this.getFieldsAsList();
> > >>         List ol = other.getFieldsAsList();
> > >>         double res = 0;
> > >>         for(int i=0;i
> > >>
> > >>> I came across an error for which I am unable to retrace the exact
> > >>> cause.
> > >>> Starting from flink-java-examples module, I have extended the KMeans
> > >>> example to a case where points have 25 coordinates. It follows the exact
> > >>> same structure and transformations as the original example, only with
> > >>> points having 25 coordinates instead of 2.
> > >>>
> > >>> When creating the centroids dataset within the code as follows the
> > >>> job iterates and executes well:
> > >>>
> > >>> Centroid25 cent1 = new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
> > >>>     -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,
> > >>>     0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0);
> > >>> Centroid25 cent2 = new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
> > >>>     -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.0,
> > >>>     1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0);
> > >>> DataSet centroids = env.fromCollection(Arrays.asList(cent1, cent2));
> > >>>
> > >>> When reading from a csv file containing the following:
> > >>>
> > >>> -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0
> > >>> -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
> > >>>
> > >>> with the following code:
> > >>>
> > >>> DataSet> centroids = env
> > >>>     .readCsvFile("file:///home/nltran/res3.csv")
> > >>>     .fieldDelimiter(",")
> > >>>     .includeFields("1111111111111111111111111")
> > >>>     .types(Double.class, Double.class, Double.class, Double.class, Double.class,
> > >>>         Double.class, Double.class, Double.class, Double.class, Double.class,
> > >>>         Double.class, Double.class, Double.class, Double.class, Double.class,
> > >>>         Double.class, Double.class, Double.class, Double.class, Double.class,
> > >>>         Double.class, Double.class, Double.class, Double.class, Double.class)
> > >>>     .map(p -> {
> > >>>         return new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
> > >>>             p.f0,p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,
> > >>>             p.f11,p.f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,
> > >>>             p.f21,p.f22,p.f23,p.f24);
> > >>>     }).returns("eu.euranova.flink.Centroid25");
> > >>>
> > >>> I hit the following exception:
> > >>>
> > >>> 02/11/2015 14:58:27 PartialSolution (BulkIteration (Bulk Iteration))(1/1) switched to FAILED
> > >>> com.esotericsoftware.kryo.KryoException: Buffer underflow
> > >>>     at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:76)
> > >>>     at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
> > >>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
> > >>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> > >>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> > >>>     at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:205)
> > >>>     at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:210)
> > >>>     at org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
> > >>>     at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
> > >>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > >>>     at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:138)
> > >>>     at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:324)
> > >>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
> > >>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
> > >>>     at java.lang.Thread.run(Thread.java:745)
> > >>>
> > >>> 02/11/2015 14:58:27 Job execution switched to status FAILING.
> > >>> 02/11/2015 14:58:27 CHAIN Map (Map at main(DoTheKMeans.java:64)) -> Map (Map at main(DoTheKMeans.java:65))(1/1) switched to CANCELING
> > >>> 02/11/2015 14:58:27 Combine (Reduce at main(DoTheKMeans.java:68))(1/1) switched to CANCELING
> > >>> 02/11/2015 14:58:27 CHAIN Reduce(Reduce at main(DoTheKMeans.java:68)) -> Map (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELING
> > >>> 02/11/2015 14:58:27 DataSink(Print to System.out)(1/1) switched to CANCELED
> > >>> 02/11/2015 14:58:27 Sync(BulkIteration (Bulk Iteration))(1/1) switched to CANCELING
> > >>> 02/11/2015 14:58:27 Sync(BulkIteration (Bulk Iteration))(1/1) switched to CANCELED
> > >>> 02/11/2015 14:58:27 CHAIN Map (Map at main(DoTheKMeans.java:64)) -> Map (Map at main(DoTheKMeans.java:65))(1/1) switched to CANCELED
> > >>> 02/11/2015 14:58:27 Combine (Reduce at main(DoTheKMeans.java:68))(1/1) switched to CANCELED
> > >>> 02/11/2015 14:58:27 CHAIN Reduce(Reduce at main(DoTheKMeans.java:68)) -> Map (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELED
> > >>> 02/11/2015 14:58:27 Job execution switched to status FAILED.
> > >>> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: com.esotericsoftware.kryo.KryoException: Buffer underflow
> > >>>     at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:76)
> > >>>     at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
> > >>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
> > >>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> > >>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> > >>>     at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:205)
> > >>>     at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:210)
> > >>>     at org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
> > >>>     at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
> > >>>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
> > >>>     at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:138)
> > >>>     at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:324)
> > >>>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
> > >>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
> > >>>     at java.lang.Thread.run(Thread.java:745)
> > >>>
> > >>>     at org.apache.flink.runtime.client.JobClientListener$$anonfun$receiveWithLogMessages$2.applyOrElse(JobClient.scala:88)
> > >>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> > >>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> > >>>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> > >>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
> > >>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
> > >>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> > >>>     at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
> > >>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> > >>>     at org.apache.flink.runtime.client.JobClientListener.aroundReceive(JobClient.scala:74)
> > >>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> > >>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> > >>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> > >>>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> > >>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> > >>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > >>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > >>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > >>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > >>>
> > >>> The centroid25 data is exactly the same in both cases. Could you
> > >>> help me retrace what is wrong?
> > >>>
> > >>> Thanks and best regards,
> > >>>
> > >>> Tran Nam-Luc
> > >>>
> > >>> --
> > >>> View this message in context:
> > >>> http://apache-flink-incubator-mailing-list-archive.1008284.n3.nabble.com/kryoException-Buffer-underflow-tp3760.html
> > >>> Sent from the Apache Flink (Incubator) Mailing List archive. mailing
> > >>> list archive at Nabble.com.
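
On the remark quoted in this thread that lambda expressions come with missing type information: the following is a minimal sketch in plain Java, not Flink code, and the class LambdaTypeInfo and its methods are hypothetical names chosen for illustration. It compares what reflection can see for an anonymous class and for a lambda implementing the same java.util.function.Function. On the common OpenJDK/HotSpot runtimes, as far as I know, the anonymous class keeps its type arguments in its generic signature while the lambda's runtime class only implements the raw interface.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

// Hypothetical demo class, not part of Flink.
class LambdaTypeInfo {

    // True if the runtime class of f still records type arguments
    // for one of the interfaces it implements.
    static boolean keepsTypeArguments(Function<?, ?> f) {
        for (Type t : f.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    // Same function written as an anonymous class.
    static Function<String, Integer> asAnonymousClass() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
    }

    // Same function written as a lambda expression.
    static Function<String, Integer> asLambda() {
        return s -> s.length();
    }

    public static void main(String[] args) {
        System.out.println("anonymous class: " + keepsTypeArguments(asAnonymousClass()));
        System.out.println("lambda:          " + keepsTypeArguments(asLambda()));
    }
}
```

A framework that infers return types from generic signatures would therefore have less to work with for the lambda, which is presumably why an explicit hint such as returns(Centroid25.class) or an anonymous class was suggested.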