Chesnay Schepler created FLINK-22211:
----------------------------------------

             Summary: DataStream.collect() logs warnings if job is not 
initialized yet
                 Key: FLINK-22211
                 URL: https://issues.apache.org/jira/browse/FLINK-22211
             Project: Flink
          Issue Type: Sub-task
          Components: Client / Job Submission
            Reporter: Chesnay Schepler
            Assignee: Chesnay Schepler
             Fix For: 1.13.0


When using {{DataStream.collect()}} we always have an excpetion in the log for 
the first fetch attempt, before the JM is ready.
The loop retries and the program succeeds, but the exception in the log raises 
confusion about whether there is a swallowed but impactful error.

{code}
7199 [main] WARN  
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher [] - An 
exception occurs when fetching query results
java.util.concurrent.ExecutionException: 
org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException: 
Unable to get JobMasterGateway for initializing job. The requested operation is 
not available while the JobManager is initializing.
        at 
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
~[?:?]
        at 
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) ~[?:?]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.sendRequest(CollectResultFetcher.java:155)
 ~[classes/:?]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126)
 [classes/:?]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
 [classes/:?]
        at 
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
 [classes/:?]
        at 
org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1320)
 [classes/:?]
        at 
org.apache.flink.streaming.api.datastream.DataStream.executeAndCollect(DataStream.java:1303)
 [classes/:?]
        at 
org.apache.flink.runtime.operators.coordination.OperatorEventSendingCheckpointITCase.testLostOperatorEventsLeadsToRecovery(OperatorEventSendingCheckpointITCase.java:88)
 [test-classes/:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:?]
        at 
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 ~[?:?]
        at 
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
        at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
 [junit-4.12.jar:4.12]
        at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 [junit-4.12.jar:4.12]
        at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
 [junit-4.12.jar:4.12]
        at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 [junit-4.12.jar:4.12]
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) 
[junit-4.12.jar:4.12]
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
 [junit-4.12.jar:4.12]
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
 [junit-4.12.jar:4.12]
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) 
[junit-4.12.jar:4.12]
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) 
[junit-4.12.jar:4.12]
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) 
[junit-4.12.jar:4.12]
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) 
[junit-4.12.jar:4.12]
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) 
[junit-4.12.jar:4.12]
        at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) 
[junit-4.12.jar:4.12]
        at 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
[junit-4.12.jar:4.12]
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363) 
[junit-4.12.jar:4.12]
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137) 
[junit-4.12.jar:4.12]
        at 
com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
 [junit-rt.jar:?]
        at 
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
 [junit-rt.jar:?]
        at 
com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
 [junit-rt.jar:?]
        at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53) 
[junit-rt.jar:?]
Caused by: 
org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException: 
Unable to get JobMasterGateway for initializing job. The requested operation is 
not available while the JobManager is initializing.
        at 
org.apache.flink.runtime.dispatcher.Dispatcher.getJobMasterGateway(Dispatcher.java:892)
 ~[classes/:?]
        at 
org.apache.flink.runtime.dispatcher.Dispatcher.performOperationOnJobMasterGateway(Dispatcher.java:902)
 ~[classes/:?]
        at 
org.apache.flink.runtime.dispatcher.Dispatcher.deliverCoordinationRequestToCoordinator(Dispatcher.java:724)
 ~[classes/:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
~[?:?]
        at 
jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 ~[?:?]
        at 
jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:?]
        at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
 ~[classes/:?]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
 ~[classes/:?]
        at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 ~[classes/:?]
        at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[classes/:?]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
~[scala-library-2.11.12.jar:?]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
~[scala-library-2.11.12.jar:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[scala-library-2.11.12.jar:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[scala-library-2.11.12.jar:?]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at 
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
        at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
~[akka-actor_2.11-2.5.21.jar:2.5.21]
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to