Thanks for the reply, Caizhi!

We're on Flink 1.12.3. In the test, I'm using a custom watermark strategy
derived from BoundedOutOfOrdernessWatermarks that, after a period with no
events, emits watermarks based on processing time so that the timer-reliant
operators keep making progress. Basically, everything runs on event time,
but each input periodically emits a watermark when no events are coming in
through it.
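A minimal sketch of the kind of generator described above, for readers who want to see the idle fallback spelled out. It mirrors the onEvent/onPeriodicEmit shape of Flink's WatermarkGenerator, but to stay compilable without Flink on the classpath it takes plain long timestamps, emits through a LongConsumer, and reads processing time from an injectable clock. The class name IdleAwareWatermarks and the exact idle rule are assumptions for illustration, not the actual strategy used in the test.

```java
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch: bounded-out-of-orderness watermarks that, once the input
 * has been idle for idleTimeoutMillis, let the watermark advance with processing
 * time. Mirrors the onEvent/onPeriodicEmit shape of Flink's WatermarkGenerator.
 */
class IdleAwareWatermarks {
    private final long outOfOrdernessMillis;
    private final long idleTimeoutMillis;
    private final LongSupplier processingClock; // injectable so tests can fake time

    private long maxTimestamp;            // highest event timestamp seen so far
    private long lastEventProcessingTime; // processing time when the last event arrived
    private long lastEmitted = Long.MIN_VALUE;

    IdleAwareWatermarks(long outOfOrdernessMillis, long idleTimeoutMillis,
                        LongSupplier processingClock) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        this.idleTimeoutMillis = idleTimeoutMillis;
        this.processingClock = processingClock;
        // Start so that the lowest possible watermark is Long.MIN_VALUE.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        this.lastEventProcessingTime = processingClock.getAsLong();
    }

    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventProcessingTime = processingClock.getAsLong();
    }

    /** Called periodically; emits only when the watermark actually advances. */
    void onPeriodicEmit(LongConsumer output) {
        long idleFor = processingClock.getAsLong() - lastEventProcessingTime;
        long watermark = maxTimestamp - outOfOrdernessMillis - 1;
        if (idleFor >= idleTimeoutMillis) {
            // Idle input: advance event time by the processing time elapsed
            // since the last event, so downstream event-time timers can fire.
            watermark += idleFor;
        }
        if (watermark > lastEmitted) { // watermarks must never go backwards
            lastEmitted = watermark;
            output.accept(watermark);
        }
    }
}
```

Advancing by the whole idle duration, rather than only by the time beyond the timeout, is an arbitrary choice here; either variant keeps the emitted watermarks monotonic because of the lastEmitted check.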

We started with test data that carries its own event times and simply used
StreamExecutionEnvironment#fromCollection with a timestamp assigner that
extracts the timestamp from each test event. However, with that setup the
MiniCluster terminates the job (and the test completes) as soon as all the
input is processed, even though operators still have outstanding timers
that are meant to produce additional output. That's why I cobbled together
an attempt to control when the input sources complete: the
WaitingSourceFunction I posted closes/cancels the input streams once some
check on the job's state passes (which is where this thread started).
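One detail that may matter for this approach: Flink ships user functions such as sources to the tasks via Java serialization, so the instance running inside the MiniCluster is a deserialized copy, and calling cancel() on the test's own instance does not change the copy's instance field. The self-contained sketch below demonstrates that effect with plain java.io serialization. ReleasableSource, its RELEASED flag and copyViaSerialization are hypothetical names; the static flag is only one possible workaround, and it works only because a MiniCluster runs the tasks in the test's JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a test source that waits until it is released. */
class ReleasableSource implements Serializable {
    private static final long serialVersionUID = 1L;

    // Per-instance flag: every deserialized copy has its own value.
    private volatile boolean isWaiting = true;

    // JVM-wide flag: static fields are not serialized, so all copies in the
    // same JVM (such as a MiniCluster test) see the same value.
    static final AtomicBoolean RELEASED = new AtomicBoolean(false);

    void cancel() {
        isWaiting = false;
    }

    boolean shouldKeepWaiting() {
        return isWaiting && !RELEASED.get();
    }

    /** Round-trips an object through Java serialization, roughly like shipping a function to a task. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T copyViaSerialization(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

In a real SourceFunction, the wait loop in run() could poll something like shouldKeepWaiting(), and the test would reset RELEASED between test cases. Against a remote cluster a static flag would not be visible to the tasks at all.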

What's the best way to achieve this? Ideally I'd set up the inputs so that
watermarks are emitted appropriately throughout the processing of the test
data, and also for a defined period after all the "input" is complete, so
that the timer-reliant operators get their timers triggered.
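To make that target concrete, here is a small sketch of the schedule being asked for: after the last input event, keep emitting watermarks at a fixed step until a completion watermark (the maximum input timestamp plus a grace period) is reached. Since an event-time timer fires once the watermark reaches its timestamp, every timer at or below the completion watermark would fire. The names WatermarkTail, tailWatermarks, gracePeriodMillis and stepMillis are hypothetical, and stepping in event time rather than following the wall clock is a design choice left open here.

```java
import java.util.ArrayList;
import java.util.List;

final class WatermarkTail {
    private WatermarkTail() {}

    /**
     * Hypothetical helper: watermarks to emit after the input is exhausted,
     * starting one step after maxInputTimestamp and ending exactly on the
     * completion watermark (maxInputTimestamp + gracePeriodMillis).
     */
    static List<Long> tailWatermarks(long maxInputTimestamp, long gracePeriodMillis, long stepMillis) {
        if (gracePeriodMillis < 0 || stepMillis <= 0) {
            throw new IllegalArgumentException("gracePeriodMillis must be >= 0 and stepMillis > 0");
        }
        long completion = maxInputTimestamp + gracePeriodMillis;
        List<Long> result = new ArrayList<>();
        for (long wm = maxInputTimestamp + stepMillis; wm < completion; wm += stepMillis) {
            result.add(wm);
        }
        result.add(completion); // always finish on the completion watermark
        return result;
    }
}
```

For example, tailWatermarks(1_000, 2_500, 1_000) yields 2000, 3000 and then 3500, so the final step is shortened to land exactly on the completion point.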

On Mon, Nov 29, 2021 at 6:37 PM Caizhi Weng <> wrote:

> Hi!
> I believe metrics are enabled by default, even for a mini cluster. Which
> Flink version are you using, and how do you set your watermark strategy?
> Could you share your user code for creating the DataStream / SQL and
> getting the job graph?
> I'm also curious why you need to extract the output watermarks just to
> stop the source. You can control both the records and the watermark
> strategy from the source. From my point of view, constructing test data
> with specific row times should be enough.
> On Tue, Nov 30, 2021 at 3:34 AM Jin Yi <> wrote:
>> Bump. A more general question: what do people do for end-to-end, full
>> integration tests of event-time-based jobs with timers?
>> On Tue, Nov 23, 2021 at 11:26 AM Jin Yi <> wrote:
>>> I am writing an integration test that executes a streaming Flink job on
>>> faked, "unbounded" input. Because the job uses event-time timers to
>>> produce some of its output, I want to control when the source function(s)
>>> complete: I trigger them once the maximum output watermark of the job's
>>> operators is beyond a job-completion watermark that is defined relative
>>> to the maximum input timestamp.
>>> Here is the faked, "unbounded" source function class:
>>>   private static class WaitingSourceFunction<OUT>
>>>       extends FromElementsFunction<OUT> {
>>>     // volatile: cancel() is called from a different thread than run()
>>>     private volatile boolean isWaiting;
>>>     private TypeInformation<OUT> typeInfo;
>>>     private WaitingSourceFunction(
>>>         StreamExecutionEnvironment env, Collection<OUT> data,
>>>         TypeInformation<OUT> typeInfo)
>>>         throws IOException {
>>>       super(typeInfo.createSerializer(env.getConfig()), data);
>>>       this.isWaiting = true;
>>>       this.typeInfo = typeInfo;
>>>     }
>>>     @Override
>>>     public void cancel() {
>>>       super.cancel();
>>>       isWaiting = false;
>>>     }
>>>     @Override
>>>     public void run(SourceContext<OUT> ctx) throws Exception {
>>>       // emit the test elements first, then keep the source "open"
>>>       super.run(ctx);
>>>       while (isWaiting) {
>>>         TimeUnit.SECONDS.sleep(10);
>>>       }
>>>     }
>>>     public long getEndWatermark() {
>>>       // TODO
>>>       return 1000000;
>>>     }
>>>   }
>>> And here is the function where I want to busy-wait (currently hacked up
>>> to print info that shows my problem):
>>>   private void waitForDone(String jobName, WaitingSourceFunction<?>... functions)
>>>       throws ConfigurationException, Exception, ExecutionException,
>>>       IOException, InterruptedException {
>>>     // executeAsync() returns once the job is submitted; execute() would block
>>>     // until the job finishes, so the polling below would never see it running
>>>     JobClient jobClient = env.executeAsync(jobName);
>>>     RestClient restClient = new RestClient(
>>>         RestClientConfiguration.fromConfiguration(getClientConfiguration()),
>>>         scheduledExecutorService);
>>>     URI restUri = MiniClusterExtension.flinkCluster.getRestAddres();
>>>     System.out.printf("** JOB: %s %s\n", jobName, jobClient.getJobID());
>>>     long currentWatermark = 0;
>>>     long lastInputWatermark = Arrays.stream(functions)
>>>       .mapToLong(f -> f.getEndWatermark())
>>>       .max().getAsLong();
>>>     for (int i = 0; i < 3; i++) {
>>>     //while (currentWatermark < (lastInputWatermark + 1000)) {
>>>       JobDetailsHeaders getVertexHeaders = JobDetailsHeaders.getInstance();
>>>       JobMessageParameters getVertexParams =
>>>           getVertexHeaders.getUnresolvedMessageParameters();
>>>       getVertexParams.jobPathParameter.resolve(jobClient.getJobID());
>>>       List<JobVertexID> vertexIds =
>>>         restClient.sendRequest(restUri.getHost(), restUri.getPort(),
>>>             getVertexHeaders, getVertexParams, EmptyRequestBody.getInstance())
>>>           .get().getJobVertexInfos().stream()
>>>           .map(v -> v.getJobVertexID())
>>>           .collect(Collectors.toUnmodifiableList());
>>>       for (JobVertexID vertexId : vertexIds) {
>>>         JobVertexMetricsHeaders getWatermarkHeader =
>>>             JobVertexMetricsHeaders.getInstance();
>>>         JobVertexMetricsMessageParameters getWatermarkParams =
>>>             getWatermarkHeader.getUnresolvedMessageParameters();
>>>         getWatermarkParams.jobPathParameter.resolve(jobClient.getJobID());
>>>         getWatermarkParams.jobVertexIdPathParameter.resolve(vertexId);
>>>         System.out.printf("** LOG VERTEX: %s\n", vertexId);
>>>         try {
>>>           long maxWatermark = restClient.sendRequest(
>>>               restUri.getHost(), restUri.getPort(), getWatermarkHeader,
>>>               getWatermarkParams, EmptyRequestBody.getInstance())
>>>             .get().getMetrics().stream()
>>>             .filter(m -> m.getId().endsWith("Watermark"))
>>>             .map(m -> {
>>>               System.out.printf("** LOG METRIC: %s\n", m);
>>>               return Long.valueOf(StringUtil.isBlank(m.getValue()) ? "0"
>>>                   : m.getValue());
>>>             })
>>>             .mapToLong(v -> v)
>>>             .max().orElse(0);
>>>           currentWatermark = Math.max(currentWatermark, maxWatermark);
>>>         } catch (Exception e) {
>>>           System.out.printf("** LOG ERROR: %s\n", e);
>>>         }
>>>       }
>>>       System.out.printf("** SLEEP: %s %s %s\n", i, currentWatermark,
>>>           lastInputWatermark);
>>>       TimeUnit.SECONDS.sleep(1);
>>>     }
>>>     System.out.println("** CANCEL SOURCES");
>>>     for (WaitingSourceFunction<?> function : functions) {
>>>       function.cancel();
>>>     }
>>>   }
>>> THE PROBLEM: the logging output from the test clearly shows that the
>>> watermark metrics are all null throughout the wait loop.
>>> I also tried using JobVertexWatermarksHeaders instead of
>>> JobVertexMetricsHeaders for the REST GET on the job vertices to fetch
>>> watermark information, but the response bodies were empty.
>>> 1. Shouldn't the watermark metrics be populated for a job running in
>>>    the MiniCluster?
>>> 2. Do I have to enable metrics somehow?
>>> 3. Is there a better way to extract the output watermarks of a running
>>>    Flink job?
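One thing worth checking against Flink's REST API documentation in connection with question 3: the vertex metrics endpoint (/jobs/:jobid/vertices/:vertexid/metrics) returns only the list of available metric ids unless the request names the metrics to fetch in its get query parameter, which would be consistent with the null values printed by the loop above. The sketch below builds such a request path from a list of metric ids using only the JDK. The helper name watermarkMetricsPath and the example ids such as 0.currentOutputWatermark are assumptions for illustration; with Flink's Java RestClient the same filter would go through the message parameters' query parameters rather than a hand-built string.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

final class WatermarkMetricsQuery {
    private WatermarkMetricsQuery() {}

    /**
     * Hypothetical helper: builds a vertex metrics request path that asks for the
     * values of every metric id ending in "Watermark" via the "get" query parameter.
     * Each id is URL-encoded individually; the ids are joined with commas.
     */
    static String watermarkMetricsPath(String jobId, String vertexId, List<String> availableMetricIds) {
        String wanted = availableMetricIds.stream()
                .filter(id -> id.endsWith("Watermark"))
                .map(id -> URLEncoder.encode(id, StandardCharsets.UTF_8))
                .collect(Collectors.joining(","));
        return "/jobs/" + jobId + "/vertices/" + vertexId + "/metrics?get=" + wanted;
    }
}
```

The first, unfiltered request (as in the loop above) would supply availableMetricIds; a second request to the returned path would then carry the values.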
