I don't have an environment to run the Scala code right now. It will be
tomorrow before I have one.

The Scala API is now part of the official Kafka distribution. Can you
please try that instead of kafka-streams-scala? That library is now
deprecated, and I remember we ran into some SAM-related issues with Scala
2.11 (the same code worked fine with 2.12). They were finally fixed in the
Kafka distribution. There are some differences in the APIs as well.
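
For illustration, here is roughly how the count would look with the Scala
API bundled with Kafka. This is an untested sketch that assumes the
kafka-streams-scala artifact from the Kafka 2.0 distribution is on the
classpath; the object and method names (CountPerKey, countPerKey) are
hypothetical, and package names may differ in other releases. The point to
note is that the initializer and the aggregator go in separate parameter
lists, and the Materialized is picked up implicitly from the serdes in
scope.

```scala
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KTable

object CountPerKey {
  // Hypothetical helper: counts the records seen for each key on inputTopic.
  def countPerKey(builder: StreamsBuilder, inputTopic: String): KTable[String, Long] =
    builder
      .stream[String, Array[Byte]](inputTopic)
      .groupByKey
      // Initializer and aggregator are separate parameter lists; the
      // Materialized is derived implicitly from the String and Long serdes.
      .aggregate(0L)((_, _, count) => count + 1L)
}
```

The aggregator here is a plain Scala function, so this shape should not
depend on SAM conversion, which is where Scala 2.11 and 2.12 differ.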

regards.

On Sun, Sep 9, 2018 at 11:32 PM Michael Eugene <far...@hotmail.com> wrote:

> I’m using 2.11.11
>
> Sent from my iPhone
>
> > On Sep 9, 2018, at 12:13 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
> >
> > Which version of Scala are you using?
> >
> >> On Sun, 9 Sep 2018 at 10:44 AM, Michael Eugene <far...@hotmail.com> wrote:
> >>
> >> Hi,
> >>
> >>  I am using kafka-streams-scala
> >> https://github.com/lightbend/kafka-streams-scala, and I am trying to
> >> implement something very simple, but I am getting a compilation error
> >> from the "aggregate" method. The error is "Cannot resolve overload
> >> method 'aggregate'" and "Unspecified value parameters: materialized:
> >> Materialized[String, NotInferedVR, KeyValueStore[Bytes, Array[Byte]]]"
> >>
> >>  However, when I add a third argument for a Materialized, I get the
> >> compilation error "Too many arguments for method aggregate(() => VR,
> >> (K,V,VR) => VR)"
> >>
> >>  I can't make sense of what could be breaking this.
> >>
> >>
> >>
> >> val myStream = builder
> >>   .stream(inputTopic)
> >>   .map { (key: String, value: Array[Byte]) =>
> >>     println(s"key = ${key}")
> >>     val newKey = GroupByAction.getGroupByKeyFromByteAray(value)
> >>     val newValue = GroupByAction.getGroupByValueFromByteAray(value)
> >>     println(s"newKey = ${newKey}")
> >>     (newKey, serialise(newValue))
> >>   }
> >>   .groupByKey
> >>   .aggregate(() => 0L, (k, v, vr) => vr + 1)
> >>
> >> --
> > Sent from my iPhone
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg
