Sure thing Boyang,

1) It is in the proposed changes. I expanded on it a bit more now.
2) Done.
3) And done :)

Thanks for the suggestions,
walker

On Fri, Feb 26, 2021 at 3:10 PM Boyang Chen <reluctanthero...@gmail.com> wrote:

> Thanks Walker. Some minor comments:
>
> 1. Could you add a reference to the localThreadMetadata method in the KIP?
> 2. Could you make the code block a Java template, so that
> TaskMetadata.java is the template title? It would also be good to
> add some meta comments about the newly added functions.
> 3. Could you write more details about the rejected alternatives, such as
> why we don't choose to expose this as metrics, and why a new method on
> KStream is not favorable? These would be valuable when we look back on
> our design decisions.
>
> On Fri, Feb 26, 2021 at 11:23 AM Walker Carlson <wcarl...@confluent.io>
> wrote:
>
> > I understand now. I think that is a valid concern, but I think it is best
> > solved by having an external service verify through streams. As this KIP
> > is now just adding fields to TaskMetadata to be returned in the
> > threadMetadata, I am going to say that is out of scope.
> >
> > That seems to be the last concern. If there are no others I will put this
> > up for a vote soon.
> >
> > walker
> >
> > On Thu, Feb 25, 2021 at 12:35 PM Boyang Chen <reluctanthero...@gmail.com>
> > wrote:
> >
> > > For the 3rd point, yes, what I'm proposing is an edge case. For example,
> > > say we have 4 tasks [0_0, 0_1, 1_0, 1_1], and a bug in the rebalancing
> > > logic causes no one to get 1_1 assigned. Then the health check service
> > > will only see 3 tasks [0_0, 0_1, 1_0] reporting progress normally while
> > > not paying attention to 1_1. What I want to expose is a "logical global"
> > > view of all the tasks through the stream instance, since each instance
> > > gets the assigned topology and should be able to infer all the exact
> > > tasks to be up and running when the service is healthy.
> > >
> > > On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson <wcarl...@confluent.io>
> > > wrote:
> > >
> > > > Thanks for the follow up Boyang and Guozhang,
> > > >
> > > > I have updated the KIP to include these ideas.
> > > >
> > > > Guozhang, that is a good idea about using the TaskMetadata. We can get
> > > > it through the ThreadMetadata with a minor change to
> > > > `localThreadMetadata` in KafkaStreams. This means that we will only
> > > > need to update TaskMetadata and add no other APIs.
> > > >
> > > > Boyang, since each TaskMetadata contains the TaskId and
> > > > TopicPartitions, I don't believe mapping either way will be a problem.
> > > > Also, I think we can do something like record the time the task
> > > > started idling, and when it stops idling we can override it to -1. I
> > > > think that should clear up the first two points.
> > > >
> > > > As for your third point, I am not sure I 100% understand. The
> > > > ThreadMetadata will contain a set of all tasks assigned to that
> > > > thread. Any health check service will just need to query all clients
> > > > and aggregate their responses to get a complete picture of all tasks,
> > > > correct?
> > > >
> > > > walker
> > > >
> > > > On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > >
> > > > > Regarding the second API and the `TaskStatus` class: I'd suggest we
> > > > > consolidate on the existing `TaskMetadata` since we have already
> > > > > accumulated a bunch of such classes, and it's better to keep them
> > > > > small as public APIs. You can see
> > > > > https://issues.apache.org/jira/browse/KAFKA-12370
> > > > > for a reference and a proposal.
> > > > >
> > > > > On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen
> > > > > <reluctanthero...@gmail.com> wrote:
> > > > >
> > > > > > Thanks for the updates Walker. Some replies and follow-up
> > > > > > questions:
> > > > > >
> > > > > > 1. I agree one task could have multiple partitions, but when we
> > > > > > hit a delay in terms of offset progress, do we have a convenient
> > > > > > way to map a TopicPartition back to the problematic task? In
> > > > > > production, I believe it would be much quicker to identify the
> > > > > > problem using task.id instead of topic partition, especially when
> > > > > > it points to an internal topic. I think having the task id as part
> > > > > > of the entry value seems useful, which means getting something
> > > > > > like Map<TopicPartition, TaskProgress> where TaskProgress contains
> > > > > > both committed offsets & task id.
> > > > > >
> > > > > > 2. The task idling API is still confusing. I don't think we care
> > > > > > about the exact state when making the tasksIdling() query; instead
> > > > > > we care more about how long a task has been in the idle state as
> > > > > > of the time of the call, which reflects whether it is a normal
> > > > > > idling period. So I feel it might be helpful to track that time
> > > > > > difference and report it in the TaskStatus struct.
> > > > > >
> > > > > > 3. The reason I want some global mapping of either TopicPartition
> > > > > > or TaskId is that it is not possible for a health check service to
> > > > > > report a task failure that doesn't emit any metrics. So as long as
> > > > > > we have a global topic partition API, the health check could
> > > > > > always be aware of any task/partition not reporting its progress.
> > > > > > Does that make sense? If you feel we have a better way to achieve
> > > > > > this, such as querying all the input/intermediate topic metadata
> > > > > > directly from Kafka for the baseline, I think that should be good
> > > > > > as well and worth mentioning in the KIP.
> > > > > >
> > > > > > Also it seems that the KIP hasn't reflected what you proposed for
> > > > > > the task idling status.
> > > > > >
> > > > > > Best,
> > > > > > Boyang
> > > > > >
> > > > > >
> > > > > > On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson
> > > > > > <wcarl...@confluent.io> wrote:
> > > > > >
> > > > > > > Thank you for the comments everyone!
> > > > > > >
> > > > > > > I think there are a few things I can clear up in general, then I
> > > > > > > will specifically respond to each question.
> > > > > > >
> > > > > > > First, when I say "idling" I refer to task idling, where the
> > > > > > > stream is intentionally not making progress
> > > > > > > (https://issues.apache.org/jira/browse/KAFKA-10091 is an
> > > > > > > example). This becomes relevant if a task is waiting on one
> > > > > > > partition with no data, and that is holding up a partition with
> > > > > > > data. That would cause someone just looking at the committed
> > > > > > > offset changes to believe the task has a problem when it is
> > > > > > > working as intended.
> > > > > > >
> > > > > > > In light of this confusion, I plan to change tasksIdling() to
> > > > > > > `Map<TaskId, TaskStatus> getTasksStatus()`. This should
> > > > > > > hopefully make it clearer what is being exposed.
> > > > > > >
> > > > > > > TaskStatus would include: TopicPartitions, TaskId,
> > > > > > > ProcessorTopology, Idling, and State.
> > > > > > >
> > > > > > > Boyang:
> > > > > > >
> > > > > > > 2) I think that each task should report on whatever
> > > > > > > TopicPartitions it holds. This means a TopicPartition might get
> > > > > > > reported twice, but the user can roll those up and use the
> > > > > > > larger one when looking at the whole app.
> > > > > > >
> > > > > > > 4) If the user collects the committed offsets across all the
> > > > > > > running clients there shouldn't be any tasks missing, correct?
> > > > > > >
> > > > > > > 6) Because there is not a 1:1 mapping between Tasks and
> > > > > > > TopicPartitions, I think it is cleaner to report them
> > > > > > > separately.
> > > > > > >
> > > > > > > Guozhang:
> > > > > > >
> > > > > > > 1) Yes, that was my original plan, but it made more sense to
> > > > > > > mirror how the consumer exposes the committed offset.
> > > > > > >
> > > > > > > 3) That is a good point. I think that we should include internal
> > > > > > > topics as well. I think that if the topology were to evolve
> > > > > > > there should be fair warning anyway. Maybe you can clarify what
> > > > > > > would be limited by exposing the internal topics here? I thought
> > > > > > > a user could find them in other ways. If it is the name, we
> > > > > > > could anonymize them before exposing them.
> > > > > > >
> > > > > > > Thank you all for your comments. If I did not respond directly
> > > > > > > to one of your questions, I updated the KIP to include the
> > > > > > > details it was requesting. I did not include my proposed changes
> > > > > > > mentioned earlier, as I would like to get some feedback about
> > > > > > > what to include in TaskStatus and in general.
> > > > > > >
> > > > > > > best,
> > > > > > > Walker
> > > > > > >
> > > > > > > On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang
> > > > > > > <wangg...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hello Walker, thanks for the KIP.
> > > > > > > > A few thoughts:
> > > > > > > >
> > > > > > > > 1) Have you considered just relying on
> > > > > > > > `KafkaStreams#metrics()`, which includes embedded consumer
> > > > > > > > metrics that have the committed offsets, instead of adding a
> > > > > > > > new API? Not advocating that this is a better approach, but I
> > > > > > > > want to make sure we considered all options before we come to
> > > > > > > > the "last resort" of adding new public interfaces.
> > > > > > > >
> > > > > > > > 2) The javadoc mentions "tasks assigned to this client", but
> > > > > > > > the returned map is on partitions. I think we should make the
> > > > > > > > javadoc and the return types consistent, either tasks or topic
> > > > > > > > partitions.
> > > > > > > >
> > > > > > > > 3) In addition, if for 2) above we ended up with topic
> > > > > > > > partitions, would they include only external source topics, or
> > > > > > > > also internal repartition / changelog topics? I think
> > > > > > > > including only external source topic partitions is not
> > > > > > > > sufficient for your goal of tracking progress, but exposing
> > > > > > > > internal topic names is also a big commitment here for future
> > > > > > > > topology evolution.
> > > > > > > >
> > > > > > > > 4) For "tasksIdling", I'm wondering if we can make it more
> > > > > > > > general, so that the returned value is not just a boolean but
> > > > > > > > a TaskState that can be an enum of "created, restoring,
> > > > > > > > running, idle, closing". This could help us in the future to
> > > > > > > > track other things like restoration efficiency and rebalance
> > > > > > > > efficiency etc.
> > > > > > > >
> > > > > > > > 5) We need to clarify how "idling" is being defined here: e.g.
> > > > > > > > we can clearly state that a task is considered idle only if
> > > > > > > > 1) lag is increasing, indicating that there are indeed new
> > > > > > > > records arriving at the source while the committed offset is
> > > > > > > > not advancing, AND 2) the produced offset (imagine we may have
> > > > > > > > punctuations that generate new data to the output topic even
> > > > > > > > if there's no input for a while) is not advancing either.
> > > > > > > >
> > > > > > > >
> > > > > > > > Guozhang
> > > > > > > >
> > > > > > > >
> > > > > > > > On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen
> > > > > > > > <reluctanthero...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Thanks Walker for the proposed KIP! This should definitely
> > > > > > > > > empower KStream users with better visibility.
> > > > > > > > >
> > > > > > > > > Meanwhile I got a couple of questions/suggestions:
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > 1. typo "repost/report" in the motivation section.
> > > > > > > > >
> > > > > > > > > 2. What offsets do we report when the task is under
> > > > > > > > > restoration or rebalancing?
> > > > > > > > >
> > > > > > > > > 3. IIUC, we should clearly state that our reported metrics
> > > > > > > > > are based off locally assigned tasks for each instance.
> > > > > > > > >
> > > > > > > > > 4. In the meantime, what’s our strategy to report tasks that
> > > > > > > > > are not local to the instance? Users would normally try to
> > > > > > > > > monitor all the possible tasks, and it’s unfortunate we
> > > > > > > > > couldn’t determine whether we have lost tasks.
> > > > > > > > > My brainstorming was whether it makes sense for the leader
> > > > > > > > > instance to report the task progress as -1 for all “supposed
> > > > > > > > > to be running” tasks, so that the metrics collector side
> > > > > > > > > could catch any missing tasks.
> > > > > > > > >
> > > > > > > > > 5. It seems not clear how users should use `isTaskIdling`.
> > > > > > > > > Why not report a map/set for idling tasks just as we did for
> > > > > > > > > committed offsets?
> > > > > > > > >
> > > > > > > > > 6. Why do we use TopicPartition instead of TaskId as the key
> > > > > > > > > in the returned map?
> > > > > > > > >
> > > > > > > > > 7. Could we include some details on where we get the
> > > > > > > > > committed offsets for each task? Is it through consumer
> > > > > > > > > offset fetch, or the stream processing progress based on the
> > > > > > > > > records fetched?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson
> > > > > > > > > <wcarl...@confluent.io> wrote:
> > > > > > > > >
> > > > > > > > > > Hello all,
> > > > > > > > > >
> > > > > > > > > > I would like to start discussion on KIP-715. This KIP aims
> > > > > > > > > > to make it easier to monitor Kafka Streams progress by
> > > > > > > > > > exposing the committed offset in a similar way as the
> > > > > > > > > > consumer client does.
> > > > > > > > > >
> > > > > > > > > > Here is the KIP:
> > > > > > > > > > https://cwiki.apache.org/confluence/x/aRRRCg
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Walker
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > --
> > > > > > > > -- Guozhang
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
>
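
For anyone reading this thread later, the aggregation discussed above (query every client, combine the per-task responses, and roll up partitions that get reported twice) together with the missing-task edge case (no one is assigned 1_1) could look roughly like the sketch below. Everything in it is an assumption for illustration only: `TaskSnapshot`, `missingTasks`, `rollUpOffsets` and `idlingTooLong` are hypothetical names and not part of Kafka Streams or the KIP, task ids and partitions are plain strings rather than `TaskId` / `TopicPartition`, the idling field follows the "start time, or -1 when not idling" idea from the thread, and the expected task set is assumed to come from somewhere outside the clients.

```java
import java.util.*;

public class TaskHealthCheck {

    /** Hypothetical stand-in for the per-task data discussed in the thread. */
    static final class TaskSnapshot {
        final String taskId;                      // e.g. "0_1"
        final Map<String, Long> committedOffsets; // keyed by "topic-partition"
        final long idlingSinceMs;                 // -1 when the task is not idling

        TaskSnapshot(String taskId, Map<String, Long> committedOffsets, long idlingSinceMs) {
            this.taskId = taskId;
            this.committedOffsets = committedOffsets;
            this.idlingSinceMs = idlingSinceMs;
        }
    }

    /** Returns the expected tasks that no client reported. */
    static Set<String> missingTasks(Set<String> expected, Collection<List<TaskSnapshot>> perClient) {
        Set<String> missing = new TreeSet<>(expected);
        for (List<TaskSnapshot> client : perClient) {
            for (TaskSnapshot task : client) {
                missing.remove(task.taskId);
            }
        }
        return missing;
    }

    /** Merges committed offsets; a partition reported twice keeps the larger offset. */
    static Map<String, Long> rollUpOffsets(Collection<List<TaskSnapshot>> perClient) {
        Map<String, Long> merged = new TreeMap<>();
        for (List<TaskSnapshot> client : perClient) {
            for (TaskSnapshot task : client) {
                task.committedOffsets.forEach((tp, offset) -> merged.merge(tp, offset, Long::max));
            }
        }
        return merged;
    }

    /** Returns tasks that have been idling for longer than maxIdleMs as of nowMs. */
    static Set<String> idlingTooLong(Collection<List<TaskSnapshot>> perClient, long nowMs, long maxIdleMs) {
        Set<String> slow = new TreeSet<>();
        for (List<TaskSnapshot> client : perClient) {
            for (TaskSnapshot task : client) {
                if (task.idlingSinceMs >= 0 && nowMs - task.idlingSinceMs > maxIdleMs) {
                    slow.add(task.taskId);
                }
            }
        }
        return slow;
    }

    public static void main(String[] args) {
        // Two clients; task 1_1 was never assigned to either of them.
        List<TaskSnapshot> clientA = Arrays.asList(
            new TaskSnapshot("0_0", Collections.singletonMap("input-0", 100L), -1L),
            new TaskSnapshot("0_1", Collections.singletonMap("input-1", 80L), 5_000L));
        List<TaskSnapshot> clientB = Collections.singletonList(
            new TaskSnapshot("1_0", Collections.singletonMap("input-0", 120L), -1L));
        List<List<TaskSnapshot>> clients = Arrays.asList(clientA, clientB);

        Set<String> expected = new TreeSet<>(Arrays.asList("0_0", "0_1", "1_0", "1_1"));
        System.out.println("missing tasks: " + missingTasks(expected, clients)); // prints "missing tasks: [1_1]"
        System.out.println("offsets: " + rollUpOffsets(clients));
        System.out.println("idling too long: " + idlingTooLong(clients, 20_000L, 10_000L));
    }
}
```

Note that the expected set is the part the clients alone cannot supply, which is the gap raised in the third point of the discussion; the sketch simply takes it as an input.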