vcrfxia commented on code in PR #13656: URL: https://github.com/apache/kafka/pull/13656#discussion_r1183023137
########## streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java: ##########
@@ -34,10 +34,34 @@ public class SmokeTestUtil {
     final static int END = Integer.MAX_VALUE;
+    static ProcessorSupplier<Object, Object, Void, Void> printTaskProcessorSupplier(final String topic) {
+        return printTaskProcessorSupplier(topic, "");
+    }
+
     static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic) {
         return printProcessorSupplier(topic, "");
     }
+    static ProcessorSupplier<Object, Object, Void, Void> printTaskProcessorSupplier(final String topic, final String name) {

Review Comment:
nit: this new processor is the same as the existing one except that it doesn't track or print the number of records processed, right? Would it be better to have a boolean to toggle the print behavior, rather than duplicating the rest of the processor code? (Not a big deal either way since it's not much code, but as a reader I had to spend some time determining/verifying that the print behavior is the only difference.)

########## tests/kafkatest/tests/streams/streams_upgrade_test.py: ##########
@@ -41,6 +41,7 @@ metadata_2_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)]
 fk_join_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)]
+table_agg_versions = [str(LATEST_3_3)]

Review Comment:
What's the reason for adding older versions? Do we expect that upgrading from a version older than 3.3 will be different than upgrading from 3.3?
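The sketch below shows the boolean-toggle idea from the first review comment. It uses a simplified stand-in for the Kafka Streams processor so that it runs without Kafka on the classpath. It deliberately does not use the real `ProcessorSupplier` API.

Only the record-count printing is gated by the flag, and the task-only variant becomes a one-line delegate. The following names are hypothetical and do not come from the PR: `PrintProcessorSketch`, `PrintProcessor`, `printRecordCount`, the `printed` list (used instead of `System.out` so the output can be checked), and the 100-record print interval.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, Kafka-free stand-in for the processors built in SmokeTestUtil.
final class PrintProcessorSketch {

    // Assumed print interval; not taken from the PR.
    static final int PRINT_INTERVAL = 100;

    static final class PrintProcessor {
        private final String topic;
        private final String name;
        private final boolean printRecordCount;          // the toggle suggested in the review
        final List<String> printed = new ArrayList<>();  // collects output instead of System.out
        private int numRecordsProcessed = 0;

        PrintProcessor(final String topic, final String name, final boolean printRecordCount) {
            this.topic = topic;
            this.name = name;
            this.printRecordCount = printRecordCount;
        }

        void process(final Object key, final Object value) {
            if (printRecordCount) {
                numRecordsProcessed++;
                if (numRecordsProcessed % PRINT_INTERVAL == 0) {
                    printed.add(name + ": processed " + numRecordsProcessed + " records from topic=" + topic);
                }
            }
            // Behavior common to both variants would live here, written only once.
        }
    }

    // Single implementation, parameterized by the toggle.
    static Supplier<PrintProcessor> printProcessorSupplier(final String topic, final String name,
                                                           final boolean printRecordCount) {
        return () -> new PrintProcessor(topic, name, printRecordCount);
    }

    // Existing behavior: count and print.
    static Supplier<PrintProcessor> printProcessorSupplier(final String topic, final String name) {
        return printProcessorSupplier(topic, name, true);
    }

    // The new variant reduces to a delegate with printing disabled.
    static Supplier<PrintProcessor> printTaskProcessorSupplier(final String topic, final String name) {
        return printProcessorSupplier(topic, name, false);
    }
}
```

With this shape, a reader only has to check the single `if (printRecordCount)` branch to see how the two variants differ.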
########## streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java: ##########
@@ -143,6 +173,94 @@ private static void buildFKTable(final KStream<String, Integer> primaryTable,
         kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
     }
+    private static void buildTableAgg(final KTable<String, Integer> sourceTable,
+                                      final String aggProduceValue,
+                                      final List<String> expectedAggValues) {
+        final KStream<Integer, String> result = sourceTable
+            .groupBy(
+                (k, v) -> new KeyValue<>(v, aggProduceValue),
+                Grouped.with(intSerde, stringSerde))
+            .aggregate(
+                () -> new Agg(Collections.emptyList(), 0),
+                (k, v, agg) -> {
+                    final List<String> seenValues;
+                    final boolean updated;
+                    if (!agg.seenValues.contains(v)) {
+                        seenValues = new ArrayList<>(agg.seenValues);
+                        seenValues.add(v);
+                        Collections.sort(seenValues);
+                        updated = true;
+                    } else {
+                        seenValues = agg.seenValues;
+                        updated = false;
+                    }
+
+                    final boolean shouldLog = updated || (agg.recordsProcessed % 10 == 0); // value of 10 is chosen for debugging purposes. can increase to 100 once test is passing.
+                    if (shouldLog) {

Review Comment:
Hmm, I'm not seeing what the change was. Should we increase the value in the line above from 10 to 100? Currently the comment still says "value of 10 is chosen for debugging purposes. can increase to 100 once test is passing."

########## streams/upgrade-system-tests-33/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java: ##########
@@ -60,6 +71,9 @@ public static void main(final String[] args) throws Exception {
         final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
             "test.run_fk_join", "false"));
+        final boolean runTableAgg = Boolean.parseBoolean(streamsProperties.getProperty(

Review Comment:
Doh! This is the step I was missing when I was testing these test changes earlier. Thanks for solving my mystery :)

-- 
This is an automated message from the Apache Git Service.
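The sketch below pulls the `buildTableAgg` aggregator lambda out into a plain static method. It lifts the logging interval into a named constant, so raising it from 10 to 100, as the review asks, touches one line and leaves no stale comment behind.

Several parts are assumptions; the diff only shows `new Agg(Collections.emptyList(), 0)`, `agg.seenValues`, and `agg.recordsProcessed`. The assumed parts are:
- the names `TableAggSketch` and `LOG_INTERVAL`,
- the body of `Agg`,
- the `log` list standing in for the test's output,
- incrementing `recordsProcessed` on every call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, Kafka-free extraction of the buildTableAgg aggregator.
final class TableAggSketch {

    // Assumed constant: the review asks to raise the debug value of 10 to 100.
    static final int LOG_INTERVAL = 100;

    // Assumed shape of the test's Agg value; only the field names appear in the diff.
    static final class Agg {
        final List<String> seenValues;
        final int recordsProcessed;

        Agg(final List<String> seenValues, final int recordsProcessed) {
            this.seenValues = seenValues;
            this.recordsProcessed = recordsProcessed;
        }
    }

    // Adds v to the sorted list of distinct seen values; logs on change or every LOG_INTERVAL records.
    static Agg aggregate(final String v, final Agg agg, final List<String> log) {
        final List<String> seenValues;
        final boolean updated;
        if (!agg.seenValues.contains(v)) {
            seenValues = new ArrayList<>(agg.seenValues);
            seenValues.add(v);
            Collections.sort(seenValues);
            updated = true;
        } else {
            seenValues = agg.seenValues;
            updated = false;
        }

        final boolean shouldLog = updated || (agg.recordsProcessed % LOG_INTERVAL == 0);
        if (shouldLog) {
            log.add("seenValues=" + seenValues + " recordsProcessed=" + agg.recordsProcessed);
        }
        // Assumption: the real aggregator also bumps the counter on every record.
        return new Agg(seenValues, agg.recordsProcessed + 1);
    }
}
```

Example: feeding "b", then "a", then 99 more "a" records logs three times. Two logs come from the value-set changes and one from reaching 100 processed records.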