Hi there,

Can't toString() just stay as is?

Thanks
Eno

> On 20 Apr 2017, at 17:26, Matthias J. Sax <matth...@confluent.io> wrote:
>
> Florian,
>
> I am just wondering: if we keep .toString(), what should the implementation look like?
>
>
> -Matthias
>
> On 4/19/17 2:42 PM, Florian Hussonnois wrote:
>> Hi Matthias,
>>
>> So sorry for the delay in replying to you. For now, I think we can keep KafkaStreams#toString() as it is.
>> It's always preferable to have an implementation for the toString method.
>>
>> 2017-04-14 4:08 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
>>
>>> Florian,
>>>
>>>>>>> What about the KafkaStreams#toString() method?
>>>>>>
>>>>>> I think we want to deprecate it, as with KIP-120 and the changes of this KIP, it becomes obsolete.
>>>
>>> Any thoughts about this? For me, this is the last open point to discuss (or to reflect in the KIP, in case you agree) before I can put my vote on the VOTE thread that has already started.
>>>
>>> -Matthias
>>>
>>>
>>> On 4/11/17 12:18 AM, Damian Guy wrote:
>>>> Hi Florian,
>>>>
>>>> Thanks for the updates. The KIP is looking good.
>>>>
>>>> Cheers,
>>>> Damian
>>>>
>>>> On Fri, 7 Apr 2017 at 22:41 Matthias J. Sax <matth...@confluent.io> wrote:
>>>>
>>>>> What about the KafkaStreams#toString() method?
>>>>>
>>>>> I think we want to deprecate it, as with KIP-120 and the changes of this KIP, it becomes obsolete.
>>>>>
>>>>> If we do so, please update the KIP accordingly.
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 3/28/17 7:00 PM, Matthias J. Sax wrote:
>>>>>> Thanks for updating the KIP!
>>>>>>
>>>>>> I think it's good as is -- I would not add anything more to TaskMetadata.
>>>>>>
>>>>>> About subtopologies and tasks: we already have the concept of subtopologies in KIP-120. It's only missing an ID that allows linking a subtopology to a task.
>>>>>>
>>>>>> IMHO, adding a simple variable to `Subtopology` that provides the id should be sufficient.
>>>>>> We can simply document in the JavaDocs how Subtopology and TaskMetadata can be linked to each other.
>>>>>>
>>>>>> I did update KIP-120 accordingly.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 3/28/17 3:45 PM, Florian Hussonnois wrote:
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I've updated the KIP and the PR to reflect your suggestions.
>>>>>>>
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>>>>>> https://github.com/apache/kafka/pull/2612
>>>>>>>
>>>>>>> Also, I've exposed the property StreamThread#state as a string through the new class ThreadMetadata.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> 2017-03-27 23:40 GMT+02:00 Florian Hussonnois <fhussonn...@gmail.com>:
>>>>>>>
>>>>>>>> Hi Guozhang, Matthias,
>>>>>>>>
>>>>>>>> It's a great idea to add sub-topology descriptions. This would help developers better understand the topology concept.
>>>>>>>>
>>>>>>>> I agree that it is not really user-friendly to have to check whether `StreamsMetadata#streamThreads` returns null.
>>>>>>>>
>>>>>>>> The method name localThreadsMetadata looks good. In addition, it's simpler to build ThreadMetadata instances from the `StreamTask` class than from the `StreamPartitionAssignor` class.
>>>>>>>>
>>>>>>>> I will work on the modifications. As I understand it, I have to add the subTopologyId property to the TaskMetadata class - am I right?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> 2017-03-26 0:25 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
>>>>>>>>
>>>>>>>>> Re 1): this is a good point. Maybe we can move `StreamsMetadata#streamThreads` to `KafkaStreams#localThreadsMetadata`?
>>>>>>>>>
>>>>>>>>> 3): this is a minor suggestion to rename the function `assignedPartitions` to `topicPartitions`, to be consistent with `StreamsMetadata`?
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>> On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for the progress on this KIP. I think we are on the right path!
>>>>>>>>>>
>>>>>>>>>> A couple of comments/questions:
>>>>>>>>>>
>>>>>>>>>> (1) Why do we not consider the "rejected alternative" of adding the method to KafkaStreams? The comment on #streamThreads() says:
>>>>>>>>>>
>>>>>>>>>> "Note this method will return <code>null</code> if called on {@link StreamsMetadata} which represent a remote application."
>>>>>>>>>>
>>>>>>>>>> Thus, if we cannot get any remote metadata, it seems more straightforward to add it to KafkaStreams directly -- this would avoid invalid calls and the `null` return value in the first place.
>>>>>>>>>>
>>>>>>>>>> I like the idea of exposing sub-topologies:
>>>>>>>>>>
>>>>>>>>>> (2a) I would recommend renaming `topicsGroupId` to `subTopologyId` :)
>>>>>>>>>>
>>>>>>>>>> (2b) We could add this to KIP-120 already. However, I would not just link both via name, but leverage KIP-120 directly, and add a "Subtopology" member to the TaskMetadata class.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Overall, I like the distinction of KIP-120 exposing only "static" information that can be determined before the topology gets started, while this KIP allows access to runtime information.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 3/22/17 12:42 PM, Guozhang Wang wrote:
>>>>>>>>>>> Thanks for the updated KIP, and sorry for the late replies!
>>>>>>>>>>>
>>>>>>>>>>> I thought a little bit more about KIP-130, and I feel that it may be okay if we are going to deprecate the `toString` function in favor of the proposed APIs (this is not explicitly said in the KIP, so I'm not sure whether you plan to keep `KafkaStreams#toString` as is or to replace it with the proposed APIs). More specifically, after both KIP-120 and KIP-130:
>>>>>>>>>>>
>>>>>>>>>>> 1. users can use the `#describe` function to check the generated topology before calling `KafkaStreams#start`, which is static information.
>>>>>>>>>>> 2. users can use `StreamsMetadata -> ThreadMetadata -> TaskMetadata` programmatically after calling `KafkaStreams#start` to get the dynamically changing information.
>>>>>>>>>>>
>>>>>>>>>>> One thing I'm still not sure about, though, is that in `TaskMetadata` we only have the TaskId and the assigned partitions, whereas the "TopologyDescription" introduced in KIP-120 will simply describe the whole topology, possibly composed of multiple sub-topologies. So it is hard for users to tell on the fly which sub-topology is executed under which task.
>>>>>>>>>>>
>>>>>>>>>>> Hence I'm wondering whether we can expose the "sub-topology-id" (named topicsGroupId internally) in TopologyDescription#Subtopology; then, from the task id, which is essentially "sub-topology-id DASH partition-group-id", users can make the link, though it is still not that straightforward.
>>>>>>>>>>>
>>>>>>>>>>> Thoughts?
>>>>>>>>>>>
>>>>>>>>>>> Guozhang
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 15, 2017 at 3:16 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Guozhang for pointing me to KIP-120.
>>>>>>>>>>>>
>>>>>>>>>>>> I've made some modifications to the KIP. I have also proposed a new PR (there are still some tests to write).
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>>>>>>>>>>>
>>>>>>>>>>>> Exposing consumed offsets through JMX is sufficient for debugging purposes. But I think this could be part of another JIRA, as there is no impact on the public API.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>> 2017-03-10 22:35 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello Florian,
>>>>>>>>>>>>>
>>>>>>>>>>>>> As for programmatically discovering monitoring data by piping metrics into a dedicated topic.
>>>>>>>>>>>>> I think you can actually use a KafkaMetricsReporter which pipes the polled metric values into a pre-defined topic (note that in Kafka the MetricsReporter is simply an interface, and users can build their own implementation in addition to the JmxReporter), for example:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://github.com/krux/kafka-metrics-reporter
>>>>>>>>>>>>>
>>>>>>>>>>>>> As for the "static task-level assignment", what I meant is that the mapping from source-topic-partitions -> tasks is static, via the "PartitionGrouper", and a task won't switch from an active task to a standby task. Rather, an active task can be migrated as a whole, along with all its assigned partitions, to another thread / process, and a new standby task will be created on the host that this active task is migrating from. So for the SAME task, its taskMetadata.assignedPartitions() will always return the same partitions.
>>>>>>>>>>>>>
>>>>>>>>>>>>> As for the `toString` function that we have today, I feel it has some correlation with KIP-120, so I'm trying to coordinate some discussions here (cc'ing Matthias as the owner of KIP-120). My understanding is that:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. In KIP-120, the `toString` function of `KafkaStreams` will be removed, and instead the `Topology#describe` function will be introduced for users to debug the topology BEFORE they start running their instance with the topology.
>>>>>>>>>>>>> And hence the description won't contain any task information, as tasks are not formed yet.
>>>>>>>>>>>>> 2. In KIP-130, we want to add task-level information for monitoring purposes, which is not static and can only be captured AFTER the instance has started running. Again, I'm wondering, for KIP-130 alone, whether adding the metrics mentioned in my previous email would suffice even for the use case that you have mentioned.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 8, 2017 at 3:18 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thank you for your feedback. I've started to look more deeply into the code. As you mention, it would be smarter to use the current StreamsMetadata API to expose this information.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think exposing metrics through JMX is great for building monitoring dashboards with tools like jmxtrans and Grafana. But for our use case, we would like to expose the states directly from the application embedding the KStreams topologies. So we expect to be able to retrieve states programmatically.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For instance, we could imagine producing those states into a dedicated topic. That way, a third application could automatically discover all the Kafka Streams applications that could be monitored.
>>>>>>>>>>>>>> In a production environment, that could clearly be a way to get a complete overview of a microservices architecture based on Kafka Streams.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The toString() method gives a lot of information, but it can only be used for debugging purposes, not to build a topology visualization tool. Could we actually expose the same details about the stream topology through the StreamsMetadata API? Then the TaskMetadata class you suggested could contain information similar to that returned by the toString method of the AbstractTask class.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I can update the KIP in that way.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Finally, I'm not sure I understand your last point: *"Note that the task-level assignment information is static, i.e. it will not change during the runtime"*
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Does that mean that when a rebalance occurs, new tasks are created for the new assignments and old ones just switch to a standby state?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2017-03-05 7:04 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hello Florian,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the KIP and your detailed explanation of your use case. I think there are two dimensions to discuss regarding how to improve Streams' debuggability (or, more specifically, state exposure for visualization).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The first question is "what information should we expose to the user".
>>>>>>>>>>>>>>> From your KIP I see generally three categories:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1. The state of the thread within a process: as you mentioned, currently we only expose the state of the process but not the finer-grained per-thread state.
>>>>>>>>>>>>>>> 2. The state of the task. Currently the closest API to this is StreamsMetadata; however, it aggregates the tasks across all threads and only presents the aggregated set of assigned partitions / state stores etc. We can consider extending it with a new StreamsMetadata#tasks() method that returns TaskMetadata with similar fields; StreamsMetadata.stateStoreNames / etc. would still return the aggregated results, but users could "drill down" if they want.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The second question is "how should we expose them to the user". For example, you mentioned consumedOffsetsByPartition in the activeTasks. We could add this as a JMX metric based on fetch positions inside the consumer layer (note that Streams is just embedding consumers), or we could consider adding it to TaskMetadata. Either way, it can be visualized for monitoring. The reason we expose StreamsMetadata as well as State is that they are expected to be "polled" programmatically for interactive queries and also for control flows (e.g. I would like to start running my other topology ONLY once the first topology is up and running), while in your case the main purpose seems to be continuously querying them for monitoring etc. Personally, for such purposes I'd prefer to expose them only as JMX metrics, to keep the API simpler.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So given your current motivations, I'd suggest exposing the following information as newly added metrics in Streams:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1. Thread-level state metric.
>>>>>>>>>>>>>>> 2. Task-level hosted client identifier metric (e.g. host:port).
>>>>>>>>>>>>>>> 3. Consumer-level per-topic/partition position metric (https://kafka.apache.org/documentation/#topic_fetch_monitoring).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Note that the task-level assignment information is static, i.e. it will not change during the runtime at all, and it can already be accessed from the `toString()` function even before the instance starts running, so I think this piece of information does not need to be exposed through JMX anymore.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> WDYT?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Florian,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It seems there is some overlap here with what we already have in KafkaStreams.allMetadata(). This currently returns a Collection<StreamsMetadata> where each StreamsMetadata instance holds the state stores and partition assignment for every instance of the KafkaStreams application. I'm wondering if that is good enough for what you are trying to achieve? If not, could it be modified to include the per-thread assignment?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, 1 Mar 2017 at 22:49 Florian Hussonnois <fhussonn...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> First, I will answer your last question.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The main reason to have both TaskState#assignment and TaskState#consumedOffsetsByPartition is that tasks have no consumed offsets until at least one message has been consumed from each partition, even if previous offsets exist for the consumer group. So yes, these methods are redundant, as they only diverge at application startup.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> About the use case: we are currently developing, for a customer, a small framework based on KafkaStreams which transforms/denormalizes data before ingesting it into Hadoop.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> We have a cluster of workers (Spring Boot) which instantiate KStreams topologies dynamically based on dataflow configurations. Each configuration describes a topic to consume and how to process messages (this looks like the NiFi processors API).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Our architecture is inspired by Kafka Connect. We have topics for configs and states which are consumed by each worker (actually, we have reused some internal classes from the Connect API).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Now, we would like to develop UIs to visualize the topics and partitions consumed by our worker applications.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Also, I think it would be nice to be able, in the future, to develop web UIs similar to Spark's, but for KafkaStreams, to visualize DAGs... so maybe this KIP is just a first step.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2017-03-01 22:52 GMT+01:00 Matthias J. Sax <matth...@confluent.io>:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I am wondering a little bit why you need to expose this information. Can you describe some use cases?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Would it be worth unifying this new API with KafkaStreams#state() to get the overall state of an application without the need to call two different methods?
>>>>>>>>>>>>>>>>>> Not sure what this unified API might look like, though.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> One minor comment about the API: TaskState#assignment seems to be redundant. It should be the same as TaskState#consumedOffsetsByPartition.keySet()
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Or am I missing something?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 3/1/17 5:19 AM, Florian Hussonnois wrote:
>>>>>>>>>>>>>>>>>>> Hi Eno,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Yes, but the state() method only returns the global state of the KafkaStreams application (i.e. CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> An alternative to this KIP would be to change this method to return more information instead of adding a new method.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2017-03-01 13:46 GMT+01:00 Eno Thereska <eno.there...@gmail.com>:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks Florian,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Have you had a chance to look at the new state methods in 0.10.2, e.g., KafkaStreams.state()?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>>>>>>>> On 1 Mar 2017, at 11:54, Florian Hussonnois <fhussonn...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I have just created KIP-130 to add a new method to the KafkaStreams API in order to expose the states of threads and active tasks.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>> Florian HUSSONNOIS
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Florian HUSSONNOIS
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Florian HUSSONNOIS
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Florian HUSSONNOIS
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>> --
>>>>>>>> Florian HUSSONNOIS
>>>>>>>
>>>>>>> --
>>>>>>> Florian HUSSONNOIS
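In the thread, Guozhang suggests (and Matthias turns into an id on `Subtopology`) that a running task can be linked to its sub-topology through the task id. Here is a minimal sketch of that lookup. It assumes task ids render as `<subtopologyId>_<partitionGroupId>`, for example `1_3`. The thread describes the separator as a dash, so treat the exact separator as an assumption. `TaskIdLink` and its methods are hypothetical names, not Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka Streams. ASSUMPTION: a task id string
// has the form "<subtopologyId>_<partitionGroupId>", e.g. "1_3".
final class TaskIdLink {
    private TaskIdLink() {}

    // Splits a task id into {subtopologyId, partitionGroupId}; rejects ids
    // without a separator or with an empty part on either side.
    private static int[] parse(String taskId) {
        int sep = taskId.indexOf('_');
        if (sep <= 0 || sep == taskId.length() - 1) {
            throw new IllegalArgumentException("unexpected task id: " + taskId);
        }
        return new int[] {
            Integer.parseInt(taskId.substring(0, sep)),
            Integer.parseInt(taskId.substring(sep + 1))
        };
    }

    static int subtopologyId(String taskId) {
        return parse(taskId)[0];
    }

    static int partitionGroupId(String taskId) {
        return parse(taskId)[1];
    }

    // Groups task ids by the sub-topology they execute, so that each
    // sub-topology from a topology description can be matched to its tasks.
    // A TreeMap keeps sub-topologies in id order; task ids are sorted too.
    static Map<Integer, List<String>> tasksBySubtopology(List<String> taskIds) {
        Map<Integer, List<String>> result = new TreeMap<>();
        for (String id : taskIds) {
            result.computeIfAbsent(subtopologyId(id), k -> new ArrayList<>()).add(id);
        }
        for (List<String> ids : result.values()) {
            Collections.sort(ids);
        }
        return result;
    }
}
```

For instance, `TaskIdLink.tasksBySubtopology(Arrays.asList("0_1", "1_0", "0_0"))` yields `{0=[0_0, 0_1], 1=[1_0]}`. That map could then be joined against whatever sub-topology ids the topology description ends up exposing.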
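Guozhang points out in the thread that an instance-wide metadata object aggregates tasks across all threads, while the proposed `StreamsMetadata -> ThreadMetadata -> TaskMetadata` chain would let users drill down. The sketch below models only that difference. `TaskInfo`, `ThreadInfo` and `MetadataViews` are hypothetical classes, not the KIP-130 types. Partitions are plain strings such as `orders-0` so the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical stand-in for per-task metadata: a task id plus the topic
// partitions assigned to that task (e.g. "orders-0").
final class TaskInfo {
    final String taskId;
    final Set<String> topicPartitions;

    TaskInfo(String taskId, Set<String> topicPartitions) {
        this.taskId = taskId;
        this.topicPartitions = Collections.unmodifiableSet(new TreeSet<>(topicPartitions));
    }
}

// Hypothetical stand-in for per-thread metadata; the thread state is kept as
// a plain string, as proposed in the thread.
final class ThreadInfo {
    final String threadName;
    final String threadState;
    final List<TaskInfo> activeTasks;

    ThreadInfo(String threadName, String threadState, List<TaskInfo> activeTasks) {
        this.threadName = threadName;
        this.threadState = threadState;
        this.activeTasks = Collections.unmodifiableList(new ArrayList<>(activeTasks));
    }
}

final class MetadataViews {
    private MetadataViews() {}

    // Instance-level view: the union of partitions over all threads and tasks,
    // i.e. the aggregated set an instance-wide metadata object reports.
    static Set<String> aggregatedPartitions(List<ThreadInfo> threads) {
        Set<String> all = new TreeSet<>();
        for (ThreadInfo t : threads) {
            for (TaskInfo task : t.activeTasks) {
                all.addAll(task.topicPartitions);
            }
        }
        return all;
    }

    // Drill-down view: which partitions each thread is processing.
    static Map<String, Set<String>> partitionsByThread(List<ThreadInfo> threads) {
        Map<String, Set<String>> byThread = new TreeMap<>();
        for (ThreadInfo t : threads) {
            Set<String> parts = byThread.computeIfAbsent(t.threadName, k -> new TreeSet<>());
            for (TaskInfo task : t.activeTasks) {
                parts.addAll(task.topicPartitions);
            }
        }
        return byThread;
    }

    // Per-thread states, which the single process-level state cannot show.
    static Map<String, String> threadStates(List<ThreadInfo> threads) {
        Map<String, String> states = new TreeMap<>();
        for (ThreadInfo t : threads) {
            states.put(t.threadName, t.threadState);
        }
        return states;
    }
}
```

The aggregated set answers "what does this instance consume?", while the per-thread map answers "which thread consumes it?". The second question is the one the thread says the existing metadata cannot answer.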
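Early in the thread, Matthias asks whether `TaskState#assignment` is redundant with `TaskState#consumedOffsetsByPartition.keySet()`. Florian answers that the two only differ until a message has been consumed from every assigned partition. Below is a minimal model of that behavior. It assumes one offset per partition, recorded as messages are consumed. `TaskStateSketch` is a hypothetical class and does not reproduce the `TaskState` from the KIP draft.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model of the two TaskState fields discussed in the thread:
// the assigned partitions, plus the last consumed offset keyed by partition.
final class TaskStateSketch {
    private final Set<String> assignment;
    private final Map<String, Long> consumedOffsetsByPartition = new TreeMap<>();

    TaskStateSketch(Set<String> assignment) {
        this.assignment = Collections.unmodifiableSet(new TreeSet<>(assignment));
    }

    Set<String> assignment() {
        return assignment;
    }

    Map<String, Long> consumedOffsetsByPartition() {
        return Collections.unmodifiableMap(consumedOffsetsByPartition);
    }

    // Records the offset of a message consumed from an assigned partition.
    void recordConsumed(String partition, long offset) {
        if (!assignment.contains(partition)) {
            throw new IllegalArgumentException("not assigned: " + partition);
        }
        consumedOffsetsByPartition.put(partition, offset);
    }

    // Assigned partitions with no consumed message yet: exactly where
    // assignment and consumedOffsetsByPartition.keySet() diverge.
    Set<String> partitionsWithoutConsumedOffset() {
        Set<String> missing = new TreeSet<>(assignment);
        missing.removeAll(consumedOffsetsByPartition.keySet());
        return missing;
    }
}
```

Right after startup, `assignment()` holds every partition while the offset map is empty. Once `recordConsumed` has been called for each partition, the two key sets coincide. This matches Florian's point that they only diverge at application startup.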