Hi Tison - This is the code that builds the computation graph. readStream reads from Kafka and writeStream writes to Kafka.

    override def buildExecutionGraph = {
      val rides: DataStream[TaxiRide] =
        readStream(inTaxiRide)
          .filter { ride ⇒ ride.getIsStart().booleanValue }
          .keyBy("rideId")

      val fares: DataStream[TaxiFare] =
        readStream(inTaxiFare)
          .keyBy("rideId")

      val processed: DataStream[TaxiRideFare] =
        rides
          .connect(fares)
          .flatMap(new EnrichmentFunction)

      writeStream(out, processed)
    }

I also confirmed that my code enters this function

https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57

and then the exception is thrown. I grepped the Flink code base to see where this exception is caught. Leaving the tests aside, I don't see any catch of this exception:

    $ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
    ./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
    ./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
    ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
    ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
    ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
    ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
    ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
    ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
    ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
    ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
    ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
    ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
    ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
    ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
    ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
    ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
    ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
    ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
    ./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
    ./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)

What am I missing here?

regards.

On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <wander4...@gmail.com> wrote:

> Hi Debasish,
>
> As mentioned by Dian, it is an internal exception that should always be
> caught by Flink internally. I would suggest you share the job (abstractly).
> Generally it is because you use StreamPlanEnvironment/OptimizerPlanEnvironment
> directly.
>
> Best,
> tison.
>
>
> Austin Cawley-Edwards <austin.caw...@gmail.com> wrote on Mon, Sep 23, 2019 at 5:09 AM:
>
>> Have you reached out to the FlinkK8sOperator team on Slack? They’re
>> usually pretty active on there.
>>
>> Here’s the link:
>>
>> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU
>>
>> Best,
>> Austin
>>
>> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> The problem is that I am submitting Flink jobs to a Kubernetes cluster using a
>>> Flink Operator. Hence it's difficult to debug in the traditional sense of
>>> the term. And all I get is the exception that I reported:
>>>
>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>   at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>
>>> I am thinking that this exception must be caused by some other
>>> exception, which is not reported, BTW. I expected a Caused By portion in
>>> the stack trace. Any clue as to which area I should look into to debug this?
>>>
>>> regards.
>>>
>>> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <ghosh.debas...@gmail.com>
>>> wrote:
>>>
>>>> Thanks for the pointer. I will try debugging. I am getting this
>>>> exception running my application on Kubernetes using the Flink operator
>>>> from Lyft.
>>>>
>>>> regards.
>>>>
>>>> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <dian0511...@gmail.com> wrote:
>>>>
>>>>> This exception is used internally to get the plan of a job before
>>>>> submitting it for execution. It is thrown for that special purpose, is
>>>>> caught internally in [1], and is usually not thrown to end users.
>>>>>
>>>>> You could check the following places to find the cause of this
>>>>> problem:
>>>>> 1. Check the execution environment you used.
>>>>> 2. If you can debug, set a breakpoint at [2] to see if the type of the
>>>>> env wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment.
>>>>> Usually it should be.
>>>>>
>>>>> Regards,
>>>>> Dian
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>> [2]
>>>>> https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>
>>>>> On Sep 21, 2019, at 4:14 AM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>
>>>>> Hi -
>>>>>
>>>>> When you get an exception stack trace like this:
>>>>>
>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>   at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>   at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>
>>>>> what is the recommended approach to debugging? I mean, what kinds of
>>>>> errors can potentially lead to such a stack trace? In my case it starts
>>>>> from env.execute(..) but does not give any information as to what can
>>>>> go wrong.
>>>>>
>>>>> Any help will be appreciated.
>>>>>
>>>>> regards.
>>>>>
>>>>> --
>>>>> Debasish Ghosh
>>>>> http://manning.com/ghosh2
>>>>> http://manning.com/ghosh
>>>>>
>>>>> Twttr: @debasishg
>>>>> Blog: http://debasishg.blogspot.com
>>>>> Code: http://github.com/debasishg
>>>>>
>>>>>
>>>>> --
>>>> Sent from my iPhone
>>>>
>>>
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>

--
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
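
A note on the mechanism Dian describes in this thread, since it may bear on the grep result: as far as I can tell, the catch site in OptimizerPlanEnvironment catches a generic Throwable rather than naming ProgramAbortException, so a grep for the class name would only turn up the throw sites and the tests. Please verify that against the release-1.9 source. The toy model below is only a sketch of that pattern, not Flink's code: PlanAbortSketch, PlanEnvironment, AbortException and getPlan are hypothetical names, and the "plan" is just a string.

```java
public class PlanAbortSketch {

    /** Toy stand-in for the abort exception; an Error, so catch (Exception) would miss it. */
    static final class AbortException extends Error {
        private static final long serialVersionUID = 1L;
    }

    /** Toy stand-in for a plan-extracting environment; this is not Flink's API. */
    static final class PlanEnvironment {
        private String plan;

        /** Records the plan, then aborts instead of actually running the job. */
        void execute(String jobName) {
            plan = "plan(" + jobName + ")";
            throw new AbortException();
        }

        /** Runs the user's main and treats the abort as the normal way to hand the plan back. */
        String getPlan(Runnable userMain) {
            try {
                userMain.run();
            } catch (Throwable t) {
                // Generic catch: the abort exception is never named here.
                if (plan != null) {
                    return plan;
                }
                throw new IllegalStateException("The program caused an error", t);
            }
            throw new IllegalStateException("The program never called execute()");
        }
    }

    public static void main(String[] args) {
        PlanEnvironment env = new PlanEnvironment();
        // Inside getPlan the abort is swallowed and the plan comes back.
        System.out.println(env.getPlan(() -> env.execute("ride-fare-join")));

        // Outside getPlan nothing catches the abort, so it reaches the caller.
        try {
            new PlanEnvironment().execute("ride-fare-join");
        } catch (AbortException e) {
            System.out.println("abort escaped to the caller");
        }
    }
}
```

If execute(..) runs outside the equivalent of getPlan (for example on another thread, or against an environment that the client did not set up), nothing catches the abort and it surfaces to the user, which would look like the stack trace in this thread. That is a guess about the symptom, not a diagnosis of this job.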