I don't have the environment to run the Scala code right now. Will be tomorrow until I have one ..
Now that Scala API is part of the official Kafka distribution. Can u please try that out instead of kafka-streams-scala ? The library is now deprecated and I remember we ran into some SAM related issues with Scala 2.11 (which worked fine with 2.12). They were finally fixed in the Kafka distribution - there are some differences in the APIs as well .. regards. On Sun, Sep 9, 2018 at 11:32 PM Michael Eugene <far...@hotmail.com> wrote: > I’m using 2.11.11 > > Sent from my iPhone > > > On Sep 9, 2018, at 12:13 PM, Debasish Ghosh <ghosh.debas...@gmail.com> > wrote: > > > > Which version of Scala are u using ? > > > >> On Sun, 9 Sep 2018 at 10:44 AM, Michael Eugene <far...@hotmail.com> > wrote: > >> > >> Hi, > >> > >> I am using kafak-sreams-scala > >> https://github.com/lightbend/kafka-streams-scala, and I am trying to > >> implement something very simple and I am getting a compilation error by > the > >> "aggregate" method. The error is "Cannot resolve overload method > >> 'aggregate'" and "Unspecified value parameters: materialized: > >> Materialized[String, NotInferedVR, KeyValueStore[Bytes, Array[Byte]]]" > >> [https://avatars0.githubusercontent.com/u/16247783?s=400&v=4]< > >> https://github.com/lightbend/kafka-streams-scala> > >> > >> GitHub > >> - lightbend/kafka-streams-scala: Thin Scala wrapper ...< > >> https://github.com/lightbend/kafka-streams-scala> > >> github.com > >> Note: > >> Scala API for Kafka Streams have been accepted for inclusion in Apache > >> Kafka. We have been working with the Kafka team since the last couple of > >> months working towards meeting the standards and guidelines for this > >> activity. Lightbend and Alexis Seigneurin have > >> contributed this library (with ... > >> > >> > >> > >> However when I add a third argument for a Materialized, I get the > >> compilation error "Too may arguments for method aggregate(() =>VR, > (K,V,VR) > >> => VR)" > >> > >> It doesn't make sense anymore what could be breaking this. > >> > >> > >> > >> val myStream = builder > >> .stream(inputTopic) > >> .map{ (key: String, value: Array[Byte]) => > >> println(s"key = ${key}") > >> val newKey = GroupByAction.getGroupByKeyFromByteAray(value) > >> > >> val newValue = GroupByAction.getGroupByValueFromByteAray(value) > >> > >> println(s"newKey = ${newKey}") > >> (newKey, serialise(newValue))} > >> > >> .groupByKey > >> .aggregate(()=> 0L, (k,v,vr) => vr + 1) > >> > >> -- > > Sent from my iPhone > -- Debasish Ghosh http://manning.com/ghosh2 http://manning.com/ghosh Twttr: @debasishg Blog: http://debasishg.blogspot.com Code: http://github.com/debasishg