smjn commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1881410751
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +253,82 @@ public void startup(
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }
 
+    private void setupRecordPruning() {
+        log.info("Scheduling share-group state topic prune job.");
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp)));
+
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error in share-group state topic prune.", exp);
+                        }
+                        // Perpetual recursion, failure or not.
+                        setupRecordPruning();
+                    });
+            }
+        });
+    }
+
+    private CompletableFuture<Void> performRecordPruning(TopicPartition tp) {
+        // This future will always be completed normally, exception or not.
+        CompletableFuture<Void> fut = new CompletableFuture<>();
+
+        runtime.scheduleWriteOperation(
+            "write-state-record-prune",
+            tp,
+            Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+            ShareCoordinatorShard::lastRedundantOffset
+        ).whenComplete((result, exception) -> {
+            if (exception != null) {
+                log.debug("Last redundant offset lookup for tp {} threw an error.", tp, exception);
+                Errors error = Errors.forException(exception);
+                // These errors might result from partition metadata not yet being loaded
+                // or from shard re-election; logging them at error level would only add noise.
+                if (!(error.equals(Errors.COORDINATOR_LOAD_IN_PROGRESS) || error.equals(Errors.NOT_COORDINATOR))) {
+                    log.error("Last redundant offset lookup for tp {} threw an error.", tp, exception);
+                    fut.completeExceptionally(exception);
+                    return;
+                }
+                fut.complete(null);
+                return;
+            }
+            if (result.isPresent()) {
+                Long off = result.get();
+                Long lastPrunedOffset = lastPrunedOffsets.get(tp);
+                if (lastPrunedOffset != null && lastPrunedOffset.longValue() == off) {
+                    log.debug("{} already pruned till offset {}", tp, off);
+                    fut.complete(null);
+                    return;
+                }
+
+                log.info("Pruning records in {} till offset {}.", tp, off);
+                writer.deleteRecords(tp, off)
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.debug("Exception while deleting records in {} till offset {}.", tp, off, exp);
+                            fut.completeExceptionally(exp);
+                            return;
+                        }
+                        fut.complete(null);
+                        // Best-effort prevention of issuing duplicate delete calls.
+                        lastPrunedOffsets.put(tp, off);

Review Comment:
   While my initial approach was to add this functionality in `CoordinatorRuntime`, the requirement wasn't general enough to justify modifying the runtime.
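
For readers following the thread, below is a minimal, self-contained sketch of the self-rescheduling prune loop the diff implements. It is an illustration under stated assumptions, not the PR's code: a plain `ScheduledExecutorService` stands in for Kafka's `Timer`/`CoordinatorRuntime`, the class name `PruneLoopSketch` and the `findRedundantOffset`/`deleteRecords` helpers are hypothetical placeholders for `scheduleWriteOperation(..., ShareCoordinatorShard::lastRedundantOffset)` and `writer.deleteRecords`, and the `COORDINATOR_LOAD_IN_PROGRESS`/`NOT_COORDINATOR` error filtering is omitted for brevity.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class PruneLoopSketch {
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Best-effort cache of the last pruned offset per partition, mirroring lastPrunedOffsets.
        private final Map<Integer, Long> lastPrunedOffsets = new ConcurrentHashMap<>();
        private final long pruneIntervalMs;
        private final List<Integer> partitions;

        PruneLoopSketch(long pruneIntervalMs, List<Integer> partitions) {
            this.pruneIntervalMs = pruneIntervalMs;
            this.partitions = partitions;
        }

        // Schedules one prune pass; the pass reschedules itself only after every
        // partition's future has settled, matching the "perpetual recursion" comment.
        void setupRecordPruning() {
            scheduler.schedule(() -> {
                List<CompletableFuture<Void>> futures = new ArrayList<>();
                partitions.forEach(p -> futures.add(performRecordPruning(p)));
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .whenComplete((res, exp) -> setupRecordPruning()); // reschedule, failure or not
            }, pruneIntervalMs, TimeUnit.MILLISECONDS);
        }

        private CompletableFuture<Void> performRecordPruning(int partition) {
            CompletableFuture<Void> fut = new CompletableFuture<>();
            findRedundantOffset(partition).whenComplete((off, exception) -> {
                if (exception != null) {
                    fut.completeExceptionally(exception);
                    return;
                }
                Long lastPruned = lastPrunedOffsets.get(partition);
                if (lastPruned != null && lastPruned == off.longValue()) {
                    fut.complete(null); // already pruned up to this offset; skip the delete
                    return;
                }
                deleteRecords(partition, off).whenComplete((res, exp) -> {
                    if (exp != null) {
                        fut.completeExceptionally(exp);
                        return;
                    }
                    lastPrunedOffsets.put(partition, off); // best-effort duplicate suppression
                    fut.complete(null);
                });
            });
            return fut;
        }

        // Hypothetical stand-ins for the coordinator offset lookup and the log-layer delete.
        private CompletableFuture<Long> findRedundantOffset(int partition) {
            return CompletableFuture.completedFuture(100L);
        }

        private CompletableFuture<Void> deleteRecords(int partition, long offset) {
            return CompletableFuture.completedFuture(null);
        }

        public static void main(String[] args) {
            new PruneLoopSketch(30_000L, List.of(0, 1, 2)).setupRecordPruning();
        }
    }

The sketch also reflects the design point in the comment above: because the loop lives entirely in the service layer, the runtime only needs to expose `activeTopicPartitions()` and the generic `scheduleWriteOperation` hook rather than any prune-specific machinery.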