Hi Florian,

Thanks for the updates. The KIP is looking good.
Cheers,
Damian

On Fri, 7 Apr 2017 at 22:41 Matthias J. Sax <matth...@confluent.io> wrote:

What about the KafkaStreams#toString() method?

I think we want to deprecate it, because with KIP-120 and the changes in this KIP it becomes obsolete.

If we do so, please update the KIP accordingly.

-Matthias

On 3/28/17 7:00 PM, Matthias J. Sax wrote:

Thanks for updating the KIP!

I think it's good as is. I would not add anything more to TaskMetadata.

About subtopologies and tasks: we already have the concept of subtopologies in KIP-120. The only thing missing is an ID that links a subtopology to a task.

IMHO, adding a simple variable to `Subtopology` that provides the ID should be sufficient. We can simply document in the JavaDocs how Subtopology and TaskMetadata can be linked to each other.

I did update KIP-120 accordingly.

-Matthias

On 3/28/17 3:45 PM, Florian Hussonnois wrote:

Hi all,

I've updated the KIP and the PR to reflect your suggestions.
https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
https://github.com/apache/kafka/pull/2612

I've also exposed the StreamThread#state property as a string through the new ThreadMetadata class.

Thanks,

2017-03-27 23:40 GMT+02:00 Florian Hussonnois <fhussonn...@gmail.com>:

Hi Guozhang, Matthias,

Adding sub-topology descriptions is a great idea. It would help developers better understand the topology concept.

I agree that it is not very user-friendly to have to check whether `StreamsMetadata#streamThreads` returns null.

The method name localThreadsMetadata looks good. Also, it is simpler to build ThreadMetadata instances from the `StreamTask` class than from the `StreamPartitionAssignor` class.
I will work on the modifications. As I understand it, I have to add a subTopologyId property to the TaskMetadata class. Am I right?

Thanks,

2017-03-26 0:25 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:

Re 1): this is a good point. Maybe we can move `StreamsMetadata#streamThreads` to `KafkaStreams#localThreadsMetadata`?

3): a minor suggestion about naming: rename `assignedPartitions` to `topicPartitions`, to be consistent with `StreamsMetadata`?

Guozhang

On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax <matth...@confluent.io> wrote:

Thanks for the progress on this KIP. I think we are on the right path!

A couple of comments/questions:

(1) Why do we not consider the "rejected alternative" of adding the method to KafkaStreams? The comment on #streamThreads() says:

"Note this method will return <code>null</code> if called on {@link StreamsMetadata} which represent a remote application."

So, since we cannot get any remote metadata anyway, it seems more straightforward to add the method to KafkaStreams directly. This would avoid invalid calls and a `null` return value in the first place.

I like the idea of exposing sub-topologies:

(2a) I would recommend renaming `topicsGroupId` to `subTopologyId` :)

(2b) We could already add this to KIP-120. However, I would not just link the two by name. Instead, I would leverage KIP-120 directly and add a "Subtopology" member to the TaskMetadata class.

Overall, I like the split in which KIP-120 exposes only "static" information that can be determined before the topology is started, while this KIP gives access to runtime information.
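The runtime drill-down discussed in this thread can be sketched as follows: per-thread metadata exposes the thread state as a string plus the active tasks, and each task exposes its `topicPartitions`. The nested records here are hypothetical stand-ins modeled on the names used in the discussion. They are not the Kafka Streams classes. Partitions are plain strings, and the thread and topic names are invented, so the example has no dependencies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class RuntimeReport {
    // Hypothetical stand-in for the proposed TaskMetadata: a task id plus the
    // topic partitions the task reads from (named topicPartitions, as suggested).
    record TaskMetadata(String taskId, Set<String> topicPartitions) {}

    // Hypothetical stand-in for the proposed ThreadMetadata: the thread name,
    // the thread state exposed as a string, and the thread's active tasks.
    record ThreadMetadata(String threadName, String threadState, List<TaskMetadata> activeTasks) {}

    // Renders one line per active task. This is a runtime snapshot that only
    // exists after the instance has started, unlike a static topology description.
    static List<String> render(List<ThreadMetadata> threads) {
        List<String> lines = new ArrayList<>();
        for (ThreadMetadata thread : threads) {
            for (TaskMetadata task : thread.activeTasks()) {
                // Sort the partitions so the output is deterministic.
                lines.add(thread.threadName() + " [" + thread.threadState() + "] task "
                        + task.taskId() + " -> " + new TreeSet<>(task.topicPartitions()));
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        List<ThreadMetadata> threads = List.of(
                new ThreadMetadata("StreamThread-1", "RUNNING", List.of(
                        new TaskMetadata("0_0", Set.of("orders-0")),
                        new TaskMetadata("1_0", Set.of("payments-0")))),
                new ThreadMetadata("StreamThread-2", "RUNNING", List.of(
                        new TaskMetadata("0_1", Set.of("orders-1")))));
        render(threads).forEach(System.out::println);
    }
}
```

A monitoring UI like the one described in the thread could poll such a snapshot periodically. Because it is computed on demand, it reflects task migrations after a rebalance.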
-Matthias

On 3/22/17 12:42 PM, Guozhang Wang wrote:

Thanks for the updated KIP, and sorry for the late replies!

I thought a bit more about KIP-130. If we are going to deprecate the `toString` function in favor of the proposed APIs, I think it may be okay. (The KIP does not say this explicitly, so I'm not sure whether you plan to keep `KafkaStreams#toString` as is or replace it with the proposed APIs.) More specifically, after both KIP-120 and KIP-130:

1. Users can call the `#describe` function to check the generated topology before calling `KafkaStreams#start`. This is static information.
2. After calling `KafkaStreams#start`, users can navigate `StreamsMetadata -> ThreadMetadata -> TaskMetadata` programmatically to get the dynamically changing information.

One thing I'm still unsure about, though: `TaskMetadata` only has the TaskId and the assigned partitions, whereas the "TopologyDescription" introduced in KIP-120 simply describes the whole topology, which may be composed of multiple sub-topologies. So at runtime it is hard for users to tell which sub-topology is executed by which task.

So I'm wondering whether we can expose the "sub-topology-id" (named topicsGroupId internally) in TopologyDescription#Subtopology. The task id is essentially "sub-topology-id DASH partition-group-id", so users could then make the link from it, although this is still not very straightforward.

Thoughts?
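The linking step described here can be sketched roughly as follows. In the Kafka Streams code base, `TaskId#toString()` renders as `<topicGroupId>_<partition>` (for example `1_3`), with an underscore rather than a dash. The sub-topology id can therefore be recovered from the part before the separator. The `TaskIds` class and its helpers are hypothetical and exist only for this example. They are not part of the Kafka API.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class TaskIds {
    // Splits "<subTopologyId>_<partition>" into its two numeric parts.
    private static int[] split(String taskId) {
        int sep = taskId.indexOf('_');
        if (sep <= 0 || sep == taskId.length() - 1) {
            throw new IllegalArgumentException("Unexpected task id format: " + taskId);
        }
        return new int[] {
            Integer.parseInt(taskId.substring(0, sep)),
            Integer.parseInt(taskId.substring(sep + 1))
        };
    }

    // Hypothetical helper: the sub-topology id (topicsGroupId internally).
    static int subtopologyId(String taskId) {
        return split(taskId)[0];
    }

    // Hypothetical helper: the partition number the task was created for.
    static int partition(String taskId) {
        return split(taskId)[1];
    }

    // Groups task ids by sub-topology, so that each group can be matched with
    // the sub-topology that has the same id in a static topology description.
    static Map<Integer, Set<String>> bySubtopology(List<String> taskIds) {
        Map<Integer, Set<String>> grouped = new TreeMap<>();
        for (String id : taskIds) {
            grouped.computeIfAbsent(subtopologyId(id), k -> new TreeSet<>()).add(id);
        }
        return grouped;
    }

    public static void main(String[] args) {
        // prints {0=[0_0, 0_1], 1=[1_0, 1_1]}
        System.out.println(bySubtopology(List.of("0_0", "1_0", "0_1", "1_1")));
    }
}
```

Parsing a string this way is the "not that straightforward" option mentioned above. A dedicated sub-topology id field on the task metadata would remove the need for it.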
Guozhang

On Wed, Mar 15, 2017 at 3:16 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:

Thanks, Guozhang, for pointing me to KIP-120.

I've made some changes to the KIP. I have also opened a new PR (some tests are still missing).
https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API

Exposing consumed offsets through JMX is sufficient for debugging purposes. However, I think this could be handled in a separate JIRA, since it has no impact on the public API.

Thanks

2017-03-10 22:35 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:

Hello Florian,

As for programmatically discovering monitoring data by piping metrics into a dedicated topic:
I think you can actually use a KafkaMetricsReporter that pipes the polled metric values into a predefined topic. (Note that in Kafka, MetricsReporter is simply an interface, so users can build their own implementation in addition to the JMXReporter.) For example:

https://github.com/krux/kafka-metrics-reporter

As for the "static task-level assignment", I meant that the mapping from source topic partitions to tasks is static, via the "PartitionGrouper", and a task never switches from active to standby. Instead, an active task can be migrated as a whole, together with all its assigned partitions, to another thread or process, and a new standby task is created on the host the active task is migrating from. So for the SAME task, taskMetadata.assignedPartitions() always returns the same partitions.

As for the current `toString` function, I feel it overlaps with KIP-120, so I'm trying to coordinate the discussions here (cc'ing Matthias, the owner of KIP-120). My understanding is:

1. In KIP-120, the `toString` function of `KafkaStreams` will be removed. Instead, the `Topology#describe` function will be introduced so that users can debug the topology BEFORE they start running their instance with it. The description therefore won't contain any task information, since tasks are not formed yet.
2. In KIP-130, we want to add task-level information for monitoring purposes. This information is not static and can only be captured AFTER the instance has started running. Again, I'm wondering whether, for KIP-130 alone, adding the metrics mentioned in my previous email would be enough even for the use case you described.

Guozhang

On Wed, Mar 8, 2017 at 3:18 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:

Hi Guozhang,

Thank you for your feedback. I've started to look more deeply into the code. As you mentioned, it would be better to use the existing StreamsMetadata API to expose this information.

I think exposing metrics through JMX is great for building monitoring dashboards with tools like jmxtrans and Grafana. However, for our use case we would like to expose the states directly from the application that embeds the KStreams topologies, so we need to be able to retrieve states programmatically.

For instance, we could produce those states into a dedicated topic. A third application could then automatically discover all the Kafka Streams applications available for monitoring. In a production environment, this would clearly give a complete overview of a microservices architecture based on Kafka Streams.
The toString() method provides a lot of information, but it is only useful for debugging, not for building a topology visualization tool. Could we expose the same details about the stream topology through the StreamsMetadata API? In other words, could the TaskMetadata class you suggested contain information similar to what the toString method of the AbstractTask class returns?

I can update the KIP accordingly.

Finally, I'm not sure I understand your last point: "Note that the task-level assignment information is static, i.e. it will not change during the runtime".

Does that mean that when a rebalance occurs, new tasks are created for the new assignments and the old ones just switch to a standby state?

Thanks,

2017-03-05 7:04 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:

Hello Florian,

Thanks for the KIP and for the detailed explanation of your use case. I think there are two dimensions to discuss on how to improve Streams' debuggability (or, more specifically, state exposure for visualization).

The first question is "what information should we expose to the user?" From your KIP, I see three general categories:

1. The state of each thread within a process. As you mentioned, we currently expose only the state of the process, not the finer-grained per-thread state.
2. The state of the task.
Currently the most > >> close API to this is > >> > > > > StreamsMetadata, > >> > > > > however it aggregates the tasks across all > >> threads and only > >> > present the > >> > > > > aggregated set of the assigned partitions / > >> state stores etc. > >> > We can > >> > > > > consider extending this method to have a new > >> > StreamsMetadata#tasks() > >> > > > which > >> > > > > returns a TaskMetadata with the similar fields, > >> and the > >> > > > > StreamsMetadata.stateStoreNames / etc would > >> still be returning the > >> > > > > aggregated results but users can still "drill > >> down" if they want. > >> > > > > > >> > > > > The second question is "how should we expose > >> them to the > >> > user". For > >> > > > > example, you mentioned about > >> consumedOffsetsByPartition in the > >> > > > activeTasks. > >> > > > > We could add this as a JMX metric based on fetch > >> positions > >> > inside the > >> > > > > consumer layer (note that Streams is just > >> embedding consumers) > >> > or we > >> > > > could > >> > > > > consider adding it into TaskMetadata. Either > >> case it can be > >> > visualized > >> > > > for > >> > > > > monitoring. The reason we expose StreamsMetadata > >> as well as > >> > State was > >> > > > that > >> > > > > it is expected to be "polled" in a programmatic > >> way for > >> > interactive > >> > > > queries > >> > > > > and also for control flows (e.g. I would like to > >> ONLY start > >> > running my > >> > > > > other topology until the first topology has been > >> up and > >> > running) while > >> > > > for > >> > > > > your case it seems the main purpose is to > >> continuously query > >> > them for > >> > > > > monitoring etc. Personally I'd prefer to expose > >> them as JMX > >> > only for > >> > > such > >> > > > > purposes only to have a simpler API. 
So, given your current motivations, I'd suggest exposing the following information as new metrics in Streams:

1. A thread-level state metric.
2. A task-level hosted client identifier metric (e.g. host:port).
3. A consumer-level per-topic/partition position metric (https://kafka.apache.org/documentation/#topic_fetch_monitoring).

Note that the task-level assignment information is static, i.e. it does not change at all during runtime, and it can already be accessed from the `toString()` function even before the instance starts running. So I think this piece of information does not need to be exposed through JMX anymore.

WDYT?

Guozhang

On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy <damian....@gmail.com> wrote:

Hi Florian,

Thanks for the KIP.

There seems to be some overlap with what we already have in KafkaStreams.allMetadata(). It currently returns a Collection<StreamsMetadata> containing one StreamsMetadata per instance of the KafkaStreams application, each holding that instance's state stores and partition assignment. I'm wondering whether that is good enough for what you are trying to achieve?
If not, could it be modified to include the per-thread assignment?

Thanks,
Damian

On Wed, 1 Mar 2017 at 22:49 Florian Hussonnois <fhussonn...@gmail.com> wrote:

Hi Matthias,

First, I will answer your last question.

The main reason for having both TaskState#assignment and TaskState#consumedOffsetsByPartition is that a task has no consumed offset for a partition until it has consumed at least one message from it, even if previous offsets exist for the consumer group. So yes, the two methods are redundant, since they only diverge at application startup.

About the use case: we are currently developing, for a customer, a small framework based on KafkaStreams that transforms/denormalizes data before ingesting it into Hadoop.

We have a cluster of workers (Spring Boot) that instantiate KStreams topologies dynamically based on dataflow configurations. Each configuration describes a topic to consume and how to process its messages (similar to the NiFi processors API).

Our architecture is inspired by Kafka Connect.
We have topics for configs and states, which are consumed by each worker (in fact, we have reused some internal classes of the Connect API).

Now we would like to build UIs to visualize the topics and partitions consumed by our worker applications.

In the future, I think it would also be nice to be able to build web UIs like Spark's, but for KafkaStreams, to visualize DAGs. So this KIP may be just a first step.

Thanks,

2017-03-01 22:52 GMT+01:00 Matthias J. Sax <matth...@confluent.io>:

Thanks for the KIP.

I am wondering a little why you need to expose this information. Can you describe some use cases?

Would it be worth unifying this new API with KafkaStreams#state(), to get the overall state of an application without having to call two different methods? I'm not sure what such a unified API would look like, though.

One minor comment about the API: TaskState#assignment seems to be redundant. It should be the same as TaskState#consumedOffsetsByPartition.keySet().

Or am I missing something?
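Florian's answer earlier in this thread can be illustrated with a sketch. The assigned partitions are known as soon as the task is created. A per-partition consumed-offset map, by contrast, only gains a key once a record from that partition has been processed. As a result, `keySet()` matches the assignment only after every partition has delivered something. `TaskState` here is a hypothetical stand-in named after the class in the original KIP draft, not its actual implementation, and `onConsumed` and `offsetsCoverAssignment` are invented helpers.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class TaskState {
    // Partitions assigned to the task: known as soon as the task is created.
    private final Set<String> assignment;
    // Last consumed offset per partition: empty until this task has processed
    // the first record from a partition, even if committed offsets already exist.
    private final Map<String, Long> consumedOffsetsByPartition = new TreeMap<>();

    TaskState(Set<String> assignment) {
        this.assignment = Set.copyOf(assignment);
    }

    Set<String> assignment() {
        return assignment;
    }

    Map<String, Long> consumedOffsetsByPartition() {
        return Collections.unmodifiableMap(consumedOffsetsByPartition);
    }

    // Hypothetical hook: records that a message at the given offset was consumed.
    void onConsumed(String partition, long offset) {
        if (!assignment.contains(partition)) {
            throw new IllegalArgumentException("Not assigned: " + partition);
        }
        consumedOffsetsByPartition.put(partition, offset);
    }

    // True once every assigned partition has delivered at least one record,
    // i.e. once keySet() and the assignment no longer diverge.
    boolean offsetsCoverAssignment() {
        return consumedOffsetsByPartition.keySet().equals(assignment);
    }

    public static void main(String[] args) {
        TaskState state = new TaskState(Set.of("orders-0", "orders-1"));
        System.out.println(state.offsetsCoverAssignment()); // false: nothing consumed yet
        state.onConsumed("orders-0", 42L);
        System.out.println(state.offsetsCoverAssignment()); // false: orders-1 still missing
        state.onConsumed("orders-1", 7L);
        System.out.println(state.offsetsCoverAssignment()); // true
    }
}
```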
-Matthias

On 3/1/17 5:19 AM, Florian Hussonnois wrote:

Hi Eno,

Yes, but the state() method only returns the global state of the KafkaStreams application (i.e. CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING).

An alternative to this KIP would be to change this method to return more information, instead of adding a new method.

2017-03-01 13:46 GMT+01:00 Eno Thereska <eno.there...@gmail.com>:

Thanks Florian,

Have you had a chance to look at the new state methods in 0.10.2, e.g., KafkaStreams.state()?

Thanks
Eno

On 1 Mar 2017, at 11:54, Florian Hussonnois <fhussonn...@gmail.com> wrote:

Hi all,

I have just created KIP-130, which adds a new method to the KafkaStreams API to expose the states of threads and active tasks.
https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API

Thanks,

--
Florian HUSSONNOIS