Daniel, I copied your code from here: https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
Still getting the same error message. Actually, I am not sure why it would
work, since you don't have implicits for SessionWindowedSerde.

By the way, which versions of Scala & Kafka did you use? Maybe that's the
issue. I am using the following versions:

*Scala: 2.11.12*
*Kafka: 2.4.0*

Can you please try with these versions & let me know if it still works for
you?

Thanks for your help.

On Thu, Nov 19, 2020 at 5:16 PM Liam Clarke-Hutchinson <
liam.cla...@adscale.co.nz> wrote:

> That said John, nothing wrong with being explicit in code. :)
>
> On Fri, 20 Nov. 2020, 1:46 pm John Roesler, <vvcep...@apache.org> wrote:
>
> > Oh, nice. Thanks, Daniel!
> >
> > That’s much nicer than my ham-handed approach.
> >
> > Thanks,
> > John
> >
> > On Thu, Nov 19, 2020, at 17:44, Daniel Hinojosa wrote:
> > > Hope this helps, I tried copying your code into a sample application. I
> > > got it to compile with the implicits all resolving. I think the trick was
> > > there were two implementations for Windowing Serdes. You just need to
> > > block one from the imports. See if that fits with what you are doing. Oh
> > > also, I noticed that the types were not resolving when calling
> > > builder.stream, so I put [String, String] in the builder. Here is a gist,
> > > which formats better.
> > >
> > > https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
> > >
> > > import java.time.Duration
> > > import java.util.Properties
> > >
> > > import org.apache.kafka.common.config.Config
> > > import org.apache.kafka.streams.Topology
> > > import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
> > > import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
> > > import org.apache.kafka.streams.scala.kstream.{Consumed, Grouped, Materialized, Produced}
> > > import org.apache.kafka.streams.scala.{ByteArraySessionStore, StreamsBuilder}
> > >
> > > class SampleStream {
> > >   def createTopology(conf: Config, properties: Properties): Topology = {
> > >
> > >     implicit val produced: Produced[Windowed[String], Long] =
> > >       Produced.`with`[Windowed[String], Long]
> > >
> > >     implicit val grouped: Grouped[String, String] =
> > >       Grouped.`with`[String, String]
> > >
> > >     implicit val consumed: Consumed[String, String] =
> > >       Consumed.`with`[String, String]
> > >
> > >     implicit val materialized: Materialized[String, Long, ByteArraySessionStore] =
> > >       Materialized.`with`[String, Long, ByteArraySessionStore]
> > >
> > >     val builder: StreamsBuilder = new StreamsBuilder()
> > >
> > >     builder
> > >       .stream[String, String]("streams-plaintext-input")
> > >       .groupBy((_, word) => word)
> > >       .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
> > >       .count()
> > >       .toStream
> > >       .to("streams-pipe-output")
> > >
> > >     builder.build()
> > >   }
> > > }
> > >
> > > On Thu, Nov 19, 2020 at 7:24 AM John Roesler <vvcep...@apache.org> wrote:
> > >
> > > > Hi Eric,
> > > >
> > > > Sure thing. Assuming the definition of ‘produced’ you had tried in your
> > > > code, it’s just:
> > > >
> > > > ...
> > > > .toStream.to("streams-pipe-output")(produced)
> > > >
> > > > As far as the JSON serde goes, I think that I wrote an example of using
> > > > Jackson to implement a serde in Confluent’s kafka-streams-examples repo.
> > > > I’m not sure what other/better examples might be out there.
> > > >
> > > > Hope this helps,
> > > > John
> > > >
> > > > On Thu, Nov 19, 2020, at 00:27, Eric Beabes wrote:
> > > > > Not sure what you mean by "pass it explicitly". The definition of 'to'
> > > > > is given below. Can we pass it explicitly in this case? If yes, can you
> > > > > please show me how?
> > > > >
> > > > > def to(topic: String)(implicit produced: Produced[K, V]): Unit =
> > > > >   inner.to(topic, produced)
> > > > >
> > > > > Also not sure how to use a self-documenting format like JSON. Any
> > > > > examples to share?
> > > > >
> > > > > On Wed, Nov 18, 2020 at 5:14 PM John Roesler <vvcep...@apache.org> wrote:
> > > > >
> > > > > > Hi Eric,
> > > > > >
> > > > > > Ah, that’s a bummer. The correct serde is the session windowed serde,
> > > > > > as I can see you know. I’m afraid I’m a bit rusty on implicit
> > > > > > resolution rules, so I can’t be much help there.
> > > > > >
> > > > > > But my general recommendation for implicits is that when things get
> > > > > > weird, just don’t use them at all. For example, you can just
> > > > > > explicitly pass the Produced in the second arg list of ‘to’.
> > > > > >
> > > > > > One other tip is that the serialized form produced by those serdes is
> > > > > > kind of specialized and might not be the most convenient for your use.
> > > > > > If this is just a POC, I’d suggest mapping the keys to strings, so
> > > > > > they are human-readable. If this is a production use case, then you
> > > > > > might want to use a more self-documenting format like JSON or AVRO.
> > > > > > Just my two cents.
> > > > > >
> > > > > > I hope this helps!
> > > > > > -John
> > > > > >
> > > > > > On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote:
> > > > > > > I keep getting '*ambiguous implicit values*' message in the following
> > > > > > > code. I tried several things (as can be seen from a couple of lines
> > > > > > > I've commented out). Any ideas on how to fix this? This is in *Scala*.
> > > > > > >
> > > > > > > def createTopology(conf: Config, properties: Properties): Topology = {
> > > > > > >   // implicit val sessionSerde =
> > > > > > >   //   Serde[WindowedSerdes.SessionWindowedSerde[String]]
> > > > > > >   // implicit val produced: Produced[Windowed[String], Long] =
> > > > > > >   //   Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
> > > > > > >   implicit val produced: Produced[Windowed[String], Long] =
> > > > > > >     Produced.`with`[Windowed[String], Long]
> > > > > > >   implicit val consumed: Consumed[String, String] =
> > > > > > >     Consumed.`with`[String, String]
> > > > > > >
> > > > > > >   val builder: StreamsBuilder = new StreamsBuilder()
> > > > > > >   builder.stream("streams-plaintext-input")
> > > > > > >     .groupBy((_, word) => word)
> > > > > > >     .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
> > > > > > >     .count()
> > > > > > >     .toStream.to("streams-pipe-output")
> > > > > > >
> > > > > > >   builder.build()
> > > > > > > }
> > > > > > >
> > > > > > > *Compiler Errors:*
> > > > > > >
> > > > > > > Error:(52, 78) ambiguous implicit values:
> > > > > > >  both method timeWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
> > > > > > >  and method sessionWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
> > > > > > >  match expected type org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> > > > > > >     implicit val produced: Produced[Windowed[String], Long] =
> > > > > > >       Produced.`with`[Windowed[String], Long]
> > > > > > > Error:(52, 78) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> > > > > > >     implicit val produced: Produced[Windowed[String], Long] =
> > > > > > >       Produced.`with`[Windowed[String], Long]
> > > > > > > Error:(52, 78) not enough arguments for method with: (implicit keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]], implicit valueSerde: org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].
> > > > > > > Unspecified value parameters keySerde, valueSerde.
> > > > > > >     implicit val produced: Produced[Windowed[String], Long] =
> > > > > > >       Produced.`with`[Windowed[String], Long]
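
For anyone reading this thread later: the two remedies discussed above (hiding one implicit in the import, or passing the serde explicitly) can be tried without Kafka on the classpath. The sketch below is only an illustration. `Serde`, `Windowed`, `TimeWindowedSerde`, `SessionWindowedSerde` and `Serdes` are hypothetical stand-ins that borrow names from the compiler error, not the real Kafka classes, and `describe` is a made-up method that plays the role of `to`. It does not say anything about whether a particular Scala or Kafka version behaves differently.

```scala
// Hypothetical stand-ins for the Kafka types named in the compiler error.
trait Serde[T] { def name: String }
final class Windowed[T]

// Both classes are serdes for Windowed[T], which is what makes them compete.
class TimeWindowedSerde[T](inner: Serde[T]) extends Serde[Windowed[T]] {
  val name: String = s"time-windowed(${inner.name})"
}
class SessionWindowedSerde[T](inner: Serde[T]) extends Serde[Windowed[T]] {
  val name: String = s"session-windowed(${inner.name})"
}

object Serdes {
  implicit val stringSerde: Serde[String] =
    new Serde[String] { val name: String = "string" }

  // Two implicit methods that both satisfy Serde[Windowed[T]]. With a plain
  // `import Serdes._`, asking for an implicit Serde[Windowed[String]] is
  // ambiguous, as in the error quoted above.
  implicit def timeWindowedSerde[T](implicit s: Serde[T]): TimeWindowedSerde[T] =
    new TimeWindowedSerde[T](s)
  implicit def sessionWindowedSerde[T](implicit s: Serde[T]): SessionWindowedSerde[T] =
    new SessionWindowedSerde[T](s)
}

// Remedy 1: hide timeWindowedSerde in the import, so only the session
// variant is an eligible implicit.
object HiddenImport {
  import Serdes.{timeWindowedSerde => _, _}
  val keySerde: Serde[Windowed[String]] = implicitly[Serde[Windowed[String]]]
}

// Remedy 2: skip implicit resolution and fill the second argument list by hand.
object ExplicitArgument {
  def describe(topic: String)(implicit keySerde: Serde[Windowed[String]]): String =
    s"$topic <- ${keySerde.name}"

  val result: String =
    describe("streams-pipe-output")(new SessionWindowedSerde[String](Serdes.stringSerde))
}
```

With these stand-ins, `HiddenImport.keySerde.name` is `session-windowed(string)`, which shows that the wildcard part of the import still brings `sessionWindowedSerde` into scope even though it is never named. The explicit variant gives the same serde and does not depend on which implicits happen to be imported.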