smjn commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1875674980


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +249,87 @@ public void startup(
 
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }
 
+    // visibility for tests
+    void setupRecordPruning() {
+        log.info("Scheduling share state topic prune job.");
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) 
{
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> 
futures.add(performRecordPruning(tp)));
+
+                CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[]{}))
+                    .whenComplete((res, exp) -> {
+                        if (exp != null) {
+                            log.error("Received error in share state topic 
prune, stopping job.", exp);
+                            return;
+                        }

Review Comment:
   @dajac 
   But you mentioned:
   ```
   I also wonder whether we should re-reschedule in case of failure too. 
thenRun would only reschedule if the future succeeds. It would make it more 
robust to unexpected failures.
   ```
   
   I have modified the `performRecordPruning` to complete the future 
exceptionally in cases
   - state machine lookup throws error OTHER THAN NOT_COORDINATOR or 
COORDINATOR_LOAD_IN_PROGRESS
   - `writer.deleteRecords` throws exception
   - other exception cases like  NOT_COORDINATOR or 
COORDINATOR_LOAD_IN_PROGRESS - future completes normally. Abnormal offset value 
also it completes normally.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to