Hi everyone,

I just wanted to let you know that, after quite a few more runs on different 
machines, the buffer reordering problem did not appear again. I don't know what 
caused it; maybe it really was due to the potentially invalid UTF-16 content 
within strings. Should the error ever happen again, I will let you know, but 
otherwise it might just have been some strange edge-case side effect. :)

Cheers,
Sebastian

-----Original Message-----
From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf Of Stephan 
Ewen
Sent: Thursday, June 4, 2015 13:13
To: dev@flink.apache.org
Subject: Re: Buffer re-ordering problem

Thanks for helping us debug this.

You can start many TaskManagers in one JVM by using the LocalMiniCluster.
Have a look at this (manually triggered) test, which runs 100 TaskManagers in 
one JVM:

https://github.com/apache/flink/blob/master/flink-tests/src/test/java/org/apache/flink/test/manual/NotSoMiniClusterIterations.java
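
For a quick local experiment, a rough sketch could look like the following. This 
is written from memory against the 0.9 line, so please double-check the config 
constants (in particular the one for the number of local TaskManagers) and the 
LocalFlinkMiniCluster constructor against the test linked above; the slot and 
memory values are just placeholder assumptions:

    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;

    public class ManyTaskManagersSketch {

      public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        // Ask the mini cluster for several TaskManagers instead of the default
        // single one (constant name from memory, may differ in your snapshot).
        config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 4);
        // One slot and a small managed-memory budget (in MB) per TaskManager.
        config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
        config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 64);

        // 'false' = do not share a single actor system; data exchanges between
        // tasks on different TaskManagers then go through the network stack,
        // which is exactly the code path we want to exercise.
        LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, false);
        try {
          // Depending on the exact snapshot you may need an explicit start() call
          // here; then submit your job against the mini cluster's JobManager
          // (see the linked test for how it wires up the environment).
        } finally {
          cluster.shutdown();
        }
      }
    }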

The data type serialization should have no impact on the network stream, as 
that is buffer-oriented and has no understanding or notion of the types inside. 
That transition comes later in the readers/writers.
Still, it is not impossible that there is a bug in there such that the 
serialization affects the buffer transport. Thanks for the pointer; we will 
definitely look into that.
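
As a small illustration of how a serialization bug can masquerade as stream 
corruption (plain JDK streams, nothing Flink-specific, just a sketch): if the 
(de-)serialization of one record consumes the wrong number of bytes, every later 
read in the same stream is misaligned, and the failure typically surfaces far 
away from the bug, e.g. as an EOFException.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    public class MisalignedStreamDemo {
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(42);       // "record" 1: 4 bytes
        out.writeUTF("hello");  // "record" 2
        out.writeUTF("world");  // "record" 3

        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray()));
        in.readShort();  // bug: consumes only 2 of the 4 bytes of "record" 1
        // The stream is now misaligned; this read fails (EOFException) or returns
        // garbage, far away from the line that actually contains the bug.
        System.out.println(in.readUTF());
      }
    }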

On Thu, Jun 4, 2015 at 1:07 PM, Kruse, Sebastian <sebastian.kr...@hpi.de>
wrote:

> Thanks for your feedback. I am running neither IPSec nor the 
> aesni-intel module.
>
> So far, I could not reproduce the reordering issue. I have also noticed 
> that my code might have created String objects with invalid 
> UTF-16 content in exactly those jobs that suffered from the reordering. 
> I wanted to use binary data as a key, so I put it into a char array 
> and wrapped it in a String, hoping that would do the trick. Maybe 
> this caused some error in the String (de-)serialization with 
> further consequences, e.g., the skipping of a buffer. Since I switched to 
> encoding the binary data in Base64, the reordering error has not popped up 
> again. I will do some more runs to verify this.
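>
> For reference, the workaround boils down to something like this minimal 
> sketch (just illustrative, assuming Java 8's java.util.Base64 is available; 
> the helper names are made up):
>
>     import java.util.Base64;
>
>     public class BinaryKeys {
>
>       // Base64 yields a String that is always valid UTF-16, unlike wrapping
>       // raw bytes in a char[] (which can produce unpaired surrogates and
>       // confuse the String (de-)serialization).
>       public static String toKey(byte[] binary) {
>         return Base64.getEncoder().encodeToString(binary);
>       }
>
>       public static byte[] fromKey(String key) {
>         return Base64.getDecoder().decode(key);
>       }
>     }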
>
> However, the other problem with the serialization exceptions should 
> not be affected by this. To see whether network stream corruption is an 
> issue, I am now also performing some runs on a larger single machine with a 
> single task manager, so that no network communication is involved. 
> Serialization should take place anyway, right? Is there a way to run 
> a second task manager on the same machine within the same OS?
>
> Cheers,
> Sebastian
>
> -----Original Message-----
> From: ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] On Behalf 
> Of Stephan Ewen
> Sent: Wednesday, June 3, 2015 20:14
> To: dev@flink.apache.org
> Subject: Re: Buffer re-ordering problem
>
> Hi Sebastian!
>
> The first error that you report could also be a lost buffer, 
> rather than a reordering.
> The second error that you posted seems to be a serialization issue.
> Both can be consequences of a network stream corruption.
>
> It would be good to figure out why the netty event loop does not 
> encode/decode this properly in your case.
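>
> For context, the check that trips here is conceptually just a per-channel 
> sequence counter on the receiving side. The sketch below only illustrates 
> that idea; it is not the actual RemoteInputChannel code, and all names in 
> it are invented:
>
>     // Illustrative sketch only; names and structure are invented.
>     class BufferSequenceChecker {
>       private int expectedSequenceNumber = 0;
>
>       void onBuffer(int receivedSequenceNumber) {
>         if (receivedSequenceNumber != expectedSequenceNumber) {
>           // A lost or reordered buffer on the TCP stream shows up here.
>           throw new IllegalStateException(
>               "Buffer re-ordering: expected buffer with sequence number "
>                   + expectedSequenceNumber + ", but received "
>                   + receivedSequenceNumber);
>         }
>         expectedSequenceNumber++;
>       }
>     }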
>
> BTW: I would assume we can virtually rule out TCP/kernel bugs as 
> sources for the corruption, unless you have the setup described here:
>
> http://arstechnica.com/information-technology/2015/05/the-discovery-of-apache-zookeepers-poison-packet/
>
> Stephan
>
>
> On Wed, Jun 3, 2015 at 7:51 PM, Ufuk Celebi <u...@apache.org> wrote:
>
> > Hey Sebastian,
> >
> > would you mind sharing the code and dataset with me (privately would 
> > work fine)? I want to try to reproduce this as well.
> >
> > – Ufuk
> >
> > On Wed, Jun 3, 2015 at 11:07 AM, Kruse, Sebastian 
> > <sebastian.kr...@hpi.de>
> > wrote:
> >
> > > I am currently using 0.9-SNAPSHOT. All the non-jar files are from an 
> > > older build, but I recently manually updated the flink-dist.jar to 
> > > commit d163a817fa2e330e86384d0bbcd104f051a6fb48.
> > >
> > > Our setup consists of 10 workers and a master, all interconnected 
> > > via a switch (100 Mbit, I think). The data set is an NTriple file 
> > > of about 8 GB; however, intermediate datasets might be much larger. 
> > > For smaller datasets, I could not observe this problem yet.
> > >
> > > Also, during the failure a lot of concurrent shuffles are 
> > > ongoing [1,2]. Additionally, it might be interesting that in the 
> > > affected jobs either this exception occurs or another one that 
> > > looks like a network disruption [3]. So it might well be that 
> > > our setup suffers from occasional network errors, especially 
> > > during high network load - but that's just a plain guess.
> > >
> > > Regarding reproducibility, I can only say right now that this 
> > > error occurred twice. I will re-run the jobs, see if the 
> > > errors can be observed again, and let you know.
> > >
> > > Cheers,
> > > Sebastian
> > >
> > > [1] https://owncloud.hpi.de/public.php?service=files&t=89dcba7ce63ed053331f5099cad64704
> > > [2] https://owncloud.hpi.de/public.php?service=files&t=1b16d66d6b9e41c68bf598309d57b7b1
> > > [3] Stack trace: tenem18.hpi.uni-potsdam.de
> > > Error: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56)) -> Filter (Filter at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68)) -> FlatMap (FlatMap at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46)) -> Map (Map at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
> > > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > > at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > at java.lang.Thread.run(Thread.java:745)
> > > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: null
> > > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> > > at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> > > at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> > > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> > > ... 3 more
> > > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: null
> > > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> > > Caused by: java.io.EOFException
> > > at org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:290)
> > > at org.apache.flink.types.StringValue.readString(StringValue.java:741)
> > > at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:68)
> > > at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:28)
> > > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
> > > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
> > > at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:123)
> > > at org.apache.flink.api.common.typeutils.base.GenericArraySerializer.deserialize(GenericArraySerializer.java:33)
> > > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:102)
> > > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:95)
> > > at org.apache.flink.api.scala.typeutils.CaseClassSerializer.deserialize(CaseClassSerializer.scala:29)
> > > at org.apache.flink.runtime.plugable.ReusingDeserializationDelegate.read(ReusingDeserializationDelegate.java:57)
> > > at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:133)
> > > at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:64)
> > > at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> > > at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:59)
> > > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ReadingThread.go(UnilateralSortMerger.java:1020)
> > > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:781)
> > >
> > >
> > > -----Original Message-----
> > > From: Ufuk Celebi [mailto:u...@apache.org]
> > > Sent: Wednesday, June 3, 2015 10:33
> > > To: dev@flink.apache.org
> > > Subject: Re: Buffer re-ordering problem
> > >
> > > This is a critical bug.
> > >
> > > - Which version are you using? If a snapshot, which commit?
> > > - What is your setup? Number of machines, dataset, etc.?
> > > - Is it reproducible?
> > >
> > > On Wednesday, June 3, 2015, Kruse, Sebastian 
> > > <sebastian.kr...@hpi.de>
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I had some jobs running overnight, and in two of them the following 
> > > > exception occurred after about half an hour. Do you know why this 
> > > > happens?
> > > >
> > > > Thanks,
> > > > Sebastian
> > > >
> > > > tenem16.hpi.uni-potsdam.de
> > > > Error: java.lang.Exception: The data preparation for task 'CHAIN GroupReduce (GroupReduce at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:56)) -> Filter (Filter at de.hpi.isg.sodap.rdfind.plan.AllAtOnceTraversalStrategy.enhanceFlinkPlan(AllAtOnceTraversalStrategy.scala:68)) -> FlatMap (FlatMap at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$$anonfun$1.apply(TraversalStrategy.scala:46)) -> Map (Map at de.hpi.isg.sodap.rdfind.plan.TraversalStrategy$class.splitAndCleanCindSets(TraversalStrategy.scala:58))', caused an error: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > > > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:471)
> > > > at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
> > > > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> > > > at java.lang.Thread.run(Thread.java:745)
> > > > Caused by: java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > > > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:607)
> > > > at org.apache.flink.runtime.operators.RegularPactTask.getInput(RegularPactTask.java:1145)
> > > > at org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
> > > > at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:466)
> > > > ... 3 more
> > > > Caused by: java.io.IOException: Thread 'SortMerger Reading Thread' terminated due to an exception: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > > > at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:784)
> > > > Caused by: org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel$BufferReorderingException: Buffer re-ordering: expected buffer with sequence number 17841, but received 17842.
> > > > at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.onBuffer(RemoteInputChannel.java:253)
> > > > at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeBufferOrEvent(PartitionRequestClientHandler.java:279)
> > > > at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.decodeMsg(PartitionRequestClientHandler.java:214)
> > > > at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.channelRead(PartitionRequestClientHandler.java:158)
> > > > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > > > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > > > at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> > > > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > > > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > > > at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
> > > > at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
> > > > at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
> > > > at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
> > > > at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
> > > > at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> > > > at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> > > > at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> > > > at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> > > > at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
> > > > at java.lang.Thread.run(Thread.java:745)
> > > >
> > > >
> > > > ---
> > > > Sebastian Kruse
> > > > Doctoral candidate, Information Systems Group 
> > > > Hasso-Plattner-Institut at the Universität Potsdam 
> > > > Prof.-Dr.-Helmert-Str. 2-3, D-14482 Potsdam, Tel +49 331 5509 240 
> > > > Amtsgericht Potsdam, HRB 12184
> > > > Managing Director: Prof. Dr. Christoph Meinel
> > > >
> > > >
> > > >
> > >
> >
>
