Hi,

I think you should check the TaskManager (TM) log first and see whether it contains lines like these:

1430 [main] INFO org.apache.flink.queryablestate.server.KvStateServerImpl - Started Queryable State Server @ /127.0.0.1:9069.
1436 [main] INFO org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl - Started Queryable State Proxy Server @ /127.0.0.
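
If you want to script that check, something like the following might do. It is only a sketch using the Java standard library: the class name QueryableStateLogCheck and the regular expression are my own illustrative assumptions, not part of Flink, and the pattern assumes the message format of the lines quoted above with an IPv4 address or host name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not a Flink class): scans TM log lines for the
// queryable state proxy startup message and extracts the bound address.
class QueryableStateLogCheck {

    // Assumed message format: "... Started Queryable State Proxy Server @ /host:port"
    private static final Pattern PROXY_STARTED = Pattern.compile(
            "Started Queryable State Proxy Server @ (/?[\\w.\\-]+:\\d+)");

    // Returns the address from the first matching line, or empty if the
    // proxy startup message never appears in the given lines.
    static Optional<String> findProxyAddress(List<String> logLines) {
        for (String line : logLines) {
            Matcher m = PROXY_STARTED.matcher(line);
            if (m.find()) {
                return Optional.of(m.group(1));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java QueryableStateLogCheck <path-to-TM-log>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        Optional<String> address = findProxyAddress(lines);
        if (address.isPresent()) {
            System.out.println("Proxy started at " + address.get());
        } else {
            System.out.println("No proxy startup line found in the log.");
        }
    }
}
```

If no startup line is found, the proxy was most likely never started, which would be consistent with nothing listening on the port the client connects to.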

Best,
Guowei

Boris Lublinsky <boris.lublin...@lightbend.com> wrote on Mon, Apr 15, 2019 at 4:02 AM:

> I was testing with Flink 1.9. Here is how I set up the mini cluster:
>
> int port = 6124;
> int parallelism = 2;
> Configuration config = new Configuration();
> config.setInteger(JobManagerOptions.PORT, port);
> config.setString(JobManagerOptions.ADDRESS, "localhost");
> config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
> // In a non MiniCluster setup queryable state is enabled by default.
> config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069");
> config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
> config.setInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS, 2);
>
> config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9067");
> config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
> config.setInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS, 2);
>
> MiniClusterConfiguration clusterconfig =
>     new MiniClusterConfiguration(config, 1, RpcServiceSharing.DEDICATED, null);
> try {
>     // Create a local Flink server
>     MiniCluster flinkCluster = new MiniCluster(clusterconfig);
>     // Start server and create environment
>     flinkCluster.start();
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createRemoteEnvironment("localhost", port);
>     env.setParallelism(parallelism);
>     // Build Graph
>     buildGraph(env);
>     JobGraph jobGraph = env.getStreamGraph().getJobGraph();
>     // Submit to the server and wait for completion
>     JobSubmissionResult result = flinkCluster.submitJob(jobGraph).get();
>     System.out.println("Job ID : " + result.getJobID());
>     Thread.sleep(Long.MAX_VALUE);
> } catch (Throwable t) {
>     t.printStackTrace();
> }
>
> And I have a client that looks as follows:
>
> def query(job: String, keys: Seq[String], host: String = "127.0.0.1",
>           port: Int = 9069,
>           timeInterval: Long = defaulttimeInterval): Unit = {
>
>   // JobID, has to correspond to a running job
>   val jobId = JobID.fromHexString(job)
>   // Client
>   val client = new QueryableStateClient(host, port)
>
> But when I tried it, it threw an exception saying that nothing is
> listening on port 9069.
>
> It works with the old FlinkLocalMiniCluster, but not with the MiniCluster.
>
> Am I missing something?
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/