Oh, nice. Thanks, Daniel!

That’s much nicer than my ham-handed approach. 

Thanks,
John

On Thu, Nov 19, 2020, at 17:44, Daniel Hinojosa wrote:
> Hope this helps, I tried copying your code into a sample application. I got
> it to compile with the implicits all resolving. I think the trick was there
> were two implementations for Windowing Serdes.  You just need to block one
> from the imports.  See if that fits with what you are doing.  Oh also, I
> noticed that the types were not resolving when calling builder.stream, so I
> put [String, String] in the builder.  Here is a gist, which formats better.
> 
> https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
> 
> import java.time.Duration
> import java.util.Properties
> 
> import org.apache.kafka.common.config.Config
> import org.apache.kafka.streams.Topology
> import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
> import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
> import org.apache.kafka.streams.scala.kstream.{Consumed, Grouped,
> Materialized, Produced}
> import org.apache.kafka.streams.scala.{ByteArraySessionStore, StreamsBuilder}
> 
> class SampleStream {
>   def createTopology(conf: Config, properties: Properties): Topology = {
> 
>     implicit val produced: Produced[Windowed[String], Long] =
>       Produced.`with`[Windowed[String], Long]
> 
>     implicit val grouped: Grouped[String, String] =
>       Grouped.`with`[String, String]
> 
>     implicit val consumed: Consumed[String, String] =
>       Consumed.`with`[String, String]
> 
>     implicit val materialized: Materialized[String, Long,
> ByteArraySessionStore] = Materialized.`with`[String, Long,
> ByteArraySessionStore]
> 
>     val builder: StreamsBuilder = new StreamsBuilder()
> 
>     builder
>       .stream[String, String]("streams-plaintext-input")
>       .groupBy((_, word) => word)
>       .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
>       .count()
>       .toStream
>       .to("streams-pipe-output")
> 
>     builder.build()
>   }
> }
> 
> 
> 
> 
> On Thu, Nov 19, 2020 at 7:24 AM John Roesler <vvcep...@apache.org> wrote:
> 
> > Hi Eric,
> >
> > Sure thing. Assuming the definition of ‘produced’ you had tried in your
> > code, it’s just:
> >
> > ...
> > .toStream.to("streams-pipe-output")(produced)
> >
> > As far as the json serde goes, I think that I wrote an example of using
> > Jackson to implement a serde in Confluent’s kafka-streams-examples repo.
> > I’m not sure what other/better examples
> > might be out there.
> >
> > Hope this helps,
> > John
> >
> > On Thu, Nov 19, 2020, at 00:27, Eric Beabes wrote:
> > > Not sure what you mean by "pass it explicitly". The definition of 'to' is
> > > given below. Can we pass it explicitly in this case. If yes, can you
> > please
> > > show me how?
> > >
> > > def to(topic: String)(implicit produced: Produced[K, V]): Unit =
> > >   inner.to(topic, produced)
> > >
> > >
> > > Also not sure how to use a self documenting format like JSON. Any
> > > examples to share?
> > >
> > >
> > > On Wed, Nov 18, 2020 at 5:14 PM John Roesler <vvcep...@apache.org>
> > wrote:
> > >
> > > > Hi Eric,
> > > >
> > > > Ah, that’s a bummer. The correct serde is the session windowed serde,
> > as I
> > > > can see you know. I’m afraid I’m a bit rusty on implicit resolution
> > rules,
> > > > so I can’t be much help there.
> > > >
> > > > But my general recommendation for implicits is that when things get
> > weird,
> > > > just don’t use them at all. For example, you can just explicitly pass
> > the
> > > > Produced in the second arg list of ‘to’.
> > > >
> > > > One other tip is that the serialized form produced by those serdes is
> > kind
> > > > of specialized and might not be the most convenient for your use. If
> > this
> > > > is just a POC, if suggest mapping the keys to strings, so they are
> > > > human-readable. If this is a production use case, then you might want
> > to
> > > > use a more self-documenting format like JSON or AVRO. Just my two
> > cents.
> > > >
> > > > I hope this helps!
> > > > -John
> > > >
> > > > On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote:
> > > > > I keep getting '*ambiguous implicit values*' message in the following
> > > > code.
> > > > > I tried several things (as can be seen from a couple of lines I've
> > > > > commented out). Any ideas on how to fix this? This is in *Scala*.
> > > > >
> > > > >  def createTopology(conf: Config, properties: Properties): Topology =
> > > > > {//    implicit val sessionSerde =
> > > > > Serde[WindowedSerdes.SessionWindowedSerde[String]]//    implicit val
> > > > > produced: Produced[Windowed[String], Long] =
> > > > > Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
> > > > >     implicit val produced: Produced[Windowed[String], Long] =
> > > > > Produced.`with`[Windowed[String], Long]
> > > > >     implicit val consumed: Consumed[String, String] =
> > > > > Consumed.`with`[String, String]
> > > > >
> > > > >     val builder: StreamsBuilder = new StreamsBuilder()
> > > > >     builder.stream("streams-plaintext-input")
> > > > >         .groupBy((_, word) => word)
> > > > >         .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 *
> > 1000)))
> > > > >         .count()
> > > > >         .toStream.to("streams-pipe-output")
> > > > >
> > > > >     builder.build()
> > > > >
> > > > >   }
> > > > >
> > > > > *Compiler Errors:*
> > > > >
> > > > > Error:(52, 78) ambiguous implicit values:
> > > > >  both method timeWindowedSerde in object Serdes of type [T](implicit
> > > > > tSerde:
> > > > >
> > > >
> > org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
> > > > >  and method sessionWindowedSerde in object Serdes of type
> > [T](implicit
> > > > > tSerde:
> > > > >
> > > >
> > org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
> > > > >  match expected type
> > > > >
> > > >
> > org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> > > > >     implicit val produced: Produced[Windowed[String], Long] =
> > > > > Produced.`with`[Windowed[String], Long]
> > > > > Error:(52, 78) could not find implicit value for parameter keySerde:
> > > > >
> > > >
> > org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> > > > >     implicit val produced: Produced[Windowed[String], Long] =
> > > > > Produced.`with`[Windowed[String], Long]
> > > > > Error:(52, 78) not enough arguments for method with: (implicit
> > > > > keySerde:
> > > > >
> > > >
> > org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]],
> > > > > implicit valueSerde:
> > > > >
> > > >
> > org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].Unspecified
> > > > > value parameters keySerde, valueSerde.
> > > > >     implicit val produced: Produced[Windowed[String], Long] =
> > > > > Produced.`with`[Windowed[String], Long]
> > > > >
> > > >
> > >
> >
>

Reply via email to