Hi,
I think you should check the TaskManager (TM) log first and see whether it contains messages like:
1430 [main] INFO  org.apache.flink.queryablestate.server.KvStateServerImpl - Started Queryable State Server @ /127.0.0.1:9069.
1436 [main] INFO  org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl - Started Queryable State Proxy Server @ /127.0.0.
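
If the TM log is long, a small helper can scan it for these startup messages and print the address each one reports. This is only a rough sketch: the class name QueryableStateLogCheck and the regular expression are my own assumptions, derived from the two messages above, and are not part of Flink. It also assumes each message sits on a single line in the real log file and that the address is an IPv4 address or host name.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not a Flink class.
class QueryableStateLogCheck {

    // Assumed message shape, taken from the log lines quoted above, e.g.
    // "Started Queryable State Server @ /127.0.0.1:9069."
    private static final Pattern STARTED = Pattern.compile(
            "Started Queryable State (Proxy Server|Server) @ /?([^\\s:]+):(\\d+)");

    // Returns one "<component> -> <host>:<port>" entry per matching log line.
    static List<String> findEndpoints(List<String> logLines) {
        List<String> found = new ArrayList<>();
        for (String line : logLines) {
            Matcher m = STARTED.matcher(line);
            if (m.find()) {
                found.add(m.group(1) + " -> " + m.group(2) + ":" + m.group(3));
            }
        }
        return found;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java QueryableStateLogCheck.java <path-to-TM-log>");
            return;
        }
        List<String> endpoints = findEndpoints(Files.readAllLines(Paths.get(args[0])));
        if (endpoints.isEmpty()) {
            // No startup message found: the proxy and server were probably not started.
            System.out.println("No queryable state startup messages found.");
        } else {
            endpoints.forEach(System.out::println);
        }
    }
}
```

If nothing is printed for the proxy server, the client has nothing to connect to on the proxy port, which would match the "nothing is listening" exception.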


Best,
Guowei


Boris Lublinsky <boris.lublin...@lightbend.com> wrote on Mon, Apr 15, 2019 at 4:02 AM:

> I was testing with Flink 1.9. Here is how I set up the MiniCluster:
>
> int port = 6124;
> int parallelism = 2;
> Configuration config = new Configuration();
> config.setInteger(JobManagerOptions.PORT, port);
> config.setString(JobManagerOptions.ADDRESS, "localhost");
> config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
> // In a non MiniCluster setup queryable state is enabled by default.
> config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069");
> config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
> config.setInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS, 2);
>
> config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9067");
> config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
> config.setInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS, 2);
>
> MiniClusterConfiguration clusterconfig =
>     new MiniClusterConfiguration(config, 1, RpcServiceSharing.DEDICATED, null);
> try {
>     // Create a local Flink server
>     MiniCluster flinkCluster = new MiniCluster(clusterconfig);
>     // Start server and create environment
>     flinkCluster.start();
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createRemoteEnvironment("localhost", port);
>     env.setParallelism(parallelism);
>     // Build Graph
>     buildGraph(env);
>     JobGraph jobGraph = env.getStreamGraph().getJobGraph();
>     // Submit to the server and wait for completion
>     JobSubmissionResult result = flinkCluster.submitJob(jobGraph).get();
>     System.out.println("Job ID : " + result.getJobID());
>     Thread.sleep(Long.MAX_VALUE);
> } catch (Throwable t) {
>     t.printStackTrace();
> }
>
> And I have a client that looks as follows:
>
> def query(job: String, keys: Seq[String], host: String = "127.0.0.1",
>           port: Int = 9069,
>           timeInterval: Long = defaulttimeInterval): Unit = {
>
>     // JobID, has to correspond to a running job
>     val jobId = JobID.fromHexString(job)
>     // Client
>     val client = new QueryableStateClient(host, port)
>
> But when I try it, I get an exception saying that nothing is listening on
> port 9069.
>
> It works with the old LocalFlinkMiniCluster, but not with the MiniCluster.
>
> Am I missing something?
>
>
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
>
