Hi,

I’m using the org.apache.kafka.streams.scala module that was released with Kafka 2.0.0,
and I’m getting a StackOverflowError as follows:

java.lang.StackOverflowError
at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
...
at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)

The Scala version I’m using is 2.11.11, and the code leading to the error is as
follows (the .filter call in particular):

val builder = new StreamsBuilder

val stream = builder.stream[Array[Byte], CaseClassA](args.topic)

val customers = args.config.keys

val predicates = customers.map { customerId =>
  (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
}.toSeq

val customerIdToStream = customers.zip(stream.branch(predicates: _*)).toMap

val printed = Printed.toSysOut[Windowed[Key], Long]

customerIdToStream.foreach { case (customerId, customerStream) =>
  val customerConfig = args.config(customerId)
  customerStream
    .flatMap { case (_, message) =>
      message.objects.map {
        case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1
      }
    }
    .groupByKey
    
.windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
    .count()
    .filter { case (_, count) => count >= customerConfig.frequencyThreshold }
    .toStream
    .print(printed)
}
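In case it helps narrow things down, here is a minimal, self-contained sketch of the kind of recursion the stack trace suggests. The classes below (JavaTable, ScalaTable, Predicate, wrap) are simplified stand-ins I made up, not the real Kafka classes: a Scala wrapper method intends to delegate to the wrapped Java object, but because the argument type doesn’t match the Java overload, an implicit conversion turns the inner object back into the wrapper and the method calls itself:

```scala
import scala.language.implicitConversions

object RecursionDemo {
  // Stand-in for the Java Predicate SAM interface
  trait Predicate[V] { def test(v: V): Boolean }

  // Stand-in for the underlying Java KTable
  class JavaTable[V] {
    def filter(p: Predicate[V]): JavaTable[V] = this
  }

  // Stand-in for the Scala wrapper around the Java table
  class ScalaTable[V](val inner: JavaTable[V]) {
    def filter(p: V => Boolean): ScalaTable[V] =
      // Intended: call the Java filter on `inner`. But `p` is a Scala
      // function, not a Predicate, so the Java overload doesn't apply;
      // the compiler instead converts `inner` via `wrap` and this very
      // method is invoked again -> infinite recursion.
      inner.filter(p)
  }

  implicit def wrap[V](j: JavaTable[V]): ScalaTable[V] = new ScalaTable(j)

  // Returns true if calling filter blows the stack
  def reproduces: Boolean =
    try { new ScalaTable(new JavaTable[Int]).filter(_ > 0); false }
    catch { case _: StackOverflowError => true }

  def main(args: Array[String]): Unit =
    println(s"StackOverflowError reproduced: $reproduces")
}
```

With these stand-ins, every frame in the resulting stack trace is the same wrapper filter method, which matches the repeated KTable.filter(KTable.scala:49) frames above.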

Is this a bug in the new Scala module, related to
https://github.com/lightbend/kafka-streams-scala/issues/63 ?
Or am I doing something wrong?

Thanks,
Druhin
