Great to hear. On Wed, Feb 11, 2015 at 12:29 PM, Chesnay Schepler <c.schep...@web.de> wrote:
> Works now :) Thank you for your help. > > > On 11.02.2015 11:39, Till Rohrmann wrote: > >> I found the error. Due to some refactoring, a wrong message was sent to >> the >> JobManager in the JobManagerInfoServlet.java. I pushed a fix. Could you >> try >> it out again? >> >> On Wed, Feb 11, 2015 at 11:34 AM, Till Rohrmann <till.rohrm...@gmail.com> >> wrote: >> >> Could you check the rebasing because it seems as if the web server is now >>> sending RequestArchivedJobs messages to the JobManager which should not >>> happen. These messages should go directly to the MemoryArchivist. The >>> corresponding file is JobManagerInfoServlet.java, I think. >>> >>> On Wed, Feb 11, 2015 at 11:11 AM, Chesnay Schepler < >>> chesnay.schep...@fu-berlin.de> wrote: >>> >>> I just tried Till's fix, rebased to the latest master and got a whole >>>> lot >>>> of these exceptions right away: >>>> >>>> java.lang.Exception: The slot in which the task was scheduled has been >>>> killed (probably loss of TaskManager). >>>> at org.apache.flink.runtime.instance.SimpleSlot.cancel( >>>> SimpleSlot.java:98) >>>> at org.apache.flink.runtime.jobmanager.scheduler. >>>> SlotSharingGroupAssignment.releaseSimpleSlot( >>>> SlotSharingGroupAssignment. >>>> java:320) >>>> at org.apache.flink.runtime.jobmanager.scheduler. >>>> SlotSharingGroupAssignment.releaseSharedSlot( >>>> SlotSharingGroupAssignment. >>>> java:304) >>>> at org.apache.flink.runtime.instance.SharedSlot. >>>> releaseSlot(SharedSlot.java:106) >>>> at org.apache.flink.runtime.instance.Instance.markDead( >>>> Instance.java:148) >>>> at org.apache.flink.runtime.instance.InstanceManager. >>>> shutdown(InstanceManager.java:111) >>>> at org.apache.flink.runtime.jobmanager.JobManager. >>>> postStop(JobManager.scala:132) >>>> at org.apache.flink.runtime.jobmanager.JobManager$$ >>>> anonfun$main$1$$anon$1.org$apache$flink$runtime$ >>>> jobmanager$WithWebServer$$super$postStop(JobManager.scala:559) >>>> at org.apache.flink.runtime.jobmanager.WithWebServer$ >>>> class.postStop(WithWebServer.scala:38) >>>> at org.apache.flink.runtime.jobmanager.JobManager$$ >>>> anonfun$main$1$$anon$1.postStop(JobManager.scala:559) >>>> at akka.actor.Actor$class.preRestart(Actor.scala:533) >>>> at org.apache.flink.runtime.jobmanager.JobManager. >>>> preRestart(JobManager.scala:80) >>>> at akka.actor.Actor$class.aroundPreRestart(Actor.scala:480) >>>> at org.apache.flink.runtime.jobmanager.JobManager. >>>> aroundPreRestart(JobManager.scala:80) >>>> at akka.actor.dungeon.FaultHandling$class. >>>> faultRecreate(FaultHandling.scala:67) >>>> at akka.actor.ActorCell.faultRecreate(ActorCell.scala:369) >>>> at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:459) >>>> at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) >>>> at akka.dispatch.Mailbox.processAllSystemMessages( >>>> Mailbox.scala:279) >>>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:231) >>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec( >>>> ForkJoinTask.java:260) >>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue. >>>> runTask(ForkJoinPool.java:1339) >>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker( >>>> ForkJoinPool.java:1979) >>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run( >>>> ForkJoinWorkerThread.java:107) >>>> >>>> the following is an exempt from the jobmanager log: >>>> >>>> 10:47:13,567 ERROR akka.actor.OneForOneStrategy >>>> - Received unknown message RequestArchivedJobs >>>> java.lang.RuntimeException: Received unknown message RequestArchivedJobs >>>> at org.apache.flink.runtime.jobmanager.JobManager. >>>> unhandled(JobManager.scala:510) >>>> at akka.actor.Actor$$anonfun$aroundReceive$1.apply(Actor. >>>> scala:465) >>>> at akka.actor.Actor$$anonfun$aroundReceive$1.apply(Actor. >>>> scala:465) >>>> at scala.PartialFunction$class.applyOrElse(PartialFunction. >>>> scala:118) >>>> at org.apache.flink.runtime.ActorLogMessages$$anon$1. >>>> applyOrElse(ActorLogMessages.scala:30) >>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:465) >>>> at org.apache.flink.runtime.jobmanager.JobManager. >>>> aroundReceive(JobManager.scala:80) >>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) >>>> at akka.dispatch.Mailbox.run(Mailbox.scala:221) >>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:231) >>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec( >>>> ForkJoinTask.java:260) >>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue. >>>> runTask(ForkJoinPool.java:1339) >>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker( >>>> ForkJoinPool.java:1979) >>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run( >>>> ForkJoinWorkerThread.java:107) >>>> 10:47:13,569 INFO org.apache.flink.runtime.jobmanager.JobManager$$ >>>> anonfun$main$1$$anon$1 >>>> - Stopping webserver. >>>> 10:47:13,620 WARN org.eclipse.jetty.util.log >>>> - /jobsInfo >>>> org.eclipse.jetty.io.RuntimeIOException: org.eclipse.jetty.io. >>>> EofException >>>> at org.eclipse.jetty.io.UncheckedPrintWriter.setError( >>>> UncheckedPrintWriter.java:107) >>>> at org.eclipse.jetty.io.UncheckedPrintWriter.write( >>>> UncheckedPrintWriter.java:280) >>>> at org.eclipse.jetty.io.UncheckedPrintWriter.write( >>>> UncheckedPrintWriter.java:295) >>>> at org.eclipse.jetty.io.UncheckedPrintWriter.print( >>>> UncheckedPrintWriter.java:460) >>>> at org.apache.flink.runtime.jobmanager.web. >>>> JobManagerInfoServlet.doGet(JobManagerInfoServlet.java:158) >>>> at javax.servlet.http.HttpServlet.service(HttpServlet.java:734) >>>> at javax.servlet.http.HttpServlet.service(HttpServlet.java:847) >>>> at org.eclipse.jetty.servlet.ServletHolder.handle( >>>> ServletHolder.java:532) >>>> at org.eclipse.jetty.servlet.ServletHandler.doHandle( >>>> ServletHandler.java:453) >>>> at org.eclipse.jetty.server.session.SessionHandler. >>>> doHandle(SessionHandler.java:227) >>>> at org.eclipse.jetty.server.handler.ContextHandler. >>>> doHandle(ContextHandler.java:965) >>>> at org.eclipse.jetty.servlet.ServletHandler.doScope( >>>> ServletHandler.java:388) >>>> at org.eclipse.jetty.server.session.SessionHandler. >>>> doScope(SessionHandler.java:187) >>>> at org.eclipse.jetty.server.handler.ContextHandler. >>>> doScope(ContextHandler.java:901) >>>> at org.eclipse.jetty.server.handler.ScopedHandler.handle( >>>> ScopedHandler.java:117) >>>> at org.eclipse.jetty.server.handler.HandlerList.handle( >>>> HandlerList.java:47) >>>> at org.eclipse.jetty.server.handler.HandlerWrapper.handle( >>>> HandlerWrapper.java:113) >>>> at org.eclipse.jetty.server.Server.handle(Server.java:352) >>>> at org.eclipse.jetty.server.HttpConnection.handleRequest( >>>> HttpConnection.java:596) >>>> at org.eclipse.jetty.server.HttpConnection$RequestHandler. >>>> headerComplete(HttpConnection.java:1048) >>>> at org.eclipse.jetty.http.HttpParser.parseNext( >>>> HttpParser.java:549) >>>> at org.eclipse.jetty.http.HttpParser.parseAvailable( >>>> HttpParser.java:211) >>>> at org.eclipse.jetty.server.HttpConnection.handle( >>>> HttpConnection.java:425) >>>> at org.eclipse.jetty.io.nio.SelectChannelEndPoint.run( >>>> SelectChannelEndPoint.java:489) >>>> at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run( >>>> QueuedThreadPool.java:436) >>>> at java.lang.Thread.run(Thread.java:745) >>>> Caused by: org.eclipse.jetty.io.EofException >>>> at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:142) >>>> at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:86) >>>> at java.io.ByteArrayOutputStream.writeTo(ByteArrayOutputStream. >>>> java:167) >>>> at org.eclipse.jetty.server.HttpWriter.write(HttpWriter.java:258) >>>> at org.eclipse.jetty.server.HttpWriter.write(HttpWriter.java:107) >>>> at org.eclipse.jetty.io.UncheckedPrintWriter.write( >>>> UncheckedPrintWriter.java:271) >>>> ... 24 more >>>> 10:47:13,623 INFO org.apache.flink.runtime.jobmanager.JobManager$$ >>>> anonfun$main$1$$anon$1 >>>> - Stopped webserver. >>>> 10:47:13,624 INFO org.apache.flink.runtime.jobmanager.JobManager$$ >>>> anonfun$main$1$$anon$1 >>>> - Stopping job manager akka://flink/user/jobmanager. >>>> 10:47:13,624 INFO org.apache.flink.runtime.jobmanager.JobManager$$ >>>> anonfun$main$1$$anon$1 >>>> - Starting job manager at akka://flink/user/jobmanager. >>>> 10:47:13,625 INFO org.apache.flink.runtime.blob.BlobServer >>>> - Started BLOB server on port 34038 >>>> 10:47:13,625 INFO org.apache.flink.runtime.blob.BlobServer >>>> - Created BLOB server storage directory >>>> /tmp/blobStore-88f5ebb0-15e2-47a6-ad56-fb2970d83ee2 >>>> 10:47:13,626 INFO org.apache.flink.runtime.jobmanager.web.WebInfoServer >>>> - Setting up web info server, using web-root directoryjar:file: >>>> ... >>>> 10:47:13,626 INFO org.apache.flink.runtime.jobmanager.JobManager$$ >>>> anonfun$main$1$$anon$1 >>>> - Started job manager. Waiting for incoming messages. >>>> 10:47:13,626 INFO org.apache.flink.runtime.jobmanager.web.WebInfoServer >>>> - Web info server will display information about flink >>>> job-manager >>>> on ... >>>> 10:47:13,627 INFO org.apache.flink.runtime.jobmanager.web.WebInfoServer >>>> - Starting web info server for JobManager on port ... >>>> 10:47:13,627 INFO org.eclipse.jetty.util.log >>>> - jetty-0.9-SNAPSHOT >>>> 10:47:13,738 INFO org.eclipse.jetty.util.log >>>> - Started SelectChannelConnector@0.0.0.0:8082 >>>> 10:47:14,032 ERROR org.apache.flink.runtime.jobmanager.JobManager$$ >>>> anonfun$main$1$$anon$1 >>>> - Cannot find execution graph for job ID 1e0c0e5a1a7a741a1182791ea597ba >>>> 7e. >>>> 10:47:14,068 ERROR org.apache.flink.runtime.jobmanager.JobManager$$ >>>> anonfun$main$1$$anon$1 >>>> - Cannot find execution graph for job ID 1e0c0e5a1a7a741a1182791ea597ba >>>> 7e. >>>> 10:47:14,069 ERROR org.apache.flink.runtime.jobmanager.JobManager$$ >>>> anonfun$main$1$$anon$1 >>>> - Cannot find execution graph for job ID 1e0c0e5a1a7a741a1182791ea597ba >>>> 7e. >>>> 10:47:14,107 ERROR org.apache.flink.runtime.jobmanager.JobManager$$ >>>> anonfun$main$1$$anon$1 >>>> - Cannot find execution graph for job ID 1e0c0e5a1a7a741a1182791ea597ba >>>> 7e. >>>> >>>> >>>> >>>> On 05.02.2015 11:09, Till Rohrmann wrote: >>>> >>>> I checked and indeed the scheduleOrUpdateConsumers method can throw an >>>>> IllegalStateException without properly handling such an exception on >>>>> the >>>>> JobManager level. >>>>> >>>>> It is a design decision of Scala not to complain about unhandled >>>>> exceptions >>>>> which are otherwise properly annotated in Java code. We should >>>>> definitely >>>>> pay attention in Scala to properly handle thrown exceptions of Java >>>>> code. >>>>> >>>>> On Thu, Feb 5, 2015 at 10:42 AM, Stephan Ewen <se...@apache.org> >>>>> wrote: >>>>> >>>>> I suspect that this is one of the cases where an exception in an >>>>> actor >>>>> >>>>>> causes the actor to die (here the job manager) >>>>>> >>>>>> On Thu, Feb 5, 2015 at 10:40 AM, Till Rohrmann <trohrm...@apache.org> >>>>>> wrote: >>>>>> >>>>>> It looks to me that the TaskManager does not receive a >>>>>> >>>>>>> ConsumerNotificationResult after having send the >>>>>>> >>>>>>> ScheduleOrUpdateConsumers >>>>>> >>>>>> message. This can either mean that something went wrong in >>>>>>> ExecutionGraph.scheduleOrUpdateConsumers method or the connection >>>>>>> was >>>>>>> disassociated for some reasons. The logs would indeed be very helpful >>>>>>> to >>>>>>> understand what happened. >>>>>>> >>>>>>> On Thu, Feb 5, 2015 at 10:36 AM, Ufuk Celebi <u...@apache.org> wrote: >>>>>>> >>>>>>> Hey Chesnay, >>>>>>> >>>>>>>> I will look into it. Can you share the complete LOGs? >>>>>>>> >>>>>>>> – Ufuk >>>>>>>> >>>>>>>> On 04 Feb 2015, at 14:49, Chesnay Schepler < >>>>>>>> >>>>>>>> chesnay.schep...@fu-berlin.de> >>>>>>> >>>>>>> wrote: >>>>>>>> >>>>>>>> Hello, >>>>>>>> >>>>>>>>> I'm trying to run python jobs with the latest master on a cluster >>>>>>>>> and >>>>>>>>> >>>>>>>>> get the following exception: >>>>>>>> >>>>>>>> Error: The program execution failed: JobManager not reachable >>>>>>>>> >>>>>>>>> anymore. >>>>>>>> >>>>>>> Terminate waiting for job answer. >>>>>>> >>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The >>>>>>>>> >>>>>>>>> program >>>>>>>> >>>>>>> execution failed: JobManager not reachable anymore. Terminate waiting >>>>>>> for >>>>>>> job answer. >>>>>>> >>>>>>>> at org.apache.flink.client.program.Client.run(Client.java:345) >>>>>>>>> at org.apache.flink.client.program.Client.run(Client. >>>>>>>>> java:304) >>>>>>>>> at org.apache.flink.client.program.Client.run(Client. >>>>>>>>> java:298) >>>>>>>>> at >>>>>>>>> >>>>>>>>> org.apache.flink.client.program.ContextEnvironment. >>>>>>>> >>>>>>> execute(ContextEnvironment.java:55) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.api.java.ExecutionEnvironment.execute( >>>>>>>> >>>>>>> ExecutionEnvironment.java:677) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.languagebinding.api.java.python.PythonPlanBinder. >>>>>>>> >>>>>>> runPlan(PythonPlanBinder.java:106) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.languagebinding.api.java. >>>>>>>> >>>>>>> python.PythonPlanBinder.main(PythonPlanBinder.java:79) >>>>>> >>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>>>> >>>>>>>> at >>>>>>>>> >>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke( >>>>>>>> >>>>>>> NativeMethodAccessorImpl.java:57) >>>>>> >>>>>> at >>>>>>> >>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke( >>>>>>>> >>>>>>> DelegatingMethodAccessorImpl.java:43) >>>>>> >>>>>> at java.lang.reflect.Method.invoke(Method.java:606) >>>>>>> >>>>>>>> at >>>>>>>>> >>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod( >>>>>>>> >>>>>>> PackagedProgram.java:437) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.client.program.PackagedProgram. >>>>>>>> >>>>>>> invokeInteractiveModeForExecution(PackagedProgram.java:353) >>>>>> >>>>>> at org.apache.flink.client.program.Client.run(Client.java:250) >>>>>>> >>>>>>>> at >>>>>>>>> >>>>>>>>> org.apache.flink.client.CliFrontend.executeProgram( >>>>>>>> >>>>>>> CliFrontend.java:387) >>>>>> >>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend. >>>>>>> java:356) >>>>>>> >>>>>>>> at >>>>>>>>> >>>>>>>>> org.apache.flink.client.CliFrontend.parseParameters( >>>>>>>> >>>>>>> CliFrontend.java:1066) >>>>>> >>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend. >>>>>>> java:1090) >>>>>>> >>>>>>>> In the jobmanager log file i find this exception: >>>>>>>>> >>>>>>>>> java.lang.IllegalStateException: Buffer has already been recycled. >>>>>>>>> at >>>>>>>>> >>>>>>>>> org.apache.flink.shaded.com.google.common.base. >>>>>>>> >>>>>>> Preconditions.checkState(Preconditions.java:176) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.runtime.io.network.buffer.Buffer. >>>>>>>> >>>>>>> ensureNotRecycled(Buffer.java:131) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.runtime.io.network.buffer.Buffer.setSize( >>>>>>>> >>>>>>> Buffer.java:95) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.runtime.io.network.api.serialization. >>>>>>>> >>>>>>> SpanningRecordSerializer.getCurrentBuffer( >>>>>> SpanningRecordSerializer.java:151) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.runtime.io.network.api.writer. >>>>>>>> >>>>>>> RecordWriter.clearBuffers(RecordWriter.java:158) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.runtime.operators.RegularPactTask. >>>>>>>> >>>>>>> clearWriters(RegularPactTask.java:1533) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.runtime.operators.RegularPactTask. >>>>>>>> >>>>>>> invoke(RegularPactTask.java:367) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.runtime.execution.RuntimeEnvironment. >>>>>>>> >>>>>>> run(RuntimeEnvironment.java:204) >>>>>> >>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>> >>>>>>>> the same exception is in the task manager logs, along with the >>>>>>>>> >>>>>>>>> following >>>>>>>> one: >>>>>>>> >>>>>>>> java.util.concurrent.TimeoutException: Futures timed out after >>>>>>>>> [100 >>>>>>>>> >>>>>>>>> seconds] >>>>>>>> >>>>>>>> at >>>>>>>>> >>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise. >>>>>>>> scala:219) >>>>>>>> >>>>>>>> at >>>>>>>>> >>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise. >>>>>>>> scala:223) >>>>>>>> >>>>>>>> at >>>>>>>>> >>>>>>>>> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) >>>>>>>> >>>>>>>> at >>>>>>>>> >>>>>>>>> scala.concurrent.BlockContext$DefaultBlockContext$.blockOn( >>>>>>>> >>>>>>> BlockContext.scala:53) >>>>>> >>>>>> at scala.concurrent.Await$.result(package.scala:107) >>>>>>> >>>>>>>> at >>>>>>>>> >>>>>>>>> org.apache.flink.runtime.akka.AkkaUtils$.ask(AkkaUtils.scala:265) >>>>>>>> >>>>>>>> at >>>>>>>>> >>>>>>>>> org.apache.flink.runtime.akka.AkkaUtils.ask(AkkaUtils.scala) >>>>>>>> >>>>>>> at >>>>>>> >>>>>>>> org.apache.flink.runtime.io.network.partition. >>>>>>>> >>>>>>> IntermediateResultPartition.scheduleOrUpdateConsumers( >>>>>> IntermediateResultPartition.java:247) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.runtime.io.network.partition. >>>>>>>> >>>>>>> IntermediateResultPartition.maybeNotifyConsumers( >>>>>> IntermediateResultPartition.java:240) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.runtime.io.network.partition. >>>>>>>> >>>>>>> IntermediateResultPartition.add(IntermediateResultPartition. >>>>>> java:144) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.runtime.io.network.api.writer. >>>>>>>> >>>>>>> BufferWriter.writeBuffer(BufferWriter.java:74) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit( >>>>>>>> >>>>>>> RecordWriter.java:91) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.runtime.operators.shipping. >>>>>>>> OutputCollector.collect( >>>>>>>> >>>>>>> OutputCollector.java:88) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.languagebinding.api.java. >>>>>>>> common.streaming.Receiver. >>>>>>>> >>>>>>> collectBuffer(Receiver.java:253) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.languagebinding.api.java. >>>>>>>> common.streaming.Streamer. >>>>>>>> >>>>>>> streamBufferWithoutGroups(Streamer.java:193) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.languagebinding.api.java.python.functions. >>>>>>>> >>>>>>> PythonMapPartition.mapPartition(PythonMapPartition.java:54) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.runtime.operators.MapPartitionDriver. >>>>>>>> >>>>>>> run(MapPartitionDriver.java:98) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.runtime.operators.RegularPactTask.run( >>>>>>>> >>>>>>> RegularPactTask.java:496) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.runtime.operators.RegularPactTask. >>>>>>>> >>>>>>> invoke(RegularPactTask.java:360) >>>>>> >>>>>> at >>>>>>> >>>>>>>> org.apache.flink.runtime.execution.RuntimeEnvironment. >>>>>>>> >>>>>>> run(RuntimeEnvironment.java:204) >>>>>> >>>>>> at java.lang.Thread.run(Thread.java:745) >>>>>>> >>>>>>>> >>>>>>>>> >