Hi John

In the process of doing what you suggested, I have some scala-kafka-streams
tests that fail to compile.
They fail because they can't import
EmbeddedKafkaCluster/IntegrationTestUtils.
I 'jar tf'-ed, and found out that it's part of kafka-streams-n.n.n-*test*
.jar, which is an artifact.

Questions:
1) Can it be added as an artifact?
2) Alternatively, can the classes required by the tests be moved from
"integration.utils" test package to another artifact? e.g.
kafka-streams-test-utils-n.n.n.jar?

if both are no, then I'll just remove the tests that don't compile in my
copy.

Thanks
rama


On Tue, May 11, 2021 at 6:24 PM Rama Eshel <rama....@gmail.com> wrote:

> Hey John
>
> Fork - Yes;  That's what I did in my POC, but my colleagues didn't like it
> so much, because of the maintenance burden..
> Contribute - by all means. I was hoping there's a means to avoid this low
> level change, but if I end up making it, then it eventually should be part
> of the library. I agree.
>
> BTW, looking for alternatives, I came across this library --
> https://github.com/bakdata/kafka-error-handling
> It might be what we need, and also may be good to integrate into
> kafka-streams eventually.
>
> Thanks again :)
> rama
>
>
> On Tue, May 11, 2021 at 5:02 PM John Roesler <vvcep...@apache.org> wrote:
>
>> Hi Rama,
>>
>> Ah, I think I see what you are getting at now.
>> I thought you were trying to extend the implicits,
>> But now I’m thinking that what you want is to extend KStream.scala,
>> KTable.scala, etc.
>>
>> As you observed, those classes are final, but what you could do is
>> basically replace it with your own wrapper. I.e., you can more or less copy
>> the scala wrapper files into your own project and add the wrapping logic to
>> each of the wrapper methods.
>>
>> It would probably take a few days. It might become a pain to have to
>> maintain your wrapper when we add new methods, deprecate apis, etc. But on
>> the other hand, you have the benefits you observed.
>>
>> By the way, in case it’s what you were asking, I don’t think it’s
>> possible to substitute different implicits into our streams-scala module.
>> In fact, I’ve been wanting to take my own advice and make those implicit
>> wrappers explicit.
>>
>> By the way, if you do wind up essentially forking the scala wrapper and
>> adding this feature, and once you get some production experience with the
>> approach, you might consider contributing it back to Apache Kafka. I know
>> this feature has been discussed/requested before, and I think it would be a
>> great feature, if it can be made general enough. That might be good for you
>> as well, since it offers you a way to get off your “fork” in the long term
>> and avoid that maintenance burden.
>>
>> I hope this reply helps more than the last ones,
>> John
>>
>> On Tue, May 11, 2021, at 00:41, Rama Eshel wrote:
>> > Hi John,
>> > You are right, and I really appreciate your help.
>> > I don't understand implicits well enough, as explicitly said.
>> >
>> > However - those functions are extremely useful: All the scala
>> application
>> > code passes through them, and a guarded version of them makes
>> applications
>> > guarded too.
>> >
>> > Put differently - why write functions, and explicitly wrap Predicates,
>> > ForEach functions, etc., when they all are already wrapped by the
>> nature of
>> > scala-kafka-streams?
>> > Instead I wanted to extend/replace that already existing wrap, much like
>> > I'd install producer/serdes exception handler, serdes, and any other
>> > class_config.
>> >
>> > Again - thank you very much.
>> > I mostly agree with the strong advice against implicits - but I kind of
>> > need to understand why my logic is so wrong.
>> >
>> > rama
>> >
>> > On Mon, May 10, 2021 at 11:15 PM John Roesler <vvcep...@apache.org>
>> wrote:
>> >
>> > > Hello Rama,
>> > >
>> > > There seems to be some fundamental confusion about how implicits
>> > > work in Scala here.
>> > >
>> > > I'd like to re-iterate my strong advice not to go
>> > > down this road. I've been there before, and it really does make it
>> very
>> > > difficult to maintain your codebase. If you just write the methods you
>> > > have in mind to decorate Predicates, ForEach functions, etc., it's a
>> simple
>> > > matter to just wrap all of your own functions with those utilities in
>> the
>> > > source code. Then, anyone who looks at your codebase will know
>> > > what exactly gets executed at run-time, they won't have any trouble
>> > > at all interpreting stacktraces, etc.
>> > >
>> > > I can try to answer your questions kind of broadly. I haven't been a
>> > > full-time
>> > > Scala developer in several years, so my information may not be 100%
>> > > accurate here. As far as I understand, you cannot inherit implicits
>> > > transitively;
>> > > you have to explicitly import the implicits you want in the same
>> source
>> > > code
>> > > file you want to use them. I think the Scala designers made that
>> choice to
>> > > help keep people from going insane.
>> > >
>> > > Secondly, those functions you cite are for translating between the
>> Scala
>> > > API and the Java API. If you're actually using the Scala API, then
>> these
>> > > functions wouldn't do anything useful to you anyway.
>> > >
>> > > I hope that helps,
>> > > -John
>> > >
>> > > On Sun, May 9, 2021, at 21:35, Rama Eshel wrote:
>> > > > Hi John,
>> > > > Thank you for your reply.
>> > > >
>> > > > The "built-in" FunctionsCompactConversions is "*private*[scala]
>> object
>> > > > FunctionsCompatConversions" ,
>> > > > Also, it is explicitly imported from KStream/KGroupedStream/KTable
>> etc..
>> > > > I don't understand implicit well enough, I guess - but:
>> > > >
>> > > >    - I can't extend a "*private*[scala] object
>> > > FunctionsCompatConversions",
>> > > >    right?
>> > > >    - You wrote:
>> > > >
>> > > > *You can always copy/paste it’s logic into your implementation. *
>> > > > *Note also that you will have to make sure that in your topology
>> building
>> > > >    code, you have to import  your implicit converter, and you cannot
>> > > import
>> > > >    ours. So I must ask -- *How?
>> > > >
>> > > >
>> > > >    1. I don't directly import FunctionsCompatConversions.
>> Naturally, I
>> > > >       import KStream/KGroupedStream/KTable etc., and they import it.
>> > > >       (See below an example from for KStream.scala)
>> > > >       2. Do I need to change anything in the apache-kafka-streams
>> code?
>> > > >       I assume no.
>> > > >       3. Can I import the Guarded class once, e.g. in a "common" jar
>> > > >       created w/ sbt-assembly?
>> > > >       Or should I explicitly import an alternative
>> > > >       FunctionsCompactConversions?
>> > > >          - Before or after import KStream/KGroupedStream/KTable
>> etc.?
>> > > >
>> > > > import org.apache.kafka.streams.scala.FunctionsCompatConversions.{
>> > > >   FlatValueMapperFromFunction,
>> > > >   FlatValueMapperWithKeyFromFunction,
>> > > >   ForeachActionFromFunction,
>> > > >   KeyValueMapperFromFunction,
>> > > >   MapperFromFunction,
>> > > >   PredicateFromFunction,
>> > > >   TransformerSupplierAsJava,
>> > > >   ValueMapperFromFunction,
>> > > >   ValueMapperWithKeyFromFunction,
>> > > >   ValueTransformerSupplierAsJava,
>> > > >   ValueTransformerSupplierWithKeyAsJava
>> > > > }
>> > > >
>> > > >
>> > > > On Sun, May 9, 2021 at 5:30 PM John Roesler <j...@vvcephei.org>
>> wrote:
>> > > >
>> > > > > Hi Rama,
>> > > > >
>> > > > > There has been discussion on and off for building something like
>> this
>> > > into
>> > > > > the library, and I think it’s a good idea, but we haven’t had
>> anyone
>> > > draft
>> > > > > a proposal.
>> > > > >
>> > > > > My bias with Scala is actually to avoid using implicits for
>> nontrivial
>> > > > > logic like this. It’s convenient, but it also makes the code hard
>> to
>> > > > > understand, especially for newcomers to your codebase.
>> > > > >
>> > > > > If you do want to stick with implicits, I’m not sure what the
>> problem
>> > > > > might be. If you can’t extend the library conversion, you can
>> always
>> > > > > copy/paste it’s logic into your implementation.
>> > > > >
>> > > > > Note also that you will have to make sure that in your topology
>> > > building
>> > > > > code, you have to import  your implicit converter, and you cannot
>> > > import
>> > > > > ours.
>> > > > >
>> > > > > I hope this helps!
>> > > > > -John
>> > > > >
>> > > > > On Sun, May 9, 2021, at 02:20, Rama Eshel wrote:
>> > > > > >
>> > > > >
>> > >
>> https://stackoverflow.com/questions/67289232/kafkastreams-scala-replace-functionscompatconversions
>> > > > > >
>> > > > > > I asked this question ~10 days ago, and it now occurs to me
>> that I
>> > > asked
>> > > > > it
>> > > > > > in the wrong place. So trying here:
>> > > > > >
>> > > > > >
>> > > > > > I am using KafkaStreams 2.6.0, scala, in an existing bunch of
>> > > > > applications.
>> > > > > >
>> > > > > > I'm devising of a scheme to maximize the uptime/robustness and
>> on
>> > > every
>> > > > > > exception, log + (discard or send-to-dead-letter-topic). I want
>> to do
>> > > > > this
>> > > > > > *without* explicitly adding Try/try-catch blocks all over the
>> > > > > applications.
>> > > > > >
>> > > > > > I had this idea, to replace FunctionsCompactConversions with my
>> own
>> > > > > > GuardedFunctionsCompactConversions, and add Try-s there e.g.
>> > > > > >
>> > > > > > replace
>> > > > > >
>> > > > > >   implicit class ForeachActionFromFunction[K, V](val p: (K, V)
>> =>
>> > > > > > Unit) extends AnyVal {
>> > > > > >     def asForeachAction: ForeachAction[K, V] = (key: K, value:
>> V) =>
>> > > > > > p(key, value)
>> > > > > >   }
>> > > > > >
>> > > > > > with
>> > > > > >
>> > > > > >   implicit class ForeachActionFromFunction[K, V](val p: (K, V)
>> =>
>> > > > > > Unit) extends AnyVal {
>> > > > > >     def asForeachAction: ForeachAction[K, V] = (key: K, value:
>> V) =>
>> > > {
>> > > > > >       setLogContext(value, key)
>> > > > > >       Try(p(key, value)) match {
>> > > > > >         case Success(_) =>
>> > > > > >         case Failure(ex) => Error(s"asForeachAction Failed $ex
>> when
>> > > > > > handle ($key, $value)")
>> > > > > >       }
>> > > > > >     }
>> > > > > >   }
>> > > > > >
>> > > > > > or
>> > > > > >
>> > > > > >     def asPredicate: Predicate[K, V] = (key: K, value: V) =>
>> p(key,
>> > > > > value)
>> > > > > >
>> > > > > > with
>> > > > > >
>> > > > > >     def asPredicate: Predicate[K, V] = (key: K, value: V) => {
>> > > > > >       setLogContext(value, key)
>> > > > > >       Try(p(key, value)) match {
>> > > > > >         case Success(s) => s
>> > > > > >         case Failure(ex) =>
>> > > > > >           Error(s"asPredicate Failed $ex when handle ($key,
>> $value)")
>> > > > > >           false
>> > > > > >       }
>> > > > > >     }
>> > > > > >
>> > > > > > etc. This way -
>> > > > > >
>> > > > > >    1.
>> > > > > >
>> > > > > >    All the application-provided code is guarded (predicates,
>> > > reducers,
>> > > > > >    Serde, ...), and one can't "forget" to try/catch
>> > > > > >    2.
>> > > > > >
>> > > > > >    Upon any error/exception, can log the message at hand,
>> providing
>> > > > > insight
>> > > > > >    as to what the fix should be
>> > > > > >    3.
>> > > > > >
>> > > > > >    Centrally able to disable this logging in production etc.
>> > > > > >    4.
>> > > > > >
>> > > > > >    Centrally able to opt for the Guarded version if
>> troubleshooting,
>> > > > > while
>> > > > > >    using the compact ones by default
>> > > > > >
>> > > > > > Unfortunately I failed to find the right path to do this. I
>> created
>> > > the
>> > > > > > GuardedFunctionsCompactConversions object, but could not
>> > > extend/override
>> > > > > > the compact one, nor get it imported into the proper
>> > > > > > KTable/KStream/KGroupedStream/... classes.
>> > > > > >
>> > > > > > Is this a common requirement? I expect it is. Is there a right
>> way
>> > > to get
>> > > > > > there?
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>

Reply via email to