Thanks for helping us debug this. You can start many TaskManagers in one JVM by using the LocalMiniCluster. Have a look at this (manually triggered) test, which runs 100 TaskManagers in one JVM:
https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
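In essence, that test boils down to the pattern below. This sketch is written from memory of the snapshot API, so constant and method names may be slightly off; the linked test is the authoritative version:

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;

    public class ManyTaskManagersSketch {
        public static void main(String[] args) throws Exception {
            Configuration config = new Configuration();
            // all TaskManagers live in this one JVM
            config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 4);
            config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
            // keep the per-TaskManager footprint small (memory in MB, plus network buffers)
            config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 80);
            config.setInteger(ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY, 1024);

            LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, false);
            try {
                ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                        "localhost", cluster.getJobManagerRPCPort());
                env.setParallelism(4);
                // build and execute the job under test against 'env' here
            } finally {
                cluster.shutdown();
            }
        }
    }

Note that many TaskManagers in one JVM need a correspondingly large heap, so start with a small count when trying this locally.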
The data type serialization should have no impact on the network stream, as that is buffer-oriented and has no understanding or notion of the types inside. That transition happens later, in the readers/writers. It is of course not impossible that a bug in there makes the serialization affect the buffer transport.
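To illustrate that layering with a toy model (plain JDK, none of this is Flink code, and the class name LostBufferDemo is made up): length-prefixed records are chopped into fixed-size buffers for transport, and when one buffer goes missing, the failure only surfaces a layer above, in the deserializer:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    public class LostBufferDemo {
        public static void main(String[] args) throws IOException {
            // Writer side: length-prefixed records in one logical byte stream.
            ByteArrayOutputStream streamBytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(streamBytes);
            for (String record : new String[] {"first-record", "second-record", "third-record"}) {
                byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
                out.writeInt(bytes.length);
                out.write(bytes);
            }

            // Transport side: the stream is chopped into fixed-size buffers;
            // this layer knows nothing about the records inside.
            byte[] all = streamBytes.toByteArray();
            List<byte[]> buffers = new ArrayList<>();
            for (int pos = 0; pos < all.length; pos += 16) {
                byte[] buf = new byte[Math.min(16, all.length - pos)];
                System.arraycopy(all, pos, buf, 0, buf.length);
                buffers.add(buf);
            }

            buffers.remove(1); // simulate one lost/skipped buffer

            // Reader side: reassemble and deserialize - only here does it blow up.
            ByteArrayOutputStream reassembled = new ByteArrayOutputStream();
            for (byte[] buf : buffers) {
                reassembled.write(buf, 0, buf.length);
            }
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(reassembled.toByteArray()));
            try {
                for (int i = 0; i < 3; i++) {
                    int len = in.readInt(); // misaligned bytes may yield a nonsense length
                    if (len < 0 || len > 1024) {
                        System.out.println("garbage record length: " + len);
                        return;
                    }
                    byte[] bytes = new byte[len];
                    in.readFully(bytes); // or the read simply runs off the end
                    System.out.println("read: " + new String(bytes, StandardCharsets.UTF_8));
                }
            } catch (EOFException e) {
                System.out.println("EOFException in the deserializer - one layer above the real fault");
            }
        }
    }

Depending on where the cut falls, you get an EOFException, a garbage length, or silently wrong records, which is why transport-level corruption so often surfaces as a seemingly unrelated (de-)serialization error.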
Thanks for the pointer, we will definitely look into that.
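One more aside, on the binary-key point in your mail below: wrapping arbitrary bytes in a char[]/String can produce unpaired surrogates, i.e. a String that is not valid UTF-16, and such Strings are not guaranteed to survive (de-)serialization, since encoders may assume well-formed UTF-16. Base64 sidesteps that entirely. A small, self-contained illustration (plain JDK, UTF-8 used only as an example encoder; java.util.Base64 requires Java 8; the class name is made up):

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.Base64;

    public class BinaryKeyDemo {
        public static void main(String[] args) {
            byte[] binaryKey = {(byte) 0xD8, 0x00, 0x00, 0x41}; // arbitrary binary data

            // Problematic: packing raw bytes into chars. The first char below is
            // 0xD800, a lone high surrogate, so the String is not valid UTF-16.
            char[] chars = new char[binaryKey.length / 2];
            for (int i = 0; i < chars.length; i++) {
                chars[i] = (char) (((binaryKey[2 * i] & 0xFF) << 8) | (binaryKey[2 * i + 1] & 0xFF));
            }
            String invalid = new String(chars);
            // A round trip through a UTF-8 encoder is already lossy for it:
            byte[] utf8 = invalid.getBytes(StandardCharsets.UTF_8);
            System.out.println(invalid.equals(new String(utf8, StandardCharsets.UTF_8))); // false

            // Safe: Base64-encode the bytes; the result is plain ASCII.
            String safe = Base64.getEncoder().encodeToString(binaryKey);
            System.out.println(Arrays.equals(binaryKey, Base64.getDecoder().decode(safe))); // true
        }
    }

So the symptom disappearing after your switch to Base64 is at least consistent with that hypothesis.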
On Thu, Jun 4, 2015 at 1:07 PM, Kruse, Sebastian <sebastian.kr...@hpi.de> wrote:
> Thanks for your feedback. I am neither running IPSec nor the aesni-intel
> module.
>
> So far, I could not reproduce the reordering issue. I have also noticed
> that my code might have created String objects with invalid UTF-16 content
> in exactly those jobs that suffered from the reordering. I wanted to use
> binary data as a key, so I put it into a char array and wrapped it in a
> String, hoping that would do the trick. Maybe this has caused some error
> in the String (de-)serialization with further consequences, e.g., skipping
> of a buffer. Ever since I encoded the binary data in Base64, the
> reordering error has not popped up again. I will do some more runs to
> verify this.
>
> However, the other problem with the serialization exceptions should not be
> affected by this. To see if network stream corruption is an issue, I am
> also performing some runs now on a larger single machine with a single
> task manager, so that no network communication is involved. Serialization
> should take place anyway, right? Is there a way to run a second task
> manager on the same machine within the same OS?
>
> Cheers,
> Sebastian
>
> -----Original Message-----
> From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of
> Stephan Ewen
> Sent: Wednesday, June 3, 2015 20:14
> To: dev@flink.apache.org
> Subject: Re: Buffer re-ordering problem
>
> Hi Sebastian!
>
> The first error that you report could also be a lost buffer, rather than
> a reordering.
> The second error that you post seems to be a serialization issue.
> Both can be consequences of a network stream corruption.
>
> Would be good to figure out why the netty event loop does not
> encode/decode this properly in your case.
>
> BTW: I would assume we can virtually rule out TCP/kernel bugs as sources
> for the corruption, unless you have this setup here:
>
> http://arstechnica.com/information-technology/2015/05/the-discovery-of-apache-zookeepers-poison-packet/
>
> Stephan
>
>
> On Wed, Jun 3, 2015 at 7:51 PM, Ufuk Celebi <u...@apache.org> wrote:
>
> > Hey Sebastian,
> >
> > would you mind sharing the code and dataset with me (privately would
> > work fine)? I want to try to reproduce this as well.
> >
> > – Ufuk
> >
> > On Wed, Jun 3, 2015 at 11:07 AM, Kruse, Sebastian
> > <sebastian.kr...@hpi.de> wrote:
> >
> > > I am currently using 0.9-SNAPSHOT. All the non-jar files are from an
> > > older build, but I recently manually updated the flink-dist.jar with
> > > commit d163a817fa2e330e86384d0bbcd104f051a6fb48.
> > >
> > > Our setup consists of 10 workers and a master, all interconnected
> > > via a switch (100 Mbit, I think). The data set is an NTriple file of
> > > about 8 GB; intermediate datasets, however, might be much larger. For
> > > smaller datasets, I could not observe this problem yet.
> > >
> > > Also, during the failure there are a lot of concurrent shuffles
> > > ongoing [1,2]. Additionally, it might be interesting that in the
> > > affected jobs either this exception occurs or another one that looks
> > > like a network disruption [3]. So it might well be that our setup
> > > suffers from occasional network errors, especially during high
> > > network load - but that's just a plain guess.
> > >
> > > Regarding the reproducibility, I can only say right now that this
> > > error occurred twice. I will re-run the jobs, see if the errors can
> > > be observed again, and let you know.
> > >
> > > Cheers,
> > > Sebastian
> > >
> > > [1] https://owncloud.hpi.de/public.php?service=files&t=89dcba7ce63ed053331f5099cad64704
> > > [2] https://owncloud.hpi.de/public.php?service=files&t=1b16d66d6b9e41c68bf598309d57b7b1
> > > [3] Stack trace: tenem18.hpi.uni-potsdam.de
> > > Error: java.lang.Exception: The data preparation for task 'CHAIN
> > > GroupReduce (GroupReduce at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56))
> > > -> Filter (Filter at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68))
> > > -> FlatMap (FlatMap at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46))
> > > -> Map (Map at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))',
> > > caused an error: Error obtaining the sorted input: Thread
> > > 'SortMerger Reading Thread' terminated due to an exception: null
> > > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > > at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > at java.lang.Thread.run(Thread.java:745)
> > > Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> > > Thread 'SortMerger Reading Thread' terminated due to an exception: null
> > > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> > > at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> > > at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> > > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> > > ... 3 more
> > > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> > > terminated due to an exception: null
> > > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> > > Caused by: java.io.EOFException
> > > at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:290)
> > > at org.apache.flink.types.StringValue.readString(StringValue.java:741)
> > > at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:68)
> > > at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> > > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
> > > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
> > > at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:123)
> > > at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:33)
> > > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
> > > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:95)
> > > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
> > > at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> > > at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:133)
> > > at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:64)
> > > at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> > > at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> > > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1020)
> > > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
> > >
> > > -----Original Message-----
> > > From: Ufuk Celebi [mailto:u...@apache.org]
> > > Sent: Wednesday, June 3, 2015 10:33
> > > To: dev@flink.apache.org
> > > Subject: Re: Buffer re-ordering problem
> > >
> > > This is a critical bug.
> > >
> > > - which version are you using? If snapshot, which commit?
> > > - what is your setup? Number of machines, dataset, etc.?
> > > - is it reproducible?
> > >
> > > On Wednesday, June 3, 2015, Kruse, Sebastian <sebastian.kr...@hpi.de> wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I had some jobs running overnight, and in two of them the following
> > > > exception occurred after about half an hour. Do you know why this
> > > > happens?
> > > >
> > > > Thanks,
> > > > Sebastian
> > > >
> > > > tenem16.hpi.uni-potsdam.de
> > > > Error: java.lang.Exception: The data preparation for task 'CHAIN
> > > > GroupReduce (GroupReduce at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56))
> > > > -> Filter (Filter at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68))
> > > > -> FlatMap (FlatMap at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46))
> > > > -> Map (Map at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))',
> > > > caused an error: Error obtaining the sorted input: Thread
> > > > 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering:
> > > > expected buffer with sequence number 17841, but received 17842.
> > > > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > > > at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > > at java.lang.Thread.run(Thread.java:745)
> > > > Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> > > > Thread 'SortMerger Reading Thread' terminated due to an exception:
> > > > Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > > > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> > > > at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> > > > at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> > > > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> > > > ... 3 more
> > > > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> > > > terminated due to an exception: Buffer re-ordering: expected buffer
> > > > with sequence number 17841, but received 17842.
> > > > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> > > > Caused by: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel$BufferReorderingException:
> > > > Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > > > at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:253)
> > > > at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeBufferOrEvent(PartitionRequestClientHandler.java:279)
> > > > at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:214)
> > > > at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:158)
> > > > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > > > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > > > at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> > > > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > > > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > > > at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
> > > > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > > > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > > > at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> > > > at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> > > > at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> > > > at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> > > > at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> > > > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> > > > at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> > > > at java.lang.Thread.run(Thread.java:745)
> > > >
> > > > ---
> > > > Sebastian Kruse
> > > > PhD candidate, Information Systems Group
> > > > Hasso-Plattner-Institut an der Universität Potsdam
> > > > Prof.-Dr.-Helmert-Str. 2-3, D-14482 Potsdam
> > > > Tel +49 331 5509 240
> > > > Amtsgericht Potsdam, HRB 12184
> > > > Management: Prof. Dr. Christoph Meinel