vvcephei commented on a change in pull request #9911:
URL: https://github.com/apache/kafka/pull/9911#discussion_r558572032



##########
File path: tests/kafkatest/tests/streams/streams_optimized_test.py
##########
@@ -65,30 +66,41 @@ def test_upgrade_optimized_topology(self):
 
         processors = [processor1, processor2, processor3]
 
-        # produce records continually during the test
+        self.logger.info("produce records continually during the test")

Review comment:
       Switched comments to logs so that I could tell what phase of the test we 
were in while reading the logs.
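    To illustrate the pattern (a minimal sketch only; the `StreamsPhaseExample` class and `log_phase` helper are hypothetical and not part of this PR — the PR just calls `self.logger.info(...)` directly, as in the diff above):

```python
from ducktape.tests.test import Test


class StreamsPhaseExample(Test):
    """Hypothetical test class used only to illustrate logging test phases."""

    def log_phase(self, description):
        # Routing the phase description through the ducktape logger means it
        # lands in the collected test log, interleaved with service output,
        # instead of living only in the source as a comment.
        self.logger.info(description)

    def test_phases(self):
        self.log_phase("produce records continually during the test")
        # ... start the producer ...
        self.log_phase("start all processors unoptimized")
        # ... start and verify the processors ...
```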

##########
File path: tests/kafkatest/tests/streams/streams_optimized_test.py
##########
@@ -65,30 +66,41 @@ def test_upgrade_optimized_topology(self):
 
         processors = [processor1, processor2, processor3]
 
-        # produce records continually during the test
+        self.logger.info("produce records continually during the test")
         self.producer.start()
 
-        # start all processors unoptimized
+        self.logger.info("start all processors unoptimized")
         for processor in processors:
             self.set_topics(processor)
             processor.CLEAN_NODE_ENABLED = False
             self.verify_running_repartition_topic_count(processor, 4)
 
+        self.logger.info("verify unoptimized")
         self.verify_processing(processors, verify_individual_operations=False)
 
+        self.logger.info("stop unoptimized")
         stop_processors(processors, self.stopped_message)
 
+        self.logger.info("reset")
         self.reset_application()
+        for processor in processors:
+            processor.node.account.ssh("mv " + processor.LOG_FILE + " " + 
processor.LOG_FILE + ".1", allow_fail=False)
+            processor.node.account.ssh("mv " + processor.STDOUT_FILE + " " + 
processor.STDOUT_FILE + ".1", allow_fail=False)
+            processor.node.account.ssh("mv " + processor.STDERR_FILE + " " + 
processor.STDERR_FILE + ".1", allow_fail=False)
+            processor.node.account.ssh("mv " + processor.CONFIG_FILE + " " + 
processor.CONFIG_FILE + ".1", allow_fail=False)

Review comment:
       Rotating all these files aside helps debugging, because you can easily see what happened before vs. after the reset. It also simplifies our verification logic, since it's easy to search only the output produced after the reset.
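    Since the same rename is repeated for four files per processor, a small helper could keep it tidy. This is a sketch only (the `rotate_processor_files` helper is hypothetical, not something this PR adds); it assumes a processor object exposing the same LOG_FILE/STDOUT_FILE/STDERR_FILE/CONFIG_FILE attributes and `node.account.ssh()` call used in the diff above:

```python
def rotate_processor_files(processor, suffix=".1"):
    """Move a processor's output files aside so post-reset output starts fresh.

    Hypothetical helper for illustration; it mirrors the inline ssh("mv ...")
    calls in the diff above.
    """
    for path in (processor.LOG_FILE, processor.STDOUT_FILE,
                 processor.STDERR_FILE, processor.CONFIG_FILE):
        # allow_fail=False makes the test fail fast if a file is unexpectedly missing.
        processor.node.account.ssh("mv %s %s%s" % (path, path, suffix), allow_fail=False)
```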

##########
File path: tests/kafkatest/tests/streams/streams_optimized_test.py
##########
@@ -110,34 +122,25 @@ def verify_running_repartition_topic_count(processor, repartition_topic_count):
                                       % repartition_topic_count + str(processor.node.account))
 
     def verify_processing(self, processors, verify_individual_operations):
+        # This test previously had logic to account for skewed assignments, in which not all processors may
+        # receive active assignments. I don't think this will happen anymore, but keep an eye out if we see
+        # test failures here. If that does resurface, note that the prior implementation was not correct.
+        # A better approach would be to make sure we see processing of each partition across the whole cluster
+        # instead of just expecting to see each node perform some processing.
         for processor in processors:
-            if not self.all_source_subtopology_tasks(processor):
-                if verify_individual_operations:
-                    for operation in self.operation_pattern.split('\|'):
-                        self.do_verify(processor, operation)
-                else:
-                    self.do_verify(processor, self.operation_pattern)
+            if verify_individual_operations:
+                for operation in self.operation_pattern.split('\|'):
+                    self.do_verify(processor, operation)
             else:
-                self.logger.info("Skipping processor %s with all source tasks" 
% processor.node.account)
+                self.do_verify(processor, self.operation_pattern)
 
     def do_verify(self, processor, pattern):
         self.logger.info("Verifying %s processing pattern in STDOUT_FILE" % 
pattern)
-        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-            monitor.wait_until(pattern,
-                               timeout_sec=60,
-                               err_msg="Never saw processing of %s " % pattern + str(processor.node.account))
-
-    def all_source_subtopology_tasks(self, processor):
-        retries = 0
-        while retries < 5:
-            found = list(processor.node.account.ssh_capture("sed -n 's/.*current active tasks: \[\(\(0_[0-9], \)\{3\}0_[0-9]\)\].*/\1/p' %s" % processor.LOG_FILE, allow_fail=True))

Review comment:
       Sadly, this message never appeared in any of the logs; I guess the log format has changed. What that meant for the test was that every node was simply being excluded from the verification!

    The comment at L125 contains my suggestions for how to perform this check in a more reliable fashion, but I actually don't think we need it at all anymore.
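    To make that suggestion concrete: rather than requiring every node to show some processing, collect the partitions processed anywhere in the cluster and assert that none are missing. A rough sketch only; it assumes processed records are logged to STDOUT_FILE with a `partition=<n>` marker, which is not something the current output format guarantees:

```python
import re


def verify_all_partitions_processed(processors, num_partitions):
    """Cluster-wide check: every input partition must be processed by some node.

    Hypothetical helper; assumes each processor's STDOUT_FILE tags processed
    records with 'partition=<n>', which would need to match the real output.
    """
    seen = set()
    for processor in processors:
        # grep may find nothing on a node that processed no records; that's fine,
        # as long as every partition shows up on at least one node.
        lines = processor.node.account.ssh_capture(
            "grep -o 'partition=[0-9]*' %s" % processor.STDOUT_FILE,
            allow_fail=True)
        for line in lines:
            match = re.search(r"partition=([0-9]+)", line)
            if match:
                seen.add(int(match.group(1)))
    missing = set(range(num_partitions)) - seen
    assert not missing, "No processing observed for partitions: %s" % sorted(missing)
```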

##########
File path: tests/kafkatest/tests/streams/streams_optimized_test.py
##########
@@ -110,34 +122,25 @@ def verify_running_repartition_topic_count(processor, repartition_topic_count):
                                       % repartition_topic_count + str(processor.node.account))
 
     def verify_processing(self, processors, verify_individual_operations):
+        # This test previously had logic to account for skewed assignments, in which not all processors may
+        # receive active assignments. I don't think this will happen anymore, but keep an eye out if we see
+        # test failures here. If that does resurface, note that the prior implementation was not correct.
+        # A better approach would be to make sure we see processing of each partition across the whole cluster
+        # instead of just expecting to see each node perform some processing.

Review comment:
       This comment explains the following diff.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

