Is this related to the fix https://github.com/apache/kafka/pull/5502 that is currently being worked on?
Guozhang On Mon, Aug 20, 2018 at 5:19 PM, Matthias J. Sax <matth...@confluent.io> wrote: > Thanks for reporting and for creating the ticket! > > -Matthias > > On 8/20/18 5:17 PM, Ted Yu wrote: > > I was able to reproduce what you saw with modification > > to StreamToTableJoinScalaIntegrationTestImplicitSerdes.scala > > I have logged KAFKA-7316 and am looking for a fix. > > > > FYI > > > > On Mon, Aug 20, 2018 at 1:39 PM Druhin Sagar Goel <dru...@arrcus.com> > wrote: > > > >> Isn’t that a bug then? Or can I fix my code somehow? > >> > >> > >> > >> On August 20, 2018 at 1:30:42 PM, Ted Yu (yuzhih...@gmail.com<mailto: > >> yuzhih...@gmail.com>) wrote: > >> > >> I think what happened in your use case was that the following implicit > >> from ImplicitConversions.scala kept wrapping the resultant KTable from > >> filter(): > >> > >> implicit def wrapKTable[K, V](inner: KTableJ[K, V]): KTable[K, V] = > >> > >> leading to stack overflow. > >> > >> Cheers > >> > >> On Mon, Aug 20, 2018 at 12:50 PM Druhin Sagar Goel <dru...@arrcus.com> > >> wrote: > >> > >>> Hi, > >>> > >>> I’m using the org.kafka.streams.scala that was released with version > >>> 2.0.0. I’m getting a StackOverflowError as follows: > >>> > >>> java.lang.StackOverflowError > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter( > KTable.scala:49) > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter( > KTable.scala:49) > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter( > KTable.scala:49) > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter( > KTable.scala:49) > >>> . > >>> . > >>> . > >>> at org.apache.kafka.streams.scala.kstream.KTable.filter( > KTable.scala:49) > >>> > >>> The Scala version I’m using is 2.11.11 and the code leading to the > error > >>> is as follows (particularly the .filter). > >>> > >>> val builder = new StreamsBuilder > >>> > >>> val stream = builder.stream[Array[Byte], CaseClassA](args.topic) > >>> > >>> val customers = args.config.keys > >>> > >>> val predicates = customers.map { customerId => > >>> (_: Array[Byte], message: CaseClassA) => message.customerId == > customerId > >>> }.toSeq > >>> > >>> val customerIdToStream = customers.zip(stream(predicates: _*)).toMap > >>> > >>> val y = Printed.toSysOut[Windowed[Key], Long] > >>> > >>> customerIdToStream.foreach { case (customerId, customerStream) => > >>> val customerConfig = args.config(customerId) > >>> customerStream > >>> .flatMap { case (_, message) => > >>> message.objects.map { > >>> case CaseClassB(c, _) => Key(message.id, c.prefix) -> 1 > >>> } > >>> } > >>> .groupByKey > >>> > >>> > >> .windowedBy(TimeWindows.of(customerConfig.windowSize). > advanceBy(customerConfig.sliderSize)) > >>> .count() > >>> .filter { case (_, count) => count >= > >>> customerConfig.frequencyThreshold } > >>> .toStream > >>> .print(y) > >>> } > >>> > >>> Is this a bug with the new scala module related to: > >>> https://github.com/lightbend/kafka-streams-scala/issues/63 ? > >>> Or am I doing something wrong? > >>> > >>> Thanks, > >>> Druhin > >>> > >> > > > > -- -- Guozhang