Hi Till,

Thanks for your answer; it worked when I used StandaloneMiniCluster. But there is another problem: I can't find a way to cancel a running Flink job without shutting down the cluster. For LocalFlinkMiniCluster I can do it with the code below:
for (job <- cluster.getCurrentlyRunningJobsJava()) {
  cluster.stopJob(job)
}

Is it possible to cancel a running Flink job without shutting down a StandaloneMiniCluster?
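In case it helps to make the question concrete, the kind of thing I am hoping for is sketched below. This is only a guess, assuming a StandaloneClusterClient can be built from the same Configuration the StandaloneMiniCluster was started with (so it can reach the JobManager), and that the JobID is known from the submission; I have not verified it against StandaloneMiniCluster:

import org.apache.flink.api.common.JobID
import org.apache.flink.client.program.StandaloneClusterClient
import org.apache.flink.configuration.Configuration

// Unverified sketch: cancel a single job by id, leaving the cluster running.
def cancelJob(clusterConfig: Configuration, jobId: JobID): Unit = {
  // clusterConfig is assumed to contain the JobManager address and port
  // of the running StandaloneMiniCluster.
  val client = new StandaloneClusterClient(clusterConfig)
  try {
    client.cancel(jobId) // cancels only this job, not the cluster
  } finally {
    client.shutdown()
  }
}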
Best Regards,
XiangWei

> On 14 Sep 2017, at 16:58, Till Rohrmann <trohrm...@apache.org> wrote:
> 
> Hi XiangWei,
> 
> the problem is that the LocalFlinkMiniCluster can no longer be used in
> combination with a RemoteExecutionEnvironment. The reason is that the
> LocalFlinkMiniCluster now uses an internal leader election service and
> assigns leader ids to its components. Since this is an internal service, it
> is not possible to retrieve this information as is the case with the
> ZooKeeper-based leader election services.
> 
> Long story short, the Flink Scala shell currently does not work with a
> LocalFlinkMiniCluster and would have to be fixed to work properly together
> with a local execution environment. Until then, I recommend starting a
> local standalone cluster and letting the code run there.
> 
> Cheers,
> Till
> 
> 
> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang <xw.huang...@gmail.com> wrote:
> Dear all,
> 
> Below is the code I execute:
> 
> import java.io._
> import java.net.{URL, URLClassLoader}
> import java.nio.charset.Charset
> import java.util.Collections
> import java.util.concurrent.atomic.AtomicBoolean
> 
> import com.netease.atom.common.util.logging.Logging
> import com.netease.atom.interpreter.Code.Code
> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, InterpreterUtils}
> import io.netty.buffer._
> import org.apache.flink.api.scala.FlinkILoop
> import org.apache.flink.client.CliFrontend
> import org.apache.flink.client.cli.CliFrontendParser
> import org.apache.flink.client.program.ClusterClient
> import org.apache.flink.configuration.{QueryableStateOptions, Configuration, ConfigConstants, GlobalConfiguration}
> import org.apache.flink.runtime.akka.AkkaUtils
> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, LocalFlinkMiniCluster}
> 
> import scala.Console
> import scala.beans.BeanProperty
> import scala.collection.JavaConversions._
> import scala.collection.mutable
> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
> import scala.runtime.AbstractFunction0
> import scala.tools.nsc.Settings
> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
> 
> class FlinkInterpreter extends Interpreter {
>   private var bufferedReader: Option[BufferedReader] = None
>   private var jprintWriter: JPrintWriter = _
>   private val config = new Configuration
>   private var cluster: LocalFlinkMiniCluster = _
>   @BeanProperty var imain: IMain = _
>   @BeanProperty var flinkILoop: FlinkILoop = _
>   private var out: ByteBufOutputStream = null
>   private var outBuf: ByteBuf = null
>   private var in: ByteBufInputStream = _
>   private var isRunning: AtomicBoolean = new AtomicBoolean(false)
> 
>   override def isOpen: Boolean = {
>     isRunning.get()
>   }
> 
>   def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
>     config.toMap.toMap.foreach(println)
>     config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
>     config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
>     config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
>     config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
>     config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
>     config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
>     val localCluster = new LocalFlinkMiniCluster(config, false)
>     localCluster.start(true)
>     val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
>     println(s"Starting local Flink cluster (host: localhost, port: ${localCluster.getLeaderRPCPort}).\n")
>     ("localhost", localCluster.getLeaderRPCPort, localCluster)
>   }
> 
>   /**
>    * Start Flink cluster and create interpreter
>    */
>   override def open: Unit = {
>     outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
>     out = new ByteBufOutputStream(outBuf)
>     in = new ByteBufInputStream(outBuf)
>     // val (host, port, yarnCluster) = deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
>     val (host, port, localCluster) = startLocalMiniCluster()
>     this.cluster = localCluster
>     val conf = cluster.configuration
>     println(s"Connecting to Flink cluster (host: $host, port: $port)...")
>     flinkILoop = new FlinkILoop(host, port, conf, None)
>     val settings = new Settings()
>     settings.usejavacp.value = true
>     settings.Yreplsync.value = true
>     flinkILoop.settings_$eq(settings)
>     flinkILoop.createInterpreter()
>     imain = flinkILoop.intp
>     FlinkInterpreter.ourClassloader = imain.classLoader
>     val benv = flinkILoop.scalaBenv
>     val senv = flinkILoop.scalaSenv
>     benv.getConfig.disableSysoutLogging()
>     senv.getConfig.disableSysoutLogging()
>     // import libraries
>     imain.interpret("import scala.tools.nsc.io._")
>     // imain.interpret("import Properties.userHome")
>     imain.interpret("import scala.compat.Platform.EOL")
>     imain.interpret("import org.apache.flink.api.scala._")
>     imain.interpret("import org.apache.flink.api.common.functions._")
>     isRunning.set(true)
>   }
> 
>   override def interpret(line: String): InterpreterResult = {
>     if (line == null || line.trim.length == 0) {
>       return new InterpreterResult(Code.SUCCESS)
>     }
>     interpret(line.split("\n"))
>   }
> 
>   /**
>    * Interpret code
>    * @param lines
>    * @return
>    */
>   def interpret(lines: Array[String]): InterpreterResult = {
>     val imain: IMain = getImain
>     val linesToRun: Array[String] = new Array[String](lines.length + 1)
>     for (i <- 0 until lines.length) {
>       linesToRun(i) = lines(i)
>     }
>     linesToRun(lines.length) = "print(\"\")"
>     System.setOut(new PrintStream(out))
>     out.buffer().clear()
>     var r: Code = null
>     var incomplete: String = ""
>     var inComment: Boolean = false
>     for (l <- 0 until linesToRun.length) {
>       val s: String = linesToRun(l)
>       var continuation: Boolean = false
>       if (l + 1 < linesToRun.length) {
>         val nextLine: String = linesToRun(l + 1).trim
>         if (nextLine.isEmpty ||
>             nextLine.startsWith("//") ||
>             nextLine.startsWith("}") ||
>             nextLine.startsWith("object")) {
>           continuation = true
>         } else if (!inComment && nextLine.startsWith("/*")) {
>           inComment = true
>           continuation = true
>         } else if (!inComment && nextLine.lastIndexOf("*/") >= 0) {
>           inComment = false
>           continuation = true
>         } else if (nextLine.length > 1 &&
>             nextLine.charAt(0) == '.' &&
>             nextLine.charAt(1) != '.' &&
>             nextLine.charAt(1) != '/') {
>           continuation = true
>         } else if (inComment) {
>           continuation = true
>         }
>         if (continuation) {
>           incomplete += s + "\n"
>         }
>       }
>       if (!continuation) {
>         val currentCommand: String = incomplete
>         var res: Results.Result = null
>         try {
>           res = Console.withOut(System.out)(new AbstractFunction0[Results.Result] {
>             override def apply() = {
>               imain.interpret(currentCommand + s)
>             }
>           }.apply())
>         } catch {
>           case e: Exception =>
>             logError("Interpreter Exception ", e)
>             return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e))
>         }
>         r = getResultCode(res)
>         if (r == Code.ERROR) {
>           return new InterpreterResult(r, out.toString)
>         } else if (r eq Code.INCOMPLETE) {
>           incomplete += s + "\n"
>         } else {
>           incomplete = ""
>         }
>       }
>     }
> 
>     if (r eq Code.INCOMPLETE) {
>       return new InterpreterResult(r, "Incomplete expression")
>     } else {
>       return new InterpreterResult(r, out.buffer().toString(Charset.forName("utf-8")))
>     }
>   }
> 
>   private def getResultCode(r: Results.Result): Code = {
>     if (r.isInstanceOf[Results.Success.type]) {
>       return Code.SUCCESS
>     } else if (r.isInstanceOf[Results.Incomplete.type]) {
>       return Code.INCOMPLETE
>     } else {
>       return Code.ERROR
>     }
>   }
> }
> 
> object FlinkInterpreter extends Logging {
>   var ourClassloader: ClassLoader = _
> 
>   def main(args: Array[String]): Unit = {
>     val interpreter: FlinkInterpreter = new FlinkInterpreter
>     val code =
>       """
>         |val dataStream = senv.fromElements(1,2,3,4,5)
>         |dataStream.countWindowAll(2).sum(0).print()
>         |senv.execute("My streaming program")
>       """.stripMargin
>     interpreter.open
>     val result = interpreter.interpret(code)
>   }
> }
> 
> The error messages I got are:
> …
> …
> …
> [WARN] [17/09/13 12:04:52] [org.apache.flink.runtime.jobmanager.JobManager] Discard message LeaderSessionMessage(00000000-0000-0000-0000-000000000000,SubmitJob(JobGraph(jobId: 1b923a7a54c06ffa1c91d276a45be826),EXECUTION_RESULT_AND_STATE_CHANGES)) because the expected leader session ID 678ef53b-ff25-4298-b566-9c2d9e7371c7 did not equal the received leader session ID 00000000-0000-0000-0000-000000000000.
> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Terminate JobClientActor.
> [INFO] [17/09/13 12:05:52] [org.apache.flink.runtime.client.JobSubmissionClientActor] Disconnect from JobManager Actor[akka.tcp://flink@localhost:63522/user/jobmanager#82627940].
> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Shutting down remote daemon.
> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remote daemon shut down; proceeding with flushing remote transports.
> [INFO] [17/09/13 12:05:52] [akka.remote.RemoteActorRefProvider$RemotingTerminator] Remoting shut down.
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
>   at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:434)
>   at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.executeRemotely(RemoteStreamEnvironment.java:212)
>   at org.apache.flink.api.java.ScalaShellRemoteStreamEnvironment.executeRemotely(ScalaShellRemoteStreamEnvironment.java:87)
>   at org.apache.flink.streaming.api.environment.RemoteStreamEnvironment.execute(RemoteStreamEnvironment.java:176)
>   at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
>   ... 34 elided
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't retrieve the JobExecutionResult from the JobManager.
>   at org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
>   at org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
>   at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
>   ... 41 more
> Caused by: org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job submission to the JobManager timed out. You may increase 'akka.client.timeout' in case the JobManager needs more time to configure and confirm the job submission.
>   at org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
>   at org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
>   at org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
>   at org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>   at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
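P.S. For reference, the 'akka.client.timeout' mentioned at the bottom of the trace is an entry in flink-conf.yaml (the default is 60 s), e.g.:

akka.client.timeout: 600 s

Given the earlier WARN about the expected leader session ID not matching, though, the SubmitJob message appears to be discarded outright rather than merely slow, so raising the timeout alone probably would not help here.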