Hi,
in our more-or-less development environment we're doing something like
the following in our main method:
import java.util.concurrent.TimeUnit

import org.apache.flink.configuration.GlobalConfiguration
import org.apache.flink.runtime.client.JobClient
import org.apache.flink.runtime.messages.JobManagerMessages
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus
import org.apache.flink.runtime.util.LeaderRetrievalUtils

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, Future}

// the name of the job we want to cancel
val processName = "name_of_our_stream"
val configuration = GlobalConfiguration.getConfiguration
// start a local actor system so we can talk to the JobManager
val system = JobClient.startJobClientActorSystem(configuration)
val timeout = FiniteDuration(10, TimeUnit.SECONDS)
// find the leading JobManager and get a gateway to it
val gateway =
  LeaderRetrievalUtils.retrieveLeaderGateway(
    LeaderRetrievalUtils.createLeaderRetrievalService(configuration),
    system, timeout)
implicit val executor = system.dispatcher
// ask for all running jobs, find ours by name and cancel it
val cancelResult =
  gateway.ask(JobManagerMessages.getRequestRunningJobsStatus, timeout)
    .mapTo[RunningJobsStatus]
    .flatMap {
      case RunningJobsStatus(runningJobs) =>
        runningJobs.toList.find(_.getJobName == processName).map { job =>
          gateway.ask(JobManagerMessages.CancelJob(job.getJobId),
            FiniteDuration(1, TimeUnit.MINUTES))
        }.getOrElse(Future.successful(()))
    }
Await.result(cancelResult, FiniteDuration(1, TimeUnit.MINUTES))
system.shutdown()
- this basically searches the running jobs by name and cancels the
matching one. Doing something similar you can also trigger a savepoint,
but unfortunately I don't see an easy way of telling the
ExecutionEnvironment that you want to use it. It can probably be done
with some clever hack :)
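For the savepoint part, a rough sketch could look like this (untested;
it assumes the same gateway, timeout and executor as above, and a jobId
found the same way as in the cancel case, using the
TriggerSavepoint/TriggerSavepointSuccess/TriggerSavepointFailure
messages from JobManagerMessages):

import org.apache.flink.runtime.messages.JobManagerMessages.{
  TriggerSavepoint, TriggerSavepointSuccess, TriggerSavepointFailure}

val savepointResult =
  gateway.ask(TriggerSavepoint(jobId), FiniteDuration(1, TimeUnit.MINUTES))
    .map {
      // the JobManager answers with the path the savepoint was written to
      case TriggerSavepointSuccess(_, path) => path
      case TriggerSavepointFailure(_, cause) =>
        throw new RuntimeException("Savepoint failed", cause)
    }
val savepointPath =
  Await.result(savepointResult, FiniteDuration(1, TimeUnit.MINUTES))

You can then resubmit from that path with bin/flink run -s
<savepointPath>; it's the programmatic equivalent of that flag that I'm
missing.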
br,
maciek
On 04/05/2016 19:52, Hanson, Bruce wrote:
Hi all,
I’m working on using Flink to do a variety of streaming jobs that will
be processing very high-volume streams. I want to be able to update a
job’s software with an absolute minimum impact on the processing of
the data. What I don’t understand is the best way to update the software
running the job. From what I gather, the way it works today is that I
would have to shut down the first job, ensuring that it properly
checkpoints, and then start up a new job. My concern is that this may
take a relatively long time and cause problems with SLAs I may have
with my users.
Does Flink provide more nuanced ways of upgrading a job’s software?
Are there folks out there that are working with this sort of problem,
either within Flink or around it?
Thank you for any help, thoughts, etc. you may have.
-Bruce