Hi, I think you can make it start the Web Frontend via
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); In the future, this will become moot, though, when the JobManager has a proper REST API that is always there. Best, Aljoscha > On 27. Sep 2017, at 11:40, XiangWei Huang <xw.huang...@gmail.com> wrote: > > Hi Till, > > I’ve found that a StandaloneMiniCluster doesn’t startup web fronted > when it is running.so,how can i cancel a running job on it with restful > method. > > Cheers, > Till > >> 在 2017年9月20日,15:43,Till Rohrmann <trohrm...@apache.org> 写道: >> >> Hi XiangWei, >> >> programmatically there is no nice tooling yet to cancel jobs on a dedicated >> cluster. What you can do is to use Flink's REST API to issue a cancel >> command [1]. You have to send a GET request to the target URL >> `/jobs/:jobid/cancel`. In the future we will improve the programmatic job >> control which will allow you to do these kind of things more easily. >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#job-cancellation >> >> Cheers, >> Till >> >> On Wed, Sep 20, 2017 at 5:46 AM, XiangWei Huang <xw.huang...@gmail.com> >> wrote: >> Hi Till, >> >> Thanks for your answer,it worked when i use StandaloneMiniCluster,but >> another problem is that i can’t find a way to cancel >> a running Flink job without shutting down the cluster,for >> LocalFlinkMiniCluster i can do it with below code : >> >> for (job <- cluster.getCurrentlyRunningJobsJava()) { >> cluster.stopJob(job) >> } >> >> Is it possible to cancel a running Flink job without shutting down a >> StandaloneMiniCluster ? >> >> Best Regards, >> XiangWei >> >> >> >>> 在 2017年9月14日,16:58,Till Rohrmann <trohrm...@apache.org> 写道: >>> >>> Hi XiangWei, >>> >>> the problem is that the LocalFlinkMiniCluster can no longer be used in >>> combination with a RemoteExecutionEnvironment. The reason is that the >>> LocalFlinkMiniCluster uses now an internal leader election service and >>> assigns leader ids to its components. Since this is an internal service it >>> is not possible to retrieve this information like it is the case with the >>> ZooKeeper based leader election services. >>> >>> Long story short, the Flink Scala shell currently does not work with a >>> LocalFlinkMiniCluster and would have to be fixed to work properly together >>> with a local execution environment. Until then, I recommend starting a >>> local standalone cluster and let the code run there. >>> >>> Cheers, >>> Till >>> >>> >>> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <xw.huang...@gmail.com> >>> wrote: >>> dear all, >>> >>> Below is the code i execute: >>> >>> import java.io._ >>> import java.net.{URL, URLClassLoader} >>> import java.nio.charset.Charset >>> import java.util.Collections >>> import java.util.concurrent.atomic.AtomicBoolean >>> >>> import com.netease.atom.common.util.logging.Logging >>> import com.netease.atom.interpreter.Code.Code >>> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, >>> InterpreterUtils} >>> import io.netty.buffer._ >>> import org.apache.flink.api.scala.FlinkILoop >>> import org.apache.flink.client.CliFrontend >>> import org.apache.flink.client.cli.CliFrontendParser >>> import org.apache.flink.client.program.ClusterClient >>> import org.apache.flink.configuration.{QueryableStateOptions, >>> Configuration, ConfigConstants, GlobalConfiguration} >>> import org.apache.flink.runtime.akka.AkkaUtils >>> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, >>> LocalFlinkMiniCluster} >>> >>> import scala.Console >>> import scala.beans.BeanProperty >>> import scala.collection.JavaConversions._ >>> import scala.collection.mutable >>> import scala.collection.mutable.{ArrayBuffer, ListBuffer} >>> import scala.runtime.AbstractFunction0 >>> import scala.tools.nsc.Settings >>> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results} >>> >>> class FlinkInterpreter extends Interpreter { >>> private var bufferedReader: Option[BufferedReader] = None >>> private var jprintWriter: JPrintWriter = _ >>> private val config = new Configuration; >>> private var cluster: LocalFlinkMiniCluster = _ >>> @BeanProperty var imain: IMain = _ >>> @BeanProperty var flinkILoop: FlinkILoop = _ >>> private var out: ByteBufOutputStream = null >>> private var outBuf: ByteBuf = null >>> private var in: ByteBufInputStream = _ >>> private var isRunning: AtomicBoolean = new AtomicBoolean(false) >>> >>> override def isOpen: Boolean = { >>> isRunning.get() >>> } >>> >>> def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = { >>> config.toMap.toMap.foreach(println) >>> config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1) >>> config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1) >>> config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1) >>> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0) >>> config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true) >>> config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true) >>> val localCluster = new LocalFlinkMiniCluster(config, false) >>> localCluster.start(true) >>> val port = >>> AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port >>> println(s"Starting local Flink cluster (host: localhost,port: >>> ${localCluster.getLeaderRPCPort}).\n") >>> ("localhost", localCluster.getLeaderRPCPort, localCluster) >>> } >>> >>> >>> /** >>> * Start flink cluster and create interpreter >>> */ >>> override def open: Unit = { >>> outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480) >>> out = new ByteBufOutputStream(outBuf) >>> in = new ByteBufInputStream(outBuf) >>> // val (host, port, yarnCluster) = >>> deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), >>> None)) >>> val (host, port, localCluster) = startLocalMiniCluster() >>> this.cluster = localCluster >>> val conf = cluster.configuration >>> println(s"Connecting to Flink cluster (host:$host,port:$port)...") >>> flinkILoop = new FlinkILoop(host, port, conf, None) >>> val settings = new Settings() >>> settings.usejavacp.value = true >>> settings.Yreplsync.value = true >>> flinkILoop.settings_$eq(settings) >>> flinkILoop.createInterpreter() >>> imain = flinkILoop.intp >>> FlinkInterpreter.ourClassloader = imain.classLoader >>> val benv = flinkILoop.scalaBenv >>> val senv = flinkILoop.scalaSenv >>> benv.getConfig.disableSysoutLogging() >>> senv.getConfig.disableSysoutLogging() >>> // import libraries >>> imain.interpret("import scala.tools.nsc.io._") >>> // imain.interpret("import Properties.userHome") >>> imain.interpret("import scala.compat.Platform.EOL") >>> imain.interpret("import org.apache.flink.api.scala._") >>> imain.interpret("import org.apache.flink.api.common.functions._") >>> isRunning.set(true) >>> } >>> >>> override def interpret(line: String): InterpreterResult = { >>> if (line == null || line.trim.length == 0) { >>> return new InterpreterResult(Code.SUCCESS) >>> } >>> interpret(line.split("\n")) >>> } >>> >>> /** >>> * Interprete code >>> * @param lines >>> * @return >>> */ >>> def interpret(lines: Array[String]): InterpreterResult = { >>> val imain: IMain = getImain >>> val linesToRun: Array[String] = new Array[String](lines.length + 1) >>> for (i <- 0 until lines.length) { >>> linesToRun(i) = lines(i) >>> } >>> linesToRun(lines.length) = "print(\"\")" >>> System.setOut(new PrintStream(out)) >>> out.buffer().clear() >>> var r: Code = null >>> var incomplete: String = "" >>> var inComment: Boolean = false >>> for (l <- 0 until linesToRun.length) { >>> val s: String = linesToRun(l) >>> var continuation: Boolean = false >>> if (l + 1 < linesToRun.length) { >>> val nextLine: String = linesToRun(l + 1).trim >>> if (nextLine.isEmpty || >>> nextLine.startsWith("//") || >>> nextLine.startsWith("}") || >>> nextLine.startsWith("object")) { >>> continuation = true >>> } else if (!inComment && nextLine.startsWith("/*")) { >>> inComment = true >>> continuation = true >>> } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) { >>> inComment = false >>> continuation = true >>> } else if (nextLine.length > 1 && >>> nextLine.charAt(0) == '.' && >>> nextLine.charAt(1) != '.' && >>> nextLine.charAt(1) != '/') { >>> continuation = true >>> } else if (inComment) { >>> continuation = true >>> } >>> if (continuation) { >>> incomplete += s + "\n" >>> } >>> } >>> if (!continuation) { >>> val currentCommand: String = incomplete >>> var res: Results.Result = null >>> try { >>> res = Console.withOut(System.out)(new >>> AbstractFunction0[Results.Result] { >>> override def apply() = { >>> imain.interpret(currentCommand + s) >>> } >>> }.apply()) >>> } catch { >>> case e: Exception => >>> logError("Interpreter Exception ", e) >>> return new InterpreterResult(Code.ERROR, >>> InterpreterUtils.getMostRelevantMessage(e)) >>> } >>> r = getResultCode(res) >>> if (r == Code.ERROR) { >>> return new InterpreterResult(r, out.toString) >>> } else if (r eq Code.INCOMPLETE) { >>> incomplete += s + "\n" >>> } else { >>> incomplete = "" >>> } >>> } >>> } >>> >>> if (r eq Code.INCOMPLETE) { >>> return new InterpreterResult(r, "Incomplete expression") >>> } >>> else { >>> return new InterpreterResult(r, >>> out.buffer().toString(Charset.forName("utf-8"))) >>> } >>> } >>> >>> private def getResultCode(r: Results.Result): Code = { >>> if (r.isInstanceOf[Results.Success.type]) { >>> return Code.SUCCESS >>> } >>> else if (r.isInstanceOf[Results.Incomplete.type]) { >>> return Code.INCOMPLETE >>> } >>> else { >>> return Code.ERROR >>> } >>> } >>> >>> } >>> } >>> >>> object FlinkInterpreter extends Logging { >>> var ourClassloader: ClassLoader = _ >>> >>> def main(args: Array[String]): Unit = { >>> val interpreter: FlinkInterpreter = new FlinkInterpreter >>> val code = >>> """ >>> |val dataStream = senv.fromElements(1,2,3,4,5) >>> |dataStream.countWindowAll(2).sum(0).print() >>> |senv.execute("My streaming program") >>> """.stripMargin >>> interpreter.open >>> val result = interpreter.interpret(code) >>> } >>> } >>> >>> The error messages i got are: >>> … >>> … >>> ... >>> [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager] >>> Discard message >>> LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: >>> 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES)) >>> because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7 >>> did not equal the received leader session ID >>> 00000000-0000-0000-0000-000000000000. >>> [INFO] [17/09/13 12:05:52] >>> [org.apache.flink.runtime.client.JobSubmissionClientActor] Terminate >>> JobClientActor. >>> [INFO] [17/09/13 12:05:52] >>> [org.apache.flink.runtime.client.JobSubmissionClientActor] Disconnect from >>> JobManager Actor[akka.tcp://flink@localhost:63522/user/jobmanager#82627940]. >>> [INFO] [17/09/13 12:05:52] >>> [akka.remote.RemoteActorRefProvider$RemotingTerminator] Shutting down >>> remote daemon. >>> [INFO] [17/09/13 12:05:52] >>> [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remote daemon shut >>> down; proceeding with flushing remote transports. >>> [INFO] [17/09/13 12:05:52] >>> [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remoting shut down. >>> org.apache.flink.client.program.ProgramInvocationException: The program >>> execution failed: Couldn't retrieve the JobExecutionResult from the >>> JobManager. >>> at >>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478) >>> at >>> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105) >>> at >>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442) >>> at >>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434) >>> at >>> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212) >>> at >>> org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87) >>> at >>> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176) >>> at >>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638) >>> ... 34 elided >>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't >>> retrieve the JobExecutionResult from the JobManager. >>> at >>> org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309) >>> at >>> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396) >>> at >>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467) >>> ... 41 more >>> Caused by: >>> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: >>> Job submission to the JobManager timed out. You may increase >>> 'akka.client.timeout' in case the JobManager needs more time to configure >>> and confirm the job submission. >>> at >>> org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119) >>> at >>> org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251) >>> at >>> org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89) >>> at >>> org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68) >>> at >>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167) >>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467) >>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97) >>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) >>> at akka.actor.ActorCell.invoke(ActorCell.scala:487) >>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) >>> at akka.dispatch.Mailbox.run(Mailbox.scala:220) >>> at >>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) >>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) >>> at >>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) >>> at >>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) >>> at >>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) >>> >>> >>> >>> >>> >> >> >