I am writing an integration test that executes a streaming Flink job on
faked, "unbounded" input. The job uses event-time timers to produce some
of its output, so I want to control when the source function(s) complete.
My plan is to trigger completion once the maximum output watermarks of the
job's operators are beyond a job-completion watermark, which is defined
relative to the maximum input timestamp.
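The completion condition above amounts to polling a watermark reading until it passes a target, with a timeout so that a stuck job fails the test instead of hanging it. Below is a minimal plain-Java sketch. It assumes a hypothetical `LongSupplier` that returns the current watermark reading, for example one backed by the metrics REST queries in `waitForDone`. `WatermarkWait`, `awaitWatermark` and the 1000 ms allowance are illustrative only and are not Flink API.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

final class WatermarkWait {

  private WatermarkWait() {}

  /**
   * Polls currentWatermark until it is greater than completionWatermark.
   * Returns true once it is, or false if the timeout expires (or the thread
   * is interrupted) first.
   */
  static boolean awaitWatermark(
      LongSupplier currentWatermark,
      long completionWatermark,
      Duration timeout,
      Duration pollInterval) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (true) {
      if (currentWatermark.getAsLong() > completionWatermark) {
        return true;
      }
      if (System.nanoTime() - deadline >= 0) {
        return false;
      }
      try {
        Thread.sleep(pollInterval.toMillis());
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
    }
  }

  public static void main(String[] args) {
    long maxInputTimestamp = 1_000_000L;                   // e.g. what getEndWatermark() returns
    long completionWatermark = maxInputTimestamp + 1_000L; // hypothetical allowance for timers

    // Fake watermark reading that advances by 500 on every poll
    long[] watermark = {maxInputTimestamp};
    LongSupplier fakeReading = () -> watermark[0] += 500;

    boolean done = awaitWatermark(
        fakeReading, completionWatermark, Duration.ofSeconds(5), Duration.ofMillis(10));
    System.out.println(done); // true
  }
}
```

Bounding the wait with a deadline, instead of using an open-ended `while` or a fixed number of iterations, keeps the test's running time predictable whether or not the watermark ever advances.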

Here is the faked, "unbounded" source function class:

  private static class WaitingSourceFunction<OUT> extends FromElementsFunction<OUT> {

    // volatile: cancel() is called from a different thread than run()
    private volatile boolean isWaiting;

    private TypeInformation<OUT> typeInfo;

    private WaitingSourceFunction(
        StreamExecutionEnvironment env, Collection<OUT> data, TypeInformation<OUT> typeInfo)
        throws IOException {
      super(typeInfo.createSerializer(env.getConfig()), data);
      this.isWaiting = true;
      this.typeInfo = typeInfo;
    }

    @Override
    public void cancel() {
      super.cancel();
      isWaiting = false;
    }

    @Override
    public void run(SourceContext<OUT> ctx) throws Exception {
      // Emit all elements, then keep the source "open" until it is cancelled
      super.run(ctx);
      while (isWaiting) {
        TimeUnit.SECONDS.sleep(10);
      }
    }

    public long getEndWatermark() {
      // TODO
      return 1000000;
    }
  }
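A caveat about `cancel()` in this design: as far as I understand, Flink serializes the source function into the job graph, so the task runs a deserialized copy. Calling `cancel()` on the instance the test holds therefore does not change the running copy's `isWaiting` flag, which is why test sources often coordinate through static fields instead. The plain-Java sketch below involves no Flink and only illustrates the effect with ordinary Java serialization. `WaitingFlag` is a hypothetical stand-in for `WaitingSourceFunction`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for WaitingSourceFunction: only the cancellation flag matters here.
class WaitingFlag implements Serializable {
  private static final long serialVersionUID = 1L;
  volatile boolean isWaiting = true;

  void cancel() {
    isWaiting = false;
  }
}

final class SerializedCopyDemo {

  private SerializedCopyDemo() {}

  // Round-trips an object through Java serialization, roughly what happens when
  // a user function is shipped to a task as part of the job graph.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    WaitingFlag original = new WaitingFlag();  // the instance the test holds
    WaitingFlag running = roundTrip(original); // the copy a task would run

    original.cancel();
    System.out.println(original.isWaiting + " " + running.isWaiting); // prints "false true"
  }
}
```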

And here is the function where I want to busy-wait (currently hacked up to
print information that shows my problem):

  private void waitForDone(String jobName, WaitingSourceFunction<?>... functions)
      throws Exception {
    // Note: in attached mode, execute() only returns once the job has terminated;
    // env.executeAsync(jobName) instead returns a JobClient right after submission.
    JobExecutionResult jobResult = env.execute(jobName);

    RestClient restClient = new RestClient(
        RestClientConfiguration.fromConfiguration(getClientConfiguration()),
        scheduledExecutorService);
    URI restUri = MiniClusterExtension.flinkCluster.getRestAddres();

    System.out.printf("** JOB: %s %s\n", jobName, jobResult.getJobID());

    long currentWatermark = 0;
    // The job-completion watermark is relative to the latest end watermark of all sources
    long lastInputWatermark = Arrays.stream(functions)
        .mapToLong(f -> f.getEndWatermark())
        .max().getAsLong();

    for (int i = 0; i < 3; i++) {
    //while (currentWatermark < (lastInputWatermark + 1000)) {
      // Look up the IDs of all vertices of the job
      JobDetailsHeaders getVertexHeaders = JobDetailsHeaders.getInstance();
      JobMessageParameters getVertexParams = getVertexHeaders.getUnresolvedMessageParameters();
      getVertexParams.jobPathParameter.resolve(jobResult.getJobID());
      List<JobVertexID> vertexIds =
        restClient.sendRequest(restUri.getHost(), restUri.getPort(),
            getVertexHeaders, getVertexParams, EmptyRequestBody.getInstance())
          .get().getJobVertexInfos().stream()
          .map(v -> v.getJobVertexID())
          .collect(Collectors.toUnmodifiableList());

      // For each vertex, take the maximum over its *Watermark metrics
      for (JobVertexID vertexId : vertexIds) {
        JobVertexMetricsHeaders getWatermarkHeader = JobVertexMetricsHeaders.getInstance();
        JobVertexMetricsMessageParameters getWatermarkParams =
            getWatermarkHeader.getUnresolvedMessageParameters();
        getWatermarkParams.jobPathParameter.resolve(jobResult.getJobID());
        getWatermarkParams.jobVertexIdPathParameter.resolve(vertexId);
        System.out.printf("** LOG VERTEX: %s\n", vertexId);
        try {
          long maxWatermark = restClient.sendRequest(
                restUri.getHost(), restUri.getPort(), getWatermarkHeader,
                getWatermarkParams, EmptyRequestBody.getInstance())
              .get().getMetrics().stream()
              .filter(m -> m.getId().endsWith("Watermark"))
              .mapToLong(m -> {
                System.out.printf("** LOG METRIC: %s\n", m);
                return Long.parseLong(StringUtil.isBlank(m.getValue()) ? "0" : m.getValue());
              })
              .max().orElse(0);
          currentWatermark = Math.max(currentWatermark, maxWatermark);
        } catch (Exception e) {
          System.out.printf("** LOG ERROR: %s\n", e);
        }
      }

      System.out.printf("** SLEEP: %s %s %s\n", i, currentWatermark, lastInputWatermark);
      TimeUnit.SECONDS.sleep(1);
    }

    System.out.println("** CANCEL SOURCES");
    for (WaitingSourceFunction<?> function : functions) {
      function.cancel();
    }
  }
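The per-vertex reduction in the loop above can be isolated into a small pure function that is easy to unit-test. The minimal sketch below assumes the metric ids and values have already been collected into a map. `MetricValues`, `maxWatermark` and the example metric ids are illustrative and are not Flink API. It skips null or blank values instead of mapping them to 0. It also skips `Long.MIN_VALUE`, which, as far as I know, Flink's watermark gauges report before the first watermark arrives. It returns an empty `OptionalLong` when no usable value exists yet, so "no watermark" stays distinct from a watermark of 0.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;

final class MetricValues {

  private MetricValues() {}

  /**
   * Returns the largest value among metrics whose id ends with "Watermark",
   * or an empty OptionalLong if none of them carries a usable value yet.
   */
  static OptionalLong maxWatermark(Map<String, String> metrics) {
    return metrics.entrySet().stream()
        .filter(e -> e.getKey().endsWith("Watermark"))
        .map(Map.Entry::getValue)
        .filter(Objects::nonNull)
        .map(String::trim)
        .filter(v -> !v.isEmpty())
        .mapToLong(Long::parseLong)
        // assumption: Long.MIN_VALUE means "no watermark received yet"
        .filter(v -> v != Long.MIN_VALUE)
        .max();
  }

  public static void main(String[] args) {
    Map<String, String> metrics = new HashMap<>(); // HashMap, since values may be null
    metrics.put("0.currentInputWatermark", null);
    metrics.put("0.currentOutputWatermark", "1001000");
    metrics.put("0.numRecordsIn", "42");
    System.out.println(maxWatermark(metrics)); // OptionalLong[1001000]
  }
}
```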

THE PROBLEM: the test's log output clearly shows that the watermark
metrics are all null throughout the wait loop:
https://paste-bin.xyz/16195

I also tried using JobVertexWatermarksHeaders instead of
JobVertexMetricsHeaders for the REST GET on the job vertices, to get
watermark information, but the response bodies were empty.

QUESTIONS:
1.  Shouldn't the watermark metrics be populated for a job running in the
MiniCluster?
2.  Do I have to enable metrics somehow?
3.  Is there a better way to extract the output watermarks of a running
Flink job?
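On questions 1 and 3, one possible explanation for the null values: according to Flink's REST API documentation, `GET /jobs/:jobid/vertices/:vertexid/metrics` takes an optional `get` query parameter with a comma-separated list of metric ids. Without that parameter, the endpoint only lists the ids of the available metrics, without values. The sketch below builds such a request URI with plain `java.net.URI`. `MetricsQuery` and `vertexMetricsUri` are hypothetical helpers, and the job and vertex ids are placeholders. The metric ids are assumed to come from a previous unfiltered call. With Flink's own `RestClient`, the counterpart is presumably resolving `metricsFilterParameter` on `JobVertexMetricsMessageParameters`, but I have not verified that.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

final class MetricsQuery {

  private MetricsQuery() {}

  /** Builds <rest>/jobs/{jobId}/vertices/{vertexId}/metrics?get=id1,id2 for the given metric ids. */
  static URI vertexMetricsUri(URI restAddress, String jobId, String vertexId, List<String> metricIds) {
    String get = metricIds.stream()
        .map(id -> URLEncoder.encode(id, StandardCharsets.UTF_8)) // ids may contain spaces etc.
        .collect(Collectors.joining(","));
    // An absolute path replaces whatever path the base address has
    return restAddress.resolve("/jobs/" + jobId + "/vertices/" + vertexId + "/metrics?get=" + get);
  }

  public static void main(String[] args) {
    URI uri = vertexMetricsUri(
        URI.create("http://localhost:8081"),
        "a1b2c3", "d4e5f6", // placeholder job and vertex ids
        List.of("0.currentInputWatermark", "0.currentOutputWatermark"));
    System.out.println(uri);
    // http://localhost:8081/jobs/a1b2c3/vertices/d4e5f6/metrics?get=0.currentInputWatermark,0.currentOutputWatermark
  }
}
```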
