Sure thing Boyang,

1) it is in proposed changes. I expanded on it a bit more now.
2) done
3) and done :)

thanks for the suggestions,
walker

On Fri, Feb 26, 2021 at 3:10 PM Boyang Chen <reluctanthero...@gmail.com>
wrote:

> Thanks Walker. Some minor comments:
>
> 1. Could you add a reference to the localThreadMetadata method in the KIP?
> 2. Could you format the code block as a Java template, so that
> TaskMetadata.java becomes the template title? Also it would be good to
> add some meta comments about the newly added functions.
> 3. Could you write more details about rejected alternatives, such as why we
> chose not to expose this as metrics, and why a new method on KStream is not
> favorable? These would be valuable when we look back on our design
> decisions.
>
> On Fri, Feb 26, 2021 at 11:23 AM Walker Carlson <wcarl...@confluent.io>
> wrote:
>
> > I understand now. I think that is a valid concern, but I think it is best
> > solved by having an external service verify through Streams. As this KIP
> > is now just adding fields to TaskMetadata to be returned in the
> > ThreadMetadata, I am going to say that is out of scope.
> >
> > That seems to be the last concern. If there are no others I will put this
> > up for a vote soon.
> >
> > walker
> >
> > On Thu, Feb 25, 2021 at 12:35 PM Boyang Chen <reluctanthero...@gmail.com>
> > wrote:
> >
> > > For the 3rd point, yes, what I'm proposing is an edge case. For example,
> > > suppose we have 4 tasks [0_0, 0_1, 1_0, 1_1] and a bug in the rebalancing
> > > logic causes no one to get 1_1 assigned. Then the health check service
> > > will only see 3 tasks [0_0, 0_1, 1_0] reporting progress normally while
> > > not paying attention to 1_1. What I want to expose is a "logical global"
> > > view of all the tasks through the stream instance, since each instance
> > > gets the assigned topology and should be able to infer all the exact
> > > tasks to be up and running when the service is healthy.
> > >
> > > On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson <wcarl...@confluent.io>
> > > wrote:
> > >
> > > > Thanks for the follow up Boyang and Guozhang,
> > > >
> > > > I have updated the kip to include these ideas.
> > > >
> > > > Guozhang, that is a good idea about using the TaskMetadata. We can get
> > > > it through the ThreadMetadata with a minor change to
> > > > `localThreadMetadata` in KafkaStreams. This means that we will only
> > > > need to update TaskMetadata and add no other APIs.
> > > >
> > > > Boyang, since each TaskMetadata contains the TaskId and
> > > > TopicPartitions, I don't believe mapping either way will be a problem.
> > > > Also, I think we can do something like record the time the task
> > > > started idling, and when it stops idling we can reset it to -1. I
> > > > think that should clear up the first two points.
> > > >
> > > > As for your third point, I am not sure I 100% understand. The
> > > > ThreadMetadata will contain a set of all tasks assigned to that thread.
> > > > Any health check service will just need to query all clients and
> > > > aggregate their responses to get a complete picture of all tasks,
> > > > correct?
> > > >
> > > > walker
> > > >
> > > > On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > >
> > > > > Regarding the second API and the `TaskStatus` class: I'd suggest we
> > > > > consolidate on the existing `TaskMetadata` since we have already
> > > > > accumulated a bunch of such classes, and it's better to keep them
> > > > > small as public APIs. You can see
> > > > > https://issues.apache.org/jira/browse/KAFKA-12370
> > > > > for a reference and a proposal.
> > > > >
> > > > > On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen
> > > > > <reluctanthero...@gmail.com> wrote:
> > > > >
> > > > > > Thanks for the updates Walker. Some replies and follow-up questions:
> > > > > >
> > > > > > 1. I agree one task could have multiple partitions, but when we hit
> > > > > > a delay in terms of offset progress, do we have a convenient way to
> > > > > > reverse-map a TopicPartition to the problematic task? In production,
> > > > > > I believe it would be much quicker to identify the problem using
> > > > > > task.id instead of topic partition, especially when it points to an
> > > > > > internal topic. I think having the task id as part of the entry
> > > > > > value seems useful, which means getting something like
> > > > > > Map<TopicPartition, TaskProgress> where TaskProgress contains both
> > > > > > committed offsets & task id.
> > > > > >
> > > > > > 2. The task idling API is still confusing. I don't think we care
> > > > > > about the exact state when making a tasksIdling() query; instead we
> > > > > > care more about how long a task has been idle at the time of the
> > > > > > call, which reflects whether it is a normal idling period. So I
> > > > > > feel it might be helpful to track that time difference and report
> > > > > > it in the TaskStatus struct.
> > > > > >
> > > > > > 3. The reason I want some global mapping of either TopicPartition
> > > > > > or TaskId is that a health check service cannot report the failure
> > > > > > of a task that doesn't emit any metrics. So as long as we have a
> > > > > > global topic partition API, the health check could always be aware
> > > > > > of any task/partition not reporting its progress. Does that make
> > > > > > sense? If you feel we have a better way to achieve this, such as
> > > > > > querying all the input/intermediate topic metadata directly from
> > > > > > Kafka for the baseline, I think that should be good as well and
> > > > > > worth mentioning in the KIP.
> > > > > >
> > > > > > Also it seems that the KIP hasn't reflected what you proposed for
> > > > > > the task idling status.
> > > > > >
> > > > > > Best,
> > > > > > Boyang
> > > > > >
> > > > > >
> > > > > > On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson
> > > > > > <wcarl...@confluent.io> wrote:
> > > > > >
> > > > > > > Thank you for the comments everyone!
> > > > > > >
> > > > > > > I think there are a few things I can clear up in general, then I
> > > > > > > will specifically respond to each question.
> > > > > > >
> > > > > > > First, when I say "idling" I refer to task idling, where the
> > > > > > > stream is intentionally not making progress
> > > > > > > (https://issues.apache.org/jira/browse/KAFKA-10091 is an
> > > > > > > example). This becomes relevant if a task is waiting on one
> > > > > > > partition with no data but that is holding up a partition with
> > > > > > > data. That would cause someone just looking at the committed
> > > > > > > offset changes to believe the task has a problem when it is
> > > > > > > working as intended.
> > > > > > >
> > > > > > > In light of this confusion, I plan to change tasksIdling() to
> > > > > > > `Map<TaskId, TaskStatus> getTasksStatus()`. This should hopefully
> > > > > > > make it clearer what is being exposed.
> > > > > > >
> > > > > > > TaskStatus would include: TopicPartitions, TaskId,
> > > > > > > ProcessorTopology, Idling, and State.
> > > > > > >
> > > > > > > Boyang:
> > > > > > >
> > > > > > > 2) I think that each task should report on whatever
> > > > > > > TopicPartitions it holds. This means a TopicPartition might get
> > > > > > > reported twice, but the user can roll those up and use the larger
> > > > > > > one when looking at the whole app.
> > > > > > >
> > > > > > > 4) If the user collects the committed offsets across all the
> > > > > > > running clients there shouldn't be any tasks missing, correct?
> > > > > > >
> > > > > > > 6) Because there is not a 1:1 mapping between Tasks and
> > > > > > > TopicPartitions, I think it is cleaner to report them separately.
> > > > > > >
> > > > > > > Guozhang:
> > > > > > >
> > > > > > > 1) Yes, that was my original plan, but it made more sense to
> > > > > > > mirror how the consumer exposes the committed offset.
> > > > > > >
> > > > > > > 3) That is a good point. I think that we should include internal
> > > > > > > topics as well. I think that if the topology were to evolve there
> > > > > > > should be fair warning anyway. Maybe you can clarify what would
> > > > > > > be limited by exposing the internal topics here? I thought a user
> > > > > > > could find them in other ways. If it is the name, we could
> > > > > > > anonymize them before exposing them.
> > > > > > >
> > > > > > > Thank you all for your comments. If I did not respond directly to
> > > > > > > one of your questions, I updated the KIP to include the details
> > > > > > > it was requesting. I did not include my proposed changes
> > > > > > > mentioned earlier, as I would like to get some feedback about
> > > > > > > what to include in TaskStatus and in general.
> > > > > > >
> > > > > > > best,
> > > > > > > Walker
> > > > > > >
> > > > > > > On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang
> > > > > > > <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hello Walker, thanks for the KIP. A few thoughts:
> > > > > > > >
> > > > > > > > 1) Have you considered just relying on
> > > > > > > > `KafkaStreams#metrics()`, which includes embedded consumer
> > > > > > > > metrics that have the committed offsets, instead of adding a
> > > > > > > > new API? Not advocating that this is a better approach, but I
> > > > > > > > want to make sure we considered all options before we come to
> > > > > > > > the "last resort" of adding new public interfaces.
> > > > > > > >
> > > > > > > > 2) The javadoc mentions "tasks assigned to this client", but
> > > > > > > > the returned map is keyed on partitions. I think we should make
> > > > > > > > the javadoc and the return types consistent, either tasks or
> > > > > > > > topic partitions.
> > > > > > > >
> > > > > > > > 3) In addition, if for 2) above we end up with topic
> > > > > > > > partitions, would they include only external source topics, or
> > > > > > > > also internal repartition / changelog topics? I think including
> > > > > > > > only external source topic partitions is not sufficient for
> > > > > > > > your goal of tracking progress, but exposing internal topic
> > > > > > > > names is also a big commitment for future topology evolution.
> > > > > > > >
> > > > > > > > 4) For "tasksIdling", I'm wondering if we can make it more
> > > > > > > > general, so that the returned value is not just a boolean but a
> > > > > > > > TaskState that can be an enum of "created, restoring, running,
> > > > > > > > idle, closing". This could help us in the future to track other
> > > > > > > > things like restoration efficiency and rebalance efficiency,
> > > > > > > > etc.
> > > > > > > >
> > > > > > > > 5) We need to clarify how "idling" is defined here: e.g. we can
> > > > > > > > clearly state that a task is considered idle only if 1) lag is
> > > > > > > > increasing, indicating that new records have indeed arrived at
> > > > > > > > the source, while the committed offset is not advancing, AND 2)
> > > > > > > > the produced offset (imagine we may have punctuations that
> > > > > > > > generate new data to the output topic even if there's no input
> > > > > > > > for a while) is not advancing either.
> > > > > > > >
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen
> > > > > > > > <reluctanthero...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Thanks Walker for the proposed KIP! This should definitely
> > > > > > > > > empower KStream users with better visibility.
> > > > > > > > >
> > > > > > > > > Meanwhile I have a couple of questions/suggestions:
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > 1. typo "repost/report" in the motivation section.
> > > > > > > > >
> > > > > > > > > 2. What offsets do we report when the task is under
> > > > > > > > > restoration or rebalancing?
> > > > > > > > >
> > > > > > > > > 3. IIUC, we should clearly state that our reported metrics
> > > > > > > > > are based off locally assigned tasks for each instance.
> > > > > > > > >
> > > > > > > > > 4. In the meantime, what’s our strategy to report tasks that
> > > > > > > > > are not local to the instance? Users would normally try to
> > > > > > > > > monitor all the possible tasks, and it’s unfortunate we
> > > > > > > > > couldn’t determine whether we have lost tasks. My
> > > > > > > > > brainstorming was whether it makes sense for the leader
> > > > > > > > > instance to report the task progress as -1 for all “supposed
> > > > > > > > > to be running” tasks, so that on the metrics collector side
> > > > > > > > > it could catch any missing tasks.
> > > > > > > > >
> > > > > > > > > 5. It is not clear how users should use `isTaskIdling`. Why
> > > > > > > > > not report a map/set for idling tasks, just as we did for
> > > > > > > > > committed offsets?
> > > > > > > > >
> > > > > > > > > 6. Why do we use TopicPartition instead of TaskId as the key
> > > > > > > > > in the returned map?
> > > > > > > > >
> > > > > > > > > 7. Could we include some details on where we get the
> > > > > > > > > committed offsets for each task? Is it through consumer
> > > > > > > > > offset fetch, or the stream processing progress based on the
> > > > > > > > > records fetched?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson
> > > > > > > > > <wcarl...@confluent.io> wrote:
> > > > > > > > >
> > > > > > > > > > Hello all,
> > > > > > > > > >
> > > > > > > > > > I would like to start the discussion on KIP-715. This KIP
> > > > > > > > > > aims to make it easier to monitor Kafka Streams progress
> > > > > > > > > > by exposing the committed offset in a similar way to the
> > > > > > > > > > consumer client.
> > > > > > > > > >
> > > > > > > > > > Here is the KIP:
> > > > > > > > > > https://cwiki.apache.org/confluence/x/aRRRCg
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Walker
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
>
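
A minimal sketch of the health check discussed in this thread, using Boyang's
example of four tasks where 1_1 is never assigned. It assumes the checker
already knows the expected task ids from some baseline and has collected the
task ids each client reports. `MissingTaskCheck` and `missingTasks` are
hypothetical names for illustration and are not part of the Kafka Streams API;
task ids are modeled as plain strings.

```java
import java.util.Collection;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Kafka Streams.
// Task ids are plain strings such as "0_0" (an assumption for this sketch).
final class MissingTaskCheck {

    /** Returns the expected task ids that no client reported, in sorted order. */
    static Set<String> missingTasks(Set<String> expected,
                                    Collection<Set<String>> reportedPerClient) {
        // Start from everything that should be running...
        Set<String> missing = new TreeSet<>(expected);
        // ...and cross off every task that at least one client reports.
        for (Set<String> reported : reportedPerClient) {
            missing.removeAll(reported);
        }
        return missing;
    }
}
```

With expected tasks [0_0, 0_1, 1_0, 1_1] and two clients reporting [0_0, 0_1]
and [1_0], the method returns [1_1], which is the task a purely per-client
view would not notice.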
