Hey Rama, Yes, that's right, the EmbeddedKafkaCluster is not a "public API", which is a perennial source of disappointment, since it is very useful for integration tests. I think that's another area that's "nice to have", but no one has really volunteered to write a KIP and add it officially.
Your assessment is correct; the class is in the test jar, which you could in principle depend on (for example: https://github.com/confluentinc/kafka-streams-examples/blob/6.1.1-post/pom.xml#L234-L240 ). And you are also correct in thinking that it's generally not a good practice to depend on another project's test modules. I think whether you remove the tests or not depends on whether you want to keep the integration test coverage. If you do want to keep them, you could bite the bullet and pull in the test module. You could also take a look around; I believe there are some third-party "embedded Kafka cluster" projects out there. But I don't know if they will be any more stable than the one in Kafka's test module. Good luck! -John On Tue, May 18, 2021, at 10:58, Rama Eshel wrote: > Hi John > > In the process of doing what you suggested, I have some scala-kafka-streams > tests that fail to compile. > They fail because they can't import > EmbeddedKafkaCluster/IntegrationTestUtils. > I 'jar tf'-ed, and found out that it's part of kafka-streams-n.n.n-*test* > .jar, which is an artifact. > > Questions: > 1) Can it be added as an artifact? > 2) Alternatively, can the classes required by the tests be moved from > "integration.utils" test package to another artifact? e.g. > kafka-streams-test-utils-n.n.n.jar? > > if both are no, then I'll just remove the tests that don't compile in my > copy. > > Thanks > rama > > > On Tue, May 11, 2021 at 6:24 PM Rama Eshel <rama....@gmail.com> wrote: > > > Hey John > > > > Fork - Yes; That's what I did in my POC, but my colleagues didn't like it > > so much, because of the maintenance burden.. > > Contribute - by all means. I was hoping there's a means to avoid this low > > level change, but if I end up making it, then it eventually should be part > > of the library. I agree. > > > > BTW, looking for alternatives, I came across this library -- > > https://github.com/bakdata/kafka-error-handling > > It might be what we need, and also may be good to integrate into > > kafka-streams eventually. > > > > Thanks again :) > > rama > > > > > > On Tue, May 11, 2021 at 5:02 PM John Roesler <vvcep...@apache.org> wrote: > > > >> Hi Rama, > >> > >> Ah, I think I see what you are getting at now. > >> I thought you were trying to extend the implicits, > >> But now I’m thinking that what you want is to extend KStream.scala, > >> KTable.scala, etc. > >> > >> As you observed, those classes are final, but what you could do is > >> basically replace it with your own wrapper. I.e., you can more or less copy > >> the scala wrapper files into your own project and add the wrapping logic to > >> each of the wrapper methods. > >> > >> It would probably take a few days. It might become a pain to have to > >> maintain your wrapper when we add new methods, deprecate apis, etc. But on > >> the other hand, you have the benefits you observed. > >> > >> By the way, in case it’s what you were asking, I don’t think it’s > >> possible to substitute different implicits into our streams-scala module. > >> In fact, I’ve been wanting to take my own advice and make those implicit > >> wrappers explicit. > >> > >> By the way, if you do wind up essentially forking the scala wrapper and > >> adding this feature, and once you get some production experience with the > >> approach, you might consider contributing it back to Apache Kafka. I know > >> this feature has been discussed/requested before, and I think it would be a > >> great feature, if it can be made general enough. That might be good for you > >> as well, since it offers you a way to get off your “fork” in the long term > >> and avoid that maintenance burden. > >> > >> I hope this reply helps more than the last ones, > >> John > >> > >> On Tue, May 11, 2021, at 00:41, Rama Eshel wrote: > >> > Hi John, > >> > You are right, and I really appreciate your help. > >> > I don't understand implicits well enough, as explicitly said. > >> > > >> > However - those functions are extremely useful: All the scala > >> application > >> > code passes through them, and a guarded version of them makes > >> applications > >> > guarded too. > >> > > >> > Put differently - why write functions, and explicitly wrap Predicates, > >> > ForEach functions, etc., when they all are already wrapped by the > >> nature of > >> > scala-kafka-streams? > >> > Instead I wanted to extend/replace that already existing wrap, much like > >> > I'd install producer/serdes exception handler, serdes, and any other > >> > class_config. > >> > > >> > Again - thank you very much. > >> > I mostly agree with the strong advice against implicits - but I kind of > >> > need to understand why my logic is so wrong. > >> > > >> > rama > >> > > >> > On Mon, May 10, 2021 at 11:15 PM John Roesler <vvcep...@apache.org> > >> wrote: > >> > > >> > > Hello Rama, > >> > > > >> > > There seems to be some fundamental confusion about how implicits > >> > > work in Scala here. > >> > > > >> > > I'd like to re-iterate my strong advice not to go > >> > > down this road. I've been there before, and it really does make it > >> very > >> > > difficult to maintain your codebase. If you just write the methods you > >> > > have in mind to decorate Predicates, ForEach functions, etc., it's a > >> simple > >> > > matter to just wrap all of your own functions with those utilities in > >> the > >> > > source code. Then, anyone who looks at your codebase will know > >> > > what exactly gets executed at run-time, they won't have any trouble > >> > > at all interpreting stacktraces, etc. > >> > > > >> > > I can try to answer your questions kind of broadly. I haven't been a > >> > > full-time > >> > > Scala developer in several years, so my information may not be 100% > >> > > accurate here. As far as I understand, you cannot inherit implicits > >> > > transitively; > >> > > you have to explicitly import the implicits you want in the same > >> source > >> > > code > >> > > file you want to use them. I think the Scala designers made that > >> choice to > >> > > help keep people from going insane. > >> > > > >> > > Secondly, those functions you cite are for translating between the > >> Scala > >> > > API and the Java API. If you're actually using the Scala API, then > >> these > >> > > functions wouldn't do anything useful to you anyway. > >> > > > >> > > I hope that helps, > >> > > -John > >> > > > >> > > On Sun, May 9, 2021, at 21:35, Rama Eshel wrote: > >> > > > Hi John, > >> > > > Thank you for your reply. > >> > > > > >> > > > The "built-in" FunctionsCompactConversions is "*private*[scala] > >> object > >> > > > FunctionsCompatConversions" , > >> > > > Also, it is explicitly imported from KStream/KGroupedStream/KTable > >> etc.. > >> > > > I don't understand implicit well enough, I guess - but: > >> > > > > >> > > > - I can't extend a "*private*[scala] object > >> > > FunctionsCompatConversions", > >> > > > right? > >> > > > - You wrote: > >> > > > > >> > > > *You can always copy/paste it’s logic into your implementation. * > >> > > > *Note also that you will have to make sure that in your topology > >> building > >> > > > code, you have to import your implicit converter, and you cannot > >> > > import > >> > > > ours. So I must ask -- *How? > >> > > > > >> > > > > >> > > > 1. I don't directly import FunctionsCompatConversions. > >> Naturally, I > >> > > > import KStream/KGroupedStream/KTable etc., and they import it. > >> > > > (See below an example from for KStream.scala) > >> > > > 2. Do I need to change anything in the apache-kafka-streams > >> code? > >> > > > I assume no. > >> > > > 3. Can I import the Guarded class once, e.g. in a "common" jar > >> > > > created w/ sbt-assembly? > >> > > > Or should I explicitly import an alternative > >> > > > FunctionsCompactConversions? > >> > > > - Before or after import KStream/KGroupedStream/KTable > >> etc.? > >> > > > > >> > > > import org.apache.kafka.streams.scala.FunctionsCompatConversions.{ > >> > > > FlatValueMapperFromFunction, > >> > > > FlatValueMapperWithKeyFromFunction, > >> > > > ForeachActionFromFunction, > >> > > > KeyValueMapperFromFunction, > >> > > > MapperFromFunction, > >> > > > PredicateFromFunction, > >> > > > TransformerSupplierAsJava, > >> > > > ValueMapperFromFunction, > >> > > > ValueMapperWithKeyFromFunction, > >> > > > ValueTransformerSupplierAsJava, > >> > > > ValueTransformerSupplierWithKeyAsJava > >> > > > } > >> > > > > >> > > > > >> > > > On Sun, May 9, 2021 at 5:30 PM John Roesler <j...@vvcephei.org> > >> wrote: > >> > > > > >> > > > > Hi Rama, > >> > > > > > >> > > > > There has been discussion on and off for building something like > >> this > >> > > into > >> > > > > the library, and I think it’s a good idea, but we haven’t had > >> anyone > >> > > draft > >> > > > > a proposal. > >> > > > > > >> > > > > My bias with Scala is actually to avoid using implicits for > >> nontrivial > >> > > > > logic like this. It’s convenient, but it also makes the code hard > >> to > >> > > > > understand, especially for newcomers to your codebase. > >> > > > > > >> > > > > If you do want to stick with implicits, I’m not sure what the > >> problem > >> > > > > might be. If you can’t extend the library conversion, you can > >> always > >> > > > > copy/paste it’s logic into your implementation. > >> > > > > > >> > > > > Note also that you will have to make sure that in your topology > >> > > building > >> > > > > code, you have to import your implicit converter, and you cannot > >> > > import > >> > > > > ours. > >> > > > > > >> > > > > I hope this helps! > >> > > > > -John > >> > > > > > >> > > > > On Sun, May 9, 2021, at 02:20, Rama Eshel wrote: > >> > > > > > > >> > > > > > >> > > > >> https://stackoverflow.com/questions/67289232/kafkastreams-scala-replace-functionscompatconversions > >> > > > > > > >> > > > > > I asked this question ~10 days ago, and it now occurs to me > >> that I > >> > > asked > >> > > > > it > >> > > > > > in the wrong place. So trying here: > >> > > > > > > >> > > > > > > >> > > > > > I am using KafkaStreams 2.6.0, scala, in an existing bunch of > >> > > > > applications. > >> > > > > > > >> > > > > > I'm devising of a scheme to maximize the uptime/robustness and > >> on > >> > > every > >> > > > > > exception, log + (discard or send-to-dead-letter-topic). I want > >> to do > >> > > > > this > >> > > > > > *without* explicitly adding Try/try-catch blocks all over the > >> > > > > applications. > >> > > > > > > >> > > > > > I had this idea, to replace FunctionsCompactConversions with my > >> own > >> > > > > > GuardedFunctionsCompactConversions, and add Try-s there e.g. > >> > > > > > > >> > > > > > replace > >> > > > > > > >> > > > > > implicit class ForeachActionFromFunction[K, V](val p: (K, V) > >> => > >> > > > > > Unit) extends AnyVal { > >> > > > > > def asForeachAction: ForeachAction[K, V] = (key: K, value: > >> V) => > >> > > > > > p(key, value) > >> > > > > > } > >> > > > > > > >> > > > > > with > >> > > > > > > >> > > > > > implicit class ForeachActionFromFunction[K, V](val p: (K, V) > >> => > >> > > > > > Unit) extends AnyVal { > >> > > > > > def asForeachAction: ForeachAction[K, V] = (key: K, value: > >> V) => > >> > > { > >> > > > > > setLogContext(value, key) > >> > > > > > Try(p(key, value)) match { > >> > > > > > case Success(_) => > >> > > > > > case Failure(ex) => Error(s"asForeachAction Failed $ex > >> when > >> > > > > > handle ($key, $value)") > >> > > > > > } > >> > > > > > } > >> > > > > > } > >> > > > > > > >> > > > > > or > >> > > > > > > >> > > > > > def asPredicate: Predicate[K, V] = (key: K, value: V) => > >> p(key, > >> > > > > value) > >> > > > > > > >> > > > > > with > >> > > > > > > >> > > > > > def asPredicate: Predicate[K, V] = (key: K, value: V) => { > >> > > > > > setLogContext(value, key) > >> > > > > > Try(p(key, value)) match { > >> > > > > > case Success(s) => s > >> > > > > > case Failure(ex) => > >> > > > > > Error(s"asPredicate Failed $ex when handle ($key, > >> $value)") > >> > > > > > false > >> > > > > > } > >> > > > > > } > >> > > > > > > >> > > > > > etc. This way - > >> > > > > > > >> > > > > > 1. > >> > > > > > > >> > > > > > All the application-provided code is guarded (predicates, > >> > > reducers, > >> > > > > > Serde, ...), and one can't "forget" to try/catch > >> > > > > > 2. > >> > > > > > > >> > > > > > Upon any error/exception, can log the message at hand, > >> providing > >> > > > > insight > >> > > > > > as to what the fix should be > >> > > > > > 3. > >> > > > > > > >> > > > > > Centrally able to disable this logging in production etc. > >> > > > > > 4. > >> > > > > > > >> > > > > > Centrally able to opt for the Guarded version if > >> troubleshooting, > >> > > > > while > >> > > > > > using the compact ones by default > >> > > > > > > >> > > > > > Unfortunately I failed to find the right path to do this. I > >> created > >> > > the > >> > > > > > GuardedFunctionsCompactConversions object, but could not > >> > > extend/override > >> > > > > > the compact one, nor get it imported into the proper > >> > > > > > KTable/KStream/KGroupedStream/... classes. > >> > > > > > > >> > > > > > Is this a common requirement? I expect it is. Is there a right > >> way > >> > > to get > >> > > > > > there? > >> > > > > > > >> > > > > > >> > > > > >> > > > >> > > >> > > >