Can it be the case that the threadLocal stuff in https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609 does not behave deterministically when we submit a job through a Kubernetes Flink operator? Utils also selects the factory to create the context based on either thread-local storage or a static mutable variable. Can these be a source of problems in our case?

regards.
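For readers following the question above, here is a minimal, self-contained sketch of the pattern it refers to (Scala, with made-up names; the real fields and the Utils helper live inside Flink's StreamExecutionEnvironment and are not reproduced here). The factory that creates the environment is looked up in a ThreadLocal first and in a static mutable field as a fallback, so what getExecutionEnvironment returns depends on which thread runs the user's main and on what was installed before it ran.

object FactoryResolutionSketch {

  trait EnvFactory { def createEnv(): String }

  // hypothetical stand-ins for the thread-local and static factory slots
  private val threadLocalFactory = new ThreadLocal[EnvFactory]
  @volatile private var staticFactory: EnvFactory = _

  def installThreadLocal(f: EnvFactory): Unit = threadLocalFactory.set(f)
  def installStatic(f: EnvFactory): Unit = staticFactory = f

  // the thread-local wins, the static field is the fallback, otherwise a local default
  def getExecutionEnvironment: String =
    Option(threadLocalFactory.get())
      .orElse(Option(staticFactory))
      .map(_.createEnv())
      .getOrElse("LocalEnvironment")

  def main(args: Array[String]): Unit = {
    installStatic(new EnvFactory {
      def createEnv(): String = "ContextEnvironment installed by the client"
    })
    println(getExecutionEnvironment) // resolved via the static factory

    val t = new Thread(new Runnable {
      def run(): Unit =
        // this thread also sees the static factory, but a factory installed only
        // in some other thread's ThreadLocal would be invisible here
        println(getExecutionEnvironment)
    })
    t.start()
    t.join()
  }
}

Nothing above is Flink code; it only illustrates why two submission paths that run main() on different threads, or that install the factory at different times, can resolve different environments.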
On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> ah .. Ok .. I get the Throwable part. I am using
>
> import org.apache.flink.streaming.api.scala._
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> How can this lead to a wrong StreamExecutionEnvironment? Any suggestion?
>
> regards.
>
> On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <dian0511...@gmail.com> wrote:
>
>> Hi Debasish,
>>
>> As I said before, the exception is caught in [1]. It catches Throwable and so it also catches "OptimizerPlanEnvironment.ProgramAbortException". Regarding the cause of this exception, I have the same feeling as Tison and I also think that the wrong StreamExecutionEnvironment is used.
>>
>> Regards,
>> Dian
>>
>> [1] https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>
>> On Sep 23, 2019, at 6:08 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>
>> Hi Tison -
>>
>> This is the code that builds the computation graph. readStream reads from Kafka and writeStream writes to Kafka.
>>
>> override def buildExecutionGraph = {
>>   val rides: DataStream[TaxiRide] =
>>     readStream(inTaxiRide)
>>       .filter { ride ⇒ ride.getIsStart().booleanValue }
>>       .keyBy("rideId")
>>
>>   val fares: DataStream[TaxiFare] =
>>     readStream(inTaxiFare)
>>       .keyBy("rideId")
>>
>>   val processed: DataStream[TaxiRideFare] =
>>     rides
>>       .connect(fares)
>>       .flatMap(new EnrichmentFunction)
>>
>>   writeStream(out, processed)
>> }
>>
>> I also checked that my code enters this function https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57 and then the exception is thrown. I tried to do a grep on the Flink code base to see where this exception is caught. If I take off the tests, I don't see any catch of this exception ..
>>
>> $ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
-name "*.java" | xargs grep >> "OptimizerPlanEnvironment.ProgramAbortException" >> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: >> throw new OptimizerPlanEnvironment.ProgramAbortException(); >> ./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: >> throw new OptimizerPlanEnvironment.ProgramAbortException(); >> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: >> throw new OptimizerPlanEnvironment.ProgramAbortException(); >> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: >> throw new OptimizerPlanEnvironment.ProgramAbortException(); >> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: >> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: >> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: >> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: >> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: >> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: >> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: >> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: >> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: >> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: >> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: >> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: >> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: >> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: >> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import >> org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException; >> ./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: >> @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, >> timeout = 30_000) >> >> What am I missing here ? >> >> regards. 
>> On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <wander4...@gmail.com> wrote:
>>
>>> Hi Debasish,
>>>
>>> As mentioned by Dian, it is an internal exception that should always be caught by Flink internally. I would suggest you share the job (abstractly). Generally it is because you use StreamPlanEnvironment/OptimizerPlanEnvironment directly.
>>>
>>> Best,
>>> tison.
>>>
>>> Austin Cawley-Edwards <austin.caw...@gmail.com> wrote on Mon, Sep 23, 2019 at 5:09 AM:
>>>
>>>> Have you reached out to the FlinkK8sOperator team on Slack? They're usually pretty active on there.
>>>>
>>>> Here's the link:
>>>> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU
>>>>
>>>> Best,
>>>> Austin
>>>>
>>>> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>
>>>>> The problem is I am submitting Flink jobs to a Kubernetes cluster using a Flink Operator. Hence it's difficult to debug in the traditional sense of the term. And all I get is the exception that I reported ..
>>>>>
>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>     at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>
>>>>> I am thinking that this exception must be coming because of some other exception, which is not reported, BTW. I expected a Caused by portion in the stack trace. Any clue as to which area I should look into to debug this?
>>>>>
>>>>> regards.
>>>>>
>>>>> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the pointer .. I will try debugging. I am getting this exception running my application on Kubernetes using the Flink operator from Lyft.
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>
>>>>>>> This exception is used internally to get the plan of a job before submitting it for execution. It's thrown with a special purpose and will be caught internally in [1]; it will not usually be thrown to end users.
>>>>>>>
>>>>>>> You could check the following places to find out the cause of this problem:
>>>>>>> 1. Check the execution environment you used.
>>>>>>> 2. If you can debug, set a breakpoint at [2] to see if the type of the env wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment. Usually it should be.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dian
>>>>>>>
>>>>>>> [1] https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>> [2] https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
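One low-tech way to act on Dian's first suggestion when no debugger can be attached (a minimal sketch; the EnvCheck object and the println are illustrative additions, not code from this thread): log the concrete class of the environment that getExecutionEnvironment resolves and look for it in the operator or job manager logs. The Scala API's getExecutionEnvironment wraps the environment produced by the same Java-side factory, so the logged class reflects what the job actually ran with.

import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamEnv}

object EnvCheck {
  def main(args: Array[String]): Unit = {
    // Which concrete environment did the factory resolve on this thread?
    // e.g. StreamPlanEnvironment during plan extraction, a context environment
    // for client submission, or a local environment when run standalone.
    val env = JStreamEnv.getExecutionEnvironment
    println(s"Resolved environment class: ${env.getClass.getName}")
  }
}

If the logged class is not the one you expect for operator-based submission, that points back at the factory-resolution question raised at the top of this thread.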
>>>>>>> On Sep 21, 2019, at 4:14 AM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi -
>>>>>>>
>>>>>>> When you get an exception stack trace like this ..
>>>>>>>
>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>     at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>
>>>>>>> what is the recommended approach to debugging? I mean, what kind of errors can potentially lead to such a stack trace? In my case it starts from env.execute(..) but does not give any information as to what can go wrong.
>>>>>>>
>>>>>>> Any help will be appreciated.
>>>>>>>
>>>>>>> regards.
>>>>>>>
>>>>>>> --
>>>>>>> Debasish Ghosh
>>>>>>> http://manning.com/ghosh2
>>>>>>> http://manning.com/ghosh
>>>>>>>
>>>>>>> Twttr: @debasishg
>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>> Code: http://github.com/debasishg
>>>>>>
>>>>>> --
>>>>>> Sent from my iPhone

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg