Hello Rama,

There seems to be some fundamental confusion about how implicits work in Scala here.

I'd like to reiterate my strong advice not to go down this road. I've been there before, and it really does make your codebase very difficult to maintain. If you just write the methods you have in mind to decorate Predicates, ForEach functions, etc., it's a simple matter to wrap all of your own functions with those utilities in the source code. Then anyone who looks at your codebase will know exactly what gets executed at run-time, they won't have any trouble interpreting stacktraces, etc.

I can try to answer your questions broadly. I haven't been a full-time Scala developer in several years, so my information may not be 100% accurate here.

As far as I understand, you cannot inherit implicits transitively; you have to explicitly import the implicits you want in the same source code file where you want to use them. I think the Scala designers made that choice to help keep people from going insane.

Secondly, those functions you cite are for translating between the Scala API and the Java API. If you're actually using the Scala API, then these functions wouldn't do anything useful for you anyway.

I hope that helps,
-John

On Sun, May 9, 2021, at 21:35, Rama Eshel wrote:
> Hi John,
> Thank you for your reply.
>
> The "built-in" FunctionsCompatConversions is "*private*[scala] object
> FunctionsCompatConversions",
> Also, it is explicitly imported from KStream/KGroupedStream/KTable etc.
> I don't understand implicits well enough, I guess - but:
>
>    - I can't extend a "*private*[scala] object FunctionsCompatConversions",
>    right?
>    - You wrote:
>
> *You can always copy/paste its logic into your implementation. *
> *Note also that you will have to make sure that in your topology building
> code, you have to import your implicit converter, and you cannot import
> ours.* So I must ask -- How?
>
>
>    1. I don't directly import FunctionsCompatConversions. Naturally, I
>    import KStream/KGroupedStream/KTable etc., and they import it.
>    (See below an example from KStream.scala)
>    2. Do I need to change anything in the apache-kafka-streams code?
>    I assume no.
>    3. Can I import the Guarded class once, e.g. in a "common" jar
>    created w/ sbt-assembly?
>    Or should I explicitly import an alternative
>    FunctionsCompatConversions?
>       - Before or after import KStream/KGroupedStream/KTable etc.?
>
> import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
>   FlatValueMapperFromFunction,
>   FlatValueMapperWithKeyFromFunction,
>   ForeachActionFromFunction,
>   KeyValueMapperFromFunction,
>   MapperFromFunction,
>   PredicateFromFunction,
>   TransformerSupplierAsJava,
>   ValueMapperFromFunction,
>   ValueMapperWithKeyFromFunction,
>   ValueTransformerSupplierAsJava,
>   ValueTransformerSupplierWithKeyAsJava
> }
>
>
> On Sun, May 9, 2021 at 5:30 PM John Roesler <j...@vvcephei.org> wrote:
>
> > Hi Rama,
> >
> > There has been discussion on and off for building something like this into
> > the library, and I think it’s a good idea, but we haven’t had anyone draft
> > a proposal.
> >
> > My bias with Scala is actually to avoid using implicits for nontrivial
> > logic like this. It’s convenient, but it also makes the code hard to
> > understand, especially for newcomers to your codebase.
> >
> > If you do want to stick with implicits, I’m not sure what the problem
> > might be. If you can’t extend the library conversion, you can always
> > copy/paste its logic into your implementation.
> >
> > Note also that you will have to make sure that in your topology building
> > code, you have to import your implicit converter, and you cannot import
> > ours.
> >
> > I hope this helps!
> > -John
> >
> > On Sun, May 9, 2021, at 02:20, Rama Eshel wrote:
> > >
> > > https://stackoverflow.com/questions/67289232/kafkastreams-scala-replace-functionscompatconversions
> > >
> > > I asked this question ~10 days ago, and it now occurs to me that I asked it
> > > in the wrong place.
> > > So trying here:
> > >
> > >
> > > I am using KafkaStreams 2.6.0, scala, in an existing bunch of
> > > applications.
> > >
> > > I'm devising a scheme to maximize the uptime/robustness and on every
> > > exception, log + (discard or send-to-dead-letter-topic). I want to do
> > > this *without* explicitly adding Try/try-catch blocks all over the
> > > applications.
> > >
> > > I had this idea, to replace FunctionsCompatConversions with my own
> > > GuardedFunctionsCompactConversions, and add Try-s there e.g.
> > >
> > > replace
> > >
> > >     implicit class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {
> > >       def asForeachAction: ForeachAction[K, V] = (key: K, value: V) => p(key, value)
> > >     }
> > >
> > > with
> > >
> > >     implicit class ForeachActionFromFunction[K, V](val p: (K, V) => Unit) extends AnyVal {
> > >       def asForeachAction: ForeachAction[K, V] = (key: K, value: V) => {
> > >         setLogContext(value, key)
> > >         Try(p(key, value)) match {
> > >           case Success(_) =>
> > >           case Failure(ex) => Error(s"asForeachAction Failed $ex when handle ($key, $value)")
> > >         }
> > >       }
> > >     }
> > >
> > > or
> > >
> > >     def asPredicate: Predicate[K, V] = (key: K, value: V) => p(key, value)
> > >
> > > with
> > >
> > >     def asPredicate: Predicate[K, V] = (key: K, value: V) => {
> > >       setLogContext(value, key)
> > >       Try(p(key, value)) match {
> > >         case Success(s) => s
> > >         case Failure(ex) =>
> > >           Error(s"asPredicate Failed $ex when handle ($key, $value)")
> > >           false
> > >       }
> > >     }
> > >
> > > etc. This way -
> > >
> > >    1. All the application-provided code is guarded (predicates, reducers,
> > >    Serde, ...), and one can't "forget" to try/catch
> > >    2. Upon any error/exception, can log the message at hand, providing
> > >    insight as to what the fix should be
> > >    3. Centrally able to disable this logging in production etc.
> > >    4. Centrally able to opt for the Guarded version if troubleshooting,
> > >    while using the compat ones by default
> > >
> > > Unfortunately I failed to find the right path to do this. I created the
> > > GuardedFunctionsCompactConversions object, but could not extend/override
> > > the compat one, nor get it imported into the proper
> > > KTable/KStream/KGroupedStream/... classes.
> > >
> > > Is this a common requirement? I expect it is. Is there a right way to get
> > > there?
> > >
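
For reference, the explicit wrapping suggested in the reply at the top of this thread might look roughly like the sketch below. It is only an illustration under stated assumptions: the names Guarded, ErrorHandler, guardedPredicate and guardedForeach are made up here and are not part of Kafka Streams, and it assumes that treating a failed predicate as false (dropping the record) is acceptable. Note that scala.util.Try only catches non-fatal exceptions.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper object; none of these names exist in Kafka Streams.
object Guarded {

  // Assumed error sink: receives a message and the caught exception.
  // A real application might log here or produce to a dead-letter topic.
  type ErrorHandler = (String, Throwable) => Unit

  // Wraps a predicate so that a non-fatal exception is reported and the
  // record is treated as not matching (false).
  def guardedPredicate[K, V](name: String, onError: ErrorHandler)(
      p: (K, V) => Boolean): (K, V) => Boolean =
    (key: K, value: V) =>
      Try(p(key, value)) match {
        case Success(result) => result
        case Failure(ex) =>
          onError(s"$name failed for ($key, $value)", ex)
          false
      }

  // Wraps a side-effecting action so that a non-fatal exception is
  // reported instead of propagating to the caller.
  def guardedForeach[K, V](name: String, onError: ErrorHandler)(
      f: (K, V) => Unit): (K, V) => Unit =
    (key: K, value: V) =>
      Try(f(key, value)) match {
        case Success(_)  => ()
        case Failure(ex) => onError(s"$name failed for ($key, $value)", ex)
      }
}
```

Since the Scala API's filter and foreach take plain Scala functions, a call site would then read something like stream.filter(Guarded.guardedPredicate("isValid", log)(isValid)), where stream, log and isValid are placeholders. The guard stays visible at the point of use, which is the property the reply argues for, at the cost of having to remember to apply it.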