chia7712 commented on code in PR #17964:
URL: https://github.com/apache/kafka/pull/17964#discussion_r1861262476


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala:
##########
@@ -178,6 +178,49 @@ class PlaintextConsumerSubscriptionTest extends 
AbstractConsumerTest {
     assertEquals(0, consumer.assignment().size)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testRe2JPatternSubscription(quorum: String, groupProtocol: String): Unit 
= {
+    val topic1 = "tblablac" // matches subscribed pattern
+    createTopic(topic1, 2, brokerCount)
+
+    val topic2 = "tblablak" // does not match subscribed pattern
+    createTopic(topic2, 2, brokerCount)
+
+    val topic3 = "tblab1" // does not match subscribed pattern
+    createTopic(topic3, 2, brokerCount)
+
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment().size)
+
+    val pattern = new SubscriptionPattern("t.*c")
+    consumer.subscribe(pattern, new TestConsumerReassignmentListener)

Review Comment:
   We don't use the `TestConsumerReassignmentListener` in this test, right?



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala:
##########
@@ -178,6 +178,49 @@ class PlaintextConsumerSubscriptionTest extends 
AbstractConsumerTest {
     assertEquals(0, consumer.assignment().size)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testRe2JPatternSubscription(quorum: String, groupProtocol: String): Unit 
= {
+    val topic1 = "tblablac" // matches subscribed pattern
+    createTopic(topic1, 2, brokerCount)
+
+    val topic2 = "tblablak" // does not match subscribed pattern
+    createTopic(topic2, 2, brokerCount)
+
+    val topic3 = "tblab1" // does not match subscribed pattern
+    createTopic(topic3, 2, brokerCount)
+
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment().size)
+
+    val pattern = new SubscriptionPattern("t.*c")
+    consumer.subscribe(pattern, new TestConsumerReassignmentListener)
+
+    val assignment = Set(
+      new TopicPartition(topic, 0),
+      new TopicPartition(topic, 1),
+      new TopicPartition(topic1, 0),
+      new TopicPartition(topic1, 1))
+    awaitAssignment(consumer, assignment)
+    consumer.unsubscribe()

Review Comment:
   Does it support subscribing to another pattern after unsubscribing? If so, 
could you please include such a test case?



##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerSubscriptionTest.scala:
##########
@@ -178,6 +178,49 @@ class PlaintextConsumerSubscriptionTest extends 
AbstractConsumerTest {
     assertEquals(0, consumer.assignment().size)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testRe2JPatternSubscription(quorum: String, groupProtocol: String): Unit 
= {
+    val topic1 = "tblablac" // matches subscribed pattern
+    createTopic(topic1, 2, brokerCount)
+
+    val topic2 = "tblablak" // does not match subscribed pattern
+    createTopic(topic2, 2, brokerCount)
+
+    val topic3 = "tblab1" // does not match subscribed pattern
+    createTopic(topic3, 2, brokerCount)
+
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment().size)
+
+    val pattern = new SubscriptionPattern("t.*c")
+    consumer.subscribe(pattern, new TestConsumerReassignmentListener)
+
+    val assignment = Set(
+      new TopicPartition(topic, 0),
+      new TopicPartition(topic, 1),
+      new TopicPartition(topic1, 0),
+      new TopicPartition(topic1, 1))
+    awaitAssignment(consumer, assignment)
+    consumer.unsubscribe()
+    assertEquals(0, consumer.assignment().size)
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly"))
+  def testRe2JPatternSubscriptionInvalidRegex(quorum: String, groupProtocol: 
String): Unit = {
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment().size)
+
+    val pattern = new SubscriptionPattern("(t.*c")
+    consumer.subscribe(pattern, new TestConsumerReassignmentListener)

Review Comment:
   Ditto: the `TestConsumerReassignmentListener` isn't used in this test 
either, right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to