Hi Till,

I've found that a StandaloneMiniCluster doesn't start up the web frontend
when it is running, so how can I cancel a running job on it via the REST API?

Cheers,
XiangWei
> On 20 Sep 2017, at 15:43, Till Rohrmann <[email protected]> wrote:
>
> Hi XiangWei,
>
> programmatically there is no nice tooling yet to cancel jobs on a dedicated
> cluster. What you can do is use Flink's REST API to issue a cancel command
> [1]. You have to send a DELETE request to the target URL `/jobs/:jobid/cancel`.
> In the future we will improve the programmatic job control, which will allow
> you to do these kinds of things more easily.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#job-cancellation
>
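> For example, a minimal Scala sketch of such a cancel call (assuming the
> JobManager's web/REST endpoint is reachable on localhost:8081; the job id
> and object name are hypothetical):
>
> import java.net.{HttpURLConnection, URL}
>
> object CancelJob {
>   def main(args: Array[String]): Unit = {
>     val jobId = "1b923a7a54c06ffa1c91d276a45be826" // hypothetical job id
>     // Assumes the cluster's REST server listens on the default port 8081.
>     val url = new URL(s"http://localhost:8081/jobs/$jobId/cancel")
>     val conn = url.openConnection().asInstanceOf[HttpURLConnection]
>     conn.setRequestMethod("DELETE")
>     // HTTP 200 means the cancel request was accepted.
>     println(s"Response code: ${conn.getResponseCode}")
>     conn.disconnect()
>   }
> }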
>
> Cheers,
> Till
>
> On Wed, Sep 20, 2017 at 5:46 AM, XiangWei Huang <[email protected]> wrote:
> Hi Till,
>
> Thanks for your answer. It worked when I used a StandaloneMiniCluster, but
> another problem is that I can't find a way to cancel a running Flink job
> without shutting down the cluster. For a LocalFlinkMiniCluster I can do it
> with the code below:
>
> for (job <- cluster.getCurrentlyRunningJobsJava()) {
>   cluster.stopJob(job)
> }
>
> Is it possible to cancel a running Flink job without shutting down a
> StandaloneMiniCluster?
>
> Best Regards,
> XiangWei
>
>
>
>> On 14 Sep 2017, at 16:58, Till Rohrmann <[email protected]> wrote:
>>
>> Hi XiangWei,
>>
>> the problem is that the LocalFlinkMiniCluster can no longer be used in
>> combination with a RemoteExecutionEnvironment. The reason is that the
>> LocalFlinkMiniCluster now uses an internal leader election service and
>> assigns leader ids to its components. Since this is an internal service, it
>> is not possible to retrieve this information as it is with the
>> ZooKeeper-based leader election services.
>>
>> Long story short, the Flink Scala shell currently does not work with a
>> LocalFlinkMiniCluster and would have to be fixed to work properly with a
>> local execution environment. Until then, I recommend starting a local
>> standalone cluster and letting the code run there.
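>>
>> For instance, a minimal sketch of that setup (assuming a standalone cluster
>> started via bin/start-cluster.sh with the JobManager on localhost:6123;
>> host, port and the sample job are placeholders):
>>
>> import org.apache.flink.api.scala._
>>
>> object RunOnStandaloneCluster {
>>   def main(args: Array[String]): Unit = {
>>     // Submit against the standalone cluster instead of a
>>     // LocalFlinkMiniCluster.
>>     val env = ExecutionEnvironment.createRemoteEnvironment("localhost", 6123)
>>     env.fromElements(1, 2, 3, 4, 5).map(_ * 2).print()
>>   }
>> }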
>>
>> Cheers,
>> Till
>>
>>
>> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <[email protected]> wrote:
>> Dear all,
>>
>> Below is the code I execute:
>>
>> import java.io._
>> import java.net.{URL, URLClassLoader}
>> import java.nio.charset.Charset
>> import java.util.Collections
>> import java.util.concurrent.atomic.AtomicBoolean
>>
>> import com.netease.atom.common.util.logging.Logging
>> import com.netease.atom.interpreter.Code.Code
>> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, InterpreterUtils}
>> import io.netty.buffer._
>> import org.apache.flink.api.scala.FlinkILoop
>> import org.apache.flink.client.CliFrontend
>> import org.apache.flink.client.cli.CliFrontendParser
>> import org.apache.flink.client.program.ClusterClient
>> import org.apache.flink.configuration.{QueryableStateOptions, Configuration, ConfigConstants, GlobalConfiguration}
>> import org.apache.flink.runtime.akka.AkkaUtils
>> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, LocalFlinkMiniCluster}
>>
>> import scala.Console
>> import scala.beans.BeanProperty
>> import scala.collection.JavaConversions._
>> import scala.collection.mutable
>> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
>> import scala.runtime.AbstractFunction0
>> import scala.tools.nsc.Settings
>> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>>
>> class FlinkInterpreter extends Interpreter {
>>   private var bufferedReader: Option[BufferedReader] = None
>>   private var jprintWriter: JPrintWriter = _
>>   private val config = new Configuration
>>   private var cluster: LocalFlinkMiniCluster = _
>>   @BeanProperty var imain: IMain = _
>>   @BeanProperty var flinkILoop: FlinkILoop = _
>>   private var out: ByteBufOutputStream = null
>>   private var outBuf: ByteBuf = null
>>   private var in: ByteBufInputStream = _
>>   private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>>
>>   override def isOpen: Boolean = {
>>     isRunning.get()
>>   }
>>
>>   def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
>>     // Dump the effective configuration for debugging.
>>     config.toMap.foreach(println)
>>     config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
>>     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
>>     config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
>>     config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
>>     config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
>>     config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
>>     val localCluster = new LocalFlinkMiniCluster(config, false)
>>     localCluster.start(true)
>>     val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
>>     println(s"Starting local Flink cluster (host: localhost, port: ${localCluster.getLeaderRPCPort}).\n")
>>     ("localhost", localCluster.getLeaderRPCPort, localCluster)
>>   }
>>
>>
>>   /**
>>    * Start flink cluster and create interpreter
>>    */
>>   override def open: Unit = {
>>     // Buffers that capture the interpreter's console output.
>>     outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
>>     out = new ByteBufOutputStream(outBuf)
>>     in = new ByteBufInputStream(outBuf)
>>     // val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
>>     val (host, port, localCluster) = startLocalMiniCluster()
>>     this.cluster = localCluster
>>     val conf = cluster.configuration
>>     println(s"Connecting to Flink cluster (host: $host, port: $port)...")
>>     flinkILoop = new FlinkILoop(host, port, conf, None)
>>     val settings = new Settings()
>>     settings.usejavacp.value = true
>>     settings.Yreplsync.value = true
>>     flinkILoop.settings_$eq(settings)
>>     flinkILoop.createInterpreter()
>>     imain = flinkILoop.intp
>>     FlinkInterpreter.ourClassloader = imain.classLoader
>>     val benv = flinkILoop.scalaBenv
>>     val senv = flinkILoop.scalaSenv
>>     benv.getConfig.disableSysoutLogging()
>>     senv.getConfig.disableSysoutLogging()
>>     // import libraries
>>     imain.interpret("import scala.tools.nsc.io._")
>>     // imain.interpret("import Properties.userHome")
>>     imain.interpret("import scala.compat.Platform.EOL")
>>     imain.interpret("import org.apache.flink.api.scala._")
>>     imain.interpret("import org.apache.flink.api.common.functions._")
>>     isRunning.set(true)
>>   }
>>
>>   override def interpret(line: String): InterpreterResult = {
>>     if (line == null || line.trim.length == 0) {
>>       return new InterpreterResult(Code.SUCCESS)
>>     }
>>     interpret(line.split("\n"))
>>   }
>>
>>   /**
>>    * Interpret code
>>    * @param lines
>>    * @return
>>    */
>>   def interpret(lines: Array[String]): InterpreterResult = {
>>     val imain: IMain = getImain
>>     val linesToRun: Array[String] = new Array[String](lines.length + 1)
>>     for (i <- 0 until lines.length) {
>>       linesToRun(i) = lines(i)
>>     }
>>     // Append a no-op so the last real line is always executed.
>>     linesToRun(lines.length) = "print(\"\")"
>>     System.setOut(new PrintStream(out))
>>     out.buffer().clear()
>>     var r: Code = null
>>     var incomplete: String = ""
>>     var inComment: Boolean = false
>>     for (l <- 0 until linesToRun.length) {
>>       val s: String = linesToRun(l)
>>       var continuation: Boolean = false
>>       if (l + 1 < linesToRun.length) {
>>         val nextLine: String = linesToRun(l + 1).trim
>>         if (nextLine.isEmpty ||
>>             nextLine.startsWith("//") ||
>>             nextLine.startsWith("}") ||
>>             nextLine.startsWith("object")) {
>>           continuation = true
>>         } else if (!inComment && nextLine.startsWith("/*")) {
>>           inComment = true
>>           continuation = true
>>         } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) {
>>           inComment = false
>>           continuation = true
>>         } else if (nextLine.length > 1 &&
>>                    nextLine.charAt(0) == '.' &&
>>                    nextLine.charAt(1) != '.' &&
>>                    nextLine.charAt(1) != '/') {
>>           continuation = true
>>         } else if (inComment) {
>>           continuation = true
>>         }
>>         if (continuation) {
>>           incomplete += s + "\n"
>>         }
>>       }
>>       if (!continuation) {
>>         val currentCommand: String = incomplete
>>         var res: Results.Result = null
>>         try {
>>           res = Console.withOut(System.out)(new AbstractFunction0[Results.Result] {
>>             override def apply() = {
>>               imain.interpret(currentCommand + s)
>>             }
>>           }.apply())
>>         } catch {
>>           case e: Exception =>
>>             logError("Interpreter Exception ", e)
>>             return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e))
>>         }
>>         r = getResultCode(res)
>>         if (r == Code.ERROR) {
>>           return new InterpreterResult(r, out.toString)
>>         } else if (r eq Code.INCOMPLETE) {
>>           incomplete += s + "\n"
>>         } else {
>>           incomplete = ""
>>         }
>>       }
>>     }
>>
>>     if (r eq Code.INCOMPLETE) {
>>       return new InterpreterResult(r, "Incomplete expression")
>>     } else {
>>       return new InterpreterResult(r, out.buffer().toString(Charset.forName("utf-8")))
>>     }
>>   }
>>
>>   private def getResultCode(r: Results.Result): Code = {
>>     if (r.isInstanceOf[Results.Success.type]) {
>>       Code.SUCCESS
>>     } else if (r.isInstanceOf[Results.Incomplete.type]) {
>>       Code.INCOMPLETE
>>     } else {
>>       Code.ERROR
>>     }
>>   }
>> }
>>
>> object FlinkInterpreter extends Logging {
>>   var ourClassloader: ClassLoader = _
>>
>>   def main(args: Array[String]): Unit = {
>>     val interpreter: FlinkInterpreter = new FlinkInterpreter
>>     val code =
>>       """
>>         |val dataStream = senv.fromElements(1,2,3,4,5)
>>         |dataStream.countWindowAll(2).sum(0).print()
>>         |senv.execute("My streaming program")
>>       """.stripMargin
>>     interpreter.open
>>     val result = interpreter.interpret(code)
>>   }
>> }
>>
>> The error messages I got are:
>> …
>> …
>> ...
>> [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager]
>> Discard message
>> LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId:
>> 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES))
>> because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7
>> did not equal the received leader session ID
>> 00000000-0000-0000-0000-000000000000.
>> [INFO] [17/09/13 12:05:52]
>> [org.apache.flink.runtime.client.JobSubmissionClientActor] Terminate
>> JobClientActor.
>> [INFO] [17/09/13 12:05:52]
>> [org.apache.flink.runtime.client.JobSubmissionClientActor] Disconnect from
>> JobManager Actor[akka.tcp://flink@localhost:63522/user/jobmanager#82627940].
>> [INFO] [17/09/13 12:05:52]
>> [akka.remote.RemoteActorRefProvider$RemotingTerminator] Shutting down remote
>> daemon.
>> [INFO] [17/09/13 12:05:52]
>> [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remote daemon shut
>> down; proceeding with flushing remote transports.
>> [INFO] [17/09/13 12:05:52]
>> [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remoting shut down.
>> org.apache.flink.client.program.ProgramInvocationException: The program
>> execution failed: Couldn't retrieve the JobExecutionResult from the
>> JobManager.
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
>> at
>> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434)
>> at
>> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
>> at
>> org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
>> at
>> org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
>> at
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
>> ... 34 elided
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't
>> retrieve the JobExecutionResult from the JobManager.
>> at
>> org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
>> at
>> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
>> at
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
>> ... 41 more
>> Caused by:
>> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException:
>> Job submission to the JobManager timed out. You may increase
>> 'akka.client.timeout' in case the JobManager needs more time to configure
>> and confirm the job submission.
>> at
>> org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
>> at
>> org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
>> at
>> org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
>> at
>> org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>> at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
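>>
>> As a side note, the 'akka.client.timeout' named in the last cause can be
>> raised on the client Configuration; a minimal sketch, assuming the config
>> object from the interpreter above (the value is an arbitrary example):
>>
>> config.setString("akka.client.timeout", "600 s")
>>
>> Given the leader session ID mismatch logged above, though, a longer timeout
>> alone is unlikely to make the submission succeed.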
>>
>>
>>
>>
>>
>
>