Thanks a lot Ted! FYI - The issue is not limited to the org.apache.kafka.streams.scala.KTable.filter. It also happens with org.apache.kafka.streams.scala.KTable.filterNot, org.apache.kafka.streams.scala.KStream.foreach and org.apache.kafka.streams.scala.KStream.peek.
- Druhin On August 20, 2018 at 5:19:36 PM, Matthias J. Sax (matth...@confluent.io<mailto:matth...@confluent.io>) wrote: Thanks for reporting and for creating the ticket! -Matthias On 8/20/18 5:17 PM, Ted Yu wrote: > I was able to reproduce what you saw with modification > to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > I have logged KAFKA-7316 and am looking for a fix. > > FYI > > On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel <dru...@arrcus.com> wrote: > >> Isn’t that a bug then? Or can I fix my code somehow? >> >> >> >> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhih...@gmail.com<mailto: >> yuzhih...@gmail.com>) wrote: >> >> I think what happened in your use case was that the following implicit >> from ImplicitConversions.scala kept wrapping the resultant KTable from >> filter(): >> >> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] = >> >> leading to stack overflow. >> >> Cheers >> >> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <dru...@arrcus.com> >> wrote: >> >>> Hi, >>> >>> I’m using the org.kafka.streams.scala that was released with version >>> 2.0.0. I’m getting a StackOverflowError as follows: >>> >>> java.lang.StackOverflowError >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49) >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49) >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49) >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49) >>> . >>> . >>> . >>> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49) >>> >>> The Scala version I’m using is 2.11.11 and the code leading to the error >>> is as follows (particularly the .filter). >>> >>> val builder = new StreamsBuilder >>> >>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic) >>> >>> val customers = args.config.keys >>> >>> val predicates = customers.map { customerId => >>> (_: Array[Byte], message: CaseClassA) => message.customerId == customerId >>> }.toSeq >>> >>> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap >>> >>> val y = Printed.toSysOut[Windowed[Key], Long] >>> >>> customerIdToStream.foreach { case (customerId, customerStream) => >>> val customerConfig = args.config(customerId) >>> customerStream >>> .flatMap { case (_, message) => >>> message.objects.map { >>> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1 >>> } >>> } >>> .groupByKey >>> >>> >> .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize)) >>> .count() >>> .filter { case (_, count) => count >= >>> customerConfig.frequencyThreshold } >>> .toStream >>> .print(y) >>> } >>> >>> Is this a bug with the new scala module related to: >>> https://github.com/lightbend/kafka-streams-scala/issues/63 ? >>> Or am I doing something wrong? >>> >>> Thanks, >>> Druhin >>> >> >