I thought I had already tried that. I tried explicitly creating the KeyValueMapper and supplying the apply override. To me it looks like SAM syntax, but it might be something else.
> On Sep 9, 2018, at 11:08 AM, Ryanne Dolan <ryannedo...@gmail.com> wrote:
>
> This means the types of k, v, vr don't match those expected by the
> aggregate() function. Add explicit types to your code and you'll find the
> problem. You'll probably find that Scala is inferring an Any somewhere.
>
> Ryanne
>
>> On Sun, Sep 9, 2018, 12:14 AM Michael Eugene <far...@hotmail.com> wrote:
>>
>> Hi,
>>
>> I am using kafka-streams-scala
>> https://github.com/lightbend/kafka-streams-scala, and I am trying to
>> implement something very simple, but I am getting a compilation error on
>> the "aggregate" method. The error is "Cannot resolve overload method
>> 'aggregate'" and "Unspecified value parameters: materialized:
>> Materialized[String, NotInferedVR, KeyValueStore[Bytes, Array[Byte]]]"
>>
>> However, when I add a third argument for a Materialized, I get the
>> compilation error "Too many arguments for method aggregate(() =>VR, (K,V,VR)
>> => VR)"
>>
>> I can't make sense of what could be breaking this.
>>
>> val myStream = builder
>>   .stream(inputTopic)
>>   .map { (key: String, value: Array[Byte]) =>
>>     println(s"key = ${key}")
>>     val newKey = GroupByAction.getGroupByKeyFromByteAray(value)
>>     val newValue = GroupByAction.getGroupByValueFromByteAray(value)
>>     println(s"newKey = ${newKey}")
>>     (newKey, serialise(newValue))
>>   }
>>   .groupByKey
>>   .aggregate(() => 0L, (k, v, vr) => vr + 1)
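
For reference, here is a minimal sketch of what the "add explicit types" suggestion quoted above could look like. It does not use the kafka-streams-scala API: `Grouped` is a hypothetical stand-in class, and only the shape of its `aggregate` method, `aggregate(() => VR, (K, V, VR) => VR)`, is taken from the error message. The sketch assumes the trouble is that VR is not being inferred for the untyped lambda, which may or may not be the actual cause here.

```scala
// Hypothetical stand-in for a grouped stream; NOT the kafka-streams-scala API.
// Only the shape of aggregate mirrors the signature from the error message.
final class Grouped[K, V](records: Seq[(K, V)]) {
  def aggregate[VR](initializer: () => VR, aggregator: (K, V, VR) => VR): Map[K, VR] =
    records.foldLeft(Map.empty[K, VR]) { case (acc, (k, v)) =>
      // Start from the initializer the first time a key is seen.
      val current = acc.getOrElse(k, initializer())
      acc.updated(k, aggregator(k, v, current))
    }
}

object ExplicitTypesSketch {
  // Made-up sample records, keyed by String with byte-array values.
  val grouped = new Grouped[String, Array[Byte]](
    Seq("a" -> Array[Byte](1), "b" -> Array[Byte](2), "a" -> Array[Byte](3))
  )

  // VR is pinned to Long and every lambda parameter is annotated, so the
  // compiler has nothing left to infer (and no chance to fall back to Any).
  val counts: Map[String, Long] = grouped.aggregate[Long](
    () => 0L,
    (k: String, v: Array[Byte], vr: Long) => vr + 1L
  )
}
```

With everything annotated like this, any remaining mismatch against the real `aggregate` should be reported at the specific argument that does not fit, rather than as a failure to resolve the overload.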