In that case you will have to query the REST API instead; you can
retrieve the address via MiniCluster#getRestAddress.
Something along these lines should work:
try (final RestClient restClient =
        new RestClient(
            RestClientConfiguration.fromConfiguration(new Configuration()),
            Executors.directExecutor())) {
    final JobID jobId = ...
    final URI restApiAddress = miniCluster.getRestAddress().get();

    final JobDetailsHeaders headers = JobDetailsHeaders.getInstance();
    final JobMessageParameters parameters = headers.getUnresolvedMessageParameters();
    parameters.jobPathParameter.resolve(jobId);

    final CompletableFuture<JobDetailsInfo> response =
        restClient.sendRequest(
            restApiAddress.getHost(),
            restApiAddress.getPort(),
            headers,
            parameters,
            EmptyRequestBody.getInstance());

    final boolean allTasksAreRunning =
        response.get().getJobVertexInfos().stream()
            .map(JobDetailsInfo.JobVertexDetailsInfo::getExecutionState)
            .map(state -> state == ExecutionState.RUNNING)
            .reduce(true, Boolean::logicalAnd);
}
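
Note that the REST query gives you a single snapshot, so in a test you
would probably want to repeat it until every task is RUNNING or a timeout
expires. A minimal sketch of such a polling loop, using only the JDK;
PollUntil and waitUntil are hypothetical names (not part of Flink), and
the timeout and interval values are up to you:

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical helper, not part of Flink: polls a condition until it holds or a timeout expires. */
final class PollUntil {

    private PollUntil() {}

    /**
     * Evaluates the condition repeatedly, sleeping for the given interval between attempts.
     * Returns true as soon as the condition holds, false if the timeout expires first
     * or the calling thread is interrupted while waiting.
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            // Compare via subtraction so the check stays correct if nanoTime wraps around.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

The condition you pass in would wrap the REST request from above and
return the value of allTasksAreRunning; any exception handling around
response.get() is left out of this sketch.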
On 1/12/2021 11:14 PM, KristoffSC wrote:
Hi,
that helped, however there is a problem with JobStatus. Please refer to [1].
In my case the JobStatus is already RUNNING, but not all tasks are running.
Any idea how to get task status from MiniCluster?
[1]
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Issue-with-job-status-td36068.html#none