Hi,

I think you can make it start the Web Frontend via

conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);

This will become moot in the future, though, once the JobManager has a proper 
REST API that is always available.
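
Once the web frontend is up, the cancel call Till describes in the quoted 
message below (a GET request to `/jobs/:jobid/cancel`) could look roughly like 
this. It is only a sketch using plain JDK classes: the class and method names 
are made up for illustration, host and port 8081 are assumptions about where 
the web frontend listens, and the job id is the one from the log further down.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

// Hypothetical helper, not part of Flink: cancels a job through the REST
// endpoint /jobs/:jobid/cancel mentioned in the thread.
public class JobCancelSketch {

    // Builds the cancel URL for the given web frontend address and job id.
    static String cancelUrl(String host, int port, String jobId) {
        return "http://" + host + ":" + port + "/jobs/" + jobId + "/cancel";
    }

    // Sends the GET request and returns the HTTP status code of the response.
    static int cancel(String host, int port, String jobId) throws IOException {
        URL url = new URL(cancelUrl(host, port, jobId));
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        try {
            conn.setRequestMethod("GET");
            conn.setConnectTimeout(5000);
            conn.setReadTimeout(5000);
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 3) {
            // Usage: JobCancelSketch <host> <port> <jobId>
            int status = cancel(args[0], Integer.parseInt(args[1]), args[2]);
            System.out.println("HTTP status: " + status);
        } else {
            // Without arguments, only show what would be requested.
            // localhost:8081 is an assumed address, adjust to your setup.
            System.out.println("GET " + cancelUrl(
                "localhost", 8081, "1b923a7a54c06ffa1c91d276a45be826"));
        }
    }
}
```

The same calls can be made from Scala code, since only `java.net` is used. 
The request only works if the web frontend was actually started, which is why 
the configuration flag above matters.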

Best,
Aljoscha


> On 27. Sep 2017, at 11:40, XiangWei Huang <xw.huang...@gmail.com> wrote:
> 
> Hi Till,
>       
> I've found that a StandaloneMiniCluster doesn't start the web frontend 
> while it is running. So how can I cancel a running job on it via the REST 
> API?
> 
> Cheers,
> Till
> 
>> On 20. Sep 2017, at 15:43, Till Rohrmann <trohrm...@apache.org> wrote:
>> 
>> Hi XiangWei,
>> 
>> Programmatically, there is no nice tooling yet to cancel jobs on a dedicated 
>> cluster. What you can do is use Flink's REST API to issue a cancel 
>> command [1]. You have to send a GET request to the target URL 
>> `/jobs/:jobid/cancel`. In the future we will improve the programmatic job 
>> control, which will allow you to do this kind of thing more easily.
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#job-cancellation
>> 
>> Cheers,
>> Till
>> 
>> On Wed, Sep 20, 2017 at 5:46 AM, XiangWei Huang <xw.huang...@gmail.com> 
>> wrote:
>> Hi Till,
>>    
>> Thanks for your answer. It worked when I used StandaloneMiniCluster, but 
>> another problem is that I can't find a way to cancel a running Flink job 
>> without shutting down the cluster. For LocalFlinkMiniCluster I can do it 
>> with the code below:
>> 
>>    for (job <- cluster.getCurrentlyRunningJobsJava()) {
>>       cluster.stopJob(job)
>>    }
>> 
>> Is it possible to cancel a running Flink job without shutting down a 
>> StandaloneMiniCluster?
>> 
>> Best Regards,
>> XiangWei
>> 
>> 
>> 
>>> On 14. Sep 2017, at 16:58, Till Rohrmann <trohrm...@apache.org> wrote:
>>> 
>>> Hi XiangWei,
>>> 
>>> The problem is that the LocalFlinkMiniCluster can no longer be used in 
>>> combination with a RemoteExecutionEnvironment. The reason is that the 
>>> LocalFlinkMiniCluster now uses an internal leader election service and 
>>> assigns leader ids to its components. Since this is an internal service, it 
>>> is not possible to retrieve this information the way it is with the 
>>> ZooKeeper-based leader election services.
>>> 
>>> Long story short, the Flink Scala shell currently does not work with a 
>>> LocalFlinkMiniCluster and would have to be fixed to work properly together 
>>> with a local execution environment. Until then, I recommend starting a 
>>> local standalone cluster and letting the code run there.
>>> 
>>> Cheers,
>>> Till
>>> 
>>> 
>>> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <xw.huang...@gmail.com> 
>>> wrote:
>>> Dear all,
>>> 
>>> Below is the code I execute:
>>> 
>>> import java.io._
>>> import java.net.{URL, URLClassLoader}
>>> import java.nio.charset.Charset
>>> import java.util.Collections
>>> import java.util.concurrent.atomic.AtomicBoolean
>>> 
>>> import com.netease.atom.common.util.logging.Logging
>>> import com.netease.atom.interpreter.Code.Code
>>> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, 
>>> InterpreterUtils}
>>> import io.netty.buffer._
>>> import org.apache.flink.api.scala.FlinkILoop
>>> import org.apache.flink.client.CliFrontend
>>> import org.apache.flink.client.cli.CliFrontendParser
>>> import org.apache.flink.client.program.ClusterClient
>>> import org.apache.flink.configuration.{QueryableStateOptions, 
>>> Configuration, ConfigConstants, GlobalConfiguration}
>>> import org.apache.flink.runtime.akka.AkkaUtils
>>> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, 
>>> LocalFlinkMiniCluster}
>>> 
>>> import scala.Console
>>> import scala.beans.BeanProperty
>>> import scala.collection.JavaConversions._
>>> import scala.collection.mutable
>>> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
>>> import scala.runtime.AbstractFunction0
>>> import scala.tools.nsc.Settings
>>> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>>> 
>>> class FlinkInterpreter extends Interpreter {
>>>   private var bufferedReader: Option[BufferedReader] = None
>>>   private var jprintWriter: JPrintWriter = _
>>>   private val config = new Configuration;
>>>   private var cluster: LocalFlinkMiniCluster = _
>>>   @BeanProperty var imain: IMain = _
>>>   @BeanProperty var flinkILoop: FlinkILoop = _
>>>   private var out: ByteBufOutputStream = null
>>>   private var outBuf: ByteBuf = null
>>>   private var in: ByteBufInputStream = _
>>>   private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>>> 
>>>   override def isOpen: Boolean = {
>>>     isRunning.get()
>>>   }
>>> 
>>>   def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
>>>     config.toMap.toMap.foreach(println)
>>>     config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
>>>     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
>>>     config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
>>>     config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
>>>     config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
>>>     config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
>>>     val localCluster = new LocalFlinkMiniCluster(config, false)
>>>     localCluster.start(true)
>>>     val port = 
>>> AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
>>>     println(s"Starting local Flink cluster (host: localhost,port: ${localCluster.getLeaderRPCPort}).\n")
>>>     ("localhost", localCluster.getLeaderRPCPort, localCluster)
>>>   }
>>> 
>>>  
>>>   /**
>>>    * Start flink cluster and create interpreter
>>>    */
>>>   override def open: Unit = {
>>>     outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
>>>     out = new ByteBufOutputStream(outBuf)
>>>     in = new ByteBufInputStream(outBuf)
>>>     //    val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
>>>     val (host, port, localCluster) = startLocalMiniCluster()
>>>     this.cluster = localCluster
>>>     val conf = cluster.configuration
>>>     println(s"Connecting to Flink cluster (host:$host,port:$port)...")
>>>     flinkILoop = new FlinkILoop(host, port, conf, None)
>>>     val settings = new Settings()
>>>     settings.usejavacp.value = true
>>>     settings.Yreplsync.value = true
>>>     flinkILoop.settings_$eq(settings)
>>>     flinkILoop.createInterpreter()
>>>     imain = flinkILoop.intp
>>>     FlinkInterpreter.ourClassloader = imain.classLoader
>>>     val benv = flinkILoop.scalaBenv
>>>     val senv = flinkILoop.scalaSenv
>>>     benv.getConfig.disableSysoutLogging()
>>>     senv.getConfig.disableSysoutLogging()
>>>     // import libraries
>>>     imain.interpret("import scala.tools.nsc.io._")
>>>     //    imain.interpret("import Properties.userHome")
>>>     imain.interpret("import scala.compat.Platform.EOL")
>>>     imain.interpret("import org.apache.flink.api.scala._")
>>>     imain.interpret("import org.apache.flink.api.common.functions._")
>>>     isRunning.set(true)
>>>   }
>>> 
>>>   override def interpret(line: String): InterpreterResult = {
>>>     if (line == null || line.trim.length == 0) {
>>>       return new InterpreterResult(Code.SUCCESS)
>>>     }
>>>     interpret(line.split("\n"))
>>>   }
>>> 
>>>   /**
>>>    * Interpret code
>>>    * @param lines
>>>    * @return
>>>    */
>>>   def interpret(lines: Array[String]): InterpreterResult = {
>>>     val imain: IMain = getImain
>>>     val linesToRun: Array[String] = new Array[String](lines.length + 1)
>>>     for (i <- 0 until lines.length) {
>>>       linesToRun(i) = lines(i)
>>>     }
>>>     linesToRun(lines.length) = "print(\"\")"
>>>     System.setOut(new PrintStream(out))
>>>     out.buffer().clear()
>>>     var r: Code = null
>>>     var incomplete: String = ""
>>>     var inComment: Boolean = false
>>>     for (l <- 0 until linesToRun.length) {
>>>       val s: String = linesToRun(l)
>>>       var continuation: Boolean = false
>>>       if (l + 1 < linesToRun.length) {
>>>         val nextLine: String = linesToRun(l + 1).trim
>>>         if (nextLine.isEmpty ||
>>>             nextLine.startsWith("//") ||
>>>             nextLine.startsWith("}") ||
>>>             nextLine.startsWith("object")) {
>>>           continuation = true
>>>         } else if (!inComment && nextLine.startsWith("/*")) {
>>>           inComment = true
>>>           continuation = true
>>>         } else if (inComment && nextLine.lastIndexOf("*/") >= 0) {
>>>           inComment = false
>>>           continuation = true
>>>         } else if (nextLine.length > 1 &&
>>>             nextLine.charAt(0) == '.' &&
>>>             nextLine.charAt(1) != '.' &&
>>>             nextLine.charAt(1) != '/') {
>>>           continuation = true
>>>         } else if (inComment) {
>>>           continuation = true
>>>         }
>>>         if (continuation) {
>>>           incomplete += s + "\n"
>>>         }
>>>       }
>>>       if (!continuation) {
>>>         val currentCommand: String = incomplete
>>>         var res: Results.Result = null
>>>         try {
>>>           res = Console.withOut(System.out)(new 
>>> AbstractFunction0[Results.Result] {
>>>             override def apply() = {
>>>               imain.interpret(currentCommand + s)
>>>             }
>>>           }.apply())
>>>         } catch {
>>>           case e: Exception =>
>>>             logError("Interpreter Exception ", e)
>>>             return new InterpreterResult(Code.ERROR, 
>>> InterpreterUtils.getMostRelevantMessage(e))
>>>         }
>>>         r = getResultCode(res)
>>>         if (r == Code.ERROR) {
>>>           return new InterpreterResult(r, out.toString)
>>>         } else if (r eq Code.INCOMPLETE) {
>>>           incomplete += s + "\n"
>>>         } else {
>>>           incomplete = ""
>>>         }
>>>       }
>>>     }
>>> 
>>>     if (r eq Code.INCOMPLETE) {
>>>       return new InterpreterResult(r, "Incomplete expression")
>>>     }
>>>     else {
>>>       return new InterpreterResult(r, 
>>> out.buffer().toString(Charset.forName("utf-8")))
>>>     }
>>>   }
>>> 
>>>   private def getResultCode(r: Results.Result): Code = {
>>>     if (r.isInstanceOf[Results.Success.type]) {
>>>       return Code.SUCCESS
>>>     }
>>>     else if (r.isInstanceOf[Results.Incomplete.type]) {
>>>       return Code.INCOMPLETE
>>>     }
>>>     else {
>>>       return Code.ERROR
>>>     }
>>>   }
>>> 
>>> }
>>> 
>>> object FlinkInterpreter extends Logging {
>>>   var ourClassloader: ClassLoader = _
>>> 
>>>   def main(args: Array[String]): Unit = {
>>>     val interpreter: FlinkInterpreter = new FlinkInterpreter
>>>     val code =
>>>       """
>>>         |val dataStream = senv.fromElements(1,2,3,4,5)
>>>         |dataStream.countWindowAll(2).sum(0).print()
>>>         |senv.execute("My streaming program")
>>>       """.stripMargin
>>>     interpreter.open
>>>     val result = interpreter.interpret(code)
>>>   }
>>> }
>>> 
>>> The error messages I got are:
>>> …
>>> …
>>> ...
>>> [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager] 
>>> Discard message 
>>> LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId:
>>>  1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES)) 
>>> because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7 
>>> did not equal the received leader session ID 
>>> 00000000-0000-0000-0000-000000000000.
>>> [INFO] [17/09/13 12:05:52] 
>>> [org.apache.flink.runtime.client.JobSubmissionClientActor] Terminate 
>>> JobClientActor.
>>> [INFO] [17/09/13 12:05:52] 
>>> [org.apache.flink.runtime.client.JobSubmissionClientActor] Disconnect from 
>>> JobManager Actor[akka.tcp://flink@localhost:63522/user/jobmanager#82627940].
>>> [INFO] [17/09/13 12:05:52] 
>>> [akka.remote.RemoteActorRefProvider$RemotingTerminator] Shutting down 
>>> remote daemon.
>>> [INFO] [17/09/13 12:05:52] 
>>> [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remote daemon shut 
>>> down; proceeding with flushing remote transports.
>>> [INFO] [17/09/13 12:05:52] 
>>> [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remoting shut down.
>>> org.apache.flink.client.program.ProgramInvocationException: The program 
>>> execution failed: Couldn't retrieve the JobExecutionResult from the 
>>> JobManager.
>>>   at 
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
>>>   at 
>>> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>>>   at 
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
>>>   at 
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434)
>>>   at 
>>> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
>>>   at 
>>> org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
>>>   at 
>>> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
>>>   at 
>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
>>>   ... 34 elided
>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't 
>>> retrieve the JobExecutionResult from the JobManager.
>>>   at 
>>> org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
>>>   at 
>>> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
>>>   at 
>>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
>>>   ... 41 more
>>> Caused by: 
>>> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: 
>>> Job submission to the JobManager timed out. You may increase 
>>> 'akka.client.timeout' in case the JobManager needs more time to configure 
>>> and confirm the job submission.
>>>   at 
>>> org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
>>>   at 
>>> org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
>>>   at 
>>> org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
>>>   at 
>>> org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>>>   at 
>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>   at 
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>   at 
>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>   at 
>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>   at 
>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> 
>>> 
>>> 
>>> 
>>> 
>> 
>> 
> 
