Hi,

1. I think Mini cluster supports queryable state.
2. You could set queryable-state.enable to true and try again.
You could check AbstractQueryableStateTestBase and there are some tests.
:)

Best,
Guowei


Boris Lublinsky <boris.lublin...@lightbend.com> 于2019年4月16日周二 下午9:09写道:

> Thanks Guowei
> The questions that I am asking is slightly different:
> 1. Does Mini cluster support queryable state?
> 2. If the answer is yes, how to set it up?
>
> Boris Lublinsky
> FDP Architect
> boris.lublin...@lightbend.com
> https://www.lightbend.com/
>
> On Apr 15, 2019, at 12:07 AM, Guowei Ma <guowei....@gmail.com> wrote:
>
> Hi,
> I think you should check TM log first and check if there are some info
> like:
> 1430 [main] INFO
> org.apache.flink.queryablestate.server.KvStateServerImpl  - Started
> Queryable State Server @ /127.0.0.1:9069.
> 1436 [main] INFO
> org.apache.flink.queryablestate.client.proxy.KvStateClientProxyImpl  -
> Started Queryable State Proxy Server @ /127.0.0. <http://127.0.0.0/>
>
>
> Best,
> Guowei
>
>
> Boris Lublinsky <boris.lublin...@lightbend.com> 于2019年4月15日周一 上午4:02写道:
>
>> I was testing with Flink 1.9. Here is how I set up mini cluster
>>
>> int port = 6124;
>> int parallelism = 2;
>> Configuration config = new Configuration();
>> config.setInteger(JobManagerOptions.PORT, port);
>> config.setString(JobManagerOptions.ADDRESS, "localhost");
>> config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
>> // In a non MiniCluster setup queryable state is enabled by default.
>> config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069");
>> config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
>> config.setInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS, 2);
>>
>> config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9067");
>> config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
>> config.setInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS, 2);
>>
>> MiniClusterConfiguration clusterconfig =
>> new MiniClusterConfiguration(config, 1, RpcServiceSharing.DEDICATED,
>> null);
>> try {
>> // Create a local Flink server
>> MiniCluster flinkCluster = new MiniCluster(clusterconfig);
>> // Start server and create environment
>> flinkCluster.start();
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.createRemoteEnvironment("localhost", port);
>> env.setParallelism(parallelism);
>> // Build Graph
>> buildGraph(env);
>> JobGraph jobGraph = env.getStreamGraph().getJobGraph();
>> // Submit to the server and wait for completion
>> JobSubmissionResult result = flinkCluster.submitJob(jobGraph).get();
>> System.out.println("Job ID : " + result.getJobID());
>> Thread.sleep(Long.MAX_VALUE);
>> } catch (Throwable t){
>> t.printStackTrace();
>> }
>>
>> And have a client, that looks like follows:
>>
>> def query(job: String, keys: Seq[String], host: String = "127.0.0.1",
>> port: Int = 9069,
>>             timeInterval: Long = defaulttimeInterval): Unit = {
>>
>>     // JobID, has to correspond to a running job
>>     val jobId = JobID.fromHexString(job)
>>     // Client
>>     val client = new QueryableStateClient(host, port)
>>
>> But when I tried it, it gives an exception that nothing is listening on
>> port 9069
>>
>> It works with the old FlinkLocalMiniCluster, but not with the MiniCluster
>>
>> Am I missing something?
>>
>>
>>
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
>>
>>
>

Reply via email to