Hey Sebastian, would you mind sharing the code and dataset with me (privately would work fine)? I want to try to reproduce this as well.
– Ufuk

On Wed, Jun 3, 2015 at 11:07 AM, Kruse, Sebastian <sebastian.kr...@hpi.de> wrote:
> I am currently using 0.9-SNAPSHOT. All the non-jar files are from an older
> build, but I recently manually updated the flink-dist.jar with commit
> d163a817fa2e330e86384d0bbcd104f051a6fb48.
>
> Our setup consists of 10 workers and a master, all interconnected via a
> switch (100 Mbit, I think). The data set is an NTriple file of about 8 GB;
> intermediate datasets, however, might be much larger. For smaller datasets,
> I have not been able to observe this problem yet.
>
> Also, during the failure there are a lot of concurrent shuffles ongoing
> [1,2]. Additionally, it might be interesting that in the affected jobs
> either this exception occurs or another one that looks like a network
> disruption [3]. So it might well be that our setup suffers from
> occasional network errors, especially during high network load, but that's
> just a plain guess.
>
> Regarding reproducibility, I can only say right now that this error
> occurred twice. I will re-run the jobs, see if the errors can be observed
> again, and let you know.
>
> Cheers,
> Sebastian
>
> [1] https://owncloud.hpi.de/public.php?service=files&t=89dcba7ce63ed053331f5099cad64704
> [2] https://owncloud.hpi.de/public.php?service=files&t=1b16d66d6b9e41c68bf598309d57b7b1
> [3] Stack trace:
>
> tenem18.hpi.uni-potsdam.de
> Error: java.lang.Exception: The data preparation for task 'CHAIN
> GroupReduce (GroupReduce at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56))
> -> Filter (Filter at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68))
> -> FlatMap (FlatMap at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46))
> -> Map (Map at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))',
> caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
>     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
>     at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
>     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
>     ... 3 more
> Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> Caused by: java.io.EOFException
>     at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:290)
>     at org.apache.flink.types.StringValue.readString(StringValue.java:741)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:68)
>     at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
>     at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:123)
>     at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:33)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:95)
>     at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
>     at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:133)
>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:64)
>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1020)
>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
>
> -----Original Message-----
> From: Ufuk Celebi [mailto:u...@apache.org]
> Sent: Wednesday, June 3, 2015 10:33
> To: dev@flink.apache.org
> Subject: Re: Buffer re-ordering problem
>
> This is a critical bug.
>
> - Which version are you using? If a snapshot, which commit?
> - What is your setup? Number of machines, dataset, etc.?
> - Is it reproducible?
>
> On Wednesday, June 3, 2015, Kruse, Sebastian <sebastian.kr...@hpi.de> wrote:
> >
> > Hi everyone,
> >
> > I had some jobs running over the night, and in two of them the following
> > exception occurred after about half an hour. Do you know why this happens?
> >
> > Thanks,
> > Sebastian
> >
> > tenem16.hpi.uni-potsdam.de
> > Error: java.lang.Exception: The data preparation for task 'CHAIN
> > GroupReduce (GroupReduce at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56))
> > -> Filter (Filter at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68))
> > -> FlatMap (FlatMap at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46))
> > -> Map (Map at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))',
> > caused an error: Error obtaining the sorted input: Thread 'SortMerger
> > Reading Thread' terminated due to an exception: Buffer re-ordering:
> > expected buffer with sequence number 17841, but received 17842.
> >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> >     at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> >     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >     at java.lang.Thread.run(Thread.java:745)
> > Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
> > Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer
> > re-ordering: expected buffer with sequence number 17841, but received 17842.
> >     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> >     at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> >     at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> >     at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> >     ... 3 more
> > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread'
> > terminated due to an exception: Buffer re-ordering: expected buffer
> > with sequence number 17841, but received 17842.
> >     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> > Caused by: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel$BufferReorderingException:
> > Buffer re-ordering: expected buffer with sequence number 17841, but
> > received 17842.
> >     at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:253)
> >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeBufferOrEvent(PartitionRequestClientHandler.java:279)
> >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:214)
> >     at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:158)
> >     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> >     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> >     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> >     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> >     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> >     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
> >     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> >     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> >     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> >     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> >     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> >     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> >     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> >     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> >     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> >     at java.lang.Thread.run(Thread.java:745)
> >
> > ---
> > Sebastian Kruse
> > Doctoral student, Information Systems Group
> > Hasso-Plattner-Institut an der Universität Potsdam
> > Prof.-Dr.-Helmert-Str. 2-3, D-14482 Potsdam
> > Tel +49 331 5509 240
> > Amtsgericht Potsdam, HRB 12184
> > Management: Prof. Dr. Christoph Meinel
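[Editor's note on the failing check: the BufferReorderingException thrown in RemoteInputChannel.onBuffer boils down to a per-channel running sequence counter, where each received buffer must carry exactly the next expected sequence number. The following is a minimal sketch of that kind of check, not Flink's actual implementation; the class name and structure are invented for illustration.]

```java
// Sketch of a per-channel buffer sequence check, modeled on the behavior
// visible in the stack trace above. Illustrative only; NOT Flink's actual
// RemoteInputChannel code.
class SequenceCheckingChannel {

    // Mirrors the kind of exception reported in this thread.
    static class BufferReorderingException extends RuntimeException {
        BufferReorderingException(String message) {
            super(message);
        }
    }

    private int expectedSequenceNumber = 0;

    // Each buffer must arrive with exactly the next sequence number.
    // Anything else means the transport delivered (or dropped) a buffer
    // out of order, and the channel fails fast.
    void onBuffer(int sequenceNumber) {
        if (sequenceNumber != expectedSequenceNumber) {
            throw new BufferReorderingException(String.format(
                    "Buffer re-ordering: expected buffer with sequence number %d, but received %d.",
                    expectedSequenceNumber, sequenceNumber));
        }
        expectedSequenceNumber++;
    }
}
```

Under this model, a single lost or reordered buffer on the TCP channel (or a bug in the Netty handler feeding the channel) is enough to trigger exactly the error message quoted above.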