[ https://issues.apache.org/jira/browse/KAFKA-8101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bill Bejeck resolved KAFKA-8101.
--------------------------------
    Resolution: Not A Bug

There is a known issue with KGroupedStream#count in the Scala API: it adds an extra processor, KTable.mapValues, to convert the Java Long to a Scala Long. This added processor causes a Scala-generated topology to differ from a Java-generated topology.

> Different Processor Node Count on Scala API
> -------------------------------------------
>
>                 Key: KAFKA-8101
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8101
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0, 2.0.1, 2.1.0, 2.2.0, 2.1.1
>            Reporter: Bill Bejeck
>            Assignee: Bill Bejeck
>            Priority: Major
>
> Surfaced from this PR [https://github.com/apache/kafka/pull/6373].
> The two topologies have processor node counts that differ by 1 when generating
> an optimized (or unoptimized) topology with the Scala API vs. the Java API.
>
> Scala Topology
> {noformat}
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
>       --> KSTREAM-MAP-0000000001
>     Processor: KSTREAM-MAP-0000000001 (stores: [])
>       --> KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000041
>       <-- KSTREAM-SOURCE-0000000000
>     Processor: KSTREAM-FILTER-0000000002 (stores: [])
>       --> KSTREAM-MAPVALUES-0000000003
>       <-- KSTREAM-MAP-0000000001
>     Processor: KSTREAM-FILTER-0000000041 (stores: [])
>       --> KSTREAM-SINK-0000000040
>       <-- KSTREAM-MAP-0000000001
>     Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
>       --> KSTREAM-PROCESSOR-0000000004
>       <-- KSTREAM-FILTER-0000000002
>     Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])
>       --> none
>       <-- KSTREAM-MAPVALUES-0000000003
>     Sink: KSTREAM-SINK-0000000040 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition)
>       <-- KSTREAM-FILTER-0000000041
>    Sub-topology: 1
>     Source: KSTREAM-SOURCE-0000000042 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition])
>       --> KSTREAM-AGGREGATE-0000000007, KSTREAM-AGGREGATE-0000000015, KSTREAM-FILTER-0000000021, KSTREAM-FILTER-0000000030
>     Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])
>       --> KTABLE-MAPVALUES-0000000011
>       <-- KSTREAM-SOURCE-0000000042
>     Processor: KTABLE-MAPVALUES-0000000011 (stores: [])
>       --> KTABLE-TOSTREAM-0000000012
>       <-- KSTREAM-AGGREGATE-0000000007
>     Processor: KTABLE-TOSTREAM-0000000012 (stores: [])
>       --> KSTREAM-WINDOWED-0000000035, KSTREAM-SINK-0000000013
>       <-- KTABLE-MAPVALUES-0000000011
>     Processor: KSTREAM-FILTER-0000000021 (stores: [])
>       --> KSTREAM-PEEK-0000000022
>       <-- KSTREAM-SOURCE-0000000042
>     Processor: KSTREAM-FILTER-0000000030 (stores: [])
>       --> KSTREAM-WINDOWED-0000000034
>       <-- KSTREAM-SOURCE-0000000042
>     Processor: KSTREAM-PEEK-0000000022 (stores: [])
>       --> KSTREAM-REDUCE-0000000024
>       <-- KSTREAM-FILTER-0000000021
>     Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINTHIS-0000000036-store])
>       --> KSTREAM-JOINTHIS-0000000036
>       <-- KSTREAM-FILTER-0000000030
>     Processor: KSTREAM-WINDOWED-0000000035 (stores: [KSTREAM-JOINOTHER-0000000037-store])
>       --> KSTREAM-JOINOTHER-0000000037
>       <-- KTABLE-TOSTREAM-0000000012
>     Processor: KSTREAM-AGGREGATE-0000000015 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014])
>       --> KTABLE-TOSTREAM-0000000019
>       <-- KSTREAM-SOURCE-0000000042
>     Processor: KSTREAM-JOINOTHER-0000000037 (stores: [KSTREAM-JOINTHIS-0000000036-store])
>       --> KSTREAM-MERGE-0000000038
>       <-- KSTREAM-WINDOWED-0000000035
>     Processor: KSTREAM-JOINTHIS-0000000036 (stores: [KSTREAM-JOINOTHER-0000000037-store])
>       --> KSTREAM-MERGE-0000000038
>       <-- KSTREAM-WINDOWED-0000000034
>     Processor: KSTREAM-REDUCE-0000000024 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000023])
>       --> KTABLE-TOSTREAM-0000000028
>       <-- KSTREAM-PEEK-0000000022
>     Processor: KSTREAM-MERGE-0000000038 (stores: [])
>       --> KSTREAM-SINK-0000000039
>       <-- KSTREAM-JOINTHIS-0000000036, KSTREAM-JOINOTHER-0000000037
>     Processor: KTABLE-TOSTREAM-0000000019 (stores: [])
>       --> KSTREAM-SINK-0000000020
>       <-- KSTREAM-AGGREGATE-0000000015
>     Processor: KTABLE-TOSTREAM-0000000028 (stores: [])
>       --> KSTREAM-SINK-0000000029
>       <-- KSTREAM-REDUCE-0000000024
>     Sink: KSTREAM-SINK-0000000013 (topic: countTopic)
>       <-- KTABLE-TOSTREAM-0000000012
>     Sink: KSTREAM-SINK-0000000020 (topic: aggregationTopic)
>       <-- KTABLE-TOSTREAM-0000000019
>     Sink: KSTREAM-SINK-0000000029 (topic: reduceTopic)
>       <-- KTABLE-TOSTREAM-0000000028
>     Sink: KSTREAM-SINK-0000000039 (topic: joinedTopic)
>       <-- KSTREAM-MERGE-0000000038
> {noformat}
>
> Java Topology
> {noformat}
> Topologies:
>    Sub-topology: 0
>     Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])
>       --> KSTREAM-MAP-0000000001
>     Processor: KSTREAM-MAP-0000000001 (stores: [])
>       --> KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000040
>       <-- KSTREAM-SOURCE-0000000000
>     Processor: KSTREAM-FILTER-0000000002 (stores: [])
>       --> KSTREAM-MAPVALUES-0000000003
>       <-- KSTREAM-MAP-0000000001
>     Processor: KSTREAM-FILTER-0000000040 (stores: [])
>       --> KSTREAM-SINK-0000000039
>       <-- KSTREAM-MAP-0000000001
>     Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])
>       --> KSTREAM-PROCESSOR-0000000004
>       <-- KSTREAM-FILTER-0000000002
>     Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])
>       --> none
>       <-- KSTREAM-MAPVALUES-0000000003
>     Sink: KSTREAM-SINK-0000000039 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition)
>       <-- KSTREAM-FILTER-0000000040
>    Sub-topology: 1
>     Source: KSTREAM-SOURCE-0000000041 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition])
>       --> KSTREAM-FILTER-0000000020, KSTREAM-AGGREGATE-0000000007, KSTREAM-AGGREGATE-0000000014, KSTREAM-FILTER-0000000029
>     Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])
>       --> KTABLE-TOSTREAM-0000000011
>       <-- KSTREAM-SOURCE-0000000041
>     Processor: KTABLE-TOSTREAM-0000000011 (stores: [])
>       --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034
>       <-- KSTREAM-AGGREGATE-0000000007
>     Processor: KSTREAM-FILTER-0000000020 (stores: [])
>       --> KSTREAM-PEEK-0000000021
>       <-- KSTREAM-SOURCE-0000000041
>     Processor: KSTREAM-FILTER-0000000029 (stores: [])
>       --> KSTREAM-WINDOWED-0000000033
>       <-- KSTREAM-SOURCE-0000000041
>     Processor: KSTREAM-PEEK-0000000021 (stores: [])
>       --> KSTREAM-REDUCE-0000000023
>       <-- KSTREAM-FILTER-0000000020
>     Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])
>       --> KSTREAM-JOINTHIS-0000000035
>       <-- KSTREAM-FILTER-0000000029
>     Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])
>       --> KSTREAM-JOINOTHER-0000000036
>       <-- KTABLE-TOSTREAM-0000000011
>     Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])
>       --> KTABLE-TOSTREAM-0000000018
>       <-- KSTREAM-SOURCE-0000000041
>     Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])
>       --> KSTREAM-MERGE-0000000037
>       <-- KSTREAM-WINDOWED-0000000034
>     Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])
>       --> KSTREAM-MERGE-0000000037
>       <-- KSTREAM-WINDOWED-0000000033
>     Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])
>       --> KTABLE-TOSTREAM-0000000027
>       <-- KSTREAM-PEEK-0000000021
>     Processor: KSTREAM-MERGE-0000000037 (stores: [])
>       --> KSTREAM-SINK-0000000038
>       <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036
>     Processor: KTABLE-TOSTREAM-0000000018 (stores: [])
>       --> KSTREAM-SINK-0000000019
>       <-- KSTREAM-AGGREGATE-0000000014
>     Processor: KTABLE-TOSTREAM-0000000027 (stores: [])
>       --> KSTREAM-SINK-0000000028
>       <-- KSTREAM-REDUCE-0000000023
>     Sink: KSTREAM-SINK-0000000012 (topic: countTopic)
>       <-- KTABLE-TOSTREAM-0000000011
>     Sink: KSTREAM-SINK-0000000019 (topic: aggregationTopic)
>       <-- KTABLE-TOSTREAM-0000000018
>     Sink: KSTREAM-SINK-0000000028 (topic: reduceTopic)
>       <-- KTABLE-TOSTREAM-0000000027
>     Sink: KSTREAM-SINK-0000000038 (topic: joinedTopic)
>       <-- KSTREAM-MERGE-0000000037
> {noformat}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
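
The one-node difference explained in the resolution can be checked mechanically by counting node types in the two topology descriptions. The following is a minimal sketch using only the Java standard library, not part of Kafka. The class name `TopologyNodeCount`, its helper methods, and the two excerpt constants are hypothetical names introduced here for illustration; the excerpt text is copied from the count branch of the Scala and Java topologies quoted in this issue, and the sketch assumes node names follow the `TYPE-<10 digits>` pattern seen there.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not a Kafka class): counts node types in describe()-style text.
class TopologyNodeCount {
    // Matches node declarations such as "Processor: KTABLE-MAPVALUES-0000000011"
    // and captures the type prefix ("KTABLE-MAPVALUES") without the numeric suffix.
    static final Pattern NODE =
        Pattern.compile("(?:Source|Processor|Sink):\\s+([A-Z]+(?:-[A-Z]+)*)-\\d{10}");

    // Count branch copied from the Scala topology quoted in the issue.
    static final String SCALA_EXCERPT =
        "Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n"
      + "  --> KTABLE-MAPVALUES-0000000011\n"
      + "Processor: KTABLE-MAPVALUES-0000000011 (stores: [])\n"
      + "  --> KTABLE-TOSTREAM-0000000012\n"
      + "Processor: KTABLE-TOSTREAM-0000000012 (stores: [])\n"
      + "  --> KSTREAM-WINDOWED-0000000035, KSTREAM-SINK-0000000013\n";

    // The same branch copied from the Java topology quoted in the issue.
    static final String JAVA_EXCERPT =
        "Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n"
      + "  --> KTABLE-TOSTREAM-0000000011\n"
      + "Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n"
      + "  --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n";

    // Returns how many declared nodes of each type appear in the description text.
    static Map<String, Integer> countByType(String description) {
        Map<String, Integer> counts = new TreeMap<>();
        Matcher m = NODE.matcher(description);
        while (m.find()) {
            counts.merge(m.group(1), 1, Integer::sum);
        }
        return counts;
    }

    // Returns left-minus-right counts, keeping only the types whose counts differ.
    static Map<String, Integer> diff(Map<String, Integer> left, Map<String, Integer> right) {
        Map<String, Integer> delta = new TreeMap<>();
        TreeSet<String> types = new TreeSet<>(left.keySet());
        types.addAll(right.keySet());
        for (String type : types) {
            int d = left.getOrDefault(type, 0) - right.getOrDefault(type, 0);
            if (d != 0) {
                delta.put(type, d);
            }
        }
        return delta;
    }

    public static void main(String[] args) {
        // Scala minus Java: the only surplus node type is the Long-conversion mapValues.
        System.out.println(diff(countByType(SCALA_EXCERPT), countByType(JAVA_EXCERPT)));
        // prints {KTABLE-MAPVALUES=1}
    }
}
```

Comparing by node type rather than by full node name avoids false differences from the numeric suffixes, which shift by one after the extra processor is inserted.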