Wow. This is amazing Daniel. THANKS A LOT!

On Fri, Nov 20, 2020 at 9:44 AM Daniel Hinojosa <dhinoj...@evolutionnext.com> wrote:
> By the way, it was even cleaner than that. I decided on a hunch this
> morning that there had to be something simpler and cleaner. I checked the
> documentation and there it was.
>
> You can just import these two statements and build the stream without
> declaring any implicit bindings yourself:
>
> import org.apache.kafka.streams.scala.ImplicitConversions._
> import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
>
> Check the code in the repository; I just did a push. Let me know what you
> think.
>
> Danno!
>
> On Fri, Nov 20, 2020 at 12:58 AM Daniel Hinojosa <dhinoj...@evolutionnext.com> wrote:
>
> > FYI, my goal was to get it to compile, since that is where
> > implicit resolution takes place. Obviously, you would need to plug in the
> > properties, start the stream, add a shutdown hook, etc.
> >
> > On Fri, Nov 20, 2020 at 12:56 AM Daniel Hinojosa <dhinoj...@evolutionnext.com> wrote:
> >
> >> Done. I am using SBT for the build tool. I created an application and
> >> put it here:
> >> https://github.com/dhinojosa/kafka-streams-scala
> >>
> >> The implicit for SessionWindowedSerde comes from the line below. The
> >> reason is that I am turning off timeWindowedSerde, which leaves just
> >> sessionWindowedSerde; the last underscore includes the rest from this
> >> package.
> >>
> >> import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
> >>
> >> Everything should be in the project; it is now on Scala 2.11.12 and
> >> Kafka 2.4.0. The only thing you may want to change is the SBT version in
> >> project/build.properties. :)
> >>
> >> On Thu, Nov 19, 2020 at 10:06 PM Eric Beabes <mailinglist...@gmail.com> wrote:
> >>
> >>> Daniel,
> >>>
> >>> I copied your code from here:
> >>> https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
> >>>
> >>> Still getting the same error message. Actually, I am not sure why it would
> >>> work since you don't have implicits for SessionWindowedSerde.
> >>> By the way, which versions of Scala & Kafka did you use? Maybe that's the
> >>> issue. I am using the following versions:
> >>>
> >>> *Scala: 2.11.12*
> >>> *Kafka: 2.4.0*
> >>>
> >>> Can you please try with these versions & let me know if it still works for
> >>> you? Thanks for your help.
> >>>
> >>> On Thu, Nov 19, 2020 at 5:16 PM Liam Clarke-Hutchinson <liam.cla...@adscale.co.nz> wrote:
> >>>
> >>> > That said John, nothing wrong with being explicit in code. :)
> >>> >
> >>> > On Fri, 20 Nov. 2020, 1:46 pm John Roesler, <vvcep...@apache.org> wrote:
> >>> >
> >>> > > Oh, nice. Thanks, Daniel!
> >>> > >
> >>> > > That’s much nicer than my ham-handed approach.
> >>> > >
> >>> > > Thanks,
> >>> > > John
> >>> > >
> >>> > > On Thu, Nov 19, 2020, at 17:44, Daniel Hinojosa wrote:
> >>> > > > Hope this helps. I tried copying your code into a sample application. I
> >>> > > > got it to compile with the implicits all resolving. I think the trick was
> >>> > > > that there were two implementations for windowing Serdes. You just need to
> >>> > > > block one from the imports. See if that fits with what you are doing. Oh,
> >>> > > > also, I noticed that the types were not resolving when calling
> >>> > > > builder.stream, so I put [String, String] in the builder. Here is a gist,
> >>> > > > which formats better.
> >>> > > >
> >>> > > > https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
> >>> > > >
> >>> > > > import java.time.Duration
> >>> > > > import java.util.Properties
> >>> > > >
> >>> > > > import org.apache.kafka.common.config.Config
> >>> > > > import org.apache.kafka.streams.Topology
> >>> > > > import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
> >>> > > > import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
> >>> > > > import org.apache.kafka.streams.scala.kstream.{Consumed, Grouped, Materialized, Produced}
> >>> > > > import org.apache.kafka.streams.scala.{ByteArraySessionStore, StreamsBuilder}
> >>> > > >
> >>> > > > class SampleStream {
> >>> > > >   def createTopology(conf: Config, properties: Properties): Topology = {
> >>> > > >
> >>> > > >     implicit val produced: Produced[Windowed[String], Long] =
> >>> > > >       Produced.`with`[Windowed[String], Long]
> >>> > > >
> >>> > > >     implicit val grouped: Grouped[String, String] =
> >>> > > >       Grouped.`with`[String, String]
> >>> > > >
> >>> > > >     implicit val consumed: Consumed[String, String] =
> >>> > > >       Consumed.`with`[String, String]
> >>> > > >
> >>> > > >     implicit val materialized: Materialized[String, Long, ByteArraySessionStore] =
> >>> > > >       Materialized.`with`[String, Long, ByteArraySessionStore]
> >>> > > >
> >>> > > >     val builder: StreamsBuilder = new StreamsBuilder()
> >>> > > >
> >>> > > >     builder
> >>> > > >       .stream[String, String]("streams-plaintext-input")
> >>> > > >       .groupBy((_, word) => word)
> >>> > > >       .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
> >>> > > >       .count()
> >>> > > >       .toStream
> >>> > > >       .to("streams-pipe-output")
> >>> > > >
> >>> > > >     builder.build()
> >>> > > >   }
> >>> > > > }
> >>> > > >
> >>> > > > On Thu, Nov 19, 2020 at 7:24 AM John Roesler <vvcep...@apache.org> wrote:
> >>> > > > >
> >>> > > > > Hi Eric,
> >>> > > > >
> >>> > > > > Sure thing. Assuming the definition of ‘produced’ you had tried in your
> >>> > > > > code, it’s just:
> >>> > > > >
> >>> > > > > ...
> >>> > > > > .toStream.to("streams-pipe-output")(produced)
> >>> > > > >
> >>> > > > > As far as the json serde goes, I think that I wrote an example of using
> >>> > > > > Jackson to implement a serde in Confluent’s kafka-streams-examples repo.
> >>> > > > > I’m not sure what other/better examples might be out there.
> >>> > > > >
> >>> > > > > Hope this helps,
> >>> > > > > John
> >>> > > > >
> >>> > > > > On Thu, Nov 19, 2020, at 00:27, Eric Beabes wrote:
> >>> > > > > > Not sure what you mean by "pass it explicitly". The definition of 'to' is
> >>> > > > > > given below. Can we pass it explicitly in this case? If yes, can you
> >>> > > > > > please show me how?
> >>> > > > > >
> >>> > > > > > def to(topic: String)(implicit produced: Produced[K, V]): Unit =
> >>> > > > > >   inner.to(topic, produced)
> >>> > > > > >
> >>> > > > > > Also not sure how to use a self documenting format like JSON. Any
> >>> > > > > > examples to share?
> >>> > > > > >
> >>> > > > > > On Wed, Nov 18, 2020 at 5:14 PM John Roesler <vvcep...@apache.org> wrote:
> >>> > > > > >
> >>> > > > > > > Hi Eric,
> >>> > > > > > >
> >>> > > > > > > Ah, that’s a bummer. The correct serde is the session windowed serde, as I
> >>> > > > > > > can see you know. I’m afraid I’m a bit rusty on implicit resolution rules,
> >>> > > > > > > so I can’t be much help there.
> >>> > > > > > >
> >>> > > > > > > But my general recommendation for implicits is that when things get weird,
> >>> > > > > > > just don’t use them at all.
> >>> > > > > > > For example, you can just explicitly pass the
> >>> > > > > > > Produced in the second arg list of ‘to’.
> >>> > > > > > >
> >>> > > > > > > One other tip is that the serialized form produced by those serdes is kind
> >>> > > > > > > of specialized and might not be the most convenient for your use. If this
> >>> > > > > > > is just a POC, I’d suggest mapping the keys to strings, so they are
> >>> > > > > > > human-readable. If this is a production use case, then you might want to
> >>> > > > > > > use a more self-documenting format like JSON or AVRO. Just my two cents.
> >>> > > > > > >
> >>> > > > > > > I hope this helps!
> >>> > > > > > > -John
> >>> > > > > > >
> >>> > > > > > > On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote:
> >>> > > > > > > > I keep getting an '*ambiguous implicit values*' message in the following
> >>> > > > > > > > code. I tried several things (as can be seen from a couple of lines I've
> >>> > > > > > > > commented out). Any ideas on how to fix this? This is in *Scala*.
> >>> > > > > > > >
> >>> > > > > > > > def createTopology(conf: Config, properties: Properties): Topology = {
> >>> > > > > > > > //    implicit val sessionSerde = Serde[WindowedSerdes.SessionWindowedSerde[String]]
> >>> > > > > > > > //    implicit val produced: Produced[Windowed[String], Long] =
> >>> > > > > > > > //      Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
> >>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] =
> >>> > > > > > > >       Produced.`with`[Windowed[String], Long]
> >>> > > > > > > >     implicit val consumed: Consumed[String, String] =
> >>> > > > > > > >       Consumed.`with`[String, String]
> >>> > > > > > > >
> >>> > > > > > > >     val builder: StreamsBuilder = new StreamsBuilder()
> >>> > > > > > > >     builder.stream("streams-plaintext-input")
> >>> > > > > > > >       .groupBy((_, word) => word)
> >>> > > > > > > >       .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
> >>> > > > > > > >       .count()
> >>> > > > > > > >       .toStream.to("streams-pipe-output")
> >>> > > > > > > >
> >>> > > > > > > >     builder.build()
> >>> > > > > > > >
> >>> > > > > > > > }
> >>> > > > > > > >
> >>> > > > > > > > *Compiler Errors:*
> >>> > > > > > > >
> >>> > > > > > > > Error:(52, 78) ambiguous implicit values:
> >>> > > > > > > >  both method timeWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
> >>> > > > > > > >  and method sessionWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
> >>> > > > > > > >  match expected type org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> >>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] =
> >>> > > > > > > >       Produced.`with`[Windowed[String], Long]
> >>> > > > > > > > Error:(52, 78) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> >>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] =
> >>> > > > > > > >       Produced.`with`[Windowed[String], Long]
> >>> > > > > > > > Error:(52, 78) not enough arguments for method with: (implicit keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]], implicit valueSerde: org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].
> >>> > > > > > > > Unspecified value parameters keySerde, valueSerde.
> >>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] =
> >>> > > > > > > >       Produced.`with`[Windowed[String], Long]
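
For readers following the thread, the import-hiding trick Daniel describes can be tried without Kafka on the classpath. What follows is only a minimal sketch under stated assumptions: Serde, Windowed, DemoSerdes and ImportHidingDemo are hypothetical stand-ins, not the real Kafka Streams types, and both implicit methods are simplified to return Serde[Windowed[T]] directly (the real ones return TimeWindowedSerde[T] and SessionWindowedSerde[T]). It is meant to show the same kind of clash the compiler reported, and how `{timeWindowedSerde => _, _}` removes one candidate.

```scala
// Hypothetical stand-ins for illustration; not the real Kafka Streams API.
trait Serde[T] { def name: String }
final case class Windowed[K](key: K)

object DemoSerdes {
  implicit val stringSerde: Serde[String] =
    new Serde[String] { val name = "string" }

  // Two implicit methods that can both supply a Serde[Windowed[T]].
  // With a plain `import DemoSerdes._` they are equally eligible,
  // so the compiler cannot choose between them.
  implicit def timeWindowedSerde[T](implicit inner: Serde[T]): Serde[Windowed[T]] =
    new Serde[Windowed[T]] { val name = s"timeWindowed(${inner.name})" }

  implicit def sessionWindowedSerde[T](implicit inner: Serde[T]): Serde[Windowed[T]] =
    new Serde[Windowed[T]] { val name = s"sessionWindowed(${inner.name})" }
}

object ImportHidingDemo {
  // `timeWindowedSerde => _` hides that one member; the trailing `_`
  // imports everything else from DemoSerdes.
  import DemoSerdes.{timeWindowedSerde => _, _}

  // Only sessionWindowedSerde is left as a candidate here.
  def resolvedName: String = implicitly[Serde[Windowed[String]]].name

  def main(args: Array[String]): Unit =
    println(resolvedName) // prints sessionWindowed(string)
}
```

Swapping the import for a plain `import DemoSerdes._` should bring back an "ambiguous implicit values" error at the `implicitly` call, which mirrors the first compiler error quoted above.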