I am testing with Flink 1.9. Here is how I set up the MiniCluster:
int port = 6124;
int parallelism = 2;
Configuration config = new Configuration();
config.setInteger(JobManagerOptions.PORT, port);
config.setString(JobManagerOptions.ADDRESS, "localhost");
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism);
// In a non MiniCluster setup queryable state is enabled by default.
config.setString(QueryableStateOptions.PROXY_PORT_RANGE, "9069");
config.setInteger(QueryableStateOptions.PROXY_NETWORK_THREADS, 2);
config.setInteger(QueryableStateOptions.PROXY_ASYNC_QUERY_THREADS, 2);
config.setString(QueryableStateOptions.SERVER_PORT_RANGE, "9067");
config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
config.setInteger(QueryableStateOptions.SERVER_ASYNC_QUERY_THREADS, 2);
MiniClusterConfiguration clusterconfig =
    new MiniClusterConfiguration(config, 1, RpcServiceSharing.DEDICATED, null);
try {
    // Create a local Flink server
    MiniCluster flinkCluster = new MiniCluster(clusterconfig);
    // Start server and create environment
    flinkCluster.start();
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createRemoteEnvironment("localhost", port);
    env.setParallelism(parallelism);
    // Build Graph
    buildGraph(env);
    JobGraph jobGraph = env.getStreamGraph().getJobGraph();
    // Submit to the server and wait for completion
    JobSubmissionResult result = flinkCluster.submitJob(jobGraph).get();
    System.out.println("Job ID : " + result.getJobID());
    Thread.sleep(Long.MAX_VALUE);
} catch (Throwable t) {
    t.printStackTrace();
}
And I have a client that looks as follows:
def query(job: String, keys: Seq[String], host: String = "127.0.0.1", port: Int = 9069,
          timeInterval: Long = defaulttimeInterval): Unit = {
  // JobID, has to correspond to a running job
  val jobId = JobID.fromHexString(job)
  // Client
  val client = new QueryableStateClient(host, port)
But when I run it, I get an exception saying that nothing is listening on port 9069.
It works with the old FlinkLocalMiniCluster, but not with the MiniCluster.
Am I missing something?
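
The symptom can also be checked without the Flink client. Below is a minimal sketch of a plain TCP probe, assuming the queryable state proxy is expected on 127.0.0.1:9069 as in the configuration above. The class name PortProbe and the method isListening are made up for illustration and are not part of Flink.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not a Flink class): checks whether anything accepts
// TCP connections on a given host and port.
public class PortProbe {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing reachable on that port.
            return false;
        }
    }

    public static void main(String[] args) {
        // 9069 is the proxy port assumed from the configuration in this mail.
        System.out.println("port 9069 listening: " + isListening("127.0.0.1", 9069, 1000));
    }
}
```

Running this once flinkCluster.start() has returned should show whether the proxy port was ever bound, which would separate a cluster-side problem from a client-side one.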
Boris Lublinsky
FDP Architect
https://www.lightbend.com/