Hi,

1. I think MiniCluster supports queryable state.
2. You could set queryable-state.enable to true and try again.

You could also look at AbstractQueryableStateTestBase, which contains some tests. :)
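In the MiniCluster setup quoted below, the flag would presumably be set on the same Configuration object before the MiniClusterConfiguration is built, for example with config.setBoolean("queryable-state.enable", true); I have not run this against that exact code, so treat it as an assumption. After restarting, one way to see whether the proxy came up without reading the TM log is to probe the port the client connects to. The class below is a minimal sketch using only the JDK; PortProbe and isListening are hypothetical names, and the host and port defaults are taken from the client code in this thread.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is accepting on that port.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults mirror the client in this thread (127.0.0.1, proxy port 9069).
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9069;
        System.out.println(host + ":" + port + " listening: " + isListening(host, port, 1000));
    }
}
```

If this reports false after the cluster has started, the proxy server was most likely never launched, which would match the "nothing is listening on port 9069" error. A true result only shows that something accepts TCP connections there, not that it is the queryable state proxy.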
Best,
Guowei

On Tue, Apr 16, 2019 at 9:09 PM, Boris Lublinsky <boris.lublin...@lightbend.com> wrote:

> Thanks Guowei
> The questions I am asking are slightly different:
> 1. Does Mini cluster support queryable state?
> 2. If the answer is yes, how do I set it up?
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
> On Apr 15, 2019, at 12:07 AM, Guowei Ma <guowei....@gmail.com> wrote:
>
> Hi,
> I think you should check the TM log first and see whether it contains
> entries like:
> 1430 [main] INFO org.apache.flink.queryablestate.server.KvStateServerImpl - Started Queryable State Server @ /127.0.0.1:9069.
> 1436 [main] INFO org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl - Started Queryable State Proxy Server @ /127.0.0. <http://127.0.0.0/>
>
> Best,
> Guowei
>
> On Mon, Apr 15, 2019 at 4:02 AM, Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
>
>> I was testing with Flink 1.9. Here is how I set up the mini cluster:
>>
>> int port = 6124;
>> int parallelism = 2;
>> Configuration config = new Configuration();
>> config.setInteger(JobManagerOptions.PORT, port);
>> config.setString(JobManagerOptions.ADDRESS, "localhost");
>> config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
>> // In a non MiniCluster setup queryable state is enabled by default.
>> config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069");
>> config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
>> config.setInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS, 2);
>>
>> config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9067");
>> config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
>> config.setInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS, 2);
>>
>> MiniClusterConfiguration clusterconfig =
>>     new MiniClusterConfiguration(config, 1, RpcServiceSharing.DEDICATED, null);
>> try {
>>     // Create a local Flink server
>>     MiniCluster flinkCluster = new MiniCluster(clusterconfig);
>>     // Start server and create environment
>>     flinkCluster.start();
>>     StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.createRemoteEnvironment("localhost", port);
>>     env.setParallelism(parallelism);
>>     // Build Graph
>>     buildGraph(env);
>>     JobGraph jobGraph = env.getStreamGraph().getJobGraph();
>>     // Submit to the server and wait for completion
>>     JobSubmissionResult result = flinkCluster.submitJob(jobGraph).get();
>>     System.out.println("Job ID : " + result.getJobID());
>>     Thread.sleep(Long.MAX_VALUE);
>> } catch (Throwable t) {
>>     t.printStackTrace();
>> }
>>
>> And I have a client that looks as follows:
>>
>> def query(job: String, keys: Seq[String], host: String = "127.0.0.1",
>>           port: Int = 9069,
>>           timeInterval: Long = defaulttimeInterval): Unit = {
>>
>>   // JobID, has to correspond to a running job
>>   val jobId = JobID.fromHexString(job)
>>   // Client
>>   val client = new QueryableStateClient(host, port)
>>
>> But when I tried it, it threw an exception saying that nothing is
>> listening on port 9069.
>>
>> It works with the old FlinkLocalMiniCluster, but not with the MiniCluster.
>>
>> Am I missing something?
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/