Chia-Ping Tsai created KAFKA-17336:
--------------------------------------

             Summary: Add IT to make sure the production MV does not use 
unstable version of LIST_OFFSET 
                 Key: KAFKA-17336
                 URL: https://issues.apache.org/jira/browse/KAFKA-17336
             Project: Kafka
          Issue Type: Sub-task
            Reporter: Chia-Ping Tsai
            Assignee: Chia-Ping Tsai


see https://github.com/apache/kafka/pull/16841#discussion_r1715730246

the sample code:

{code:java}
  /**
   * Sample scenario: create the topic "ikea", produce 100 records to it, reassign
   * partition 0 to the replica list [0, 1], delete the records before offset 3, and
   * finally expect the partition's ISR to contain two entries.
   */
  @ParameterizedTest
  @ValueSource(strings = Array("kraft"))
  def test(quorum: String): Unit = {
    val topicName = "ikea"
    val partition = new TopicPartition(topicName, 0)

    client = createAdminClient

    // Create the topic and wait briefly before producing to it.
    val newTopics = Collections.singletonList(new NewTopic(topicName, 1, 1.toShort))
    client.createTopics(newTopics).all().get()
    TimeUnit.SECONDS.sleep(1)

    val producerConfig: util.Map[String, Object] = Collections.singletonMap(
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
      plaintextBootstrapServers(brokers).asInstanceOf[Object])
    val producer = new KafkaProducer[Array[Byte], Array[Byte]](
      producerConfig, new ByteArraySerializer, new ByteArraySerializer)
    try {
      // Send 100 records with 10-byte keys and values, then flush them out.
      for (_ <- 0 until 100) {
        val record = new ProducerRecord[Array[Byte], Array[Byte]](
          topicName, new Array[Byte](10), new Array[Byte](10))
        producer.send(record)
      }
      producer.flush()
    } finally {
      producer.close()
    }

    // Reassign partition 0 to the replica list [0, 1].
    val reassignment = Optional.of(new NewPartitionReassignment(util.Arrays.asList(0, 1)))
    client.alterPartitionReassignments(util.Collections.singletonMap(partition, reassignment))
      .all().get()

    // Delete the records before offset 3, then wait before checking the ISR.
    val recordsToDelete = RecordsToDelete.beforeOffset(3)
    client.deleteRecords(util.Collections.singletonMap(partition, recordsToDelete)).all().get()
    TimeUnit.SECONDS.sleep(5)

    val description = client.describeTopics(util.Collections.singletonList(topicName))
      .topicNameValues().get(topicName).get()
    assertEquals(2, description.partitions().get(0).isr().size())
  }
{code}




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to