Hi Rama,

Ah, I think I see what you are getting at now. I thought you were trying to
extend the implicits, but now I’m thinking that what you want is to extend
KStream.scala, KTable.scala, etc.

As you observed, those classes are final, but what you could do is basically
replace them with your own wrappers. I.e., you can more or less copy the Scala
wrapper files into your own project and add your guarding logic to each of the
wrapper methods.
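
As a rough sketch of the shape I have in mind (every name below is made up for
illustration, and InnerStream is only a stand-in for the Java KStream so that
the snippet compiles on its own):

```scala
import scala.util.{Failure, Success, Try}

// Stand-in for the Java KStream, only so this sketch is self-contained.
trait InnerStream[K, V] {
  def filter(p: (K, V) => Boolean): InnerStream[K, V]
  def foreach(f: (K, V) => Unit): Unit
}

// Hypothetical helpers; not part of kafka-streams-scala.
object Guarded {
  // Swap in your real logger here.
  def logError(msg: String): Unit = Console.err.println(msg)

  // On a non-fatal exception: log, then drop the record by returning false.
  def predicate[K, V](p: (K, V) => Boolean): (K, V) => Boolean =
    (key: K, value: V) =>
      Try(p(key, value)) match {
        case Success(keep) => keep
        case Failure(ex) =>
          logError(s"predicate failed with $ex while handling ($key, $value)")
          false
      }

  // On a non-fatal exception: log, then move on to the next record.
  def action[K, V](f: (K, V) => Unit): (K, V) => Unit =
    (key: K, value: V) =>
      Try(f(key, value)) match {
        case Success(_) => ()
        case Failure(ex) =>
          logError(s"action failed with $ex while handling ($key, $value)")
      }
}

// Your own copy of the wrapper: same method shapes as the original, but each
// user-supplied function is guarded before it is handed to the inner stream.
class GuardedKStream[K, V](val inner: InnerStream[K, V]) {
  def filter(p: (K, V) => Boolean): GuardedKStream[K, V] =
    new GuardedKStream(inner.filter(Guarded.predicate(p)))

  def foreach(f: (K, V) => Unit): Unit =
    inner.foreach(Guarded.action(f))
}
```

In a real copy of KStream.scala, inner would be the Java KStream, and as far as
I remember each function would still go through the copied conversion (e.g.
.asPredicate) before being passed along. Application code stays unchanged; it
just builds its topology against your wrapper classes instead of ours.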

It would probably take a few days. It might become a pain to maintain your
wrapper when we add new methods, deprecate APIs, etc. But on the other hand,
you would get the benefits you described.

By the way, in case it’s what you were asking, I don’t think it’s possible to 
substitute different implicits into our streams-scala module. In fact, I’ve 
been wanting to take my own advice and make those implicit wrappers explicit. 
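
The reason, as far as I understand it, is that Scala resolves implicits at
compile time, at the call site. The calls inside KStream.scala were bound to
our conversions when the library was compiled, so importing a different
implicit in your own code cannot change what they do. A toy illustration (all
names are invented; nothing below is Kafka API):

```scala
// Plays the role of the library's conversions.
object LibraryConversions {
  implicit class Shout(val s: String) {
    def render: String = s.toUpperCase
  }
}

// Plays the role of KStream.scala: it imports the library's implicits.
object Library {
  import LibraryConversions._
  // `render` is resolved right here, at compile time, to Shout.render.
  def show(s: String): String = s.render
}

// Plays the role of your replacement conversions.
object UserConversions {
  implicit class Whisper(val s: String) {
    def render: String = s.toLowerCase
  }
}

object UserCode {
  import UserConversions._
  // Your own call site picks up your implicit...
  val direct: String = "Hi".render            // "hi"
  // ...but the library's call site is unaffected by your import.
  val viaLibrary: String = Library.show("Hi") // "HI"
}
```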

Also, if you do wind up essentially forking the Scala wrapper and adding
this feature, then once you have some production experience with the approach,
you might consider contributing it back to Apache Kafka. I know this feature 
has been discussed/requested before, and I think it would be a great feature, 
if it can be made general enough. That might be good for you as well, since it 
offers you a way to get off your “fork” in the long term and avoid that 
maintenance burden. 

I hope this reply helps more than the last ones,
John

On Tue, May 11, 2021, at 00:41, Rama Eshel wrote:
> Hi John,
> You are right, and I really appreciate your help.
> I don't understand implicits well enough, as explicitly said.
> 
> However - those functions are extremely useful: All the scala application
> code passes through them, and a guarded version of them makes applications
> guarded too.
> 
> Put differently - why write functions, and explicitly wrap Predicates,
> ForEach functions, etc., when they all are already wrapped by the nature of
> scala-kafka-streams?
> Instead I wanted to extend/replace that already existing wrap, much like
> I'd install producer/serdes exception handler, serdes, and any other
> class_config.
> 
> Again - thank you very much.
> I mostly agree with the strong advice against implicits - but I kind of
> need to understand why my logic is so wrong.
> 
> rama
> 
> On Mon, May 10, 2021 at 11:15 PM John Roesler <vvcep...@apache.org> wrote:
> 
> > Hello Rama,
> >
> > There seems to be some fundamental confusion about how implicits
> > work in Scala here.
> >
> > I'd like to re-iterate my strong advice not to go
> > down this road. I've been there before, and it really does make it very
> > difficult to maintain your codebase. If you just write the methods you
> > have in mind to decorate Predicates, ForEach functions, etc., it's a simple
> > matter to just wrap all of your own functions with those utilities in the
> > source code. Then, anyone who looks at your codebase will know
> > what exactly gets executed at run-time, they won't have any trouble
> > at all interpreting stacktraces, etc.
> >
> > I can try to answer your questions kind of broadly. I haven't been a
> > full-time Scala developer in several years, so my information may not be
> > 100% accurate here. As far as I understand, you cannot inherit implicits
> > transitively; you have to explicitly import the implicits you want in the
> > same source code file you want to use them. I think the Scala designers
> > made that choice to help keep people from going insane.
> >
> > Secondly, those functions you cite are for translating between the Scala
> > API and the Java API. If you're actually using the Scala API, then these
> > functions wouldn't do anything useful to you anyway.
> >
> > I hope that helps,
> > -John
> >
> > On Sun, May 9, 2021, at 21:35, Rama Eshel wrote:
> > > Hi John,
> > > Thank you for your reply.
> > >
> > > The "built-in" FunctionsCompatConversions is declared as
> > > "*private*[scala] object FunctionsCompatConversions".
> > > Also, it is explicitly imported in KStream/KGroupedStream/KTable etc.
> > > I don't understand implicits well enough, I guess, but:
> > >
> > >    - I can't extend a "*private*[scala] object
> > >    FunctionsCompatConversions", right?
> > >    - You wrote:
> > >
> > >    *You can always copy/paste its logic into your implementation.*
> > >    *Note also that you will have to make sure that in your topology
> > >    building code, you have to import your implicit converter, and you
> > >    cannot import ours.*
> > >
> > >    So I must ask: How?
> > >
> > >
> > >    1. I don't directly import FunctionsCompatConversions. Naturally, I
> > >       import KStream/KGroupedStream/KTable etc., and they import it.
> > >       (See below an example from KStream.scala.)
> > >    2. Do I need to change anything in the apache-kafka-streams code?
> > >       I assume no.
> > >    3. Can I import the Guarded class once, e.g. in a "common" jar
> > >       created w/ sbt-assembly?
> > >       Or should I explicitly import an alternative
> > >       FunctionsCompatConversions?
> > >          - Before or after import KStream/KGroupedStream/KTable etc.?
> > >
> > > import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
> > >   FlatValueMapperFromFunction,
> > >   FlatValueMapperWithKeyFromFunction,
> > >   ForeachActionFromFunction,
> > >   KeyValueMapperFromFunction,
> > >   MapperFromFunction,
> > >   PredicateFromFunction,
> > >   TransformerSupplierAsJava,
> > >   ValueMapperFromFunction,
> > >   ValueMapperWithKeyFromFunction,
> > >   ValueTransformerSupplierAsJava,
> > >   ValueTransformerSupplierWithKeyAsJava
> > > }
> > >
> > >
> > > On Sun, May 9, 2021 at 5:30 PM John Roesler <j...@vvcephei.org> wrote:
> > >
> > > > Hi Rama,
> > > >
> > > > There has been discussion on and off for building something like this
> > > > into the library, and I think it’s a good idea, but we haven’t had
> > > > anyone draft a proposal.
> > > >
> > > > My bias with Scala is actually to avoid using implicits for nontrivial
> > > > logic like this. It’s convenient, but it also makes the code hard to
> > > > understand, especially for newcomers to your codebase.
> > > >
> > > > If you do want to stick with implicits, I’m not sure what the problem
> > > > might be. If you can’t extend the library conversion, you can always
> > > > copy/paste its logic into your implementation.
> > > >
> > > > Note also that you will have to make sure that your topology building
> > > > code imports your implicit converter and not ours.
> > > >
> > > > I hope this helps!
> > > > -John
> > > >
> > > > On Sun, May 9, 2021, at 02:20, Rama Eshel wrote:
> > > > >
> > > >
> > https://stackoverflow.com/questions/67289232/kafkastreams-scala-replace-functionscompatconversions
> > > > >
> > > > > I asked this question ~10 days ago, and it now occurs to me that I
> > asked
> > > > it
> > > > > in the wrong place. So trying here:
> > > > >
> > > > >
> > > > > I am using KafkaStreams 2.6.0, scala, in an existing bunch of
> > > > applications.
> > > > >
> > > > > I'm devising of a scheme to maximize the uptime/robustness and on
> > every
> > > > > exception, log + (discard or send-to-dead-letter-topic). I want to do
> > > > this
> > > > > *without* explicitly adding Try/try-catch blocks all over the
> > > > applications.
> > > > >
> > > > > I had this idea, to replace FunctionsCompactConversions with my own
> > > > > GuardedFunctionsCompactConversions, and add Try-s there e.g.
> > > > >
> > > > > replace
> > > > >
> > > > > >   implicit class ForeachActionFromFunction[K, V](val p: (K, V) => Unit)
> > > > > >       extends AnyVal {
> > > > > >     def asForeachAction: ForeachAction[K, V] =
> > > > > >       (key: K, value: V) => p(key, value)
> > > > > >   }
> > > > > >
> > > > > > with
> > > > > >
> > > > > >   implicit class ForeachActionFromFunction[K, V](val p: (K, V) => Unit)
> > > > > >       extends AnyVal {
> > > > > >     def asForeachAction: ForeachAction[K, V] = (key: K, value: V) => {
> > > > > >       setLogContext(value, key)
> > > > > >       Try(p(key, value)) match {
> > > > > >         case Success(_) =>
> > > > > >         case Failure(ex) =>
> > > > > >           Error(s"asForeachAction Failed $ex when handle ($key, $value)")
> > > > > >       }
> > > > > >     }
> > > > > >   }
> > > > >
> > > > > or
> > > > >
> > > > > >     def asPredicate: Predicate[K, V] =
> > > > > >       (key: K, value: V) => p(key, value)
> > > > >
> > > > > with
> > > > >
> > > > >     def asPredicate: Predicate[K, V] = (key: K, value: V) => {
> > > > >       setLogContext(value, key)
> > > > >       Try(p(key, value)) match {
> > > > >         case Success(s) => s
> > > > >         case Failure(ex) =>
> > > > >           Error(s"asPredicate Failed $ex when handle ($key, $value)")
> > > > >           false
> > > > >       }
> > > > >     }
> > > > >
> > > > > etc. This way -
> > > > >
> > > > >    1.
> > > > >
> > > > >    All the application-provided code is guarded (predicates,
> > reducers,
> > > > >    Serde, ...), and one can't "forget" to try/catch
> > > > >    2.
> > > > >
> > > > >    Upon any error/exception, can log the message at hand, providing
> > > > insight
> > > > >    as to what the fix should be
> > > > >    3.
> > > > >
> > > > >    Centrally able to disable this logging in production etc.
> > > > >    4.
> > > > >
> > > > >    Centrally able to opt for the Guarded version if troubleshooting,
> > > > while
> > > > >    using the compact ones by default
> > > > >
> > > > > > Unfortunately I failed to find the right path to do this. I created
> > > > > > the GuardedFunctionsCompactConversions object, but could not
> > > > > > extend/override the compact one, nor get it imported into the proper
> > > > > > KTable/KStream/KGroupedStream/... classes.
> > > > > >
> > > > > > Is this a common requirement? I expect it is. Is there a right way
> > > > > > to get there?
> > > > >
> > > >
> > >
> >
> 
