Daniel,

I copied your code from here:
https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735

Still getting the same error message. Actually, I am not sure why it would
work since you don't have implicits for SessionWindowedSerde. By the way,
which versions of Scala & Kafka did you use? Maybe that's the issue. I am
using following versions:

*Scala: 2.11.12*
*Kafka: 2.4.0*

Can you please try with these versions & let me know if it still works for
you? Thanks for your help.






On Thu, Nov 19, 2020 at 5:16 PM Liam Clarke-Hutchinson <
liam.cla...@adscale.co.nz> wrote:

> That said John, nothing wrong with being explicit in code. :)
>
> On Fri, 20 Nov. 2020, 1:46 pm John Roesler, <vvcep...@apache.org> wrote:
>
> > Oh, nice. Thanks, Daniel!
> >
> > That’s much nicer than my ham-handed approach.
> >
> > Thanks,
> > John
> >
> > On Thu, Nov 19, 2020, at 17:44, Daniel Hinojosa wrote:
> > > Hope this helps, I tried copying your code into a sample application. I
> > got
> > > it to compile with the implicits all resolving. I think the trick was
> > there
> > > were two implementations for Windowing Serdes.  You just need to block
> > one
> > > from the imports.  See if that fits with what you are doing.  Oh also,
> I
> > > noticed that the types were not resolving when calling builder.stream,
> > so I
> > > put [String, String] in the builder.  Here is a gist, which formats
> > better.
> > >
> > > https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
> > >
> > > import java.time.Duration
> > > import java.util.Properties
> > >
> > > import org.apache.kafka.common.config.Config
> > > import org.apache.kafka.streams.Topology
> > > import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
> > > import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _,
> _}
> > > import org.apache.kafka.streams.scala.kstream.{Consumed, Grouped,
> > > Materialized, Produced}
> > > import org.apache.kafka.streams.scala.{ByteArraySessionStore,
> > StreamsBuilder}
> > >
> > > class SampleStream {
> > >   def createTopology(conf: Config, properties: Properties): Topology =
> {
> > >
> > >     implicit val produced: Produced[Windowed[String], Long] =
> > >       Produced.`with`[Windowed[String], Long]
> > >
> > >     implicit val grouped: Grouped[String, String] =
> > >       Grouped.`with`[String, String]
> > >
> > >     implicit val consumed: Consumed[String, String] =
> > >       Consumed.`with`[String, String]
> > >
> > >     implicit val materialized: Materialized[String, Long,
> > > ByteArraySessionStore] = Materialized.`with`[String, Long,
> > > ByteArraySessionStore]
> > >
> > >     val builder: StreamsBuilder = new StreamsBuilder()
> > >
> > >     builder
> > >       .stream[String, String]("streams-plaintext-input")
> > >       .groupBy((_, word) => word)
> > >       .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
> > >       .count()
> > >       .toStream
> > >       .to("streams-pipe-output")
> > >
> > >     builder.build()
> > >   }
> > > }
> > >
> > >
> > >
> > >
> > > On Thu, Nov 19, 2020 at 7:24 AM John Roesler <vvcep...@apache.org>
> > wrote:
> > >
> > > > Hi Eric,
> > > >
> > > > Sure thing. Assuming the definition of ‘produced’ you had tried in
> your
> > > > code, it’s just:
> > > >
> > > > ...
> > > > .toStream.to("streams-pipe-output")(produced)
> > > >
> > > > As far as the json serde goes, I think that I wrote an example of
> using
> > > > Jackson to implement a serde in Confluent’s kafka-streams-examples
> > repo.
> > > > I’m not sure what other/better examples
> > > > might be out there.
> > > >
> > > > Hope this helps,
> > > > John
> > > >
> > > > On Thu, Nov 19, 2020, at 00:27, Eric Beabes wrote:
> > > > > Not sure what you mean by "pass it explicitly". The definition of
> > 'to' is
> > > > > given below. Can we pass it explicitly in this case. If yes, can
> you
> > > > please
> > > > > show me how?
> > > > >
> > > > > def to(topic: String)(implicit produced: Produced[K, V]): Unit =
> > > > >   inner.to(topic, produced)
> > > > >
> > > > >
> > > > > Also not sure how to use a self documenting format like JSON. Any
> > > > > examples to share?
> > > > >
> > > > >
> > > > > On Wed, Nov 18, 2020 at 5:14 PM John Roesler <vvcep...@apache.org>
> > > > wrote:
> > > > >
> > > > > > Hi Eric,
> > > > > >
> > > > > > Ah, that’s a bummer. The correct serde is the session windowed
> > serde,
> > > > as I
> > > > > > can see you know. I’m afraid I’m a bit rusty on implicit
> resolution
> > > > rules,
> > > > > > so I can’t be much help there.
> > > > > >
> > > > > > But my general recommendation for implicits is that when things
> get
> > > > weird,
> > > > > > just don’t use them at all. For example, you can just explicitly
> > pass
> > > > the
> > > > > > Produced in the second arg list of ‘to’.
> > > > > >
> > > > > > One other tip is that the serialized form produced by those
> serdes
> > is
> > > > kind
> > > > > > of specialized and might not be the most convenient for your use.
> > If
> > > > this
> > > > > > is just a POC, if suggest mapping the keys to strings, so they
> are
> > > > > > human-readable. If this is a production use case, then you might
> > want
> > > > to
> > > > > > use a more self-documenting format like JSON or AVRO. Just my two
> > > > cents.
> > > > > >
> > > > > > I hope this helps!
> > > > > > -John
> > > > > >
> > > > > > On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote:
> > > > > > > I keep getting '*ambiguous implicit values*' message in the
> > following
> > > > > > code.
> > > > > > > I tried several things (as can be seen from a couple of lines
> > I've
> > > > > > > commented out). Any ideas on how to fix this? This is in
> *Scala*.
> > > > > > >
> > > > > > >  def createTopology(conf: Config, properties: Properties):
> > Topology =
> > > > > > > {//    implicit val sessionSerde =
> > > > > > > Serde[WindowedSerdes.SessionWindowedSerde[String]]//
> implicit
> > val
> > > > > > > produced: Produced[Windowed[String], Long] =
> > > > > > > Produced.`with`[WindowedSerdes.SessionWindowedSerde[String],
> > Long]
> > > > > > >     implicit val produced: Produced[Windowed[String], Long] =
> > > > > > > Produced.`with`[Windowed[String], Long]
> > > > > > >     implicit val consumed: Consumed[String, String] =
> > > > > > > Consumed.`with`[String, String]
> > > > > > >
> > > > > > >     val builder: StreamsBuilder = new StreamsBuilder()
> > > > > > >     builder.stream("streams-plaintext-input")
> > > > > > >         .groupBy((_, word) => word)
> > > > > > >         .windowedBy(SessionWindows.`with`(Duration.ofMillis(60
> *
> > > > 1000)))
> > > > > > >         .count()
> > > > > > >         .toStream.to("streams-pipe-output")
> > > > > > >
> > > > > > >     builder.build()
> > > > > > >
> > > > > > >   }
> > > > > > >
> > > > > > > *Compiler Errors:*
> > > > > > >
> > > > > > > Error:(52, 78) ambiguous implicit values:
> > > > > > >  both method timeWindowedSerde in object Serdes of type
> > [T](implicit
> > > > > > > tSerde:
> > > > > > >
> > > > > >
> > > >
> >
> org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
> > > > > > >  and method sessionWindowedSerde in object Serdes of type
> > > > [T](implicit
> > > > > > > tSerde:
> > > > > > >
> > > > > >
> > > >
> >
> org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
> > > > > > >  match expected type
> > > > > > >
> > > > > >
> > > >
> >
> org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> > > > > > >     implicit val produced: Produced[Windowed[String], Long] =
> > > > > > > Produced.`with`[Windowed[String], Long]
> > > > > > > Error:(52, 78) could not find implicit value for parameter
> > keySerde:
> > > > > > >
> > > > > >
> > > >
> >
> org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> > > > > > >     implicit val produced: Produced[Windowed[String], Long] =
> > > > > > > Produced.`with`[Windowed[String], Long]
> > > > > > > Error:(52, 78) not enough arguments for method with: (implicit
> > > > > > > keySerde:
> > > > > > >
> > > > > >
> > > >
> >
> org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]],
> > > > > > > implicit valueSerde:
> > > > > > >
> > > > > >
> > > >
> >
> org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].Unspecified
> > > > > > > value parameters keySerde, valueSerde.
> > > > > > >     implicit val produced: Produced[Windowed[String], Long] =
> > > > > > > Produced.`with`[Windowed[String], Long]
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to