Thanks Guowei The questions that I am asking is slightly different: 1. Does Mini cluster support queryable state? 2. If the answer is yes, how to set it up?
Boris Lublinsky FDP Architect boris.lublin...@lightbend.com https://www.lightbend.com/ > On Apr 15, 2019, at 12:07 AM, Guowei Ma <guowei....@gmail.com> wrote: > > Hi, > I think you should check TM log first and check if there are some info like: > 1430 [main] INFO org.apache.flink.queryablestate.server.KvStateServerImpl - > Started Queryable State Server @ /127.0.0.1:9069 <http://127.0.0.1:9069/>. > 1436 [main] INFO > org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl - > Started Queryable State Proxy Server @ /127.0.0. <http://127.0.0.0/> > > > Best, > Guowei > > > Boris Lublinsky <boris.lublin...@lightbend.com > <mailto:boris.lublin...@lightbend.com>> 于2019年4月15日周一 上午4:02写道: > I was testing with Flink 1.9. Here is how I set up mini cluster > > int port = 6124; > int parallelism = 2; > Configuration config = new Configuration(); > config.setInteger(JobManagerOptions.PORT, port); > config.setString(JobManagerOptions.ADDRESS, "localhost"); > config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, > parallelism); > // In a non MiniCluster setup queryable state is enabled by > default. > config.setString(QueryableStateOptions.PROXY_PORT_RANGE, > "9069"); > config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, > 2); > > config.setInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS, 2); > > config.setString(QueryableStateOptions.SERVER_PORT_RANGE, > "9067"); > config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, > 2); > > config.setInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS, 2); > > MiniClusterConfiguration clusterconfig = > new MiniClusterConfiguration(config, 1, > RpcServiceSharing.DEDICATED, null); > try { > // Create a local Flink server > MiniCluster flinkCluster = new > MiniCluster(clusterconfig); > // Start server and create environment > flinkCluster.start(); > StreamExecutionEnvironment env = > StreamExecutionEnvironment.createRemoteEnvironment("localhost", port); > env.setParallelism(parallelism); > // Build Graph > buildGraph(env); > JobGraph jobGraph = env.getStreamGraph().getJobGraph(); > // Submit to the server and wait for completion > JobSubmissionResult result = > flinkCluster.submitJob(jobGraph).get(); > System.out.println("Job ID : " + result.getJobID()); > Thread.sleep(Long.MAX_VALUE); > } catch (Throwable t){ > t.printStackTrace(); > } > > And have a client, that looks like follows: > > def query(job: String, keys: Seq[String], host: String = "127.0.0.1", port: > Int = 9069, > timeInterval: Long = defaulttimeInterval): Unit = { > > // JobID, has to correspond to a running job > val jobId = JobID.fromHexString(job) > // Client > val client = new QueryableStateClient(host, port) > > But when I tried it, it gives an exception that nothing is listening on port > 9069 > > It works with the old FlinkLocalMiniCluster, but not with the MiniCluster > > Am I missing something? > > > > Boris Lublinsky > FDP Architect > boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com> > https://www.lightbend.com/ <https://www.lightbend.com/>