I am writing an integration test that executes a streaming Flink job using faked, "unbounded" input. I want to control when the source function(s) complete: they should be triggered to finish once the maximum output watermarks of the job's operators have passed a job completion watermark that is relative to the maximum input timestamp. This is needed because the Flink job uses event-time timers to produce some output.
Here is the faked, "unbounded" source function class:

    private static class WaitingSourceFunction<OUT> extends FromElementsFunction<OUT> {

        // volatile because cancel() is called from a different thread than run()
        private volatile boolean isWaiting;
        private TypeInformation<OUT> typeInfo;

        private WaitingSourceFunction(
                StreamExecutionEnvironment env, Collection<OUT> data, TypeInformation<OUT> typeInfo)
                throws IOException {
            super(typeInfo.createSerializer(env.getConfig()), data);
            this.isWaiting = true;
            this.typeInfo = typeInfo;
        }

        @Override
        public void cancel() {
            super.cancel();
            isWaiting = false;
        }

        @Override
        public void run(SourceContext<OUT> ctx) throws Exception {
            // Emit all elements, then keep the source alive until it is cancelled.
            super.run(ctx);
            while (isWaiting) {
                TimeUnit.SECONDS.sleep(10);
            }
        }

        public long getEndWatermark() {
            // TODO
            return 1000000;
        }
    }

And here is the function where I want to busy wait (currently hacked up to print info that shows my problem):

    private void waitForDone(String jobName, WaitingSourceFunction<?>... functions)
            throws ConfigurationException, Exception, ExecutionException, IOException, InterruptedException {
        JobExecutionResult jobResult = env.execute(jobName);
        RestClient restClient = new RestClient(
                RestClientConfiguration.fromConfiguration(getClientConfiguration()),
                scheduledExecutorService);
        URI restUri = MiniClusterExtension.flinkCluster.getRestAddres();

        System.out.printf("** JOB: %s %s\n", jobName, jobResult.getJobID());

        long currentWatermark = 0;
        long lastInputWatermark = Arrays.stream(functions)
                .map(f -> f.getEndWatermark())
                .mapToLong(l -> l)
                .max().getAsLong();

        // Temporary fixed loop for debugging; the intended condition is:
        // while (currentWatermark < (lastInputWatermark + 1000)) {
        for (int i = 0; i < 3; i++) {
            // Look up the IDs of all vertices of the job.
            JobDetailsHeaders getVertexHeaders = JobDetailsHeaders.getInstance();
            JobMessageParameters getVertexParams = getVertexHeaders.getUnresolvedMessageParameters();
            getVertexParams.jobPathParameter.resolve(jobResult.getJobID());

            List<JobVertexID> vertexIds = restClient.sendRequest(
                            restUri.getHost(), restUri.getPort(),
                            getVertexHeaders, getVertexParams, EmptyRequestBody.getInstance())
                    .get().getJobVertexInfos().stream()
                    .map(v -> v.getJobVertexID())
                    .collect(Collectors.toUnmodifiableList());

            // Query the metrics of each vertex and track the largest watermark seen.
            for (JobVertexID vertexId : vertexIds) {
                JobVertexMetricsHeaders getWatermarkHeader = JobVertexMetricsHeaders.getInstance();
                JobVertexMetricsMessageParameters getWatermarkParams =
                        getWatermarkHeader.getUnresolvedMessageParameters();
                getWatermarkParams.jobPathParameter.resolve(jobResult.getJobID());
                getWatermarkParams.jobVertexIdPathParameter.resolve(vertexId);

                System.out.printf("** LOG VERTEX: %s\n", vertexId);

                try {
                    long maxWatermark = restClient.sendRequest(
                                    restUri.getHost(), restUri.getPort(),
                                    getWatermarkHeader, getWatermarkParams, EmptyRequestBody.getInstance())
                            .get().getMetrics().stream()
                            .filter(m -> m.getId().endsWith("Watermark"))
                            .map(m -> {
                                System.out.printf("** LOG METRIC: %s\n", m);
                                return Long.valueOf(StringUtil.isBlank(m.getValue()) ? "0" : m.getValue());
                            })
                            .mapToLong(v -> v)
                            .max().orElse(0);
                    currentWatermark = Math.max(currentWatermark, maxWatermark);
                } catch (Exception e) {
                    System.out.printf("** LOG ERROR: %s\n", e);
                }
            }

            System.out.printf("** SLEEP: %s %s %s\n", i, currentWatermark, lastInputWatermark);
            TimeUnit.SECONDS.sleep(1);
        }

        System.out.println("** CANCEL SOURCES");
        for (WaitingSourceFunction<?> function : functions) {
            function.cancel();
        }
    }

THE PROBLEM: The logging output from the test clearly shows that the watermark metrics are all null throughout the wait loop: https://paste-bin.xyz/16195

I also tried using JobVertexWatermarksHeaders instead of JobVertexMetricsHeaders for the REST GET on the job vertices to get watermark information, but the response bodies were empty.

QUESTIONS:

1. Shouldn't the watermark metrics be populated for the job running in the MiniCluster?
2. Do I have to enable metrics somehow?
3. Is there a better way to extract the output watermarks of a running Flink job?
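
For reference, here is roughly the waiting behaviour I am after, stripped of the Flink REST calls so it can be run on its own. This is only a sketch under assumptions: the class and method names (WatermarkWait, maxWatermark, waitForWatermark) are hypothetical, the Supplier stands in for whatever REST query returns the raw metric values as strings, and the timeout is something I would add so the test cannot hang forever. Unlike my hacked loop above, it skips null or blank values instead of mapping them to 0, so "not reported yet" is not mistaken for a real watermark.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.OptionalLong;
import java.util.function.Supplier;
import java.util.stream.LongStream;

// Hypothetical helper, not part of Flink: polls raw metric values until the
// largest parsable one reaches a target watermark or a timeout expires.
final class WatermarkWait {

    private WatermarkWait() {}

    // Parses one raw metric value; null, blank, or non-numeric input yields an empty stream.
    private static LongStream parse(String raw) {
        if (raw == null || raw.isBlank()) {
            return LongStream.empty();
        }
        try {
            return LongStream.of(Long.parseLong(raw.trim()));
        } catch (NumberFormatException e) {
            return LongStream.empty();
        }
    }

    // Returns the largest parsable watermark, or empty if none has been reported yet.
    static OptionalLong maxWatermark(Collection<String> rawValues) {
        return rawValues.stream().flatMapToLong(WatermarkWait::parse).max();
    }

    // Polls fetchValues until the max watermark is >= target.
    // Returns true if the target was reached, false on timeout or interruption.
    static boolean waitForWatermark(
            Supplier<Collection<String>> fetchValues,
            long target,
            Duration timeout,
            Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            OptionalLong current = maxWatermark(fetchValues.get());
            if (current.isPresent() && current.getAsLong() >= target) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag for the caller and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In waitForDone, the supplier would wrap the per-vertex metrics request and the target would be lastInputWatermark + 1000; the sources would only be cancelled once waitForWatermark returns, and the test could fail fast if it returns false.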