Hi Matthias,
I replied inline.
Best,
Bruno
On 02.09.20 22:06, Matthias J. Sax wrote:
Thanks for updating the KIP.
Why do you propose that addStreamThread() returns a `boolean` to signal
that the thread could not be started? As an alternative, we could throw
an exception if the client is not in state RUNNING. I guess both are
valid options; I just want to understand the pros and cons of each
approach.
I prefer to return a boolean because there is nothing exceptional about
a stream thread not being added due to an inappropriate state. State
changes are expected in Streams. Furthermore, users should not be forced
to control their program flow by catching exceptions. Here is an example
of each approach:
Returning a boolean:
while (!kafkaStreams.addStreamThread() &&
       kafkaStreams.state() != State.NOT_RUNNING &&
       kafkaStreams.state() != State.ERROR) {
    // retry until the thread is added or the client can no longer run
}
Throwing an exception:
boolean added = false;
while (!added &&
       kafkaStreams.state() != State.NOT_RUNNING &&
       kafkaStreams.state() != State.ERROR) {
    try {
        kafkaStreams.addStreamThread();
        added = true;
    } catch (final Exception ex) {
        // the thread could not be added; retry
    }
}
IMO the first example is more readable than the second.
Btw: should we allow adding a new thread if the state is REBALANCING,
too? I don't see a reason why we should not allow it.
I guess you are right. I will update the KIP and include REBALANCING.
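A minimal sketch of the resulting state check, assuming a hypothetical `ClientState` enum that stands in for the client states named in this thread (it is not the real `KafkaStreams.State`) and assuming that adding a thread is accepted in RUNNING and REBALANCING only:

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for the client states mentioned in this thread;
// it is not the real KafkaStreams.State enum.
enum ClientState { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

final class ThreadAdmission {
    // States in which adding a stream thread is accepted, assuming the
    // proposal to allow REBALANCING in addition to RUNNING.
    private static final Set<ClientState> CAN_ADD_THREAD =
        EnumSet.of(ClientState.RUNNING, ClientState.REBALANCING);

    private ThreadAdmission() {}

    // Returns false instead of throwing when the state does not allow it,
    // in line with the boolean return value argued for above.
    static boolean canAddStreamThread(final ClientState state) {
        return CAN_ADD_THREAD.contains(state);
    }
}
```

In this sketch CREATED is rejected as well, since the thread only discusses running and rebalancing clients; whether a thread may be added before start() is left open.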
For removeStreamThread(), might it be worth actually guaranteeing that
the thread with the largest index is stopped instead of leaving it
unspecified? It does not seem to be a big burden on the implementation,
and given that we plan to reuse indices of dead threads, it might be
nice to have a contract. Or would there be any negative impact if we
guaranteed it?
I left it unspecified which stream thread is removed since I could not
find any good reason for a guarantee. Your comment also does not make
clear to me what advantage we would gain by guaranteeing that the stream
thread with the largest index is stopped. It would not guarantee that
the next added stream thread gets the largest index, because another
stream thread with a lower index could have failed in the meantime, and
then two indices would be up for grabs.
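The index argument can be illustrated with a small sketch. It assumes a hypothetical `ThreadIndexPool` in which a new thread reuses the lowest free index; that reuse rule is an assumption for illustration, not something the KIP specifies.

```java
import java.util.TreeSet;

// Hypothetical pool of stream-thread indices. It assumes a new thread
// reuses the lowest free index; this is not Kafka Streams code.
final class ThreadIndexPool {
    private final TreeSet<Integer> inUse = new TreeSet<>();

    // Assigns the lowest index (starting at 1) that is not currently in use.
    int acquire() {
        int index = 1;
        while (inUse.contains(index)) {
            index++;
        }
        inUse.add(index);
        return index;
    }

    // Frees an index when its thread is removed or fails.
    void release(final int index) {
        inUse.remove(index);
    }

    // Largest index in use; throws NoSuchElementException if none is in use.
    int largestInUse() {
        return inUse.last();
    }
}
```

With indices 1, 2 and 3 in use, removing the thread with the largest index (3) while thread 1 fails leaves {1, 3} free, and the next acquire() returns 1 rather than 3. So a "remove the largest index" rule alone does not tell you which index the next added thread gets.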
Leaving it unspecified which stream thread is removed also gives us the
possibility to choose the stream thread to remove according to other
criteria, for example, the one with the least local state.
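To make the "least local state" criterion concrete, here is a minimal sketch of such a selection policy. `ThreadSnapshot` and `RemovalPolicy` are hypothetical names, and measuring local state in bytes is an assumption; Kafka Streams does not expose these types.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical snapshot of a running stream thread; the fields are
// assumptions for illustration and do not exist in Kafka Streams.
final class ThreadSnapshot {
    final String name;
    final long localStateBytes;

    ThreadSnapshot(final String name, final long localStateBytes) {
        this.name = name;
        this.localStateBytes = localStateBytes;
    }
}

final class RemovalPolicy {
    private RemovalPolicy() {}

    // Picks the thread that owns the least local state, so removing it
    // discards as little state as possible. Empty if no thread is running.
    static Optional<ThreadSnapshot> pickThreadToRemove(final List<ThreadSnapshot> threads) {
        return threads.stream().min(Comparator.comparingLong(t -> t.localStateBytes));
    }
}
```

Because the contract leaves the choice open, a policy like this could later be swapped for another one without changing the public API.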
Another thought: should we add a parameter `numberOfThreads` to each
method to allow users to start/stop multiple threads at once?
I would keep it simple for now and add overloads if users request them.
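Until such overloads exist, a user could emulate them on top of the single-thread method. A minimal sketch, assuming a hypothetical `StreamThreadControl` interface that mirrors the two methods with the boolean return values discussed here (it is not the real `KafkaStreams` API):

```java
// Hypothetical interface mirroring the two methods proposed in this
// thread, with boolean return values; not the real KafkaStreams API.
interface StreamThreadControl {
    boolean addStreamThread();
    boolean removeStreamThread();
}

final class ThreadBatches {
    private ThreadBatches() {}

    // Hypothetical user-side helper emulating a `numberOfThreads` overload
    // by calling the single-thread method repeatedly. It stops as soon as
    // one call returns false and reports how many threads were added.
    static int addStreamThreads(final StreamThreadControl client, final int numberOfThreads) {
        int added = 0;
        while (added < numberOfThreads && client.addStreamThread()) {
            added++;
        }
        return added;
    }
}
```

Note that such a helper is not atomic: if the client changes state halfway through, only some of the requested threads are added, which is why it returns the actual count.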
What happens if there are zero running threads and one calls
removeStreamThread()? Should we also add a `boolean` flag and return
`false` for this case (or throw an exception)?
Yeah, I think this is a good idea for the programmatic removal of all
threads. However, I would not throw an exception for the reasons I
pointed out above.
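With a boolean result, removing all threads programmatically needs no exception handling. A minimal sketch, assuming a hypothetical single-method `ThreadRemover` interface whose removeStreamThread() returns `false` when no stream thread is running (not the real API):

```java
// Hypothetical interface: removeStreamThread() returns false when no
// stream thread is running, as suggested above. Not the real API.
interface ThreadRemover {
    boolean removeStreamThread();
}

final class RemoveAll {
    private RemoveAll() {}

    // Removes stream threads until removeStreamThread() reports that none
    // is left and returns how many were removed. The "zero threads" case
    // ends the loop instead of raising an exception.
    static int removeAllStreamThreads(final ThreadRemover client) {
        int removed = 0;
        while (client.removeStreamThread()) {
            removed++;
        }
        return removed;
    }
}
```

If other code keeps adding threads concurrently, this loop keeps removing them, so it only drains the client when nothing else is adding threads at the same time.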
For the metric name, I would prefer "failed" over "crashed". Thoughts?
I think I like "failed" more than "crashed" and it is also more
consistent with other parts of the code like the
ProductionExceptionHandlerResponse.FAIL.
Side remark for the PR: can we make sure to update the description of
`num.stream.threads` to explain that it's the _initial_ number of
threads on startup?
Good point!
-Matthias
On 9/1/20 2:01 PM, Walker Carlson wrote:
Hi Bruno,
I read through your updated KIP and it looks good to me. I agree with
adding the metric to keep track of crashed stream threads instead of a
list of dead stream threads.
Best,
Walker :)
On Tue, Sep 1, 2020 at 1:05 PM Bruno Cadonna <br...@confluent.io> wrote:
Hi John,
your proposal makes sense! I will update the KIP.
Best,
Bruno
On 01.09.20 17:31, John Roesler wrote:
Hello Bruno,
Thanks for the update! The KIP looks good to me; I only have
a grammatical complaint about the proposed metric name.
"Died" is a verb, the past tense of "to die", but in the
expression "x stream threads", x should be an adjective. To
be fair, "died" is also the past participle of "to die", and
participles can usually be used as adjectives. Maybe it
sounds wrong to me because there's already a specifically
adjectival form: "dead". So "dead-stream-threads" seems more
natural.
However, I'm not sure if that captures the specific meaning
you're shooting for, namely that the metric counts only the
threads that died exceptionally, as opposed to those stopped by
calling "removeStreamThread()". What do you think of
"crashed-stream-threads" instead?
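The intended semantics can be sketched with a simple counter. `StreamThreadDeathCounter` and its callbacks are hypothetical names chosen for illustration; they do not correspond to Kafka Streams internals or its metrics API.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical tracker illustrating the metric semantics discussed here:
// only stream threads that die exceptionally are counted, threads stopped
// via removeStreamThread() are not. Not Kafka Streams code.
final class StreamThreadDeathCounter {
    private final AtomicLong exceptionallyDied = new AtomicLong();

    // Called when a thread is stopped on request, e.g., by removeStreamThread().
    void onThreadRemoved(final String threadName) {
        // intentionally not counted
    }

    // Called when a thread terminates because of an uncaught exception.
    void onThreadDiedExceptionally(final String threadName, final Throwable cause) {
        exceptionallyDied.incrementAndGet();
    }

    // Value the metric would report.
    long value() {
        return exceptionallyDied.get();
    }
}
```

An `AtomicLong` is used because, in this sketch, the callbacks may be invoked from different stream threads at the same time.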
Thanks,
-John
On Tue, 2020-09-01 at 11:30 +0200, Bruno Cadonna wrote:
Hi,
I updated the KIP with the feedback so far. I removed the API to close
the Kafka Streams client asynchronously, since it should be possible to
avoid the deadlock with the existing method and without a KIP.
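The thread does not say how the deadlock is avoided. Presumably it arises when close() waits for all stream threads to terminate while being called from one of them, for example from an uncaught exception handler, which runs on the failing thread itself. A minimal sketch of one possible approach, assuming a hypothetical `MiniClient` that is not the Kafka Streams implementation: close() never joins the calling thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified client owning worker threads. It is not
// Kafka Streams code; it only shows how close() can avoid waiting on itself.
final class MiniClient {
    private final List<Thread> workers = new ArrayList<>();
    private volatile boolean running = true;

    // Creates numThreads workers. The worker at failingIndex throws right
    // away, and its uncaught exception handler (which runs on that same
    // failing thread) closes the whole client.
    void start(final int numThreads, final int failingIndex) {
        for (int i = 0; i < numThreads; i++) {
            final boolean fails = i == failingIndex;
            final Thread worker = new Thread(() -> {
                if (fails) {
                    throw new IllegalStateException("simulated failure");
                }
                while (running) {
                    try {
                        Thread.sleep(5);
                    } catch (final InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }, "worker-" + i);
            worker.setUncaughtExceptionHandler((thread, error) -> close());
            workers.add(worker);
        }
        // Start only after the list is complete, so close() sees every worker.
        workers.forEach(Thread::start);
    }

    // Signals all workers to stop and waits for them, but skips the calling
    // thread: joining the current thread would block forever.
    void close() {
        running = false;
        for (final Thread worker : workers) {
            if (worker == Thread.currentThread()) {
                continue;
            }
            try {
                worker.join();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    boolean allStopped() {
        return workers.stream().noneMatch(Thread::isAlive);
    }
}
```

Without the `Thread.currentThread()` check, the failing worker would wait on itself inside its own handler, and any other caller of close() would then wait on that worker forever.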
Please have a look at the updated KIP and let me know what you think.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads
Best,
Bruno
On 26.08.20 16:31, Bruno Cadonna wrote:
Hi,
I would like to propose the following KIP to start and shut down stream
threads during execution as well as to asynchronously shut down a Kafka
Streams client from an uncaught exception handler.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads+and+to+Request+Closing+of+Kafka+Streams+Clients
Best,
Bruno