so i went ahead and put some logging in the WatermarkGenerator.onEvent and
.onPeriodicEmit functions in the test source watermark generator, and i do
see the watermarks come by with values through those functions.  they're
just not being returned as expected via the rest api.
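
for reference, the generator logic described further down in this thread
(bounded out-of-orderness plus a processing-time fallback when the input goes
idle) can be sketched in plain java with no flink types. this is only a
sketch and not the actual test code: the class name, the idleTimeoutMs
parameter, and the choice to advance the watermark by the elapsed idle
processing time are all assumptions made for illustration.

```java
/**
 * Plain-Java sketch (no Flink types) of a bounded-out-of-orderness watermark
 * generator that keeps advancing the watermark while the input is idle.
 * All names here are hypothetical.
 */
final class IdleAwareWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private long maxTimestamp;
    private long lastEventProcessingTime;
    private long lastEmitted = Long.MIN_VALUE;
    private boolean seenEvent;

    IdleAwareWatermarkSketch(long maxOutOfOrdernessMs, long idleTimeoutMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        // Start high enough that the subtraction in onPeriodicEmit cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    /** Called per record: remember the highest event timestamp and when it arrived. */
    void onEvent(long eventTimestamp, long processingTimeNow) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventProcessingTime = processingTimeNow;
        seenEvent = true;
    }

    /** Called periodically: returns the watermark that would be emitted now. */
    long onPeriodicEmit(long processingTimeNow) {
        long watermark = maxTimestamp - maxOutOfOrdernessMs - 1;
        if (seenEvent) {
            long idleFor = processingTimeNow - lastEventProcessingTime;
            if (idleFor >= idleTimeoutMs) {
                // ASSUMPTION: while idle, push event time forward by the
                // processing time that has elapsed since the last event.
                watermark += idleFor;
            }
        }
        // Never go backwards, even if a late event follows an idle period.
        lastEmitted = Math.max(lastEmitted, watermark);
        return lastEmitted;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkSketch gen = new IdleAwareWatermarkSketch(5, 100);
        gen.onEvent(1000, 0);
        System.out.println(gen.onPeriodicEmit(50));   // not idle yet: 994
        System.out.println(gen.onPeriodicEmit(300));  // idle for 300 ms: 1294
        gen.onEvent(1100, 310);
        System.out.println(gen.onPeriodicEmit(320));  // stays at 1294
    }
}
```

logging the return value of onPeriodicEmit in something shaped like this is
what shows the watermarks advancing on my side, independent of whatever the
rest api reports.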

On Tue, Nov 30, 2021 at 7:24 PM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> I see. So to test your watermark strategy you would like to fetch the
> watermarks downstream.
>
> I would suggest taking a look at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator. This
> class has a processWatermark method, which is called when a watermark
> flows through this operator. You can make your own testing operator by
> extending this class and putting the testing operator in an
> org.apache.flink.streaming.api.transformations.OneInputTransformation. In
> this case you do not need to fetch watermarks from the metrics. If
> processWatermark is never called then it means no watermark ever comes
> and you might want to check your watermark strategy implementation.
>
> On Wed, Dec 1, 2021 at 4:14 AM Jin Yi <j...@promoted.ai> wrote:
>
>> thanks for the reply caizhi!
>>
>> we're on flink 1.12.3.  in the test, i'm using a custom watermark
>> strategy that is derived from BoundedOutOfOrdernessWatermarks that emits
>> watermarks using processing time after a period of no events to keep the
>> timer-reliant operators happy.  basically, it's using event time for
>> everything, but the inputs have watermarks periodically output if there's
>> no events coming in through them.
>>
>> we started w/ test data w/ their own event times in the tests and simply
>> used the SEE.fromCollection with a timestamp assigner that extracts the
>> timestamp from the test event data.  however, doing things this way, the
>> minicluster during the test terminates (and completes the test) once all
>> the input is processed, even though there are timers in the operators that
>> are meant to supply additional output still outstanding.  so, that's why i
>> cobbled together an attempt to control when the input sources are complete
>> by using the posted WaitingSourceFunction to send the signal to
>> close/cancel the input stream based on some form of state checking on the
>> job (which is where this thread starts).
>>
>> what's the best way to achieve what i need?  i would love to set the
>> inputs up so that watermarks get emitted appropriately throughout the
>> processing of the test data as well as for a defined period after all the
>> "input" is complete such that the timer-reliant operators get their timers
>> triggered.  thanks!
>>
>> On Mon, Nov 29, 2021 at 6:37 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> I believe metrics are enabled by default even for a mini cluster. Which
>>> Flink version are you using and how do you set your watermark strategy?
>>> Could you share the user code that creates the datastream / SQL and
>>> gets the job graph?
>>>
>>> I'm also curious about why you need to extract the output watermarks
>>> just for stopping the source. You can control the records and the watermark
>>> strategy from the source. From my point of view, constructing some test
>>> data with some specific row time would be enough.
>>>
>>> On Tue, Nov 30, 2021 at 3:34 AM Jin Yi <j...@promoted.ai> wrote:
>>>
>>>> bump.  a more general question is what do people do for more end to
>>>> end, full integration tests to test event time based jobs with timers?
>>>>
>>>> On Tue, Nov 23, 2021 at 11:26 AM Jin Yi <j...@promoted.ai> wrote:
>>>>
>>>>> i am writing an integration test where i execute a streaming flink job
>>>>> using faked, "unbounded" input where i want to control when the source
>>>>> function(s) complete by triggering them once the job's operator's maximum
>>>>> output watermarks are beyond some job completion watermark that's relative
>>>>> to the maximum input timestamp because the flink job uses event time
>>>>> timers to produce some output.
>>>>>
>>>>> here is the faked, "unbounded" source function class:
>>>>>
>>>>>   private static class WaitingSourceFunction<OUT> extends FromElementsFunction<OUT> {
>>>>>
>>>>>     // volatile: cancel() is called from a different thread than run()
>>>>>     private volatile boolean isWaiting;
>>>>>     private TypeInformation<OUT> typeInfo;
>>>>>
>>>>>     private WaitingSourceFunction(
>>>>>         StreamExecutionEnvironment env, Collection<OUT> data, TypeInformation<OUT> typeInfo)
>>>>>         throws IOException {
>>>>>       super(typeInfo.createSerializer(env.getConfig()), data);
>>>>>       this.isWaiting = true;
>>>>>       this.typeInfo = typeInfo;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void cancel() {
>>>>>       super.cancel();
>>>>>       isWaiting = false;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void run(SourceContext<OUT> ctx) throws Exception {
>>>>>       super.run(ctx);
>>>>>       while (isWaiting) {
>>>>>         TimeUnit.SECONDS.sleep(10);
>>>>>       }
>>>>>     }
>>>>>
>>>>>     public long getEndWatermark() {
>>>>>       // TODO
>>>>>       return 1000000;
>>>>>     }
>>>>>   }
>>>>>
>>>>> and here is the function where i want to busy wait (currently hacked up
>>>>> to print info to show my problem):
>>>>>
>>>>>   private void waitForDone(String jobName, WaitingSourceFunction<?>... functions)
>>>>>       throws ConfigurationException, Exception, ExecutionException,
>>>>>       IOException, InterruptedException {
>>>>>     JobExecutionResult jobResult = env.execute(jobName);
>>>>>     RestClient restClient = new RestClient(
>>>>>         RestClientConfiguration.fromConfiguration(getClientConfiguration()),
>>>>>         scheduledExecutorService);
>>>>>     URI restUri = MiniClusterExtension.flinkCluster.getRestAddres();
>>>>>
>>>>>     System.out.printf("** JOB: %s %s\n", jobName, jobResult.getJobID());
>>>>>
>>>>>     long currentWatermark = 0;
>>>>>     long lastInputWatermark = Arrays.stream(functions)
>>>>>       .map(f -> f.getEndWatermark())
>>>>>       .mapToLong(l -> l)
>>>>>       .max().getAsLong();
>>>>>     for (int i = 0; i < 3 ; i++) {
>>>>>     //while (currentWatermark < (lastInputWatermark + 1000)) {
>>>>>       JobDetailsHeaders getVertexHeaders = JobDetailsHeaders.getInstance();
>>>>>       JobMessageParameters getVertexParams =
>>>>>           getVertexHeaders.getUnresolvedMessageParameters();
>>>>>       getVertexParams.jobPathParameter.resolve(jobResult.getJobID());
>>>>>       List<JobVertexID> vertexIds =
>>>>>         restClient.sendRequest(restUri.getHost(), restUri.getPort(),
>>>>>             getVertexHeaders, getVertexParams, EmptyRequestBody.getInstance())
>>>>>           .get().getJobVertexInfos().stream()
>>>>>           .map(v -> v.getJobVertexID())
>>>>>           .collect(Collectors.toUnmodifiableList());
>>>>>
>>>>>       for (JobVertexID vertexId : vertexIds) {
>>>>>         JobVertexMetricsHeaders getWatermarkHeader =
>>>>>             JobVertexMetricsHeaders.getInstance();
>>>>>         JobVertexMetricsMessageParameters getWatermarkParams =
>>>>>             getWatermarkHeader.getUnresolvedMessageParameters();
>>>>>
>>>>>         getWatermarkParams.jobPathParameter.resolve(jobResult.getJobID());
>>>>>         getWatermarkParams.jobVertexIdPathParameter.resolve(vertexId);
>>>>>         System.out.printf("** LOG VERTEX: %s\n", vertexId);
>>>>>         try {
>>>>>           long maxWatermark = restClient.sendRequest(
>>>>>               restUri.getHost(), restUri.getPort(),
>>>>>               getWatermarkHeader, getWatermarkParams, EmptyRequestBody.getInstance())
>>>>>             .get().getMetrics().stream()
>>>>>             .filter(m -> m.getId().endsWith("Watermark"))
>>>>>             .map(m -> {
>>>>>               System.out.printf("** LOG METRIC: %s\n", m);
>>>>>               return Long.valueOf(StringUtil.isBlank(m.getValue()) ? "0" : m.getValue());
>>>>>             })
>>>>>             .mapToLong(v -> v)
>>>>>             .max().orElse(0);
>>>>>           currentWatermark = Math.max(currentWatermark, maxWatermark);
>>>>>         } catch (Exception e) {
>>>>>           System.out.printf("** LOG ERROR: %s\n", e);
>>>>>         }
>>>>>       }
>>>>>       System.out.printf("** SLEEP: %s %s %s\n", i, currentWatermark, lastInputWatermark);
>>>>>       TimeUnit.SECONDS.sleep(1);
>>>>>     }
>>>>>
>>>>>     System.out.println("** CANCEL SOURCES");
>>>>>     for (WaitingSourceFunction<?> function : functions) {
>>>>>       function.cancel();
>>>>>     }
>>>>>   }
>>>>>
>>>>> THE PROBLEM: the output of the logging in the test clearly shows that
>>>>> the watermark metrics are all null throughout the wait loop:
>>>>> https://paste-bin.xyz/16195
>>>>>
>>>>> I also tried using JobVertexWatermarksHeaders instead of
>>>>> JobVertexMetricsHeaders, for the REST get on the job vertices to get
>>>>> watermark information, but the response bodies were empty.
>>>>>
>>>>> QUESTIONS:
>>>>> 1.  shouldn't the watermark metrics be populated in the job running in
>>>>> the minicluster?
>>>>> 2.  do i have to enable metrics somehow?
>>>>> 3.  is there a better way to extract the output watermarks of a
>>>>> running flink job?
>>>>>
>>>>
