Hi, I’m using the org.apache.kafka.streams.scala package that was released with version 2.0.0. I’m getting a StackOverflowError as follows:

java.lang.StackOverflowError
    at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
    at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
    at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
    at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
    . . .
    at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)

The Scala version I’m using is 2.11.11 and the code leading to the error is as follows (particularly the .filter).

    val builder = new StreamsBuilder
    val stream = builder.stream[Array[Byte], CaseClassA](args.topic)

    val customers = args.config.keys

    val predicates = customers.map { customerId =>
      (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
    }.toSeq

    val customerIdToStream = customers.zip(stream(predicates: _*)).toMap

    val y = Printed.toSysOut[Windowed[Key], Long]

    customerIdToStream.foreach { case (customerId, customerStream) =>
      val customerConfig = args.config(customerId)
      customerStream
        .flatMap { case (_, message) =>
          message.objects.map {
            case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
          }
        }
        .groupByKey
        .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
        .count()
        .filter { case (_, count) => count >= customerConfig.frequencyThreshold }
        .toStream
        .print(y)
    }

Is this a bug with the new Scala module related to https://github.com/lightbend/kafka-streams-scala/issues/63? Or am I doing something wrong?

Thanks,
Druhin