[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15423008#comment-15423008 ]
ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/2315#discussion_r74972631

--- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosFlinkResourceManager.java ---
@@ -0,0 +1,755 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.mesos.runtime.clusterframework;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import com.netflix.fenzo.TaskRequest;
+import com.netflix.fenzo.TaskScheduler;
+import com.netflix.fenzo.VirtualMachineLease;
+import com.netflix.fenzo.functions.Action1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
+import org.apache.flink.mesos.scheduler.ConnectionMonitor;
+import org.apache.flink.mesos.scheduler.LaunchableTask;
+import org.apache.flink.mesos.scheduler.LaunchCoordinator;
+import org.apache.flink.mesos.scheduler.ReconciliationCoordinator;
+import org.apache.flink.mesos.scheduler.SchedulerProxy;
+import org.apache.flink.mesos.scheduler.TaskMonitor;
+import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
+import org.apache.flink.mesos.scheduler.Tasks;
+import org.apache.flink.mesos.scheduler.messages.AcceptOffers;
+import org.apache.flink.mesos.scheduler.messages.Disconnected;
+import org.apache.flink.mesos.scheduler.messages.Error;
+import org.apache.flink.mesos.scheduler.messages.OfferRescinded;
+import org.apache.flink.mesos.scheduler.messages.ReRegistered;
+import org.apache.flink.mesos.scheduler.messages.Registered;
+import org.apache.flink.mesos.scheduler.messages.ResourceOffers;
+import org.apache.flink.mesos.scheduler.messages.StatusUpdate;
+import org.apache.flink.mesos.util.MesosConfiguration;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.mesos.Protos;
+import org.apache.mesos.Protos.FrameworkInfo;
+import org.apache.mesos.SchedulerDriver;
+import org.slf4j.Logger;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Flink Resource Manager for Apache Mesos.
+ */
+public class MesosFlinkResourceManager extends FlinkResourceManager<RegisteredMesosWorkerNode> {
+
+    /** The Mesos configuration (master and framework info) */
+    private final MesosConfiguration mesosConfig;
+
+    /** The TaskManager container parameters (like container memory size) */
+    private final MesosTaskManagerParameters taskManagerParameters;
+
+    /** Context information used to start a TaskManager Java process */
+    private final Protos.TaskInfo.Builder taskManagerLaunchContext;
+
+    /** Number of failed Mesos tasks before stopping the application. -1 means infinite. */
+    private final int maxFailedTasks;
+
+    /** Callback handler for the asynchronous Mesos scheduler */
+    private SchedulerProxy schedulerCallbackHandler;
+
+    /** Mesos scheduler driver */
+    private SchedulerDriver schedulerDriver;
+
+    private ActorRef connectionMonitor;
+
+    private ActorRef taskRouter;
+
+    private ActorRef launchCoordinator;
+
+    private ActorRef reconciliationCoordinator;
+
+    private MesosWorkerStore workerStore;
+
+    final Map<ResourceID, MesosWorkerStore.Worker> workersInNew;
+    final Map<ResourceID, MesosWorkerStore.Worker> workersInLaunch;
+    final Map<ResourceID, MesosWorkerStore.Worker> workersBeingReturned;
+
+    /** The number of failed tasks since the master became active */
+    private int failedTasksSoFar;
+
+    public MesosFlinkResourceManager(
+        Configuration flinkConfig,
+        MesosConfiguration mesosConfig,
+        MesosWorkerStore workerStore,
+        LeaderRetrievalService leaderRetrievalService,
+        MesosTaskManagerParameters taskManagerParameters,
+        Protos.TaskInfo.Builder taskManagerLaunchContext,
+        int maxFailedTasks,
+        int numInitialTaskManagers) {
+
+        super(numInitialTaskManagers, flinkConfig, leaderRetrievalService);
+
+        this.mesosConfig = requireNonNull(mesosConfig);
+
+        this.workerStore = requireNonNull(workerStore);
+
+        this.taskManagerParameters = requireNonNull(taskManagerParameters);
+        this.taskManagerLaunchContext = requireNonNull(taskManagerLaunchContext);
+        this.maxFailedTasks = maxFailedTasks;
+
+        this.workersInNew = new HashMap<>();
+        this.workersInLaunch = new HashMap<>();
+        this.workersBeingReturned = new HashMap<>();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Mesos-specific behavior
+    // ------------------------------------------------------------------------
+
+    @Override
+    protected void initialize() throws Exception {
+        LOG.info("Initializing Mesos resource master");
+
+        workerStore.start();
+
+        // create the scheduler driver to communicate with Mesos
+        schedulerCallbackHandler = new SchedulerProxy(self());
+
+        // register with Mesos
+        FrameworkInfo.Builder frameworkInfo = mesosConfig.frameworkInfo()
+            .clone()
+            .setCheckpoint(true);
+
+        Option<Protos.FrameworkID> frameworkID = workerStore.getFrameworkID();
+        if(frameworkID.isEmpty()) {
+            LOG.info("Registering as new framework.");
+        }
+        else {
+            LOG.info("Recovery scenario: re-registering using framework ID {}.", frameworkID.get().getValue());
+            frameworkInfo.setId(frameworkID.get());
+        }
+
+        MesosConfiguration initializedMesosConfig = mesosConfig.withFrameworkInfo(frameworkInfo);
+        MesosConfiguration.logMesosConfig(LOG, initializedMesosConfig);
+        schedulerDriver = initializedMesosConfig.createDriver(schedulerCallbackHandler, false);
+
+        // create supporting actors
+        connectionMonitor = createConnectionMonitor();
+        launchCoordinator = createLaunchCoordinator();
+        reconciliationCoordinator = createReconciliationCoordinator();
+        taskRouter = createTaskRouter();
+
+        recoverWorkers();
+
+        connectionMonitor.tell(new ConnectionMonitor.Start(), self());
+        schedulerDriver.start();
+    }
+
+    protected ActorRef createConnectionMonitor() {
+        return context().actorOf(
+            ConnectionMonitor.createActorProps(ConnectionMonitor.class, config),
+            "connectionMonitor");
+    }
+
+    protected ActorRef createTaskRouter() {
+        return context().actorOf(
+            Tasks.createActorProps(Tasks.class, config, schedulerDriver, TaskMonitor.class),
+            "tasks");
+    }
+
+    protected ActorRef createLaunchCoordinator() {
+        return context().actorOf(
+            LaunchCoordinator.createActorProps(LaunchCoordinator.class, self(), config, schedulerDriver, createOptimizer()),
+            "launchCoordinator");
+    }
+
+    protected ActorRef createReconciliationCoordinator() {
+        return context().actorOf(
+            ReconciliationCoordinator.createActorProps(ReconciliationCoordinator.class, config, schedulerDriver),
+            "reconciliationCoordinator");
+    }
+
+    @Override
+    public void postStop() {
+        LOG.info("Stopping Mesos resource master");
+        super.postStop();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Actor messages
+    // ------------------------------------------------------------------------
+
+    @Override
+    protected void handleMessage(Object message) {
+
+        // check for Mesos-specific actor messages first
+
+        // --- messages about Mesos connection
+        if (message instanceof Registered) {
+            registered((Registered) message);
+        } else if (message instanceof ReRegistered) {
+            reregistered((ReRegistered) message);
+        } else if (message instanceof Disconnected) {
+            disconnected((Disconnected) message);
+        } else if (message instanceof Error) {
+            error(((Error) message).message());
+
+        // --- messages about offers
+        } else if (message instanceof ResourceOffers || message instanceof OfferRescinded) {
+            launchCoordinator.tell(message, self());
+        } else if (message instanceof AcceptOffers) {
+            acceptOffers((AcceptOffers) message);
+
+        // --- messages about tasks
+        } else if (message instanceof StatusUpdate) {
+            taskStatusUpdated((StatusUpdate) message);
+        } else if (message instanceof ReconciliationCoordinator.Reconcile) {
+            // a reconciliation request from a task
+            reconciliationCoordinator.tell(message, self());
+        } else if (message instanceof TaskMonitor.TaskTerminated) {
+            // a termination message from a task
+            TaskMonitor.TaskTerminated msg = (TaskMonitor.TaskTerminated) message;
+            taskTerminated(msg.taskID(), msg.status());
+
+        } else {
+            // message handled by the generic resource master code
+            super.handleMessage(message);
+        }
+    }
+
+    /**
+     * Called to shut down the cluster (not a failover situation).
+     *
+     * @param finalStatus The application status to report.
+     * @param optionalDiagnostics An optional diagnostics message.
+     */
+    @Override
+    protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+
+        LOG.info("Shutting down and unregistering as a Mesos framework.");
+        try {
+            // unregister the framework, which implicitly removes all tasks.
+            schedulerDriver.stop(false);
+        }
+        catch(Exception ex) {
+            LOG.warn("unable to unregister the framework", ex);
+        }
+
+        try {
+            workerStore.cleanup();
+        }
+        catch(Exception ex) {
+            LOG.warn("unable to cleanup the ZooKeeper state", ex);
+        }
+
+        context().stop(self());
+    }
+
+    @Override
+    protected void fatalError(String message, Throwable error) {
+        // we do not unregister, but cause a hard fail of this process, to have it
+        // restarted by the dispatcher
+        LOG.error("FATAL ERROR IN MESOS APPLICATION MASTER: " + message, error);
+        LOG.error("Shutting down process");
+
+        // kill this process, this will make an external supervisor (the dispatcher) restart the process
+        System.exit(EXIT_CODE_FATAL_ERROR);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Worker Management
+    // ------------------------------------------------------------------------
+
+    /**
+     * Recover framework/worker information persisted by a prior incarnation of the RM.
+     */
+    private void recoverWorkers() throws Exception {
+        // if this application master starts as part of an ApplicationMaster/JobManager recovery,
+        // then some worker tasks are most likely still alive and we can re-obtain them
+        final List<MesosWorkerStore.Worker> tasksFromPreviousAttempts = workerStore.recoverWorkers();
+
+        if (!tasksFromPreviousAttempts.isEmpty()) {
+            LOG.info("Retrieved {} TaskManagers from previous attempt", tasksFromPreviousAttempts.size());
+
+            List<Tuple2<TaskRequest,String>> toAssign = new ArrayList<>(tasksFromPreviousAttempts.size());
+            List<LaunchableTask> toLaunch = new ArrayList<>(tasksFromPreviousAttempts.size());
+
+            for (final MesosWorkerStore.Worker worker : tasksFromPreviousAttempts) {
+                LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
+
+                switch(worker.state()) {
+                    case New:
+                        workersInNew.put(extractResourceID(worker.taskID()), worker);
+                        toLaunch.add(launchable);
+                        break;
+                    case Launched:
+                        workersInLaunch.put(extractResourceID(worker.taskID()), worker);
+                        toAssign.add(new Tuple2<>(launchable.taskRequest(), worker.hostname().get()));
+                        break;
+                    case Released:
+                        workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
+                        break;
+                }
+                taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
+            }
+
+            // tell the launch coordinator about prior assignments
+            if(toAssign.size() >= 1) {
+                launchCoordinator.tell(new LaunchCoordinator.Assign(toAssign), self());
+            }
+            // tell the launch coordinator to launch any new tasks
+            if(toLaunch.size() >= 1) {
+                launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
+            }
+        }
+    }
+
+    /**
+     * Plan for some additional workers to be launched.
+     *
+     * @param numWorkers The number of workers to allocate.
+     */
+    @Override
+    protected void requestNewWorkers(int numWorkers) {
+
+        try {
+            List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(numWorkers);
+            List<LaunchableTask> toLaunch = new ArrayList<>(numWorkers);
+
+            // generate new workers into persistent state and launch associated actors
+            for (int i = 0; i < numWorkers; i++) {
+                MesosWorkerStore.Worker worker = MesosWorkerStore.Worker.newTask(workerStore.newTaskID());
+                workerStore.putWorker(worker);
+                workersInNew.put(extractResourceID(worker.taskID()), worker);
+
+                LaunchableMesosWorker launchable = createLaunchableMesosWorker(worker.taskID());
+
+                LOG.info("Scheduling Mesos task {} with ({} MB, {} cpus).",
+                    launchable.taskID().getValue(), launchable.taskRequest().getMemory(), launchable.taskRequest().getCPUs());
+
+                toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
+                toLaunch.add(launchable);
+            }
+
+            // tell the task router about the new plans
+            for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
+                taskRouter.tell(update, self());
+            }
+
+            // tell the launch coordinator to launch the new tasks
+            if(toLaunch.size() >= 1) {
+                launchCoordinator.tell(new LaunchCoordinator.Launch(toLaunch), self());
+            }
+        }
+        catch(Exception ex) {
+            fatalError("unable to request new workers", ex);
+        }
+    }
+
+    /**
+     * Accept offers as advised by the launch coordinator.
+     *
+     * Acceptance is routed through the RM to update the persistent state before
+     * forwarding the message to Mesos.
+     */
+    private void acceptOffers(AcceptOffers msg) {
+
+        try {
+            List<TaskMonitor.TaskGoalStateUpdated> toMonitor = new ArrayList<>(msg.operations().size());
+
+            // transition the persistent state of some tasks to Launched
+            for (Protos.Offer.Operation op : msg.operations()) {
+                if (op.getType() != Protos.Offer.Operation.Type.LAUNCH) {
+                    continue;
+                }
+                for (Protos.TaskInfo info : op.getLaunch().getTaskInfosList()) {
+                    MesosWorkerStore.Worker worker = workersInNew.remove(extractResourceID(info.getTaskId()));
+                    assert (worker != null);
+
+                    worker = worker.launchTask(info.getSlaveId(), msg.hostname());
+                    workerStore.putWorker(worker);
+                    workersInLaunch.put(extractResourceID(worker.taskID()), worker);
+
+                    LOG.info("Launching Mesos task {} on host {}.",
+                        worker.taskID().getValue(), worker.hostname().get());
+
+                    toMonitor.add(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)));
+                }
+            }
+
+            // tell the task router about the new plans
+            for (TaskMonitor.TaskGoalStateUpdated update : toMonitor) {
+                taskRouter.tell(update, self());
+            }
+
+            // send the acceptance message to Mesos
+            schedulerDriver.acceptOffers(msg.offerIds(), msg.operations(), msg.filters());
+        }
+        catch(Exception ex) {
+            fatalError("unable to accept offers", ex);
+        }
+    }
+
+    /**
+     * Handle a task status change.
+     */
+    private void taskStatusUpdated(StatusUpdate message) {
+        taskRouter.tell(message, self());
+        reconciliationCoordinator.tell(message, self());
+        schedulerDriver.acknowledgeStatusUpdate(message.status());
+    }
+
+    /**
+     * Accept the given started worker into the internal state.
+     *
+     * @param resourceID The worker resource id
+     * @return A registered worker node record.
+     */
+    @Override
+    protected RegisteredMesosWorkerNode workerStarted(ResourceID resourceID) {
+        MesosWorkerStore.Worker inLaunch = workersInLaunch.remove(resourceID);
+        if (inLaunch == null) {
+            // Worker was not in state "being launched", this can indicate that the TaskManager
+            // in this worker was already registered or that the container was not started
+            // by this resource manager. Simply ignore this resourceID.
+            return null;
+        }
+        return new RegisteredMesosWorkerNode(inLaunch);
+    }
+
+    /**
+     * Accept the given registered workers into the internal state.
+     *
+     * @param toConsolidate The worker IDs known previously to the JobManager.
+     * @return A collection of registered worker node records.
+     */
+    @Override
+    protected Collection<RegisteredMesosWorkerNode> reacceptRegisteredWorkers(Collection<ResourceID> toConsolidate) {
+
+        // we check for each task manager if we recognize its Mesos task ID
+        List<RegisteredMesosWorkerNode> accepted = new ArrayList<>(toConsolidate.size());
+        for (ResourceID resourceID : toConsolidate) {
+            MesosWorkerStore.Worker worker = workersInLaunch.remove(resourceID);
+            if (worker != null) {
+                LOG.info("Mesos worker consolidation recognizes TaskManager {}.", resourceID);
+                accepted.add(new RegisteredMesosWorkerNode(worker));
+            }
+            else {
+                if(isStarted(resourceID)) {
+                    LOG.info("TaskManager {} has already been registered at the resource manager.", resourceID);
+                }
+                else {
+                    LOG.info("Mesos worker consolidation does not recognize TaskManager {}.", resourceID);
+                }
+            }
+        }
+        return accepted;
+    }
+
+    /**
+     * Release the given pending worker.
+     */
+    @Override
+    protected void releasePendingWorker(ResourceID id) {
+        MesosWorkerStore.Worker worker = workersInLaunch.remove(id);
+        if (worker != null) {
+            releaseWorker(worker);
+        } else {
+            LOG.error("Cannot find worker {} to release. Ignoring request.", id);
+        }
+    }
+
+    /**
+     * Release the given started worker.
+     */
+    @Override
+    protected void releaseStartedWorker(RegisteredMesosWorkerNode worker) {
+        releaseWorker(worker.task());
+    }
+
+    /**
+     * Plan for the removal of the given worker.
+     */
+    private void releaseWorker(MesosWorkerStore.Worker worker) {
+        try {
+            LOG.info("Releasing worker {}", worker.taskID());
+
+            // update persistent state of worker to Released
+            worker = worker.releaseTask();
+            workerStore.putWorker(worker);
+            workersBeingReturned.put(extractResourceID(worker.taskID()), worker);
+            taskRouter.tell(new TaskMonitor.TaskGoalStateUpdated(extractGoalState(worker)), self());
+
+            if (worker.hostname().isDefined()) {
+                // tell the launch coordinator that the task is being unassigned from the host, for planning purposes
+                launchCoordinator.tell(new LaunchCoordinator.Unassign(worker.taskID(), worker.hostname().get()), self());
+            }
+        }
+        catch (Exception ex) {
+            fatalError("unable to release worker", ex);
+        }
+    }
+
+    @Override
+    protected int getNumWorkerRequestsPending() {
+        return workersInNew.size();
+    }
+
+    @Override
+    protected int getNumWorkersPendingRegistration() {
+        return workersInLaunch.size();
+    }
+
+    // ------------------------------------------------------------------------
+    //  Callbacks from the Mesos Master
+    // ------------------------------------------------------------------------
+
+    /**
+     * Called when connected to Mesos as a new framework.
+     */
+    private void registered(Registered message) {
+        connectionMonitor.tell(message, self());
+
+        try {
+            workerStore.setFrameworkID(Option.apply(message.frameworkId()));
+        }
+        catch(Exception ex) {
+            fatalError("unable to store the assigned framework ID", ex);
+            return;
+        }
+
+        launchCoordinator.tell(message, self());
+        reconciliationCoordinator.tell(message, self());
+        taskRouter.tell(message, self());
+    }
+
+    /**
+     * Called when reconnected to Mesos following a failover event.
+     */
+    private void reregistered(ReRegistered message) {
+        connectionMonitor.tell(message, self());
+        launchCoordinator.tell(message, self());
+        reconciliationCoordinator.tell(message, self());
+        taskRouter.tell(message, self());
+    }
+
+    /**
+     * Called when disconnected from Mesos.
+     */
+    private void disconnected(Disconnected message) {
+        connectionMonitor.tell(message, self());
+        launchCoordinator.tell(message, self());
+        reconciliationCoordinator.tell(message, self());
+        taskRouter.tell(message, self());
+    }
+
+    /**
+     * Called when an error is reported by the scheduler callback.
+     */
+    private void error(String message) {
+        self().tell(new FatalErrorOccurred("Connection to Mesos failed", new Exception(message)), self());
+    }
+
+    /**
+     * Invoked when a Mesos task reaches a terminal status.
+     */
+    private void taskTerminated(Protos.TaskID taskID, Protos.TaskStatus status) {
+        // this callback occurs for failed containers and for released containers alike
+
+        final ResourceID id = extractResourceID(taskID);
+
+        try {
+            workerStore.removeWorker(taskID);
+        }
+        catch(Exception ex) {
+            fatalError("unable to remove worker", ex);
+            return;
+        }
+
+        // check if this is a failed task or a released task
+        if (workersBeingReturned.remove(id) != null) {
+            // regular finished worker that we released
+            LOG.info("Worker {} finished successfully with diagnostics: {}",
+                id, status.getMessage());
+        } else {
+            // failed worker, either at startup, or running
+            final MesosWorkerStore.Worker launched = workersInLaunch.remove(id);
+            if (launched != null) {
+                LOG.info("Mesos task {} failed, with a TaskManager in launch or registration. " +
+                    "State: {} Reason: {} ({})", id, status.getState(), status.getReason(), status.getMessage());
+                // we will trigger re-acquiring new workers at the end
+            } else {
+                // failed registered worker
+                LOG.info("Mesos task {} failed, with a registered TaskManager. " +
+                    "State: {} Reason: {} ({})", id, status.getState(), status.getReason(), status.getMessage());
+
+                // notify the generic logic, which notifies the JobManager, etc.
+                notifyWorkerFailed(id, "Mesos task " + id + " failed. State: " + status.getState());
+            }
+
+            // general failure logging
+            failedTasksSoFar++;
+
+            String diagMessage = String.format("Diagnostics for task %s in state %s : " +
+                "reason=%s message=%s",
+                id, status.getState(), status.getReason(), status.getMessage());
+            sendInfoMessage(diagMessage);
+
+            LOG.info(diagMessage);
+            LOG.info("Total number of failed tasks so far: " + failedTasksSoFar);
+
+            // maxFailedTasks == -1 is infinite number of retries.
+            if (maxFailedTasks >= 0 && failedTasksSoFar > maxFailedTasks) {
+                String msg = "Stopping Mesos session because the number of failed tasks (" +
+                    failedTasksSoFar + ") exceeded the maximum failed tasks (" +
+                    maxFailedTasks + "). This number is controlled by the '" +
+                    ConfigConstants.MESOS_MAX_FAILED_TASKS + "' configuration setting. " +
" + + "By default its the number of requested tasks."; + + LOG.error(msg); + self().tell(decorateMessage(new StopCluster(ApplicationStatus.FAILED, msg)), + ActorRef.noSender()); + + // no need to do anything else + return; + } + } + + // in case failed containers were among the finished containers, make + // sure we re-examine and request new ones + triggerCheckWorkers(); + } + + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + + private LaunchableMesosWorker createLaunchableMesosWorker(Protos.TaskID taskID) { + LaunchableMesosWorker launchable = + new LaunchableMesosWorker(taskManagerParameters, taskManagerLaunchContext, taskID); + return launchable; + } + + /** + * Extracts a unique ResourceID from the Mesos task. + * + * @param taskId the Mesos TaskID + * @return The ResourceID for the container + */ + static ResourceID extractResourceID(Protos.TaskID taskId) { + return new ResourceID(taskId.getValue()); + } + + /** + * Extracts the Mesos task goal state from the worker information. + * @param worker the persistent worker information. + * @return goal state information for the {@Link TaskMonitor}. + */ + static TaskMonitor.TaskGoalState extractGoalState(MesosWorkerStore.Worker worker) { + switch(worker.state()) { + case New: return new TaskMonitor.New(worker.taskID()); + case Launched: return new TaskMonitor.Launched(worker.taskID(), worker.slaveID().get()); + case Released: return new TaskMonitor.Released(worker.taskID(), worker.slaveID().get()); + default: throw new IllegalArgumentException(); --- End diff -- Maybe we could add an exception message > Integrate Flink with Apache Mesos > --------------------------------- > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: Cluster Management > Reporter: Robert Metzger > Assignee: Eron Wright > Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. > -There also is a pending pull request for adding Mesos support for Flink-: > https://github.com/apache/flink/pull/251 > Update (May '16): a new effort is now underway, building on the recent > ResourceManager work. > Design document: ([google > doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing]) -- This message was sent by Atlassian JIRA (v6.3.4#6332)