Hi Rama,

There has been on-and-off discussion about building something like this into 
the library, and I think it’s a good idea, but no one has drafted a proposal 
yet.

My bias with Scala is actually to avoid using implicits for nontrivial logic 
like this. It’s convenient, but it also makes the code hard to understand, 
especially for newcomers to your codebase.
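
For what it’s worth, here is a rough sketch of what the explicit (non-implicit) 
approach could look like. Guarded, predicate, foreach, and logError are 
hypothetical names, not anything in kafka-streams-scala, and the stderr print 
is only a stand-in for your real logger. It relies on the Scala DSL methods 
accepting plain Scala functions, so the wrapped function can be passed as-is:

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper object; not part of kafka-streams-scala.
object Guarded {
  // Stand-in for a real logger; replace with your own logging call.
  def logError(msg: String): Unit = System.err.println(msg)

  // Wraps a predicate so that a non-fatal exception is logged and
  // the record is treated as `fallback` instead of failing the stream thread.
  def predicate[K, V](name: String, fallback: Boolean = false)(
      p: (K, V) => Boolean): (K, V) => Boolean =
    (key: K, value: V) =>
      Try(p(key, value)) match {
        case Success(result) => result
        case Failure(ex) =>
          logError(s"$name failed with $ex while handling ($key, $value)")
          fallback
      }

  // Same idea for side-effecting actions: log the failure and move on.
  def foreach[K, V](name: String)(f: (K, V) => Unit): (K, V) => Unit =
    (key: K, value: V) =>
      Try(f(key, value)) match {
        case Success(_) => ()
        case Failure(ex) =>
          logError(s"$name failed with $ex while handling ($key, $value)")
      }
}
```

In the topology you would then write something like 
stream.filter(Guarded.predicate[String, String]("positive")((_, v) => v.toInt > 0)), 
which keeps the guard visible at each call site. Note that Try only catches 
non-fatal exceptions, so errors such as OutOfMemoryError still propagate.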

If you do want to stick with implicits, I’m not sure what the problem might be. 
If you can’t extend the library conversion, you can always copy/paste its 
logic into your implementation.

Note also that your topology-building code has to import your implicit 
converter and must not import ours.

I hope this helps!
-John

On Sun, May 9, 2021, at 02:20, Rama Eshel wrote:
> https://stackoverflow.com/questions/67289232/kafkastreams-scala-replace-functionscompatconversions
> 
> I asked this question ~10 days ago, and it now occurs to me that I asked it
> in the wrong place. So trying here:
> 
> 
> I am using KafkaStreams 2.6.0, scala, in an existing bunch of applications.
> 
> I'm devising a scheme to maximize the uptime/robustness and, on every
> exception, log + (discard or send-to-dead-letter-topic). I want to do this
> *without* explicitly adding Try/try-catch blocks all over the applications.
> 
> I had this idea, to replace FunctionsCompatConversions with my own
> GuardedFunctionsCompactConversions, and add Try-s there e.g.
> 
> replace
> 
>   implicit class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {
>     def asForeachAction: ForeachAction[K, V] = (key: K, value: V) => p(key, value)
>   }
> 
> with
> 
>   implicit class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {
>     def asForeachAction: ForeachAction[K, V] = (key: K, value: V) => {
>       setLogContext(value, key)
>       Try(p(key, value)) match {
>         case Success(_) =>
>         case Failure(ex) => Error(s"asForeachAction Failed $ex when handle ($key, $value)")
>       }
>     }
>   }
> 
> or
> 
>     def asPredicate: Predicate[K, V] = (key: K, value: V) => p(key, value)
> 
> with
> 
>     def asPredicate: Predicate[K, V] = (key: K, value: V) => {
>       setLogContext(value, key)
>       Try(p(key, value)) match {
>         case Success(s) => s
>         case Failure(ex) =>
>           Error(s"asPredicate Failed $ex when handle ($key, $value)")
>           false
>       }
>     }
> 
> etc. This way -
> 
>    1. All the application-provided code is guarded (predicates, reducers,
>       Serde, ...), and one can't "forget" to try/catch
>    2. Upon any error/exception, can log the message at hand, providing insight
>       as to what the fix should be
>    3. Centrally able to disable this logging in production etc.
>    4. Centrally able to opt for the Guarded version if troubleshooting, while
>       using the compact ones by default
> 
> Unfortunately I failed to find the right path to do this. I created the
> GuardedFunctionsCompactConversions object, but could not extend/override
> the compat one, nor get it imported into the proper
> KTable/KStream/KGroupedStream/... classes.
> 
> Is this a common requirement? I expect it is. Is there a right way to get
> there?
> 
