smjn commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1871663144


##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +249,60 @@ public void startup(
 
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }
 
+    // visibility for tests
+    void setupRecordPruning() {
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                runtime.activeTopicPartitions().forEach(tp -> {
+                    performRecordPruning(tp);
+                });
+                // perpetual recursion
+                setupRecordPruning();
+            }
+        });
+    }
+
+    // visibility for tests
+    void performRecordPruning(TopicPartition tp) {
+        runtime.scheduleWriteOperation(
+            "write-state-record-prune",
+            tp,
+            Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+            ShareCoordinatorShard::lastRedundantOffset
+        ).whenComplete((result, exception) -> {
+            if (exception != null) {
+                Errors error = Errors.forException(exception);
+                // These errors might result from partition metadata not loaded
+                // or shard re-election. Will cause unnecessary noise, hence not logging
+                if (!(error.equals(Errors.COORDINATOR_LOAD_IN_PROGRESS) || error.equals(Errors.NOT_COORDINATOR))) {
+                    log.error("Last redundant offset lookup threw an error.", exception);
+                }
+                return;
+            }
+            result.ifPresent(
+                off -> {
+                    // guard and optimization
+                    if (off == Long.MAX_VALUE || off <= 0) {
+                        log.warn("Last redundant offset value {} not suitable to make delete call for {}.", off, tp);
+                        return;
+                    }
+
+                    log.info("Pruning records in {} till offset {}.", tp, off);
+                    try {
+                        writer.deleteRecords(tp, off);
+                    } catch (Exception e) {
+                        log.error("Failed to delete records in {} till offset {}.", tp, off, e);

Review Comment:
   will change to debug
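   A minimal sketch of what that follow-up might look like (hypothetical; it assumes the change applies to this catch block and keeps the same message and arguments, only lowering the log level):
   
       } catch (Exception e) {
           // assumed rationale: avoid noisy error logs for transient delete failures;
           // the prune timer re-runs on the next interval, so the delete is retried
           log.debug("Failed to delete records in {} till offset {}.", tp, off, e);
       }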


