Isn’t that a bug then? Or can I fix my code somehow?

On August 20, 2018 at 1:30:42 PM, Ted Yu wrote:

I think what happened in your use case was that the following implicit
from ImplicitConversions.scala kept wrapping the resultant KTable from

implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] =

leading to stack overflow.
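
To illustrate the mechanism, here is a minimal self-contained sketch with no Kafka dependency. It is not the actual Kafka source: `PredicateJ`, `TableJ`, `Table`, `wrapTable`, and `filterSafe` are hypothetical stand-ins, and it assumes Scala 2. It shows how an implicit conversion with the same shape as `wrapKTable` can make a wrapper method call itself. The Java-style `filter` does not accept a Scala function value, so the compiler converts `inner` back into the wrapper and calls the wrapper's own `filter` again.

```scala
import scala.language.implicitConversions

object ImplicitWrapDemo {
  // Hypothetical stand-ins for the Java API: a predicate interface and a
  // table whose filter only accepts that interface.
  trait PredicateJ[V] { def test(value: V): Boolean }

  class TableJ[V](val values: List[V]) {
    def filter(p: PredicateJ[V]): TableJ[V] = new TableJ(values.filter(p.test))
  }

  // Same shape as wrapKTable: any TableJ can silently become a Table.
  implicit def wrapTable[V](inner: TableJ[V]): Table[V] = new Table(inner)

  class Table[V](val inner: TableJ[V]) {
    // TableJ.filter is not applicable to a V => Boolean value, so the compiler
    // falls back to wrapTable(inner).filter(f), which is this method again.
    def filter(f: V => Boolean): Table[V] = inner.filter(f)

    // Building the Java-style predicate explicitly selects TableJ.filter,
    // so no implicit wrapping of `inner` takes place.
    def filterSafe(f: V => Boolean): Table[V] =
      new Table(inner.filter(new PredicateJ[V] {
        def test(value: V): Boolean = f(value)
      }))
  }

  // True if the implicit-driven version exhausts the stack.
  def overflowed(): Boolean =
    try { new Table(new TableJ(List(1, 2, 3))).filter(_ > 1); false }
    catch { case _: StackOverflowError => true }

  // Values kept by the explicit version.
  def safeValues(): List[Int] =
    new Table(new TableJ(List(1, 2, 3))).filterSafe(_ > 1).inner.values

  def main(args: Array[String]): Unit = {
    println(s"implicit version overflowed: ${overflowed()}")
    println(s"explicit version kept: ${safeValues()}")
  }
}
```

If the same thing is happening in the Scala wrapper, the recursion sits inside the library rather than in the calling code, so a workaround on the caller's side would need to avoid the wrapper's `filter` altogether.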


On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel wrote:

> Hi,
> I’m using the org.apache.kafka.streams.scala module that was released with
> version 2.0.0. I’m getting a StackOverflowError as follows:
> java.lang.StackOverflowError
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> .
> .
> .
> at org.apache.kafka.streams.scala.kstream.KTable.filter(KTable.scala:49)
> The Scala version I’m using is 2.11.11 and the code leading to the error
> is as follows (particularly the .filter).
> val builder = new StreamsBuilder
> val stream =[Array[Byte], CaseClassA](args.topic)
> val customers = args.config.keys
> val predicates = { customerId =>
> (_: Array[Byte], message: CaseClassA) => message.customerId == customerId
> }.toSeq
> val customerIdToStream = _*)).toMap
> val y = Printed.toSysOut[Windowed[Key], Long]
> customerIdToStream.foreach { case (customerId, customerStream) =>
> val customerConfig = args.config(customerId)
> customerStream
> .flatMap { case (_, message) =>
> {
> case CaseClassB(c, _) => Key(, c.prefix) -> 1
> }
> }
> .groupByKey
> .windowedBy(TimeWindows.of(customerConfig.windowSize).advanceBy(customerConfig.sliderSize))
> .count()
> .filter { case (_, count) => count >= customerConfig.frequencyThreshold }
> .toStream
> .print(y)
> }
> Is this a bug with the new scala module related to:
> ?
> Or am I doing something wrong?
> Thanks,
> Druhin
