I have investigated this further:

During normal operation, without Spring Boot, 
OptimizerPlanEnvironment.ProgramAbortException is thrown by Flink from 
StreamPlanEnvironment::execute():70. It is caught by 
PackagedProgram::callMainMethod():537, where it is re-thrown as an Error 
(ProgramAbortException extends Error). That Error is caught in 
OptimizerPlanEnvironment::getOptimizedPlan():88, which checks whether the 
optimizerPlan field is != null: if so it returns the value of that field, 
otherwise it re-throws. Since optimizerPlan IS != null here, the exception 
stops there and the job is executed as expected. In other words, the Flink 
control flow relies on throwing (and handling) ProgramAbortException.
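
Paraphrased from my reading of the Flink 1.7 sources (an abridged sketch, not 
an exact copy), the relevant part of OptimizerPlanEnvironment::getOptimizedPlan() 
looks roughly like this:

// OptimizerPlanEnvironment::getOptimizedPlan(), abridged sketch:
try {
    prog.invokeInteractiveModeForExecution();   // ends up calling the job's main()
} catch (Throwable t) {
    // The job is expected to abort with ProgramAbortException once the plan
    // has been captured by StreamPlanEnvironment::execute().
    if (optimizerPlan != null) {
        return optimizerPlan;                   // plan captured -> normal path
    } else {
        throw new ProgramInvocationException("The program caused an error: ", t);
    }
}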

When using Spring Boot the execution fails due to the 
OptimizerPlanEnvironment.ProgramAbortException mentioned above. In that case 
Spring logic sits between PackagedProgram::callMainMethod() and the method 
where the Flink ExecutionEnvironment is built and executed. Spring catches any 
Throwable thrown from a bean init method, interprets it as a failure, and 
exits, so the ProgramAbortException never reaches Flink's handler.
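
One way to test this hypothesis would be to move the execute() call out of the 
bean init method, so the exception propagates straight back to Flink's 
bootstrap code. A minimal sketch (untested; buildAndExecute() is a hypothetical 
method that builds the topology and calls streamExecutionEnvironment.execute()):

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;

@SpringBootApplication
public class FlinkTest {
    public static void main(String[] args) throws Exception {
        // Start the Spring context first, then run the Flink job outside any
        // @PostConstruct/init method so ProgramAbortException is not swallowed
        // by Spring's bean-creation error handling.
        ConfigurableApplicationContext ctx = SpringApplication.run(FlinkTest.class, args);
        MyFlinkJob job = ctx.getBean(MyFlinkJob.class);
        job.buildAndExecute();   // calls streamExecutionEnvironment.execute("MyFlinkJob")
    }
}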

I guess that when deploying the Spring Boot Flink job to a session cluster, 
which as mentioned above works fine, Flink does not rely on passing exceptions 
between its bootstrap code and the Flink job?

/Thomas

________________________________
From: Chesnay Schepler <ches...@apache.org>
Sent: Sunday, February 10, 2019 10:30:54 AM
To: Thomas Eckestad; user@flink.apache.org
Subject: Re: Running single Flink job in a job cluster, problem starting 
JobManager

I'm afraid we haven't had much experience with Spring Boot Flink applications.

It is indeed strange that the job ends up using a StreamPlanEnvironment.
As a debugging step I would look into all calls to 
ExecutionEnvironment#initializeContextEnvironment().
This is how specific execution environments are injected into 
(Stream)ExecutionEnvironment#getExecutionEnvironment().
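
As a quick sanity check (a hypothetical snippet, not taken from the sources), 
you could log which concrete environment class the job actually gets inside 
its init/main method:

// e.g. StreamPlanEnvironment under the job-cluster entrypoint,
// StreamContextEnvironment when submitted with `flink run` to a session cluster.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
System.out.println("Environment class: " + env.getClass().getName());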

On 08.02.2019 15:17, Thomas Eckestad wrote:
Hi again,

When removing Spring Boot from the application, it works.

I would really like to mix Spring Boot and Flink. It does work with Spring Boot 
when submitting jobs to a session cluster, as stated before.

/Thomas
________________________________
From: Thomas Eckestad <thomas.eckes...@verisure.com>
Sent: Friday, February 8, 2019 12:14 PM
To: user@flink.apache.org
Subject: Running single Flink job in a job cluster, problem starting JobManager

Hi,

I am trying to run a Flink job cluster in K8s. As a first step I have created a 
Docker image according to:

https://github.com/apache/flink/blob/release-1.7/flink-container/docker/README.md

When I try to run the image:

docker run --name=flink-job-manager flink-image:latest job-cluster 
--job-classname com.foo.bar.FlinkTest 
-Djobmanager.rpc.address=flink-job-cluster -Dparallelism.default=1 
-Dblob.server.port=6124 -Dqueryable-state.server.ports=6125

the execution fails with the following exception:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'MyFlinkJob': Invocation of init method failed; nested exception is org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:139)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:419)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1737)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:576)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:498)
    at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:199)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:846)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:863)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:546)
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775)
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248)
    at com.foo.bar.FlinkTest.main(FlinkTest.java:10)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
    at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:70)
    at com.foo.bar.FlinkJob.MyFlinkJob.init(MyFlinkJob.java:59)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:363)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:307)
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:136)
    ... 22 more

I can successfully run the same job.jar on a session cluster 
(start-cluster.sh; flink run job.jar). Any ideas? It feels like I am missing 
something obvious.

At MyFlinkJob.java:59 I do: streamExecutionEnvironment.execute("MyFlinkJob");
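
For reference, the init method is shaped roughly like this (a reconstructed 
sketch; only the class name, the init method and the execute() call on line 59 
are taken from the stack trace, the rest is assumed):

import javax.annotation.PostConstruct;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.stereotype.Component;

@Component
public class MyFlinkJob {

    @PostConstruct
    public void init() throws Exception {
        // Returns whatever environment Flink has installed for this invocation
        // (a StreamPlanEnvironment in the job-cluster case, per the trace).
        StreamExecutionEnvironment streamExecutionEnvironment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // ... build the streaming topology here ...

        streamExecutionEnvironment.execute("MyFlinkJob");   // MyFlinkJob.java:59
    }
}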

It feels strange that the execution ends up in 
org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute?

From 
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java:

/**
 * A special {@link StreamExecutionEnvironment} that is used in the web frontend
 * when generating a user-inspectable graph of a streaming job.
 */
@PublicEvolving
public class StreamPlanEnvironment extends StreamExecutionEnvironment {


I am using 
https://archive.apache.org/dist/flink/flink-1.7.1/flink-1.7.1-bin-scala_2.11.tgz
(I have also tried 1.6.3 and 1.7.0, no difference in behavior).

* docker --version -> Docker version 1.13.1
* uname -a -> Linux SEOSTL0069.SEC.INTRA 4.20.4-200.fc29.x86_64 #1 SMP Wed Jan 23 16:11:28 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux

Thank you,
Thomas Eckestad
