I agree with Matthias. @Eno, do you have a specific use case in mind for keeping the `toString` function?
Guozhang

On Fri, Apr 21, 2017 at 11:41 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> It would not give the same information as the new API. Thus, it would be inconsistent (and this would be really bad IMHO).
>
> I would really like to remove (i.e., deprecate) it. It was a "hot fix" to give some runtime information to the user. But with this KIP, we get a proper first-class API, and thus .toString() is just outdated.
>
> > It's always preferable to have an implementation for the toString method.
>
> Not sure about this, btw.
>
> Furthermore, if anyone wants to print runtime information, we have ThreadMetadata#toString() and TaskMetadata#toString().
>
> -Matthias

On 4/20/17 9:36 AM, Eno Thereska wrote:
> Hi there,
>
> Can't toString() just stay as is?
>
> Thanks
> Eno

On 20 Apr 2017, at 17:26, Matthias J. Sax <matth...@confluent.io> wrote:
> Florian,
>
> I am just wondering: if we keep .toString(), what should the implementation look like?
>
> -Matthias

On 4/19/17 2:42 PM, Florian Hussonnois wrote:
> Hi Matthias,
>
> So sorry for the delay in replying to you. For now, I think we can keep KafkaStreams#toString() as it is. It's always preferable to have an implementation for the toString method.

2017-04-14 4:08 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
> Florian,
>
> > What about the KafkaStreams#toString() method?
> >
> > I think we want to deprecate it, as with KIP-120 and the changes of this KIP, it gets obsolete.
>
> Any thoughts about this? For me, this is the last open point to discuss (or to reflect in the KIP, in case you agree) before I can put my vote on the VOTE thread I already started.
>
> -Matthias

On 4/11/17 12:18 AM, Damian Guy wrote:
> Hi Florian,
>
> Thanks for the updates.
> The KIP is looking good.
>
> Cheers,
> Damian

On Fri, 7 Apr 2017 at 22:41 Matthias J. Sax <matth...@confluent.io> wrote:
> What about the KafkaStreams#toString() method?
>
> I think we want to deprecate it, as with KIP-120 and the changes of this KIP, it gets obsolete.
>
> If we do so, please update the KIP accordingly.
>
> -Matthias

On 3/28/17 7:00 PM, Matthias J. Sax wrote:
> Thanks for updating the KIP!
>
> I think it's good as is -- I would not add anything more to TaskMetadata.
>
> About subtopologies and tasks: we already have the concept of subtopologies in KIP-120. It's only missing an ID that allows linking a subtopology to a task.
>
> IMHO, adding a simple variable to `Subtopology` that provides the ID should be sufficient. We can simply document in the JavaDocs how Subtopology and TaskMetadata can be linked to each other.
>
> I did update KIP-120 accordingly.
>
> -Matthias

On 3/28/17 3:45 PM, Florian Hussonnois wrote:
> Hi all,
>
> I've updated the KIP and the PR to reflect your suggestions.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
> https://github.com/apache/kafka/pull/2612
>
> Also, I've exposed the property StreamThread#state as a string through the new class ThreadMetadata.
>
> Thanks,

2017-03-27 23:40 GMT+02:00 Florian Hussonnois <fhussonn...@gmail.com>:
> Hi Guozhang, Matthias,
>
> It's a great idea to add sub-topology descriptions.
> This would help developers better understand the topology concept.
>
> I agree that it is not really user-friendly to have to check whether `StreamsMetadata#streamThreads` returns null.
>
> The method name localThreadsMetadata looks good. In addition, it's simpler to build ThreadMetadata instances from the `StreamTask` class than from the `StreamPartitionAssignor` class.
>
> I will work on the modifications. As I understand it, I have to add the subTopologyId property to the TaskMetadata class. Am I right?
>
> Thanks,

2017-03-26 0:25 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
> Re 1): this is a good point. Maybe we can move `StreamsMetadata#streamThreads` to `KafkaStreams#localThreadsMetadata`?
>
> 3): this is a minor suggestion to rename the function `assignedPartitions` to `topicPartitions`, to be consistent with `StreamsMetadata`.
>
> Guozhang

On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> Thanks for the progress on this KIP. I think we are on the right path!
>
> A couple of comments/questions:
>
> (1) Why do we not consider the "rejected alternative" of adding the method to KafkaStreams? The comment on #streamThreads() says:
>
> "Note this method will return <code>null</code> if called on {@link StreamsMetadata} which represent a remote application."
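The Javadoc quoted in point (1) means every caller of `StreamsMetadata#streamThreads` has to null-check the result, while a method on `KafkaStreams` that only describes the local instance can always return a (possibly empty) collection. A minimal sketch of that `localThreadsMetadata -> ThreadMetadata -> TaskMetadata` shape follows. The classes are hypothetical, simplified stand-ins that reuse names from this thread (the string-valued thread state, the `topicPartitions` naming); they are not the actual Kafka Streams classes.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public final class LocalMetadataSketch {

    // Hypothetical stand-in for TaskMetadata: a task id and the partitions it reads.
    static final class TaskMetadata {
        final String taskId;
        final Set<String> topicPartitions;

        TaskMetadata(String taskId, Set<String> topicPartitions) {
            this.taskId = taskId;
            this.topicPartitions = new TreeSet<>(topicPartitions); // sorted for stable output
        }

        @Override
        public String toString() {
            return "TaskMetadata{taskId=" + taskId + ", topicPartitions=" + topicPartitions + "}";
        }
    }

    // Hypothetical stand-in for ThreadMetadata: the thread state is exposed as a string.
    static final class ThreadMetadata {
        final String threadName;
        final String threadState;
        final List<TaskMetadata> activeTasks;

        ThreadMetadata(String threadName, String threadState, List<TaskMetadata> activeTasks) {
            this.threadName = threadName;
            this.threadState = threadState;
            this.activeTasks = List.copyOf(activeTasks);
        }

        @Override
        public String toString() {
            return "ThreadMetadata{threadName=" + threadName + ", threadState=" + threadState
                    + ", activeTasks=" + activeTasks + "}";
        }
    }

    private final List<ThreadMetadata> localThreads;

    LocalMetadataSketch(List<ThreadMetadata> localThreads) {
        this.localThreads = List.copyOf(localThreads);
    }

    // Describes only the local instance, so it can return an empty list instead of null.
    List<ThreadMetadata> localThreadsMetadata() {
        return localThreads;
    }

    // Walks instance -> threads -> tasks without any null checks.
    int activeTaskCount() {
        int count = 0;
        for (ThreadMetadata thread : localThreadsMetadata()) {
            count += thread.activeTasks.size();
        }
        return count;
    }

    public static void main(String[] args) {
        LocalMetadataSketch instance = new LocalMetadataSketch(List.of(
                new ThreadMetadata("app-StreamThread-1", "RUNNING", List.of(
                        new TaskMetadata("0_0", Set.of("orders-0")),
                        new TaskMetadata("1_0", Set.of("payments-0"))))));
        instance.localThreadsMetadata().forEach(System.out::println);
        System.out.println("active tasks: " + instance.activeTaskCount());
    }
}
```

The `toString` overrides play the role Matthias mentions for ThreadMetadata#toString() and TaskMetadata#toString(): printable runtime information without a separate KafkaStreams#toString().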
> Thus, if we cannot get any remote metadata, it seems more straightforward to add the method to KafkaStreams directly -- this would avoid invalid calls and a `null` return value in the first place.
>
> I like the idea of exposing sub-topologies:
>
> (2a) I would recommend renaming `topicsGroupId` to `subTopologyId` :)
>
> (2b) We could add this to KIP-120 already. However, I would not just link both via name, but leverage KIP-120 directly and add a "Subtopology" member to the TaskMetadata class.
>
> Overall, I like the distinction of KIP-120 exposing only "static" information that can be determined before the topology gets started, while this KIP allows access to runtime information.
>
> -Matthias

On 3/22/17 12:42 PM, Guozhang Wang wrote:
> Thanks for the updated KIP, and sorry for the late replies!
>
> I thought a little bit more about KIP-130, and I feel that it may be okay if we are going to deprecate the `toString` function in favor of the proposed APIs (this is not explicitly said in the KIP, so I'm not sure whether you plan to keep `KafkaStreams#toString` as is or to replace it with the proposed APIs). More specifically, after both KIP-120 and KIP-130:
>
> 1. Users can use the `#describe` function to check the generated topology before calling `KafkaStreams#start`, which is static information.
> 2. Users can use `StreamsMetadata -> ThreadMetadata -> TaskMetadata` programmatically after calling `KafkaStreams#start` to get the dynamically changing information.
>
> One thing I'm still not sure about, though, is that in `TaskMetadata` we only have the TaskId and assigned partitions, whereas "TopologyDescription", introduced in KIP-120, will simply describe the whole topology, possibly composed of multiple sub-topologies. So it is hard for users to tell on the fly which sub-topology is executed under which task.
>
> Hence I'm wondering if we can expose the "sub-topology-id" (named topicsGroupId internally) in TopologyDescription#Subtopology. Then, from the task id, which is essentially "sub-topology-id DASH partition-group-id", users can make the link, though it is still not that straightforward.
>
> Thoughts?
>
> Guozhang

On Wed, Mar 15, 2017 at 3:16 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> Thanks Guozhang for pointing me to KIP-120.
>
> I've made some modifications to the KIP. I also proposed a new PR (there are still some tests to write).
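Guozhang's proposal above links runtime tasks to the static `TopologyDescription` through the task id, which he describes as a sub-topology id joined to a partition-group id. The sketch below shows that linking step under stated assumptions: task ids arrive as strings, and because `TaskId#toString` in Kafka Streams renders ids with an underscore (for example `0_1`) rather than the dash mentioned in the email, the hypothetical parser accepts either separator. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public final class TaskSubtopologyLink {

    // Extracts the sub-topology id from a task id such as "1_3" (or "1-3" as written in the email).
    static int subtopologyId(String taskId) {
        int sep = taskId.indexOf('_');
        if (sep < 0) {
            sep = taskId.indexOf('-');
        }
        if (sep <= 0) {
            throw new IllegalArgumentException("Unexpected task id: " + taskId);
        }
        return Integer.parseInt(taskId.substring(0, sep));
    }

    // Groups running task ids by sub-topology id, sorted by id, so each group can be
    // matched with the Subtopology that carries the same id in the topology description.
    static Map<Integer, List<String>> tasksBySubtopology(List<String> taskIds) {
        Map<Integer, List<String>> grouped = new TreeMap<>();
        for (String taskId : taskIds) {
            grouped.computeIfAbsent(subtopologyId(taskId), id -> new ArrayList<>()).add(taskId);
        }
        return grouped;
    }

    public static void main(String[] args) {
        // Prints {0=[0_0, 0_1], 1=[1_0, 1_1]}
        System.out.println(tasksBySubtopology(List.of("0_0", "1_0", "0_1", "1_1")));
    }
}
```

Each key in the result would then be matched against the `Subtopology` with the same id, which is the documented link Matthias proposes in his 3/28 reply.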
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>
> Exposing consumed offsets through JMX is sufficient for debugging purposes. But I think this could be part of another JIRA, as there is no impact on the public API.
>
> Thanks

2017-03-10 22:35 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
> Hello Florian,
>
> As for programmatically discovering monitoring data by piping metrics into a dedicated topic:
> I think you can actually use a KafkaMetricsReporter that pipes the polled metric values into a predefined topic (note that in Kafka, MetricsReporter is simply an interface, and users can build their own implementation in addition to the JMXReporter), for example:
>
> https://github.com/krux/kafka-metrics-reporter
>
> As for the "static task-level assignment", what I meant is that the mapping from source-topic partitions to tasks is static, via the "PartitionGrouper", and a task won't switch from an active task to a standby task. Rather, an active task could be migrated as a whole, along with all its assigned partitions, to another thread / process, and a new standby task will be created on the host that this active task is migrating from. So for the SAME task, its taskMetadata.assignedPartitions() will always return the same partitions.
>
> As for the `toString` function that we have today, I feel it has some correlation with KIP-120, so I'm trying to coordinate some discussion here (cc'ing Matthias as the owner of KIP-120). My understanding is that:
>
> 1. In KIP-120, the `toString` function of `KafkaStreams` will be removed, and instead the `Topology#describe` function will be introduced for users to debug the topology BEFORE they start running their instance with it. Hence the description won't contain any task information, as tasks are not formed yet.
> 2. In KIP-130, we want to add the task-level information for monitoring purposes, which is not static and can only be captured AFTER the instance has started running. Again, I'm wondering whether, for KIP-130 alone, adding the metrics mentioned in my previous email would suffice even for the use case you have mentioned.
>
> Guozhang

On Wed, Mar 8, 2017 at 3:18 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> Hi Guozhang,
>
> Thank you for your feedback. I've started to look more deeply into the code. As you mention, it would be smarter to use the current StreamsMetadata API to expose this information.
>
> I think exposing metrics through JMX is great for building monitoring dashboards with tools like jmxtrans and Grafana.
> But for our use case, we would like to expose the states directly from the application embedding the KStreams topologies.
> So we expect to be able to retrieve the states programmatically.
>
> For instance, we could imagine producing those states into a dedicated topic. That way, a third application could automatically discover all the Kafka Streams applications that can be monitored. In a production environment, that could clearly be a way to get a complete overview of a microservices architecture based on Kafka Streams.
>
> The toString() method gives a lot of information, but it can only be used for debugging purposes, not to build a topology visualization tool. Could we actually expose the same details about the stream topology through the StreamsMetadata API? That is, could the TaskMetadata class you suggested contain information similar to what the toString method of the AbstractTask class returns?
>
> I can update the KIP in that way.
>
> Finally, I'm not sure I understand your last point: "Note that the task-level assignment information is static, i.e. it will not change during the runtime".
>
> Does that mean that when a rebalance occurs, new tasks are created for the new assignments and the old ones just switch to a standby state?
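Guozhang's reply to this question, quoted further up the thread, is that the mapping from source-topic partitions to tasks is fixed by the `PartitionGrouper`, so a rebalance moves whole tasks rather than re-splitting their partitions. A minimal sketch of such a static grouping follows. It assumes the scheme used by Kafka Streams' default grouper, where partition p of every source topic in sub-topology g is placed in task `g_p`; the input shape (topic name to partition count) and the method name are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public final class StaticTaskGrouping {

    // Maps each task id ("<subtopologyId>_<partition>") to the source partitions it owns.
    // sourceTopics: source topic name -> partition count for one sub-topology (hypothetical input).
    static Map<String, List<String>> groupPartitions(int subtopologyId, Map<String, Integer> sourceTopics) {
        Map<String, Integer> topics = new TreeMap<>(sourceTopics); // deterministic topic order
        int maxPartitions = 0;
        for (int count : topics.values()) {
            maxPartitions = Math.max(maxPartitions, count);
        }
        Map<String, List<String>> tasks = new LinkedHashMap<>();
        for (int p = 0; p < maxPartitions; p++) {
            List<String> owned = new ArrayList<>();
            // Partition p of every source topic that has such a partition joins task <id>_p.
            for (Map.Entry<String, Integer> topic : topics.entrySet()) {
                if (p < topic.getValue()) {
                    owned.add(topic.getKey() + "-" + p);
                }
            }
            tasks.put(subtopologyId + "_" + p, owned);
        }
        return tasks;
    }

    public static void main(String[] args) {
        // Prints {0_0=[customers-0, orders-0], 0_1=[customers-1, orders-1], 0_2=[customers-2]}
        System.out.println(groupPartitions(0, Map.of("orders", 2, "customers", 3)));
    }
}
```

Because the result depends only on topic metadata, recomputing it after a rebalance yields the same task-to-partition map; only the thread hosting each task may change.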
>
> Thanks,

2017-03-05 7:04 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
> Hello Florian,
>
> Thanks for the KIP and your detailed explanation of your use case. I think there are two dimensions to discuss regarding how to improve Streams' debuggability (or, more specifically, state exposure for visualization).
>
> The first question is "what information should we expose to the user". From your KIP, I saw generally three categories:
>
> 1. The state of the thread within a process: as you mentioned, currently we only expose the state of the process but not the finer-grained per-thread state.
> 2. The state of the task. Currently, the closest API to this is StreamsMetadata; however, it aggregates the tasks across all threads and only presents the aggregated set of assigned partitions / state stores, etc. We can consider extending it with a new StreamsMetadata#tasks() which returns TaskMetadata with similar fields, while StreamsMetadata.stateStoreNames etc. would still return the aggregated results, so users can still "drill down" if they want.
>
> The second question is "how should we expose them to the user".
> For example, you mentioned consumedOffsetsByPartition in the activeTasks. We could add this as a JMX metric based on fetch positions inside the consumer layer (note that Streams just embeds consumers), or we could consider adding it to TaskMetadata. Either way, it can be visualized for monitoring. The reason we expose StreamsMetadata as well as State is that they are expected to be "polled" programmatically for interactive queries and also for control flow (e.g., I would like to start running my other topology ONLY once the first topology is up and running), whereas in your case the main purpose seems to be continuously querying them for monitoring, etc. Personally, I'd prefer to expose them only as JMX metrics for such purposes, to keep the API simpler.
>
> So, given your current motivations, I'd suggest exposing the following information as newly added metrics in Streams:
>
> 1. Thread-level state metric.
> 2. Task-level hosted client identifier metric (e.g. host:port).
> 3. Consumer-level per-topic/partition position metric (https://kafka.apache.org/documentation/#topic_fetch_monitoring).
>
> Note that the task-level assignment information is static, i.e. it will not change during the runtime at all, and it can already be accessed from the `toString()` function even before the instance starts running, so I think this piece of information does not need to be exposed through JMX.
>
> WDYT?
>
> Guozhang

On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy <damian....@gmail.com> wrote:
> Hi Florian,
>
> Thanks for the KIP.
>
> It seems there is some overlap here with what we already have in KafkaStreams.allMetadata(). This currently returns a Collection<StreamsMetadata> where each StreamsMetadata instance holds the state stores and partition assignment for every instance of the KafkaStreams application. I'm wondering if that is good enough for what you are trying to achieve?
> If not, could it be modified to include the per-thread assignment?
>
> Thanks,
> Damian

On Wed, 1 Mar 2017 at 22:49 Florian Hussonnois <fhussonn...@gmail.com> wrote:
> Hi Matthias,
>
> First, I will answer your last question.
>
> The main reason to have both TaskState#assignment and TaskState#consumedOffsetsByPartition is that a task has no consumed offset for a partition until at least one message has been consumed from it, even if previous offsets exist for the consumer group. So yes, these methods are redundant, as they only diverge at application startup.
>
> About the use case: we are currently developing, for a customer, a small framework based on KafkaStreams that transforms/denormalizes data before ingesting it into Hadoop.
>
> We have a cluster of workers (Spring Boot) that instantiate KStreams topologies dynamically based on dataflow configurations. Each configuration describes a topic to consume and how to process messages (this looks like the NiFi processors API).
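Florian's explanation above, that `TaskState#assignment` and the key set of `TaskState#consumedOffsetsByPartition` only differ until every assigned partition has delivered at least one message, can be illustrated with a small model of one task. Everything here (the class name, string partition ids, the `recordConsumed` method) is hypothetical and not part of the KIP's final API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public final class TaskOffsetsSketch {

    private final Set<String> assignment;                              // partitions assigned to the task
    private final Map<String, Long> consumedOffsets = new HashMap<>(); // filled lazily, per partition

    TaskOffsetsSketch(Set<String> assignment) {
        this.assignment = new TreeSet<>(assignment);
    }

    // Records that a message at the given offset was consumed from an assigned partition.
    void recordConsumed(String partition, long offset) {
        if (!assignment.contains(partition)) {
            throw new IllegalArgumentException("Not assigned: " + partition);
        }
        consumedOffsets.put(partition, offset);
    }

    // Partitions that are assigned but have not yet delivered a message to this task.
    Set<String> partitionsWithoutConsumedOffset() {
        Set<String> missing = new TreeSet<>(assignment);
        missing.removeAll(consumedOffsets.keySet());
        return missing;
    }

    // True once consumedOffsets.keySet() equals the assignment, i.e. after startup.
    boolean keySetMatchesAssignment() {
        return consumedOffsets.keySet().equals(assignment);
    }

    public static void main(String[] args) {
        TaskOffsetsSketch task = new TaskOffsetsSketch(Set.of("orders-0", "orders-1"));
        task.recordConsumed("orders-0", 42L);
        System.out.println(task.partitionsWithoutConsumedOffset()); // [orders-1]
        task.recordConsumed("orders-1", 7L);
        System.out.println(task.keySetMatchesAssignment());         // true
    }
}
```

In this model, deriving the assignment from the offsets map alone would hide `orders-1` until its first message arrives, which is the startup gap Florian describes.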
>
> Our architecture is inspired by Kafka Connect. We have topics for configs and states that are consumed by each worker (actually, we have reused some internal classes of the Connect API).
>
> Now, we would like to develop UIs to visualize the topics and partitions consumed by our worker applications.
>
> Also, I think it would be nice to be able, in the future, to develop web UIs similar to Spark's but for KafkaStreams, to visualize DAGs... so maybe this KIP is just a first step.
>
> Thanks,

2017-03-01 22:52 GMT+01:00 Matthias J. Sax <matth...@confluent.io>:
> Thanks for the KIP.
>
> I am wondering a little bit why you need to expose this information. Can you describe some use cases?
>
> Would it be worth unifying this new API with KafkaStreams#state() to get the overall state of an application without the need to call two different methods? I'm not sure what this unified API might look like, though.
>
> One minor comment about the API: TaskState#assignment seems to be redundant.
> It should be the same as TaskState#consumedOffsetsByPartition.keySet().
>
> Or am I missing something?
>
> -Matthias

On 3/1/17 5:19 AM, Florian Hussonnois wrote:
> Hi Eno,
>
> Yes, but the state() method only returns the global state of the KafkaStreams application (i.e., CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING).
>
> An alternative to this KIP would be to change this method to return more information instead of adding a new method.

2017-03-01 13:46 GMT+01:00 Eno Thereska <eno.there...@gmail.com>:
> Thanks Florian,
>
> Have you had a chance to look at the new state methods in 0.10.2, e.g., KafkaStreams.state()?
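The application-level states listed above are coarse, but they are enough for the control-flow use Guozhang describes further up (starting a second topology only once the first one is up and running). A minimal sketch of waiting for `RUNNING` through a state-change callback follows. The enum mirrors the five states named in the email, and `RunningLatch` is a hypothetical listener in the spirit of `KafkaStreams#setStateListener`, not that API; the transitions in `main` are simulated.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public final class WaitForRunningSketch {

    // Mirrors the application-level states listed in the email.
    enum State { CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING }

    // Hypothetical state listener that releases waiters the first time RUNNING is reached.
    static final class RunningLatch {
        private final CountDownLatch running = new CountDownLatch(1);

        void onChange(State newState, State oldState) {
            if (newState == State.RUNNING) {
                running.countDown();
            }
        }

        // Returns true if RUNNING was reached within the timeout, false otherwise.
        boolean awaitRunning(long timeout, TimeUnit unit) {
            try {
                return running.await(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return false;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        RunningLatch firstTopology = new RunningLatch();
        // Simulated state changes of the first topology, reported from another thread.
        Thread streams = new Thread(() -> {
            firstTopology.onChange(State.REBALANCING, State.CREATED);
            firstTopology.onChange(State.RUNNING, State.REBALANCING);
        });
        streams.start();
        if (firstTopology.awaitRunning(5, TimeUnit.SECONDS)) {
            System.out.println("first topology is RUNNING; the second one could be started now");
        }
        streams.join();
    }
}
```

Polling `state()` in a loop would also work; a latch released from the callback simply avoids busy-waiting.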
>
> Thanks
> Eno

On 1 Mar 2017, at 11:54, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> Hi all,
>
> I have just created KIP-130 to add a new method to the KafkaStreams API in order to expose the states of threads and active tasks.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>
> Thanks,
>
> --
> Florian HUSSONNOIS
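Guozhang's suggestion further up the thread, a custom metrics reporter that pipes polled metric values into a predefined topic so that a third application can discover and monitor every Streams instance, is sketched below. Kafka's real `MetricsReporter` interface receives `KafkaMetric` objects and is enabled through the `metric.reporters` client configuration. To stay self-contained, this hypothetical reporter takes plain metric names and values (its callback names only mirror `metricChange` and `metricRemoval`) and appends key/value records to an in-memory list that stands in for the monitoring topic. The metric names in `main` are illustrative placeholders.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public final class TopicMetricsReporterSketch {

    private final String applicationId;
    private final Map<String, Double> latest = new TreeMap<>(); // metric name -> last value
    private final List<Map.Entry<String, String>> topic;       // stand-in for the monitoring topic

    TopicMetricsReporterSketch(String applicationId, List<Map.Entry<String, String>> topic) {
        this.applicationId = applicationId;
        this.topic = topic;
    }

    // Mirrors the idea of MetricsReporter#metricChange: remember the latest value.
    void metricChange(String name, double value) {
        latest.put(name, value);
    }

    // Mirrors the idea of MetricsReporter#metricRemoval: forget a metric that went away.
    void metricRemoval(String name) {
        latest.remove(name);
    }

    // Hypothetical periodic hook: publishes one snapshot keyed by application id, so a
    // separate application reading the topic can discover every reporting instance.
    void flush() {
        topic.add(Map.entry(applicationId, latest.toString()));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> monitoringTopic = new ArrayList<>();
        TopicMetricsReporterSketch reporter = new TopicMetricsReporterSketch("orders-app", monitoringTopic);
        reporter.metricChange("stream-thread-1.process-rate", 250.0);
        reporter.metricChange("stream-thread-1.commit-latency-avg", 12.5);
        reporter.flush();
        monitoringTopic.forEach(System.out::println);
    }
}
```

Keying each snapshot by application id is what lets a consumer of the topic build the "complete overview" Florian describes without knowing the instances in advance.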