[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14699493#comment-14699493
 ] 

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/948#discussion_r37184472
  
    --- Diff: 
flink-mesos/src/main/scala/org/apache/flink/mesos/executor/FlinkExecutor.scala 
---
    @@ -0,0 +1,172 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.executor
    +
    +import scala.util.{Failure, Success, Try}
    +
    +import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
    +import org.apache.flink.mesos._
    +import org.apache.flink.mesos.scheduler._
    +import org.apache.flink.runtime.StreamingMode
    +import org.apache.log4j.{ConsoleAppender, Level, Logger => ApacheLogger, 
PatternLayout}
    +import org.apache.mesos.{Executor, ExecutorDriver}
    +import org.apache.mesos.Protos._
    +
    +trait FlinkExecutor extends Executor {
    +  // logger to use
    +  def LOG: org.slf4j.Logger
    +
    +  var currentRunningTaskId: Option[TaskID] = None
    +  val TASK_MANAGER_LOGGING_LEVEL_KEY = "taskmanager.logging.level"
    +  val DEFAULT_TASK_MANAGER_LOGGING_LEVEL = "INFO"
    +
    +
    +  // methods that defines how the task is started when a launchTask is sent
    +  def startTask(streamingMode: StreamingMode): Try[Unit]
    +
    +  var thread: Option[Thread] = None
    +  var slaveId: Option[SlaveID] = None
    +
    +  override def shutdown(driver: ExecutorDriver): Unit = {
    +    LOG.info("Killing taskManager thread")
    +    // kill task manager thread
    +    for (t <- thread) {
    +      t.stop()
    +    }
    +
    +    // exit
    +    sys.exit(0)
    +  }
    +
    +  override def disconnected(driver: ExecutorDriver): Unit = {}
    +
    +  override def killTask(driver: ExecutorDriver, taskId: TaskID): Unit = {
    +    for (t <- thread) {
    +      LOG.info(s"Killing task : ${taskId.getValue}")
    +      thread = None
    +      currentRunningTaskId = None
    +
    +      // stop running thread
    +      t.stop()
    +
    +      // Send the TASK_FINISHED status
    +      driver.sendStatusUpdate(TaskStatus.newBuilder()
    +        .setTaskId(taskId)
    +        .setState(TaskState.TASK_FINISHED)
    +        .build())
    +    }
    +  }
    +
    +
    +  override def error(driver: ExecutorDriver, message: String): Unit = {}
    +
    +  override def frameworkMessage(driver: ExecutorDriver, data: 
Array[Byte]): Unit = {}
    +
    +  override def registered(driver: ExecutorDriver, executorInfo: 
ExecutorInfo,
    +                          frameworkInfo: FrameworkInfo, slaveInfo: 
SlaveInfo): Unit = {
    +    LOG.info(s"${executorInfo.getName} was registered on slave: 
${slaveInfo.getHostname}")
    +    slaveId = Some(slaveInfo.getId)
    +    // get the configuration passed to it
    +    if (executorInfo.hasData) {
    +      val newConfig: Configuration = 
Utils.deserialize(executorInfo.getData.toByteArray)
    +      GlobalConfiguration.includeConfiguration(newConfig)
    +    }
    +    LOG.debug("Loaded configuration: {}", 
GlobalConfiguration.getConfiguration)
    +  }
    +
    +
    +  override def reregistered(driver: ExecutorDriver, slaveInfo: SlaveInfo): 
Unit = {
    +    slaveId = Some(slaveInfo.getId)
    +  }
    +
    +
    +  override def launchTask(driver: ExecutorDriver, task: TaskInfo): Unit = {
    +    // overlay the new config over this one
    +    val taskConf: Configuration = 
Utils.deserialize(task.getData.toByteArray)
    +    GlobalConfiguration.includeConfiguration(taskConf)
    +
    +    // reconfigure log4j
    +    val logLevel = GlobalConfiguration.getString(
    +      TASK_MANAGER_LOGGING_LEVEL_KEY, DEFAULT_TASK_MANAGER_LOGGING_LEVEL)
    +
    +    initializeLog4j(Level.toLevel(logLevel, Level.DEBUG))
    +
    +    // get streaming mode
    +    val streamingMode = getStreamingMode()
    +
    +    // create the thread
    +    val t = createThread(driver, task.getTaskId, streamingMode)
    +    thread = Some(t)
    +    t.start()
    +
    +    // send message
    +    driver.sendStatusUpdate(TaskStatus.newBuilder()
    +      .setTaskId(task.getTaskId)
    +      .setState(TaskState.TASK_RUNNING)
    +      .build())
    +  }
    +
    +  def initializeLog4j(level: Level): Unit = {
    --- End diff --
    
    this will overwrite all user defined log4j configurations?
    I'm not sure if it is still true, but you don't have to use Log4j as a 
logging backend for Flink.
    We are using SLF4J as the logger API, with pluggable backends. Log4j just 
happens to be our default logging backend.
    I think it would be better to configure log4j using JVM properties. I know 
that this probably requires shipping the configuration files, but that 
should be doable.
    Are there any services in Mesos to make files available to all executors?


> Integrate Flink with Apache Mesos
> ---------------------------------
>
>                 Key: FLINK-1984
>                 URL: https://issues.apache.org/jira/browse/FLINK-1984
>             Project: Flink
>          Issue Type: New Feature
>          Components: New Components
>            Reporter: Robert Metzger
>            Priority: Minor
>         Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> There also is a pending pull request for adding Mesos support for Flink: 
> https://github.com/apache/flink/pull/251
> But the PR is insufficiently tested. I'll add the code of the pull request to 
> this JIRA in case somebody wants to pick it up in the future.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to