[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15424783#comment-15424783
 ] 

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2315#discussion_r75151735
  
    --- Diff: 
flink-mesos/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
 ---
    @@ -0,0 +1,174 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.clusterframework
    +
    +import java.util.concurrent.{TimeUnit, ExecutorService}
    +
    +import akka.actor.ActorRef
    +
    +import org.apache.flink.api.common.JobID
    +import org.apache.flink.configuration.{Configuration => 
FlinkConfiguration, ConfigConstants}
    +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
    +import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
    +import org.apache.flink.runtime.clusterframework.ApplicationStatus
    +import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
    +import org.apache.flink.runtime.clusterframework.messages._
    +import org.apache.flink.runtime.jobgraph.JobStatus
    +import org.apache.flink.runtime.jobmanager.{SubmittedJobGraphStore, 
JobManager}
    +import org.apache.flink.runtime.leaderelection.LeaderElectionService
    +import 
org.apache.flink.runtime.messages.JobManagerMessages.{RequestJobStatus, 
CurrentJobStatus, JobNotFound}
    +import org.apache.flink.runtime.messages.Messages.Acknowledge
    +import org.apache.flink.runtime.metrics.{MetricRegistry => 
FlinkMetricRegistry}
    +import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
    +import org.apache.flink.runtime.instance.InstanceManager
    +import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
    +
    +import scala.concurrent.duration._
    +import scala.language.postfixOps
    +
    +
    +/** JobManager actor for execution on Yarn or Mesos. It enriches the 
[[JobManager]] with additional messages
    +  * to start/administer/stop the session.
    +  *
    +  * @param flinkConfiguration Configuration object for the actor
    +  * @param executorService Execution context which is used to execute 
concurrent tasks in the
    +  *                         
[[org.apache.flink.runtime.executiongraph.ExecutionGraph]]
    +  * @param instanceManager Instance manager to manage the registered
    +  *                        
[[org.apache.flink.runtime.taskmanager.TaskManager]]
    +  * @param scheduler Scheduler to schedule Flink jobs
    +  * @param libraryCacheManager Manager to manage uploaded jar files
    +  * @param archive Archive for finished Flink jobs
    +  * @param restartStrategyFactory Restart strategy to be used in case of a 
job recovery
    +  * @param timeout Timeout for futures
    +  * @param leaderElectionService LeaderElectionService to participate in 
the leader election
    +  */
    +abstract class ContaineredJobManager(
    +                      flinkConfiguration: FlinkConfiguration,
    +                      executorService: ExecutorService,
    +                      instanceManager: InstanceManager,
    +                      scheduler: FlinkScheduler,
    +                      libraryCacheManager: BlobLibraryCacheManager,
    +                      archive: ActorRef,
    +                      restartStrategyFactory: RestartStrategyFactory,
    +                      timeout: FiniteDuration,
    +                      leaderElectionService: LeaderElectionService,
    +                      submittedJobGraphs : SubmittedJobGraphStore,
    +                      checkpointRecoveryFactory : 
CheckpointRecoveryFactory,
    +                      savepointStore: SavepointStore,
    +                      jobRecoveryTimeout: FiniteDuration,
    +                      metricsRegistry: Option[FlinkMetricRegistry])
    +  extends JobManager(
    +    flinkConfiguration,
    +    executorService,
    +    instanceManager,
    +    scheduler,
    +    libraryCacheManager,
    +    archive,
    +    restartStrategyFactory,
    +    timeout,
    +    leaderElectionService,
    +    submittedJobGraphs,
    +    checkpointRecoveryFactory,
    +    savepointStore,
    +    jobRecoveryTimeout,
    +    metricsRegistry) {
    +
    +  val jobPollingInterval: FiniteDuration
    +
    +  // indicates if this JM has been started in a dedicated (per-job) mode.
    +  var stopWhenJobFinished: JobID = null
    +
    +  override def handleMessage: Receive = {
    +    handleContainerMessage orElse super.handleMessage
    +  }
    +
    +  def handleContainerMessage: Receive = {
    +
    +    case msg @ (_: RegisterInfoMessageListener | _: 
UnRegisterInfoMessageListener) =>
    +      // forward to ResourceManager
    +      currentResourceManager match {
    +        case Some(rm) =>
    +          // we forward the message
    +          rm.forward(decorateMessage(msg))
    +        case None =>
    +        // client has to try again
    +      }
    +
    +    case msg: ShutdownClusterAfterJob =>
    +      val jobId = msg.jobId()
    +      log.info(s"ApplicationMaster will shut down session when job $jobId 
has finished.")
    +      stopWhenJobFinished = jobId
    +      // trigger regular job status messages (if this is a 
dedicated/per-job cluster)
    +      if (stopWhenJobFinished != null) {
    +        context.system.scheduler.schedule(0 seconds,
    --- End diff --
    
    Can't we listen on the `RemoveJob` message or the `JobStatusChanged` 
message to get notified when a job has terminated. Then we don't have to poll 
the status from oneself.


> Integrate Flink with Apache Mesos
> ---------------------------------
>
>                 Key: FLINK-1984
>                 URL: https://issues.apache.org/jira/browse/FLINK-1984
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management
>            Reporter: Robert Metzger
>            Assignee: Eron Wright 
>            Priority: Minor
>         Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: 
> https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent 
> ResourceManager work.
> Design document:  ([google 
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to