Hi John In the process of doing what you suggested, I have some scala-kafka-streams tests that fail to compile. They fail because they can't import EmbeddedKafkaCluster/IntegrationTestUtils. I 'jar tf'-ed, and found out that it's part of kafka-streams-n.n.n-*test* .jar, which is an artifact.
Questions: 1) Can it be added as an artifact? 2) Alternatively, can the classes required by the tests be moved from "integration.utils" test package to another artifact? e.g. kafka-streams-test-utils-n.n.n.jar? if both are no, then I'll just remove the tests that don't compile in my copy. Thanks rama On Tue, May 11, 2021 at 6:24 PM Rama Eshel <rama....@gmail.com> wrote: > Hey John > > Fork - Yes; That's what I did in my POC, but my colleagues didn't like it > so much, because of the maintenance burden.. > Contribute - by all means. I was hoping there's a means to avoid this low > level change, but if I end up making it, then it eventually should be part > of the library. I agree. > > BTW, looking for alternatives, I came across this library -- > https://github.com/bakdata/kafka-error-handling > It might be what we need, and also may be good to integrate into > kafka-streams eventually. > > Thanks again :) > rama > > > On Tue, May 11, 2021 at 5:02 PM John Roesler <vvcep...@apache.org> wrote: > >> Hi Rama, >> >> Ah, I think I see what you are getting at now. >> I thought you were trying to extend the implicits, >> But now I’m thinking that what you want is to extend KStream.scala, >> KTable.scala, etc. >> >> As you observed, those classes are final, but what you could do is >> basically replace it with your own wrapper. I.e., you can more or less copy >> the scala wrapper files into your own project and add the wrapping logic to >> each of the wrapper methods. >> >> It would probably take a few days. It might become a pain to have to >> maintain your wrapper when we add new methods, deprecate apis, etc. But on >> the other hand, you have the benefits you observed. >> >> By the way, in case it’s what you were asking, I don’t think it’s >> possible to substitute different implicits into our streams-scala module. >> In fact, I’ve been wanting to take my own advice and make those implicit >> wrappers explicit. >> >> By the way, if you do wind up essentially forking the scala wrapper and >> adding this feature, and once you get some production experience with the >> approach, you might consider contributing it back to Apache Kafka. I know >> this feature has been discussed/requested before, and I think it would be a >> great feature, if it can be made general enough. That might be good for you >> as well, since it offers you a way to get off your “fork” in the long term >> and avoid that maintenance burden. >> >> I hope this reply helps more than the last ones, >> John >> >> On Tue, May 11, 2021, at 00:41, Rama Eshel wrote: >> > Hi John, >> > You are right, and I really appreciate your help. >> > I don't understand implicits well enough, as explicitly said. >> > >> > However - those functions are extremely useful: All the scala >> application >> > code passes through them, and a guarded version of them makes >> applications >> > guarded too. >> > >> > Put differently - why write functions, and explicitly wrap Predicates, >> > ForEach functions, etc., when they all are already wrapped by the >> nature of >> > scala-kafka-streams? >> > Instead I wanted to extend/replace that already existing wrap, much like >> > I'd install producer/serdes exception handler, serdes, and any other >> > class_config. >> > >> > Again - thank you very much. >> > I mostly agree with the strong advice against implicits - but I kind of >> > need to understand why my logic is so wrong. >> > >> > rama >> > >> > On Mon, May 10, 2021 at 11:15 PM John Roesler <vvcep...@apache.org> >> wrote: >> > >> > > Hello Rama, >> > > >> > > There seems to be some fundamental confusion about how implicits >> > > work in Scala here. >> > > >> > > I'd like to re-iterate my strong advice not to go >> > > down this road. I've been there before, and it really does make it >> very >> > > difficult to maintain your codebase. If you just write the methods you >> > > have in mind to decorate Predicates, ForEach functions, etc., it's a >> simple >> > > matter to just wrap all of your own functions with those utilities in >> the >> > > source code. Then, anyone who looks at your codebase will know >> > > what exactly gets executed at run-time, they won't have any trouble >> > > at all interpreting stacktraces, etc. >> > > >> > > I can try to answer your questions kind of broadly. I haven't been a >> > > full-time >> > > Scala developer in several years, so my information may not be 100% >> > > accurate here. As far as I understand, you cannot inherit implicits >> > > transitively; >> > > you have to explicitly import the implicits you want in the same >> source >> > > code >> > > file you want to use them. I think the Scala designers made that >> choice to >> > > help keep people from going insane. >> > > >> > > Secondly, those functions you cite are for translating between the >> Scala >> > > API and the Java API. If you're actually using the Scala API, then >> these >> > > functions wouldn't do anything useful to you anyway. >> > > >> > > I hope that helps, >> > > -John >> > > >> > > On Sun, May 9, 2021, at 21:35, Rama Eshel wrote: >> > > > Hi John, >> > > > Thank you for your reply. >> > > > >> > > > The "built-in" FunctionsCompactConversions is "*private*[scala] >> object >> > > > FunctionsCompatConversions" , >> > > > Also, it is explicitly imported from KStream/KGroupedStream/KTable >> etc.. >> > > > I don't understand implicit well enough, I guess - but: >> > > > >> > > > - I can't extend a "*private*[scala] object >> > > FunctionsCompatConversions", >> > > > right? >> > > > - You wrote: >> > > > >> > > > *You can always copy/paste it’s logic into your implementation. * >> > > > *Note also that you will have to make sure that in your topology >> building >> > > > code, you have to import your implicit converter, and you cannot >> > > import >> > > > ours. So I must ask -- *How? >> > > > >> > > > >> > > > 1. I don't directly import FunctionsCompatConversions. >> Naturally, I >> > > > import KStream/KGroupedStream/KTable etc., and they import it. >> > > > (See below an example from for KStream.scala) >> > > > 2. Do I need to change anything in the apache-kafka-streams >> code? >> > > > I assume no. >> > > > 3. Can I import the Guarded class once, e.g. in a "common" jar >> > > > created w/ sbt-assembly? >> > > > Or should I explicitly import an alternative >> > > > FunctionsCompactConversions? >> > > > - Before or after import KStream/KGroupedStream/KTable >> etc.? >> > > > >> > > > import org.apache.kafka.streams.scala.FunctionsCompatConversions.{ >> > > > FlatValueMapperFromFunction, >> > > > FlatValueMapperWithKeyFromFunction, >> > > > ForeachActionFromFunction, >> > > > KeyValueMapperFromFunction, >> > > > MapperFromFunction, >> > > > PredicateFromFunction, >> > > > TransformerSupplierAsJava, >> > > > ValueMapperFromFunction, >> > > > ValueMapperWithKeyFromFunction, >> > > > ValueTransformerSupplierAsJava, >> > > > ValueTransformerSupplierWithKeyAsJava >> > > > } >> > > > >> > > > >> > > > On Sun, May 9, 2021 at 5:30 PM John Roesler <j...@vvcephei.org> >> wrote: >> > > > >> > > > > Hi Rama, >> > > > > >> > > > > There has been discussion on and off for building something like >> this >> > > into >> > > > > the library, and I think it’s a good idea, but we haven’t had >> anyone >> > > draft >> > > > > a proposal. >> > > > > >> > > > > My bias with Scala is actually to avoid using implicits for >> nontrivial >> > > > > logic like this. It’s convenient, but it also makes the code hard >> to >> > > > > understand, especially for newcomers to your codebase. >> > > > > >> > > > > If you do want to stick with implicits, I’m not sure what the >> problem >> > > > > might be. If you can’t extend the library conversion, you can >> always >> > > > > copy/paste it’s logic into your implementation. >> > > > > >> > > > > Note also that you will have to make sure that in your topology >> > > building >> > > > > code, you have to import your implicit converter, and you cannot >> > > import >> > > > > ours. >> > > > > >> > > > > I hope this helps! >> > > > > -John >> > > > > >> > > > > On Sun, May 9, 2021, at 02:20, Rama Eshel wrote: >> > > > > > >> > > > > >> > > >> https://stackoverflow.com/questions/67289232/kafkastreams-scala-replace-functionscompatconversions >> > > > > > >> > > > > > I asked this question ~10 days ago, and it now occurs to me >> that I >> > > asked >> > > > > it >> > > > > > in the wrong place. So trying here: >> > > > > > >> > > > > > >> > > > > > I am using KafkaStreams 2.6.0, scala, in an existing bunch of >> > > > > applications. >> > > > > > >> > > > > > I'm devising of a scheme to maximize the uptime/robustness and >> on >> > > every >> > > > > > exception, log + (discard or send-to-dead-letter-topic). I want >> to do >> > > > > this >> > > > > > *without* explicitly adding Try/try-catch blocks all over the >> > > > > applications. >> > > > > > >> > > > > > I had this idea, to replace FunctionsCompactConversions with my >> own >> > > > > > GuardedFunctionsCompactConversions, and add Try-s there e.g. >> > > > > > >> > > > > > replace >> > > > > > >> > > > > > implicit class ForeachActionFromFunction[K, V](val p: (K, V) >> => >> > > > > > Unit) extends AnyVal { >> > > > > > def asForeachAction: ForeachAction[K, V] = (key: K, value: >> V) => >> > > > > > p(key, value) >> > > > > > } >> > > > > > >> > > > > > with >> > > > > > >> > > > > > implicit class ForeachActionFromFunction[K, V](val p: (K, V) >> => >> > > > > > Unit) extends AnyVal { >> > > > > > def asForeachAction: ForeachAction[K, V] = (key: K, value: >> V) => >> > > { >> > > > > > setLogContext(value, key) >> > > > > > Try(p(key, value)) match { >> > > > > > case Success(_) => >> > > > > > case Failure(ex) => Error(s"asForeachAction Failed $ex >> when >> > > > > > handle ($key, $value)") >> > > > > > } >> > > > > > } >> > > > > > } >> > > > > > >> > > > > > or >> > > > > > >> > > > > > def asPredicate: Predicate[K, V] = (key: K, value: V) => >> p(key, >> > > > > value) >> > > > > > >> > > > > > with >> > > > > > >> > > > > > def asPredicate: Predicate[K, V] = (key: K, value: V) => { >> > > > > > setLogContext(value, key) >> > > > > > Try(p(key, value)) match { >> > > > > > case Success(s) => s >> > > > > > case Failure(ex) => >> > > > > > Error(s"asPredicate Failed $ex when handle ($key, >> $value)") >> > > > > > false >> > > > > > } >> > > > > > } >> > > > > > >> > > > > > etc. This way - >> > > > > > >> > > > > > 1. >> > > > > > >> > > > > > All the application-provided code is guarded (predicates, >> > > reducers, >> > > > > > Serde, ...), and one can't "forget" to try/catch >> > > > > > 2. >> > > > > > >> > > > > > Upon any error/exception, can log the message at hand, >> providing >> > > > > insight >> > > > > > as to what the fix should be >> > > > > > 3. >> > > > > > >> > > > > > Centrally able to disable this logging in production etc. >> > > > > > 4. >> > > > > > >> > > > > > Centrally able to opt for the Guarded version if >> troubleshooting, >> > > > > while >> > > > > > using the compact ones by default >> > > > > > >> > > > > > Unfortunately I failed to find the right path to do this. I >> created >> > > the >> > > > > > GuardedFunctionsCompactConversions object, but could not >> > > extend/override >> > > > > > the compact one, nor get it imported into the proper >> > > > > > KTable/KStream/KGroupedStream/... classes. >> > > > > > >> > > > > > Is this a common requirement? I expect it is. Is there a right >> way >> > > to get >> > > > > > there? >> > > > > > >> > > > > >> > > > >> > > >> > >> >