It would not give the same information as the new API. Thus, it would be inconsistent (and this would be really bad, IMHO).
I would really like to remove (i.e., deprecate) it. It was a "hot fix" to give some runtime information to the user. But with this KIP, we get a proper first-class API and thus .toString() is just outdated. >>>> It's always preferable to have an implementation for toString method. Not sure about this, btw. Furthermore, if anyone wants to print runtime information, we have ThreadMetadata#toString() and TaskMetadata#toString(). -Matthias On 4/20/17 9:36 AM, Eno Thereska wrote: > Hi there, > > Can't toString() just stay as is? > > Thanks > Eno > >> On 20 Apr 2017, at 17:26, Matthias J. Sax <matth...@confluent.io> wrote: >> >> Florian, >> >> I am just wondering: if we keep .toString(), what should the >> implementation look like? >> >> >> -Matthias >> >> On 4/19/17 2:42 PM, Florian Hussonnois wrote: >>> Hi Matthias, >>> >>> So sorry for the delay in replying to you. For now, I think we can keep >>> KafkaStreams#toString() as it is. >>> It's always preferable to have an implementation for toString method. >>> >>> 2017-04-14 4:08 GMT+02:00 Matthias J. Sax <matth...@confluent.io>: >>> >>>> Florian, >>>> >>>>>>> What about KafkaStreams#toString() method? >>>>>>> >>>>>>> I think we want to deprecate it, as with KIP-120 and the changes of >>>> this >>>>>>> KIP, it gets obsolete. >>>> >>>> Any thoughts about this? For me, this is the last open point to discuss >>>> (or what should be reflected in the KIP in case you agree) before I can >>>> put my vote on the VOTE thread that already started. >>>> >>>> -Matthias >>>> >>>> >>>> On 4/11/17 12:18 AM, Damian Guy wrote: >>>>> Hi Florian, >>>>> >>>>> Thanks for the updates. The KIP is looking good. >>>>> >>>>> Cheers, >>>>> Damian >>>>> >>>>> On Fri, 7 Apr 2017 at 22:41 Matthias J. Sax <matth...@confluent.io> >>>> wrote: >>>>> >>>>>> What about KafkaStreams#toString() method? >>>>>> >>>>>> I think we want to deprecate it, as with KIP-120 and the changes of this >>>>>> KIP, it gets obsolete.
>>>>>> >>>>>> If we do so, please update the KIP accordingly. >>>>>> >>>>>> >>>>>> -Matthias >>>>>> >>>>>> On 3/28/17 7:00 PM, Matthias J. Sax wrote: >>>>>>> Thanks for updating the KIP! >>>>>>> >>>>>>> I think it's good as is -- I would not add anything more to >>>> TaskMetadata. >>>>>>> >>>>>>> About subtopologies and tasks. We do have the concept of subtopologies >>>>>>> already in KIP-120. It's only missing an ID that allows linking a >>>>>>> subtopology to a task. >>>>>>> >>>>>>> IMHO, adding a simple variable to `Subtopology` that provides the id >>>>>>> should be sufficient. We can simply document in the JavaDocs how >>>>>>> Subtopology and TaskMetadata can be linked to each other. >>>>>>> >>>>>>> I did update KIP-120 accordingly. >>>>>>> >>>>>>> >>>>>>> -Matthias >>>>>>> >>>>>>> On 3/28/17 3:45 PM, Florian Hussonnois wrote: >>>>>>>> Hi all, >>>>>>>> >>>>>>>> I've updated the KIP and the PR to reflect your suggestions. >>>>>>>> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API >>>>>>>> https://github.com/apache/kafka/pull/2612 >>>>>>>> >>>>>>>> Also, I've exposed the property StreamThread#state as a string through the >>>>>>>> new class ThreadMetadata. >>>>>>>> >>>>>>>> Thanks, >>>>>>>> >>>>>>>> 2017-03-27 23:40 GMT+02:00 Florian Hussonnois <fhussonn...@gmail.com>: >>>>>>>> >>>>>>>> Hi Guozhang, Matthias, >>>>>>>> >>>>>>>> It's a great idea to add sub-topology descriptions. This would >>>>>>>> help developers better understand the topology concept. >>>>>>>> >>>>>>>> I agree that it is not really user-friendly to check that >>>>>>>> `StreamsMetadata#streamThreads` is not returning null. >>>>>>>> >>>>>>>> The method name localThreadsMetadata looks good. In addition, it's >>>>>>>> simpler to build ThreadMetadata instances from the >>>> `StreamTask` >>>>>>>> class than from the `StreamPartitionAssignor` class.
>>>>>>>> >>>>>>>> I will work on the modifications. As I understand it, I have to add the >>>>>>>> subTopologyId property to the TaskMetadata class. Am I >>>>>> right? >>>>>>>> >>>>>>>> Thanks, >>>>>>>> >>>>>>>> 2017-03-26 0:25 GMT+01:00 Guozhang Wang <wangg...@gmail.com>: >>>>>>>> >>>>>>>> Re 1): this is a good point. Maybe we can move >>>>>>>> `StreamsMetadata#streamThreads` to >>>>>>>> `KafkaStreams#localThreadsMetadata`? >>>>>>>> >>>>>>>> 3): this is a minor suggestion to rename the function >>>>>>>> `assignedPartitions` to `topicPartitions` to be consistent >>>> with >>>>>>>> `StreamsMetadata`? >>>>>>>> >>>>>>>> >>>>>>>> Guozhang >>>>>>>> >>>>>>>> On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax >>>>>>>> <matth...@confluent.io> wrote: >>>>>>>> >>>>>>>> Thanks for the progress on this KIP. I think we are on the >>>>>>>> right path! >>>>>>>> >>>>>>>> A couple of comments/questions: >>>>>>>> >>>>>>>> (1) Why do we not consider the "rejected alternative" to >>>> add >>>>>>>> the method >>>>>>>> to KafkaStreams? The comment on #streamThreads() says: >>>>>>>> >>>>>>>> "Note this method will return <code>null</code> if called >>>> on >>>>>>>> {@link >>>>>>>> StreamsMetadata} which represent a remote application." >>>>>>>> >>>>>>>> Thus, if we cannot get any remote metadata, it seems more >>>>>>>> straightforward >>>>>>>> to add it to KafkaStreams directly -- this >>>> would >>>>>>>> avoid >>>>>>>> invalid calls and `null` return values in the first place. >>>>>>>> >>>>>>>> I like the idea of exposing sub-topologies: >>>>>>>> >>>>>>>> (2a) I would recommend renaming `topicsGroupId` to >>>>>>>> `subTopologyId` :) >>>>>>>> >>>>>>>> (2b) We could add this to KIP-120 already. However, I >>>> would >>>>>>>> not just >>>>>>>> link both via name, but leverage KIP-120 directly, and >>>> add a >>>>>>>> "Subtopology" member to the TaskMetadata class.
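To make the shape being converged on here concrete: per-thread metadata that carries per-task metadata (a task id plus its topic partitions) can be modeled with plain Java records. This is a minimal sketch under stated assumptions: the records only borrow the names used in this thread (ThreadMetadata, TaskMetadata, topicPartitions) and are not the real Kafka Streams classes, a partition is encoded as a "topic-partition" string so the example compiles without Kafka on the classpath, and RuntimeReport is a hypothetical helper.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for per-task runtime metadata: the task id and the
// topic partitions assigned to it, each encoded as "topic-partition".
record TaskMetadata(String taskId, Set<String> topicPartitions) {}

// Hypothetical stand-in for one stream thread: its name, its state as a
// plain string, and the active tasks it currently runs.
record ThreadMetadata(String threadName, String threadState, List<TaskMetadata> activeTasks) {}

final class RuntimeReport {
    private RuntimeReport() {}

    // Renders one line per thread and one indented line per active task.
    static String render(List<ThreadMetadata> threads) {
        StringBuilder sb = new StringBuilder();
        for (ThreadMetadata thread : threads) {
            sb.append(thread.threadName()).append(" [").append(thread.threadState()).append("]\n");
            for (TaskMetadata task : thread.activeTasks()) {
                // Sort the partitions so the output is deterministic.
                Set<String> sorted = new TreeSet<>(task.topicPartitions());
                sb.append("  task ").append(task.taskId()).append(" -> ").append(sorted).append('\n');
            }
        }
        return sb.toString();
    }
}
```

Because callers receive typed values rather than a preformatted string, they can filter, aggregate, or serialize them; that is the main argument in this thread for preferring the new metadata classes over KafkaStreams#toString().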
>>>>>>>> >>>>>>>> >>>>>>>> Overall, I like the distinction of KIP-120 only exposing >>>>>>>> "static" >>>>>>>> information that can be determined before the topology >>>> gets >>>>>>>> started, >>>>>>>> while this KIP allows access to runtime information. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> -Matthias >>>>>>>> >>>>>>>> >>>>>>>> On 3/22/17 12:42 PM, Guozhang Wang wrote: >>>>>>>>> Thanks for the updated KIP, and sorry for the late >>>>>> replies! >>>>>>>>> >>>>>>>>> I thought a little bit more about KIP-130, and I feel that >>>>>>>> if we are going >>>>>>>>> to deprecate the `toString` function (it is not >>>> explicitly >>>>>>>> said in the >>>>>>>>> KIP, so I'm not sure if you plan to still keep the >>>>>>>>> `KafkaStreams#toString` as is or are going to replace it >>>>>>>> with the >>>>>>>>> proposed APIs) with the proposed ones, it may be okay. >>>>>> More >>>>>>>>> specifically, after both KIP-120 and KIP-130: >>>>>>>>> >>>>>>>>> 1. users can use the `#describe` function to check the >>>>>>>> generated topology >>>>>>>>> before calling `KafkaStreams#start`, which is static >>>>>>>> information. >>>>>>>>> 2. users can use the `StreamsMetadata -> ThreadMetadata >>>> -> >>>>>>>> TaskMetadata` >>>>>>>>> programmatically after calling `KafkaStreams#start` to >>>> get >>>>>> the >>>>>>>>> dynamically changing information. >>>>>>>>> >>>>>>>>> One thing I'm still not sure about, though, is that in >>>>>>>> `TaskMetadata` we only >>>>>>>>> have the TaskId and assigned partitions, whereas >>>>>>>>> "TopologyDescription" introduced in KIP-120 will >>>>>>>> simply describe the >>>>>>>>> whole topology, possibly composed of multiple >>>>>>>> sub-topologies. So it is >>>>>>>>> hard for users to tell which sub-topology is executed >>>>>>>> under which task >>>>>>>>> on-the-fly.
>>>>>>>>> >>>>>>>>> Hence I'm wondering whether we can expose the >>>> "sub-topology-id" >>>>>>>> (named >>>>>>>>> topicsGroupId internally) in >>>>>>>> TopologyDescription#Subtopology; then, >>>>>>>>> from the task id, which is essentially "sub-topology-id >>>>>> DASH >>>>>>>>> partition-group-id", users can make the link, though it >>>> is >>>>>>>> still not that >>>>>>>>> straightforward. >>>>>>>>> >>>>>>>>> Thoughts? >>>>>>>>> >>>>>>>>> Guozhang >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> On Wed, Mar 15, 2017 at 3:16 PM, Florian Hussonnois >>>>>>>>> <fhussonn...@gmail.com> wrote: >>>>>>>>> >>>>>>>>> Thanks Guozhang for pointing me to KIP-120. >>>>>>>>> >>>>>>>>> I've made some modifications to the KIP. I also >>>>>> proposed a new PR >>>>>>>>> (there are >>>>>>>>> still some tests to write). >>>>>>>>> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API >>>>>>>>> >>>>>>>>> Exposing consumed offsets through JMX is sufficient >>>>>> for debugging >>>>>>>>> purposes. >>>>>>>>> But I think this could be part of another JIRA as >>>>>> there is no impact on the >>>>>>>>> public API.
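Guozhang's linking idea can be sketched as follows: parse the sub-topology id out of each task id and group the tasks under it. This assumes the "sub-topology-id DASH partition-group-id" shape he describes; the separator is a parameter because the exact string form of a task id may differ between versions, and TaskIdLinker is a hypothetical helper, not a Kafka API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TaskIdLinker {
    private TaskIdLinker() {}

    // Extracts the sub-topology id from a task id of the assumed form
    // "<subTopologyId><separator><partitionGroupId>", e.g. "1-3" with '-'.
    static int subTopologyId(String taskId, char separator) {
        int idx = taskId.indexOf(separator);
        if (idx <= 0) {
            throw new IllegalArgumentException("Unexpected task id: " + taskId);
        }
        return Integer.parseInt(taskId.substring(0, idx));
    }

    // Groups task ids by the sub-topology they execute, in ascending id order.
    static Map<Integer, List<String>> bySubTopology(List<String> taskIds, char separator) {
        Map<Integer, List<String>> grouped = new TreeMap<>();
        for (String id : taskIds) {
            grouped.computeIfAbsent(subTopologyId(id, separator), k -> new ArrayList<>()).add(id);
        }
        return grouped;
    }
}
```

Depending on a string format like this is what makes the link "not that straightforward"; an explicit id field on the sub-topology and task metadata, as proposed elsewhere in the thread, removes the parsing step.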
>>>>>>>>> >>>>>>>>> Thanks >>>>>>>>> >>>>>>>>> 2017-03-10 22:35 GMT+01:00 Guozhang Wang <wangg...@gmail.com>: >>>>>>>>> >>>>>>>>>> Hello Florian, >>>>>>>>>> >>>>>>>>>> As for programmatically discovering monitoring data >>>> by >>>>>>>> piping metrics >>>>>>>>> into a >>>>>>>>>> dedicated topic, I think you can actually use a >>>>>>>>> KafkaMetricsReporter which >>>>>>>>>> pipes the polled metric values into a pre-defined >>>>>>>> topic (note that >>>>>>>>> in Kafka >>>>>>>>>> the MetricsReporter is simply an interface and >>>> users >>>>>>>> can build >>>>>>>>> their own >>>>>>>>>> impl in addition to the JMXReporter), for example: >>>>>>>>>> >>>>>>>>>> https://github.com/krux/kafka-metrics-reporter >>>>>>>>>> >>>>>>>>>> As for the "static task-level assignment", what I >>>>>>>> meant is that >>>>>>>>> the mapping >>>>>>>>>> from source-topic-partitions -> tasks is static, >>>>>>>> via the >>>>>>>>>> "PartitionGrouper", and a task won't switch from >>>> an >>>>>>>> active task to a >>>>>>>>>> standby task; rather, an active task >>>>>>>> could be >>>>>>>>> migrated, as a >>>>>>>>>> whole along with all its assigned partitions, to >>>>>>>> another thread / >>>>>>>>> process >>>>>>>>>> and a new standby task will be created on the host >>>>>>>> that this >>>>>>>>> active task is >>>>>>>>>> migrating from. So for the SAME task, its >>>>>>>>>> taskMetadata.assignedPartitions() >>>>>>>>>> will always return the same partitions.
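The "static" mapping Guozhang describes can be illustrated with a simplified grouping in which task p of a sub-topology reads partition p of each of its source topics. This is a sketch under stated assumptions: it is similar in spirit to the default grouping in Kafka Streams but is not its code, the "<subTopologyId>_<p>" task-id format is illustrative, and StaticGrouping is a hypothetical name. What it shows is that the task-to-partitions map is computed from topic metadata alone, so it cannot depend on which thread or instance later hosts a task.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class StaticGrouping {
    private StaticGrouping() {}

    // Maps each task of one sub-topology to its source partitions.
    // Task "<subTopologyId>_<p>" (illustrative id format) reads partition p
    // of every source topic that has more than p partitions.
    static Map<String, Set<String>> group(int subTopologyId, Map<String, Integer> partitionsPerTopic) {
        int maxPartitions = partitionsPerTopic.values().stream()
                .mapToInt(Integer::intValue).max().orElse(0);
        Map<String, Set<String>> tasks = new LinkedHashMap<>();
        for (int p = 0; p < maxPartitions; p++) {
            Set<String> assigned = new TreeSet<>();
            for (Map.Entry<String, Integer> e : partitionsPerTopic.entrySet()) {
                if (p < e.getValue()) {
                    assigned.add(e.getKey() + "-" + p);
                }
            }
            tasks.put(subTopologyId + "_" + p, assigned);
        }
        return tasks;
    }
}
```

A rebalance only changes where each entry of this map runs; the entry itself, and therefore the partitions reported for that task id, stays the same.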
>>>>>>>>>> >>>>>>>>>> As for the `toString` function that we have >>>>>>>> today, I feel it >>>>>>>>> has some >>>>>>>>>> correlations with KIP-120 so I'm trying to >>>>>>>> coordinate some >>>>>>>>> discussions here >>>>>>>>>> (cc'ing Matthias as the owner of KIP-120). My >>>>>>>> understanding is that: >>>>>>>>>> >>>>>>>>>> 1. In KIP-120, the `toString` function of >>>>>>>> `KafkaStreams` will be >>>>>>>>> removed >>>>>>>>>> and instead the `Topology#describe` function will >>>> be >>>>>>>> introduced >>>>>>>>> for users >>>>>>>>>> to debug the topology BEFORE they start running their >>>>>>>> instance with the >>>>>>>>>> topology. And hence the description won't contain >>>>>>>> any task >>>>>>>>> information, as >>>>>>>>>> tasks are not formed yet. >>>>>>>>>> 2. In KIP-130, we want to add the task-level >>>>>>>> information for >>>>>>>>> monitoring >>>>>>>>>> purposes, which is not static and can only be >>>>>>>> captured AFTER the >>>>>>>>> instance >>>>>>>>>> has started running. Again, I'm wondering whether, for >>>> KIP-130 >>>>>>>> alone, >>>>>>>>> adding those >>>>>>>>>> metrics mentioned in my previous email would >>>> suffice >>>>>>>> even for the >>>>>>>>> use case >>>>>>>>>> that you have mentioned. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Guozhang >>>>>>>>>> >>>>>>>>>> On Wed, Mar 8, 2017 at 3:18 PM, Florian Hussonnois >>>>>>>>> <fhussonn...@gmail.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Hi Guozhang, >>>>>>>>>>> >>>>>>>>>>> Thank you for your feedback. I've started to >>>> look >>>>>>>> more deeply >>>>>>>>> into the >>>>>>>>>>> code. As you mention, it would be smarter to >>>>>>>> use the current >>>>>>>>>>> StreamsMetadata API to expose this information.
>>>>>>>>>>> >>>>>>>>>>> I think exposing metrics through JMX is great >>>> for >>>>>>>> building >>>>>>>>> monitoring >>>>>>>>>>> dashboards using tools like jmxtrans and >>>>>> Grafana. >>>>>>>>>>> But for our use case we would like to expose the >>>>>>>> states >>>>>>>>> directly from >>>>>>>>>> the >>>>>>>>>>> application embedding the kstreams topologies. >>>>>>>>>>> So we expect to be able to retrieve states in a >>>>>>>> programmatic way. >>>>>>>>>>> >>>>>>>>>>> For instance, we could imagine producing those >>>>>>>> states into a >>>>>>>>> dedicated >>>>>>>>>>> topic. In that way a third application could >>>>>>>> automatically >>>>>>>>> discover all >>>>>>>>>>> kafka-streams applications which could be >>>>>> monitored. >>>>>>>>>>> In a production environment, that could clearly be a >>>>>>>> way to get a >>>>>>>>>>> complete overview of a microservices >>>> architecture >>>>>>>> based on Kafka >>>>>>>>> Streams. >>>>>>>>>>> >>>>>>>>>>> The toString() method gives a lot of information, but >>>>>>>> it can only be >>>>>>>>> used for >>>>>>>>>>> debugging purposes, not to build a topology >>>>>>>> visualization >>>>>>>>> tool. We >>>>>>>>>>> could actually expose the same details about the >>>>>>>> stream topology >>>>>>>>> from the >>>>>>>>>>> StreamsMetadata API? So the TaskMetadata class >>>> you >>>>>>>> have >>>>>>>>> suggested could >>>>>>>>>>> contain information similar to that returned by >>>>>>>> the toString >>>>>>>>> method of the >>>>>>>>>>> AbstractTask class? >>>>>>>>>>> >>>>>>>>>>> I can update the KIP in that way. >>>>>>>>>>> >>>>>>>>>>> Finally, I'm not sure I understand your last >>>>>>>> point: *"Note >>>>>>>>> that the >>>>>>>>>>> task-level assignment information is static, >>>> i.e.
>>>>>>>> it will not change >>>>>>>>>> during >>>>>>>>>>> the runtime"* >>>>>>>>>>> >>>>>>>>>>> Does that mean that when a rebalance occurs, new tasks >>>>>>>> are created for >>>>>>>>> the new >>>>>>>>>>> assignments and old ones just switch to a >>>> standby >>>>>>>> state? >>>>>>>>>>> >>>>>>>>>>> Thanks, >>>>>>>>>>> >>>>>>>>>>> 2017-03-05 7:04 GMT+01:00 Guozhang Wang >>>>>>>> <wangg...@gmail.com>: >>>>>>>>>>> >>>>>>>>>>>> Hello Florian, >>>>>>>>>>>> >>>>>>>>>>>> Thanks for the KIP and your detailed >>>> explanation >>>>>>>> of your use >>>>>>>>> case. I >>>>>>>>>>> think >>>>>>>>>>>> there are two dimensions to discuss on how to >>>>>>>> improve Streams' >>>>>>>>>>>> debuggability (or more specifically state >>>>>>>> exposure for >>>>>>>>> visualization). >>>>>>>>>>>> >>>>>>>>>>>> The first question is "what information should we >>>>>>>> expose to the >>>>>>>>> user". From >>>>>>>>>>>> your KIP I saw generally three categories: >>>>>>>>>>>> >>>>>>>>>>>> 1. The state of the thread within a process; as >>>>>>>> you mentioned, >>>>>>>>> currently >>>>>>>>>>> we >>>>>>>>>>>> only expose the state of the process but not >>>> the >>>>>>>> finer-grained >>>>>>>>>> per-thread >>>>>>>>>>>> state. >>>>>>>>>>>> 2. The state of the task. Currently the closest >>>>>>>> API to this is >>>>>>>>>>>> StreamsMetadata; >>>>>>>>>>>> however, it aggregates the tasks across all >>>>>>>> threads and only >>>>>>>>> presents the >>>>>>>>>>>> aggregated set of the assigned partitions / >>>>>>>> state stores etc.
>>>>>>>>> We can >>>>>>>>>>>> consider extending this method to have a new >>>>>>>>> StreamsMetadata#tasks() >>>>>>>>>>> which >>>>>>>>>>>> returns TaskMetadata with similar >>>> fields, >>>>>>>> and the >>>>>>>>>>>> StreamsMetadata.stateStoreNames / etc. would >>>>>>>> still return the >>>>>>>>>>>> aggregated results, but users can still "drill >>>>>>>> down" if they want. >>>>>>>>>>>> >>>>>>>>>>>> The second question is "how should we expose >>>>>>>> them to the >>>>>>>>> user". For >>>>>>>>>>>> example, you mentioned >>>>>>>> consumedOffsetsByPartition in the >>>>>>>>>>> activeTasks. >>>>>>>>>>>> We could add this as a JMX metric based on >>>> fetch >>>>>>>> positions >>>>>>>>> inside the >>>>>>>>>>>> consumer layer (note that Streams is just >>>>>>>> embedding consumers) >>>>>>>>> or we >>>>>>>>>>> could >>>>>>>>>>>> consider adding it into TaskMetadata. In either >>>>>>>> case it can be >>>>>>>>> visualized >>>>>>>>>>> for >>>>>>>>>>>> monitoring. The reason we expose >>>> StreamsMetadata >>>>>>>> as well as >>>>>>>>> State was >>>>>>>>>>> that >>>>>>>>>>>> it is expected to be "polled" in a >>>> programmatic >>>>>>>> way for >>>>>>>>> interactive >>>>>>>>>>> queries >>>>>>>>>>>> and also for control flows (e.g. I would like >>>> to >>>>>>>> ONLY start >>>>>>>>> running my >>>>>>>>>>>> other topology once the first topology has >>>> been >>>>>>>> up and >>>>>>>>> running), while >>>>>>>>>>> for >>>>>>>>>>>> your case it seems the main purpose is to >>>>>>>> continuously query >>>>>>>>> them for >>>>>>>>>>>> monitoring etc. Personally, I'd prefer to >>>> expose >>>>>>>> them only as JMX metrics >>>>>>>>> for >>>>>>>>>> such >>>>>>>>>>>> purposes, to keep the API simpler. >>>>>>>>>>>> >>>>>>>>>>>> So given your current motivations I'd suggest >>>>>>>> exposing the following >>>>>>>>>>>> information as newly added metrics in Streams: >>>>>>>>>>>> >>>>>>>>>>>> 1. Thread-level state metric.
>>>>>>>>>>>> 2. Task-level hosted client identifier metric >>>>>>>> (e.g. host:port). >>>>>>>>>>>> 3. Consumer-level per-topic/partition position >>>>>>>> metric ( >>>>>>>>>>>> https://kafka.apache.org/documentation/#topic_fetch_monitoring). >>>>>>>>>>>> >>>>>>>>>>>> Note that the task-level assignment >>>> information >>>>>> is static, >>>>>>>>> i.e. it will >>>>>>>>>>> not >>>>>>>>>>>> change during the runtime at all and can be >>>>>> accessed from the >>>>>>>>>>> `toString()` >>>>>>>>>>>> function already, even before the instance >>>> starts >>>>>> running, so I >>>>>>>>> think >>>>>>>>>> this >>>>>>>>>>>> piece of information does not need to be exposed >>>>>> through JMX >>>>>>>>> anymore. >>>>>>>>>>>> >>>>>>>>>>>> WDYT? >>>>>>>>>>>> >>>>>>>>>>>> Guozhang >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy >>>>>>>>> <damian....@gmail.com> >>>>>>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>>>> Hi Florian, >>>>>>>>>>>>> >>>>>>>>>>>>> Thanks for the KIP. >>>>>>>>>>>>> >>>>>>>>>>>>> It seems there is some overlap here with >>>> what >>>>>> we already have in >>>>>>>>>>>>> KafkaStreams.allMetadata(). This currently >>>>>> returns a >>>>>>>>>>>>> Collection<StreamsMetadata> where each >>>>>> StreamsMetadata >>>>>>>>> instance holds >>>>>>>>>>> the >>>>>>>>>>>>> state stores and partition assignment for >>>>>> every instance of the >>>>>>>>>>>>> KafkaStreams application. I'm wondering if >>>>>> that is good >>>>>>>>> enough for >>>>>>>>>> what >>>>>>>>>>>> you >>>>>>>>>>>>> are trying to achieve? If not, could it be >>>>>> modified to >>>>>>>>> include the per-thread >>>>>>>>>>>>> assignment?
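The relationship between the aggregated instance-level view (what StreamsMetadata reports) and the per-task "drill down" discussed above can be sketched with plain collections. Assumptions: partitions are "topic-partition" strings, tasks are keyed by illustrative ids, and DrillDown is a hypothetical helper. The sketch shows why the finer-grained data is the more useful one to expose: the aggregate can always be derived from it by a union, but the per-task (or per-thread) split cannot be recovered from the aggregate.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

final class DrillDown {
    private DrillDown() {}

    // Instance-level view: the union of all tasks' partitions, i.e. the
    // aggregated set an instance-wide metadata object would report.
    static Set<String> aggregate(Map<String, Set<String>> partitionsByTask) {
        Set<String> all = new HashSet<>();
        partitionsByTask.values().forEach(all::addAll);
        return all;
    }

    // Task-level view: which task, if any, owns a given partition.
    static Optional<String> ownerOf(Map<String, Set<String>> partitionsByTask, String partition) {
        return partitionsByTask.entrySet().stream()
                .filter(e -> e.getValue().contains(partition))
                .map(Map.Entry::getKey)
                .findFirst();
    }
}
```

The same union applies one level up: a thread's partitions are the union over its tasks, and an instance's are the union over its threads.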
>>>>>>>>>>>>> >>>>>>>>>>>>> Thanks, >>>>>>>>>>>>> Damian >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> On Wed, 1 Mar 2017 at 22:49 Florian >>>>>> Hussonnois <fhussonn...@gmail.com> >>>>>>>>>>>>> wrote: >>>>>>>>>>>>> >>>>>>>>>>>>>> Hi Matthias, >>>>>>>>>>>>>> >>>>>>>>>>>>>> First, I will answer your last >>>> question. >>>>>>>>>>>>>> >>>>>>>>>>>>>> The main reason to have both >>>>>>>> TaskState#assignment and >>>>>>>>>>>>>> TaskState#consumedOffsetsByPartition is >>>> that >>>>>>>> tasks have no >>>>>>>>>> consumed >>>>>>>>>>>>> offsets >>>>>>>>>>>>>> until at least one message is consumed for >>>>>>>> each partition, >>>>>>>>> even if >>>>>>>>>>>>> previous >>>>>>>>>>>>>> offsets exist for the consumer group. >>>>>>>>>>>>>> So yes, these methods are redundant, as they >>>> only >>>>>>>> diverge at >>>>>>>>> application >>>>>>>>>>>>>> startup. >>>>>>>>>>>>>> >>>>>>>>>>>>>> About the use case: currently we are >>>>>>>> developing for a >>>>>>>>> customer a >>>>>>>>>>>> small >>>>>>>>>>>>>> framework based on KafkaStreams which >>>>>>>>> transforms/denormalizes data >>>>>>>>>>> before >>>>>>>>>>>>>> ingesting it into Hadoop. >>>>>>>>>>>>>> >>>>>>>>>>>>>> We have a cluster of workers (SpringBoot) >>>>>>>> which instantiate >>>>>>>>>> KStreams >>>>>>>>>>>>>> topologies dynamically based on dataflow >>>>>>>> configurations. >>>>>>>>>>>>>> Each configuration describes a topic to >>>>>>>> consume and how to >>>>>>>>> process >>>>>>>>>>>>> messages >>>>>>>>>>>>>> (this looks like the NiFi processors API). >>>>>>>>>>>>>> >>>>>>>>>>>>>> Our architecture is inspired by >>>>>>>> KafkaConnect.
We have >>>>>>>>> topics for >>>>>>>>>>>>> configs >>>>>>>>>>>>>> and states which are consumed by each >>>>>>>> worker (actually we >>>>>>>>> have >>>>>>>>>>> reused >>>>>>>>>>>>> some >>>>>>>>>>>>>> internal classes of the Connect API). >>>>>>>>>>>>>> >>>>>>>>>>>>>> Now, we would like to develop UIs to >>>>>>>> visualize topics and >>>>>>>>>> partitions >>>>>>>>>>>>>> consumed by our worker applications. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Also, I think it would be nice to be able, >>>>>>>> in the future, to >>>>>>>>>> develop >>>>>>>>>>>> web >>>>>>>>>>>>>> UIs similar to Spark's, but for KafkaStreams, >>>> to >>>>>>>> visualize >>>>>>>>> DAGs... so >>>>>>>>>>> maybe >>>>>>>>>>>>> this >>>>>>>>>>>>>> KIP is just a first step. >>>>>>>>>>>>>> >>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>> >>>>>>>>>>>>>> 2017-03-01 22:52 GMT+01:00 Matthias J. Sax >>>>>>>>> <matth...@confluent.io>: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Thanks for the KIP. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> I am wondering a little bit why you >>>> need >>>>>>>> to expose this >>>>>>>>>>> information. >>>>>>>>>>>>>>> Can you describe some use cases? >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Would it be worth unifying this new API >>>>>> with >>>>>>>>> KafkaStreams#state() >>>>>>>>>>> to >>>>>>>>>>>>> get >>>>>>>>>>>>>>> the overall state of an application >>>>>>>> without the need to >>>>>>>>> call two >>>>>>>>>>>>>>> different methods? Not sure what this >>>>>>>> unified API might >>>>>>>>> look like, >>>>>>>>>>>>> though. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> One minor comment about the API: >>>>>>>> TaskState#assignment >>>>>>>>> seems to be >>>>>>>>>>>>>>> redundant. It should be the same as >>>>>>>>>>>>>>> >>>>>> TaskState#consumedOffsetsByPartition.keySet() >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Or am I missing something?
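Florian's answer above explains why the two fields are not fully redundant. A minimal model of the draft TaskState makes the divergence visible: the assignment is known as soon as the task exists, while an offset entry only appears after the first record from that partition is consumed. TaskStateModel and onConsumed are hypothetical names used for illustration; this is not code from the KIP.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the draft TaskState: the assignment is fixed when
// the task is created; offsets are recorded only as records are consumed.
final class TaskStateModel {
    private final Set<String> assignment;
    private final Map<String, Long> consumedOffsetsByPartition = new HashMap<>();

    TaskStateModel(Set<String> assignment) {
        this.assignment = Set.copyOf(assignment);
    }

    // Records the offset of the latest consumed record for an assigned partition.
    void onConsumed(String partition, long offset) {
        if (!assignment.contains(partition)) {
            throw new IllegalArgumentException("Not assigned: " + partition);
        }
        consumedOffsetsByPartition.put(partition, offset);
    }

    Set<String> assignment() {
        return assignment;
    }

    Map<String, Long> consumedOffsetsByPartition() {
        return Map.copyOf(consumedOffsetsByPartition);
    }

    // True once every assigned partition has consumed at least one record,
    // i.e. from the point where keySet() of the offsets equals the assignment.
    boolean viewsAgree() {
        return consumedOffsetsByPartition.keySet().equals(assignment);
    }
}
```

Until every assigned partition has produced a record, `consumedOffsetsByPartition().keySet()` is a strict subset of `assignment()`, which is exactly the startup window Florian describes.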
>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> -Matthias >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> On 3/1/17 5:19 AM, Florian Hussonnois >>>>>> wrote: >>>>>>>>>>>>>>>> Hi Eno, >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Yes, but the state() method only >>>> returns >>>>>>>> the global >>>>>>>>> state of >>>>>>>>>> the >>>>>>>>>>>>>>>> KafkaStreams application (i.e., CREATED, >>>>>>>> RUNNING, >>>>>>>>> REBALANCING, >>>>>>>>>>>>>>>> PENDING_SHUTDOWN, NOT_RUNNING). >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> An alternative to this KIP would be to >>>>>>>> change this >>>>>>>>> method to >>>>>>>>>>> return >>>>>>>>>>>>>> more >>>>>>>>>>>>>>>> information instead of adding a new >>>>>> method. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> 2017-03-01 13:46 GMT+01:00 Eno >>>> Thereska <eno.there...@gmail.com>: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks Florian, >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Have you had a chance to look at the >>>>>> new state methods in >>>>>>>>>>> 0.10.2, >>>>>>>>>>>>>> e.g., >>>>>>>>>>>>>>>>> KafkaStreams.state()? >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Thanks >>>>>>>>>>>>>>>>> Eno >>>>>>>>>>>>>>>>>> On 1 Mar 2017, at 11:54, Florian >>>>>> Hussonnois <fhussonn...@gmail.com> >>>>>>>>>>>>>>>>> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Hi all, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> I have just created KIP-130 to add a >>>>>>>> new method to the >>>>>>>>>>>> KafkaStreams >>>>>>>>>>>>>> API >>>>>>>>>>>>>>>>> in >>>>>>>>>>>>>>>>>> order to expose the states of >>>> threads >>>>>>>> and active tasks.
>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks, >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>>>>> Florian HUSSONNOIS >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> -- >>>>>>>>>>>>>> Florian HUSSONNOIS >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -- >>>>>>>>>>>> -- Guozhang >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> Florian HUSSONNOIS >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> -- >>>>>>>>>> -- Guozhang >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>>>>>>>> Florian HUSSONNOIS >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>>>>>>>> -- Guozhang >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> -- Guozhang >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> Florian HUSSONNOIS >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> Florian HUSSONNOIS >>>>>>> >>>>>> >>>>>> >>>>> >>>> >>>> >>> >>> >> >
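Guozhang's control-flow example earlier in the thread (start one topology only once another is up and running) is the kind of programmatic polling the state API supports. A minimal sketch, assuming a Supplier that returns the application's current state. The enum mirrors the states Florian lists but is a hypothetical stand-in, not KafkaStreams.State itself, and StatePoller/awaitRunning are illustrative names.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical mirror of the application-level states listed in the thread.
enum AppState { CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING }

final class StatePoller {
    private StatePoller() {}

    // Polls the supplied state until it reports RUNNING or the timeout expires.
    // Returns true if RUNNING was observed in time, false otherwise.
    static boolean awaitRunning(Supplier<AppState> state, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (state.get() == AppState.RUNNING) {
                return true;
            }
            // Overflow-safe comparison against the deadline.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status for the caller
                return false;
            }
        }
    }
}
```

With a real application, the supplier would be a small adapter that maps the instance's state() value onto this enum; the poll interval trades reaction time against the cost of repeated calls.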