Hi Guozhang,

Thanks for your swift feedback. Using your "Pipe App" example might actually be a neat workaround. I'll see if I can work out a simple prototype for this on our platform.
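For reference, the "Pipe App" boils down to a one-statement Streams topology per source topic. A minimal sketch of the idea (topic names and the application id are placeholders, and this uses the `StreamsBuilder` API rather than the exact code in the linked quickstart):

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class Pipe {
    public static void main(String[] args) {
        Properties props = new Properties();
        // One lightweight app (and app id) per piped source topic.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pipe-topic-a");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Copy every record from the tracked source topic into the shared
        // processing topic that the second-stage app reads from.
        builder.stream("topic-A").to("processing-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```

Starting or stopping one of these pipes then maps directly onto the "add/remove a tracked topic" commands on the metadata topic.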
The only downside of this is that it will double the message load on the platform (from source topics to the processing topic). But again, I'll try it out and see how far I get with this solution.

Thanks again for sharing your insights,
Bart

On Mon, Aug 14, 2017 at 10:10 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Bart,
>
> Thanks for your detailed explanation. I see your motivation now, and it indeed justifies itself as a single application that dynamically changes subscriptions.
>
> As I mentioned, Streams today does not have native support for dynamically changing subscriptions. That being said, if you do not care about the offsets, there may be one sub-optimal workaround: you can have a two-stage pipeline that separates topic consumption from processing, where the first stage is a very simple "Pipe" app (e.g. https://github.com/apache/kafka/blob/trunk/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/Pipe.java) which is very lightweight in resource consumption. Each app pipes from a source topic into the processing topic that aggregates all the topics you want to track in the second stage, i.e. the processing stage.
>
> Your processing app only reads from this aggregated source topic, while your piping apps pipe each input topic into the aggregate topic. You can read from the global "command" topic to 1) auto-generate the code with the source topic swapped with the specified string, then compile and execute the code, or 2) shut down an existing program piping from a topic. This will admittedly introduce a duplicate topic containing the aggregated data, but operations-wise it may still be simpler.
>
> Guozhang
>
> On Mon, Aug 14, 2017 at 2:47 AM, Bart Vercammen <b...@cloutrix.com> wrote:
>
> > Hi Guozhang,
> >
> > For the use-cases I have in mind, the offset of the source topics is irrelevant to the state stored by the streams application.
> > So, when topic 'A' gets dropped and topic 'B' is added, I would prefer the application to start reading from 'latest', but that is actually not *that* important to me.
> > The main thing is that I'm somehow able to add a new Kafka topic to the sources of the streams application at runtime, triggered by messages flowing on another "metadata" Kafka topic.
> >
> > So considering your example:
> > When 'B' is added, I would expect the streams application to start reading from 'latest' (but that is not that important).
> > When 'A' is removed, the state from 'A' is still valid in the state stores, but 'A' should not be tracked anymore.
> > When 'A' is added again, I would expect the streams application to start reading from 'latest' (not the committed offset, but again, that is not that important for my use-case).
> >
> > But this being said, my main focus is on the ability to 'add' new Kafka topics to the application rather than removing them.
> > What I could do is define a wildcard subscription on all topics. This would then update dynamically, but the problem is that I would run *all* topics through the application, which is major overkill and would hurt performance (especially as there are topics in there that produce a lot of data that should not be tracked, but would then pass through the streams application anyway). Let's say that of the 300 Kafka topics on the system, about 50 need to be tracked by the application.
> > We have the ability to mark these topics through the "metadata" topic, so it would be nice if that could also trigger an update of the source pattern for the subscriptions in the Kafka Streams application.
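To illustrate why the wildcard approach only works with a topic-naming convention: a Streams source defined from a regex subscribes to every matching topic (and picks up newly created matching topics), but the pattern itself is fixed when the topology is built. The sketch below uses plain JDK regex matching to show which topics a source pattern would select; the `tracked.` prefix is a hypothetical convention, not something from this thread:

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TopicFilter {
    // In Streams, builder.stream(Pattern) would subscribe to every topic
    // matching this regex. Hypothetical convention: tracked topics share
    // a "tracked." prefix.
    static final Pattern SOURCE_PATTERN = Pattern.compile("tracked\\..+");

    // Return the subset of topics the pattern subscription would consume.
    static List<String> matching(List<String> allTopics) {
        return allTopics.stream()
                .filter(t -> SOURCE_PATTERN.matcher(t).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
                "tracked.orders", "tracked.users",
                "clickstream.raw", "__consumer_offsets");
        System.out.println(matching(topics)); // [tracked.orders, tracked.users]
    }
}
```

Without such a prefix convention, the pattern cannot express "the 50 topics marked in the metadata topic", which is exactly the limitation described above.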
> >
> > The problem with multiple applications is the following:
> > - the "state" should be centralized, as it can be queried (having multiple applications would make this more difficult to achieve)
> > - multiple applications would require more resources to be reserved on the cluster
> > - we would need an external tool to start/stop the streams applications, depending on the info on the metadata topic.
> >
> > Greets,
> > Bart
> >
> > On Mon, Aug 14, 2017 at 3:03 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Bart,
> > >
> > > Before we talk about dynamic subscription in Streams, I'd like to ask some more questions about your usage.
> > >
> > > Let's say the currently running application is reading from topic A and has just reached offset 100; now from the global state there is a notification saying "stop on topic A, start on topic B". How would you start fetching from topic B? Would you start at the earliest offset or the latest offset (so it is totally irrelevant to the point where you stopped on topic A), or would you start at offset 100? In addition, if your application maintains some local state, how would that state be affected by the switching of the topic? Since at that point it actually reflects "the state of the application up to topic A's offset 100", could you still reuse that state with topic B at the specified offset?
> > >
> > > To make things a bit more complicated: if later another notification from the global state arrives saying "now switch back to topic A again", would you restart where you stopped, i.e. at offset 100, or would you start at the earliest or latest offset of topic A? Are the application states still reusable?
> > >
> > > I think if your answer to these questions is "no", then you'd better treat them as different applications, i.e.
> > > one app reading from A, and one app reading from B, but with a similar topology. It's just that, based on the meta topic, the apps may be stopped / restarted dynamically. Currently Kafka Streams does not support dynamic subscriptions yet, since many such requested scenarios turned out not to be a good fit for collapsing into a single application. If there is indeed a common request, I think one way to do it is to make the PartitionAssignor customizable by users, as you suggested, so that only the selected partitions are used to re-form the tasks; but one still needs some way to trigger a rebalance so that the PartitionAssignor can be called.
> > >
> > > Guozhang
> > >
> > > On Fri, Aug 11, 2017 at 1:42 AM, Bart Vercammen <b...@cloutrix.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I have a question, basically about the best way to implement something within Kafka Streams. The thing I would like to do: "dynamically update the subscription pattern of the source topics".
> > > >
> > > > The reasoning behind this (in my project):
> > > > Metadata about the source topics is evented on another Kafka topic that should be tracked by the Kafka Streams topology, and depending on that metadata, specific source topics should be added to, or removed from, the Kafka Streams topology.
> > > >
> > > > Currently I track the "metadata topic" as "global state", so that every processor can actually access it to fetch the metadata (this metadata for instance also describes whether or not a specific topic pattern should be tracked by the stream processor) - so consider this as some kind of "configuration" stream about the source topics.
> > > >
> > > > So now it comes:
> > > > Is there any way I could (from a running topology) update the Kafka consumer subscriptions?
> > > > So that I'm able to replace the source topic pattern while the topology is running?
> > > >
> > > > I don't think there currently is a way to do this, but as under the hood it is just a Kafka consumer, my belief is that it should be possible somehow ...
> > > >
> > > > I was thinking about the PartitionAssignor ... if I could get my hands on that one, maybe I could dynamically configure it to only allow specific topic patterns?
> > > > Or directly alter the subscription on the underlying consumer?
> > > >
> > > > I don't know all the nifty details about the Kafka Streams internals, so it would be nice if someone could point me in the right direction to achieve this ...
> > > >
> > > > Thanks,
> > > > Bart
> > >
> > > --
> > > -- Guozhang
>
> --
> -- Guozhang
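Outside of Streams, the plain consumer API does allow changing subscriptions at runtime: `KafkaConsumer.subscribe()` may be called repeatedly, and each call replaces the previous subscription and triggers a rebalance. A hypothetical sketch of a consumer whose topic set is updated from the metadata topic (class and method names are invented for illustration; it requires a running broker and is not a Streams topology, so state stores and tasks are out of the picture):

```java
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DynamicSubscriber {
    private final Set<String> trackedTopics = new ConcurrentSkipListSet<>();
    private volatile boolean changed = false;

    // Called from whatever consumes the "metadata" topic.
    public void track(String topic)   { if (trackedTopics.add(topic))    changed = true; }
    public void untrack(String topic) { if (trackedTopics.remove(topic)) changed = true; }

    public void run(KafkaConsumer<String, String> consumer) {
        while (true) {
            if (changed && !trackedTopics.isEmpty()) {
                // Re-subscribing replaces the previous subscription and
                // triggers a consumer-group rebalance.
                consumer.subscribe(trackedTopics);
                changed = false;
            }
            ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> r : records) {
                process(r);
            }
        }
    }

    private void process(ConsumerRecord<String, String> r) { /* ... */ }
}
```

This is roughly what "directly alter the subscription on the underlying consumer" would look like if that consumer were not managed by Streams; within Streams, the subscription is owned by the library, which is why the thread lands on separate pipe apps instead.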