Hey John

Fork - Yes. That's what I did in my POC, but my colleagues didn't like it
so much, because of the maintenance burden.
Contribute - by all means. I was hoping there was a way to avoid this
low-level change, but if I end up making it, then it should eventually be
part of the library. I agree.

BTW, looking for alternatives, I came across this library --
https://github.com/bakdata/kafka-error-handling
It might be what we need, and it may also be good to integrate into
kafka-streams eventually.

Thanks again :)
rama


On Tue, May 11, 2021 at 5:02 PM John Roesler <vvcep...@apache.org> wrote:

> Hi Rama,
>
> Ah, I think I see what you are getting at now.
> I thought you were trying to extend the implicits,
> but now I think what you want is to extend KStream.scala,
> KTable.scala, etc.
>
> As you observed, those classes are final, but what you could do is
> basically replace them with your own wrappers. I.e., you can more or less
> copy the Scala wrapper files into your own project and add the wrapping
> logic to each of the wrapper methods.
>
> It would probably take a few days. It might become a pain to have to
> maintain your wrapper when we add new methods, deprecate APIs, etc. But on
> the other hand, you get the benefits you observed.
>
> By the way, in case it’s what you were asking, I don’t think it’s possible
> to substitute different implicits into our streams-scala module. In fact,
> I’ve been wanting to take my own advice and make those implicit wrappers
> explicit.
>
> By the way, if you do wind up essentially forking the scala wrapper and
> adding this feature, and once you get some production experience with the
> approach, you might consider contributing it back to Apache Kafka. I know
> this feature has been discussed/requested before, and I think it would be a
> great feature, if it can be made general enough. That might be good for you
> as well, since it offers you a way to get off your “fork” in the long term
> and avoid that maintenance burden.
>
> I hope this reply helps more than the last ones,
> John
>
> On Tue, May 11, 2021, at 00:41, Rama Eshel wrote:
> > Hi John,
> > You are right, and I really appreciate your help.
> > I don't understand implicits well enough, as explicitly said.
> >
> > However - those functions are extremely useful: All the scala application
> > code passes through them, and a guarded version of them makes
> > applications guarded too.
> >
> > Put differently - why write functions, and explicitly wrap Predicates,
> > ForEach functions, etc., when they all are already wrapped by the nature
> > of scala-kafka-streams?
> > Instead I wanted to extend/replace that already existing wrap, much like
> > I'd install producer/serdes exception handler, serdes, and any other
> > class_config.
> >
> > Again - thank you very much.
> > I mostly agree with the strong advice against implicits - but I kind of
> > need to understand why my logic is so wrong.
> >
> > rama
> >
> > On Mon, May 10, 2021 at 11:15 PM John Roesler <vvcep...@apache.org> wrote:
> >
> > > Hello Rama,
> > >
> > > There seems to be some fundamental confusion about how implicits
> > > work in Scala here.
> > >
> > > I'd like to re-iterate my strong advice not to go
> > > down this road. I've been there before, and it really does make it very
> > > difficult to maintain your codebase. If you just write the methods you
> > > have in mind to decorate Predicates, ForEach functions, etc., it's a
> > > simple matter to just wrap all of your own functions with those
> > > utilities in the source code. Then, anyone who looks at your codebase
> > > will know exactly what gets executed at run-time, and they won't have
> > > any trouble at all interpreting stacktraces, etc.
> > >
> > > I can try to answer your questions kind of broadly. I haven't been a
> > > full-time Scala developer in several years, so my information may not
> > > be 100% accurate here. As far as I understand, you cannot inherit
> > > implicits transitively; you have to explicitly import the implicits you
> > > want in the same source code file you want to use them. I think the
> > > Scala designers made that choice to help keep people from going insane.
> > >
> > > Secondly, those functions you cite are for translating between the
> > > Scala API and the Java API. If you're actually using the Scala API,
> > > then these functions wouldn't do anything useful to you anyway.
> > >
> > > I hope that helps,
> > > -John
> > >
> > > On Sun, May 9, 2021, at 21:35, Rama Eshel wrote:
> > > > Hi John,
> > > > Thank you for your reply.
> > > >
> > > > The "built-in" FunctionsCompatConversions is declared as
> > > > "*private*[scala] object FunctionsCompatConversions".
> > > > Also, it is explicitly imported from KStream/KGroupedStream/KTable etc.
> > > > I don't understand implicits well enough, I guess - but:
> > > >
> > > >    - I can't extend a "*private*[scala] object
> > > >    FunctionsCompatConversions", right?
> > > >    - You wrote:
> > > >
> > > >    *You can always copy/paste its logic into your implementation.*
> > > >    *Note also that you will have to make sure that in your topology
> > > >    building code, you have to import your implicit converter, and you
> > > >    cannot import ours.*
> > > >
> > > >    So I must ask -- How?
> > > >
> > > >
> > > >    1. I don't directly import FunctionsCompatConversions. Naturally, I
> > > >       import KStream/KGroupedStream/KTable etc., and they import it.
> > > >       (See below an example from KStream.scala.)
> > > >    2. Do I need to change anything in the apache-kafka-streams code?
> > > >       I assume no.
> > > >    3. Can I import the Guarded class once, e.g. in a "common" jar
> > > >       created w/ sbt-assembly?
> > > >       Or should I explicitly import an alternative
> > > >       FunctionsCompatConversions?
> > > >          - Before or after import KStream/KGroupedStream/KTable etc.?
> > > >
> > > > import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
> > > >   FlatValueMapperFromFunction,
> > > >   FlatValueMapperWithKeyFromFunction,
> > > >   ForeachActionFromFunction,
> > > >   KeyValueMapperFromFunction,
> > > >   MapperFromFunction,
> > > >   PredicateFromFunction,
> > > >   TransformerSupplierAsJava,
> > > >   ValueMapperFromFunction,
> > > >   ValueMapperWithKeyFromFunction,
> > > >   ValueTransformerSupplierAsJava,
> > > >   ValueTransformerSupplierWithKeyAsJava
> > > > }
> > > >
> > > >
> > > > On Sun, May 9, 2021 at 5:30 PM John Roesler <j...@vvcephei.org> wrote:
> > > >
> > > > > Hi Rama,
> > > > >
> > > > > There has been discussion on and off for building something like
> > > > > this into the library, and I think it’s a good idea, but we haven’t
> > > > > had anyone draft a proposal.
> > > > >
> > > > > My bias with Scala is actually to avoid using implicits for
> > > > > nontrivial logic like this. It’s convenient, but it also makes the
> > > > > code hard to understand, especially for newcomers to your codebase.
> > > > >
> > > > > If you do want to stick with implicits, I’m not sure what the
> > > > > problem might be. If you can’t extend the library conversion, you can
> > > > > always copy/paste its logic into your implementation.
> > > > >
> > > > > Note also that in your topology building code, you will have to
> > > > > import your implicit converter, and you cannot import ours.
> > > > >
> > > > > I hope this helps!
> > > > > -John
> > > > >
> > > > > On Sun, May 9, 2021, at 02:20, Rama Eshel wrote:
> > > > > > https://stackoverflow.com/questions/67289232/kafkastreams-scala-replace-functionscompatconversions
> > > > > >
> > > > > > I asked this question ~10 days ago, and it now occurs to me that I
> > > > > > asked it in the wrong place. So trying here:
> > > > > >
> > > > > >
> > > > > > I am using KafkaStreams 2.6.0, scala, in an existing bunch of
> > > > > > applications.
> > > > > >
> > > > > > I'm devising a scheme to maximize the uptime/robustness and, on
> > > > > > every exception, log + (discard or send-to-dead-letter-topic). I
> > > > > > want to do this *without* explicitly adding Try/try-catch blocks all
> > > > > > over the applications.
> > > > > >
> > > > > > I had this idea, to replace FunctionsCompatConversions with my own
> > > > > > GuardedFunctionsCompactConversions, and add Try-s there e.g.
> > > > > >
> > > > > > replace
> > > > > >
> > > > > >   implicit class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {
> > > > > >     def asForeachAction: ForeachAction[K, V] = (key: K, value: V) => p(key, value)
> > > > > >   }
> > > > > >
> > > > > > with
> > > > > >
> > > > > >   implicit class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {
> > > > > >     def asForeachAction: ForeachAction[K, V] = (key: K, value: V) => {
> > > > > >       setLogContext(value, key)
> > > > > >       Try(p(key, value)) match {
> > > > > >         case Success(_) =>
> > > > > >         case Failure(ex) => Error(s"asForeachAction Failed $ex when handle ($key, $value)")
> > > > > >       }
> > > > > >     }
> > > > > >   }
> > > > > >
> > > > > > or
> > > > > >
> > > > > >     def asPredicate: Predicate[K, V] = (key: K, value: V) => p(key, value)
> > > > > >
> > > > > > with
> > > > > >
> > > > > >     def asPredicate: Predicate[K, V] = (key: K, value: V) => {
> > > > > >       setLogContext(value, key)
> > > > > >       Try(p(key, value)) match {
> > > > > >         case Success(s) => s
> > > > > >         case Failure(ex) =>
> > > > > >           Error(s"asPredicate Failed $ex when handle ($key, $value)")
> > > > > >           false
> > > > > >       }
> > > > > >     }
> > > > > >
> > > > > > etc. This way -
> > > > > >
> > > > > >    1. All the application-provided code is guarded (predicates,
> > > > > >       reducers, Serde, ...), and one can't "forget" to try/catch
> > > > > >    2. Upon any error/exception, can log the message at hand,
> > > > > >       providing insight as to what the fix should be
> > > > > >    3. Centrally able to disable this logging in production etc.
> > > > > >    4. Centrally able to opt for the Guarded version if
> > > > > >       troubleshooting, while using the compact ones by default
> > > > > >
> > > > > > Unfortunately I failed to find the right path to do this. I created
> > > > > > the GuardedFunctionsCompactConversions object, but could not
> > > > > > extend/override the compact one, nor get it imported into the
> > > > > > proper KTable/KStream/KGroupedStream/... classes.
> > > > > >
> > > > > > Is this a common requirement? I expect it is. Is there a right way
> > > > > > to get there?
> > > > > >
> > > > >
> > > >
> > >
> >
>
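
For readers of this thread: the explicit approach John recommends (writing
small decorators for predicates, foreach functions, etc., and wrapping your
own functions with them in the source) might look roughly like the sketch
below. It is only an illustration in plain Scala with no Kafka dependency.
The names Guarded, onError, predicate and foreach are hypothetical and are not
part of kafka-streams-scala; onError stands in for whatever logger or
dead-letter producer an application would really use.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper object; none of these names come from kafka-streams-scala.
object Guarded {
  // Assumption: a pluggable error sink. Replace with a real logger or a
  // dead-letter-topic producer. Defaults to printing on stderr.
  @volatile var onError: (String, Throwable) => Unit =
    (msg, ex) => System.err.println(s"$msg: $ex")

  // Wraps a predicate so that a record whose evaluation throws is reported
  // and treated as "does not match" (i.e. it is filtered out).
  def predicate[K, V](name: String)(p: (K, V) => Boolean): (K, V) => Boolean =
    (key, value) =>
      Try(p(key, value)) match {
        case Success(result) => result
        case Failure(ex) =>
          onError(s"$name failed for ($key, $value)", ex)
          false
      }

  // Wraps a side-effecting action so that a record whose handling throws is
  // reported and skipped instead of propagating the exception.
  def foreach[K, V](name: String)(f: (K, V) => Unit): (K, V) => Unit =
    (key, value) =>
      Try(f(key, value)) match {
        case Success(_)  => ()
        case Failure(ex) => onError(s"$name failed for ($key, $value)", ex)
      }
}
```

Since the Scala KStream methods such as filter and foreach take plain Scala
functions, a wrapped function can be passed directly at the call site, e.g.
stream.filter(Guarded.predicate[String, String]("nonEmpty")((_, v) => v.length > 0)).
Note that Try only catches non-fatal exceptions, so errors such as
OutOfMemoryError still propagate.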

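On the question of why importing a replacement implicit has no effect on the
library's own classes: implicits are resolved at compile time, in the scope
of the code that uses them. The sketch below tries to show this with two
made-up objects (Library and App, with hypothetical conversions Shout and
Whisper); it is an analogy, not the actual streams-scala code.

```scala
// Stand-in for the library: it defines and uses its own implicit conversion.
object Library {
  implicit class Shout(val s: String) extends AnyVal {
    def render: String = s.toUpperCase
  }

  // `render` is bound to Shout when Library is compiled; callers cannot
  // swap it out afterwards.
  def process(s: String): String = s.render
}

// Stand-in for application code with its own, different conversion.
object App {
  implicit class Whisper(val s: String) extends AnyVal {
    def render: String = s.toLowerCase
  }

  // Only Whisper is in scope here, so this call uses Whisper.
  def own(s: String): String = s.render

  // Calling into the library still runs the library's Shout conversion.
  def viaLibrary(s: String): String = Library.process(s)
}
```

In the same way, an application-level import of an alternative conversions
object would only affect calls compiled in the application's own source
files, not the already-compiled wrapper classes.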