Hi Sebastian!

The first error that you report looks like it could also be caused by a lost buffer, rather than by a reordering. The second error that you post seems to be a serialization issue. Both can be consequences of network stream corruption.
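To illustrate why a lost buffer and a re-ordering are indistinguishable at the receiver: the consumer tracks a single expected sequence number per channel and rejects anything else. Below is a minimal Java sketch of that kind of check; the class and method names are illustrative, not Flink's actual API (the real check lives in RemoteInputChannel#onBuffer).

```java
// Minimal sketch of a receiver-side sequence-number check that produces a
// "Buffer re-ordering" style error. Illustrative names, not Flink's code.
final class SequenceCheckingReceiver {

    // Next sequence number this channel expects from the sender.
    private int expectedSequenceNumber;

    /** Returns null if the buffer is in order, otherwise a diagnostic message. */
    String onBuffer(int sequenceNumber) {
        if (sequenceNumber != expectedSequenceNumber) {
            // A gap (received > expected) looks exactly like a re-ordering here:
            // a buffer lost on the wire raises the same error as one that
            // arrived out of order.
            return "Buffer re-ordering: expected buffer with sequence number "
                    + expectedSequenceNumber + ", but received " + sequenceNumber;
        }
        expectedSequenceNumber++;
        return null;
    }
}
```

With in-order delivery 0, 1, 2, … the check passes; after 0 and 1, a buffer with sequence number 3 triggers the error, and at that point the receiver cannot tell whether buffer 2 was re-ordered (it may still arrive) or lost (it never will).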
It would be good to figure out why the netty event loop does not encode/decode this properly in your case.

BTW: I would assume we can virtually rule out TCP/kernel bugs as sources of the corruption, unless you have this setup here:
http://arstechnica.com/information-technology/2015/05/the-discovery-of-apache-zookeepers-poison-packet/

Stephan

On Wed, Jun 3, 2015 at 7:51 PM, Ufuk Celebi <u...@apache.org> wrote:

> Hey Sebastian,
>
> would you mind sharing the code and dataset with me (privately would work
> fine)? I want to try to reproduce this as well.
>
> – Ufuk
>
> On Wed, Jun 3, 2015 at 11:07 AM, Kruse, Sebastian <sebastian.kr...@hpi.de>
> wrote:
>
> > I am currently using 0.9-SNAPSHOT. All the non-jar files are from an
> > older build, but I recently manually updated the flink-dist.jar with
> > commit d163a817fa2e330e86384d0bbcd104f051a6fb48.
> >
> > Our setup consists of 10 workers and a master, all interconnected via a
> > switch (100 Mbit, I think). The data set is an NTriple file of about
> > 8 GB; however, intermediate datasets might be much larger. For smaller
> > datasets, though, I could not observe this problem yet.
> >
> > Also, during the failure there are a lot of concurrent shuffles ongoing
> > [1,2]. Additionally, it might be interesting that in the affected jobs
> > either this exception occurs or another one that looks like a network
> > disruption [3]. So it might well be that our setup suffers from
> > occasional network errors, especially during high network load - but
> > that's just a plain guess.
> >
> > Regarding the reproducibility, I can only say right now that this error
> > occurred twice. I will re-run the jobs and see if the errors can be
> > observed again and let you know.
> >
> > Cheers,
> > Sebastian
> >
> > [1] https://owncloud.hpi.de/public.php?service=files&t=89dcba7ce63ed053331f5099cad64704
> > [2] https://owncloud.hpi.de/public.php?service=files&t=1b16d66d6b9e41c68bf598309d57b7b1
> > [3] Stack trace: tenem18.hpi.uni-potsdam.de
> >
> > Error: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56)) -> Filter (Filter at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68)) -> FlatMap (FlatMap at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46)) -> Map (Map at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
> >   at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> >   at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >   at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
> >   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> >   at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> >   at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> >   at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> >   ... 3 more
> > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
> >   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> > Caused by: java.io.EOFException
> >   at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:290)
> >   at org.apache.flink.types.StringValue.readString(StringValue.java:741)
> >   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:68)
> >   at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> >   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
> >   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
> >   at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:123)
> >   at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:33)
> >   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
> >   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:95)
> >   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
> >   at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> >   at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:133)
> >   at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:64)
> >   at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >   at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> >   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1020)
> >   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
> >
> > -----Original Message-----
> > From: Ufuk Celebi [mailto:u...@apache.org]
> > Sent: Wednesday, June 3, 2015 10:33
> > To: dev@flink.apache.org
> > Subject: Re: Buffer re-ordering problem
> >
> > This is a critical bug.
> >
> > - which version are you using? If snapshot, which commit?
> > - what is your setup? Number of machines, dataset etc.?
> > - is it reproducible?
> >
> > On Wednesday, June 3, 2015, Kruse, Sebastian <sebastian.kr...@hpi.de> wrote:
> >
> > > Hi everyone,
> > >
> > > I had some jobs running over the night, and in two of them the following
> > > exception occurred after about half an hour. Do you know why this happens?
> > >
> > > Thanks,
> > > Sebastian
> > >
> > > tenem16.hpi.uni-potsdam.de
> > > Error: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56)) -> Filter (Filter at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68)) -> FlatMap (FlatMap at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46)) -> Map (Map at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > >   at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > >   at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > >   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > >   at java.lang.Thread.run(Thread.java:745)
> > > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > >   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> > >   at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> > >   at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> > >   at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> > >   ... 3 more
> > > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > >   at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> > > Caused by: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel$BufferReorderingException: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > >   at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:253)
> > >   at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeBufferOrEvent(PartitionRequestClientHandler.java:279)
> > >   at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:214)
> > >   at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:158)
> > >   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > >   at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > >   at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> > >   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > >   at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > >   at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
> > >   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > >   at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > >   at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> > >   at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> > >   at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> > >   at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> > >   at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> > >   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> > >   at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> > >   at java.lang.Thread.run(Thread.java:745)
> > >
> > > ---
> > > Sebastian Kruse
> > > Doctoral student, Information Systems Group
> > > Hasso-Plattner-Institut an der Universität Potsdam
> > > Prof.-Dr.-Helmert-Str. 2-3, D-14482 Potsdam
> > > Tel +49 331 5509 240
> > > Amtsgericht Potsdam, HRB 12184
> > > Management: Prof. Dr. Christoph Meinel
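A side note on the EOFException in the first trace: StringValue.readString first reads a length and then that many bytes, so a corrupted length prefix makes the deserializer run past the end of the available data. The sketch below shows the same failure mode with plain java.io (not Flink's serializers): one flipped byte in a length prefix turns into an EOFException on the reader side.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Illustration only: a length-prefixed record whose length prefix is
// corrupted in transit fails with an EOFException when decoded.
class CorruptedLengthDemo {

    // Encode a string as a 4-byte length followed by its UTF-8 bytes.
    static byte[] encode(String s) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        byte[] payload = s.getBytes(StandardCharsets.UTF_8);
        out.writeInt(payload.length);
        out.write(payload);
        return bos.toByteArray();
    }

    // Decode; a corrupted (too large) length makes readFully hit the end
    // of the data and throw java.io.EOFException.
    static String decode(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        int len = in.readInt();
        byte[] payload = new byte[len];
        in.readFully(payload);
        return new String(payload, StandardCharsets.UTF_8);
    }
}
```

For example, encoding "hello" and then overwriting the low byte of the length prefix with 0x7F claims 127 payload bytes while only 5 follow, so decode fails with EOFException rather than returning garbage, which is consistent with corruption being detected only deep inside deserialization.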