showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r917485378
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,86 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
}
+ @ParameterizedTest
+ @ValueSource(strings =
Array(CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME,
RangeAssignor.RANGE_ASSIGNOR_NAME))
Review Comment:
This needs the fully qualified class name, because the value is passed into the
`PARTITION_ASSIGNMENT_STRATEGY_CONFIG` config, e.g.:
`@ValueSource(strings = Array(classOf[CooperativeStickyAssignor].getName,
classOf[RangeAssignor].getName))`
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,86 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
}
+ @ParameterizedTest
+ @ValueSource(strings =
Array(CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME,
RangeAssignor.RANGE_ASSIGNOR_NAME))
+ def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
+ // create 2 consumers
+ this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
"rebalance-and-rejoin-group")
+
this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
assignmentStrategy)
+ this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"true")
+ val consumer1 = createConsumer()
+ val consumer2 = createConsumer()
+
+ // create a new topic, have 2 partitions
+ val topic = "topic1"
+ val producer = createProducer()
+ val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100)
+
+ assertEquals(0, consumer1.assignment().size)
+ assertEquals(0, consumer2.assignment().size)
+
+ val lock = new ReentrantLock()
+ var generationId1 = -1
+ var memberId1 = ""
+ val customRebalanceListener = new ConsumerRebalanceListener {
+ override def onPartitionsRevoked(partitions:
util.Collection[TopicPartition]): Unit = {
+ }
+ override def onPartitionsAssigned(partitions:
util.Collection[TopicPartition]): Unit = {
+ if (lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+ fail(s"Time out while awaiting for lock.")
Review Comment:
Are you sure this is correct?
```
Returns:
true if the lock was free and was acquired by the current thread, or the
lock was already held by the current thread; and false if the waiting time
elapsed before the lock could be acquired
```
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/locks/ReentrantLock.html#tryLock-long-java.util.concurrent.TimeUnit-
Since `tryLock` returns `true` when the lock is acquired, the condition looks
inverted. Maybe it should be like this:
```
if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
fail(s"Time out while awaiting for lock.")
}
```
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -969,6 +973,86 @@ class PlaintextConsumerTest extends BaseConsumerTest {
}
}
+ @ParameterizedTest
+ @ValueSource(strings =
Array(CooperativeStickyAssignor.COOPERATIVE_STICKY_ASSIGNOR_NAME,
RangeAssignor.RANGE_ASSIGNOR_NAME))
+ def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
+ // create 2 consumers
+ this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
"rebalance-and-rejoin-group")
+
this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
assignmentStrategy)
+ this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
"true")
+ val consumer1 = createConsumer()
+ val consumer2 = createConsumer()
+
+ // create a new topic, have 2 partitions
+ val topic = "topic1"
+ val producer = createProducer()
+ val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100)
+
+ assertEquals(0, consumer1.assignment().size)
+ assertEquals(0, consumer2.assignment().size)
+
+ val lock = new ReentrantLock()
+ var generationId1 = -1
+ var memberId1 = ""
+ val customRebalanceListener = new ConsumerRebalanceListener {
+ override def onPartitionsRevoked(partitions:
util.Collection[TopicPartition]): Unit = {
+ }
+ override def onPartitionsAssigned(partitions:
util.Collection[TopicPartition]): Unit = {
+ if (lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+ fail(s"Time out while awaiting for lock.")
+ return
Review Comment:
`return` is not necessary because `fail` throws an exception directly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]