chirag-wadhwa5 commented on code in PR #16456:
URL: https://github.com/apache/kafka/pull/16456#discussion_r1668342690


##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -4390,6 +4401,2215 @@ class KafkaApisTest extends Logging {
     assertEquals("broker2", node.host)
   }
 
+  private def expectedAcquiredRecords(firstOffset : Long, lastOffset : Long, 
deliveryCount : Int) : util.List[AcquiredRecords] = {
+    val acquiredRecordsList : util.List[AcquiredRecords] = new util.ArrayList()
+    acquiredRecordsList.add(new AcquiredRecords()
+      .setFirstOffset(firstOffset)
+      .setLastOffset(lastOffset)
+      .setDeliveryCount(deliveryCount.toShort))
+    acquiredRecordsList
+  }
+
+  private def memoryRecordsBuilder(numOfRecords : Int, startOffset : Long) : 
MemoryRecordsBuilder = {
+
+    val buffer: ByteBuffer = ByteBuffer.allocate(1024)
+    val compression: Compression = Compression.of(CompressionType.NONE).build()
+    val timestampType: TimestampType = TimestampType.CREATE_TIME
+
+    val builder: MemoryRecordsBuilder = MemoryRecords.builder(buffer, 
compression, timestampType, startOffset)
+    for (i <- 0 until numOfRecords) {
+      builder.appendWithOffset(startOffset + i, 0L, TestUtils.randomBytes(10), 
TestUtils.randomBytes(10))
+    }
+    builder
+  }
+
+  private def memoryRecords(numOfRecords : Int, startOffset : Long) : 
MemoryRecords = {
+    memoryRecordsBuilder(numOfRecords, startOffset).build()
+  }
+
+  @Test
+  def testHandleShareFetchRequestSuccessWithoutAcknowledgements(): Unit = {
+    val topicName = "foo"
+    val topicId = Uuid.randomUuid()
+    val partitionIndex = 0
+    metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+    addTopicToMetadataCache(topicName, 1, topicId = topicId)
+    val memberId : Uuid = Uuid.ZERO_UUID
+
+    val shareSessionEpoch = 0
+
+    val records = memoryRecords(10, 0)
+
+    val sharePartitionManager : SharePartitionManager = 
mock(classOf[SharePartitionManager])
+
+    when(sharePartitionManager.fetchMessages(any(), any(), any(), any(), 
any())).thenReturn(
+      CompletableFuture.completedFuture(Map[TopicIdPartition, 
ShareFetchResponseData.PartitionData](
+        new TopicIdPartition(topicId, new TopicPartition(topicName, 
partitionIndex)) ->
+                  new ShareFetchResponseData.PartitionData()
+                    .setErrorCode(Errors.NONE.code)
+                    .setAcknowledgeErrorCode(Errors.NONE.code)
+                    .setRecords(records)
+                    .setAcquiredRecords(new util.ArrayList(List(
+                      new ShareFetchResponseData.AcquiredRecords()
+                        .setFirstOffset(0)
+                        .setLastOffset(9)
+                        .setDeliveryCount(1)
+                      ).asJava))

Review Comment:
   Thanks for the review. Yes, `asJava` is required here, since `List(...)` 
creates a Scala list, which has to be converted to a Java collection before 
it can be passed to `util.ArrayList`. There are other ways that do not use 
`asJava`, but they span multiple lines and would require new variable 
initialisations, which would reduce the readability of the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to