Hi John,
Thank you for your reply.

The "built-in" FunctionsCompactConversions is "*private*[scala] object
FunctionsCompatConversions" ,
Also, it is explicitly imported from KStream/KGroupedStream/KTable etc..
I don't understand implicit well enough, I guess - but:

   - I can't extend a "*private*[scala] object FunctionsCompatConversions",
   right?
   - You wrote:

*You can always copy/paste it’s logic into your implementation. *
*Note also that you will have to make sure that in your topology building
   code, you have to import  your implicit converter, and you cannot import
   ours. So I must ask -- *How?


   1. I don't directly import FunctionsCompatConversions. Naturally, I
      import KStream/KGroupedStream/KTable etc., and they import it.
      (See below an example from for KStream.scala)
      2. Do I need to change anything in the apache-kafka-streams code?
      I assume no.
      3. Can I import the Guarded class once, e.g. in a "common" jar
      created w/ sbt-assembly?
      Or should I explicitly import an alternative
      FunctionsCompactConversions?
         - Before or after import KStream/KGroupedStream/KTable etc.?

import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
  FlatValueMapperFromFunction,
  FlatValueMapperWithKeyFromFunction,
  ForeachActionFromFunction,
  KeyValueMapperFromFunction,
  MapperFromFunction,
  PredicateFromFunction,
  TransformerSupplierAsJava,
  ValueMapperFromFunction,
  ValueMapperWithKeyFromFunction,
  ValueTransformerSupplierAsJava,
  ValueTransformerSupplierWithKeyAsJava
}


On Sun, May 9, 2021 at 5:30 PM John Roesler <j...@vvcephei.org> wrote:

> Hi Rama,
>
> There has been discussion on and off for building something like this into
> the library, and I think it’s a good idea, but we haven’t had anyone draft
> a proposal.
>
> My bias with Scala is actually to avoid using implicits for nontrivial
> logic like this. It’s convenient, but it also makes the code hard to
> understand, especially for newcomers to your codebase.
>
> If you do want to stick with implicits, I’m not sure what the problem
> might be. If you can’t extend the library conversion, you can always
> copy/paste it’s logic into your implementation.
>
> Note also that you will have to make sure that in your topology building
> code, you have to import  your implicit converter, and you cannot import
> ours.
>
> I hope this helps!
> -John
>
> On Sun, May 9, 2021, at 02:20, Rama Eshel wrote:
> >
> https://stackoverflow.com/questions/67289232/kafkastreams-scala-replace-functionscompatconversions
> >
> > I asked this question ~10 days ago, and it now occurs to me that I asked
> it
> > in the wrong place. So trying here:
> >
> >
> > I am using KafkaStreams 2.6.0, scala, in an existing bunch of
> applications.
> >
> > I'm devising of a scheme to maximize the uptime/robustness and on every
> > exception, log + (discard or send-to-dead-letter-topic). I want to do
> this
> > *without* explicitly adding Try/try-catch blocks all over the
> applications.
> >
> > I had this idea, to replace FunctionsCompactConversions with my own
> > GuardedFunctionsCompactConversions, and add Try-s there e.g.
> >
> > replace
> >
> >   implicit class ForeachActionFromFunction[K, V](val p: (K, V) =>
> > Unit) extends AnyVal {
> >     def asForeachAction: ForeachAction[K, V] = (key: K, value: V) =>
> > p(key, value)
> >   }
> >
> > with
> >
> >   implicit class ForeachActionFromFunction[K, V](val p: (K, V) =>
> > Unit) extends AnyVal {
> >     def asForeachAction: ForeachAction[K, V] = (key: K, value: V) => {
> >       setLogContext(value, key)
> >       Try(p(key, value)) match {
> >         case Success(_) =>
> >         case Failure(ex) => Error(s"asForeachAction Failed $ex when
> > handle ($key, $value)")
> >       }
> >     }
> >   }
> >
> > or
> >
> >     def asPredicate: Predicate[K, V] = (key: K, value: V) => p(key,
> value)
> >
> > with
> >
> >     def asPredicate: Predicate[K, V] = (key: K, value: V) => {
> >       setLogContext(value, key)
> >       Try(p(key, value)) match {
> >         case Success(s) => s
> >         case Failure(ex) =>
> >           Error(s"asPredicate Failed $ex when handle ($key, $value)")
> >           false
> >       }
> >     }
> >
> > etc. This way -
> >
> >    1.
> >
> >    All the application-provided code is guarded (predicates, reducers,
> >    Serde, ...), and one can't "forget" to try/catch
> >    2.
> >
> >    Upon any error/exception, can log the message at hand, providing
> insight
> >    as to what the fix should be
> >    3.
> >
> >    Centrally able to disable this logging in production etc.
> >    4.
> >
> >    Centrally able to opt for the Guarded version if troubleshooting,
> while
> >    using the compact ones by default
> >
> > Unfortunately I failed to find the right path to do this. I created the
> > GuardedFunctionsCompactConversions object, but could not extend/override
> > the compact one, nor get it imported into the proper
> > KTable/KStream/KGroupedStream/... classes.
> >
> > Is this a common requirement? I expect it is. Is there a right way to get
> > there?
> >
>

Reply via email to