Hi,

If I change the application ID, I assume it will start processing new
messages and the old data will be dropped? But this solution will not
work for production deployments, since we can't change the application
ID for each release.

My code looks like this:

builder.stream(topicName)
    .mapValues(...)
stream.selectKey(selectKey A)
    .groupByKey(..)
    .windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO))
    .reduce((value1, value2) -> value2)
    .suppress(...)
    .toStream()
    .selectKey(selectKey B)
    .mapValues(...)
    .filter(...)
    .groupByKey()
    .reduce(...)
    .toStream()
    .to(...)

It will create 5 internal topics:
03-repartition, 03-changelog, 09-changelog, 14-repartition, 14-changelog.
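For context, Kafka Streams prefixes internal topic names with the application ID (roughly <application.id>-<name>-repartition or <application.id>-<name>-changelog), and the application ID is also used as the consumer group ID. That is why a new application ID ends up with a fresh set of internal topics and no committed offsets. Below is a minimal plain-Java sketch of that naming pattern. The application IDs "my-app-v1" and "my-app-v2" are hypothetical, and "03", "09" and "14" are just the shortened names used above, not the full generated operator names.

```java
import java.util.ArrayList;
import java.util.List;

final class InternalTopicNames {
    // Builds an internal topic name following the <application.id>-<name>-<suffix> pattern.
    static String internalTopic(String applicationId, String name, String suffix) {
        return applicationId + "-" + name + "-" + suffix;
    }

    // Lists the five internal topics mentioned above for a given application ID.
    // The short names ("03", "09", "14") are placeholders, not the real operator names.
    static List<String> allFor(String applicationId) {
        String[][] shortNames = {
            {"03", "repartition"}, {"03", "changelog"},
            {"09", "changelog"},
            {"14", "repartition"}, {"14", "changelog"}
        };
        List<String> topics = new ArrayList<>();
        for (String[] entry : shortNames) {
            topics.add(internalTopic(applicationId, entry[0], entry[1]));
        }
        return topics;
    }

    public static void main(String[] args) {
        // Hypothetical application IDs: each one maps to a disjoint set of topic names.
        System.out.println(allFor("my-app-v1"));
        System.out.println(allFor("my-app-v2"));
    }
}
```

Keeping the same application ID across releases therefore means the app resumes from the existing internal topics and offsets instead of starting from scratch.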

When I restart or redeploy the application, only 03-repartition has
incoming traffic and messages, but no outgoing traffic. The other
internal topics have no traffic at all after the restart/redeployment.
It only works when I change the application ID. Should I call
streams.cleanUp() before starting the stream each time? Or is
something else going wrong?
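As background that may or may not be related to the missing output: with a 60-minute window and zero grace, a suppress step that waits for the window to close emits nothing until stream time (the highest record timestamp seen so far) reaches the window end. The sketch below is plain Java that only models that rule. It assumes the suppress step is configured with Suppressed.untilWindowCloses, which the snippet above does not state, and the class and method names are hypothetical.

```java
import java.time.Duration;

final class WindowCloseSketch {
    // Start of the tumbling window containing the given record timestamp.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Models the rule that a window counts as closed once stream time
    // has reached window end plus the grace period.
    static boolean isClosed(long windowStartMs, long sizeMs, long graceMs, long streamTimeMs) {
        return streamTimeMs >= windowStartMs + sizeMs + graceMs;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(60).toMillis();
        long grace = Duration.ZERO.toMillis();

        // Stream time only advances when new records arrive.
        long streamTime = 0L;
        long[] recordTimestamps = {1_000L, 1_800_000L, 3_599_999L, 3_600_000L};
        for (long ts : recordTimestamps) {
            streamTime = Math.max(streamTime, ts);
            boolean firstWindowClosed = isClosed(0L, size, grace, streamTime);
            System.out.println("ts=" + ts + " firstWindowClosed=" + firstWindowClosed);
        }
    }
}
```

Under this model, the first window's result becomes eligible for emission only when a record with a timestamp at or after the window end arrives.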

Thanks a lot!

On Wed, Sep 25, 2019 at 2:35 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
>
> Hey Xiyuan,
>
> I think it would be easier for us to help if you pasted the full code of
> your application (a prototype). The fact that changing the application ID
> works suggests that reprocessing all the data from scratch works; do I
> understand that correctly?
>
> Boyang
>
> On Wed, Sep 25, 2019 at 8:16 AM Xiyuan Hu <xiyuan.h...@gmail.com> wrote:
>
> > Hi,
> >
> > I'm running a Kafka Streams app (v2.1.0) with windowed functions (reduce
> > and suppress). One thing I noticed is that every time I redeploy or
> > restart the application, I have to change the application ID to a new
> > one. Otherwise, only the reduce-repartition internal topic has input
> > traffic (and it has no output traffic), and all other internal topics
> > have no traffic at all. It looks like data just flows into the first
> > internal repartition topic (reduce-repartition); the reduce-changelog
> > topic has no input or output traffic either.
> >
> > Does anyone know what's wrong? Changing the application ID and
> > creating new internal topics each time doesn't seem like the right
> > approach.
> >
> > I start the app like this:
> >
> > streams = new KafkaStreams(topology.getTopology(config),
> > properties.getProperties());
> > streams.start();
> >
> > Any help would be appreciated! Thanks!
> >
