I'm not sure what you mean by "pass it explicitly". The definition of 'to' is
given below. Can we pass it explicitly in this case? If yes, can you please
show me how?

def to(topic: String)(implicit produced: Produced[K, V]): Unit =
  inner.to(topic, produced)
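
For reference, here is an untested sketch of what I guess an explicit call
might look like. It assumes the Java WindowedSerdes.SessionWindowedSerde
constructor that wraps an inner serde, and the Serdes object from
kafka-streams-scala as packaged around 2.6 (the package may differ in other
versions). ExplicitProduced and writeCounts are names made up for
illustration.

```scala
import org.apache.kafka.common.serialization.{Serde, Serdes => JSerdes}
import org.apache.kafka.streams.kstream.{Produced, Windowed, WindowedSerdes}
import org.apache.kafka.streams.scala.Serdes
import org.apache.kafka.streams.scala.kstream.KStream

object ExplicitProduced {
  // Build the session-windowed key serde by hand, so the compiler never has to
  // search for an implicit Serde[Windowed[String]] (the ambiguous lookup).
  val keySerde: Serde[Windowed[String]] =
    new WindowedSerdes.SessionWindowedSerde[String](JSerdes.String())

  // Plain (non-implicit) Produced, built with the Java factory method.
  val produced: Produced[Windowed[String], Long] =
    Produced.`with`[Windowed[String], Long](keySerde, Serdes.Long)

  // Hypothetical helper: supply `produced` in the second argument list of `to`
  // instead of relying on implicit resolution.
  def writeCounts(counts: KStream[Windowed[String], Long], topic: String): Unit =
    counts.to(topic)(produced)
}
```

In the topology this would presumably end with
.toStream.to("streams-pipe-output")(ExplicitProduced.produced).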


I'm also not sure how to use a self-documenting format like JSON. Do you have
any examples to share?
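
My rough guess is that it would mean turning each windowed key into a JSON
string before writing it out, something like the sketch below. The
WindowedKeyJson object and its field names ("key", "start", "end") are made
up for illustration, and the escaping is deliberately minimal; a real JSON
library would be safer.

```scala
object WindowedKeyJson {
  // Minimal escaping: handles only backslashes and double quotes.
  private def escape(s: String): String =
    s.replace("\\", "\\\\").replace("\"", "\\\"")

  // Render a windowed key as a JSON object with the window bounds in epoch millis.
  def render(key: String, startMs: Long, endMs: Long): String =
    s"""{"key":"${escape(key)}","start":$startMs,"end":$endMs}"""
}

// Assumed usage inside the topology (not compiled here), where wk is a Windowed[String]:
//   .toStream
//   .map((wk, count) => (WindowedKeyJson.render(wk.key, wk.window.start, wk.window.end), count))
//   .to("streams-pipe-output")   // keys are now plain strings
```

With string keys, the ordinary String and Long serdes should be enough for the
output topic.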


On Wed, Nov 18, 2020 at 5:14 PM John Roesler <vvcep...@apache.org> wrote:

> Hi Eric,
>
> Ah, that’s a bummer. The correct serde is the session windowed serde, as I
> can see you know. I’m afraid I’m a bit rusty on implicit resolution rules,
> so I can’t be much help there.
>
> But my general recommendation for implicits is that when things get weird,
> just don’t use them at all. For example, you can just explicitly pass the
> Produced in the second arg list of ‘to’.
>
> One other tip is that the serialized form produced by those serdes is kind
> of specialized and might not be the most convenient for your use. If this
> is just a POC, I'd suggest mapping the keys to strings, so they are
> human-readable. If this is a production use case, then you might want to
> use a more self-documenting format like JSON or Avro. Just my two cents.
>
> I hope this helps!
> -John
>
> On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote:
> > I keep getting an '*ambiguous implicit values*' message in the following
> > code. I tried several things (as can be seen from a couple of lines I've
> > commented out). Any ideas on how to fix this? This is in *Scala*.
> >
> >   def createTopology(conf: Config, properties: Properties): Topology = {
> >     // implicit val sessionSerde = Serde[WindowedSerdes.SessionWindowedSerde[String]]
> >     // implicit val produced: Produced[Windowed[String], Long] =
> >     //   Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
> >     implicit val produced: Produced[Windowed[String], Long] =
> >       Produced.`with`[Windowed[String], Long]
> >     implicit val consumed: Consumed[String, String] =
> >       Consumed.`with`[String, String]
> >
> >     val builder: StreamsBuilder = new StreamsBuilder()
> >     builder.stream("streams-plaintext-input")
> >         .groupBy((_, word) => word)
> >         .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
> >         .count()
> >         .toStream.to("streams-pipe-output")
> >
> >     builder.build()
> >
> >   }
> >
> > *Compiler Errors:*
> >
> > Error:(52, 78) ambiguous implicit values:
> >  both method timeWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
> >  and method sessionWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
> >  match expected type org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
> > Error:(52, 78) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
> > Error:(52, 78) not enough arguments for method with: (implicit keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]], implicit valueSerde: org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].
> > Unspecified value parameters keySerde, valueSerde.
> >     implicit val produced: Produced[Windowed[String], Long] = Produced.`with`[Windowed[String], Long]
> >
>
