Hey all!

I'm looking for some advice on the following: I'm working on an abstraction
on top of Apache Flink to 'pipeline' Flink applications using Kafka. For
deployment this means that all these Flink jobs are embedded into one jar
and each job is started using a program argument (e.g. "--stage
'FirstFlinkJob'"). To ease deploying a set of interconnected Flink jobs onto
a cluster I wrote a Python script which basically communicates with the
REST API of the JobManager. So you can do things like "pipeline start
--jar 'JarWithThePipeline.jar'" and this would deploy every Flink
application separately.
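
To make the deployment step concrete, here is a minimal sketch (not the actual script) of how the per-stage run requests could be built. It assumes the jar has already been uploaded and that the JobManager exposes the `/jars/<jar-id>/run` endpoint with a `program-args` query parameter, as I understand the Flink REST API docs; `build_run_urls`, the base URL and the jar id are hypothetical names used for illustration only.

```python
from urllib.parse import quote, urlencode


def build_run_urls(base_url, jar_id, stages):
    """Build one run URL per stage for an already uploaded jar.

    Assumption: the JobManager accepts POST /jars/<jar-id>/run with the
    program arguments passed in the 'program-args' query parameter.
    """
    urls = []
    for stage in stages:
        # Percent-encode spaces and quotes so the argument survives the URL.
        query = urlencode({"program-args": "--stage '%s'" % stage}, quote_via=quote)
        urls.append("%s/jars/%s/run?%s" % (base_url.rstrip("/"), quote(jar_id, safe=""), query))
    return urls


# Hypothetical base URL and jar id, for illustration only.
for url in build_run_urls("http://localhost:8081", "abc_JarWithThePipeline.jar",
                          ["mongo_tweets", "elasticsearch_tweets"]):
    print(url)
```

Each URL would then be sent as a separate POST request, which is what makes every stage show up as its own Flink job.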

However, this script was written a while ago against Flink version "1.4.2".
This week I tried to upgrade it to the latest Flink version, but I noticed a
change in the REST responses. In order to get the "pipeline start" command
working, we need to know all the Flink jobs that are in the jar (we call
these Flink jobs 'stages') because the stage names are passed as program
arguments to the jar. For the 1.4.2 version we used a dirty trick: we ran
the jar with '--list --asException' as program arguments, which basically
runs the jar file and immediately throws an exception with the stage names.
These are then parsed and used to start every stage separately. The error
message that Flink threw looked something like this:

java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Could not run the jar.
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
    ... 9 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
    at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
    at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
    at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:87)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
    ... 8 more
Caused by: org.codefeedr.pipeline.PipelineListException: ["org.codefeedr.plugin.twitter.stages.TwitterStatusInput","mongo_tweets","elasticsearch_tweets"]
    at org.codefeedr.pipeline.Pipeline.showList(Pipeline.scala:114)
    at org.codefeedr.pipeline.Pipeline.start(Pipeline.scala:100)
    at nl.wouterr.Main$.main(Main.scala:23)
    at nl.wouterr.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
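
For reference, the parsing step amounts to something like the sketch below. It assumes the stack trace is available as plain, already unescaped text and that the exception message is a JSON array, as in the trace above; `parse_stage_names` is a hypothetical helper name, not a function from the actual script.

```python
import json
import re

# The exception name followed by the JSON array it carries as its message.
# \s* tolerates the line break that may separate the two.
_STAGE_LIST = re.compile(r"PipelineListException:\s*(\[.*?\])", re.DOTALL)


def parse_stage_names(error_text):
    """Return the stage names embedded in a PipelineListException message."""
    match = _STAGE_LIST.search(error_text)
    if match is None:
        raise ValueError("no PipelineListException found in the response")
    return json.loads(match.group(1))


# Excerpt of the trace shown above.
sample = (
    "Caused by: org.codefeedr.pipeline.PipelineListException:\n"
    '["org.codefeedr.plugin.twitter.stages.TwitterStatusInput",'
    '"mongo_tweets","elasticsearch_tweets"]\n'
    "at org.codefeedr.pipeline.Pipeline.showList(Pipeline.scala:114)"
)
print(parse_stage_names(sample))
```

The non-greedy match stops at the first closing bracket, so this only works as long as stage names do not contain a "]" themselves.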

However, for 1.7.0 this trick doesn't work anymore because instead of
returning the full stack trace, it only returns the following:
org.apache.flink.client.program.ProgramInvocationException: The program
caused an error:

In the console of the JobManager it does give the full stack trace though.
So first of all I'm wondering if there might be a way to enable more
detailed stack traces for Flink 1.7 in the REST responses. If not, do you
have any suggestions on how to tackle this problem? I know that in the end
this isn't really a Flink problem; however, you might know a workaround in
the Flink REST API to achieve the same.

Some solutions I already considered:
- Running the jar with "--list --asException" locally through the
Python script; however, Flink and Scala are not included in the jar.
Technically I could add them both to the classpath, but this would require
users to have the Flink jar locally (and also Scala somewhere, but I assume
most have).
- Letting users provide a list of stage names for all their (interconnected)
Flink jobs. This is not really an option, because the (main) idea behind
this framework is to reduce the boilerplate and hassle of setting up
complex stream processing architectures.
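
To illustrate the first option, a rough sketch of the command the Python script would have to assemble is shown below. All jar paths are hypothetical placeholders that the user would need to supply, `build_list_command` is an illustrative name, and the main class is simply the one that appears in the stack trace above.

```python
import os


def build_list_command(pipeline_jar, flink_dist_jar, scala_library_jar, main_class):
    """Build a java command that lists the stages outside of a Flink cluster.

    Assumption: Flink and Scala are missing from the pipeline jar, so both
    have to be put on the classpath next to it.
    """
    # os.pathsep is ':' on Linux/macOS and ';' on Windows.
    classpath = os.pathsep.join([pipeline_jar, flink_dist_jar, scala_library_jar])
    return ["java", "-cp", classpath, main_class, "--list", "--asException"]


# Hypothetical local paths, for illustration only.
command = build_list_command(
    "JarWithThePipeline.jar",
    "/path/to/flink-dist.jar",
    "/path/to/scala-library.jar",
    "nl.wouterr.Main",
)
print(" ".join(command))
```

The resulting list could be passed to `subprocess.run` with captured output, after which the exception text would be parsed the same way as the old REST response.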

Any help is appreciated. Thanks in advance!

Kind regards,
Wouter Zorgdrager
