This is the complete stack trace we get from execution on Kubernetes
using the Flink Kubernetes operator. The boxed error comes from the fact
that we complete a Promise with Success when the job returns a
JobExecutionResult and with Failure when we get an exception, and here we are
getting an exception. So the real stack trace is the one below under
"Caused by".

java.util.concurrent.ExecutionException: Boxed Error
at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at scala.concurrent.Promise.tryFailure(Promise.scala:112)
at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
at scala.util.Failure.fold(Try.scala:240)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
at scala.util.Try$.apply(Try.scala:213)
at pipelines.runner.Runner$.run(Runner.scala:43)
at pipelines.runner.Runner$.main(Runner.scala:30)
at pipelines.runner.Runner.main(Runner.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
at scala.util.Try$.apply(Try.scala:213)
at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
... 20 more
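For context, the "Boxed Error" wrapper is standard Scala behaviour: completing a Promise with a fatal Throwable (and ProgramAbortException extends Error, which Scala treats as fatal) makes the Promise wrap it in a java.util.concurrent.ExecutionException. A minimal sketch of the pattern described above, with hypothetical names rather than our actual FlinkStreamlet code:

```scala
import scala.concurrent.Promise

// Hypothetical stand-in for Flink's JobExecutionResult.
final case class JobExecutionResult(jobId: String)

// Run the job and complete a Promise: Success on a result,
// Failure on an exception -- the pattern described above.
def execute(runJob: () => JobExecutionResult): Promise[JobExecutionResult] = {
  val promise = Promise[JobExecutionResult]()
  try promise.trySuccess(runJob())
  catch {
    case t: Throwable =>
      // If t is fatal (an Error such as ProgramAbortException), the Promise
      // wraps it in an ExecutionException -- the "Boxed Error" that appears
      // as the outer exception in the trace above.
      promise.tryFailure(t)
  }
  promise
}
```

With a fatal Error, `promise.future.value` then holds a Failure wrapping an ExecutionException whose cause is the original Error, matching the "Caused by" section above.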

regards.

On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <dian0511...@gmail.com> wrote:

> Regarding the code you pasted, personally I think nothing is wrong. The
> problem is how it's executed. As you can see from the implementation of
> StreamExecutionEnvironment.getExecutionEnvironment, it may create
> different StreamExecutionEnvironment implementations under different
> scenarios. Could you paste the full exception stack if it exists? It's
> difficult to figure out what's wrong with the current stack trace.
>
> Regards,
> Dian
>
> On 23 Sep 2019, at 18:55, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>
> Can it be the case that the threadLocal logic in
> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609
> does not behave deterministically when we submit the job through a Kubernetes
> Flink operator? Utils also selects the factory to create the context based on
> either thread-local storage or a static mutable variable.
>
> Can these be sources of problems in our case?
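To make the concern above concrete, here is an abstract sketch, with hypothetical names rather than Flink's actual code, of the pattern in question: the environment accessor consults a thread-local context factory first and falls back to a static mutable default, so the environment type you get depends on which thread and submission path runs your main():

```scala
import scala.util.DynamicVariable

// Hypothetical sketch: which environment getExecutionEnvironment returns
// depends on a thread-local factory (set by the submission path) with a
// fall-back to a static mutable global -- so two submission paths can
// yield different environment types for the same user code.
sealed trait Env
final class LocalEnv extends Env
final class PlanCaptureEnv extends Env // stand-in for a plan-only environment

object EnvFactory {
  // thread-local factory, set per submission thread
  val threadLocalFactory = new DynamicVariable[Option[() => Env]](None)
  // static mutable fall-back
  @volatile var globalFactory: Option[() => Env] = None

  def getExecutionEnvironment: Env =
    threadLocalFactory.value
      .orElse(globalFactory)
      .map(_.apply())
      .getOrElse(new LocalEnv)
}
```

Under this model, a submission path that installs a plan-capturing factory (as a REST/operator-driven upload does) hands your code a different environment than a plain local run would.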
>
> regards.
>
> On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <ghosh.debas...@gmail.com>
> wrote:
>
>> ah .. OK .. I get the Throwable part. I am using
>>
>> import org.apache.flink.streaming.api.scala._
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>> How can this lead to a wrong StreamExecutionEnvironment? Any suggestions?
>>
>> regards.
>>
>> On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <dian0511...@gmail.com> wrote:
>>
>>> Hi Debasish,
>>>
>>> As I said before, the exception is caught in [1]. It catches
>>> Throwable and so it could also catch
>>> OptimizerPlanEnvironment.ProgramAbortException. Regarding the cause
>>> of this exception, I have the same feeling as Tison and I also think that
>>> the wrong StreamExecutionEnvironment is used.
>>>
>>> Regards,
>>> Dian
>>>
>>> [1] https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>
>>> On 23 Sep 2019, at 18:08, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>
>>> Hi Tison -
>>>
>>> This is the code that builds the computation graph. readStream reads
>>> from Kafka and writeStream writes to Kafka.
>>>
>>>     override def buildExecutionGraph = {
>>>       val rides: DataStream[TaxiRide] =
>>>         readStream(inTaxiRide)
>>>           .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>           .keyBy("rideId")
>>>
>>>       val fares: DataStream[TaxiFare] =
>>>         readStream(inTaxiFare)
>>>           .keyBy("rideId")
>>>
>>>       val processed: DataStream[TaxiRideFare] =
>>>         rides
>>>           .connect(fares)
>>>           .flatMap(new EnrichmentFunction)
>>>
>>>       writeStream(out, processed)
>>>     }
>>>
>>> I also checked that my code enters this function
>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>> and then the exception is thrown. I did a grep on the Flink code base
>>> to see where this exception is caught. Excluding the tests, I don't see
>>> any catch of this exception:
>>>
>>> $ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
>>> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>> ./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
>>> ./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)
>>>
>>> What am I missing here?
>>>
>>> regards.
>>>
>>> On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <wander4...@gmail.com> wrote:
>>>
>>>> Hi Debasish,
>>>>
>>>> As mentioned by Dian, it is an internal exception that should always be
>>>> caught by Flink internally. I would suggest you share the job (abstractly).
>>>> Generally it is because you use
>>>> StreamPlanEnvironment/OptimizerPlanEnvironment directly.
>>>>
>>>> Best,
>>>> tison.
>>>>
>>>>
>>>> On Mon, 23 Sep 2019 at 05:09, Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
>>>>
>>>>> Have you reached out to the FlinkK8sOperator team on Slack? They’re
>>>>> usually pretty active on there.
>>>>>
>>>>> Here’s the link:
>>>>>
>>>>> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU
>>>>>
>>>>>
>>>>>
>>>>> Best,
>>>>> Austin
>>>>>
>>>>> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <
>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>>> The problem is that I am submitting Flink jobs to a Kubernetes cluster
>>>>>> using a Flink operator, so it's difficult to debug in the traditional
>>>>>> sense of the term. All I get is the exception that I reported:
>>>>>>
>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>> at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>
>>>>>> I am thinking that this exception must be caused by some other
>>>>>> exception, which is not reported, BTW. I expected a "Caused by"
>>>>>> portion in the stack trace. Any clue as to which area I should look
>>>>>> into to debug this?
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <
>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for the pointer .. I will try debugging. I am getting this
>>>>>>> exception running my application on Kubernetes using the Flink operator
>>>>>>> from Lyft.
>>>>>>>
>>>>>>> regards.
>>>>>>>
>>>>>>> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <dian0511...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> This exception is used internally to get the plan of a job before
>>>>>>>> submitting it for execution. It's thrown for a special purpose, is
>>>>>>>> caught internally in [1], and will usually not be thrown to end users.
>>>>>>>>
>>>>>>>> You could check the following places to find out the cause to this
>>>>>>>> problem:
>>>>>>>> 1. Check the execution environment you used.
>>>>>>>> 2. If you can debug, set a breakpoint at [2] to see if the type of
>>>>>>>> the env wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment.
>>>>>>>> Usually it should be.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Dian
>>>>>>>>
>>>>>>>> [1] https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>>> [2] https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>>>>
>>>>>>>> On 21 Sep 2019, at 04:14, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi -
>>>>>>>>
>>>>>>>> When you get an exception stack trace like this ..
>>>>>>>>
>>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>> at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>
>>>>>>>> what is the recommended approach to debugging? I mean, what kinds of
>>>>>>>> errors can potentially lead to such a stack trace? In my case it starts
>>>>>>>> from env.execute(..) but does not give any information as to what can
>>>>>>>> go wrong.
>>>>>>>>
>>>>>>>> Any help will be appreciated.
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>> Sent from my iPhone
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>
>>>
>>>
>>>
>>
>>
>
>
>
>
>

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
