Bart,

Thanks for providing your observations and conclusions. Stay tuned for
further discussions on adding dynamic subscriptions in Streams.


Guozhang


On Wed, Aug 16, 2017 at 1:37 AM, Bart Vercammen <b...@cloutrix.com> wrote:

> Hi Guozhang,
>
> In the end I opted for a native Kafka consumer/producer application instead
> of using Kafka Streams for this.
> The overhead of creating new Streams applications for each update of the
> metadata was a bit too cumbersome.
> Still, the issue remains that, although this works (thanks for the
> suggestion), I double the data on Kafka.
>
> In order to reduce the data I started moving some of the aggregation logic
> from my streams application to this new application that combines the
> input streams.  This leaves only a little logic in the streams application:
> preserving the state in its Kafka-backed state stores.
>
> So it is already happening:
> when I now implement state handling in my "combining application" I will
> end up creating my own stream-processing framework
> instead of leveraging Kafka Streams ... which was not really the intention.
>
> So, in short, your suggestion is a nice work-around, but still leaves me
> with my initial remark that it would be useful
> to somehow be able to alter the subscriptions in a running streams
> application.
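>
> For reference, a rough sketch of the re-subscribe pattern I use in the
> native consumer application (the "metadata" topic name and the add:/del:
> message format are made up for illustration; error handling and offset
> management are left out):
>
> import java.util.*;
> import org.apache.kafka.clients.consumer.*;
>
> public class CombiningApp {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("group.id", "combining-app");
>         props.put("key.deserializer",
>             "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer",
>             "org.apache.kafka.common.serialization.StringDeserializer");
>         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
>
>         Set<String> tracked = new HashSet<>();
>         tracked.add("metadata");  // always follow the metadata topic itself
>         consumer.subscribe(tracked);
>
>         while (true) {
>             ConsumerRecords<String, String> records = consumer.poll(100);
>             boolean changed = false;
>             for (ConsumerRecord<String, String> rec : records) {
>                 if (rec.topic().equals("metadata")) {
>                     // value assumed to look like "add:topic-x" or "del:topic-x"
>                     String topic = rec.value().substring(4);
>                     changed |= rec.value().startsWith("add:")
>                         ? tracked.add(topic) : tracked.remove(topic);
>                 } else {
>                     // combine / forward the record here
>                 }
>             }
>             // a plain consumer can simply re-subscribe; this triggers a
>             // rebalance, which is exactly what Streams does not expose
>             if (changed) consumer.subscribe(tracked);
>         }
>     }
> }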
>
> Bart
>
>
> On Tue, Aug 15, 2017 at 1:45 PM, Bart Vercammen <b...@cloutrix.com> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for your swift feedback.
> > Using your "Pipe App" example might actually be a neat work-around.
> > I'll see if I can work out a simple prototype for this on our platform.
> >
> > The only downside is that I will double the message load on the
> > platform (from the source topics to the processing topic).
> > But again, I'll try it out and see how far I get with this solution.
> >
> > Thanks again for sharing your insights,
> > Bart
> >
> >
> > On Mon, Aug 14, 2017 at 10:10 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> >
> >> Hello Bart,
> >>
> >> Thanks for your detailed explanation. I see your motivation now, and it
> >> indeed calls for a single application that dynamically changes its
> >> subscriptions.
> >>
> >> As I mentioned, Streams today does not have native support for dynamically
> >> changing subscriptions. That being said, if you do not care about the
> >> offsets, there is one sub-optimal workaround: use a two-stage pipeline that
> >> separates topic consumption from processing. The first stage is a set of
> >> very simple "Pipe" apps (e.g.
> >> https://github.com/apache/kafka/blob/trunk/streams/quickstart/java/src/main/resources/archetype-resources/src/main/java/Pipe.java)
> >> which are very lightweight in resource consumption. Each app pipes from one
> >> source topic into a single processing topic that aggregates all the topics
> >> you want to track in the second stage, i.e. the processing stage.
> >>
> >> Your processing app only reads from this aggregated source topic, while
> >> your piping apps copy each input topic into the aggregate topic. You can
> >> read from the global "command" topic to 1) auto-generate the pipe code with
> >> the source topic swapped in for the specified string, then compile and
> >> execute it, or 2) shut down an existing program piping from a topic. This
> >> will admittedly introduce a duplicate topic containing the aggregated data,
> >> but operationally it may still be simpler.
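> >>
> >> To give a rough idea, such a pipe app is only a few lines (topic names
> >> below are placeholders; see the linked Pipe.java for the full quickstart
> >> version):
> >>
> >> import java.util.Properties;
> >> import org.apache.kafka.common.serialization.Serdes;
> >> import org.apache.kafka.streams.KafkaStreams;
> >> import org.apache.kafka.streams.StreamsBuilder;
> >> import org.apache.kafka.streams.StreamsConfig;
> >>
> >> public class Pipe {
> >>     public static void main(String[] args) {
> >>         Properties props = new Properties();
> >>         // one such app (with its own application.id) per tracked source topic
> >>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pipe-input-topic");
> >>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >>         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> >>                   Serdes.ByteArray().getClass());
> >>         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
> >>                   Serdes.ByteArray().getClass());
> >>
> >>         StreamsBuilder builder = new StreamsBuilder();
> >>         // copy records as-is from the source topic into the aggregate topic
> >>         builder.stream("input-topic").to("processing-topic");
> >>
> >>         new KafkaStreams(builder.build(), props).start();
> >>     }
> >> }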
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Mon, Aug 14, 2017 at 2:47 AM, Bart Vercammen <b...@cloutrix.com>
> >> wrote:
> >>
> >> > Hi Guozhang,
> >> >
> >> > For the use-cases I have in mind, the offsets of the source topics are
> >> > irrelevant to the state stored by the streams application.
> >> > So, when topic 'A' gets dropped and topic 'B' is added, I would prefer
> >> > the application to start reading from 'latest', but that is actually not
> >> > *that* important to me.
> >> > The main thing is that I'm somehow able to add a new Kafka topic to the
> >> > source of the streams application at runtime, triggered by messages
> >> > flowing on another "metadata" Kafka topic.
> >> >
> >> > So considering your example:
> >> > When 'B' is added, I would expect the streams application to start
> >> > reading from 'latest' (but this is not that important).
> >> > When 'A' is removed, the state from 'A' is still valid in the state
> >> > stores, but 'A' should not be tracked anymore.
> >> > When 'A' is added again, I would expect the streams application to start
> >> > reading from 'latest' (not the committed offset, but again this is not
> >> > that important for my use-case).
> >> >
> >> > This being said, my main focus is on the ability to 'add' new Kafka
> >> > topics to the application rather than removing them.
> >> > What I could do is define a wildcard subscription on all topics.  This
> >> > would update dynamically, but the problem is that I would then run
> >> > *all* topics through the application, which is major overkill and would
> >> > hurt performance (especially as there are topics in there that produce a
> >> > lot of data that should not be tracked, but would pass through the
> >> > streams application anyway).  Let's say that of the 300 Kafka topics on
> >> > the system, about 50 need to be tracked by the application.
> >> > We have the ability to mark these topics through the "metadata" topic,
> >> > so it would be nice if this could also trigger an update of the
> >> > source-pattern for the subscriptions in the Kafka Streams application.
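> >> >
> >> > Just to illustrate the wildcard idea: a pattern source in Streams looks
> >> > like the sketch below (the "tracked-.*" naming convention is
> >> > hypothetical; our topics don't follow one, which is exactly the problem):
> >> >
> >> > import java.util.Properties;
> >> > import java.util.regex.Pattern;
> >> > import org.apache.kafka.common.serialization.Serdes;
> >> > import org.apache.kafka.streams.KafkaStreams;
> >> > import org.apache.kafka.streams.StreamsBuilder;
> >> > import org.apache.kafka.streams.StreamsConfig;
> >> >
> >> > public class PatternSource {
> >> >     public static void main(String[] args) {
> >> >         Properties props = new Properties();
> >> >         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pattern-source");
> >> >         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >> >         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
> >> >                   Serdes.String().getClass());
> >> >         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
> >> >                   Serdes.String().getClass());
> >> >
> >> >         StreamsBuilder builder = new StreamsBuilder();
> >> >         // newly created topics matching the pattern are picked up, but the
> >> >         // pattern itself is fixed once the topology is running
> >> >         builder.<String, String>stream(Pattern.compile("tracked-.*"))
> >> >                .foreach((key, value) -> { /* aggregation logic */ });
> >> >
> >> >         new KafkaStreams(builder.build(), props).start();
> >> >     }
> >> > }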
> >> >
> >> > The problem with multiple applications is the following:
> >> > - the "state" should be centralized, as it can be queried (having
> >> > multiple applications would make this more difficult to achieve)
> >> > - multiple applications require more resources to be reserved on the
> >> > cluster
> >> > - we would need an external tool to start/stop the streams applications,
> >> > depending on the info on the metadata topic.
> >> >
> >> > Greets,
> >> > Bart
> >> >
> >> >
> >> > On Mon, Aug 14, 2017 at 3:03 AM, Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >> >
> >> > > Hello Bart,
> >> > >
> >> > > Before we talk about dynamic subscriptions in Streams, I'd like to ask
> >> > > some more questions about your usage.
> >> > >
> >> > > Let's say the currently running application is reading from topic A
> >> > > and has just reached offset 100; now from the global state there is a
> >> > > notification saying "stop on topic A, start on topic B". How would you
> >> > > start fetching from topic B? Would you start at the earliest or latest
> >> > > offset (so it is totally unrelated to the point where you stopped on
> >> > > topic A), or would you start at offset 100? In addition, if your
> >> > > application maintains some local state, how would that state be
> >> > > affected by the switch of topics? Since at that point it actually
> >> > > reflects "the state of the application up to topic A's offset 100",
> >> > > could you still reuse that state with topic B at the specified offset?
> >> > >
> >> > > To make things a bit more complicated: if later another notification
> >> > > arrives from the global state saying "now switch back to topic A
> >> > > again", would you restart where you stopped, i.e. at offset 100, or
> >> > > would you start at the earliest or latest offset of topic A? And is
> >> > > the application state still reusable?
> >> > >
> >> > > I think if your answers to these questions are "no", then you'd better
> >> > > treat them as different applications, i.e. one app reading from A and
> >> > > one app reading from B, but with similar topologies. It's just that,
> >> > > based on the meta topic, the apps may be stopped / restarted
> >> > > dynamically. Currently Kafka Streams does not support dynamic
> >> > > subscriptions yet, since many of the scenarios requesting it turned
> >> > > out not to be a good fit for collapsing into a single application; if
> >> > > there is indeed a common request, I think one way to do it is to make
> >> > > the PartitionAssignor customizable by users as you suggested, so that
> >> > > only the selected partitions are used to re-form the tasks; but one
> >> > > would still need some way to trigger a rebalance so that the
> >> > > PartitionAssignor can be called.
> >> > >
> >> > >
> >> > > Guozhang
> >> > >
> >> > > On Fri, Aug 11, 2017 at 1:42 AM, Bart Vercammen <b...@cloutrix.com>
> >> > wrote:
> >> > >
> >> > > > Hi,
> >> > > >
> >> > > > I have a question, basically on the best way to implement
> >> > > > something within Kafka Streams.  The thing I would like to do:
> >> > > > dynamically update the subscription pattern of the source topics.
> >> > > >
> >> > > > The reasoning behind this (in my project):
> >> > > > metadata about the source topics is published on another Kafka topic
> >> > > > that should be tracked by the Kafka Streams topology, and depending
> >> > > > on that metadata, specific source topics should be added to, or
> >> > > > removed from, the topology.
> >> > > >
> >> > > > Currently I track the "metadata topic" as "global state", so that
> >> > > > every processor can access it to fetch the metadata (this metadata,
> >> > > > for instance, also describes whether or not a specific topic pattern
> >> > > > should be tracked by the stream processor) - so consider it some
> >> > > > kind of "configuration" stream about the source topics.
> >> > > >
> >> > > > So here is my question:
> >> > > > is there any way I could (from a running topology) update the Kafka
> >> > > > consumer subscriptions,
> >> > > > so that I'm able to replace the source topic pattern while the
> >> > > > topology is running?
> >> > > >
> >> > > > I don't think there currently is a way to do this, but as under the
> >> > > > hood it is just a Kafka consumer, my belief is that it should be
> >> > > > possible somehow ...
> >> > > >
> >> > > > I was thinking about the PartitionAssignor ... if I could get my
> >> > > > hands on that one, maybe I could dynamically configure it to only
> >> > > > allow specific topic-patterns?
> >> > > > Or directly alter the subscription on the underlying consumer?
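> >> > > >
> >> > > > Something like the following is what I have in mind, on a plain
> >> > > > consumer (just a sketch; the pattern is made up):
> >> > > >
> >> > > > import java.util.Collection;
> >> > > > import java.util.regex.Pattern;
> >> > > > import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> >> > > > import org.apache.kafka.clients.consumer.KafkaConsumer;
> >> > > > import org.apache.kafka.common.TopicPartition;
> >> > > >
> >> > > > public class Resubscribe {
> >> > > >     // On a plain consumer, calling subscribe() again with a new
> >> > > >     // pattern replaces the subscription and triggers a rebalance; as
> >> > > >     // far as I can tell, Streams just doesn't expose its internal
> >> > > >     // consumer to let us do this.
> >> > > >     static void retarget(KafkaConsumer<?, ?> consumer, String regex) {
> >> > > >         consumer.subscribe(Pattern.compile(regex),
> >> > > >             new ConsumerRebalanceListener() {
> >> > > >                 public void onPartitionsRevoked(Collection<TopicPartition> tps) { }
> >> > > >                 public void onPartitionsAssigned(Collection<TopicPartition> tps) { }
> >> > > >             });
> >> > > >     }
> >> > > > }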
> >> > > >
> >> > > > I don't know all the nifty details of the Kafka Streams internals,
> >> > > > so it would be nice if someone could point me in the right direction
> >> > > > to achieve this ...
> >> > > >
> >> > > > Thanks,
> >> > > > Bart
> >> > > >
> >> > >
> >> > >
> >> > >
> >> > > --
> >> > > -- Guozhang
> >> > >
> >> >
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
>



-- 
-- Guozhang
