Thanks! SGTM. -Matthias
On 9/3/20 3:17 AM, Bruno Cadonna wrote: > Hi Matthias, > > I replied inline. > > Best, > Bruno > > On 02.09.20 22:06, Matthias J. Sax wrote: >> Thanks for updating the KIP. >> >> Why do you propose to return `boolean` from addStreamThread() if the >> thread could not be started? As an alternative, we could also throw an >> exception if the client is not in state RUNNING? -- I guess both are >> valid options: just want to see what the pros/cons of each approach >> would be? >> > > I prefer to return a boolean because it is nothing exceptional if a > stream thread cannot be added due to an inappropriate state. State > changes are expected in Streams. Furthermore, users should not be forced > to control their program flow by catching exceptions. Let me give you > some examples for returning a boolean and throwing an exception: > > returning a boolean > > while (!kafkaStreams.addStreamThread() && > kafkaStreams.state() != State.NOT_RUNNING && > kafkaStreams.state() != State.ERROR) { > } > > > throwing an exception > > boolean added = false; > while (!added && > kafkaStreams.state() != State.NOT_RUNNING && > kafkaStreams.state() != State.ERROR) { > > try { > kafkaStreams.addStreamThread(); > added = true; > } catch (final Exception ex) { > // do nothing > } > } > > IMO the first example is more readable than the second. > > >> Btw: should we allow to add a new thread if the state is REBALANCING, >> too? I actually don't see a reason why we should not allow this? >> > > I guess you are right. I will update the KIP and include REBALANCING. > > >> For removeStreamThread(), might it be worth to actually guarantee that >> the thread with the largest index is stopped instead of leaving if >> unspecified? It does not seem to be a big burden on the implementation >> and given that we plan to reused indices of died threads, it might be >> nice to have a contract? Or would there be any negative impact if we >> guarantee it? >> > > I left unspecified which stream thread is removed since I could not find > any good reason for a guarantee. Also in your comment, I do not see what > advantage, we would have if we guaranteed that the stream thread with > the largest index is stopped. It would not guarantee that the next added > stream thread would get the largest index, because another stream thread > with a lower index could have failed in the meanwhile and now two > indices are up for grabs. > Leaving unspecified which stream thread is removed also gives us the > possibility to choose the stream thread to remove according to other > aspects like for example the one with the least local state. > > >> Another thought: should we add a parameter `numberOfThreads` to each >> method to allow users to start/stop multiple threads at once? >> > > I would keep it simple for now and add overloads if users request them. > > >> What happens if there is zero running threads and one calls >> removeStreamThread()? Should we also add a `boolean` flag and return >> `false` for this case (or throw an exception)? >> > > Yeah, I think this is a good idea for the programmatical removal of all > threads. However, I would not throw an exception for the reasons I > pointed out above. > > >> >> For the metric name, I would prefer "failed" over "crashed". Thoughts? >> > > I think I like "failed" more than "crashed" and it is also more > consistent with other parts of the code like the > ProductionExceptionHandlerResponse.FAIL. > > >> >> Side remark for the PR: can we make sure to update the description of >> `num.stream.threads` to explain that it's the _initial_ number of >> threads on startup? >> > > Good point! > >> >> -Matthias >> >> >> On 9/1/20 2:01 PM, Walker Carlson wrote: >>> Hi Bruno, >>> >>> I read through your updated KIP and it looks good to me. I agree with >>> adding the metric to keep track of crashed streams in replace of a >>> list of >>> dead streams. >>> >>> best, >>> Wlaker :) >>> >>> On Tue, Sep 1, 2020 at 1:05 PM Bruno Cadonna <br...@confluent.io> wrote: >>> >>>> Hi John, >>>> >>>> your proposal makes sense! I will update the KIP. >>>> >>>> Best, >>>> Bruno >>>> >>>> On 01.09.20 17:31, John Roesler wrote: >>>>> Hello Bruno, >>>>> >>>>> Thanks for the update! The KIP looks good to me; I only have >>>>> a grammatical complaint about the proposed metric name. >>>>> >>>>> "Died" is a verb, the past tense of "to die", but in the >>>>> expression,"x stream threads", x should be an adjective. To >>>>> be fair, "died" is also the past participle of "to die", and >>>>> participles can usually be used as adjectives. Maybe it >>>>> sounds wrong to me because there's already a specifically >>>>> adjectival form: "dead". So "dead-stream-threads" seems more >>>>> natural. >>>>> >>>>> However, I'm not sure if that captures the specific meaning >>>>> you're shooting for, namely that the metric counts only the >>>>> threads that died exceptionally, vs. from calling >>>>> "removeStreamThread()". What do you think of "crashed- >>>>> stream-threads" instead? >>>>> >>>>> Thanks, >>>>> -John >>>>> >>>>> On Tue, 2020-09-01 at 11:30 +0200, Bruno Cadonna wrote: >>>>>> Hi, >>>>>> >>>>>> I updated the KIP with the feedback so far. I removed the API to >>>>>> close >>>>>> the Kafka Streams client asynchronously, since it should be >>>>>> possible to >>>>>> avoid the deadlock with the existing method and without a KIP. >>>>>> >>>>>> Please have a look at the updated KIP and let me know what you think. >>>>>> >>>>>> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads >>>> >>>>>> >>>>>> Best, >>>>>> Bruno >>>>>> >>>>>> On 26.08.20 16:31, Bruno Cadonna wrote: >>>>>>> Hi, >>>>>>> >>>>>>> I would like to propose the following KIP to start and shut down >>>>>>> stream >>>>>>> threads during execution as well as to shut down asynchronously a >>>>>>> Kafka >>>>>>> Streams client from an uncaught exception handler. >>>>>>> >>>>>>> >>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients >>>> >>>>>>> >>>>>>> >>>>>>> Best, >>>>>>> Bruno >>>>> >>>> >>> >>
signature.asc
Description: OpenPGP digital signature