[ https://issues.apache.org/jira/browse/KAFKA-12564?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jessica Johann updated KAFKA-12564:
-----------------------------------
    Description: 
Libraries from build.sbt:
{{"org.apache.kafka" % "kafka_2.13" % "2.7.0",}}
{{"org.apache.kafka" % "kafka-streams" % "2.7.0",}}
{{"org.apache.kafka" % "kafka-clients" % "2.7.0",}}
{{"org.apache.kafka" % "kafka-streams-scala_2.13" % "2.7.0",}}
h4. Feed the stream "issue_stream" with:
{{(1->"A")}}
{{(1->"B")}}
h4. Topology:
{{// #1}}
{{val issueStream: KStream[Int, String] = builder.stream[Int, String]("issue_stream")}}

{{// #2}}
{{val aggTable: KTable[Int, String] =}}
{{  issueStream}}
{{    .groupBy((k, v) => k)}}
{{    .aggregate[String]("EMPTY")((k, v, agg) => s"$agg+$v")}}

{{// #3}}
{{aggTable}}
{{  .toStream}}
{{  .print(Printed.toSysOut)}}

{{// #4}}
{{aggTable.filter((k, v) => {}}
{{  println(s"filter($k, $v) at ${System.nanoTime()}")}}
{{  true}}
{{})}}
{{  .toStream}}
{{  .print(Printed.toSysOut)}}
h4. First tuple: (1->"A")
#3 Output as expected: the aggregated tuple ("EMPTY"+"+A")
{{[KTABLE-TOSTREAM-0000000044]: 1, EMPTY+A}}

#4 The filter method is called twice.
The first call receives the expected tuple:
{{filter(1, EMPTY+A) at 211379567071847}}
The second call receives the initial (empty) aggregate:
{{filter(1, EMPTY) at 211379567120375}}
The output contains the correct tuple:
{{[KTABLE-TOSTREAM-0000000047]: 1, EMPTY+A}}
h4. Second tuple: (1->"B")
#3 Output as expected: the aggregated tuple ("EMPTY"+"+A"+"+B")
{{[KTABLE-TOSTREAM-0000000044]: 1, EMPTY+A+B}}

#4 Again there is an unexpected second call to filter(...). This time it receives the previous aggregate, as it was before this record was added.
First call:
{{filter(1, EMPTY+A+B) at 211379567498482}}
Second call:
{{filter(1, EMPTY+A) at 211379567524475}}
As expected, however, the output contains only one tuple:
{{[KTABLE-TOSTREAM-0000000047]: 1, EMPTY+A+B}}


> KTable#filter-method called twice after aggregation
> ---------------------------------------------------
>
>                 Key: KAFKA-12564
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12564
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.0
>            Reporter: Jessica Johann
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
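The dependency coordinates listed in the description would sit in build.sbt roughly as follows. This is a sketch. The report does not show the rest of its build file, and the scalaVersion value is an assumption: any 2.13.x release should match the explicitly suffixed _2.13 artifacts.

```scala
// build.sbt (sketch): the four coordinates are copied from the report.
// Assumption: scalaVersion is not given in the report; any 2.13.x release
// matches the explicitly suffixed _2.13 artifacts below.
scalaVersion := "2.13.5"

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka_2.13"               % "2.7.0",
  "org.apache.kafka" % "kafka-streams"            % "2.7.0",
  "org.apache.kafka" % "kafka-clients"            % "2.7.0",
  "org.apache.kafka" % "kafka-streams-scala_2.13" % "2.7.0"
)
```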
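One way to read the logs in the description is as follows. Each table update produced by the aggregation carries both the new aggregate and the previous one. The filter predicate runs on both values, and only the new value reaches toStream.

The plain-Scala sketch below is a toy model of that reading. It is not Kafka Streams code and does not confirm the cause. `Change`, `ToyModel` and `run` are hypothetical names. Treating the initializer value "EMPTY" as the old value of the first update is an assumption chosen to match the reported log. The model replays (1->"A") and (1->"B") through the same initializer, aggregator and always-true predicate as topology #2 and #4, and it records every predicate call.

```scala
// Toy model of the reported call pattern; NOT Kafka Streams code.
// Assumptions (hypothetical, chosen to match the log in the report):
//  - each table update is forwarded as a Change(newValue, oldValue);
//  - for the first record the old value is the initializer result "EMPTY";
//  - the filter evaluates its predicate on the new value, then on the old value;
//  - toStream only emits the new value of the filtered Change.
import scala.collection.mutable

// Hypothetical stand-in for an update carrying the current and previous value.
final case class Change[V](newValue: V, oldValue: V)

object ToyModel {
  // Returns (predicate calls in order, values emitted by the toy toStream).
  def run(records: Seq[(Int, String)]): (List[(Int, String)], List[(Int, String)]) = {
    val store = mutable.Map.empty[Int, String] // latest aggregate per key
    val filterCalls = mutable.ListBuffer.empty[(Int, String)]
    val emitted = mutable.ListBuffer.empty[(Int, String)]

    // Same predicate as topology #4: record the call and keep every value.
    def predicate(k: Int, v: String): Boolean = {
      filterCalls += (k -> v)
      true
    }

    for ((k, v) <- records) {
      // Aggregation as in topology #2: initializer "EMPTY", aggregator s"$agg+$v".
      val oldAgg = store.getOrElse(k, "EMPTY")
      val newAgg = s"$oldAgg+$v"
      store(k) = newAgg
      val change = Change(newAgg, oldAgg)

      // Toy filter: Scala evaluates arguments left to right, so the
      // predicate sees the new value first and the old value second.
      val filtered = Change(
        Option(change.newValue).filter(predicate(k, _)),
        Option(change.oldValue).filter(predicate(k, _))
      )

      // Toy toStream: only the new side of the filtered update is emitted.
      filtered.newValue.foreach(agg => emitted += (k -> agg))
    }
    (filterCalls.toList, emitted.toList)
  }

  def main(args: Array[String]): Unit = {
    val (calls, out) = run(Seq(1 -> "A", 1 -> "B"))
    calls.foreach { case (k, v) => println(s"filter($k, $v)") }
    out.foreach { case (k, v) => println(s"emitted: $k, $v") }
  }
}
```

Running `ToyModel.main` prints the four filter calls in the order reported (without timestamps), followed by the two emitted values EMPTY+A and EMPTY+A+B. If the real processor behaves like this model, the second predicate call evaluates the old side of each update. That would explain why it never appears in the printed output.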