dajac commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1871349386


##########
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##########
@@ -165,4 +166,21 @@ class CoordinatorPartitionWriter(
     // Required offset.
     partitionResult.lastOffset + 1
   }
+
+  override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long): Unit = {
+    var deleteResults: Map[TopicPartition, DeleteRecordsPartitionResult] = Map.empty
+    replicaManager.deleteRecords(
+      timeout = 0L,

Review Comment:
   I have doubts about using timeout=0 here. My understanding is that it 
will delete records from the local log but won't wait for the replicas to 
acknowledge the deletion. Hence, I suppose that it will always throw an 
error. I suppose that we don't really care about the replication in this 
case, but it may spam the logs with unnecessary errors.
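
   To illustrate the concern, here is a minimal, self-contained sketch. It is a 
toy model and not Kafka code: `ZeroTimeoutDeleteSketch`, `awaitReplication`, and 
the "OK"/"TIMED_OUT" labels are hypothetical names chosen for illustration. It 
assumes the wait on replica acknowledgement behaves like a timed wait on a 
future, in which case a zero timeout can only succeed if the acknowledgement 
has already arrived.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Toy model only: none of these names are Kafka APIs.
class ZeroTimeoutDeleteSketch {

    // Stand-in for "wait until the followers acknowledge the deletion".
    // In this model the local delete has already happened before the call.
    static String awaitReplication(CompletableFuture<Void> followerAck, long timeoutMs) {
        try {
            followerAck.get(timeoutMs, TimeUnit.MILLISECONDS);
            return "OK";
        } catch (TimeoutException e) {
            // With timeoutMs = 0 this branch is taken whenever the ack is still pending.
            return "TIMED_OUT";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        } catch (ExecutionException e) {
            return "FAILED";
        }
    }

    public static void main(String[] args) {
        // Followers have not caught up yet, which is the common case right after a local delete.
        CompletableFuture<Void> pendingAck = new CompletableFuture<>();
        System.out.println(awaitReplication(pendingAck, 0L));

        // Only an acknowledgement that is already complete passes a zero timeout.
        CompletableFuture<Void> completedAck = CompletableFuture.completedFuture(null);
        System.out.println(awaitReplication(completedAck, 0L));
    }
}
```

   If the real code path behaves like this model, every pruning call would 
report a timeout even though the local deletion succeeded.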



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +249,60 @@ public void startup(
 
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }
 
+    // visibility for tests
+    void setupRecordPruning() {
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                runtime.activeTopicPartitions().forEach(tp -> {
+                    performRecordPruning(tp);
+                });
+                // perpetual recursion
+                setupRecordPruning();
+            }
+        });
+    }
+
+    // visibility for tests
+    void performRecordPruning(TopicPartition tp) {
+        runtime.scheduleWriteOperation(
+            "write-state-record-prune",
+            tp,
+            Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+            ShareCoordinatorShard::lastRedundantOffset
+        ).whenComplete((result, exception) -> {
+            if (exception != null) {
+                Errors error = Errors.forException(exception);
+                // These errors might result from partition metadata not loaded
+                // or shard re-election. Will cause unnecessary noise, hence not logging
+                if (!(error.equals(Errors.COORDINATOR_LOAD_IN_PROGRESS) || error.equals(Errors.NOT_COORDINATOR))) {
+                    log.error("Last redundant offset lookup threw an error.", exception);
+                }
+                return;
+            }
+            result.ifPresent(
+                off -> {
+                    // guard and optimization
+                    if (off == Long.MAX_VALUE || off <= 0) {
+                        log.warn("Last redundant offset value {} not suitable to make delete call for {}.", off, tp);
+                        return;
+                    }
+
+                    log.info("Pruning records in {} till offset {}.", tp, off);
+                    try {
+                        writer.deleteRecords(tp, off);
+                    } catch (Exception e) {
+                        log.error("Failed to delete records in {} till offset {}.", tp, off, e);

Review Comment:
   I believe this will be printed all the time.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -219,6 +232,8 @@ private void handleShareSnapshot(ShareSnapshotKey key, ShareSnapshotValue value)
                 snapshotUpdateCount.put(mapKey, 0);
             }
         }
+
+        offsetsManager.updateState(mapKey, offset);

Review Comment:
   My understanding is that you don't use a timeline data structure in the 
offset manager. The risk is that a snapshot may be replayed and hence update 
the offset here. However, at this point, we don't know whether the snapshot 
will be durably stored. Let's imagine the worst case: the write of the snapshot 
fails for whatever reason. In this case, the snapshot registry is reverted to 
undo all the uncommitted changes related to the write. Hence all the state will 
be reverted except the state in the offset manager. Now, another operation may 
come in before the snapshot is retried, so your offset would be completely 
wrong. It seems to me that there is a risk of trimming the log at the wrong 
offset because of this.
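
   The scenario can be sketched with a toy model. None of this is Kafka code: 
`RevertSketch`, `RevertibleState`, and `PlainOffsets` are hypothetical classes 
that only assume "state that can be rolled back to a snapshot" versus "a plain 
map that cannot". The key and offsets are made-up values.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy model only: these classes are hypothetical and are not Kafka's
// SnapshotRegistry or timeline collections.
class RevertSketch {

    // State that supports snapshot/revert, standing in for timeline-backed shard state.
    static class RevertibleState {
        private Map<String, Integer> current = new HashMap<>();
        private final Deque<Map<String, Integer>> snapshots = new ArrayDeque<>();

        void snapshot() { snapshots.push(new HashMap<>(current)); }
        void revert() { current = snapshots.pop(); }
        void put(String key, int value) { current.put(key, value); }
        Integer get(String key) { return current.get(key); }
    }

    // Plain state with no revert support, standing in for the offset manager.
    static class PlainOffsets {
        private final Map<String, Long> offsets = new HashMap<>();

        void update(String key, long offset) { offsets.put(key, offset); }
        Long get(String key) { return offsets.get(key); }
    }

    static String simulateFailedWrite() {
        String key = "group:topic:0";
        RevertibleState shardState = new RevertibleState();
        PlainOffsets offsetsManager = new PlainOffsets();

        // Committed state: state epoch 1, last written at offset 10.
        shardState.put(key, 1);
        offsetsManager.update(key, 10L);

        // Replay a not-yet-committed record at offset 42.
        shardState.snapshot();
        shardState.put(key, 2);
        offsetsManager.update(key, 42L);

        // The write fails, so only the revertible state is rolled back.
        shardState.revert();

        return "state=" + shardState.get(key) + " offset=" + offsetsManager.get(key);
    }

    public static void main(String[] args) {
        System.out.println(simulateFailedWrite()); // prints "state=1 offset=42"
    }
}
```

   After the revert, the shard state is back at its committed value while the 
plain map still holds offset 42 for a record that was never durably written, 
which is the mismatch described above.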



##########
core/src/main/scala/kafka/server/ReplicaManager.scala:
##########
@@ -1369,9 +1369,10 @@ class ReplicaManager(val config: KafkaConfig,
 
   def deleteRecords(timeout: Long,

Review Comment:
   Let's add a unit test for this new flag in ReplicaManagerTest.


