Hi tison - Please find my responses inline below, marked with >>.
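In case it is useful, this is the kind of minimal check I have in mind for seeing which concrete environment a job actually gets at runtime; the object name and the println are illustrative, and I go through the Java API directly only to read the class name:

  import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamEnv}

  object EnvCheck {
    def main(args: Array[String]): Unit = {
      // Which concrete class comes back (LocalStreamEnvironment, StreamContextEnvironment,
      // StreamPlanEnvironment, ...) depends on the context-environment factory that the
      // client installed before invoking main().
      val env = JStreamEnv.getExecutionEnvironment
      println(s"execution environment class: ${env.getClass.getName}")
    }
  }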
regards.

On Mon, Sep 23, 2019 at 6:20 PM Zili Chen <wander4...@gmail.com> wrote:

> Hi Debasish,
>
> The OptimizerPlanEnvironment.ProgramAbortException should be caught at OptimizerPlanEnvironment#getOptimizedPlan in its catch (Throwable t) branch.

>> True, but what I get is a StreamPlanEnvironment. From my code I am only doing val env = StreamExecutionEnvironment.getExecutionEnvironment.

> It should always throw a ProgramInvocationException instead of OptimizerPlanEnvironment.ProgramAbortException if any exception is thrown in the main method of your code.
>
> Another important problem is how the code is executed (setting the context environment should be another Flink-internal operation), but given that you submit the job via the Flink k8s operator, it might require time to take a look at the k8s operator implementation.

>> We submit the code through the Kubernetes Flink Operator, which uses the REST API to submit the job to the Job Manager.

> However, given that we catch Throwable at the place this exception is thrown, I highly suspect whether it is executed by an official Flink release.

>> It is an official Flink release, 1.9.0.

> A complete version of the code and the submission process would be helpful. Besides, what is the return type of buildExecutionGraph? I think it is not ExecutionGraph in Flink...

>> buildExecutionGraph is our function and it returns Unit. It's not ExecutionGraph. It builds the DataStreams by reading from Kafka and finally writes to Kafka.

> Best,
> tison.
>
> On Mon, Sep 23, 2019 at 8:21 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>
>> This is the complete stack trace which we get from execution on Kubernetes using the Flink Kubernetes operator. The boxed error comes from the fact that we complete a Promise with Success when it returns a JobExecutionResult and with Failure when we get an exception. And here we are getting an exception, so the real stack trace we have is the one below in Caused By.
>>
>> java.util.concurrent.ExecutionException: Boxed Error
>>   at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
>>   at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
>>   at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>>   at scala.concurrent.Promise.tryFailure(Promise.scala:112)
>>   at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
>>   at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
>>   at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
>>   at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
>>   at scala.util.Failure.fold(Try.scala:240)
>>   at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
>>   at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
>>   at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>>   at scala.util.Try$.apply(Try.scala:213)
>>   at pipelines.runner.Runner$.run(Runner.scala:43)
>>   at pipelines.runner.Runner$.main(Runner.scala:30)
>>   at pipelines.runner.Runner.main(Runner.scala)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>   at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>   at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>>   at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>   at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>   at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>   at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>   at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>   at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
>>   at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
>>   at scala.util.Try$.apply(Try.scala:213)
>>   at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
>>   ... 20 more
>>
>> regards.
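One note on the outer wrapper in that trace: I believe the "Boxed Error" part is not Flink-specific at all. It is what a Scala Promise does when it is failed with a fatal Throwable, and ProgramAbortException appears to extend java.lang.Error, which would explain the boxing. A minimal, Flink-free sketch (the class name is made up):

  import java.util.concurrent.ExecutionException
  import scala.concurrent.{Await, Promise}
  import scala.concurrent.duration._

  object BoxedErrorSketch {
    // Stand-in for OptimizerPlanEnvironment.ProgramAbortException: an Error,
    // i.e. a fatal Throwable in Scala's classification.
    class SimulatedAbort extends Error("simulated abort")

    def main(args: Array[String]): Unit = {
      val p = Promise[Unit]()
      // Completing a Promise with a fatal Throwable makes Scala box it as
      // ExecutionException("Boxed Error"); the original error becomes the cause.
      p.tryFailure(new SimulatedAbort)
      try Await.result(p.future, 1.second)
      catch {
        case e: ExecutionException => println(s"${e.getMessage} / caused by ${e.getCause}")
      }
    }
  }

So the boxing itself is just noise from our Promise-based wrapper; as I said above, the real signal is the ProgramAbortException underneath.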
>> On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <dian0511...@gmail.com> wrote:
>>
>>> Regarding the code you pasted, personally I think nothing is wrong. The problem is how it's executed. As you can see from the implementation of StreamExecutionEnvironment.getExecutionEnvironment, it may create different StreamExecutionEnvironment implementations under different scenarios. Could you paste the full exception stack if it exists? It's difficult to figure out what's wrong with the current stack trace.
>>>
>>> Regards,
>>> Dian
>>>
>>> On Sep 23, 2019, at 6:55 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>
>>> Can it be the case that the threadLocal stuff in https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609 does not behave deterministically when we submit the job through a Kubernetes Flink operator? Utils also selects the factory to create the context based on either thread-local storage or a static mutable variable.
>>>
>>> Can these be sources of problems in our case?
>>>
>>> regards.
>>>
>>> On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>
>>>> Ah, OK .. I get the Throwable part. I am using
>>>>
>>>> import org.apache.flink.streaming.api.scala._
>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>
>>>> How can this lead to a wrong StreamExecutionEnvironment? Any suggestion?
>>>>
>>>> regards.
>>>>
>>>> On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>>
>>>>> Hi Debasish,
>>>>>
>>>>> As I said before, the exception is caught in [1]. It catches Throwable and so it could also catch "OptimizerPlanEnvironment.ProgramAbortException". Regarding the cause of this exception, I have the same feeling as Tison and I also think that the wrong StreamExecutionEnvironment is used.
>>>>>
>>>>> Regards,
>>>>> Dian
>>>>>
>>>>> [1] https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>
>>>>> On Sep 23, 2019, at 6:08 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>> Hi Tison -
>>>>>
>>>>> This is the code that builds the computation graph. readStream reads from Kafka and writeStream writes to Kafka.
>>>>>
>>>>> override def buildExecutionGraph = {
>>>>>   val rides: DataStream[TaxiRide] =
>>>>>     readStream(inTaxiRide)
>>>>>       .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>       .keyBy("rideId")
>>>>>
>>>>>   val fares: DataStream[TaxiFare] =
>>>>>     readStream(inTaxiFare)
>>>>>       .keyBy("rideId")
>>>>>
>>>>>   val processed: DataStream[TaxiRideFare] =
>>>>>     rides
>>>>>       .connect(fares)
>>>>>       .flatMap(new EnrichmentFunction)
>>>>>
>>>>>   writeStream(out, processed)
>>>>> }
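For what it's worth, the graph-building code in isolation looks fine to me as well: the same shape, two keyed streams connected and co-flat-mapped into a sink, should run as a plain standalone job. A minimal sketch with made-up types and a stateless stand-in for EnrichmentFunction, just to show the shape:

  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction
  import org.apache.flink.util.Collector

  object SameShapeJob {
    case class Ride(rideId: Long, isStart: Boolean)
    case class Fare(rideId: Long, amount: Double)

    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment

      val rides = env.fromElements(Ride(1L, isStart = true), Ride(2L, isStart = false))
        .filter(_.isStart)
        .keyBy(_.rideId)

      val fares = env.fromElements(Fare(1L, 12.5), Fare(2L, 7.0))
        .keyBy(_.rideId)

      // Stateless stand-in for EnrichmentFunction: just tags each element.
      val enriched: DataStream[String] = rides
        .connect(fares)
        .flatMap(new CoFlatMapFunction[Ride, Fare, String] {
          override def flatMap1(r: Ride, out: Collector[String]): Unit = out.collect(s"ride ${r.rideId}")
          override def flatMap2(f: Fare, out: Collector[String]): Unit = out.collect(s"fare ${f.rideId}")
        })

      enriched.print()
      env.execute("same-shape-job")
    }
  }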
-name "*.java" | xargs grep >>>>> "OptimizerPlanEnvironment.ProgramAbortException" >>>>> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: >>>>> throw new OptimizerPlanEnvironment.ProgramAbortException(); >>>>> ./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: >>>>> throw new OptimizerPlanEnvironment.ProgramAbortException(); >>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: >>>>> throw new OptimizerPlanEnvironment.ProgramAbortException(); >>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: >>>>> throw new OptimizerPlanEnvironment.ProgramAbortException(); >>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: >>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: >>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: >>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: >>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: >>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: >>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: >>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: >>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: >>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: >>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: >>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: >>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: >>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: >>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import >>>>> org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException; >>>>> ./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: >>>>> @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, >>>>> timeout = 30_000) >>>>> >>>>> What am I 
>>>>> regards.
>>>>>
>>>>> On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <wander4...@gmail.com> wrote:
>>>>>
>>>>>> Hi Debasish,
>>>>>>
>>>>>> As mentioned by Dian, it is an internal exception that should always be caught by Flink internally. I would suggest you share the job (abstractly). Generally it is because you use StreamPlanEnvironment/OptimizerPlanEnvironment directly.
>>>>>>
>>>>>> Best,
>>>>>> tison.
>>>>>>
>>>>>> On Mon, Sep 23, 2019 at 5:09 AM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
>>>>>>
>>>>>>> Have you reached out to the FlinkK8sOperator team on Slack? They're usually pretty active on there.
>>>>>>>
>>>>>>> Here's the link:
>>>>>>> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU
>>>>>>>
>>>>>>> Best,
>>>>>>> Austin
>>>>>>>
>>>>>>> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>
>>>>>>>> The problem is I am submitting Flink jobs to a Kubernetes cluster using a Flink Operator, hence it's difficult to debug in the traditional sense of the term. And all I get is the exception that I reported:
>>>>>>>>
>>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>   at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>
>>>>>>>> I am thinking that this exception must be coming because of some other exception, which is not reported, BTW. I expected a Caused By portion in the stack trace. Any clue as to which area I should look into to debug this?
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the pointer .. I will try debugging. I am getting this exception running my application on Kubernetes using the Flink operator from Lyft.
>>>>>>>>>
>>>>>>>>> regards.
>>>>>>>>>
>>>>>>>>> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> This exception is used internally to get the plan of a job before submitting it for execution. It's thrown with a special purpose, will be caught internally in [1], and will not usually be thrown to end users.
>>>>>>>>>>
>>>>>>>>>> You could check the following places to find out the cause of this problem:
>>>>>>>>>> 1. Check the execution environment you used
>>>>>>>>>> 2. If you can debug, set a breakpoint at [2] to see if the type of the env wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment. Usually it should be.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Dian
>>>>>>>>>>
>>>>>>>>>> [1] https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>>>>> [2] https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
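One more thing I take away from this for our side (not a fix for the root cause, just hygiene): the wrapper around env.execute() probably should not trap fatal Throwables into the Promise at all, so that an internal control-flow Error like ProgramAbortException can propagate out of the main method the way the client expects. A sketch with illustrative names; runJob stands in for what our FlinkStreamlet executor does:

  import scala.concurrent.Promise
  import scala.util.control.NonFatal

  object ExecuteWrapperSketch {
    // Hypothetical wrapper: complete the Promise for ordinary failures, but let
    // fatal Throwables (Errors such as ProgramAbortException) fly out untouched.
    def runAndComplete(promise: Promise[Unit])(runJob: => Unit): Unit =
      try {
        runJob
        promise.trySuccess(())
      } catch {
        case NonFatal(e)  => promise.tryFailure(e)   // normal job failure
        case t: Throwable => throw t                 // fatal: rethrow, do not box
      }
  }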
>>>>>>>>>> >>>>>>>>>> Regards, >>>>>>>>>> Dian >>>>>>>>>> >>>>>>>>>> [1] >>>>>>>>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76 >>>>>>>>>> [2] >>>>>>>>>> https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57 >>>>>>>>>> >>>>>>>>>> 在 2019年9月21日,上午4:14,Debasish Ghosh <ghosh.debas...@gmail.com> 写道: >>>>>>>>>> >>>>>>>>>> Hi - >>>>>>>>>> >>>>>>>>>> When you get an exception stack trace like this .. >>>>>>>>>> >>>>>>>>>> Caused by: >>>>>>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException >>>>>>>>>> at >>>>>>>>>> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66) >>>>>>>>>> at >>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507) >>>>>>>>>> at >>>>>>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654) >>>>>>>>>> >>>>>>>>>> what is the recommended approach of debugging ? I mean what kind >>>>>>>>>> of errors can potentially lead to such a stacktrace ? In my case it >>>>>>>>>> starts >>>>>>>>>> from env.execute(..) but does not give any information as to what can >>>>>>>>>> go wrong. >>>>>>>>>> >>>>>>>>>> Any help will be appreciated. >>>>>>>>>> >>>>>>>>>> regards. >>>>>>>>>> >>>>>>>>>> -- >>>>>>>>>> Debasish Ghosh >>>>>>>>>> http://manning.com/ghosh2 >>>>>>>>>> http://manning.com/ghosh >>>>>>>>>> >>>>>>>>>> Twttr: @debasishg >>>>>>>>>> Blog: http://debasishg.blogspot.com >>>>>>>>>> Code: http://github.com/debasishg >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> -- >>>>>>>>> Sent from my iPhone >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> Debasish Ghosh >>>>>>>> http://manning.com/ghosh2 >>>>>>>> http://manning.com/ghosh >>>>>>>> >>>>>>>> Twttr: @debasishg >>>>>>>> Blog: http://debasishg.blogspot.com >>>>>>>> Code: http://github.com/debasishg >>>>>>>> >>>>>>> >>>>> >>>>> -- >>>>> Debasish Ghosh >>>>> http://manning.com/ghosh2 >>>>> http://manning.com/ghosh >>>>> >>>>> Twttr: @debasishg >>>>> Blog: http://debasishg.blogspot.com >>>>> Code: http://github.com/debasishg >>>>> >>>>> >>>>> >>>> >>>> -- >>>> Debasish Ghosh >>>> http://manning.com/ghosh2 >>>> http://manning.com/ghosh >>>> >>>> Twttr: @debasishg >>>> Blog: http://debasishg.blogspot.com >>>> Code: http://github.com/debasishg >>>> >>> >>> >>> -- >>> Debasish Ghosh >>> http://manning.com/ghosh2 >>> http://manning.com/ghosh >>> >>> Twttr: @debasishg >>> Blog: http://debasishg.blogspot.com >>> Code: http://github.com/debasishg >>> >>> >>> >> >> -- >> Debasish Ghosh >> http://manning.com/ghosh2 >> http://manning.com/ghosh >> >> Twttr: @debasishg >> Blog: http://debasishg.blogspot.com >> Code: http://github.com/debasishg >> > -- Debasish Ghosh http://manning.com/ghosh2 http://manning.com/ghosh Twttr: @debasishg Blog: http://debasishg.blogspot.com Code: http://github.com/debasishg