I updated it to use Optional, good idea Matthias. As for localThreadMetadata, it could already be called during a rebalance. I also mention that they return the highest value they have seen so far for any tasks assigned to them. I thought it would be useful to see the TaskMetadata while the threads were shutting down. I think that there shouldn't really be partial information. If you think this should be clarified further, let me know.
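Here is a minimal sketch of how a health check might consume the `Optional`. It assumes only that `timeCurrentIdlingStarted()` yields an epoch-millisecond timestamp wrapped in an `Optional<Long>`, which is empty when the task is not idling. `IdleCheck`, its methods, and the 5-second threshold are hypothetical illustrations and are not part of the KIP:

```java
import java.util.Optional;

// Hypothetical helper, not part of the KIP. It interprets a value shaped like the
// proposed timeCurrentIdlingStarted(): an epoch-millisecond timestamp, or empty
// when the task is not idling.
final class IdleCheck {

    private IdleCheck() { }

    // How long the task has been idling as of nowMs, or empty if it is not idling.
    static Optional<Long> idleDurationMs(Optional<Long> idleStartedMs, long nowMs) {
        return idleStartedMs.map(start -> Math.max(0L, nowMs - start));
    }

    // True only if the task is idling and has been idle for longer than thresholdMs.
    static boolean idleTooLong(Optional<Long> idleStartedMs, long nowMs, long thresholdMs) {
        return idleDurationMs(idleStartedMs, nowMs)
                .map(duration -> duration > thresholdMs)
                .orElse(false);
    }

    public static void main(String[] args) {
        long now = 10_000L;
        long threshold = 5_000L;
        System.out.println(idleTooLong(Optional.empty(), now, threshold));    // false: not idling
        System.out.println(idleTooLong(Optional.of(8_000L), now, threshold)); // false: idle for 2 s
        System.out.println(idleTooLong(Optional.of(1_000L), now, threshold)); // true: idle for 9 s
    }
}
```

With a `-1` sentinel, a caller who forgot the special case would compute `now + 1` as the idle duration and raise a false alarm. The empty `Optional` forces the caller to handle that case.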
walker

On Mon, Mar 1, 2021 at 3:45 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
> Can you clarify your second question, Matthias? If this is queried during
> a cooperative rebalance, it should return the tasks as usual. If the user
> is using eager rebalancing, then this will not return any tasks, but the
> user should not rely on all tasks being returned at any given time to
> begin with, since it's possible we are in between revoking and
> re-assigning a partition.
>
> What does "partial information" mean?
>
> (btw I agree that an Optional makes sense for timeCurrentIdlingStarted())
>
> On Mon, Mar 1, 2021 at 11:46 AM Matthias J. Sax <mj...@apache.org> wrote:
>
> > Thanks for updating the KIP, Walker.
> >
> > About `timeCurrentIdlingStarted()`: should we return an `Optional`
> > instead of `-1` if a task is not idling?
> >
> > As we allow calling `localThreadMetadata()` at any time, could it be that
> > we report partial information during a rebalance? If yes, this should be
> > pointed out, because anyone who wants to implement a health check needs
> > to take this into account.
> >
> > -Matthias
> >
> > On 2/27/21 11:32 AM, Walker Carlson wrote:
> > > Sure thing Boyang,
> > >
> > > 1) It is in proposed changes. I expanded on it a bit more now.
> > > 2) Done.
> > > 3) And done :)
> > >
> > > Thanks for the suggestions,
> > > walker
> > >
> > > On Fri, Feb 26, 2021 at 3:10 PM Boyang Chen <reluctanthero...@gmail.com>
> > > wrote:
> > >
> > >> Thanks Walker. Some minor comments:
> > >>
> > >> 1. Could you add a reference to the localThreadMetadata method in the
> > >>    KIP?
> > >> 2. Could you make the code block a Java template, with TaskMetadata.java
> > >>    as the template title? Also it would be good to add some meta
> > >>    comments about the newly added functions.
> > >> 3. Could you write more details about rejected alternatives?
> > >>    Such as why we don't choose to expose these as metrics, and why a
> > >>    new method on KStream is not favorable. These would be valuable
> > >>    when we look back on our design decisions.
> > >>
> > >> On Fri, Feb 26, 2021 at 11:23 AM Walker Carlson <wcarl...@confluent.io>
> > >> wrote:
> > >>
> > >>> I understand now. I think that is a valid concern, but I think it is
> > >>> best solved by having an external service verify through Streams. As
> > >>> this KIP is now just adding fields to TaskMetadata to be returned in
> > >>> the ThreadMetadata, I am going to say that is out of scope.
> > >>>
> > >>> That seems to be the last concern. If there are no others I will put
> > >>> this up for a vote soon.
> > >>>
> > >>> walker
> > >>>
> > >>> On Thu, Feb 25, 2021 at 12:35 PM Boyang Chen <reluctanthero...@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> For the 3rd point, yes, what I'm proposing is an edge case. For
> > >>>> example, say we have 4 tasks [0_0, 0_1, 1_0, 1_1], and a bug in the
> > >>>> rebalancing logic causes no one to get 1_1 assigned. Then the health
> > >>>> check service will only see 3 tasks [0_0, 0_1, 1_0] reporting
> > >>>> progress normally, while not paying attention to 1_1. What I want to
> > >>>> expose is a "logical global" view of all the tasks through the
> > >>>> stream instance, since each instance gets the assigned topology and
> > >>>> should be able to infer exactly which tasks should be up and running
> > >>>> when the service is healthy.
> > >>>>
> > >>>> On Thu, Feb 25, 2021 at 11:25 AM Walker Carlson <wcarl...@confluent.io>
> > >>>> wrote:
> > >>>>
> > >>>>> Thanks for the follow up Boyang and Guozhang,
> > >>>>>
> > >>>>> I have updated the KIP to include these ideas.
> > >>>>>
> > >>>>> Guozhang, that is a good idea about using the TaskMetadata.
> > >>>>> We can get it through the ThreadMetadata with a minor change to
> > >>>>> `localThreadMetadata` in KafkaStreams. This means that we will only
> > >>>>> need to update TaskMetadata and add no other APIs.
> > >>>>>
> > >>>>> Boyang, since each TaskMetadata contains the TaskId and
> > >>>>> TopicPartitions, I don't believe mapping either way will be a
> > >>>>> problem. Also I think we can do something like record the time the
> > >>>>> task started idling, and when it stops idling we can override it
> > >>>>> to -1. I think that should clear up the first two points.
> > >>>>>
> > >>>>> As for your third point, I am not sure I 100% understand. The
> > >>>>> ThreadMetadata will contain a set of all tasks assigned to that
> > >>>>> thread. Any health check service will just need to query all
> > >>>>> clients and aggregate their responses to get a complete picture of
> > >>>>> all tasks, correct?
> > >>>>>
> > >>>>> walker
> > >>>>>
> > >>>>> On Thu, Feb 25, 2021 at 9:57 AM Guozhang Wang <wangg...@gmail.com>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Regarding the second API and the `TaskStatus` class: I'd suggest we
> > >>>>>> consolidate on the existing `TaskMetadata`, since we have already
> > >>>>>> accumulated a bunch of such classes, and it's better to keep them
> > >>>>>> small as public APIs. You can see
> > >>>>>> https://issues.apache.org/jira/browse/KAFKA-12370 for a reference
> > >>>>>> and a proposal.
> > >>>>>>
> > >>>>>> On Thu, Feb 25, 2021 at 9:40 AM Boyang Chen <reluctanthero...@gmail.com>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Thanks for the updates Walker. Some replies and follow-up
> > >>>>>>> questions:
> > >>>>>>>
> > >>>>>>> 1. I agree one task could have multiple partitions, but when we
> > >>>>>>>    hit a delay in terms of offset progress, do we have a
> > >>>>>>>    convenient way to reverse-map a TopicPartition to the
> > >>>>>>>    problematic task? In production, I believe it would be much
> > >>>>>>>    quicker to identify the problem using task.id instead of topic
> > >>>>>>>    partition, especially when it points to an internal topic. I
> > >>>>>>>    think having the task id as part of the entry value seems
> > >>>>>>>    useful, which means getting something like
> > >>>>>>>    Map<TopicPartition, TaskProgress> where TaskProgress contains
> > >>>>>>>    both committed offsets & task id.
> > >>>>>>>
> > >>>>>>> 2. The task idling API was still confusing. I don't think we care
> > >>>>>>>    about the exact state when making the tasksIdling() query;
> > >>>>>>>    instead we care more about how long one task has been in the
> > >>>>>>>    idle state at the time of the call, which reflects whether it
> > >>>>>>>    is a normal idling period. So I feel it might be helpful to
> > >>>>>>>    track that time difference and report it in the TaskStatus
> > >>>>>>>    struct.
> > >>>>>>>
> > >>>>>>> 3. What I wanted to achieve by having some global mapping of
> > >>>>>>>    either TopicPartition or TaskId is that otherwise it is not
> > >>>>>>>    possible for a health check service to report a task failure
> > >>>>>>>    that doesn't emit any metrics. So as long as we have a global
> > >>>>>>>    topic partition API, the health check could always be aware of
> > >>>>>>>    any task/partition not reporting its progress, does that make
> > >>>>>>>    sense?
> > >>>>>>>    If you feel we have a better way to achieve this, such as
> > >>>>>>>    querying all the input/intermediate topic metadata directly
> > >>>>>>>    from Kafka for the baseline, I think that should be good as
> > >>>>>>>    well and worth mentioning in the KIP.
> > >>>>>>>
> > >>>>>>> Also it seems that the KIP hasn't reflected what you proposed for
> > >>>>>>> the task idling status.
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Boyang
> > >>>>>>>
> > >>>>>>> On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <wcarl...@confluent.io>
> > >>>>>>> wrote:
> > >>>>>>>
> > >>>>>>>> Thank you for the comments everyone!
> > >>>>>>>>
> > >>>>>>>> I think there are a few things I can clear up in general, then I
> > >>>>>>>> will specifically respond to each question.
> > >>>>>>>>
> > >>>>>>>> First, when I say "idling" I refer to task idling, where the
> > >>>>>>>> stream is intentionally not making progress
> > >>>>>>>> (https://issues.apache.org/jira/browse/KAFKA-10091 is an
> > >>>>>>>> example). This becomes relevant if a task is waiting on one
> > >>>>>>>> partition with no data but that is holding up a partition with
> > >>>>>>>> data. That would cause someone just looking at the committed
> > >>>>>>>> offset changes to believe the task has a problem when it is
> > >>>>>>>> working as intended.
> > >>>>>>>>
> > >>>>>>>> In light of this confusion, I plan to change tasksIdling() to
> > >>>>>>>> `Map<TaskId, TaskStatus> getTasksStatus()`; this should hopefully
> > >>>>>>>> make it clearer what is being exposed.
> > >>>>>>>>
> > >>>>>>>> TaskStatus would include: TopicPartitions, TaskId,
> > >>>>>>>> ProcessorTopology, Idling, and State.
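Boyang's point 1 above asks how to reverse-map a lagging TopicPartition to its task. Walker replies that each TaskMetadata carries both its TaskId and its TopicPartitions, so inverting the relationship is straightforward. The following is a minimal sketch that uses plain strings as stand-ins for `TaskId` and `TopicPartition`. The class and method names are hypothetical:

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch: plain strings stand in for TaskId ("0_1") and
// TopicPartition ("orders-1"); the input is what each task reports about itself.
final class PartitionIndex {

    private PartitionIndex() { }

    // Inverts taskId -> partitions into partition -> taskIds. Values are sets in
    // case the same partition is reported by more than one task.
    static Map<String, Set<String>> partitionsToTasks(Map<String, Set<String>> tasksToPartitions) {
        Map<String, Set<String>> index = new TreeMap<>();
        for (Map.Entry<String, Set<String>> entry : tasksToPartitions.entrySet()) {
            for (String partition : entry.getValue()) {
                index.computeIfAbsent(partition, p -> new TreeSet<>()).add(entry.getKey());
            }
        }
        return index;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> reported = Map.of(
                "0_0", Set.of("orders-0", "payments-0"),
                "0_1", Set.of("orders-1", "payments-1"));
        // A lagging partition can be traced back to the task that reported it.
        System.out.println(partitionsToTasks(reported).get("payments-1")); // [0_1]
    }
}
```

Keeping sets as values also covers Walker's later remark that there is no 1:1 mapping between tasks and TopicPartitions.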
> > >>>>>>>>
> > >>>>>>>> Boyang:
> > >>>>>>>>
> > >>>>>>>> 2) I think that each task should report on whatever
> > >>>>>>>> TopicPartitions it holds. This means a TopicPartition might get
> > >>>>>>>> reported twice, but the user can roll those up and use the larger
> > >>>>>>>> one when looking at the whole app.
> > >>>>>>>>
> > >>>>>>>> 4) If the user collects the committed offsets across all the
> > >>>>>>>> running clients, there shouldn't be any tasks missing, correct?
> > >>>>>>>>
> > >>>>>>>> 6) Because there is not a 1:1 mapping between Tasks and
> > >>>>>>>> TopicPartitions, I think it is cleaner to report them separately.
> > >>>>>>>>
> > >>>>>>>> Guozhang:
> > >>>>>>>>
> > >>>>>>>> 1) Yes, that was my original plan, but it made more sense to
> > >>>>>>>> mirror how the consumer exposes the committed offset.
> > >>>>>>>>
> > >>>>>>>> 3) That is a good point. I think that we should include internal
> > >>>>>>>> topics as well. I think that if the topology were to evolve there
> > >>>>>>>> should be fair warning anyway. Maybe you can clarify what would
> > >>>>>>>> be limited by exposing the internal topics here? I thought a user
> > >>>>>>>> could find them in other ways. If it is the names, we could
> > >>>>>>>> anonymize them before exposing them.
> > >>>>>>>>
> > >>>>>>>> Thank you all for your comments. If I did not respond directly to
> > >>>>>>>> one of your questions, I updated the KIP to include the details
> > >>>>>>>> it was requesting. I did not include my proposed changes
> > >>>>>>>> mentioned earlier, as I would like to get some feedback about
> > >>>>>>>> what to include in TaskStatus and in general.
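Walker's reply 2) above says that a TopicPartition may be reported by more than one task, and that the user can roll the reports up and keep the larger value. The following is a minimal sketch of that roll-up. Plain "topic-partition" strings stand in for `TopicPartition`, and the class name is hypothetical:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: each map is one task's committed offsets as reported by
// some client, keyed by a "topic-partition" string standing in for TopicPartition.
final class OffsetRollup {

    private OffsetRollup() { }

    // Merges all reports into one app-wide view; when a partition appears in
    // more than one report, the larger committed offset wins.
    static Map<String, Long> rollUp(List<Map<String, Long>> reports) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> report : reports) {
            report.forEach((partition, offset) -> merged.merge(partition, offset, Long::max));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Long> fromTaskA = Map.of("orders-0", 120L, "orders-1", 80L);
        Map<String, Long> fromTaskB = Map.of("orders-1", 95L);
        System.out.println(rollUp(List.of(fromTaskA, fromTaskB)).get("orders-1")); // 95
    }
}
```

`Map.merge` with `Long::max` keeps the highest offset seen for each partition. This matches the "highest value seen so far" semantics described at the top of the thread.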
> > >>>>>>>>
> > >>>>>>>> best,
> > >>>>>>>> Walker
> > >>>>>>>>
> > >>>>>>>> On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <wangg...@gmail.com>
> > >>>>>>>> wrote:
> > >>>>>>>>
> > >>>>>>>>> Hello Walker, thanks for the KIP. A few thoughts:
> > >>>>>>>>>
> > >>>>>>>>> 1) Have you considered just relying on `KafkaStreams#metrics()`,
> > >>>>>>>>> which includes embedded consumer metrics that have the committed
> > >>>>>>>>> offsets, instead of adding a new API? I'm not advocating that
> > >>>>>>>>> this is a better approach, but I want to make sure we considered
> > >>>>>>>>> all options before we come to the "last resort" of adding new
> > >>>>>>>>> public interfaces.
> > >>>>>>>>>
> > >>>>>>>>> 2) The javadoc mentions "tasks assigned to this client", but the
> > >>>>>>>>> returned map is on partitions. I think we should make the
> > >>>>>>>>> javadoc and the return types consistent, either tasks or topic
> > >>>>>>>>> partitions.
> > >>>>>>>>>
> > >>>>>>>>> 3) In addition, if for 2) above we ended up with topic
> > >>>>>>>>> partitions, would they include only external source topics, or
> > >>>>>>>>> also internal repartition / changelog topics? I think including
> > >>>>>>>>> only external source topic partitions is not sufficient for your
> > >>>>>>>>> goal of tracking progress, but exposing internal topic names is
> > >>>>>>>>> also a big commitment here for future topology evolution.
> > >>>>>>>>>
> > >>>>>>>>> 4) For "tasksIdling", I'm wondering if we can make it more
> > >>>>>>>>> general, so that the returned value is not just a boolean but a
> > >>>>>>>>> TaskState that can be an enum of "created, restoring, running,
> > >>>>>>>>> idle, closing".
> > >>>>>>>>> This could help us in the future to track other things like
> > >>>>>>>>> restoration efficiency and rebalance efficiency.
> > >>>>>>>>>
> > >>>>>>>>> 5) We need to clarify how "idling" is defined here: e.g. we can
> > >>>>>>>>> clearly state that a task is considered idle only if 1) lag is
> > >>>>>>>>> increasing, indicating that new records have indeed arrived at
> > >>>>>>>>> the source, while the committed offset is not advancing, AND 2)
> > >>>>>>>>> the produced offset (imagine we may have punctuations that
> > >>>>>>>>> generate new data to the output topic even if there's no input
> > >>>>>>>>> for a while) is not advancing either.
> > >>>>>>>>>
> > >>>>>>>>>
> > >>>>>>>>> Guozhang
> > >>>>>>>>>
> > >>>>>>>>> On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <reluctanthero...@gmail.com>
> > >>>>>>>>> wrote:
> > >>>>>>>>>
> > >>>>>>>>>> Thanks Walker for the proposed KIP! This should definitely
> > >>>>>>>>>> empower KStream users with better visibility.
> > >>>>>>>>>>
> > >>>>>>>>>> Meanwhile I have a couple of questions/suggestions:
> > >>>>>>>>>>
> > >>>>>>>>>> 1. Typo "repost/report" in the motivation section.
> > >>>>>>>>>>
> > >>>>>>>>>> 2. What offsets do we report when the task is under restoration
> > >>>>>>>>>>    or rebalancing?
> > >>>>>>>>>>
> > >>>>>>>>>> 3. IIUC, we should clearly state that our reported metrics are
> > >>>>>>>>>>    based on locally assigned tasks for each instance.
> > >>>>>>>>>>
> > >>>>>>>>>> 4. In the meantime, what’s our strategy to report tasks that
> > >>>>>>>>>>    are not local to the instance?
> > >>>>>>>>>>    Users would normally try to monitor all the possible tasks,
> > >>>>>>>>>>    and it’s unfortunate we couldn’t determine whether we have
> > >>>>>>>>>>    lost tasks. My brainstorming was whether it makes sense for
> > >>>>>>>>>>    the leader instance to report the task progress as -1 for all
> > >>>>>>>>>>    “supposed to be running” tasks, so that the metrics collector
> > >>>>>>>>>>    side could catch any missing tasks.
> > >>>>>>>>>>
> > >>>>>>>>>> 5. It is not clear how users should use `isTaskIdling`. Why not
> > >>>>>>>>>>    report a map/set of idling tasks, just as we did for
> > >>>>>>>>>>    committed offsets?
> > >>>>>>>>>>
> > >>>>>>>>>> 6. Why do we use TopicPartition instead of TaskId as the key in
> > >>>>>>>>>>    the returned map?
> > >>>>>>>>>>
> > >>>>>>>>>> 7. Could we include some details on where we get the committed
> > >>>>>>>>>>    offsets for each task? Is it through a consumer offset
> > >>>>>>>>>>    fetch, or the stream processing progress based on the records
> > >>>>>>>>>>    fetched?
> > >>>>>>>>>>
> > >>>>>>>>>> On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <wcarl...@confluent.io>
> > >>>>>>>>>> wrote:
> > >>>>>>>>>>
> > >>>>>>>>>>> Hello all,
> > >>>>>>>>>>>
> > >>>>>>>>>>> I would like to start discussion on KIP-715. This KIP aims to
> > >>>>>>>>>>> make it easier to monitor Kafka Streams progress by exposing
> > >>>>>>>>>>> the committed offset in a similar way as the consumer client
> > >>>>>>>>>>> does.
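Boyang's point 4 above, his 0_0/0_1/1_0/1_1 example, and Walker's answer (query every client and aggregate the responses) can be combined into a simple completeness check. This only works if the health-check service knows which tasks should exist. The thread leaves open how that expected set is derived (from the topology, or from input topic metadata as Boyang suggested), so this minimal sketch takes it as a parameter. The class and method names are hypothetical:

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical health-check helper: strings stand in for TaskId ("1_1"). Each
// inner collection holds the tasks one client reported as locally assigned.
final class MissingTaskCheck {

    private MissingTaskCheck() { }

    // Returns the expected tasks that no client reported, sorted for stable output.
    static Set<String> missingTasks(Set<String> expectedTasks,
                                    Collection<? extends Collection<String>> reportsPerClient) {
        Set<String> missing = new TreeSet<>(expectedTasks);
        for (Collection<String> report : reportsPerClient) {
            missing.removeAll(report);
        }
        return missing;
    }

    public static void main(String[] args) {
        // Boyang's example: a rebalancing bug leaves 1_1 unassigned, so no client reports it.
        Set<String> expected = Set.of("0_0", "0_1", "1_0", "1_1");
        List<List<String>> reports = List.of(List.of("0_0", "0_1"), List.of("1_0"));
        System.out.println(missingTasks(expected, reports)); // [1_1]
    }
}
```

As Sophie points out, an instance under eager rebalancing may briefly report no tasks. A real check would therefore probably require a task to stay missing across several polls before alerting.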
> > >>>>>>>>>>>
> > >>>>>>>>>>> Here is the KIP: https://cwiki.apache.org/confluence/x/aRRRCg
> > >>>>>>>>>>>
> > >>>>>>>>>>> Best,
> > >>>>>>>>>>> Walker
> > >>>>>>>>>
> > >>>>>>>>> --
> > >>>>>>>>> -- Guozhang
> > >>>>>>
> > >>>>>> --
> > >>>>>> -- Guozhang
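Guozhang's point 5 above proposes a concrete definition of idling. This minimal sketch assumes a monitor takes two samples of a task some interval apart, each recording source lag, committed offset, and produced (output) offset. The `Sample` type and its field names are hypothetical and are not part of any Kafka API:

```java
// Hypothetical sketch of the idle definition proposed in Guozhang's point 5.
// A Sample is one observation of a task at a point in time; the field names
// are illustrative and not part of any Kafka API.
final class IdleDefinition {

    static final class Sample {
        final long lag;             // records available at the source but not yet consumed
        final long committedOffset; // last committed input offset
        final long producedOffset;  // last offset written to the output topic

        Sample(long lag, long committedOffset, long producedOffset) {
            this.lag = lag;
            this.committedOffset = committedOffset;
            this.producedOffset = producedOffset;
        }
    }

    private IdleDefinition() { }

    // Idle only if (1) lag grew, i.e. new records did arrive, while the committed
    // offset stayed put, AND (2) the produced offset did not advance either
    // (output can advance without new input, e.g. from punctuations, and that
    // counts as progress).
    static boolean isIdle(Sample before, Sample after) {
        boolean lagIncreasing = after.lag > before.lag;
        boolean commitStalled = after.committedOffset <= before.committedOffset;
        boolean outputStalled = after.producedOffset <= before.producedOffset;
        return lagIncreasing && commitStalled && outputStalled;
    }

    public static void main(String[] args) {
        Sample t0 = new Sample(10, 500, 300);
        System.out.println(isIdle(t0, new Sample(25, 500, 300))); // true
        System.out.println(isIdle(t0, new Sample(25, 500, 310))); // false: output advanced
        System.out.println(isIdle(t0, new Sample(10, 500, 300))); // false: no new input
    }
}
```

Requiring all three conditions keeps a task that is quiet only because no input arrived from being flagged as idle.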