dajac commented on code in PR #18014:
URL: https://github.com/apache/kafka/pull/18014#discussion_r1875536417


##########
core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala:
##########
@@ -165,4 +165,25 @@ class CoordinatorPartitionWriter(
     // Required offset.
     partitionResult.lastOffset + 1
   }
+
+  override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long): CompletableFuture[Void] = {
+    val responseFuture: CompletableFuture[Void] = new CompletableFuture[Void]()
+
+    replicaManager.deleteRecords(
+      timeout = 30000L, // 30 seconds.
+      offsetPerPartition = Map(tp -> deleteBeforeOffset),
+      responseCallback = results => {
+        val result = results.get(tp)
+        if (result.isEmpty) {
+          responseFuture.completeExceptionally(new IllegalStateException(s"Delete status $result should have partition $tp."))
+        } else if (result.get.errorCode() != Errors.NONE.code()) {
+          responseFuture.completeExceptionally(Errors.forCode(result.get.errorCode()).exception())

Review Comment:
   nit: You can omit the `()` for `errorCode`, `code`, and `exception`.
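   For illustration, a minimal sketch of the parens-free form, adapted from the snippet above:
   ```scala
   if (result.isEmpty) {
     responseFuture.completeExceptionally(new IllegalStateException(s"Delete status $result should have partition $tp."))
   } else if (result.get.errorCode != Errors.NONE.code) {
     responseFuture.completeExceptionally(Errors.forCode(result.get.errorCode).exception)
   }
   ```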



##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala:
##########
@@ -238,4 +239,86 @@ class CoordinatorPartitionWriterTest {
       batch
     ))
   }
+
+  @Test
+  def testDeleteRecordsResponseContainsError(): Unit = {
+    val replicaManager = mock(classOf[ReplicaManager])
+    val partitionRecordWriter = new CoordinatorPartitionWriter(
+      replicaManager
+    )
+
+    val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])
+
+    // Response contains error.
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer(_ => {
+      callbackCapture.getValue.apply(Map(
+        new TopicPartition("random-topic", 0) -> new 
DeleteRecordsPartitionResult()
+          .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
+      ))
+    })
+
+    partitionRecordWriter.deleteRecords(
+      new TopicPartition("random-topic", 0),
+      10L
+    ).whenComplete((_, exp) => {

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala:
##########
@@ -238,4 +239,86 @@ class CoordinatorPartitionWriterTest {
       batch
     ))
   }
+
+  @Test
+  def testDeleteRecordsResponseContainsError(): Unit = {
+    val replicaManager = mock(classOf[ReplicaManager])
+    val partitionRecordWriter = new CoordinatorPartitionWriter(
+      replicaManager
+    )
+
+    val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])
+
+    // Response contains error.
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer(_ => {
+      callbackCapture.getValue.apply(Map(
+        new TopicPartition("random-topic", 0) -> new 
DeleteRecordsPartitionResult()
+          .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
+      ))
+    })
+
+    partitionRecordWriter.deleteRecords(
+      new TopicPartition("random-topic", 0),
+      10L
+    ).whenComplete((_, exp) => {
+      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), exp)
+    })
+
+    // Response does not contain topic queried.
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer(_ => {
+      callbackCapture.getValue.apply(Map(
+        new TopicPartition("other-random-topic", 0) -> new 
DeleteRecordsPartitionResult()
+          .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
+      ))
+    })
+
+    partitionRecordWriter.deleteRecords(
+      new TopicPartition("random-topic", 0),
+      10L
+    ).whenComplete((_, exp) => {

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala:
##########
@@ -238,4 +239,86 @@ class CoordinatorPartitionWriterTest {
       batch
     ))
   }
+
+  @Test
+  def testDeleteRecordsResponseContainsError(): Unit = {
+    val replicaManager = mock(classOf[ReplicaManager])
+    val partitionRecordWriter = new CoordinatorPartitionWriter(
+      replicaManager
+    )
+
+    val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])
+
+    // Response contains error.
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer(_ => {
+      callbackCapture.getValue.apply(Map(
+        new TopicPartition("random-topic", 0) -> new 
DeleteRecordsPartitionResult()
+          .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())

Review Comment:
   nit: You can omit the `()` after `code`.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -6493,9 +6494,9 @@ class ReplicaManagerTest {
   }
 
   def verifyPartitionIsOnlineAndHasId(
-    replicaManager: ReplicaManager,
-    topicIdPartition: TopicIdPartition
-  ): Unit = {
+                                       replicaManager: ReplicaManager,

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -6509,9 +6510,9 @@ class ReplicaManagerTest {
   }
 
   def verifyPartitionIsOffline(
-    replicaManager: ReplicaManager,
-    topicIdPartition: TopicIdPartition
-  ): Unit = {
+                                replicaManager: ReplicaManager,

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala:
##########
@@ -238,4 +239,86 @@ class CoordinatorPartitionWriterTest {
       batch
     ))
   }
+
+  @Test
+  def testDeleteRecordsResponseContainsError(): Unit = {
+    val replicaManager = mock(classOf[ReplicaManager])
+    val partitionRecordWriter = new CoordinatorPartitionWriter(
+      replicaManager
+    )
+
+    val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])
+
+    // Response contains error.
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer(_ => {
+      callbackCapture.getValue.apply(Map(
+        new TopicPartition("random-topic", 0) -> new 
DeleteRecordsPartitionResult()
+          .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
+      ))
+    })

Review Comment:
   nit: `.thenAnswer(_ => {` -> `.thenAnswer { _ => `
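   Applied to the stubbing above, the suggested shape would look like:
   ```scala
   when(replicaManager.deleteRecords(
     ArgumentMatchers.anyLong(),
     ArgumentMatchers.any(),
     callbackCapture.capture(),
     ArgumentMatchers.eq(true)
   )).thenAnswer { _ =>
     callbackCapture.getValue.apply(Map(
       new TopicPartition("random-topic", 0) -> new DeleteRecordsPartitionResult()
         .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code)
     ))
   }
   ```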



##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala:
##########
@@ -238,4 +239,86 @@ class CoordinatorPartitionWriterTest {
       batch
     ))
   }
+
+  @Test
+  def testDeleteRecordsResponseContainsError(): Unit = {
+    val replicaManager = mock(classOf[ReplicaManager])
+    val partitionRecordWriter = new CoordinatorPartitionWriter(
+      replicaManager
+    )
+
+    val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])
+
+    // Response contains error.
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer(_ => {
+      callbackCapture.getValue.apply(Map(
+        new TopicPartition("random-topic", 0) -> new 
DeleteRecordsPartitionResult()
+          .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
+      ))
+    })
+
+    partitionRecordWriter.deleteRecords(
+      new TopicPartition("random-topic", 0),
+      10L
+    ).whenComplete((_, exp) => {
+      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), exp)
+    })
+
+    // Response does not contain topic queried.
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer(_ => {
+      callbackCapture.getValue.apply(Map(
+        new TopicPartition("other-random-topic", 0) -> new 
DeleteRecordsPartitionResult()
+          .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
+      ))
+    })
+
+    partitionRecordWriter.deleteRecords(
+      new TopicPartition("random-topic", 0),
+      10L
+    ).whenComplete((_, exp) => {
+      assertTrue(exp.isInstanceOf[IllegalStateException])
+    })
+  }
+
+  @Test
+  def testDeleteRecordsSuccess(): Unit = {
+    val replicaManager = mock(classOf[ReplicaManager])
+    val partitionRecordWriter = new CoordinatorPartitionWriter(
+      replicaManager
+    )
+
+    val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])
+
+    // Response contains no error.
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer(_ => {
+      callbackCapture.getValue.apply(Map(
+        new TopicPartition("random-topic", 0) -> new 
DeleteRecordsPartitionResult()
+          .setErrorCode(Errors.NONE.code())

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala:
##########
@@ -238,4 +239,86 @@ class CoordinatorPartitionWriterTest {
       batch
     ))
   }
+
+  @Test
+  def testDeleteRecordsResponseContainsError(): Unit = {
+    val replicaManager = mock(classOf[ReplicaManager])
+    val partitionRecordWriter = new CoordinatorPartitionWriter(
+      replicaManager
+    )
+
+    val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])
+
+    // Response contains error.
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer(_ => {
+      callbackCapture.getValue.apply(Map(
+        new TopicPartition("random-topic", 0) -> new 
DeleteRecordsPartitionResult()
+          .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
+      ))
+    })
+
+    partitionRecordWriter.deleteRecords(
+      new TopicPartition("random-topic", 0),
+      10L
+    ).whenComplete((_, exp) => {
+      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), exp)
+    })
+
+    // Response does not contain topic queried.
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer(_ => {
+      callbackCapture.getValue.apply(Map(
+        new TopicPartition("other-random-topic", 0) -> new 
DeleteRecordsPartitionResult()
+          .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
+      ))
+    })
+
+    partitionRecordWriter.deleteRecords(
+      new TopicPartition("random-topic", 0),
+      10L
+    ).whenComplete((_, exp) => {
+      assertTrue(exp.isInstanceOf[IllegalStateException])
+    })
+  }
+
+  @Test
+  def testDeleteRecordsSuccess(): Unit = {
+    val replicaManager = mock(classOf[ReplicaManager])
+    val partitionRecordWriter = new CoordinatorPartitionWriter(
+      replicaManager
+    )
+
+    val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])
+
+    // Response contains no error.
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer(_ => {

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3255,16 +3256,16 @@ class ReplicaManagerTest {
   }
 
   private def fetchPartition(
-    replicaManager: ReplicaManager,
-    replicaId: Int,
-    partition: TopicIdPartition,
-    partitionData: PartitionData,
-    minBytes: Int,
-    maxBytes: Int,
-    isolation: FetchIsolation,
-    clientMetadata: Option[ClientMetadata],
-    maxWaitMs: Long
-  ): CallbackResult[FetchPartitionData] = {
+                              replicaManager: ReplicaManager,

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3233,14 +3234,14 @@ class ReplicaManagerTest {
   }
 
   private def fetchPartitionAsFollower(
-    replicaManager: ReplicaManager,
-    partition: TopicIdPartition,
-    partitionData: PartitionData,
-    replicaId: Int,
-    maxWaitMs: Long = 0,
-    minBytes: Int = 1,
-    maxBytes: Int = 1024 * 1024,
-  ): CallbackResult[FetchPartitionData] = {
+                                        replicaManager: ReplicaManager,

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3289,18 +3290,18 @@ class ReplicaManagerTest {
   }
 
   private def fetchPartitions(
-    replicaManager: ReplicaManager,
-    replicaId: Int,
-    fetchInfos: Seq[(TopicIdPartition, PartitionData)],
-    responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
-    requestVersion: Short = ApiKeys.FETCH.latestVersion,
-    maxWaitMs: Long = 0,
-    minBytes: Int = 1,
-    maxBytes: Int = 1024 * 1024,
-    quota: ReplicaQuota = UNBOUNDED_QUOTA,
-    isolation: FetchIsolation = FetchIsolation.LOG_END,
-    clientMetadata: Option[ClientMetadata] = None
-  ): Unit = {
+                               replicaManager: ReplicaManager,

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3352,22 +3353,22 @@ class ReplicaManagerTest {
   }
 
   private def setupReplicaManagerWithMockedPurgatories(
-    timer: MockTimer,
-    brokerId: Int = 0,
-    aliveBrokerIds: Seq[Int] = Seq(0, 1),
-    propsModifier: Properties => Unit = _ => {},
-    mockReplicaFetcherManager: Option[ReplicaFetcherManager] = None,
-    mockReplicaAlterLogDirsManager: Option[ReplicaAlterLogDirsManager] = None,
-    isShuttingDown: AtomicBoolean = new AtomicBoolean(false),
-    enableRemoteStorage: Boolean = false,
-    shouldMockLog: Boolean = false,
-    remoteLogManager: Option[RemoteLogManager] = None,
-    defaultTopicRemoteLogStorageEnable: Boolean = true,
-    setupLogDirMetaProperties: Boolean = false,
-    directoryEventHandler: DirectoryEventHandler = DirectoryEventHandler.NOOP,
-    buildRemoteLogAuxState: Boolean = false,
-    remoteFetchQuotaExceeded: Option[Boolean] = None
-  ): ReplicaManager = {
+                                                        timer: MockTimer,

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala:
##########
@@ -238,4 +239,86 @@ class CoordinatorPartitionWriterTest {
       batch
     ))
   }
+
+  @Test
+  def testDeleteRecordsResponseContainsError(): Unit = {
+    val replicaManager = mock(classOf[ReplicaManager])
+    val partitionRecordWriter = new CoordinatorPartitionWriter(
+      replicaManager
+    )
+
+    val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])
+
+    // Response contains error.
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer(_ => {
+      callbackCapture.getValue.apply(Map(
+        new TopicPartition("random-topic", 0) -> new 
DeleteRecordsPartitionResult()
+          .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
+      ))
+    })
+
+    partitionRecordWriter.deleteRecords(
+      new TopicPartition("random-topic", 0),
+      10L
+    ).whenComplete((_, exp) => {
+      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), exp)
+    })
+
+    // Response does not contain topic queried.
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer(_ => {

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala:
##########
@@ -238,4 +239,86 @@ class CoordinatorPartitionWriterTest {
       batch
     ))
   }
+
+  @Test
+  def testDeleteRecordsResponseContainsError(): Unit = {
+    val replicaManager = mock(classOf[ReplicaManager])
+    val partitionRecordWriter = new CoordinatorPartitionWriter(
+      replicaManager
+    )
+
+    val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])
+
+    // Response contains error.
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer(_ => {
+      callbackCapture.getValue.apply(Map(
+        new TopicPartition("random-topic", 0) -> new 
DeleteRecordsPartitionResult()
+          .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
+      ))
+    })
+
+    partitionRecordWriter.deleteRecords(
+      new TopicPartition("random-topic", 0),
+      10L
+    ).whenComplete((_, exp) => {
+      assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.exception(), exp)
+    })
+
+    // Response does not contain topic queried.
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer(_ => {
+      callbackCapture.getValue.apply(Map(
+        new TopicPartition("other-random-topic", 0) -> new 
DeleteRecordsPartitionResult()
+          .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code())
+      ))
+    })
+
+    partitionRecordWriter.deleteRecords(
+      new TopicPartition("random-topic", 0),
+      10L
+    ).whenComplete((_, exp) => {
+      assertTrue(exp.isInstanceOf[IllegalStateException])
+    })
+  }
+
+  @Test
+  def testDeleteRecordsSuccess(): Unit = {
+    val replicaManager = mock(classOf[ReplicaManager])
+    val partitionRecordWriter = new CoordinatorPartitionWriter(
+      replicaManager
+    )
+
+    val callbackCapture: ArgumentCaptor[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, DeleteRecordsPartitionResult] => Unit])
+
+    // Response contains no error.
+    when(replicaManager.deleteRecords(
+      ArgumentMatchers.anyLong(),
+      ArgumentMatchers.any(),
+      callbackCapture.capture(),
+      ArgumentMatchers.eq(true)
+    )).thenAnswer(_ => {
+      callbackCapture.getValue.apply(Map(
+        new TopicPartition("random-topic", 0) -> new 
DeleteRecordsPartitionResult()
+          .setErrorCode(Errors.NONE.code())
+      ))
+    })
+
+    partitionRecordWriter.deleteRecords(

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3205,15 +3206,15 @@ class ReplicaManagerTest {
   }
 
   private def fetchPartitionAsConsumer(
-    replicaManager: ReplicaManager,
-    partition: TopicIdPartition,
-    partitionData: PartitionData,
-    maxWaitMs: Long = 0,
-    minBytes: Int = 1,
-    maxBytes: Int = 1024 * 1024,
-    isolationLevel: IsolationLevel = IsolationLevel.READ_UNCOMMITTED,
-    clientMetadata: Option[ClientMetadata] = None,
-  ): CallbackResult[FetchPartitionData] = {
+                                        replicaManager: ReplicaManager,

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3521,9 +3522,9 @@ class ReplicaManagerTest {
       }
 
       override def createReplicaAlterLogDirsManager(
-        quotaManager: ReplicationQuotaManager,
-        brokerTopicStats: BrokerTopicStats
-      ): ReplicaAlterLogDirsManager = {
+                                                     quotaManager: ReplicationQuotaManager,

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -3466,11 +3467,11 @@ class ReplicaManagerTest {
       } else None) {
 
       override protected def createReplicaFetcherManager(
-        metrics: Metrics,
-        time: Time,
-        threadNamePrefix: Option[String],
-        quotaManager: ReplicationQuotaManager
-      ): ReplicaFetcherManager = {
+                                                          metrics: Metrics,

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -4934,16 +4935,16 @@ class ReplicaManagerTest {
   }
 
   private def makeLeaderAndIsrRequest(
-    topicId: Uuid,
-    topicPartition: TopicPartition,
-    replicas: Seq[Int],
-    leaderAndIsr: LeaderAndIsr,
-    isNew: Boolean = true,
-    brokerEpoch: Int = 0,
-    controllerId: Int = 0,
-    controllerEpoch: Int = 0,
-    version: Short = LeaderAndIsrRequestData.HIGHEST_SUPPORTED_VERSION
-  ): LeaderAndIsrRequest = {
+                                       topicId: Uuid,

Review Comment:
   ditto.



##########
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##########
@@ -2852,10 +2853,10 @@ class ReplicaManagerTest {
   }
 
   private def sendProducerAppend(
-    replicaManager: ReplicaManager,
-    topicPartition: TopicPartition,
-    numOfRecords: Int
-  ): AtomicReference[PartitionResponse] = {
+                                  replicaManager: ReplicaManager,

Review Comment:
   Let's revert all those changes.
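   That is, keep the original parameter indentation from the removed lines, e.g.:
   ```scala
   private def sendProducerAppend(
     replicaManager: ReplicaManager,
     topicPartition: TopicPartition,
     numOfRecords: Int
   ): AtomicReference[PartitionResponse] = {
   ```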



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +249,76 @@ public void startup(
 
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }
 
+    // visibility for tests
+    void setupRecordPruning() {
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> {
+                    futures.add(performRecordPruning(tp));
+                });

Review Comment:
   nit: You could omit the `{}` in this case.
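   For example:
   ```java
   runtime.activeTopicPartitions().forEach(tp -> futures.add(performRecordPruning(tp)));
   ```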



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorOffsetsManager.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.server.share.SharePartitionKey;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Util class to track the offsets written into the internal topic
+ * per share partition key.
+ * It calculates the minimum offset globally up to which the records
+ * in the internal partition are redundant, i.e. they have been overridden
+ * by newer records.
+ */
+public class ShareCoordinatorOffsetsManager {
+
+    // Map to store share partition key => current partition offset
+    // being written.
+    private static final String MIN_OFFSET = "min-offset";
+    private static final String REDUNDANT_OFFSET = "redundant-offset";
+    private final TimelineHashMap<SharePartitionKey, Long> offsets;
+
+    // Minimum offset representing the smallest necessary offset (non-redundant)
+    // across the internal partition.
+    // We are using timeline maps here because the offsets which are passed into
+    // updateState might not be committed yet. In case of a retry, these offsets would
+    // be invalidated via the snapshot registry. Hence, with timeline hashmaps,
+    // the values automatically revert in accordance with the last committed offset.
+    private final TimelineHashMap<String, Long> minOffset;
+    private final TimelineHashMap<String, Long> redundantOffset;

Review Comment:
   Have you considered using `TimelineObject` for those two?
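   A rough sketch of what that could look like, assuming the `TimelineObject` type from `org.apache.kafka.timeline` and illustrative initial values:
   ```java
   // Single-value timeline fields instead of single-key maps; values still
   // revert automatically when uncommitted snapshots are rolled back.
   private final TimelineObject<Long> minOffset;
   private final TimelineObject<Long> redundantOffset;

   // In the constructor (initial values here are placeholders):
   // minOffset = new TimelineObject<>(snapshotRegistry, Long.MAX_VALUE);
   // redundantOffset = new TimelineObject<>(snapshotRegistry, -1L);
   ```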



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -240,9 +249,76 @@ public void startup(
 
         log.info("Starting up.");
         numPartitions = shareGroupTopicPartitionCount.getAsInt();
+        setupRecordPruning();
         log.info("Startup complete.");
     }
 
+    // visibility for tests
+    void setupRecordPruning() {
+        timer.add(new TimerTask(config.shareCoordinatorTopicPruneIntervalMs()) {
+            @Override
+            public void run() {
+                List<CompletableFuture<Void>> futures = new ArrayList<>();
+                runtime.activeTopicPartitions().forEach(tp -> {
+                    futures.add(performRecordPruning(tp));
+                });
+
+                // None of the futures will complete exceptionally.
+                CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
+                    .thenRun(() -> {
+                        // perpetual recursion
+                        setupRecordPruning();
+                    });

Review Comment:
   You could omit them here too. I also wonder whether we should reschedule in case of failure as well: `thenRun` only reschedules if the future succeeds, so rescheduling unconditionally would make this more robust to unexpected failures.
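   A sketch of a failure-tolerant variant, using `whenComplete` so the timer is re-armed regardless of the outcome (names taken from the diff above):
   ```java
   CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}))
       .whenComplete((result, exception) -> {
           // Re-arm the prune timer even when pruning failed; the exception
           // can be logged here instead of silently stopping the schedule.
           setupRecordPruning();
       });
   ```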


