Hi Eric,

Sure thing. Assuming the definition of ‘produced’ you had tried in your code, 
it’s just:

...
.toStream.to("streams-pipe-output")(produced)

As far as the json serde goes, I think that I wrote an example of using Jackson 
to implement a serde in Confluent’s kafka-streams-examples repo. I’m not sure 
what other/better examples
might be out there. 

Hope this helps,
John

On Thu, Nov 19, 2020, at 00:27, Eric Beabes wrote:
> Not sure what you mean by "pass it explicitly". The definition of 'to' is
> given below. Can we pass it explicitly in this case. If yes, can you please
> show me how?
> 
> def to(topic: String)(implicit produced: Produced[K, V]): Unit =
>   inner.to(topic, produced)
> 
> 
> Also not sure how to use a self documenting format like JSON. Any
> examples to share?
> 
> 
> On Wed, Nov 18, 2020 at 5:14 PM John Roesler <vvcep...@apache.org> wrote:
> 
> > Hi Eric,
> >
> > Ah, that’s a bummer. The correct serde is the session windowed serde, as I
> > can see you know. I’m afraid I’m a bit rusty on implicit resolution rules,
> > so I can’t be much help there.
> >
> > But my general recommendation for implicits is that when things get weird,
> > just don’t use them at all. For example, you can just explicitly pass the
> > Produced in the second arg list of ‘to’.
> >
> > One other tip is that the serialized form produced by those serdes is kind
> > of specialized and might not be the most convenient for your use. If this
> > is just a POC, if suggest mapping the keys to strings, so they are
> > human-readable. If this is a production use case, then you might want to
> > use a more self-documenting format like JSON or AVRO. Just my two cents.
> >
> > I hope this helps!
> > -John
> >
> > On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote:
> > > I keep getting '*ambiguous implicit values*' message in the following
> > code.
> > > I tried several things (as can be seen from a couple of lines I've
> > > commented out). Any ideas on how to fix this? This is in *Scala*.
> > >
> > >  def createTopology(conf: Config, properties: Properties): Topology =
> > > {//    implicit val sessionSerde =
> > > Serde[WindowedSerdes.SessionWindowedSerde[String]]//    implicit val
> > > produced: Produced[Windowed[String], Long] =
> > > Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
> > >     implicit val produced: Produced[Windowed[String], Long] =
> > > Produced.`with`[Windowed[String], Long]
> > >     implicit val consumed: Consumed[String, String] =
> > > Consumed.`with`[String, String]
> > >
> > >     val builder: StreamsBuilder = new StreamsBuilder()
> > >     builder.stream("streams-plaintext-input")
> > >         .groupBy((_, word) => word)
> > >         .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
> > >         .count()
> > >         .toStream.to("streams-pipe-output")
> > >
> > >     builder.build()
> > >
> > >   }
> > >
> > > *Compiler Errors:*
> > >
> > > Error:(52, 78) ambiguous implicit values:
> > >  both method timeWindowedSerde in object Serdes of type [T](implicit
> > > tSerde:
> > >
> > org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
> > >  and method sessionWindowedSerde in object Serdes of type [T](implicit
> > > tSerde:
> > >
> > org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
> > >  match expected type
> > >
> > org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> > >     implicit val produced: Produced[Windowed[String], Long] =
> > > Produced.`with`[Windowed[String], Long]
> > > Error:(52, 78) could not find implicit value for parameter keySerde:
> > >
> > org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> > >     implicit val produced: Produced[Windowed[String], Long] =
> > > Produced.`with`[Windowed[String], Long]
> > > Error:(52, 78) not enough arguments for method with: (implicit
> > > keySerde:
> > >
> > org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]],
> > > implicit valueSerde:
> > >
> > org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].Unspecified
> > > value parameters keySerde, valueSerde.
> > >     implicit val produced: Produced[Windowed[String], Long] =
> > > Produced.`with`[Windowed[String], Long]
> > >
> >
>

Reply via email to