Hey John Fork - Yes; That's what I did in my POC, but my colleagues didn't like it so much, because of the maintenance burden.. Contribute - by all means. I was hoping there's a means to avoid this low level change, but if I end up making it, then it eventually should be part of the library. I agree.
BTW, looking for alternatives, I came across this library -- https://github.com/bakdata/kafka-error-handling It might be what we need, and also may be good to integrate into kafka-streams eventually. Thanks again :) rama On Tue, May 11, 2021 at 5:02 PM John Roesler <vvcep...@apache.org> wrote: > Hi Rama, > > Ah, I think I see what you are getting at now. > I thought you were trying to extend the implicits, > But now I’m thinking that what you want is to extend KStream.scala, > KTable.scala, etc. > > As you observed, those classes are final, but what you could do is > basically replace it with your own wrapper. I.e., you can more or less copy > the scala wrapper files into your own project and add the wrapping logic to > each of the wrapper methods. > > It would probably take a few days. It might become a pain to have to > maintain your wrapper when we add new methods, deprecate apis, etc. But on > the other hand, you have the benefits you observed. > > By the way, in case it’s what you were asking, I don’t think it’s possible > to substitute different implicits into our streams-scala module. In fact, > I’ve been wanting to take my own advice and make those implicit wrappers > explicit. > > By the way, if you do wind up essentially forking the scala wrapper and > adding this feature, and once you get some production experience with the > approach, you might consider contributing it back to Apache Kafka. I know > this feature has been discussed/requested before, and I think it would be a > great feature, if it can be made general enough. That might be good for you > as well, since it offers you a way to get off your “fork” in the long term > and avoid that maintenance burden. > > I hope this reply helps more than the last ones, > John > > On Tue, May 11, 2021, at 00:41, Rama Eshel wrote: > > Hi John, > > You are right, and I really appreciate your help. > > I don't understand implicits well enough, as explicitly said. > > > > However - those functions are extremely useful: All the scala application > > code passes through them, and a guarded version of them makes > applications > > guarded too. > > > > Put differently - why write functions, and explicitly wrap Predicates, > > ForEach functions, etc., when they all are already wrapped by the nature > of > > scala-kafka-streams? > > Instead I wanted to extend/replace that already existing wrap, much like > > I'd install producer/serdes exception handler, serdes, and any other > > class_config. > > > > Again - thank you very much. > > I mostly agree with the strong advice against implicits - but I kind of > > need to understand why my logic is so wrong. > > > > rama > > > > On Mon, May 10, 2021 at 11:15 PM John Roesler <vvcep...@apache.org> > wrote: > > > > > Hello Rama, > > > > > > There seems to be some fundamental confusion about how implicits > > > work in Scala here. > > > > > > I'd like to re-iterate my strong advice not to go > > > down this road. I've been there before, and it really does make it very > > > difficult to maintain your codebase. If you just write the methods you > > > have in mind to decorate Predicates, ForEach functions, etc., it's a > simple > > > matter to just wrap all of your own functions with those utilities in > the > > > source code. Then, anyone who looks at your codebase will know > > > what exactly gets executed at run-time, they won't have any trouble > > > at all interpreting stacktraces, etc. > > > > > > I can try to answer your questions kind of broadly. I haven't been a > > > full-time > > > Scala developer in several years, so my information may not be 100% > > > accurate here. As far as I understand, you cannot inherit implicits > > > transitively; > > > you have to explicitly import the implicits you want in the same source > > > code > > > file you want to use them. I think the Scala designers made that > choice to > > > help keep people from going insane. > > > > > > Secondly, those functions you cite are for translating between the > Scala > > > API and the Java API. If you're actually using the Scala API, then > these > > > functions wouldn't do anything useful to you anyway. > > > > > > I hope that helps, > > > -John > > > > > > On Sun, May 9, 2021, at 21:35, Rama Eshel wrote: > > > > Hi John, > > > > Thank you for your reply. > > > > > > > > The "built-in" FunctionsCompactConversions is "*private*[scala] > object > > > > FunctionsCompatConversions" , > > > > Also, it is explicitly imported from KStream/KGroupedStream/KTable > etc.. > > > > I don't understand implicit well enough, I guess - but: > > > > > > > > - I can't extend a "*private*[scala] object > > > FunctionsCompatConversions", > > > > right? > > > > - You wrote: > > > > > > > > *You can always copy/paste it’s logic into your implementation. * > > > > *Note also that you will have to make sure that in your topology > building > > > > code, you have to import your implicit converter, and you cannot > > > import > > > > ours. So I must ask -- *How? > > > > > > > > > > > > 1. I don't directly import FunctionsCompatConversions. Naturally, > I > > > > import KStream/KGroupedStream/KTable etc., and they import it. > > > > (See below an example from for KStream.scala) > > > > 2. Do I need to change anything in the apache-kafka-streams > code? > > > > I assume no. > > > > 3. Can I import the Guarded class once, e.g. in a "common" jar > > > > created w/ sbt-assembly? > > > > Or should I explicitly import an alternative > > > > FunctionsCompactConversions? > > > > - Before or after import KStream/KGroupedStream/KTable etc.? > > > > > > > > import org.apache.kafka.streams.scala.FunctionsCompatConversions.{ > > > > FlatValueMapperFromFunction, > > > > FlatValueMapperWithKeyFromFunction, > > > > ForeachActionFromFunction, > > > > KeyValueMapperFromFunction, > > > > MapperFromFunction, > > > > PredicateFromFunction, > > > > TransformerSupplierAsJava, > > > > ValueMapperFromFunction, > > > > ValueMapperWithKeyFromFunction, > > > > ValueTransformerSupplierAsJava, > > > > ValueTransformerSupplierWithKeyAsJava > > > > } > > > > > > > > > > > > On Sun, May 9, 2021 at 5:30 PM John Roesler <j...@vvcephei.org> > wrote: > > > > > > > > > Hi Rama, > > > > > > > > > > There has been discussion on and off for building something like > this > > > into > > > > > the library, and I think it’s a good idea, but we haven’t had > anyone > > > draft > > > > > a proposal. > > > > > > > > > > My bias with Scala is actually to avoid using implicits for > nontrivial > > > > > logic like this. It’s convenient, but it also makes the code hard > to > > > > > understand, especially for newcomers to your codebase. > > > > > > > > > > If you do want to stick with implicits, I’m not sure what the > problem > > > > > might be. If you can’t extend the library conversion, you can > always > > > > > copy/paste it’s logic into your implementation. > > > > > > > > > > Note also that you will have to make sure that in your topology > > > building > > > > > code, you have to import your implicit converter, and you cannot > > > import > > > > > ours. > > > > > > > > > > I hope this helps! > > > > > -John > > > > > > > > > > On Sun, May 9, 2021, at 02:20, Rama Eshel wrote: > > > > > > > > > > > > > > > https://stackoverflow.com/questions/67289232/kafkastreams-scala-replace-functionscompatconversions > > > > > > > > > > > > I asked this question ~10 days ago, and it now occurs to me that > I > > > asked > > > > > it > > > > > > in the wrong place. So trying here: > > > > > > > > > > > > > > > > > > I am using KafkaStreams 2.6.0, scala, in an existing bunch of > > > > > applications. > > > > > > > > > > > > I'm devising of a scheme to maximize the uptime/robustness and on > > > every > > > > > > exception, log + (discard or send-to-dead-letter-topic). I want > to do > > > > > this > > > > > > *without* explicitly adding Try/try-catch blocks all over the > > > > > applications. > > > > > > > > > > > > I had this idea, to replace FunctionsCompactConversions with my > own > > > > > > GuardedFunctionsCompactConversions, and add Try-s there e.g. > > > > > > > > > > > > replace > > > > > > > > > > > > implicit class ForeachActionFromFunction[K, V](val p: (K, V) => > > > > > > Unit) extends AnyVal { > > > > > > def asForeachAction: ForeachAction[K, V] = (key: K, value: > V) => > > > > > > p(key, value) > > > > > > } > > > > > > > > > > > > with > > > > > > > > > > > > implicit class ForeachActionFromFunction[K, V](val p: (K, V) => > > > > > > Unit) extends AnyVal { > > > > > > def asForeachAction: ForeachAction[K, V] = (key: K, value: > V) => > > > { > > > > > > setLogContext(value, key) > > > > > > Try(p(key, value)) match { > > > > > > case Success(_) => > > > > > > case Failure(ex) => Error(s"asForeachAction Failed $ex > when > > > > > > handle ($key, $value)") > > > > > > } > > > > > > } > > > > > > } > > > > > > > > > > > > or > > > > > > > > > > > > def asPredicate: Predicate[K, V] = (key: K, value: V) => > p(key, > > > > > value) > > > > > > > > > > > > with > > > > > > > > > > > > def asPredicate: Predicate[K, V] = (key: K, value: V) => { > > > > > > setLogContext(value, key) > > > > > > Try(p(key, value)) match { > > > > > > case Success(s) => s > > > > > > case Failure(ex) => > > > > > > Error(s"asPredicate Failed $ex when handle ($key, > $value)") > > > > > > false > > > > > > } > > > > > > } > > > > > > > > > > > > etc. This way - > > > > > > > > > > > > 1. > > > > > > > > > > > > All the application-provided code is guarded (predicates, > > > reducers, > > > > > > Serde, ...), and one can't "forget" to try/catch > > > > > > 2. > > > > > > > > > > > > Upon any error/exception, can log the message at hand, > providing > > > > > insight > > > > > > as to what the fix should be > > > > > > 3. > > > > > > > > > > > > Centrally able to disable this logging in production etc. > > > > > > 4. > > > > > > > > > > > > Centrally able to opt for the Guarded version if > troubleshooting, > > > > > while > > > > > > using the compact ones by default > > > > > > > > > > > > Unfortunately I failed to find the right path to do this. I > created > > > the > > > > > > GuardedFunctionsCompactConversions object, but could not > > > extend/override > > > > > > the compact one, nor get it imported into the proper > > > > > > KTable/KStream/KGroupedStream/... classes. > > > > > > > > > > > > Is this a common requirement? I expect it is. Is there a right > way > > > to get > > > > > > there? > > > > > > > > > > > > > > > > > > > > >