[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424783#comment-15424783 ]
ASF GitHub Bot commented on FLINK-1984: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r75151735 --- Diff: flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala --- @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.clusterframework + +import java.util.concurrent.{TimeUnit, ExecutorService} + +import akka.actor.ActorRef + +import org.apache.flink.api.common.JobID +import org.apache.flink.configuration.{Configuration => FlinkConfiguration, ConfigConstants} +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory +import org.apache.flink.runtime.clusterframework.ApplicationStatus +import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory +import org.apache.flink.runtime.clusterframework.messages._ +import org.apache.flink.runtime.jobgraph.JobStatus +import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, JobManager} +import org.apache.flink.runtime.leaderelection.LeaderElectionService +import org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, CurrentJobStatus, JobNotFound} +import org.apache.flink.runtime.messages.Messages.Acknowledge +import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager +import org.apache.flink.runtime.instance.InstanceManager +import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler} + +import scala.concurrent.duration._ +import scala.language.postfixOps + + +/** JobManager actor for execution on Yarn or Mesos. It enriches the [[JobManager]] with additional messages + * to start/administer/stop the session. 
+ * + * @param flinkConfiguration Configuration object for the actor + * @param executorService Execution context which is used to execute concurrent tasks in the + * [[org.apache.flink.runtime.executiongraph.ExecutionGraph]] + * @param instanceManager Instance manager to manage the registered + * [[org.apache.flink.runtime.taskmanager.TaskManager]] + * @param scheduler Scheduler to schedule Flink jobs + * @param libraryCacheManager Manager to manage uploaded jar files + * @param archive Archive for finished Flink jobs + * @param restartStrategyFactory Restart strategy to be used in case of a job recovery + * @param timeout Timeout for futures + * @param leaderElectionService LeaderElectionService to participate in the leader election + */ +abstract class ContaineredJobManager( + flinkConfiguration: FlinkConfiguration, + executorService: ExecutorService, + instanceManager: InstanceManager, + scheduler: FlinkScheduler, + libraryCacheManager: BlobLibraryCacheManager, + archive: ActorRef, + restartStrategyFactory: RestartStrategyFactory, + timeout: FiniteDuration, + leaderElectionService: LeaderElectionService, + submittedJobGraphs : SubmittedJobGraphStore, + checkpointRecoveryFactory : CheckpointRecoveryFactory, + savepointStore: SavepointStore, + jobRecoveryTimeout: FiniteDuration, + metricsRegistry: Option[FlinkMetricRegistry]) + extends JobManager( + flinkConfiguration, + executorService, + instanceManager, + scheduler, + libraryCacheManager, + archive, + restartStrategyFactory, + timeout, + leaderElectionService, + submittedJobGraphs, + checkpointRecoveryFactory, + savepointStore, + jobRecoveryTimeout, + metricsRegistry) { + + val jobPollingInterval: FiniteDuration + + // indicates if this JM has been started in a dedicated (per-job) mode. 
+ var stopWhenJobFinished: JobID = null + + override def handleMessage: Receive = { + handleContainerMessage orElse super.handleMessage + } + + def handleContainerMessage: Receive = { + + case msg @ (_: RegisterInfoMessageListener | _: UnRegisterInfoMessageListener) => + // forward to ResourceManager + currentResourceManager match { + case Some(rm) => + // we forward the message + rm.forward(decorateMessage(msg)) + case None => + // client has to try again + } + + case msg: ShutdownClusterAfterJob => + val jobId = msg.jobId() + log.info(s"ApplicationMaster will shut down session when job $jobId has finished.") + stopWhenJobFinished = jobId + // trigger regular job status messages (if this is a dedicated/per-job cluster) + if (stopWhenJobFinished != null) { + context.system.scheduler.schedule(0 seconds, --- End diff -- Can't we listen for the `RemoveJob` message or the `JobStatusChanged` message to get notified when a job has terminated? Then the JobManager doesn't have to poll the job status from itself. > Integrate Flink with Apache Mesos > --------------------------------- > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management > Reporter: Robert Metzger > Assignee: Eron Wright > Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. > -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: ([google > doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing]) -- This message was sent by Atlassian JIRA (v6.3.4#6332)