Hi John,

Thank you for your reply.

The "built-in" FunctionsCompatConversions is "*private*[scala] object FunctionsCompatConversions". Also, it is explicitly imported from KStream/KGroupedStream/KTable etc.

I don't understand implicits well enough, I guess - but:
- I can't extend a "*private*[scala] object FunctionsCompatConversions", right?
- You wrote:
  *You can always copy/paste its logic into your implementation.*
  *Note also that you will have to make sure that in your topology building code, you have to import your implicit converter, and you cannot import ours.*

So I must ask -- *How?*

1. I don't directly import FunctionsCompatConversions. Naturally, I import KStream/KGroupedStream/KTable etc., and they import it. (See below for an example from KStream.scala.)
2. Do I need to change anything in the apache-kafka-streams code? I assume no.
3. Can I import the Guarded class once, e.g. in a "common" jar created w/ sbt-assembly? Or should I explicitly import an alternative FunctionsCompatConversions?
   - Before or after importing KStream/KGroupedStream/KTable etc.?

import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
  FlatValueMapperFromFunction,
  FlatValueMapperWithKeyFromFunction,
  ForeachActionFromFunction,
  KeyValueMapperFromFunction,
  MapperFromFunction,
  PredicateFromFunction,
  TransformerSupplierAsJava,
  ValueMapperFromFunction,
  ValueMapperWithKeyFromFunction,
  ValueTransformerSupplierAsJava,
  ValueTransformerSupplierWithKeyAsJava
}

On Sun, May 9, 2021 at 5:30 PM John Roesler <j...@vvcephei.org> wrote:

> Hi Rama,
>
> There has been discussion on and off for building something like this into
> the library, and I think it’s a good idea, but we haven’t had anyone draft
> a proposal.
>
> My bias with Scala is actually to avoid using implicits for nontrivial
> logic like this. It’s convenient, but it also makes the code hard to
> understand, especially for newcomers to your codebase.
>
> If you do want to stick with implicits, I’m not sure what the problem
> might be. If you can’t extend the library conversion, you can always
> copy/paste its logic into your implementation.
>
> Note also that you will have to make sure that in your topology building
> code, you have to import your implicit converter, and you cannot import
> ours.
>
> I hope this helps!
> -John
>
> On Sun, May 9, 2021, at 02:20, Rama Eshel wrote:
> >
> > https://stackoverflow.com/questions/67289232/kafkastreams-scala-replace-functionscompatconversions
> >
> > I asked this question ~10 days ago, and it now occurs to me that I asked
> > it in the wrong place. So trying here:
> >
> > I am using KafkaStreams 2.6.0, scala, in an existing bunch of
> > applications.
> >
> > I'm devising a scheme to maximize the uptime/robustness and, on every
> > exception, log + (discard or send-to-dead-letter-topic). I want to do
> > this *without* explicitly adding Try/try-catch blocks all over the
> > applications.
> >
> > I had this idea, to replace FunctionsCompatConversions with my own
> > GuardedFunctionsCompactConversions, and add Try-s there, e.g.
> >
> > replace
> >
> >   implicit class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {
> >     def asForeachAction: ForeachAction[K, V] = (key: K, value: V) => p(key, value)
> >   }
> >
> > with
> >
> >   implicit class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {
> >     def asForeachAction: ForeachAction[K, V] = (key: K, value: V) => {
> >       setLogContext(value, key)
> >       Try(p(key, value)) match {
> >         case Success(_) =>
> >         case Failure(ex) => Error(s"asForeachAction Failed $ex when handle ($key, $value)")
> >       }
> >     }
> >   }
> >
> > or
> >
> >   def asPredicate: Predicate[K, V] = (key: K, value: V) => p(key, value)
> >
> > with
> >
> >   def asPredicate: Predicate[K, V] = (key: K, value: V) => {
> >     setLogContext(value, key)
> >     Try(p(key, value)) match {
> >       case Success(s) => s
> >       case Failure(ex) =>
> >         Error(s"asPredicate Failed $ex when handle ($key, $value)")
> >         false
> >     }
> >   }
> >
> > etc. This way -
> >
> > 1. All the application-provided code is guarded (predicates, reducers,
> >    Serde, ...), and one can't "forget" to try/catch
> > 2. Upon any error/exception, can log the message at hand, providing
> >    insight as to what the fix should be
> > 3. Centrally able to disable this logging in production etc.
> > 4. Centrally able to opt for the Guarded version if troubleshooting,
> >    while using the compact ones by default
> >
> > Unfortunately I failed to find the right path to do this. I created the
> > GuardedFunctionsCompactConversions object, but could not extend/override
> > the compact one, nor get it imported into the proper
> > KTable/KStream/KGroupedStream/... classes.
> >
> > Is this a common requirement? I expect it is. Is there a right way to get
> > there?
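
For reference, the copy/paste approach discussed in this thread might look roughly like the sketch below. It is only an illustration under stated assumptions, not the library's code: the two traits are hypothetical stand-ins for the Java interfaces org.apache.kafka.streams.kstream.ForeachAction and Predicate (declared locally so the snippet compiles without the Kafka dependency), and GuardedConversions, GuardedForeachAction, GuardedPredicate and the errors buffer are made-up names. Note that Scala resolves implicits at compile time at the place where the conversion is invoked, so as far as I understand, a guarded copy only takes effect where your own code calls .asPredicate / .asForeachAction; importing it should not change what the already-compiled KStream/KTable wrappers use internally.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

// Hypothetical stand-ins for the Java interfaces
// org.apache.kafka.streams.kstream.{ForeachAction, Predicate},
// declared here only so the sketch compiles without the Kafka dependency.
trait ForeachAction[K, V] { def apply(key: K, value: V): Unit }
trait Predicate[K, V] { def test(key: K, value: V): Boolean }

// Hypothetical user-owned object: a copy of two conversions with Try added.
object GuardedConversions {
  // Hypothetical error sink; a real application would call its logger here.
  val errors: ListBuffer[String] = ListBuffer.empty

  implicit class GuardedForeachAction[K, V](p: (K, V) => Unit) {
    def asForeachAction: ForeachAction[K, V] = new ForeachAction[K, V] {
      def apply(key: K, value: V): Unit =
        Try(p(key, value)) match {
          case Success(_) => ()
          case Failure(ex) =>
            // Record the failure and drop the record instead of rethrowing.
            errors += s"asForeachAction failed with $ex when handling ($key, $value)"
            ()
        }
    }
  }

  implicit class GuardedPredicate[K, V](p: (K, V) => Boolean) {
    def asPredicate: Predicate[K, V] = new Predicate[K, V] {
      def test(key: K, value: V): Boolean =
        Try(p(key, value)) match {
          case Success(result) => result
          case Failure(ex) =>
            errors += s"asPredicate failed with $ex when handling ($key, $value)"
            false // a record whose predicate throws is treated as "no match"
        }
    }
  }
}
```

With `import GuardedConversions._` in scope, `((k: String, v: String) => v.length > 3).asPredicate` yields a predicate that returns false and records an error, rather than throwing, when the value is null. Such an object can live in a shared "common" jar, but it has to be imported and applied explicitly wherever a guarded function is wanted.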