Hi Guozhang,

Thanks for your swift feedback.
Using your "Pipe App" example might actually be a neat work-around.
I'll see if I can work out a simple prototype for this on our platform.

The only downside is that this will double the message load on the
platform (everything flows from the source topics into the processing
topic). But again, I'll try it out and see how far I get with this solution.
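
For reference, this is roughly what I have in mind for the prototype: one
tiny pipe app per tracked topic, all feeding a shared processing topic.
The topic names are made up here, and the exact builder API may differ a
bit per Kafka version:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class PipeApp {
    public static void main(String[] args) {
        String sourceTopic = args[0];
        Properties props = new Properties();
        // one application.id per piped topic, so each pipe commits its own offsets
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pipe-" + sourceTopic);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass());

        // pipe the source topic into the shared processing topic, byte for byte
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(sourceTopic).to("processing-topic");

        new KafkaStreams(builder.build(), props).start();
    }
}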

Thanks again for sharing your insights,
Bart

On Mon, Aug 14, 2017 at 10:10 PM, Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Bart,
>
> Thanks for your detailed explanation. I see your motivation now, and it
> indeed makes the case for a single application that dynamically changes
> its subscriptions.
>
> As I mentioned, Streams today does not have native support for dynamically
> changing subscriptions. That being said, if you do not care about the
> offsets, there may be one sub-optimal workaround: you can build a two-stage
> pipeline that separates topic consumption from processing, where the first
> stage is a very simple "Pipe" app (e.g.
> https://github.com/apache/kafka/blob/trunk/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/Pipe.java)
> which is very lightweight in resource consumption. Each app pipes from a
> source topic into one processing topic that aggregates all the topics you
> want to track in the second stage, i.e. the processing stage.
>
> Your processing app then reads only from this aggregated source topic,
> while your piping apps pipe each input topic into the aggregate topic. You
> can read from the global "command" topic to 1) auto-generate the code with
> the source topic swapped in for the specified string, then compile and
> execute it, or 2) shut down an existing program piping from a topic. This
> will admittedly introduce a duplicate topic containing the aggregated data,
> but operationally it may still be simpler.
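>
> For completeness, a rough sketch of that control loop, starting / stopping
> pipe apps based on the command topic (imports omitted; the "add:<topic>" /
> "remove:<topic>" message format and the buildPipe() helper are made up for
> illustration):
>
> Properties props = new Properties();
> props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.put(ConsumerConfig.GROUP_ID_CONFIG, "pipe-controller");
> props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
> props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
>
> Map<String, KafkaStreams> pipes = new HashMap<>();
> try (KafkaConsumer<String, String> commands = new KafkaConsumer<>(props)) {
>     commands.subscribe(Collections.singletonList("command-topic"));
>     while (true) {
>         for (ConsumerRecord<String, String> record :
>                 commands.poll(Duration.ofSeconds(1))) {
>             String value = record.value();
>             String topic = value.substring(value.indexOf(':') + 1);
>             if (value.startsWith("add:") && !pipes.containsKey(topic)) {
>                 KafkaStreams pipe = buildPipe(topic);  // builds the "Pipe" topology above
>                 pipe.start();
>                 pipes.put(topic, pipe);
>             } else if (value.startsWith("remove:") && pipes.containsKey(topic)) {
>                 pipes.remove(topic).close();
>             }
>         }
>     }
> }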
>
>
> Guozhang
>
>
> On Mon, Aug 14, 2017 at 2:47 AM, Bart Vercammen <b...@cloutrix.com> wrote:
>
> > Hi Guozhang,
> >
> > For the use-cases I have in mind, the offset of the source topics is
> > irrelevant to the state stored by the streams application.
> > So, when topic 'A' gets dropped and topic 'B' is added, I would prefer
> > the application to start reading from 'latest', but that is actually not
> > *that* important to me.
> > The main thing is that I'm somehow able to add a new kafka topic to the
> > source of the streams application at runtime, triggered by messages
> > flowing on another "metadata" kafka topic.
> >
> > So considering your example:
> > When 'B' is added, I would expect the streams application to start
> > reading from 'latest' (but not that important).
> > When 'A' is removed, the state from 'A' is still valid in the state
> > stores, but 'A' should not be tracked anymore.
> > When 'A' is added again, I would expect the streams application to start
> > reading from 'latest' (not the committed offset, but again not that
> > important for my use-case).
> >
> > But this being said, my main focus is on the ability to 'add' new kafka
> > topics to the application rather than removing them.
> > What I could do is define a wildcard subscription on all topics. This
> > would then update dynamically, but the problem is that I would run *all*
> > topics through the application, which is major overkill and would hurt
> > performance (especially as there are topics in there that produce a lot
> > of data that should not be tracked, but would then pass through the
> > streams application anyway). Let's say that of the 300 kafka topics on
> > the system, about 50 need to be tracked by the application.
> > We have the ability to mark these topics through the "metadata" topic, so
> > it would be nice if this could also trigger updating the source-pattern
> > for the subscriptions in the kafka streams application.
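> >
> > For illustration, the wildcard subscription I mean would look roughly
> > like this (pattern made up; as far as I understand, Streams does pick up
> > newly created topics matching the pattern, but the pattern itself is
> > fixed once the topology is built):
> >
> > StreamsBuilder builder = new StreamsBuilder();
> > KStream<byte[], byte[]> all = builder.stream(Pattern.compile(".*"));
> > // ... the same processing topology as today, but fed by *all* topics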
> >
> > The problem with multiple applications is the following:
> > - the "state" should be centralized, as it can be queried (having
> > multiple applications would make this more difficult to achieve)
> > - multiple applications will require more resources to be reserved on
> > the cluster
> > - we would need an external tool to start/stop the streams applications,
> > depending on the info on the metadata topic.
> >
> > Greets,
> > Bart
> >
> >
> > On Mon, Aug 14, 2017 at 3:03 AM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> > > Hello Bart,
> > >
> > > Before we talk about dynamic subscription in Streams I'd like to ask
> > > some more questions about your usage.
> > >
> > > Let's say the current running application is reading from topic A, and
> > > has just reached offset 100; now from the global state there is a
> > > notification saying "stop on topic A, start on topic B". How would you
> > > start fetching from topic B? Would you start at the earliest offset or
> > > the latest offset (so it is totally irrelevant to the point where you
> > > stopped on topic A), or would you start at offset 100? In addition, if
> > > your application maintains some local state, how would the state be
> > > affected by the switching of the topic, since at that point it is
> > > actually reflecting "the state of the application up to topic A's
> > > offset 100"? Could you still reuse that state with topic B at the
> > > specified offset?
> > >
> > > To make things a bit more complicated: if later another notification
> > > from the global state arrives saying "now switch back to topic A
> > > again", would you restart where you stopped, i.e. at offset 100, or
> > > would you start at the earliest or latest offset of topic A? Are the
> > > application states still reusable?
> > >
> > > I think if your answer to these questions is "no", then you'd better
> > > treat them as different applications, i.e. one app reading from A, and
> > > one app reading from B but with a similar topology. It's just that,
> > > based on the meta topic, the apps may be stopped / restarted
> > > dynamically. Currently Kafka Streams does not support dynamic
> > > subscriptions yet, since lots of such requested scenarios turned out
> > > to not really fit being collapsed into a single application; if there
> > > is indeed a common request, I think one way to do that is to make the
> > > PartitionAssignor customizable by users as you suggested, so that only
> > > the selected partitions are used to re-form the tasks; but one would
> > > still need some way to trigger a rebalance so that the
> > > PartitionAssignor can be called.
> > >
> > >
> > > Guozhang
> > >
> > > On Fri, Aug 11, 2017 at 1:42 AM, Bart Vercammen <b...@cloutrix.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > I have a question on what would be the best way to implement
> > > > something within Kafka Streams. The thing I would like to do:
> > > > "dynamically update the subscription pattern of the source topics."
> > > >
> > > > The reasoning behind this (in my project): meta data about the
> > > > source topics is published on another kafka topic, which should be
> > > > tracked by the kafka streams topology, and depending on that meta
> > > > data specific source topics should be added to, or removed from, the
> > > > kafka streams topology.
> > > >
> > > > Currently I track the "meta data topic" as "global state", so that
> > > > every processor can actually access it to fetch the meta data (this
> > > > meta data for instance also describes whether or not a specific
> > > > topic pattern should be tracked by the stream processor) - so
> > > > consider this as some kind of "configuration" stream about the
> > > > source topics.
> > > >
> > > > So here is my question:
> > > > Is there any way I could (from a running topology) update the kafka
> > > > consumer subscriptions?
> > > > So that I'm able to replace the source topic pattern while the
> > > > topology is running?
> > > >
> > > > I don't think there currently is a way to do this, but as under the
> > > > hood it is just a kafka consumer, my belief is that it should be
> > > > possible somehow ...
> > > >
> > > > I was thinking about the PartitionAssignor ... if I could get my
> > > > hands on that one, maybe I could dynamically configure it to only
> > > > allow specific topic-patterns?
> > > > Or directly alter the subscription on the underlying consumer?
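> > > >
> > > > On a plain consumer, what I'm after would look like this (patterns
> > > > made up; given an existing consumer and rebalance listener, calling
> > > > subscribe() again replaces the previous subscription and triggers a
> > > > rebalance):
> > > >
> > > > consumer.subscribe(Pattern.compile("tracked-(foo|bar)-.*"), listener);
> > > > // later, when the metadata topic marks 'baz' instead of 'bar':
> > > > consumer.subscribe(Pattern.compile("tracked-(foo|baz)-.*"), listener);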
> > > >
> > > > I don't know all the nifty details of the Kafka Streams internals,
> > > > so it would be nice if someone could point me in the right direction
> > > > to achieve this ...
> > > >
> > > > Thanks,
> > > > Bart
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
