[ https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16406210#comment-16406210 ]
ASF GitHub Bot commented on FLINK-8756: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5573#discussion_r175734974 --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java --- @@ -542,6 +549,68 @@ public void testListJobs() throws Exception { } } + @Test + public void testGetAccumulators() throws Exception { + TestAccumulatorHandlers accumulatorHandlers = new TestAccumulatorHandlers(); + TestAccumulatorHandlers.TestAccumulatorHandler accumulatorHandler = accumulatorHandlers.new TestAccumulatorHandler(); + + try (TestRestServerEndpoint ignored = createRestServerEndpoint(accumulatorHandler)){ + + JobID id = new JobID(); + + { + Map<String, Object> accumulators = restClusterClient.getAccumulators(id); + assertNotNull(accumulators); + assertEquals(1, accumulators.size()); + + assertEquals(true, accumulators.containsKey("testKey")); + assertEquals("testValue", accumulators.get("testKey").toString()); + } + } + } + + private class TestAccumulatorHandlers { + + private class TestAccumulatorHandler extends TestHandler<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> { + + public TestAccumulatorHandler() { + super(JobAccumulatorsHeaders.getInstance()); + } + + @Override + protected CompletableFuture<JobAccumulatorsInfo> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, + JobAccumulatorsMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException { + JobAccumulatorsInfo accumulatorsInfo; + List<Boolean> queryParams = request.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class); + + boolean includeSerializedValue = false; + if (!queryParams.isEmpty()) { + includeSerializedValue = queryParams.get(0); + } + + List<JobAccumulatorsInfo.UserTaskAccumulator> userTaskAccumulators = new ArrayList<JobAccumulatorsInfo.UserTaskAccumulator>() {{ --- End diff -- Instead of creating a new anonymous class can we simply create an `ArrayList` and then add the element (without the initializer block)? > Support ClusterClient.getAccumulators() in RestClusterClient > ------------------------------------------------------------ > > Key: FLINK-8756 > URL: https://issues.apache.org/jira/browse/FLINK-8756 > Project: Flink > Issue Type: Improvement > Components: Client > Affects Versions: 1.5.0 > Reporter: Aljoscha Krettek > Assignee: vinoyang > Priority: Blocker > Fix For: 1.5.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)