@Stephan: My Flink cluster setup is 5 nodes, each running 1 TaskManager. Slots
per TaskManager: 2-4 (I tried varying this to see if it has any impact).
Network buffers: 5k-20k (I tried different values for this as well).
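
For reference, the relevant keys in my flink-conf.yaml look roughly like this
(one of the combinations I tried; the exact values varied per run):

taskmanager.numberOfTaskSlots: 4
taskmanager.network.numberOfBuffers: 20000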

@Ufuk: Thank you for creating a branch with the checksum calculation. I will
use this build to test the jobs and post what I learn here.

Best,
Tarandeep

On Wed, Oct 5, 2016 at 4:53 AM, Ufuk Celebi <u...@apache.org> wrote:

> @Tarandeep and Flavio: +1 to Stephan's question.
>
> Furthermore, I've created a branch which adds a simple CRC32 checksum
> calculation over the network buffer content here:
> https://github.com/uce/flink/tree/checksum
>
> It would be great if you could run your job with a build from this
> branch. It is based on the current 1.1 release branch. If you need
> help building or running from this branch, feel free to ping me.
>
> git clone https://github.com/uce/flink.git flink-uce
> cd flink-uce
> git checkout -b checksum origin/checksum
> mvn clean install -DskipTests
>
> The built binary distribution can be found in
> flink-dist/target/flink-1.1-SNAPSHOT-bin/flink-1.1-SNAPSHOT. Just copy
> your config files to the conf dir there and start Flink as usual from
> that directory.
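>
> Roughly like this (the paths here are just an example, adjust them to
> your setup):
>
> cd flink-dist/target/flink-1.1-SNAPSHOT-bin/flink-1.1-SNAPSHOT
> cp /path/to/your/current/flink/conf/* conf/
> bin/start-cluster.sh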
>
> If the checksums don't match, the job should fail with an exception. If
> that happens, the problems are likely caused by data corruption in the
> network layer. If not, it's more likely that something is off with the
> Kryo serializers.
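>
> Conceptually the check is just this (sketched with java.util.zip.CRC32;
> this is not the exact code in the branch, only an illustration of what
> gets verified):
>
> // Compute a CRC32 checksum over the serialized bytes of a network buffer.
> java.util.zip.CRC32 crc = new java.util.zip.CRC32();
> crc.update(buffer, offset, length); // buffer is a byte[], plus offset/length
> long checksum = crc.getValue();
> // The sender ships this value along with the buffer; the receiver
> // recomputes it over the received bytes and fails if the values differ.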
>
> (You can also follow this guide, depending on your Hadoop requirements:
> https://ci.apache.org/projects/flink/flink-docs-master/setup/building.html#hadoop-versions)
>
>
> On Tue, Oct 4, 2016 at 7:10 PM, Stephan Ewen <se...@apache.org> wrote:
> > It would be great to know whether this only occurs in setups where Netty is
> > involved (more than one TaskManager, and at least one shuffle/rebalance),
> > or also in single-TaskManager setups (which have local channels only).
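> >
> > One way to get the latter for a quick test is a local environment, which
> > runs everything in one JVM with local channels only, e.g. (the parallelism
> > here is just an example):
> >
> > // Same program, but executed in a single local (mini) cluster, no Netty transfer.
> > ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(4);
> > // ... build the same ETL job on env and call env.execute() as usual.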
> >
> > Stephan
> >
> > On Tue, Oct 4, 2016 at 11:49 AM, Till Rohrmann <trohrm...@apache.org> wrote:
> >>
> >> Hi Tarandeep,
> >>
> >> It would be great if you could compile a small example data set with which
> >> you're able to reproduce your problem. We could then try to debug it. It
> >> would also be interesting to know whether the patch for Flavio's bug solves
> >> your problem or not.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Mon, Oct 3, 2016 at 10:26 PM, Flavio Pompermaier <pomperma...@okkam.it>
> >> wrote:
> >>>
> >>> I think you're running into the same exception I face sometimes. I've
> >>> opened a JIRA for it [1]. Could you please try to apply that patch and
> >>> see if things get better?
> >>>
> >>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/FLINK-4719
> >>>
> >>> Best,
> >>> Flavio
> >>>
> >>>
> >>> On 3 Oct 2016 22:09, "Tarandeep Singh" <tarand...@gmail.com> wrote:
> >>>>
> >>>> Now, when I ran it again (with fewer task slots per machine), I got a
> >>>> different error:
> >>>>
> >>>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
> >>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
> >>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
> >>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:315)
> >>>>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:60)
> >>>>     at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
> >>>>     ....
> >>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>>>     at java.lang.reflect.Method.invoke(Method.java:498)
> >>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> >>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> >>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> >>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> >>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> >>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> >>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> >>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> >>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:714)
> >>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
> >>>>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:660)
> >>>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> >>>>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> >>>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
> >>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
> >>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> >>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> >>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >>>> Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: javaec40-d994-yteBuffer
> >>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
> >>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
> >>>>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> >>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> >>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:228)
> >>>>     at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.deserialize(PojoSerializer.java:431)
> >>>>     at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55)
> >>>>     at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:124)
> >>>>     at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:65)
> >>>>     at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:34)
> >>>>     at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
> >>>>     at org.apache.flink.runtime.operators.FlatMapDriver.run(FlatMapDriver.java:101)
> >>>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:480)
> >>>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:345)
> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >>>>     at java.lang.Thread.run(Thread.java:745)
> >>>> Caused by: java.lang.ClassNotFoundException: javaec40-d994-yteBuffer
> >>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> >>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> >>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> >>>>     at java.lang.Class.forName0(Native Method)
> >>>>     at java.lang.Class.forName(Class.java:348)
> >>>>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:136)
> >>>>     ... 15 more
> >>>>
> >>>>
> >>>> -Tarandeep
> >>>>
> >>>> On Mon, Oct 3, 2016 at 12:49 PM, Tarandeep Singh <tarand...@gmail.com>
> >>>> wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> I am using Flink 1.0.0 and have been running ETL (batch) jobs on it for
> >>>>> quite some time (a few months) without any problems. Starting this
> >>>>> morning, I have been getting errors like these:
> >>>>>
> >>>>> "Received an event in channel 3 while still having data from a
> record.
> >>>>> This indicates broken serialization logic. If you are using custom
> >>>>> serialization code (Writable or Value types), check their
> serialization
> >>>>> routines. In the case of Kryo, check the respective Kryo serializer."
> >>>>>
> >>>>> My datasets are in Avro format. The only thing that changed today is
> >>>>> that I moved to a smaller cluster. When I first ran the ETL jobs there,
> >>>>> they failed with this error:
> >>>>>
> >>>>> "Insufficient number of network buffers: required 20, but only 10
> >>>>> available. The total number of network buffers is currently set to
> 20000.
> >>>>> You can increase this number by setting the configuration key
> >>>>> 'taskmanager.network.numberOfBuffers'"
> >>>>>
> >>>>> I increased the number of network buffers to 30k and also decreased the
> >>>>> number of slots per machine, as I noticed the load per machine was too
> >>>>> high. After that, when I restarted the jobs, I got the above error.
> >>>>>
> >>>>> Can someone please help me debug it?
> >>>>>
> >>>>> Thank you,
> >>>>> Tarandeep
> >>>>
> >>>>
> >>
> >
>
