so i went ahead and put some logging in the WatermarkGenerator.onEvent and .onPeriodicEmit functions in the test source's watermark generator, and i do see watermarks with values come through those functions. they're just not being returned as expected via the rest api.
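for context, the generator being instrumented follows flink's bounded-out-of-orderness rule (watermark = max timestamp seen - allowed out-of-orderness - 1) with a processing-time fallback when the input goes idle, as described further down the thread. here is a flink-free sketch of just that arithmetic; the class and method names are hypothetical, and this is not the actual test code:

```java
import java.util.function.LongSupplier;

// Flink-free model of a bounded-out-of-orderness watermark generator with an
// idleness fallback. All names here are made up for illustration.
class IdleAwareBoundedWatermarks {
    private final long outOfOrdernessMs;
    private final long idleTimeoutMs;
    private final LongSupplier processingClock; // injected so the idle path is testable
    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEventAt;

    IdleAwareBoundedWatermarks(long outOfOrdernessMs, long idleTimeoutMs,
                               LongSupplier processingClock) {
        this.outOfOrdernessMs = outOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.processingClock = processingClock;
        this.lastEventAt = processingClock.getAsLong();
    }

    // analogous to WatermarkGenerator.onEvent: track the max event timestamp
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventAt = processingClock.getAsLong();
    }

    // analogous to WatermarkGenerator.onPeriodicEmit: same rule as
    // BoundedOutOfOrdernessWatermarks, plus a processing-time advance when idle
    public long onPeriodicEmit() {
        long idleFor = processingClock.getAsLong() - lastEventAt;
        if (idleFor >= idleTimeoutMs) {
            // no events for a while: advance using processing time so that
            // downstream event-time timers can still fire
            maxTimestamp = Math.max(maxTimestamp, processingClock.getAsLong());
        }
        return maxTimestamp - outOfOrdernessMs - 1;
    }
}
```

because the clock is injected, both the event-time path and the idle path can be exercised deterministically in a plain unit test, without a minicluster.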
On Tue, Nov 30, 2021 at 7:24 PM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> I see. So to test your watermark strategy you would like to fetch the
> watermarks downstream.
>
> I would suggest taking a look at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator. This
> class has a processWatermark method, which is called when a watermark
> flows through the operator. You can make your own testing operator by
> extending this class and putting the testing operator in a
> org.apache.flink.streaming.api.transformations.OneInputTransformation. In
> this case you do not need to fetch watermarks from the metrics. If
> processWatermark is never called, then no watermark ever arrives and you
> might want to check your watermark strategy implementation.
>
> Jin Yi <j...@promoted.ai> wrote on Wed, Dec 1, 2021 at 4:14 AM:
>
>> thanks for the reply caizhi!
>>
>> we're on flink 1.12.3. in the test, i'm using a custom watermark
>> strategy derived from BoundedOutOfOrdernessWatermarks that emits
>> watermarks using processing time after a period with no events, to keep
>> the timer-reliant operators happy. basically, it uses event time for
>> everything, but the inputs have watermarks emitted periodically if no
>> events are coming in through them.
>>
>> we started with test data carrying their own event times and simply
>> used StreamExecutionEnvironment.fromCollection with a timestamp assigner
>> that extracts the timestamp from the test event data. however, doing
>> things this way, the minicluster in the test terminates (and completes
>> the test) once all the input is processed, even though there are still
>> outstanding timers in the operators that are meant to supply additional
>> output.
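Caizhi's testing-operator idea can be modeled without any flink dependencies: a pass-through operator whose processWatermark hook records every watermark it sees, so a test can assert on the captured values directly instead of polling the rest api. the types below are simplified stand-ins for illustration only (the real version would extend AbstractStreamOperator), not actual flink classes:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a watermark-capturing pass-through operator.
// The Flink version of this idea extends AbstractStreamOperator and
// overrides processWatermark; these types are hypothetical.
class WatermarkCapturingOperator<T> {
    private final List<Long> seenWatermarks = new ArrayList<>();
    private final List<T> forwarded = new ArrayList<>();

    // records pass through untouched
    public void processElement(T element) {
        forwarded.add(element);
    }

    // called whenever a watermark flows through; just remember it
    public void processWatermark(long watermark) {
        seenWatermarks.add(watermark);
    }

    public List<Long> seenWatermarks() {
        return seenWatermarks;
    }

    // highest watermark observed so far, or Long.MIN_VALUE if none arrived
    public long maxWatermark() {
        return seenWatermarks.stream().mapToLong(Long::longValue)
                .max().orElse(Long.MIN_VALUE);
    }

    public List<T> forwarded() {
        return forwarded;
    }
}
```

as Caizhi notes, if the capture list stays empty (maxWatermark() == Long.MIN_VALUE), no watermark ever reached that point in the pipeline, which points at the watermark strategy rather than the metrics plumbing.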
>> so, that's why i cobbled together an attempt to control when the input
>> sources complete, by using the posted WaitingSourceFunction to send the
>> signal to close/cancel the input stream based on some form of state
>> checking on the job (which is where this thread starts).
>>
>> what's the best way to achieve what i need? i would love to set the
>> inputs up so that watermarks get emitted appropriately throughout the
>> processing of the test data, as well as for a defined period after all
>> the "input" is complete, so that the timer-reliant operators get their
>> timers triggered. thanks!
>>
>> On Mon, Nov 29, 2021 at 6:37 PM Caizhi Weng <tsreape...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> I believe metrics are enabled by default even for a mini cluster. Which
>>> Flink version are you using, and how do you set your watermark strategy?
>>> Could you share the user code showing how you create the datastream /
>>> SQL and get the job graph?
>>>
>>> I'm also curious why you need to extract the output watermarks just for
>>> stopping the source. You can control the records and the watermark
>>> strategy from the source. From my point of view, constructing some test
>>> data with specific row times would be enough.
>>>
>>> Jin Yi <j...@promoted.ai> wrote on Tue, Nov 30, 2021 at 3:34 AM:
>>>
>>>> bump. a more general question: what do people do for more end-to-end,
>>>> full integration tests of event-time-based jobs with timers?
>>>>
>>>> On Tue, Nov 23, 2021 at 11:26 AM Jin Yi <j...@promoted.ai> wrote:
>>>>
>>>>> i am writing an integration test where i execute a streaming flink
>>>>> job using faked, "unbounded" input, and i want to control when the
>>>>> source function(s) complete by triggering them once the maximum
>>>>> output watermark across the job's operators is beyond some
>>>>> job-completion watermark relative to the maximum input timestamp,
>>>>> because the flink job uses event time timers to produce some output.
>>>>>
>>>>> here is the faked, "unbounded" source function class:
>>>>>
>>>>>   private static class WaitingSourceFunction<OUT> extends FromElementsFunction<OUT> {
>>>>>     // volatile: cancel() is called from a different thread than run()
>>>>>     private volatile boolean isWaiting;
>>>>>     private TypeInformation<OUT> typeInfo;
>>>>>
>>>>>     private WaitingSourceFunction(
>>>>>         StreamExecutionEnvironment env, Collection<OUT> data, TypeInformation<OUT> typeInfo)
>>>>>         throws IOException {
>>>>>       super(typeInfo.createSerializer(env.getConfig()), data);
>>>>>       this.isWaiting = true;
>>>>>       this.typeInfo = typeInfo;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void cancel() {
>>>>>       super.cancel();
>>>>>       isWaiting = false;
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public void run(SourceContext<OUT> ctx) throws Exception {
>>>>>       super.run(ctx);
>>>>>       // keep the source alive after all elements are emitted so the job
>>>>>       // (and its outstanding timers) can keep running until cancel()
>>>>>       while (isWaiting) {
>>>>>         TimeUnit.SECONDS.sleep(10);
>>>>>       }
>>>>>     }
>>>>>
>>>>>     public long getEndWatermark() {
>>>>>       // TODO
>>>>>       return 1000000;
>>>>>     }
>>>>>   }
>>>>>
>>>>> and here is the function where i want to busy wait (currently hacked up
>>>>> to print info to show my problem):
>>>>>
>>>>>   private void waitForDone(String jobName, WaitingSourceFunction<?>... functions)
>>>>>       throws ConfigurationException, Exception, ExecutionException,
>>>>>           IOException, InterruptedException {
>>>>>     JobExecutionResult jobResult = env.execute(jobName);
>>>>>     RestClient restClient = new RestClient(
>>>>>         RestClientConfiguration.fromConfiguration(getClientConfiguration()),
>>>>>         scheduledExecutorService);
>>>>>     URI restUri = MiniClusterExtension.flinkCluster.getRestAddres();
>>>>>
>>>>>     System.out.printf("** JOB: %s %s\n", jobName, jobResult.getJobID());
>>>>>
>>>>>     long currentWatermark = 0;
>>>>>     long lastInputWatermark = Arrays.stream(functions)
>>>>>         .map(f -> f.getEndWatermark())
>>>>>         .mapToLong(l -> l)
>>>>>         .max().getAsLong();
>>>>>     for (int i = 0; i < 3; i++) {
>>>>>       // while (currentWatermark < (lastInputWatermark + 1000)) {
>>>>>       JobDetailsHeaders getVertexHeaders = JobDetailsHeaders.getInstance();
>>>>>       JobMessageParameters getVertexParams =
>>>>>           getVertexHeaders.getUnresolvedMessageParameters();
>>>>>       getVertexParams.jobPathParameter.resolve(jobResult.getJobID());
>>>>>       List<JobVertexID> vertexIds =
>>>>>           restClient.sendRequest(restUri.getHost(), restUri.getPort(),
>>>>>                   getVertexHeaders, getVertexParams, EmptyRequestBody.getInstance())
>>>>>               .get().getJobVertexInfos().stream()
>>>>>               .map(v -> v.getJobVertexID())
>>>>>               .collect(Collectors.toUnmodifiableList());
>>>>>
>>>>>       for (JobVertexID vertexId : vertexIds) {
>>>>>         JobVertexMetricsHeaders getWatermarkHeader =
>>>>>             JobVertexMetricsHeaders.getInstance();
>>>>>         JobVertexMetricsMessageParameters getWatermarkParams =
>>>>>             getWatermarkHeader.getUnresolvedMessageParameters();
>>>>>         getWatermarkParams.jobPathParameter.resolve(jobResult.getJobID());
>>>>>         getWatermarkParams.jobVertexIdPathParameter.resolve(vertexId);
>>>>>         System.out.printf("** LOG VERTEX: %s\n", vertexId);
>>>>>         try {
>>>>>           long maxWatermark = restClient.sendRequest(
>>>>>                   restUri.getHost(), restUri.getPort(),
>>>>>                   getWatermarkHeader, getWatermarkParams, EmptyRequestBody.getInstance())
>>>>>               .get().getMetrics().stream()
>>>>>               .filter(m -> m.getId().endsWith("Watermark"))
>>>>>               .map(m -> {
>>>>>                 System.out.printf("** LOG METRIC: %s\n", m);
>>>>>                 return Long.valueOf(
>>>>>                     StringUtil.isBlank(m.getValue()) ? "0" : m.getValue());
>>>>>               })
>>>>>               .mapToLong(v -> v)
>>>>>               .max().orElse(0);
>>>>>           currentWatermark = Math.max(currentWatermark, maxWatermark);
>>>>>         } catch (Exception e) {
>>>>>           System.out.printf("** LOG ERROR: %s\n", e);
>>>>>         }
>>>>>       }
>>>>>       System.out.printf("** SLEEP: %s %s %s\n", i, currentWatermark, lastInputWatermark);
>>>>>       TimeUnit.SECONDS.sleep(1);
>>>>>     }
>>>>>
>>>>>     System.out.println("** CANCEL SOURCES");
>>>>>     for (WaitingSourceFunction<?> function : functions) {
>>>>>       function.cancel();
>>>>>     }
>>>>>   }
>>>>>
>>>>> THE PROBLEM: the output of the logging in the test clearly shows that
>>>>> the watermark metrics are all null throughout the wait loop:
>>>>> https://paste-bin.xyz/16195
>>>>>
>>>>> I also tried using JobVertexWatermarksHeaders instead of
>>>>> JobVertexMetricsHeaders for the REST get on the job vertices to get
>>>>> watermark information, but the response bodies were empty.
>>>>>
>>>>> QUESTIONS:
>>>>> 1. shouldn't the watermark metrics be populated in the job running in
>>>>> the minicluster?
>>>>> 2. do i have to enable metrics somehow?
>>>>> 3. is there a better way to extract the output watermarks of a
>>>>> running flink job?
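regarding question 3: the same watermark data is also reachable over plain http on the minicluster's rest endpoint, e.g. GET /jobs/&lt;jobid&gt;/vertices/&lt;vertexid&gt;/watermarks (the endpoint behind JobVertexWatermarksHeaders mentioned above). a small sketch of building that url and pulling the max reported value out of the json response; the helper names are hypothetical, and blank values (metric not yet reported) are skipped:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Helpers for polling watermarks over Flink's plain REST API instead of the
// RestClient/headers machinery. The endpoint path follows the Flink REST API
// docs; the class and method names here are made up for illustration.
class WatermarkRestHelper {
    // /jobs/:jobid/vertices/:vertexid/watermarks returns a JSON array like
    // [{"id":"0.currentOutputWatermark","value":"1638316800000"}, ...]
    static String watermarksUrl(String host, int port, String jobId, String vertexId) {
        return String.format("http://%s:%d/jobs/%s/vertices/%s/watermarks",
                host, port, jobId, vertexId);
    }

    // pull every numeric "value" field out of the response and take the max;
    // entries with blank values are simply not matched, so they are skipped
    static long maxWatermark(String json, long fallback) {
        Pattern p = Pattern.compile("\"value\"\\s*:\\s*\"(-?\\d+)\"");
        Matcher m = p.matcher(json);
        List<Long> values = new ArrayList<>();
        while (m.find()) {
            values.add(Long.parseLong(m.group(1)));
        }
        return values.stream().mapToLong(Long::longValue).max().orElse(fallback);
    }
}
```

one thing worth checking against the waitForDone code above: env.execute() blocks until the job finishes, so a poll loop that runs only after it returns is looking at a job that is no longer running; submitting with executeAsync() instead would keep the job live while the loop polls, which may be why the metrics come back empty here.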