In that case you will have to query the REST API instead; you can retrieve the address via MiniCluster#getRestAddress.

Something along these should work:

try (final RestClient restClient =
        new RestClient(
                RestClientConfiguration.fromConfiguration(new Configuration()), 
Executors.directExecutor())) {

    final JobID jobId = ...final URI restApiAddress = 
miniCluster.getRestAddress().get(); final JobDetailsHeaders headers = 
JobDetailsHeaders.getInstance(); final JobMessageParameters parameters = 
headers.getUnresolvedMessageParameters(); parameters.jobPathParameter.resolve(jobId); 
final CompletableFuture<JobDetailsInfo> response =
            restClient.sendRequest(
                    restApiAddress.getHost(), restApiAddress.getPort(), 
headers, parameters, EmptyRequestBody.getInstance()); final boolean 
allTasksAreRunning =
            response.get().getJobVertexInfos().stream()
                    .map(JobDetailsInfo.JobVertexDetailsInfo::getExecutionState)
                    .map(state -> state == ExecutionState.RUNNING)
                    .reduce(true, Boolean::logicalAnd); }


On 1/12/2021 11:14 PM, KristoffSC wrote:
Hi,
that helped however there is a problem with JobStatus. Please refer to [1]

In my case JobStatus is already Running but not all task are running.
Any idea how to get task status from MiniCluster?

[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Issue-with-job-status-td36068.html#none



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Reply via email to