Hi Till,

I've found that a StandaloneMiniCluster doesn't start up the web frontend while it is running, so how can I cancel a running job on it through the REST API?

Cheers,
Till
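For reference, the REST-based cancellation discussed in the quoted thread below would look roughly like the following sketch. It assumes a cluster whose web monitor / REST endpoint is actually reachable (localhost and the default port 8081 are assumptions, as is the job id argument), and it issues a DELETE request, which is how the linked 1.3 REST API page documents the cancel call:

    import java.net.{HttpURLConnection, URL}

    object CancelJobSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical values: adjust host, port and job id to your setup.
        val restBase = "http://localhost:8081"
        val jobId = args(0) // the job id, e.g. as printed in the JobManager log
        val conn = new URL(s"$restBase/jobs/$jobId/cancel")
          .openConnection()
          .asInstanceOf[HttpURLConnection]
        conn.setRequestMethod("DELETE") // cancel call as documented in the Flink 1.3 REST API docs
        println(s"Cancel request returned HTTP ${conn.getResponseCode}")
        conn.disconnect()
      }
    }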
> On 20 Sep 2017, at 15:43, Till Rohrmann <trohrm...@apache.org> wrote:
>
> Hi XiangWei,
>
> programmatically there is no nice tooling yet to cancel jobs on a dedicated
> cluster. What you can do is to use Flink's REST API to issue a cancel command
> [1]. You have to send a GET request to the target URL `/jobs/:jobid/cancel`.
> In the future we will improve the programmatic job control, which will allow
> you to do these kinds of things more easily.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#job-cancellation
>
> Cheers,
> Till
>
> On Wed, Sep 20, 2017 at 5:46 AM, XiangWei Huang <xw.huang...@gmail.com> wrote:
> Hi Till,
>
> Thanks for your answer; it worked when I used a StandaloneMiniCluster, but
> another problem is that I can't find a way to cancel a running Flink job
> without shutting down the cluster. For LocalFlinkMiniCluster I can do it
> with the code below:
>
>     for (job <- cluster.getCurrentlyRunningJobsJava()) {
>       cluster.stopJob(job)
>     }
>
> Is it possible to cancel a running Flink job without shutting down a
> StandaloneMiniCluster?
>
> Best Regards,
> XiangWei
>
>> On 14 Sep 2017, at 16:58, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>> Hi XiangWei,
>>
>> the problem is that the LocalFlinkMiniCluster can no longer be used in
>> combination with a RemoteExecutionEnvironment. The reason is that the
>> LocalFlinkMiniCluster now uses an internal leader election service and
>> assigns leader ids to its components. Since this is an internal service,
>> it is not possible to retrieve this information as is the case with the
>> ZooKeeper-based leader election services.
>>
>> Long story short, the Flink Scala shell currently does not work with a
>> LocalFlinkMiniCluster and would have to be fixed to work properly together
>> with a local execution environment. Until then, I recommend starting a
>> local standalone cluster and letting the code run there.
>>
>> Cheers,
>> Till
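As an illustration of that last suggestion (running against a local standalone cluster instead of the LocalFlinkMiniCluster), a minimal sketch could look like the following. It assumes a standalone cluster started with bin/start-cluster.sh whose JobManager listens on localhost and the default RPC port 6123 (both assumptions), and it mirrors the small streaming program used further down in the thread:

    import org.apache.flink.streaming.api.scala._

    object StandaloneClusterSketch {
      def main(args: Array[String]): Unit = {
        // Assumed coordinates of the standalone cluster's JobManager. If the job
        // uses classes that are not on the cluster's classpath, their jar paths
        // can be passed as additional arguments to createRemoteEnvironment.
        val senv = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 6123)

        senv.fromElements(1, 2, 3, 4, 5)
          .countWindowAll(2)
          .sum(0)
          .print()

        senv.execute("My streaming program")
      }
    }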
>> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <xw.huang...@gmail.com> wrote:
>> Dear all,
>>
>> Below is the code I execute:
>>
>> import java.io._
>> import java.net.{URL, URLClassLoader}
>> import java.nio.charset.Charset
>> import java.util.Collections
>> import java.util.concurrent.atomic.AtomicBoolean
>>
>> import com.netease.atom.common.util.logging.Logging
>> import com.netease.atom.interpreter.Code.Code
>> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, InterpreterUtils}
>> import io.netty.buffer._
>> import org.apache.flink.api.scala.FlinkILoop
>> import org.apache.flink.client.CliFrontend
>> import org.apache.flink.client.cli.CliFrontendParser
>> import org.apache.flink.client.program.ClusterClient
>> import org.apache.flink.configuration.{QueryableStateOptions, Configuration, ConfigConstants, GlobalConfiguration}
>> import org.apache.flink.runtime.akka.AkkaUtils
>> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, LocalFlinkMiniCluster}
>>
>> import scala.Console
>> import scala.beans.BeanProperty
>> import scala.collection.JavaConversions._
>> import scala.collection.mutable
>> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
>> import scala.runtime.AbstractFunction0
>> import scala.tools.nsc.Settings
>> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>>
>> class FlinkInterpreter extends Interpreter {
>>   private var bufferedReader: Option[BufferedReader] = None
>>   private var jprintWriter: JPrintWriter = _
>>   private val config = new Configuration
>>   private var cluster: LocalFlinkMiniCluster = _
>>   @BeanProperty var imain: IMain = _
>>   @BeanProperty var flinkILoop: FlinkILoop = _
>>   private var out: ByteBufOutputStream = null
>>   private var outBuf: ByteBuf = null
>>   private var in: ByteBufInputStream = _
>>   private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>>
>>   override def isOpen: Boolean = {
>>     isRunning.get()
>>   }
>>
>>   def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
>>     config.toMap.toMap.foreach(println)
>>     config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
>>     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
>>     config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
>>     config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
>>     config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
>>     config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
>>     val localCluster = new LocalFlinkMiniCluster(config, false)
>>     localCluster.start(true)
>>     val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
>>     println(s"Starting local Flink cluster (host: localhost,port: ${localCluster.getLeaderRPCPort}).\n")
>>     ("localhost", localCluster.getLeaderRPCPort, localCluster)
>>   }
>>
>>   /**
>>    * Start flink cluster and create interpreter
>>    */
>>   override def open: Unit = {
>>     outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
>>     out = new ByteBufOutputStream(outBuf)
>>     in = new ByteBufInputStream(outBuf)
>>     // val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
>>     val (host, port, localCluster) = startLocalMiniCluster()
>>     this.cluster = localCluster
>>     val conf = cluster.configuration
>>     println(s"Connecting to Flink cluster (host:$host,port:$port)...")
>>     flinkILoop = new FlinkILoop(host, port, conf, None)
>>     val settings = new Settings()
>>     settings.usejavacp.value = true
>>     settings.Yreplsync.value = true
>>     flinkILoop.settings_$eq(settings)
>>     flinkILoop.createInterpreter()
>>     imain = flinkILoop.intp
>>     FlinkInterpreter.ourClassloader = imain.classLoader
>>     val benv = flinkILoop.scalaBenv
>>     val senv = flinkILoop.scalaSenv
>>     benv.getConfig.disableSysoutLogging()
>>     senv.getConfig.disableSysoutLogging()
>>     // import libraries
>>     imain.interpret("import scala.tools.nsc.io._")
>>     // imain.interpret("import Properties.userHome")
>>     imain.interpret("import scala.compat.Platform.EOL")
>>     imain.interpret("import org.apache.flink.api.scala._")
>>     imain.interpret("import org.apache.flink.api.common.functions._")
>>     isRunning.set(true)
>>   }
>>
>>   override def interpret(line: String): InterpreterResult = {
>>     if (line == null || line.trim.length == 0) {
>>       return new InterpreterResult(Code.SUCCESS)
>>     }
>>     interpret(line.split("\n"))
>>   }
>>
>>   /**
>>    * Interpret code
>>    * @param lines
>>    * @return
>>    */
>>   def interpret(lines: Array[String]): InterpreterResult = {
>>     val imain: IMain = getImain
>>     val linesToRun: Array[String] = new Array[String](lines.length + 1)
>>     for (i <- 0 until lines.length) {
>>       linesToRun(i) = lines(i)
>>     }
>>     linesToRun(lines.length) = "print(\"\")"
>>     System.setOut(new PrintStream(out))
>>     out.buffer().clear()
>>     var r: Code = null
>>     var incomplete: String = ""
>>     var inComment: Boolean = false
>>     for (l <- 0 until linesToRun.length) {
>>       val s: String = linesToRun(l)
>>       var continuation: Boolean = false
>>       if (l + 1 < linesToRun.length) {
>>         val nextLine: String = linesToRun(l + 1).trim
>>         if (nextLine.isEmpty ||
>>             nextLine.startsWith("//") ||
>>             nextLine.startsWith("}") ||
>>             nextLine.startsWith("object")) {
>>           continuation = true
>>         } else if (!inComment && nextLine.startsWith("/*")) {
>>           inComment = true
>>           continuation = true
>>         } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) {
>>           inComment = false
>>           continuation = true
>>         } else if (nextLine.length > 1 &&
>>                    nextLine.charAt(0) == '.' &&
>>                    nextLine.charAt(1) != '.' &&
>>                    nextLine.charAt(1) != '/') {
>>           continuation = true
>>         } else if (inComment) {
>>           continuation = true
>>         }
>>         if (continuation) {
>>           incomplete += s + "\n"
>>         }
>>       }
>>       if (!continuation) {
>>         val currentCommand: String = incomplete
>>         var res: Results.Result = null
>>         try {
>>           res = Console.withOut(System.out)(new AbstractFunction0[Results.Result] {
>>             override def apply() = {
>>               imain.interpret(currentCommand + s)
>>             }
>>           }.apply())
>>         } catch {
>>           case e: Exception =>
>>             logError("Interpreter Exception ", e)
>>             return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e))
>>         }
>>         r = getResultCode(res)
>>         if (r == Code.ERROR) {
>>           return new InterpreterResult(r, out.toString)
>>         } else if (r eq Code.INCOMPLETE) {
>>           incomplete += s + "\n"
>>         } else {
>>           incomplete = ""
>>         }
>>       }
>>     }
>>
>>     if (r eq Code.INCOMPLETE) {
>>       return new InterpreterResult(r, "Incomplete expression")
>>     }
>>     else {
>>       return new InterpreterResult(r, out.buffer().toString(Charset.forName("utf-8")))
>>     }
>>   }
>>
>>   private def getResultCode(r: Results.Result): Code = {
>>     if (r.isInstanceOf[Results.Success.type]) {
>>       return Code.SUCCESS
>>     }
>>     else if (r.isInstanceOf[Results.Incomplete.type]) {
>>       return Code.INCOMPLETE
>>     }
>>     else {
>>       return Code.ERROR
>>     }
>>   }
>> }
>>
>> object FlinkInterpreter extends Logging {
>>   var ourClassloader: ClassLoader = _
>>
>>   def main(args: Array[String]): Unit = {
>>     val interpreter: FlinkInterpreter = new FlinkInterpreter
>>     val code =
>>       """
>>         |val dataStream = senv.fromElements(1,2,3,4,5)
>>         |dataStream.countWindowAll(2).sum(0).print()
>>         |senv.execute("My streaming program")
>>       """.stripMargin
>>     interpreter.open
>>     val result = interpreter.interpret(code)
>>   }
>> }
>>
>> The error messages I got are:
>> …
>> …
>> ...
>> [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager] Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7 did not equal the received leader session ID 00000000-0000-0000-0000-000000000000.
>> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Terminate JobClientActor.
>> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Disconnect from JobManager Actor[akka.tcp://flink@localhost:63522/user/jobmanager#82627940].
>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Shutting down remote daemon.
>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remote daemon shut down; proceeding with flushing remote transports.
>> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remoting shut down.
>> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
>>     at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434)
>>     at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
>>     at org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
>>     at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
>>     ... 34 elided
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
>>     at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
>>     at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
>>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
>>     ... 41 more
>> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>>     at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
>>     at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
>>     at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
>>     at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>>     at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>     at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
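On the 'akka.client.timeout' hint in the last exception: that timeout can be raised on the Configuration handed to the mini cluster, roughly as sketched below ("60 s" is the default; the object wrapper is just for illustration). Note, though, that in this trace the submission appears to fail because of the leader session ID mismatch logged above, so a larger timeout alone would not be expected to help:

    import org.apache.flink.configuration.Configuration

    object ClientTimeoutSketch {
      def main(args: Array[String]): Unit = {
        val config = new Configuration
        // Key named in the exception message; value format follows the Flink docs.
        config.setString("akka.client.timeout", "600 s")
        // ... pass `config` on to the LocalFlinkMiniCluster as in the code above.
      }
    }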