vcrfxia commented on code in PR #13656:
URL: https://github.com/apache/kafka/pull/13656#discussion_r1183023137


##########
streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java:
##########
@@ -34,10 +34,34 @@ public class SmokeTestUtil {
 
     final static int END = Integer.MAX_VALUE;
 
+    static ProcessorSupplier<Object, Object, Void, Void> printTaskProcessorSupplier(final String topic) {
+        return printTaskProcessorSupplier(topic, "");
+    }
+
     static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic) {
         return printProcessorSupplier(topic, "");
     }
 
+    static ProcessorSupplier<Object, Object, Void, Void> printTaskProcessorSupplier(final String topic, final String name) {

Review Comment:
   nit: this new processor is the same as the existing one except that it 
doesn't track or print the number of records processed, right? Would it be 
better to have a boolean to toggle the print behavior, rather than duplicating 
the rest of the processor code? (Not a big deal either way since it's not much 
code, but as a reader I had to spend some time determining/verifying that the 
print behavior is the only difference.)
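
   To illustrate the toggle idea, here is a rough sketch. It is a plain-Java 
stand-in, not the actual SmokeTestUtil code and not the Kafka Streams API: 
`PrintToggleSketch`, `PrintingProcessor`, `trackRecords`, the printed messages, 
and the every-100-records interval are all assumptions made up for the example.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for the two near-identical processors.
// None of these names come from the PR; Kafka types are deliberately avoided.
final class PrintToggleSketch {

    static final class PrintingProcessor {
        private final String topic;
        private final boolean trackRecords; // the boolean toggle suggested above
        private final Consumer<String> out; // where messages go (assumed; e.g. System.out::println)
        private int numRecordsProcessed = 0;

        PrintingProcessor(final String topic, final boolean trackRecords, final Consumer<String> out) {
            this.topic = topic;
            this.trackRecords = trackRecords;
            this.out = out;
        }

        // Shared behavior: both variants announce the task they run on.
        void init(final String taskId) {
            out.accept("[" + topic + "] initializing processor: task " + taskId);
        }

        // Only the tracking variant counts records and prints progress.
        void process(final Object key, final Object value) {
            if (!trackRecords) {
                return;
            }
            numRecordsProcessed++;
            if (numRecordsProcessed % 100 == 0) { // interval is an assumption
                out.accept("[" + topic + "] processed " + numRecordsProcessed + " records");
            }
        }
    }

    // The two public entry points differ only in the flag they pass.
    static PrintingProcessor printProcessor(final String topic, final Consumer<String> out) {
        return new PrintingProcessor(topic, true, out);
    }

    static PrintingProcessor printTaskProcessor(final String topic, final Consumer<String> out) {
        return new PrintingProcessor(topic, false, out);
    }
}
```

   With this shape a reader can see at a glance that the flag is the only 
difference between the two variants.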



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -41,6 +41,7 @@
 metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
 fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), 
                     str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)]
+table_agg_versions = [str(LATEST_3_3)]

Review Comment:
   What's the reason for adding older versions? Do we expect that upgrading 
from a version older than 3.3 will be different than updating from 3.3?



##########
streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##########
@@ -143,6 +173,94 @@ private static void buildFKTable(final KStream<String, Integer> primaryTable,
         kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
     }
 
+    private static void buildTableAgg(final KTable<String, Integer> sourceTable,
+                                      final String aggProduceValue,
+                                      final List<String> expectedAggValues) {
+        final KStream<Integer, String> result = sourceTable
+            .groupBy(
+                (k, v) -> new KeyValue<>(v, aggProduceValue),
+                Grouped.with(intSerde, stringSerde))
+            .aggregate(
+                () -> new Agg(Collections.emptyList(), 0),
+                (k, v, agg) -> {
+                    final List<String> seenValues;
+                    final boolean updated;
+                    if (!agg.seenValues.contains(v)) {
+                        seenValues = new ArrayList<>(agg.seenValues);
+                        seenValues.add(v);
+                        Collections.sort(seenValues);
+                        updated = true;
+                    } else {
+                        seenValues = agg.seenValues;
+                        updated = false;
+                    }
+
+                    final boolean shouldLog = updated || (agg.recordsProcessed % 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 100 once test is passing.
+                    if (shouldLog) {

Review Comment:
   Hmm, I'm not seeing what the change was. Should we increase the value in the 
line above from 10 to 100? Currently the comment still says "value of 10 is 
chosen for debugging purposes. can increase to 100 once test is passing".
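
   One possible way to keep the number and its comment from drifting apart is a 
named constant. This is only a sketch: `LogIntervalSketch`, `LOG_INTERVAL`, and 
the standalone `shouldLog` helper are hypothetical names, not code from the PR.

```java
// Hypothetical sketch; the class, constant, and method names are illustrative only.
final class LogIntervalSketch {

    // Log every LOG_INTERVAL-th record. 10 was the debugging value mentioned in
    // the comment; 100 is the value it says to switch to once the test passes.
    static final int LOG_INTERVAL = 100;

    // Mirrors the condition in the diff: log on any update, or periodically.
    static boolean shouldLog(final boolean updated, final int recordsProcessed) {
        return updated || (recordsProcessed % LOG_INTERVAL == 0);
    }
}
```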



##########
streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java:
##########
@@ -60,6 +71,9 @@ public static void main(final String[] args) throws Exception {
         final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
             "test.run_fk_join",
             "false"));
+        final boolean runTableAgg = Boolean.parseBoolean(streamsProperties.getProperty(

Review Comment:
   Doh! This is the step I was missing when I was testing these test changes 
earlier. Thanks for solving my mystery :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.