Hi Walker,

Thank you for the KIP!

I tend to agree that we should document that some tasks may be missing.

I have one question/comment. As far as I understand, your KIP adds two methods that return data that is actually hosted on the brokers, namely committedOffsets() and endOffsets(). Thus, we need a remote call to fetch the data, and consequently the cost of calling localThreadMetadata() might increase substantially. I understand that for committedOffsets() we could avoid the remote call by maintaining the committed offsets locally, but can we also avoid the remote call for endOffsets()? Should we allow users to pass a parameter to localThreadMetadata() that skips the metadata that needs remote calls, to keep the cost low for use cases that do not need the end offsets?
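
Just to sketch what I have in mind (the enum and the overload below are purely hypothetical, not part of the KIP):

    // Hypothetical overload; all names here are only for illustration.
    public enum MetadataScope {
        LOCAL_ONLY,      // skip metadata that needs remote calls (e.g. end offsets)
        INCLUDE_REMOTE   // additionally fetch end offsets from the brokers
    }

    // existing method, unchanged
    public Set<ThreadMetadata> localThreadMetadata();

    // possible overload to keep cheap use cases cheap
    public Set<ThreadMetadata> localThreadMetadata(final MetadataScope scope);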

Best,
Bruno

On 02.03.21 02:18, Matthias J. Sax wrote:
but the user should not rely on all tasks being returned at any given time to begin with, since it's possible we are in between revoking and re-assigning a partition.

Exactly. That is what I meant: the "hand off" phase of partitions during a rebalance. During this phase, some tasks are "missing" if you aggregate the information globally. My point was (even if it might be obvious to us) that it seems worth pointing out in the KIP and in the docs.

I meant "partial information" from a global POV (not partial for a
single local instance).

Also I mention that they return the highest value they had seen so far for any tasks assigned to them.

For the shutdown case maybe, but after a task is closed its metadata
should not be returned any longer IMHO.


-Matthias

On 3/1/21 4:46 PM, Walker Carlson wrote:
I updated to use Optional, good idea Matthias.

For localThreadMetadata, it can already be called during a rebalance. Also, I mention that they return the highest value they had seen so far for any tasks assigned to them. I thought it would be useful to see the TaskMetadata while the threads were shutting down. I don't think there should really be partial information. If you think this should be clarified better, let me know.

walker

On Mon, Mar 1, 2021 at 3:45 PM Sophie Blee-Goldman <sop...@confluent.io>
wrote:

Can you clarify your second question, Matthias? If this is queried during a cooperative rebalance, it should return the tasks as usual. If the user is using eager rebalancing then this will not return any tasks, but the user should not rely on all tasks being returned at any given time to begin with, since it's possible we are in between revoking and re-assigning a partition.

What does "partial information" mean?

(btw I agree that an Optional makes sense for timeCurrentIdlingStarted())

On Mon, Mar 1, 2021 at 11:46 AM Matthias J. Sax <mj...@apache.org> wrote:

Thanks for updating the KIP, Walker.

About `timeCurrentIdlingStarted()`: should we return an `Optional` instead of `-1` if a task is not idling?
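
For illustration, something along these lines (the backing field is just an assumption):

    // Sketch: return Optional.empty() instead of a -1 sentinel.
    public Optional<Long> timeCurrentIdlingStarted() {
        return idlingStartMs == -1 ? Optional.empty() : Optional.of(idlingStartMs);
    }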


As we allow calling `localThreadMetadata()` at any time, could it be that we report partial information during a rebalance? If yes, this should be pointed out, because if one wants to implement a health check this needs to be taken into account.

-Matthias


On 2/27/21 11:32 AM, Walker Carlson wrote:
Sure thing Boyang,

1) it is in proposed changes. I expanded on it a bit more now.
2) done
3) and done :)

thanks for the suggestions,
walker

On Fri, Feb 26, 2021 at 3:10 PM Boyang Chen <reluctanthero...@gmail.com> wrote:

Thanks Walker. Some minor comments:

1. Could you add a reference to the localThreadMetadata method in the KIP?
2. Could you make the code block a Java template, so that TaskMetadata.java can be the template title? It would also be good to add some meta comments about the newly added functions.
3. Could you write more details about the rejected alternatives? For example, why we don't choose to expose this as metrics, and why a new method on KStream is not favorable. These would be valuable when we look back on our design decisions.

On Fri, Feb 26, 2021 at 11:23 AM Walker Carlson <wcarl...@confluent.io> wrote:

I understand now. I think that is a valid concern, but I think it is best solved by having an external service verify through Streams. As this KIP is now just adding fields to TaskMetadata to be returned in the ThreadMetadata, I am going to say that is out of scope.

That seems to be the last concern. If there are no others I will put this up for a vote soon.

walker

On Thu, Feb 25, 2021 at 12:35 PM Boyang Chen <reluctanthero...@gmail.com> wrote:

For the 3rd point, yes, what I'm proposing is an edge case. For example, when we have 4 tasks [0_0, 0_1, 1_0, 1_1], and a bug in the rebalancing logic causes no one to get 1_1 assigned. Then the health check service will only see 3 tasks [0_0, 0_1, 1_0] reporting progress normally, while not paying attention to 1_1. What I want to expose is a "logical global" view of all the tasks through the stream instance, since each instance gets the assigned topology and should be able to infer the exact set of tasks that should be up and running when the service is healthy.
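
As a rough sketch of the health-check side (assuming we can reach every instance, and that localThreadMetadata() keeps exposing activeTasks() per thread as it does today):

    // Compare the tasks actually reported by all instances against the expected task ids.
    final Set<String> expected = Set.of("0_0", "0_1", "1_0", "1_1"); // derived from the topology
    final Set<String> reported = new HashSet<>();
    for (final KafkaStreams instance : allInstances) {               // allInstances is hypothetical
        for (final ThreadMetadata thread : instance.localThreadMetadata()) {
            for (final TaskMetadata task : thread.activeTasks()) {
                reported.add(task.taskId());
            }
        }
    }
    final Set<String> missing = new HashSet<>(expected);
    missing.removeAll(reported);  // would be [1_1] in the buggy-assignment example above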

On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson <wcarl...@confluent.io> wrote:

Thanks for the follow-up, Boyang and Guozhang,

I have updated the KIP to include these ideas.

Guozhang, that is a good idea about using the TaskMetadata. We can get it through the ThreadMetadata with a minor change to `localThreadMetadata` in KafkaStreams. This means that we will only need to update TaskMetadata and add no other APIs.
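
Roughly, the new accessors on TaskMetadata would look something like this (just a sketch; the names are from the current KIP discussion and subject to change):

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    // Sketch of the additions; the interface name here is only for illustration.
    public interface TaskMetadataDraft {
        Map<TopicPartition, Long> committedOffsets(); // highest committed offset seen per partition
        Map<TopicPartition, Long> endOffsets();       // highest end offset seen per partition
        long timeCurrentIdlingStarted();              // -1 while the task is not idling
    }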

Boyang, since each TaskMetadata contains the TaskId and TopicPartitions, I don't believe mapping either way will be a problem. Also, I think we can do something like record the time the task started idling, and when it stops idling we can override it to -1. I think that should clear up the first two points.

As for your third point, I am not sure I 100% understand. The ThreadMetadata will contain a set of all tasks assigned to that thread. Any health check service will just need to query all clients and aggregate their responses to get a complete picture of all tasks, correct?

On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <wangg...@gmail.com>
wrote:

Regarding the second API and the `TaskStatus` class: I'd suggest we consolidate on the existing `TaskMetadata`, since we have already accumulated a bunch of such classes, and it's better to keep them small as public APIs. You can see https://issues.apache.org/jira/browse/KAFKA-12370 for a reference and a proposal.

On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <reluctanthero...@gmail.com> wrote:

Thanks for the updates, Walker. Some replies and follow-up questions:

1. I agree one task could have multiple partitions, but when we hit a delay in terms of offset progress, do we have a convenient way to reverse-map a TopicPartition to the problematic task? In production, I believe it would be much quicker to identify the problem using task.id instead of the topic partition, especially when it points to an internal topic. I think having the task id as part of the entry value seems useful, which means getting something like Map<TopicPartition, TaskProgress> where TaskProgress contains both committed offsets & task id.
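
Something like this is what I have in mind (TaskProgress is purely hypothetical):

    import org.apache.kafka.streams.processor.TaskId;

    // Hypothetical value type for a Map<TopicPartition, TaskProgress> result.
    public class TaskProgress {
        private final TaskId taskId;
        private final long committedOffset;

        public TaskProgress(final TaskId taskId, final long committedOffset) {
            this.taskId = taskId;
            this.committedOffset = committedOffset;
        }

        public TaskId taskId() { return taskId; }
        public long committedOffset() { return committedOffset; }
    }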

2. The task idling API is still confusing. I don't think we care about the exact state when making a tasksIdling() query; instead we care more about how long one task has been in the idle state at the time of the call, which reflects whether it is a normal idling period. So I feel it might be helpful to track that time difference and report it in the TaskStatus struct.

3. What I wanted to achieve by having some global mapping of either TopicPartition or TaskId is that it is not possible for a health check service to report a failure for a task that doesn't emit any metrics. So as long as we have a global topic partition API, the health check could always be aware of any task/partition not reporting its progress; does that make sense? If you feel we have a better way to achieve this, such as querying all the input/intermediate topic metadata directly from Kafka for the baseline, I think that should be good as well and worth mentioning in the KIP.

Also, it seems that the KIP hasn't reflected what you proposed for the task idling status.

Best,
Boyang


On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <wcarl...@confluent.io> wrote:

Thank you for the comments everyone!

I think there are a few things I can clear up in general, then I will respond to each question specifically.

First, when I say "idling" I refer to task idling, where the stream is intentionally not making progress (https://issues.apache.org/jira/browse/KAFKA-10091 is an example). This becomes relevant if a task is waiting on one partition with no data, but that is holding up a partition with data. That would cause someone looking only at the committed offset changes to believe the task has a problem when it is working as intended.

In light of this confusion, I plan to change tasksIdling() to `Map<TaskId, TaskStatus> getTasksStatus()`; this should hopefully make it clearer what is being exposed.

TaskStatus would include: TopicPartitions, TaskId, ProcessorTopology, Idling, and State.
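
To anchor the discussion, TaskStatus could look roughly like this (nothing here is final, and the exact types are open to feedback):

    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.TaskId;

    // Sketch of the proposed TaskStatus; the fields match the list above.
    public interface TaskStatus {
        TaskId taskId();
        Set<TopicPartition> topicPartitions();
        String processorTopology();   // or a richer handle on the sub-topology
        boolean isIdling();
        String state();               // e.g. "CREATED", "RESTORING", "RUNNING", "CLOSING"
    }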

Boyang:

2) I think that each task should report on whatever TopicPartitions it holds; this means a TopicPartition might get reported twice, but the user can roll those up and use the larger one when looking at the whole app.

4) If the user collects the committed offsets across all the running clients, there shouldn't be any tasks missing, correct?

6) Because there is not a 1:1 mapping between Tasks and TopicPartitions, I think it is cleaner to report them separately.

Guozhang:

1) Yes, that was my original plan, but it made more sense to mirror how the consumer exposes the committed offset.

3) That is a good point. I think that we should include internal topics as well. I think that if the topology were to evolve there should be fair warning anyway. Maybe you can clarify what would be limited by exposing the internal topics here? I thought a user could find them in other ways. If it is the names, we could anonymize them before exposing them.

Thank you all for your comments. If I did not respond directly to one of your questions, I updated the KIP to include the details it was requesting. I did not include my proposed changes mentioned earlier as I would like to get some feedback about what to include in TaskStatus and in general.

best,
Walker

On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Walker, thanks for the KIP. A few thoughts:

1) Have you considered just relying on `KafkaStreams#metrics()`, which includes the embedded consumer metrics that have the committed offsets, instead of adding a new API? I'm not advocating that this is a better approach, but I want to make sure we considered all options before we come to the "last resort" of adding new public interfaces.

2) The javadoc mentions "tasks assigned to this client", but the returned map is keyed on partitions. I think we should make the javadoc and the return types consistent, either tasks or topic partitions.

3) In addition, if for 2) above we ended up with topic partitions, would they include only external source topics, or also internal repartition / changelog topics? I think including only external source topic partitions is not sufficient for your goal of tracking progress, but exposing internal topic names is also a big commitment here for future topology evolution.

4) For "tasksIdling", I'm wondering if we can make it more general, so that the returned value is not just a boolean but a TaskState that can be an enum of "created, restoring, running, idle, closing". This could help us in the future to track other things like restoration efficiency and rebalance efficiency, etc.
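
I.e. something like:

    // Sketch of the more general return type.
    public enum TaskState {
        CREATED,
        RESTORING,
        RUNNING,
        IDLE,
        CLOSING
    }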

5) We need to clarify how "idling" is defined here: e.g. we can clearly state that a task is considered idle only if 1) lag is increasing, indicating that there are indeed new records arriving at the source, while the committed offset is not advancing, AND 2) the produced offset (imagine we may have punctuations that generate new data to the output topic even if there's no input for a while) is not advancing either.
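
In pseudo-Java, just to pin the definition down (the three booleans would have to be derived from the consumer / producer side):

    // A task counts as idle only if new input keeps arriving but neither the
    // committed offsets nor the produced offsets advance.
    final boolean isIdle = lagIsIncreasing        // 1) new records arrived at the source ...
            && !committedOffsetAdvancing          //    ... while the committed offset is stuck
            && !producedOffsetAdvancing;          // 2) and the produced offset is stuck too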


Guozhang



On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <reluctanthero...@gmail.com> wrote:

Thanks Walker for the proposed KIP! This should definitely empower KStream users with better visibility.

Meanwhile I got a couple of questions/suggestions:


1. typo "repost/report" in the motivation section.

2. What offsets do we report when the task is under restoration or rebalancing?

3. IIUC, we should clearly state that our reported metrics are based off the locally assigned tasks for each instance.

4. In the meantime, what’s our strategy to report tasks that are not local to the instance? Users would normally try to monitor all the possible tasks, and it’s unfortunate we couldn’t determine whether we have lost tasks. My brainstorming was whether it makes sense for the leader instance to report the task progress as -1 for all “supposed to be running” tasks, so that on the metrics collector side it could catch any missing tasks.

5. It seems unclear how users should use `isTaskIdling`. Why not report a map/set of idling tasks, just as we did for committed offsets?

6. Why do we use TopicPartition instead of TaskId as the key in the returned map?
7. Could we include some details on where we get the committed offsets for each task? Is it through a consumer offset fetch, or the stream processing progress based on the records fetched?


On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <wcarl...@confluent.io> wrote:

Hello all,

I would like to start a discussion on KIP-715. This KIP aims to make it easier to monitor Kafka Streams progress by exposing the committed offset in a similar way as the consumer client does.

Here is the KIP:
https://cwiki.apache.org/confluence/x/aRRRCg

Best,
Walker




--
-- Guozhang





--
-- Guozhang








