Wow. This is amazing Daniel. THANKS A LOT!

On Fri, Nov 20, 2020 at 9:44 AM Daniel Hinojosa <dhinoj...@evolutionnext.com>
wrote:

> By the way, it was even cleaner than that. I decided on a hunch this
> morning that there had to be something simpler and cleaner. I checked the
> documentation and there it was.
>
> You can just add these two imports and build the stream without
> declaring any implicit bindings yourself:
>
> import org.apache.kafka.streams.scala.ImplicitConversions._
> import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
>
> Check the code in the repository; I just pushed it. Let me know what you
> think.
>
> Danno!
>
>
>
> On Fri, Nov 20, 2020 at 12:58 AM Daniel Hinojosa <
> dhinoj...@evolutionnext.com> wrote:
>
> > FYI, my goal was to get it to compile, since that is where
> > implicit resolution takes place. Obviously, you would need to plug in the
> > properties, start the stream, add a shutdown hook, etc.
> >
> > On Fri, Nov 20, 2020 at 12:56 AM Daniel Hinojosa <
> > dhinoj...@evolutionnext.com> wrote:
> >
> >> Done.  I am using SBT for the build tool.  I created an application and
> >> put it here
> >> https://github.com/dhinojosa/kafka-streams-scala
> >>
> >> The implicit for SessionWindowedSerde comes from the line below. I am
> >> hiding timeWindowedSerde, which leaves only sessionWindowedSerde; the
> >> last underscore imports everything else from Serdes.
> >>
> >> import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
> >>
> >> Everything should be in the project, which now uses Scala 2.11.12 and
> >> Kafka 2.4.0. The only thing you may want to change is the SBT version in
> >> project/build.properties. :)
> >>
> >>
> >> On Thu, Nov 19, 2020 at 10:06 PM Eric Beabes <mailinglist...@gmail.com>
> >> wrote:
> >>
> >>> Daniel,
> >>>
> >>> I copied your code from here:
> >>> https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
> >>>
> >>> Still getting the same error message. Actually, I am not sure why it
> >>> would work, since you don't have implicits for SessionWindowedSerde. By
> >>> the way, which versions of Scala & Kafka did you use? Maybe that's the
> >>> issue. I am using the following versions:
> >>>
> >>> *Scala: 2.11.12*
> >>> *Kafka: 2.4.0*
> >>>
> >>> Can you please try with these versions & let me know if it still works
> >>> for you? Thanks for your help.
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> On Thu, Nov 19, 2020 at 5:16 PM Liam Clarke-Hutchinson <
> >>> liam.cla...@adscale.co.nz> wrote:
> >>>
> >>> > That said John, nothing wrong with being explicit in code. :)
> >>> >
> >>> > On Fri, 20 Nov. 2020, 1:46 pm John Roesler, <vvcep...@apache.org> wrote:
> >>> >
> >>> > > Oh, nice. Thanks, Daniel!
> >>> > >
> >>> > > That’s much nicer than my ham-handed approach.
> >>> > >
> >>> > > Thanks,
> >>> > > John
> >>> > >
> >>> > > On Thu, Nov 19, 2020, at 17:44, Daniel Hinojosa wrote:
> >>> > > > Hope this helps. I tried copying your code into a sample
> >>> > > > application and got it to compile with the implicits all
> >>> > > > resolving. I think the trick was that there were two
> >>> > > > implementations for windowed Serdes; you just need to block one
> >>> > > > from the imports. See if that fits with what you are doing. Oh,
> >>> > > > also, I noticed that the types were not resolving when calling
> >>> > > > builder.stream, so I put [String, String] on that call. Here is
> >>> > > > a gist, which formats better.
> >>> > > >
> >>> > > > https://gist.github.com/dhinojosa/390f3a98d271272bbea90765cb58e735
> >>> > > >
> >>> > > > import java.time.Duration
> >>> > > > import java.util.Properties
> >>> > > >
> >>> > > > import org.apache.kafka.common.config.Config
> >>> > > > import org.apache.kafka.streams.Topology
> >>> > > > import org.apache.kafka.streams.kstream.{SessionWindows, Windowed}
> >>> > > > import org.apache.kafka.streams.scala.Serdes.{timeWindowedSerde => _, _}
> >>> > > > import org.apache.kafka.streams.scala.kstream.{Consumed, Grouped, Materialized, Produced}
> >>> > > > import org.apache.kafka.streams.scala.{ByteArraySessionStore, StreamsBuilder}
> >>> > > >
> >>> > > > class SampleStream {
> >>> > > >   def createTopology(conf: Config, properties: Properties): Topology = {
> >>> > > >
> >>> > > >     implicit val produced: Produced[Windowed[String], Long] =
> >>> > > >       Produced.`with`[Windowed[String], Long]
> >>> > > >
> >>> > > >     implicit val grouped: Grouped[String, String] =
> >>> > > >       Grouped.`with`[String, String]
> >>> > > >
> >>> > > >     implicit val consumed: Consumed[String, String] =
> >>> > > >       Consumed.`with`[String, String]
> >>> > > >
> >>> > > >     implicit val materialized: Materialized[String, Long, ByteArraySessionStore] =
> >>> > > >       Materialized.`with`[String, Long, ByteArraySessionStore]
> >>> > > >
> >>> > > >     val builder: StreamsBuilder = new StreamsBuilder()
> >>> > > >
> >>> > > >     builder
> >>> > > >       .stream[String, String]("streams-plaintext-input")
> >>> > > >       .groupBy((_, word) => word)
> >>> > > >       .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
> >>> > > >       .count()
> >>> > > >       .toStream
> >>> > > >       .to("streams-pipe-output")
> >>> > > >
> >>> > > >     builder.build()
> >>> > > >   }
> >>> > > > }
> >>> > > >
> >>> > > >
> >>> > > >
> >>> > > >
> >>> > > > On Thu, Nov 19, 2020 at 7:24 AM John Roesler <vvcep...@apache.org> wrote:
> >>> > > >
> >>> > > > > Hi Eric,
> >>> > > > >
> >>> > > > > Sure thing. Assuming the definition of ‘produced’ you had tried
> >>> > > > > in your code, it’s just:
> >>> > > > >
> >>> > > > > ...
> >>> > > > > .toStream.to("streams-pipe-output")(produced)
> >>> > > > >
> >>> > > > > As far as the JSON serde goes, I think that I wrote an example
> >>> > > > > of using Jackson to implement a serde in Confluent’s
> >>> > > > > kafka-streams-examples repo. I’m not sure what other/better
> >>> > > > > examples might be out there.
> >>> > > > >
> >>> > > > > Hope this helps,
> >>> > > > > John
> >>> > > > >
> >>> > > > > On Thu, Nov 19, 2020, at 00:27, Eric Beabes wrote:
> >>> > > > > > Not sure what you mean by "pass it explicitly". The definition
> >>> > > > > > of 'to' is given below. Can we pass it explicitly in this
> >>> > > > > > case? If yes, can you please show me how?
> >>> > > > > >
> >>> > > > > > def to(topic: String)(implicit produced: Produced[K, V]): Unit =
> >>> > > > > >   inner.to(topic, produced)
> >>> > > > > >
> >>> > > > > >
> >>> > > > > > Also, I'm not sure how to use a self-documenting format like
> >>> > > > > > JSON. Any examples to share?
> >>> > > > > >
> >>> > > > > >
> >>> > > > > > On Wed, Nov 18, 2020 at 5:14 PM John Roesler <vvcep...@apache.org> wrote:
> >>> > > > > >
> >>> > > > > > > Hi Eric,
> >>> > > > > > >
> >>> > > > > > > Ah, that’s a bummer. The correct serde is the session windowed
> >>> > > > > > > serde, as I can see you know. I’m afraid I’m a bit rusty on
> >>> > > > > > > implicit resolution rules, so I can’t be much help there.
> >>> > > > > > >
> >>> > > > > > > But my general recommendation for implicits is that when
> >>> > > > > > > things get weird, just don’t use them at all. For example, you
> >>> > > > > > > can just explicitly pass the Produced in the second arg list
> >>> > > > > > > of ‘to’.
> >>> > > > > > >
> >>> > > > > > > One other tip is that the serialized form produced by those
> >>> > > > > > > serdes is kind of specialized and might not be the most
> >>> > > > > > > convenient for your use. If this is just a POC, I’d suggest
> >>> > > > > > > mapping the keys to strings, so they are human-readable. If
> >>> > > > > > > this is a production use case, then you might want to use a
> >>> > > > > > > more self-documenting format like JSON or Avro. Just my two
> >>> > > > > > > cents.
> >>> > > > > > >
> >>> > > > > > > I hope this helps!
> >>> > > > > > > -John
> >>> > > > > > >
> >>> > > > > > > On Wed, Nov 18, 2020, at 14:02, Eric Beabes wrote:
> >>> > > > > > > > I keep getting an '*ambiguous implicit values*' message in the
> >>> > > > > > > > following code. I tried several things (as can be seen from a
> >>> > > > > > > > couple of lines I've commented out). Any ideas on how to fix
> >>> > > > > > > > this? This is in *Scala*.
> >>> > > > > > > >
> >>> > > > > > > >   def createTopology(conf: Config, properties: Properties): Topology = {
> >>> > > > > > > > //    implicit val sessionSerde = Serde[WindowedSerdes.SessionWindowedSerde[String]]
> >>> > > > > > > > //    implicit val produced: Produced[Windowed[String], Long] =
> >>> > > > > > > > //      Produced.`with`[WindowedSerdes.SessionWindowedSerde[String], Long]
> >>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] =
> >>> > > > > > > >       Produced.`with`[Windowed[String], Long]
> >>> > > > > > > >     implicit val consumed: Consumed[String, String] =
> >>> > > > > > > >       Consumed.`with`[String, String]
> >>> > > > > > > >
> >>> > > > > > > >     val builder: StreamsBuilder = new StreamsBuilder()
> >>> > > > > > > >     builder.stream("streams-plaintext-input")
> >>> > > > > > > >         .groupBy((_, word) => word)
> >>> > > > > > > >         .windowedBy(SessionWindows.`with`(Duration.ofMillis(60 * 1000)))
> >>> > > > > > > >         .count()
> >>> > > > > > > >         .toStream.to("streams-pipe-output")
> >>> > > > > > > >
> >>> > > > > > > >     builder.build()
> >>> > > > > > > >
> >>> > > > > > > >   }
> >>> > > > > > > >
> >>> > > > > > > > *Compiler Errors:*
> >>> > > > > > > >
> >>> > > > > > > > Error:(52, 78) ambiguous implicit values:
> >>> > > > > > > >  both method timeWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.TimeWindowedSerde[T]
> >>> > > > > > > >  and method sessionWindowedSerde in object Serdes of type [T](implicit tSerde: org.apache.kafka.common.serialization.Serde[T])org.apache.kafka.streams.kstream.WindowedSerdes.SessionWindowedSerde[T]
> >>> > > > > > > >  match expected type org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> >>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] =
> >>> > > > > > > > Produced.`with`[Windowed[String], Long]
> >>> > > > > > > > Error:(52, 78) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]]
> >>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] =
> >>> > > > > > > > Produced.`with`[Windowed[String], Long]
> >>> > > > > > > > Error:(52, 78) not enough arguments for method with: (implicit keySerde: org.apache.kafka.common.serialization.Serde[org.apache.kafka.streams.kstream.Windowed[String]], implicit valueSerde: org.apache.kafka.common.serialization.Serde[Long])org.apache.kafka.streams.kstream.Produced[org.apache.kafka.streams.kstream.Windowed[String],Long].
> >>> > > > > > > > Unspecified value parameters keySerde, valueSerde.
> >>> > > > > > > >     implicit val produced: Produced[Windowed[String], Long] =
> >>> > > > > > > > Produced.`with`[Windowed[String], Long]
> >>> > > > > > > >
> >>> > > > > > >
> >>> > > > > >
> >>> > > > >
> >>> > > >
> >>> > >
> >>> >
> >>>
> >>
>
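
For anyone reading this thread later, here is a rough sketch of why the hiding import resolves the ambiguity, written with no Kafka dependency. Codec, Wrapped, Codecs, timeCodec and sessionCodec are made-up stand-ins for Serde, Windowed, Serdes, timeWindowedSerde and sessionWindowedSerde; they are assumptions for illustration only, and unlike the real Serdes methods both stand-ins return the same type.

```scala
// Stand-in types for illustration only; none of these are Kafka classes.
trait Codec[A] { def name: String }
final case class Wrapped[A](value: A)

object Codecs {
  implicit val stringCodec: Codec[String] =
    new Codec[String] { val name = "string" }

  // Two implicit methods that can both supply a Codec[Wrapped[A]],
  // loosely mirroring timeWindowedSerde and sessionWindowedSerde.
  implicit def timeCodec[A](implicit inner: Codec[A]): Codec[Wrapped[A]] =
    new Codec[Wrapped[A]] { val name = s"time(${inner.name})" }

  implicit def sessionCodec[A](implicit inner: Codec[A]): Codec[Wrapped[A]] =
    new Codec[Wrapped[A]] { val name = s"session(${inner.name})" }
}

object HidingDemo {
  // `timeCodec => _` hides that one member; the trailing `_` imports the rest.
  // With a plain `import Codecs._` the lookup below would be ambiguous.
  import Codecs.{timeCodec => _, _}

  // Only sessionCodec is left as a candidate, so resolution succeeds.
  def resolved: String = implicitly[Codec[Wrapped[String]]].name

  def main(args: Array[String]): Unit = println(resolved)
}
```

Swapping the import for `import Codecs._` should bring back an "ambiguous implicit values" error of the same shape as the one at the start of the thread.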

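The other route suggested in the thread, passing the value in the second argument list instead of relying on implicit resolution, can be sketched the same way. Tag, Sink and send are hypothetical names that only mimic the shape of `to(topic)(implicit produced)`; this is not the Kafka Streams API.

```scala
// Hypothetical stand-ins that mimic the shape of `to(topic)(implicit produced)`.
final case class Tag(label: String)

object Sink {
  // The second parameter list is implicit, as in the `to` definition quoted above.
  def send(topic: String)(implicit tag: Tag): String =
    s"$topic via ${tag.label}"
}

object ExplicitDemo {
  // Deliberately not marked implicit, so no implicit search is involved.
  val sessionTag: Tag = Tag("session")

  // An implicit parameter list can always be supplied by hand.
  def result: String = Sink.send("streams-pipe-output")(sessionTag)

  def main(args: Array[String]): Unit = println(result)
}
```

Because the argument is passed by hand, it does not matter whether any implicit Tag is in scope, or how many.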