Bill Bejeck created KAFKA-8101: ---------------------------------- Summary: Optimization Has Different Processor Node Count on Scala API Key: KAFKA-8101 URL: https://issues.apache.org/jira/browse/KAFKA-8101 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.1.1, 2.1.0, 2.0.1, 2.0.0, 2.2.0 Reporter: Bill Bejeck Assignee: Bill Bejeck
Surfaced from this PR https://github.com/apache/kafka/pull/6373 But two topologies have processor number count off by 1 when generating an optimized topology with Scala API vs Java API Scala Topology {noformat} Topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic]) --> KSTREAM-MAP-0000000001 Processor: KSTREAM-MAP-0000000001 (stores: []) --> KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000041 <-- KSTREAM-SOURCE-0000000000 Processor: KSTREAM-FILTER-0000000002 (stores: []) --> KSTREAM-MAPVALUES-0000000003 <-- KSTREAM-MAP-0000000001 Processor: KSTREAM-FILTER-0000000041 (stores: []) --> KSTREAM-SINK-0000000040 <-- KSTREAM-MAP-0000000001 Processor: KSTREAM-MAPVALUES-0000000003 (stores: []) --> KSTREAM-PROCESSOR-0000000004 <-- KSTREAM-FILTER-0000000002 Processor: KSTREAM-PROCESSOR-0000000004 (stores: []) --> none <-- KSTREAM-MAPVALUES-0000000003 Sink: KSTREAM-SINK-0000000040 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition) <-- KSTREAM-FILTER-0000000041 Sub-topology: 1 Source: KSTREAM-SOURCE-0000000042 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition]) --> KSTREAM-AGGREGATE-0000000007, KSTREAM-AGGREGATE-0000000015, KSTREAM-FILTER-0000000021, KSTREAM-FILTER-0000000030 Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006]) --> KTABLE-MAPVALUES-0000000011 <-- KSTREAM-SOURCE-0000000042 Processor: KTABLE-MAPVALUES-0000000011 (stores: []) --> KTABLE-TOSTREAM-0000000012 <-- KSTREAM-AGGREGATE-0000000007 Processor: KTABLE-TOSTREAM-0000000012 (stores: []) --> KSTREAM-WINDOWED-0000000035, KSTREAM-SINK-0000000013 <-- KTABLE-MAPVALUES-0000000011 Processor: KSTREAM-FILTER-0000000021 (stores: []) --> KSTREAM-PEEK-0000000022 <-- KSTREAM-SOURCE-0000000042 Processor: KSTREAM-FILTER-0000000030 (stores: []) --> KSTREAM-WINDOWED-0000000034 <-- KSTREAM-SOURCE-0000000042 Processor: KSTREAM-PEEK-0000000022 (stores: []) --> KSTREAM-REDUCE-0000000024 <-- KSTREAM-FILTER-0000000021 Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINTHIS-0000000036-store]) --> KSTREAM-JOINTHIS-0000000036 <-- KSTREAM-FILTER-0000000030 Processor: KSTREAM-WINDOWED-0000000035 (stores: [KSTREAM-JOINOTHER-0000000037-store]) --> KSTREAM-JOINOTHER-0000000037 <-- KTABLE-TOSTREAM-0000000012 Processor: KSTREAM-AGGREGATE-0000000015 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000014]) --> KTABLE-TOSTREAM-0000000019 <-- KSTREAM-SOURCE-0000000042 Processor: KSTREAM-JOINOTHER-0000000037 (stores: [KSTREAM-JOINTHIS-0000000036-store]) --> KSTREAM-MERGE-0000000038 <-- KSTREAM-WINDOWED-0000000035 Processor: KSTREAM-JOINTHIS-0000000036 (stores: [KSTREAM-JOINOTHER-0000000037-store]) --> KSTREAM-MERGE-0000000038 <-- KSTREAM-WINDOWED-0000000034 Processor: KSTREAM-REDUCE-0000000024 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000023]) --> KTABLE-TOSTREAM-0000000028 <-- KSTREAM-PEEK-0000000022 Processor: KSTREAM-MERGE-0000000038 (stores: []) --> KSTREAM-SINK-0000000039 <-- KSTREAM-JOINTHIS-0000000036, KSTREAM-JOINOTHER-0000000037 Processor: KTABLE-TOSTREAM-0000000019 (stores: []) --> KSTREAM-SINK-0000000020 <-- KSTREAM-AGGREGATE-0000000015 Processor: KTABLE-TOSTREAM-0000000028 (stores: []) --> KSTREAM-SINK-0000000029 <-- KSTREAM-REDUCE-0000000024 Sink: KSTREAM-SINK-0000000013 (topic: countTopic) <-- KTABLE-TOSTREAM-0000000012 Sink: KSTREAM-SINK-0000000020 (topic: aggregationTopic) <-- KTABLE-TOSTREAM-0000000019 Sink: KSTREAM-SINK-0000000029 (topic: reduceTopic) <-- KTABLE-TOSTREAM-0000000028 Sink: KSTREAM-SINK-0000000039 (topic: joinedTopic) <-- KSTREAM-MERGE-0000000038 {noformat} Java Topology {noformat} Topologies: Sub-topology: 0 Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic]) --> KSTREAM-MAP-0000000001 Processor: KSTREAM-MAP-0000000001 (stores: []) --> KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000040 <-- KSTREAM-SOURCE-0000000000 Processor: KSTREAM-FILTER-0000000002 (stores: []) --> KSTREAM-MAPVALUES-0000000003 <-- KSTREAM-MAP-0000000001 Processor: KSTREAM-FILTER-0000000040 (stores: []) --> KSTREAM-SINK-0000000039 <-- KSTREAM-MAP-0000000001 Processor: KSTREAM-MAPVALUES-0000000003 (stores: []) --> KSTREAM-PROCESSOR-0000000004 <-- KSTREAM-FILTER-0000000002 Processor: KSTREAM-PROCESSOR-0000000004 (stores: []) --> none <-- KSTREAM-MAPVALUES-0000000003 Sink: KSTREAM-SINK-0000000039 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition) <-- KSTREAM-FILTER-0000000040 Sub-topology: 1 Source: KSTREAM-SOURCE-0000000041 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition]) --> KSTREAM-FILTER-0000000020, KSTREAM-AGGREGATE-0000000007, KSTREAM-AGGREGATE-0000000014, KSTREAM-FILTER-0000000029 Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006]) --> KTABLE-TOSTREAM-0000000011 <-- KSTREAM-SOURCE-0000000041 Processor: KTABLE-TOSTREAM-0000000011 (stores: []) --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034 <-- KSTREAM-AGGREGATE-0000000007 Processor: KSTREAM-FILTER-0000000020 (stores: []) --> KSTREAM-PEEK-0000000021 <-- KSTREAM-SOURCE-0000000041 Processor: KSTREAM-FILTER-0000000029 (stores: []) --> KSTREAM-WINDOWED-0000000033 <-- KSTREAM-SOURCE-0000000041 Processor: KSTREAM-PEEK-0000000021 (stores: []) --> KSTREAM-REDUCE-0000000023 <-- KSTREAM-FILTER-0000000020 Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store]) --> KSTREAM-JOINTHIS-0000000035 <-- KSTREAM-FILTER-0000000029 Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store]) --> KSTREAM-JOINOTHER-0000000036 <-- KTABLE-TOSTREAM-0000000011 Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013]) --> KTABLE-TOSTREAM-0000000018 <-- KSTREAM-SOURCE-0000000041 Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store]) --> KSTREAM-MERGE-0000000037 <-- KSTREAM-WINDOWED-0000000034 Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store]) --> KSTREAM-MERGE-0000000037 <-- KSTREAM-WINDOWED-0000000033 Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022]) --> KTABLE-TOSTREAM-0000000027 <-- KSTREAM-PEEK-0000000021 Processor: KSTREAM-MERGE-0000000037 (stores: []) --> KSTREAM-SINK-0000000038 <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036 Processor: KTABLE-TOSTREAM-0000000018 (stores: []) --> KSTREAM-SINK-0000000019 <-- KSTREAM-AGGREGATE-0000000014 Processor: KTABLE-TOSTREAM-0000000027 (stores: []) --> KSTREAM-SINK-0000000028 <-- KSTREAM-REDUCE-0000000023 Sink: KSTREAM-SINK-0000000012 (topic: countTopic) <-- KTABLE-TOSTREAM-0000000011 Sink: KSTREAM-SINK-0000000019 (topic: aggregationTopic) <-- KTABLE-TOSTREAM-0000000018 Sink: KSTREAM-SINK-0000000028 (topic: reduceTopic) <-- KTABLE-TOSTREAM-0000000027 Sink: KSTREAM-SINK-0000000038 (topic: joinedTopic) <-- KSTREAM-MERGE-0000000037 {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)