Hi Tison -

This is the code that builds the computation graph. readStream reads from
Kafka and writeStream writes to Kafka.

    override def buildExecutionGraph = {
      // Keep only ride-start events, keyed by rideId.
      val rides: DataStream[TaxiRide] =
        readStream(inTaxiRide)
          .filter { ride => ride.getIsStart().booleanValue }
          .keyBy("rideId")

      // Key fares by the same field so they can be connected to rides.
      val fares: DataStream[TaxiFare] =
        readStream(inTaxiFare)
          .keyBy("rideId")

      // Connect the two keyed streams and emit combined records.
      val processed: DataStream[TaxiRideFare] =
        rides
          .connect(fares)
          .flatMap(new EnrichmentFunction)

      writeStream(out, processed)
    }

I also checked that my code enters this function
https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
and that the exception is then thrown. I grepped the Flink code base to see
where this exception is caught. Excluding the tests, I don't see any catch
of this exception:

$ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)
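
For context, here is a minimal Scala sketch of the general pattern Dian
describes in the quoted reply below: an exception thrown only to abort the
program once its plan has been captured. All names in it (PlanAbortException,
PlanCapturingEnv, PlanExtractor) are hypothetical stand-ins, not Flink
classes. It also assumes, without verification against the Flink 1.9 source,
that the caller catches the abort through a broad type such as Throwable; if
that is the case, a grep for the exception's class name would find only the
throw sites.

```scala
// Hypothetical stand-ins for illustration only; none of these are Flink classes.
final class PlanAbortException
    extends RuntimeException("plan captured, aborting program")

// Stand-in for a "plan" environment: execute() records a plan and then
// aborts the program instead of running the job.
final class PlanCapturingEnv {
  var capturedPlan: Option[String] = None

  def execute(jobName: String): Unit = {
    capturedPlan = Some(s"plan-of-$jobName")
    throw new PlanAbortException
  }
}

object PlanExtractor {
  // Runs the user program only to obtain its plan. The abort is caught via
  // the broad Throwable type (an assumption of this sketch), so the
  // exception's class name never appears in a catch clause.
  def extractPlan(env: PlanCapturingEnv)(
      program: PlanCapturingEnv => Unit): Either[Throwable, String] =
    try {
      program(env)
      Left(new IllegalStateException("program finished without calling execute()"))
    } catch {
      case t: Throwable =>
        env.capturedPlan match {
          case Some(plan) => Right(plan) // expected abort: the plan was captured
          case None       => Left(t)     // genuine failure before execute()
        }
    }
}

object Demo {
  def main(args: Array[String]): Unit = {
    // Inside the extractor the abort is swallowed and the plan is returned.
    println(PlanExtractor.extractPlan(new PlanCapturingEnv)(_.execute("taxi-ride-job")))

    // Outside the extractor the same call surfaces the abort to the caller.
    println(scala.util.Try(new PlanCapturingEnv().execute("taxi-ride-job")).isFailure)
  }
}
```

In this sketch the abort stays invisible only while the program runs inside
extractPlan; calling execute on the plan-capturing environment from anywhere
else lets the exception reach the caller.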

What am I missing here?

regards.

On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <wander4...@gmail.com> wrote:

> Hi Debasish,
>
> As Dian mentioned, this is an internal exception that should always be
> caught by Flink internally. I would suggest sharing an abstracted version
> of the job. Generally this happens because you use
> StreamPlanEnvironment/OptimizerPlanEnvironment directly.
>
> Best,
> tison.
>
>
> On Mon, Sep 23, 2019 at 5:09 AM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
>
>> Have you reached out to the FlinkK8sOperator team on Slack? They’re
>> usually pretty active on there.
>>
>> Here’s the link:
>>
>> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU
>>
>>
>>
>> Best,
>> Austin
>>
>> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> The problem is that I am submitting Flink jobs to a Kubernetes cluster
>>> using a Flink Operator, so it is difficult to debug in the traditional
>>> sense of the term. All I get is the exception that I reported:
>>>
>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>   at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>
>>> I think this exception must be caused by some other exception, which is
>>> not reported. I expected a nested "Caused by" portion in the stack trace.
>>> Any clue as to which area I should look into to debug this?
>>>
>>> regards.
>>>
>>> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <ghosh.debas...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for the pointer .. I will try debugging. I am getting this
>>>> exception running my application on Kubernetes using the Flink operator
>>>> from Lyft.
>>>>
>>>> regards.
>>>>
>>>> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <dian0511...@gmail.com> wrote:
>>>>
>>>>> This exception is used internally to get the plan of a job before
>>>>> submitting it for execution. It is thrown for a special purpose, is
>>>>> caught internally in [1], and is usually not propagated to end users.
>>>>>
>>>>> You could check the following places to find the cause of this
>>>>> problem:
>>>>> 1. Check the execution environment you used.
>>>>> 2. If you can debug, set a breakpoint at [2] to see whether the type of
>>>>> the env wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment.
>>>>> Usually it should be.
>>>>>
>>>>> Regards,
>>>>> Dian
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>> [2]
>>>>> https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>
>>>>> On Sep 21, 2019, at 4:14 AM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>> Hi -
>>>>>
>>>>> When you get an exception stack trace like this ..
>>>>>
>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>   at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>
>>>>> what is the recommended approach to debugging? I mean, what kinds of
>>>>> errors can lead to such a stack trace? In my case it starts from
>>>>> env.execute(..) but gives no information about what went wrong.
>>>>>
>>>>> Any help will be appreciated.
>>>>>
>>>>> regards.
>>>>>
>>>>> --
>>>>> Debasish Ghosh
>>>>> http://manning.com/ghosh2
>>>>> http://manning.com/ghosh
>>>>>
>>>>> Twttr: @debasishg
>>>>> Blog: http://debasishg.blogspot.com
>>>>> Code: http://github.com/debasishg
>>>>>
>>>>>
>>>>
>>>
>>>
>>

-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
