Done. I am using SBT as the build tool. I created an application and put
it here:
https://github.com/dhinojosa/kafka-streams-scala

The implicits for SessionWindowedSerde come from the import line below. It
hides timeWindowedSerde (by renaming it to _), which leaves
sessionWindowedSerde as the only windowed serde in scope. The final
underscore imports everything else from the Serdes object.

import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
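
For anyone who wants to see the import trick in isolation, here is a minimal
sketch that compiles without Kafka on the classpath. Codec, Windowed, Codecs
and ImportHidingDemo are made-up stand-ins for illustration, not Kafka
classes; the two implicit defs only mimic how timeWindowedSerde and
sessionWindowedSerde compete for the same type.

```scala
// Hypothetical stand-ins: none of these names come from Kafka.
trait Codec[A] { def name: String }
final case class Windowed[A](key: A)

object Codecs {
  implicit val stringCodec: Codec[String] =
    new Codec[String] { val name = "string" }

  // Two implicit candidates for Codec[Windowed[A]], analogous to the two
  // windowed serdes that caused the "ambiguous implicit values" error.
  implicit def timeWindowedCodec[A](implicit c: Codec[A]): Codec[Windowed[A]] =
    new Codec[Windowed[A]] { val name = s"time-windowed(${c.name})" }

  implicit def sessionWindowedCodec[A](implicit c: Codec[A]): Codec[Windowed[A]] =
    new Codec[Windowed[A]] { val name = s"session-windowed(${c.name})" }
}

object ImportHidingDemo {
  // Hide timeWindowedCodec, then import everything else from Codecs.
  import Codecs.{timeWindowedCodec => _, _}

  // Only sessionWindowedCodec is left in scope for Codec[Windowed[String]].
  def resolved: String = implicitly[Codec[Windowed[String]]].name

  def main(args: Array[String]): Unit =
    println(resolved) // prints "session-windowed(string)"
}
```

With a plain `import Codecs._` instead, the implicitly call should fail to
compile with an ambiguity error much like the one in the original question.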

Everything should be in the project, which now uses Scala 2.11.12 and
Kafka 2.4.0. The only thing you may want to change is the SBT version in
project/build.properties. :)


On Thu, Nov 19, 2020 at 10:06 PM Eric Beabes <mailinglist...@gmail.com>
wrote:

> Daniel,
>
> I copied your code from here:
> https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
>
> Still getting the same error message. Actually, I am not sure why it would
> work since you don't have implicits for SessionWindowedSerde. By the way,
> which versions of Scala & Kafka did you use? Maybe that's the issue. I am
> using the following versions:
>
> *Scala: 2.11.12*
> *Kafka: 2.4.0*
>
> Can you please try with these versions & let me know if it still works for
> you? Thanks for your help.
>
> On Thu, Nov 19, 2020 at 5:16 PM Liam Clarke-Hutchinson <
> liam.cla...@adscale.co.nz> wrote:
>
> > That said John, nothing wrong with being explicit in code. :)
> >
> > On Fri, 20 Nov. 2020, 1:46 pm John Roesler, <vvcep...@apache.org> wrote:
> >
> > > Oh, nice. Thanks, Daniel!
> > >
> > > That’s much nicer than my ham-handed approach.
> > >
> > > Thanks,
> > > John
> > >
> > > On Thu, Nov 19, 2020, at 17:44, Daniel Hinojosa wrote:
> > > > Hope this helps, I tried copying your code into a sample application. I got
> > > > it to compile with the implicits all resolving. I think the trick was there
> > > > were two implementations for Windowing Serdes.  You just need to block one
> > > > from the imports.  See if that fits with what you are doing.  Oh also, I
> > > > noticed that the types were not resolving when calling builder.stream, so I
> > > > put [String, String] in the builder.  Here is a gist, which formats better.
> > > >
> > > > https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
> > > >
> > > > import java.time.Duration
> > > > import java.util.Properties
> > > >
> > > > import org.apache.kafka.common.config.Config
> > > > import org.apache.kafka.streams.Topology
> > > > import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
> > > > import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
> > > > import org.apache.kafka.streams.scala.kstream.{Consumed, Grouped, Materialized, Produced}
> > > > import org.apache.kafka.streams.scala.{ByteArraySessionStore, StreamsBuilder}
> > > >
> > > > class SampleStream {
> > > >   def createTopology(conf: Config, properties: Properties): Topology = {
> > > >
> > > >     implicit val produced: Produced[Windowed[String], Long] =
> > > >       Produced.`with`[Windowed[String], Long]
> > > >
> > > >     implicit val grouped: Grouped[String, String] =
> > > >       Grouped.`with`[String, String]
> > > >
> > > >     implicit val consumed: Consumed[String, String] =
> > > >       Consumed.`with`[String, String]
> > > >
> > > >     implicit val materialized: Materialized[String, Long, ByteArraySessionStore] =
> > > >       Materialized.`with`[String, Long, ByteArraySessionStore]
> > > >
> > > >     val builder: StreamsBuilder = new StreamsBuilder()
> > > >
> > > >     builder
> > > >       .stream[String, String]("streams-plaintext-input")
> > > >       .groupBy((_, word) => word)
> > > >       .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
> > > >       .count()
> > > >       .toStream
> > > >       .to("streams-pipe-output")
> > > >
> > > >     builder.build()
> > > >   }
> > > > }
> > > >
> > > >
> > > >
> > > >
> > > > On Thu, Nov 19, 2020 at 7:24 AM John Roesler <vvcep...@apache.org>
> > > wrote:
> > > >
> > > > > Hi Eric,
> > > > >
> > > > > Sure thing. Assuming the definition of ‘produced’ you had tried in your
> > > > > code, it’s just:
> > > > >
> > > > > ...
> > > > > .toStream.to("streams-pipe-output")(produced)
> > > > >
> > > > > As far as the json serde goes, I think that I wrote an example of using
> > > > > Jackson to implement a serde in Confluent’s kafka-streams-examples repo.
> > > > > I’m not sure what other/better examples might be out there.
> > > > >
> > > > > Hope this helps,
> > > > > John
> > > > >
> > > > > On Thu, Nov 19, 2020, at 00:27, Eric Beabes wrote:
> > > > > > Not sure what you mean by "pass it explicitly". The definition of 'to' is
> > > > > > given below. Can we pass it explicitly in this case? If yes, can you please
> > > > > > show me how?
> > > > > >
> > > > > > def to(topic: String)(implicit produced: Produced[K, V]): Unit =
> > > > > >   inner.to(topic, produced)
> > > > > >
> > > > > >
> > > > > > Also not sure how to use a self documenting format like JSON. Any
> > > > > > examples to share?
> > > > > >
> > > > > >
> > > > > > On Wed, Nov 18, 2020 at 5:14 PM John Roesler <vvcep...@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Eric,
> > > > > > >
> > > > > > > Ah, that’s a bummer. The correct serde is the session windowed serde, as I
> > > > > > > can see you know. I’m afraid I’m a bit rusty on implicit resolution rules,
> > > > > > > so I can’t be much help there.
> > > > > > >
> > > > > > > But my general recommendation for implicits is that when things get weird,
> > > > > > > just don’t use them at all. For example, you can just explicitly pass the
> > > > > > > Produced in the second arg list of ‘to’.
> > > > > > >
> > > > > > > One other tip is that the serialized form produced by those serdes is kind
> > > > > > > of specialized and might not be the most convenient for your use. If this
> > > > > > > is just a POC, I’d suggest mapping the keys to strings, so they are
> > > > > > > human-readable. If this is a production use case, then you might want to
> > > > > > > use a more self-documenting format like JSON or Avro. Just my two cents.
> > > > > > >
> > > > > > > I hope this helps!
> > > > > > > -John
> > > > > > >
> > > > > > > On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote:
> > > > > > > > I keep getting an '*ambiguous implicit values*' message in the following
> > > > > > > > code. I tried several things (as can be seen from a couple of lines I've
> > > > > > > > commented out). Any ideas on how to fix this? This is in *Scala*.
> > > > > > > >
> > > > > > > >   def createTopology(conf: Config, properties: Properties): Topology = {
> > > > > > > > //    implicit val sessionSerde = Serde[WindowedSerdes.SessionWindowedSerde[String]]
> > > > > > > > //    implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] =
> > > > > > > >       Produced.`with`[Windowed[String], Long]
> > > > > > > >     implicit val consumed: Consumed[String, String] =
> > > > > > > >       Consumed.`with`[String, String]
> > > > > > > >
> > > > > > > >     val builder: StreamsBuilder = new StreamsBuilder()
> > > > > > > >     builder.stream("streams-plaintext-input")
> > > > > > > >         .groupBy((_, word) => word)
> > > > > > > >         .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
> > > > > > > >         .count()
> > > > > > > >         .toStream.to("streams-pipe-output")
> > > > > > > >
> > > > > > > >     builder.build()
> > > > > > > >
> > > > > > > >   }
> > > > > > > >
> > > > > > > > *Compiler Errors:*
> > > > > > > >
> > > > > > > > Error:(52, 78) ambiguous implicit values:
> > > > > > > >  both method timeWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
> > > > > > > >  and method sessionWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
> > > > > > > >  match expected type org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
> > > > > > > > Error:(52, 78) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
> > > > > > > > Error:(52, 78) not enough arguments for method with: (implicit keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]], implicit valueSerde: org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].
> > > > > > > > Unspecified value parameters keySerde, valueSerde.
> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
