Hi Till,
Thanks for your answer. It worked when I used StandaloneMiniCluster, but another problem is that I can't find a way to cancel a running Flink job without shutting down the cluster. For LocalFlinkMiniCluster I can do it with the code below:

for (job <- cluster.getCurrentlyRunningJobsJava()) {
  cluster.stopJob(job)
}
Is it possible to cancel a running Flink job without shutting down a StandaloneMiniCluster?
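What I was hoping for is something along the lines of the sketch below: connect a client to the standalone cluster and cancel the job by its JobID. This is only a guess on my side (the port and the JobID are just example values, and I am not sure StandaloneClusterClient is meant to be used this way):

import org.apache.flink.api.common.JobID
import org.apache.flink.client.program.StandaloneClusterClient
import org.apache.flink.configuration.{Configuration, JobManagerOptions}

// Sketch only: point a standalone client at the running cluster.
val conf = new Configuration()
conf.setString(JobManagerOptions.ADDRESS, "localhost")
conf.setInteger(JobManagerOptions.PORT, 6123) // example; use the cluster's actual RPC port
val client = new StandaloneClusterClient(conf)
try {
  // JobID obtained from the submission result or the web UI (example value)
  client.cancel(JobID.fromHexString("1b923a7a54c06ffa1c91d276a45be826"))
} finally {
  client.shutdown()
}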
Best Regards,
XiangWei
> On 14 Sep 2017, at 16:58, Till Rohrmann <[email protected]> wrote:
>
> Hi XiangWei,
>
> the problem is that the LocalFlinkMiniCluster can no longer be used in
> combination with a RemoteExecutionEnvironment. The reason is that the
> LocalFlinkMiniCluster now uses an internal leader election service and
> assigns leader ids to its components. Since this is an internal service, it
> is not possible to retrieve this information as one can with the
> ZooKeeper-based leader election services.
>
> Long story short, the Flink Scala shell currently does not work with a
> LocalFlinkMiniCluster and would have to be fixed to work properly with a
> local execution environment. Until then, I recommend starting a local
> standalone cluster and letting the code run there, roughly as sketched
> below.
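> A minimal sketch of what I mean (StandaloneMiniCluster is an internal class
> in 1.3, so please double-check the names against your Flink version):
>
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.runtime.minicluster.StandaloneMiniCluster
>
> // Start a standalone cluster in-process; unlike the LocalFlinkMiniCluster,
> // a RemoteExecutionEnvironment can connect to it.
> val config = new Configuration()
> val cluster = new StandaloneMiniCluster(config)
> // Pass cluster.getHostname and cluster.getPort to the FlinkILoop /
> // RemoteExecutionEnvironment instead of the LocalFlinkMiniCluster
> // coordinates, and close it with cluster.close() when done.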
>
> Cheers,
> Till
>
>
> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <[email protected]> wrote:
> Dear all,
>
> Below is the code I execute:
>
> import java.io._
> import java.net.{URL, URLClassLoader}
> import java.nio.charset.Charset
> import java.util.Collections
> import java.util.concurrent.atomic.AtomicBoolean
>
> import com.netease.atom.common.util.logging.Logging
> import com.netease.atom.interpreter.Code.Code
> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, InterpreterUtils}
> import io.netty.buffer._
> import org.apache.flink.api.scala.FlinkILoop
> import org.apache.flink.client.CliFrontend
> import org.apache.flink.client.cli.CliFrontendParser
> import org.apache.flink.client.program.ClusterClient
> import org.apache.flink.configuration.{QueryableStateOptions, Configuration, ConfigConstants, GlobalConfiguration}
> import org.apache.flink.runtime.akka.AkkaUtils
> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, LocalFlinkMiniCluster}
>
> import scala.Console
> import scala.beans.BeanProperty
> import scala.collection.JavaConversions._
> import scala.collection.mutable
> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
> import scala.runtime.AbstractFunction0
> import scala.tools.nsc.Settings
> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>
> class FlinkInterpreter extends Interpreter {
> private var bufferedReader: Option[BufferedReader] = None
> private var jprintWriter: JPrintWriter = _
> private val config = new Configuration()
> private var cluster: LocalFlinkMiniCluster = _
> @BeanProperty var imain: IMain = _
> @BeanProperty var flinkILoop: FlinkILoop = _
> private var out: ByteBufOutputStream = null
> private var outBuf: ByteBuf = null
> private var in: ByteBufInputStream = _
> private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>
> override def isOpen: Boolean = {
> isRunning.get()
> }
>
> def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
> config.toMap.toMap.foreach(println)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
> config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
> config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
> val localCluster = new LocalFlinkMiniCluster(config, false)
> localCluster.start(true)
> val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
> println(s"Starting local Flink cluster (host: localhost, port: ${localCluster.getLeaderRPCPort}).\n")
> ("localhost", localCluster.getLeaderRPCPort, localCluster)
> }
>
>
> /**
> * Start Flink cluster and create interpreter
> */
> override def open: Unit = {
> outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
> out = new ByteBufOutputStream(outBuf)
> in = new ByteBufInputStream(outBuf)
> // val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
> val (host, port, localCluster) = startLocalMiniCluster()
> this.cluster = localCluster
> val conf = cluster.configuration
> println(s"Connecting to Flink cluster (host:$host,port:$port)...")
> flinkILoop = new FlinkILoop(host, port, conf, None)
> val settings = new Settings()
> settings.usejavacp.value = true
> settings.Yreplsync.value = true
> flinkILoop.settings_$eq(settings)
> flinkILoop.createInterpreter()
> imain = flinkILoop.intp
> FlinkInterpreter.ourClassloader = imain.classLoader
> val benv = flinkILoop.scalaBenv
> val senv = flinkILoop.scalaSenv
> benv.getConfig.disableSysoutLogging()
> senv.getConfig.disableSysoutLogging()
> // import libraries
> imain.interpret("import scala.tools.nsc.io._")
> // imain.interpret("import Properties.userHome")
> imain.interpret("import scala.compat.Platform.EOL")
> imain.interpret("import org.apache.flink.api.scala._")
> imain.interpret("import org.apache.flink.api.common.functions._")
> isRunning.set(true)
> }
>
> override def interpret(line: String): InterpreterResult = {
> if (line == null || line.trim.length == 0) {
> return new InterpreterResult(Code.SUCCESS)
> }
> interpret(line.split("\n"))
> }
>
> /**
> * Interpret code
> * @param lines the lines of code to interpret
> * @return the result of interpreting the given lines
> */
> def interpret(lines: Array[String]): InterpreterResult = {
> val imain: IMain = getImain
> val linesToRun: Array[String] = new Array[String](lines.length + 1)
> for (i <- 0 until lines.length) {
> linesToRun(i) = lines(i)
> }
> linesToRun(lines.length) = "print(\"\")"
> System.setOut(new PrintStream(out))
> out.buffer().clear()
> var r: Code = null
> var incomplete: String = ""
> var inComment: Boolean = false
> for (l <- 0 until linesToRun.length) {
> val s: String = linesToRun(l)
> var continuation: Boolean = false
> if (l + 1 < linesToRun.length) {
> val nextLine: String = linesToRun(l + 1).trim
> if (nextLine.isEmpty ||
> nextLine.startsWith("//") ||
> nextLine.startsWith("}") ||
> nextLine.startsWith("object")) {
> continuation = true
> } else if (!inComment && nextLine.startsWith("/*")) {
> inComment = true
> continuation = true
> } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) {
> inComment = false
> continuation = true
> } else if (nextLine.length > 1 &&
> nextLine.charAt(0) == '.' &&
> nextLine.charAt(1) != '.' &&
> nextLine.charAt(1) != '/') {
> continuation = true
> } else if (inComment) {
> continuation = true
> }
> if (continuation) {
> incomplete += s + "\n"
> }
> }
> if (!continuation) {
> val currentCommand: String = incomplete
> var res: Results.Result = null
> try {
> res = Console.withOut(System.out)(new AbstractFunction0[Results.Result] {
>   override def apply() = {
>     imain.interpret(currentCommand + s)
>   }
> }.apply())
> } catch {
> case e: Exception =>
> logError("Interpreter Exception ", e)
> return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e))
> }
> r = getResultCode(res)
> if (r == Code.ERROR) {
> return new InterpreterResult(r, out.toString)
> } else if (r eq Code.INCOMPLETE) {
> incomplete += s + "\n"
> } else {
> incomplete = ""
> }
> }
> }
>
> if (r eq Code.INCOMPLETE) {
> return new InterpreterResult(r, "Incomplete expression")
> }
> else {
> return new InterpreterResult(r, out.buffer().toString(Charset.forName("utf-8")))
> }
> }
>
> private def getResultCode(r: Results.Result): Code = r match {
>   case Results.Success => Code.SUCCESS
>   case Results.Incomplete => Code.INCOMPLETE
>   case _ => Code.ERROR
> }
>
> }
>
> object FlinkInterpreter extends Logging {
> var ourClassloader: ClassLoader = _
>
> def main(args: Array[String]): Unit = {
> val interpreter: FlinkInterpreter = new FlinkInterpreter
> val code =
> """
> |val dataStream = senv.fromElements(1,2,3,4,5)
> |dataStream.countWindowAll(2).sum(0).print()
> |senv.execute("My streaming program")
> """.stripMargin
> interpreter.open
> val result = interpreter.interpret(code)
> }
> }
>
> The error messages I got are:
> …
> …
> ...
> [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager] Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7 did not equal the received leader session ID 00000000-0000-0000-0000-000000000000.
> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Terminate JobClientActor.
> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Disconnect from JobManager Actor[akka.tcp://flink@localhost:63522/user/jobmanager#82627940].
> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Shutting down remote daemon.
> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remote daemon shut down; proceeding with flushing remote transports.
> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remoting shut down.
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
>   at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434)
>   at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
>   at org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
>   at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
>   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
>   ... 34 elided
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
>   at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
>   at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
>   ... 41 more
> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>   at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
>   at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
>   at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
>   at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>   at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
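>
> (Side note: the 'akka.client.timeout' mentioned in the last exception can be
> raised on the Configuration before the cluster is started, e.g.
>
> config.setString("akka.client.timeout", "600 s") // default is 60 s
>
> but in this run the root cause looks like the leader session ID mismatch in
> the WARN message above rather than a slow JobManager.)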