dajac commented on code in PR #18014: URL: https://github.com/apache/kafka/pull/18014#discussion_r1875665143
########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java: ########## @@ -240,9 +249,87 @@ public void startup( log.info("Starting up."); numPartitions = shareGroupTopicPartitionCount.getAsInt(); + setupRecordPruning(); log.info("Startup complete."); } + // visibility for tests + void setupRecordPruning() { + log.info("Scheduling share state topic prune job."); + timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) { + @Override + public void run() { + List<CompletableFuture<Void>> futures = new ArrayList<>(); + runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp))); + + CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{})) + .whenComplete((res, exp) -> { + if (exp != null) { + log.error("Received error in share state topic prune, stopping job.", exp); + return; + } Review Comment: Are you sure about this? Don't you want to retry after the next interval? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org