junrao commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1879095991
########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java: ##########
@@ -240,9 +251,96 @@ public void startup(
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        Map<TopicPartition, Long> offsets = new ConcurrentHashMap<>();
+        setupRecordPruning(offsets);
         log.info("Startup complete.");
     }
 
+    private void setupRecordPruning(Map<TopicPartition, Long> offsets) {
+        log.info("Scheduling share state topic prune job.");
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp, offsets)));
+
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error in share state topic prune.", exp);
+                        }
+                        // Perpetual recursion, failure or not.
+                        setupRecordPruning(offsets);
+                    });
+            }
+        });
+    }
+
+    private CompletableFuture<Void> performRecordPruning(TopicPartition tp, Map<TopicPartition, Long> offsets) {
+        // This future will always be completed normally, exception or not.
+        CompletableFuture<Void> fut = new CompletableFuture<>();
+        runtime.scheduleWriteOperation(

Review Comment:
   This call doesn't do any writes in runtime. Should we use scheduleReadOperation? Similarly, I also don't understand why `readState` calls `runtime.scheduleWriteOperation`.
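For context, a minimal sketch of the read-based variant this comment points at. It assumes a `CoordinatorRuntime#scheduleReadOperation(name, tp, readOperation)` form whose read operation receives the shard and the last committed offset, plus a hypothetical read-only accessor on `ShareCoordinatorShard` that returns `Optional<Long>` directly; it is not the PR's implementation.

```java
// Sketch only: illustrates the reviewer's suggestion, not the PR's code.
// Assumes scheduleReadOperation(name, tp, (shard, lastCommittedOffset) -> result)
// and a hypothetical read-side lastRedundantOffset() returning Optional<Long>
// without generating any records.
CompletableFuture<Optional<Long>> redundantOffset = runtime.scheduleReadOperation(
    "prune-state-record-lookup",                            // illustrative operation name
    tp,
    (shard, lastCommittedOffset) -> shard.lastRedundantOffset()
);

redundantOffset.whenComplete((result, exception) -> {
    // Completion handling would stay as in performRecordPruning below:
    // transient coordinator errors reschedule, unknown errors fail the future.
});
```

The apparent appeal of the read path is that the lookup appends nothing to the share state topic and needs no write timeout; whether the shard can expose the last redundant offset on the read path is exactly what the PR would have to settle.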
########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java: ##########
@@ -240,9 +251,96 @@ public void startup(
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        Map<TopicPartition, Long> offsets = new ConcurrentHashMap<>();
+        setupRecordPruning(offsets);
         log.info("Startup complete.");
     }
 
+    private void setupRecordPruning(Map<TopicPartition, Long> offsets) {
+        log.info("Scheduling share state topic prune job.");
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp, offsets)));
+
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error in share state topic prune.", exp);
+                        }
+                        // Perpetual recursion, failure or not.
+                        setupRecordPruning(offsets);
+                    });
+            }
+        });
+    }
+
+    private CompletableFuture<Void> performRecordPruning(TopicPartition tp, Map<TopicPartition, Long> offsets) {
+        // This future will always be completed normally, exception or not.
+        CompletableFuture<Void> fut = new CompletableFuture<>();
+        runtime.scheduleWriteOperation(
+            "write-state-record-prune",
+            tp,
+            Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+            ShareCoordinatorShard::lastRedundantOffset
+        ).whenComplete((result, exception) -> {
+            if (exception != null) {
+                log.debug("Last redundant offset for tp {} lookup threw an error.", tp, exception);
+                Errors error = Errors.forException(exception);
+                // These errors might result from partition metadata not loaded
+                // or shard re-election. Will cause unnecessary noise, hence not logging
+                if (!(error.equals(Errors.COORDINATOR_LOAD_IN_PROGRESS) || error.equals(Errors.NOT_COORDINATOR))) {
+                    log.error("Last redundant offset lookup for tp {} threw an error.", tp, exception);
+                    // Should not reschedule -> unknown exception.
+                    fut.completeExceptionally(exception);
+                    return;
+                }
+                // Should reschedule -> could be transient.
+                fut.complete(null);
+                return;
+            }
+            if (result.isPresent()) {
+                Long off = result.get();
+                // Guard and optimization.
+                if (off == Long.MAX_VALUE || off <= 0) {
+                    log.warn("Last redundant offset value {} not suitable to make delete call for {}.", off, tp);
+                    // Should reschedule -> next lookup could yield valid value.
+                    fut.complete(null);
+                    return;
+                }
+
+                if (offsets.containsKey(tp) && Objects.equals(offsets.get(tp), off)) {
+                    log.debug("{} already pruned at offset {}", tp, off);
+                    fut.complete(null);
+                    return;
+                }
+
+                log.info("Pruning records in {} till offset {}.", tp, off);
+
+                writer.deleteRecords(tp, off)
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            // Should not reschedule -> problems while deleting.
+                            log.debug("Exception while deleting records in {} till offset {}.", tp, off, exp);
+                            fut.completeExceptionally(exp);
+                            return;
+                        }
+                        // Should reschedule -> successful delete
+                        fut.complete(null);
+                        // Update offsets map as we do not want to
+                        // issue repeated deleted
+                        offsets.put(tp, off);

Review Comment:
   Since this is called from the purgatory thread, it's possible that, when this call occurs, the partition leader has already resigned, and we need to handle that accordingly.
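One possible shape for that handling, sketched under the assumption that `runtime.activeTopicPartitions()` (already used above when scheduling the prune job) reflects current leadership and that coordinator-moved errors from the delete are best treated as transient. Names such as `writer`, `offsets`, `fut`, `tp` and `off` are reused from the diff; this is illustrative, not the PR's code.

```java
// Sketch only: guard the deleteRecords callback against a leader that has
// already resigned by the time the purgatory thread completes the future.
writer.deleteRecords(tp, off)
    .whenComplete((res, exp) -> {
        if (exp != null) {
            Errors error = Errors.forException(exp);
            if (error == Errors.NOT_COORDINATOR || error == Errors.NOT_LEADER_OR_FOLLOWER) {
                // Leadership moved while the delete was in flight; drop any cached
                // offset so a later attempt (or the new leader) re-evaluates it,
                // and reschedule rather than fail the prune.
                offsets.remove(tp);
                fut.complete(null);
                return;
            }
            fut.completeExceptionally(exp);
            return;
        }
        // Only cache the pruned offset if this node still hosts the partition;
        // otherwise the entry would be stale after the resignation.
        if (runtime.activeTopicPartitions().contains(tp)) {
            offsets.put(tp, off);
        } else {
            offsets.remove(tp);
        }
        fut.complete(null);
    });
```

The membership check and the map update are not atomic here, so whether the cached entry should instead be cleared when the partition is unloaded is a separate question this sketch does not settle.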