Thanks for the updates, Walker. Some replies and follow-up questions:

1. I agree that one task could have multiple partitions, but when we hit a delay in offset progress, do we have a convenient way to map a TopicPartition back to the problematic task? In production, I believe it would be much quicker to identify the problem by task id than by topic partition, especially when the partition belongs to an internal topic. Having the task id as part of the entry value seems useful, which means returning something like Map<TopicPartition, TaskProgress>, where TaskProgress contains both the committed offset and the task id.
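
The shape suggested in point 1 could look roughly like the sketch below. It is only an illustration under assumptions: TaskProgress, its fields, and stalledTasks are hypothetical names that do not appear in the KIP, the nested TopicPartition record is a stand-in for org.apache.kafka.common.TopicPartition so the snippet compiles without Kafka on the classpath, and task ids are plain strings such as "1_1".

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class TaskProgressSketch {

    // Stand-in for org.apache.kafka.common.TopicPartition (assumption: keeps the sketch dependency-free).
    record TopicPartition(String topic, int partition) {}

    // Hypothetical value type: the committed offset together with the id of the owning task.
    record TaskProgress(String taskId, long committedOffset) {}

    // Compares two snapshots and returns the ids of tasks that own at least one
    // partition whose committed offset did not advance between them.
    static Set<String> stalledTasks(Map<TopicPartition, TaskProgress> previous,
                                    Map<TopicPartition, TaskProgress> current) {
        Set<String> stalled = new TreeSet<>();
        for (Map.Entry<TopicPartition, TaskProgress> entry : current.entrySet()) {
            TaskProgress before = previous.get(entry.getKey());
            // Partitions without an earlier snapshot are skipped: there is nothing to compare against.
            if (before != null && entry.getValue().committedOffset() <= before.committedOffset()) {
                stalled.add(entry.getValue().taskId());
            }
        }
        return stalled;
    }

    public static void main(String[] args) {
        Map<TopicPartition, TaskProgress> previous = Map.of(
                new TopicPartition("input", 0), new TaskProgress("0_0", 100L),
                new TopicPartition("app-repartition", 1), new TaskProgress("1_1", 40L));
        Map<TopicPartition, TaskProgress> current = Map.of(
                new TopicPartition("input", 0), new TaskProgress("0_0", 150L),
                new TopicPartition("app-repartition", 1), new TaskProgress("1_1", 40L));

        // The internal repartition partition did not move, and the entry names its task directly.
        System.out.println(stalledTasks(previous, current)); // prints [1_1]
    }
}
```

A stalled offset is not necessarily a failure, since a task may be idling intentionally, so a check like this only narrows down which task id to look at.
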

2. The task idling API is still confusing to me. I don't think we care about the exact state at the moment we make the tasksIdling() query; we care more about how long a task has been idle as of the call, which indicates whether it is a normal idling period. So I feel it would be helpful to track that duration and report it in the TaskStatus struct.

3. What I wanted to achieve with a global mapping keyed by either TopicPartition or TaskId is this: today a health check service cannot report the failure of a task that doesn't emit any metrics. As long as we have a global topic partition API, the health check can always detect any task/partition that is not reporting its progress. Does that make sense? If you feel there is a better way to achieve this, such as querying all the input/intermediate topic metadata directly from Kafka as the baseline, that would be fine as well and is worth mentioning in the KIP.

Also, it seems the KIP doesn't yet reflect what you proposed for the task idling status.

Best,
Boyang

On Wed, Feb 24, 2021 at 9:11 AM Walker Carlson <wcarl...@confluent.io> wrote:

> Thank you for the comments everyone!
>
> I think there are a few things I can clear up in general then I will
> specifically respond to each question.
>
> First, when I say "idling" I refer to task idling, where the stream is
> intentionally not making progress
> (https://issues.apache.org/jira/browse/KAFKA-10091 is an example). This
> becomes relevant if a task is waiting on one partition with no data but
> that is holding up a partition with data. That would cause one just looking
> at the committed offset changes to believe the task has a problem when it
> is working as intended.
>
> In light of this confusion, I plan to change tasksIdling() to
> `Map<TaskId, TaskStatus> getTasksStatus()`; this should hopefully make it
> clearer what is being exposed.
>
> TaskStatus would include: TopicPartitions, TaskId, ProcessorTopology,
> Idling, and State.
>
> Boyang:
>
> 2) I think that each task should report on whatever TopicPartitions they
> hold; this means a Topic Partition might get reported twice but the user
> can roll those up and use the larger one when looking at the whole app.
>
> 4) If the user collects the committed offsets across all the running
> clients there shouldn't be any tasks missing, correct?
>
> 6) Because there is not a 1:1 mapping between Tasks and TopicPartitions I
> think it is cleaner to report them separately.
>
> Guozhang:
>
> 1) Yes, that was my original plan but it made more sense to mirror how the
> consumer exposes the committed offset.
>
> 3) That is a good point. I think that we should include internal topics as
> well. I think that if the topology were to evolve there should be fair
> warning anyways. Maybe you can clarify what would be limited by exposing
> the interior topics here? I thought a user could find them in other ways.
> If it is the name we could anonymise them before exposing them.
>
> Thank you all for your comments. If I did not respond directly to one of
> your questions I updated the KIP to include the details it was requesting.
> I did not include my proposed changes mentioned earlier as I would like
> to get some feedback about what to include in TaskStatus and in general.
>
> best,
> Walker
>
> On Mon, Feb 22, 2021 at 10:20 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Walker, thanks for the KIP. A few thoughts:
> >
> > 1) Have you considered just relying on the `KafkaStreams#metrics()` that
> > includes embedded consumer metrics that have the committed offsets
> > instead of adding a new API? Not advocating that this is a better
> > approach but want to make sure we considered all options before we come
> > to the "last resort" of adding new public interfaces.
> >
> > 2) The javadoc mentions "tasks assigned to this client", but the returned
> > map is on partitions. I think we should make the javadoc and the return
> > types consistent, either tasks or topic partitions.
> >
> > 3) In addition, if for 2) above we ended up with topic partitions, then
> > would they include only external source topics, or also internal
> > repartition / changelog topics? I think including only external source
> > topic partitions is not sufficient for your goal of tracking progress,
> > but exposing internal topic names is also a big commitment here for
> > future topology evolution.
> >
> > 4) For "tasksIdling", I'm wondering if we can make it more general, that
> > the returned value is not just a boolean, but a TaskState that can be an
> > enum of "created, restoring, running, idle, closing". This could help us
> > in the future to track other things like restoration efficiency and
> > rebalance efficiency etc.
> >
> > 5) We need to clarify how "idling" is being defined here: e.g. we can
> > clearly state that a task is considered idle only if 1) lag is
> > increasing, indicating that there are indeed new records arrived at
> > source, while committed offset is not advancing, AND 2) produced offset
> > (imagine we may have punctuations that generate new data to the output
> > topic even if there's no input for a while) is not advancing either.
> >
> > Guozhang
> >
> > On Mon, Feb 22, 2021 at 3:11 PM Boyang Chen <reluctanthero...@gmail.com>
> > wrote:
> >
> > > Thanks Walker for the proposed KIP! This should definitely empower
> > > KStream users with better visibility.
> > >
> > > Meanwhile I got a couple of questions/suggestions:
> > >
> > > 1. typo "repost/report" in the motivation section.
> > >
> > > 2. What offsets do we report when the task is under restoration or
> > > rebalancing?
> > >
> > > 3. IIUC, we should clearly state that our reported metrics are based
> > > off locally assigned tasks for each instance.
> > >
> > > 4. In the meantime, what’s our strategy to report tasks that are not
> > > local to the instance? Users would normally try to monitor all the
> > > possible tasks, and it’s unfortunate we couldn’t determine whether we
> > > have lost tasks. My brainstorming was whether it makes sense for the
> > > leader instance to report the task progress as -1 for all “supposed to
> > > be running” tasks, so that on the metrics collector side it could catch
> > > any missing tasks.
> > >
> > > 5. It seems not clear how users should use `isTaskIdling`. Why not
> > > report a map/set for idling tasks just as what we did for committed
> > > offsets?
> > >
> > > 6. Why do we use TopicPartition instead of TaskId as the key in the
> > > returned map?
> > >
> > > 7. Could we include some details in where we got the commit offsets for
> > > each task? Is it through consumer offset fetch, or the stream
> > > processing progress based on the records fetched?
> > >
> > > On Mon, Feb 22, 2021 at 3:00 PM Walker Carlson <wcarl...@confluent.io>
> > > wrote:
> > >
> > > > Hello all,
> > > >
> > > > I would like to start discussion on KIP-715. This KIP aims to make it
> > > > easier to monitor Kafka Streams progress by exposing the committed
> > > > offset in a similar way as the consumer client does.
> > > >
> > > > Here is the KIP: https://cwiki.apache.org/confluence/x/aRRRCg
> > > >
> > > > Best,
> > > > Walker
> >
> > --
> > -- Guozhang