[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14699493#comment-14699493 ]
ASF GitHub Bot commented on FLINK-1984: --------------------------------------- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/948#discussion_r37184472 --- Diff: flink-mesos/src/main/scala/org/apache/flink/mesos/executor/FlinkExecutor.scala --- @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.mesos.executor + +import scala.util.{Failure, Success, Try} + +import org.apache.flink.configuration.{Configuration, GlobalConfiguration} +import org.apache.flink.mesos._ +import org.apache.flink.mesos.scheduler._ +import org.apache.flink.runtime.StreamingMode +import org.apache.log4j.{ConsoleAppender, Level, Logger => ApacheLogger, PatternLayout} +import org.apache.mesos.{Executor, ExecutorDriver} +import org.apache.mesos.Protos._ + +trait FlinkExecutor extends Executor { + // logger to use + def LOG: org.slf4j.Logger + + var currentRunningTaskId: Option[TaskID] = None + val TASK_MANAGER_LOGGING_LEVEL_KEY = "taskmanager.logging.level" + val DEFAULT_TASK_MANAGER_LOGGING_LEVEL = "INFO" + + + // methods that defines how the task is started when a launchTask is sent + def startTask(streamingMode: StreamingMode): Try[Unit] + + var thread: Option[Thread] = None + var slaveId: Option[SlaveID] = None + + override def shutdown(driver: ExecutorDriver): Unit = { + LOG.info("Killing taskManager thread") + // kill task manager thread + for (t <- thread) { + t.stop() + } + + // exit + sys.exit(0) + } + + override def disconnected(driver: ExecutorDriver): Unit = {} + + override def killTask(driver: ExecutorDriver, taskId: TaskID): Unit = { + for (t <- thread) { + LOG.info(s"Killing task : ${taskId.getValue}") + thread = None + currentRunningTaskId = None + + // stop running thread + t.stop() + + // Send the TASK_FINISHED status + driver.sendStatusUpdate(TaskStatus.newBuilder() + .setTaskId(taskId) + .setState(TaskState.TASK_FINISHED) + .build()) + } + } + + + override def error(driver: ExecutorDriver, message: String): Unit = {} + + override def frameworkMessage(driver: ExecutorDriver, data: Array[Byte]): Unit = {} + + override def registered(driver: ExecutorDriver, executorInfo: ExecutorInfo, + frameworkInfo: FrameworkInfo, slaveInfo: SlaveInfo): Unit = { + LOG.info(s"${executorInfo.getName} was registered on slave: ${slaveInfo.getHostname}") + slaveId = Some(slaveInfo.getId) + // get the configuration passed to it + if (executorInfo.hasData) { + val newConfig: Configuration = Utils.deserialize(executorInfo.getData.toByteArray) + GlobalConfiguration.includeConfiguration(newConfig) + } + LOG.debug("Loaded configuration: {}", GlobalConfiguration.getConfiguration) + } + + + override def reregistered(driver: ExecutorDriver, slaveInfo: SlaveInfo): Unit = { + slaveId = Some(slaveInfo.getId) + } + + + override def launchTask(driver: ExecutorDriver, task: TaskInfo): Unit = { + // overlay the new config over this one + val taskConf: Configuration = Utils.deserialize(task.getData.toByteArray) + GlobalConfiguration.includeConfiguration(taskConf) + + // reconfigure log4j + val logLevel = GlobalConfiguration.getString( + TASK_MANAGER_LOGGING_LEVEL_KEY, DEFAULT_TASK_MANAGER_LOGGING_LEVEL) + + initializeLog4j(Level.toLevel(logLevel, Level.DEBUG)) + + // get streaming mode + val streamingMode = getStreamingMode() + + // create the thread + val t = createThread(driver, task.getTaskId, streamingMode) + thread = Some(t) + t.start() + + // send message + driver.sendStatusUpdate(TaskStatus.newBuilder() + .setTaskId(task.getTaskId) + .setState(TaskState.TASK_RUNNING) + .build()) + } + + def initializeLog4j(level: Level): Unit = { --- End diff -- this will overwrite all user defined log4j configurations? I'm not sure if it is still true, but you don't have to use Log4j as a logging backend for Flink. We are using SFL4j as the logger API, with pluggable back-ends. Log4j just happens to be our default logging back end. I think it would be better to configure log4j using JVM properties. I know that this probably requires shipping of the configuration files, but that should be doable. Are there any services in Mesos to make files available to all executors? > Integrate Flink with Apache Mesos > --------------------------------- > > Key: FLINK-1984 > URL: https://issues.apache.org/jira/browse/FLINK-1984 > Project: Flink > Issue Type: New Feature > Components: New Components > Reporter: Robert Metzger > Priority: Minor > Attachments: 251.patch > > > There are some users asking for an integration of Flink into Mesos. > There also is a pending pull request for adding Mesos support for Flink: > https://github.com/apache/flink/pull/251 > But the PR is insufficiently tested. I'll add the code of the pull request to > this JIRA in case somebody wants to pick it up in the future. -- This message was sent by Atlassian JIRA (v6.3.4#6332)