Hi,
I'm trying to stop derailing the discussion in FLINK-14807 [1]: I
started a side discussion there that I didn't want to take that far in
the issue. Let's continue it here if we need to.
So far, the timeline is this:
Stephan:
Have we ever considered putting the job name onto the Environment,
rather than in "execute()" or "collect()"?
Aljoscha:
I think tying the job name to the environment would be problematic. I
have some thoughts on this but they span multiple things and are a bit
more general than this issue. I'll post them here nevertheless:
One environment can spawn multiple jobs if you call execute() multiple
times. For batch jobs this is sometimes not a problem, but it becomes a
problem when the component (or user) that runs the Flink program expects
there to be only one job. For example, if you use bin/flink run
--fromSavepoint, which execute() should "pick up" the savepoint?
Currently it is the first execute() call that happens, which might or
might not work, depending on whether the savepoint is the right one for
that job. Subsequent execute() calls will also try to restore from that
savepoint, which, again, might or might not succeed.
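To make that concrete, here is a minimal illustration (job names and
logic are made up) of a single main() that spawns two jobs from one
environment; when it is submitted with bin/flink run --fromSavepoint,
it is unclear which of the two execute() calls the savepoint should
apply to:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TwoJobsExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements(1, 2, 3).map(x -> x + 1).print();
        env.execute("first job");   // currently this call picks up the savepoint

        env.fromElements(4, 5, 6).print();
        env.execute("second job");  // this call will also try to restore from it
    }
}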
Another scenario where this will be problematic is "driver mode", or a
mode where we run the main() method on the "JobManager", for example in
the per-job standalone entrypoint or other potential future modes where
the main() method is run in the cluster.
In general, I now think that the "execute-style" of writing jobs does
not work well for streaming programs and we might have to re-introduce
an interface like
interface FlinkJob {
    Pipeline getPipeline();
}
for streaming scenarios.
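To sketch what a user program against such an interface could look like
(assuming Pipeline here is org.apache.flink.api.dag.Pipeline and that
getStreamGraph() is used to hand back the assembled dataflow; the
framework, not the user program, would then decide when and where to
execute it):

import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UppercaseJob implements FlinkJob {
    @Override
    public Pipeline getPipeline() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c")
            .map(String::toUpperCase)
            .print();
        // No execute() here: return the assembled pipeline and let the
        // framework (CLI, per-job entrypoint, ...) run it.
        return env.getStreamGraph();
    }
}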
Kostas:
I have to think about the whole issue more, but an interface like the
one Aljoscha Krettek described (like the old and now deleted Program)
could definitely make a lot of things easier. Currently, the only
"entrypoint" of the framework into the user code is the execute()
method, and this often limits our alternatives for implementing things.
Stephan:
I would like to understand this a bit more. Having two ways of doing
things is always tricky - more complexity for maintainers, harder to
understand for users, etc.
Regarding savepoint resuming - fair enough, I see that this is something
with subtle semantics. Since that is a parameter on the executor
(the environment), would it be fair to say that the first execution
always uses that savepoint? If you want the driver to run independent
jobs resuming from the same savepoint, you need different environments.
That sounds like quite well-defined behavior.
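To illustrate those semantics, here is a sketch that uses a hypothetical
setSavepointPath() setter on the environment (this is not an existing
Flink API, it only models "the savepoint is a parameter of the
environment"):

// Hypothetical: setSavepointPath() does not exist in Flink, it only
// illustrates tying the savepoint to the environment.
StreamExecutionEnvironment envA = StreamExecutionEnvironment.getExecutionEnvironment();
envA.setSavepointPath("s3://savepoints/sp-1");

envA.fromElements(1, 2, 3).print();
envA.execute("job-a");   // first execute() on envA: resumes from sp-1

envA.fromElements(4, 5, 6).print();
envA.execute("job-b");   // later execute() on envA: starts fresh

// An independent job that should also resume from sp-1 gets its own environment.
StreamExecutionEnvironment envB = StreamExecutionEnvironment.getExecutionEnvironment();
envB.setSavepointPath("s3://savepoints/sp-1");
envB.fromElements(7, 8, 9).print();
envB.execute("job-c");   // first execute() on envB: resumes from sp-1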
For the "driver mode" or the "run main() in cluster", I don't fully
understand the issues. This should work the same way that the often
discussed "library mode" works. One thing that has caused frequent
confusion about these issues here is the assumption that somehow the
"execute()" method needs to produce a job graph that can be passed to
another component that was previously started. I think that is not
necessary; the execute() method would simply create a JobMaster
internally (against a ResourceManager that is created in the
environment) and block while this JobMaster is executing.
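For illustration, a rough sketch of that execution model, with
placeholder types (EmbeddedJobMaster and ResourceManagerHandle are not
real Flink classes) just to show execute() hosting the job itself
instead of handing a JobGraph to a previously started component:

// Placeholder types, only to sketch "create the JobMaster inside execute()".
public JobExecutionResult execute(String jobName) throws Exception {
    JobGraph jobGraph = getStreamGraph().getJobGraph();  // job name handling elided

    // The environment carries (or creates) the ResourceManager to run against.
    ResourceManagerHandle resourceManager = createOrConnectToResourceManager();

    // Create the JobMaster right here instead of submitting the JobGraph to a
    // pre-started dispatcher, then block until the job reaches a terminal state.
    EmbeddedJobMaster jobMaster = new EmbeddedJobMaster(jobGraph, resourceManager);
    return jobMaster.runToCompletion();
}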
Best,
Aljoscha
[1] https://issues.apache.org/jira/browse/FLINK-14807