Thanks, Guowei.
The question I am asking is slightly different:
1. Does MiniCluster support queryable state?
2. If the answer is yes, how do I set it up?
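
To separate "the proxy never started" from "the client is misconfigured", a plain socket probe can confirm whether anything is listening on the proxy port, independently of Flink. This is only a minimal sketch: the class and method names (PortProbe, isListening) are made up for illustration, and 127.0.0.1:9069 is simply the host and proxy port from the configuration quoted below.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of Flink: checks whether a TCP port accepts connections.
final class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing accepted the connection.
            return false;
        }
    }

    public static void main(String[] args) {
        // 9069 is the queryable state proxy port from the quoted setup (an assumption here).
        boolean up = isListening("127.0.0.1", 9069, 1000);
        System.out.println("proxy port 9069 listening: " + up);
    }
}
```

If this reports false while the job is running, the problem is on the cluster side (the proxy was not started) rather than in the client code.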

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Apr 15, 2019, at 12:07 AM, Guowei Ma <guowei....@gmail.com> wrote:
> 
> Hi,
> I think you should check the TM log first and see whether it contains entries like:
> 1430 [main] INFO  org.apache.flink.queryablestate.server.KvStateServerImpl  - Started Queryable State Server @ /127.0.0.1:9069.
> 1436 [main] INFO  org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl  - Started Queryable State Proxy Server @ /127.0.0.
> 
> 
> Best,
> Guowei
> 
> 
> Boris Lublinsky <boris.lublin...@lightbend.com> wrote on Mon, Apr 15, 2019 at 4:02 AM:
> I was testing with Flink 1.9. Here is how I set up the MiniCluster:
> 
>               int port = 6124;
>               int parallelism = 2;
>               Configuration config = new Configuration();
>               config.setInteger(JobManagerOptions.PORT, port);
>               config.setString(JobManagerOptions.ADDRESS, "localhost");
>               config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
>               // In a non MiniCluster setup queryable state is enabled by default.
>               config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069");
>               config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
>               config.setInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS, 2);
> 
>               config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9067");
>               config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
>               config.setInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS, 2);
> 
>               MiniClusterConfiguration clusterconfig =
>                       new MiniClusterConfiguration(config, 1, RpcServiceSharing.DEDICATED, null);
>               try {
>                       // Create a local Flink server
>                       MiniCluster flinkCluster = new MiniCluster(clusterconfig);
>                       // Start server and create environment
>                       flinkCluster.start();
>                       StreamExecutionEnvironment env =
>                               StreamExecutionEnvironment.createRemoteEnvironment("localhost", port);
>                       env.setParallelism(parallelism);
>                       // Build Graph
>                       buildGraph(env);
>                       JobGraph jobGraph = env.getStreamGraph().getJobGraph();
>                       // Submit to the server and wait for completion
>                       JobSubmissionResult result = flinkCluster.submitJob(jobGraph).get();
>                       System.out.println("Job ID : " + result.getJobID());
>                       Thread.sleep(Long.MAX_VALUE);
>               } catch (Throwable t){
>                       t.printStackTrace();
>               }
> 
> And I have a client that looks as follows:
> 
> def query(job: String, keys: Seq[String], host: String = "127.0.0.1", port: Int = 9069,
>             timeInterval: Long = defaulttimeInterval): Unit = {
> 
>     // JobID, has to correspond to a running job
>     val jobId = JobID.fromHexString(job)
>     // Client
>     val client = new QueryableStateClient(host, port)
> 
> But when I try it, I get an exception saying that nothing is listening on port 9069.
> 
> It works with the old LocalFlinkMiniCluster, but not with the new MiniCluster.
> 
> Am I missing something?
> 
> 
> 
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
