Bart,

Thanks for providing your observations and conclusions. Stay tuned for further discussions on adding dynamic subscriptions in Streams.
Guozhang

On Wed, Aug 16, 2017 at 1:37 AM, Bart Vercammen <b...@cloutrix.com> wrote:

> Hi Guozhang,
>
> In the end I opted for a native Kafka consumer/producer application
> instead of using Kafka Streams for this. The overhead of creating a new
> Streams application for each update of the metadata was a bit too
> cumbersome. But still, the issue remains that, although this works
> (thanks for the suggestion), I double the data on Kafka.
>
> In order to reduce the data I started moving some of the aggregation
> logic from my Streams application to this new application that combines
> the input streams. That leaves only a little logic in the Streams
> application, to preserve the state in its Kafka-backed state stores.
>
> So it is already happening: when I now implement state handling in my
> "combining application" I will end up creating my own stream-processing
> framework instead of leveraging Kafka Streams ... this was not really
> the intention.
>
> So, in short, your suggestion is a nice workaround, but it still leaves
> me with my initial remark that it would be useful to somehow be able to
> alter the subscriptions in a running Streams application.
>
> Bart
>
>
> On Tue, Aug 15, 2017 at 1:45 PM, Bart Vercammen <b...@cloutrix.com> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for your swift feedback. Using your "Pipe" app example might
> > actually be a neat workaround. I'll see if I can work out a simple
> > prototype for this on our platform.
> >
> > The only downside of this is that I will double the message load on
> > the platform (from source topics to processing topic). But again,
> > I'll try it out and see how far I get with this solution.
> >
> > Thanks again for sharing your insights,
> > Bart
> >
> >
> > On Mon, Aug 14, 2017 at 10:10 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> >> Hello Bart,
> >>
> >> Thanks for your detailed explanation.
> >> I see your motivation now, and it does indeed make sense as a single
> >> application that dynamically changes subscriptions.
> >>
> >> As I mentioned, Streams today does not have native support for
> >> dynamically changing subscriptions. That being said, if you do not
> >> care about the offsets then there may be one sub-optimal workaround:
> >> you can have a two-stage pipeline that separates topic consumption
> >> from processing, where the first stage is a very simple "Pipe" app
> >> (e.g.
> >> https://github.com/apache/kafka/blob/trunk/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/Pipe.java)
> >> which is very lightweight in resource consumption. Each app pipes
> >> from one topic to the processing topic, which aggregates all the
> >> topics that you want to track in the second stage, i.e. the
> >> processing stage.
> >>
> >> Your processing app only reads from this aggregated source topic,
> >> while your piping apps pipe each input topic into the aggregate
> >> topic. You can read from the global "command" topic to 1)
> >> auto-generate the code with the source topic swapped in for the
> >> specified string, then compile and execute it, or 2) shut down an
> >> existing program piping from a topic. This will admittedly introduce
> >> a duplicate topic containing the aggregated data, but
> >> operations-wise it may still be simpler.
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Mon, Aug 14, 2017 at 2:47 AM, Bart Vercammen <b...@cloutrix.com>
> >> wrote:
> >>
> >> > Hi Guozhang,
> >> >
> >> > For the use-cases I have in mind, the offset of the source topics
> >> > is irrelevant to the state stored by the streams application.
> >> > So, when topic 'A' gets dropped and topic 'B' is added, I would
> >> > prefer the application to start reading from 'latest', but that is
> >> > actually not *that* important to me.
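[Editor's sketch] Guozhang's two-stage workaround above can be summarized in code: the first stage forwards every record from a tracked source topic onto one shared aggregate topic, tagging it with its origin so the second-stage processor can still tell the sources apart. This is a minimal, hedged sketch of that forwarding logic only; the record shape and the topic name "processing-input" are assumptions, and a real pipe would wrap a Kafka consumer/producer pair (or use the Pipe.java quickstart linked above).

```python
# Sketch of the first-stage "pipe": forward records from any tracked
# source topic onto the single aggregate topic read by the processing
# stage. (topic, key, value) tuples stand in for real Kafka records.

def pipe_records(records, aggregate_topic):
    """Re-target each record at the aggregate topic, keeping the origin
    topic inside the payload so the processing stage can distinguish
    the sources."""
    return [
        (aggregate_topic, key, {"origin": topic, "value": value})
        for topic, key, value in records
    ]

# Two source topics funnel into one processing topic:
batch = [("sensors.a", "k1", 10), ("sensors.b", "k2", 20)]
for target, key, payload in pipe_records(batch, "processing-input"):
    print(target, key, payload)
```

Each pipe app would run this forwarding loop for one source topic, so starting or stopping a pipe (driven by the "command" topic) adds or removes a source without touching the processing app.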
> >> > The main thing is that I am somehow able to add a new Kafka topic
> >> > to the sources of the streams application at runtime, triggered by
> >> > messages flowing on another "metadata" Kafka topic.
> >> >
> >> > So considering your example:
> >> > When 'B' is added, I would expect the streams application to start
> >> > reading from 'latest' (but that is not that important).
> >> > When 'A' is removed, the state from 'A' is still valid in the
> >> > state stores, but 'A' should not be tracked anymore.
> >> > When 'A' is added again, I would expect the streams application to
> >> > start reading from 'latest' (not the committed offset, but again
> >> > that is not that important for my use-case).
> >> >
> >> > This being said, my main focus is on the ability to 'add' new
> >> > Kafka topics to the application rather than to remove them.
> >> > What I could do is define a wildcard subscription on all topics.
> >> > This would update dynamically, but the problem is that I would run
> >> > *all* topics through the application, which is major overkill and
> >> > would hurt performance (especially as there are topics in there
> >> > that produce a lot of data that should not be tracked, but would
> >> > then still pass through the streams application). Let's say that
> >> > of the 300 Kafka topics on the system, about 50 need to be tracked
> >> > by the application. We have the ability to mark these topics
> >> > through the "metadata" topic, so it would be nice if this could
> >> > also trigger an update of the source pattern for the subscriptions
> >> > in the Kafka Streams application.
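[Editor's sketch] The metadata-driven selection Bart describes (300 topics, ~50 marked as tracked) amounts to replaying the metadata topic into a tracked-topic set and collapsing it into one pattern, tighter than a match-everything wildcard. The message shape `{"topic": ..., "tracked": ...}` is an assumption for illustration, not something defined in the thread.

```python
import re

# Sketch: derive the tracked-topic subscription from "metadata" topic
# messages, keeping only the latest tracked/untracked flag per topic.

def tracked_topics(metadata_messages):
    """Replay the metadata topic and return the set of topics whose
    most recent flag says they should be subscribed."""
    latest = {}
    for msg in metadata_messages:
        latest[msg["topic"]] = msg["tracked"]
    return {topic for topic, tracked in latest.items() if tracked}

def subscription_pattern(topics):
    """Collapse the tracked set into one anchored regex, usable
    wherever a pattern subscription is accepted."""
    return re.compile("^(?:" + "|".join(sorted(re.escape(t) for t in topics)) + ")$")

msgs = [
    {"topic": "orders", "tracked": True},
    {"topic": "clickstream", "tracked": True},
    {"topic": "clickstream", "tracked": False},  # un-marked later
]
pattern = subscription_pattern(tracked_topics(msgs))
print(pattern.match("orders") is not None)       # True
print(pattern.match("clickstream") is not None)  # False
```

The open problem in the thread is precisely the last step: getting a running Streams application to pick up the new pattern without a restart.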
> >> > The problem with multiple applications is the following:
> >> > - the "state" should be centralized, as it can be queried (having
> >> >   multiple applications would make this harder to achieve)
> >> > - multiple applications will require more resources to be reserved
> >> >   on the cluster
> >> > - we would need an external tool to start/stop the streams
> >> >   applications, depending on the info on the metadata topic.
> >> >
> >> > Greets,
> >> > Bart
> >> >
> >> >
> >> > On Mon, Aug 14, 2017 at 3:03 AM, Guozhang Wang <wangg...@gmail.com>
> >> > wrote:
> >> >
> >> > > Hello Bart,
> >> > >
> >> > > Before we talk about dynamic subscription in Streams I'd like to
> >> > > ask some more questions about your usage.
> >> > >
> >> > > Let's say the currently running application is reading from
> >> > > topic A, and has just reached offset 100; now from the global
> >> > > state there is a notification saying "stop on topic A, start on
> >> > > topic B". How would you start fetching from topic B? Would you
> >> > > start at the earliest or the latest offset (so it is totally
> >> > > irrelevant to the point where you stopped on topic A), or would
> >> > > you start at offset 100? In addition, if your application
> >> > > maintains some local state, how would that state be affected by
> >> > > the switch of topics? Since at that point it actually reflects
> >> > > "the state of the application up to topic A's offset 100", could
> >> > > you still reuse that state with topic B at the specified offset?
> >> > >
> >> > > To make things a bit more complicated: if later another
> >> > > notification from the global state arrives saying "now switch
> >> > > back to topic A again", would you restart where you stopped,
> >> > > i.e. at offset 100, or would you start at the earliest or latest
> >> > > offset of topic A? Are the application states still reusable?
> >> > >
> >> > > I think if your answers to these questions are "no", then you'd
> >> > > better treat them as different applications, i.e. one app
> >> > > reading from A, and one app reading from B but with a similar
> >> > > topology. It's just that, based on the meta topic, the apps may
> >> > > be stopped / restarted dynamically. Currently Kafka Streams does
> >> > > not support dynamic subscriptions yet, since many such requested
> >> > > scenarios turned out not to really fit being collapsed into a
> >> > > single application. If there is indeed a common request, I think
> >> > > one way to do it is to make the PartitionAssignor customizable
> >> > > by users, as you suggested, so that only the selected partitions
> >> > > are used to re-form the tasks; but one would still need some way
> >> > > to trigger a rebalance so that the PartitionAssignor can be
> >> > > called.
> >> > >
> >> > >
> >> > > Guozhang
> >> > >
> >> > > On Fri, Aug 11, 2017 at 1:42 AM, Bart Vercammen <b...@cloutrix.com>
> >> > > wrote:
> >> > >
> >> > > > Hi,
> >> > > >
> >> > > > I have a question on how best to implement something within
> >> > > > Kafka Streams. The thing I would like to do: "dynamically
> >> > > > update the subscription pattern of the source topics".
> >> > > >
> >> > > > The reasoning behind this (in my project): metadata about the
> >> > > > source topics is evented on another Kafka topic that should be
> >> > > > tracked by the Kafka Streams topology, and depending on that
> >> > > > metadata, specific source topics should be added to, or
> >> > > > removed from, the Kafka Streams topology.
> >> > > > Currently I track the "metadata topic" as global state, so
> >> > > > that every processor can access it to fetch the metadata (this
> >> > > > metadata, for instance, also describes whether or not a
> >> > > > specific topic pattern should be tracked by the stream
> >> > > > processor) - so consider it some kind of "configuration"
> >> > > > stream about the source topics.
> >> > > >
> >> > > > So here it comes: is there any way I could (from a running
> >> > > > topology) update the Kafka consumer subscriptions, so that I
> >> > > > am able to replace the source topic pattern while the topology
> >> > > > is running?
> >> > > >
> >> > > > I don't think there currently is a way to do this, but as
> >> > > > under the hood it is just a Kafka consumer, my belief is that
> >> > > > it should be possible somehow ...
> >> > > >
> >> > > > I was thinking about the PartitionAssignor ... if I could get
> >> > > > my hands on that one, maybe I could dynamically configure it
> >> > > > to only allow specific topic patterns? Or directly alter the
> >> > > > subscription on the underlying consumer?
> >> > > >
> >> > > > I don't know all the nifty details of the Kafka Streams
> >> > > > internals, so it would be nice if someone could point me in
> >> > > > the right direction to achieve this ...
> >> > > >
> >> > > > Thanks,
> >> > > > Bart
> >> > > >
> >> > >
> >> > > --
> >> > > -- Guozhang
> >> >
> >>
> >> --
> >> -- Guozhang
>

--
-- Guozhang
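[Editor's sketch] Bart's "directly alter the subscription on the underlying consumer" idea, which he eventually implemented with a native consumer/producer application, can be sketched as a small controller that applies metadata events to a tracked-topic set and re-subscribes only when the set changes. The consumer below is an in-memory stand-in; with a real client library, each `subscribe()` call replaces the previous subscription and triggers a rebalance. The event shape `("add"|"remove", topic)` and all names here are illustrative assumptions, not an existing API.

```python
# Sketch of dynamic re-subscription driven by a "metadata" topic.

class DynamicSubscriber:
    def __init__(self, consumer, topics=()):
        self.consumer = consumer
        self.topics = set(topics)
        self._resubscribe()

    def _resubscribe(self):
        # Where a newly added topic starts reading (Bart wants 'latest')
        # would be governed by the consumer's offset-reset config,
        # not handled in this sketch.
        self.consumer.subscribe(sorted(self.topics))

    def on_metadata_event(self, action, topic):
        """Apply one metadata-topic event; re-subscribe only when the
        tracked set actually changed, to avoid needless rebalances."""
        before = set(self.topics)
        if action == "add":
            self.topics.add(topic)
        elif action == "remove":
            self.topics.discard(topic)
        if self.topics != before:
            self._resubscribe()

class FakeConsumer:
    """In-memory stand-in that records the latest subscription."""
    def __init__(self):
        self.subscription = []
    def subscribe(self, topics):
        self.subscription = list(topics)

consumer = FakeConsumer()
sub = DynamicSubscriber(consumer, ["topic.a"])
sub.on_metadata_event("add", "topic.b")
print(consumer.subscription)  # ['topic.a', 'topic.b']
sub.on_metadata_event("remove", "topic.a")
print(consumer.subscription)  # ['topic.b']
```

Kafka Streams offers no hook for this today, which is exactly the gap the thread discusses: the rebalance machinery exists underneath, but the topology's source subscription is fixed at startup.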