Can it be the case that the threadLocal stuff in
https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609
does
not behave deterministically when we submit job through a Kubernetes Flink
operator ? Utils also selects the factory to create the context based on
either Thread local storage or a static mutable variable.

Can these be source of problems in our case ?

regards.

On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <ghosh.debas...@gmail.com>
wrote:

> ah .. Ok .. I get the Throwable part. I am using
>
> import org.apache.flink.streaming.api.scala._
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> How can this lead to a wrong StreamExecutionEnvironment ? Any suggestion ?
>
> regards.
>
> On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <dian0511...@gmail.com> wrote:
>
>> Hi Debasish,
>>
>> As I said before, the exception is caught in [1]. It catches the
>> Throwable and so it could also catch "
>> OptimizerPlanEnvironment.ProgramAbortException". Regarding to the cause
>> of this exception, I have the same feeling with Tison and I also think that
>> the wrong StreamExecutionEnvironment is used.
>>
>> Regards,
>> Dian
>>
>> [1]
>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>
>> 在 2019年9月23日,下午6:08,Debasish Ghosh <ghosh.debas...@gmail.com> 写道:
>>
>> Hi Tison -
>>
>> This is the code that builds the computation graph. readStream reads
>> from Kafka and writeStream writes to Kafka.
>>
>>     override def buildExecutionGraph = {
>>       val rides: DataStream[TaxiRide] =
>>         readStream(inTaxiRide)
>>           .filter { ride ⇒ ride.getIsStart().booleanValue }
>>           .keyBy("rideId")
>>
>>       val fares: DataStream[TaxiFare] =
>>         readStream(inTaxiFare)
>>           .keyBy("rideId")
>>
>>       val processed: DataStream[TaxiRideFare] =
>>         rides
>>           .connect(fares)
>>           .flatMap(new EnrichmentFunction)
>>
>>       writeStream(out, processed)
>>     }
>>
>> I also checked that my code enters this function
>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>  and
>> then the exception is thrown. I tried to do a grep on the Flink code base
>> to see where this exception is caught. If I take off the tests, I don't see
>> any catch of this exception ..
>>
>> $ find . -name "*.java" | xargs grep
>> "OptimizerPlanEnvironment.ProgramAbortException"
>> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java:
>> throw new OptimizerPlanEnvironment.ProgramAbortException();
>> ./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java:
>> throw new OptimizerPlanEnvironment.ProgramAbortException();
>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java:
>> throw new OptimizerPlanEnvironment.ProgramAbortException();
>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java:
>> throw new OptimizerPlanEnvironment.ProgramAbortException();
>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java:
>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java:
>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import
>> org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
>> ./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java:
>> @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class,
>> timeout = 30_000)
>>
>> What am I missing here ?
>>
>> regards.
>>
>> On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <wander4...@gmail.com> wrote:
>>
>>> Hi Debasish,
>>>
>>> As mentioned by Dian, it is an internal exception that should be always
>>> caught by
>>> Flink internally. I would suggest you share the job(abstractly).
>>> Generally it is because
>>> you use StreamPlanEnvironment/OptimizerPlanEnvironment directly.
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> Austin Cawley-Edwards <austin.caw...@gmail.com> 于2019年9月23日周一 上午5:09写道:
>>>
>>>> Have you reached out to the FlinkK8sOperator team on Slack? They’re
>>>> usually pretty active on there.
>>>>
>>>> Here’s the link:
>>>>
>>>> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU
>>>>
>>>>
>>>>
>>>> Best,
>>>> Austin
>>>>
>>>> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <
>>>> ghosh.debas...@gmail.com> wrote:
>>>>
>>>>> The problem is I am submitting Flink jobs to Kubernetes cluster using
>>>>> a Flink Operator. Hence it's difficult to debug in the traditional sense 
>>>>> of
>>>>> the term. And all I get is the exception that I reported ..
>>>>>
>>>>> Caused by:
>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>> at
>>>>> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>> at
>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>> at
>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>
>>>>> I am thinking that this exception must be coming because of some other
>>>>> exceptions, which are not reported BTW. I expected a Caused By portion in
>>>>> the stack trace. Any clue as to which area I should look into to debug 
>>>>> this.
>>>>>
>>>>> regards.
>>>>>
>>>>> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <
>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for the pointer .. I will try debugging. I am getting this
>>>>>> exception running my application on Kubernetes using the Flink operator
>>>>>> from Lyft.
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <dian0511...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> This exception is used internally to get the plan of a job before
>>>>>>> submitting it for execution. It's thrown with special purpose and will 
>>>>>>> be
>>>>>>> caught internally in [1] and will not be thrown to end users usually.
>>>>>>>
>>>>>>> You could check the following places to find out the cause to this
>>>>>>> problem:
>>>>>>> 1. Check the execution environment you used
>>>>>>> 2. If you can debug, set a breakpoint at[2] to see if the type of
>>>>>>> the env wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment.
>>>>>>> Usually it should be.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dian
>>>>>>>
>>>>>>> [1]
>>>>>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>> [2]
>>>>>>> https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>>>
>>>>>>> 在 2019年9月21日,上午4:14,Debasish Ghosh <ghosh.debas...@gmail.com> 写道:
>>>>>>>
>>>>>>> Hi -
>>>>>>>
>>>>>>> When you get an exception stack trace like this ..
>>>>>>>
>>>>>>> Caused by:
>>>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>
>>>>>>> what is the recommended approach of debugging ? I mean what kind of
>>>>>>> errors can potentially lead to such a stacktrace ? In my case it starts
>>>>>>> from env.execute(..) but does not give any information as to what can
>>>>>>> go wrong.
>>>>>>>
>>>>>>> Any help will be appreciated.
>>>>>>>
>>>>>>> regards.
>>>>>>>
>>>>>>> --
>>>>>>> Debasish Ghosh
>>>>>>> http://manning.com/ghosh2
>>>>>>> http://manning.com/ghosh
>>>>>>>
>>>>>>> Twttr: @debasishg
>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>> Code: http://github.com/debasishg
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>> Sent from my iPhone
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Debasish Ghosh
>>>>> http://manning.com/ghosh2
>>>>> http://manning.com/ghosh
>>>>>
>>>>> Twttr: @debasishg
>>>>> Blog: http://debasishg.blogspot.com
>>>>> Code: http://github.com/debasishg
>>>>>
>>>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>>
>>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg

Reply via email to