bump. a more general question: what do people do for end-to-end, full integration tests of event time based jobs that use timers?
On Tue, Nov 23, 2021 at 11:26 AM Jin Yi <j...@promoted.ai> wrote:
> i am writing an integration test that executes a streaming flink job on
> faked, "unbounded" input. because the job uses event time timers to produce
> some of its output, i want to control when the source function(s) complete:
> they should be cancelled once the maximum output watermark of the job's
> operators is past a job completion watermark that is relative to the maximum
> input timestamp.
>
> here is the faked, "unbounded" source function class:
>
>   private static class WaitingSourceFunction<OUT> extends FromElementsFunction<OUT> {
>     private boolean isWaiting;
>     private TypeInformation<OUT> typeInfo;
>
>     private WaitingSourceFunction(
>         StreamExecutionEnvironment env, Collection<OUT> data, TypeInformation<OUT> typeInfo)
>         throws IOException {
>       super(typeInfo.createSerializer(env.getConfig()), data);
>       this.isWaiting = true;
>       this.typeInfo = typeInfo;
>     }
>
>     @Override
>     public void cancel() {
>       super.cancel();
>       isWaiting = false;
>     }
>
>     @Override
>     public void run(SourceContext<OUT> ctx) throws Exception {
>       super.run(ctx);
>       while (isWaiting) {
>         TimeUnit.SECONDS.sleep(10);
>       }
>     }
>
>     public long getEndWatermark() {
>       // TODO
>       return 1000000;
>     }
>   }
>
> and here is the function where i want to busy wait (currently hacked up to
> print info that shows my problem):
>
>   private void waitForDone(String jobName, WaitingSourceFunction<?>... functions)
>       throws ConfigurationException, Exception, ExecutionException, IOException,
>           InterruptedException {
>     JobExecutionResult jobResult = env.execute(jobName);
>     RestClient restClient = new RestClient(
>         RestClientConfiguration.fromConfiguration(getClientConfiguration()),
>         scheduledExecutorService);
>     URI restUri = MiniClusterExtension.flinkCluster.getRestAddres();
>
>     System.out.printf("** JOB: %s %s\n", jobName, jobResult.getJobID());
>
>     long currentWatermark = 0;
>     long lastInputWatermark = Arrays.stream(functions)
>         .map(f -> f.getEndWatermark())
>         .mapToLong(l -> l)
>         .max().getAsLong();
>     for (int i = 0; i < 3; i++) {
>     //while (currentWatermark < (lastInputWatermark + 1000)) {
>       JobDetailsHeaders getVertexHeaders = JobDetailsHeaders.getInstance();
>       JobMessageParameters getVertexParams =
>           getVertexHeaders.getUnresolvedMessageParameters();
>       getVertexParams.jobPathParameter.resolve(jobResult.getJobID());
>       List<JobVertexID> vertexIds =
>           restClient.sendRequest(restUri.getHost(), restUri.getPort(),
>                   getVertexHeaders, getVertexParams, EmptyRequestBody.getInstance())
>               .get().getJobVertexInfos().stream()
>               .map(v -> v.getJobVertexID())
>               .collect(Collectors.toUnmodifiableList());
>
>       for (JobVertexID vertexId : vertexIds) {
>         JobVertexMetricsHeaders getWatermarkHeader = JobVertexMetricsHeaders.getInstance();
>         JobVertexMetricsMessageParameters getWatermarkParams =
>             getWatermarkHeader.getUnresolvedMessageParameters();
>         getWatermarkParams.jobPathParameter.resolve(jobResult.getJobID());
>         getWatermarkParams.jobVertexIdPathParameter.resolve(vertexId);
>         System.out.printf("** LOG VERTEX: %s\n", vertexId);
>         try {
>           long maxWatermark = restClient.sendRequest(
>                   restUri.getHost(), restUri.getPort(), getWatermarkHeader,
>                   getWatermarkParams, EmptyRequestBody.getInstance())
>               .get().getMetrics().stream()
>               .filter(m -> m.getId().endsWith("Watermark"))
>               .map(m -> {
>                 System.out.printf("** LOG METRIC: %s\n", m);
>                 return Long.valueOf(StringUtil.isBlank(m.getValue()) ? "0" : m.getValue());
>               })
>               .mapToLong(v -> v)
>               .max().orElse(0);
>           currentWatermark = Math.max(currentWatermark, maxWatermark);
>         } catch (Exception e) {
>           System.out.printf("** LOG ERROR: %s\n", e);
>         }
>       }
>       System.out.printf("** SLEEP: %s %s %s\n", i, currentWatermark, lastInputWatermark);
>       TimeUnit.SECONDS.sleep(1);
>     }
>
>     System.out.println("** CANCEL SOURCES");
>     for (WaitingSourceFunction<?> function : functions) {
>       function.cancel();
>     }
>   }
>
> THE PROBLEM: the test's log output clearly shows that the watermark metrics
> are all null throughout the wait loop: https://paste-bin.xyz/16195
>
> I also tried using JobVertexWatermarksHeaders instead of
> JobVertexMetricsHeaders for the REST GET on the job vertices to get
> watermark information, but the response bodies were empty.
>
> QUESTIONS:
> 1. shouldn't the watermark metrics be populated in the job running in the
>    minicluster?
> 2. do i have to enable metrics somehow?
> 3. is there a better way to extract the output watermarks of a running
>    flink job?
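The wait loop in `waitForDone` mixes two concerns: reducing the vertex metrics to a single watermark, and waiting until that watermark passes `lastInputWatermark + 1000` (the commented-out `while` condition). A minimal, stdlib-only sketch of both pieces follows. `WatermarkWait`, `maxWatermark` and `waitForWatermark` are hypothetical names, and the `Map<String, String>` is an assumption standing in for the id/value pairs of the REST metrics response. Unlike the original, it skips null or blank values instead of mapping them to 0, so "no data yet" is not mistaken for a real watermark. One hedged observation: per Flink's metrics documentation, the vertex `metrics` REST endpoint returns only the available metric ids unless the `get` query parameter names the metrics whose values you want, which would be consistent with the null values in the log.

```java
import java.time.Duration;
import java.util.Map;
import java.util.OptionalLong;
import java.util.function.LongSupplier;

/** Hypothetical helpers for polling a job's watermark from a test. */
final class WatermarkWait {

    private WatermarkWait() {}

    /**
     * Max over metrics whose id ends with "Watermark" (e.g. "0.currentOutputWatermark").
     * Null or blank values are skipped rather than treated as 0; empty if nothing parsed.
     */
    static OptionalLong maxWatermark(Map<String, String> metricValuesById) {
        return metricValuesById.entrySet().stream()
                .filter(e -> e.getKey().endsWith("Watermark"))
                .map(Map.Entry::getValue)
                .filter(v -> v != null && !v.isBlank())
                .mapToLong(v -> Long.parseLong(v.trim()))
                .max();
    }

    /**
     * Polls currentWatermark until it reaches target or the timeout elapses.
     * Returns true if the target was reached, false on timeout or interruption.
     */
    static boolean waitForWatermark(
            LongSupplier currentWatermark, long target, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (currentWatermark.getAsLong() >= target) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out before the watermark got there
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return false;
            }
        }
    }
}
```

In the test, the `LongSupplier` could wrap one round of the REST calls from `waitForDone` (hypothetical wiring), with `lastInputWatermark + 1000` as the target and a timeout so a stuck job fails the test instead of hanging it. As far as I know, Flink reports `Long.MIN_VALUE` for an operator that has not seen a watermark yet; that parses fine and simply never satisfies the target. Taking the maximum, as the original loop does, means the wait can end as soon as any single vertex (for example a source) passes the target; taking the minimum across vertices instead would wait for the slowest operator, which may be closer to "all event time timers have fired".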