Isn’t that a bug then? Or can I fix my code somehow?
On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>) wrote: I think what happened in your use case was that the following implicit from ImplicitConversions.scala kept wrapping the resultant KTable from filter(): implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] = leading to stack overflow. Cheers On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <dru...@arrcus.com> wrote: > Hi, > > I’m using the org.kafka.streams.scala that was released with version > 2.0.0. I’m getting a StackOverflowError as follows: > > java.lang.StackOverflowError > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49) > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49) > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49) > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49) > . > . > . > at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49) > > The Scala version I’m using is 2.11.11 and the code leading to the error > is as follows (particularly the .filter). > > val builder = new StreamsBuilder > > val stream = builder.stream[Array[Byte], CaseClassA](args.topic) > > val customers = args.config.keys > > val predicates = customers.map { customerId => > (_: Array[Byte], message: CaseClassA) => message.customerId == customerId > }.toSeq > > val customerIdToStream = customers.zip(stream(predicates: _*)).toMap > > val y = Printed.toSysOut[Windowed[Key], Long] > > customerIdToStream.foreach { case (customerId, customerStream) => > val customerConfig = args.config(customerId) > customerStream > .flatMap { case (_, message) => > message.objects.map { > case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1 > } > } > .groupByKey > > .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize)) > .count() > .filter { case (_, count) => count >= > customerConfig.frequencyThreshold } > .toStream > .print(y) > } > > Is this a bug with the new scala module related to: > https://github.com/lightbend/kafka-streams-scala/issues/63 ? > Or am I doing something wrong? > > Thanks, > Druhin >