I think I'm running into this issue again, though this time it only happens
on some state stores.

Here you can find the code and the logs:
https://gist.github.com/alex88/1d60f63f0eee9f1568d89d5e1900fffc
We first noticed that our Confluent Cloud bill went up 10x, then saw that
our streams processor (a Kubernetes pod) had been restarted 12 times.
Checking Confluent Cloud usage, it seems the writes/reads went up from the
usual 1-2 KB/s to 12-20 MB/s during the app restarts.

I then deployed a new version in a new container (no local store/state) to
see what happened:
 - first, it logs everything up to line 460 of the log file in the gist
 - at this point Confluent Cloud reports high read usage and the consumer
lag starts to increase, so the app is falling behind on incoming messages
 - after a certain point, writes go up as well
 - when the streams app transitions to the RUNNING state, the final
aggregation function resumes where it stopped (without reprocessing old
data)
 - consumer lag goes back to 0

What makes me think it's re-reading everything are these lines:

Resetting offset for partition
myapp-id-KSTREAM-AGGREGATE-STATE-STORE-0000000004-changelog-2 to offset
20202847
Restoring task 0_2's state store KSTREAM-AGGREGATE-STATE-STORE-0000000004
from beginning of the changelog
myapp-id-KSTREAM-AGGREGATE-STATE-STORE-0000000004-changelog-2
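
To double-check that these really are full restores from offset 0 (and how
many records actually get replayed), next run I'll probably attach a
restore listener before start(); a minimal sketch, where `streams` is the
KafkaStreams instance from pipeline.kt:

import org.apache.kafka.common.TopicPartition
import org.apache.kafka.streams.processor.StateRestoreListener

// Print start/end offsets and totals for every changelog restore.
streams.setGlobalStateRestoreListener(object : StateRestoreListener {
    override fun onRestoreStart(tp: TopicPartition, store: String,
                                startOffset: Long, endOffset: Long) {
        println("Restoring $store ($tp): $startOffset -> $endOffset")
    }
    override fun onBatchRestored(tp: TopicPartition, store: String,
                                 batchEndOffset: Long, numRestored: Long) {}
    override fun onRestoreEnd(tp: TopicPartition, store: String,
                              totalRestored: Long) {
        println("Restored $store ($tp): $totalRestored records replayed")
    }
})  // must be set before streams.start()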

At first I thought it was because I don't enable changelog logging for the
aggregate store the way I do for the "LastValueStore" store (which has
"withLoggingEnabled()"), but even that store shows:

Resetting offset for partition myapp-id-LastValueStore-changelog-0 to
offset 403910
Restoring task 0_0's state store LastValueStore from beginning of the
changelog myapp-id-LastValueStore-changelog-0
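
For context, the "LastValueStore" is declared more or less like this (a
simplified sketch, not the exact code from the gist; the serdes and the
changelog config are placeholders):

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.state.Stores

// Persistent store with an explicitly logged (compacted) changelog topic.
val lastValueStore = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore("LastValueStore"),
        Serdes.String(),  // placeholder key serde
        Serdes.String()   // placeholder value serde
    )
    .withLoggingEnabled(mapOf("cleanup.policy" to "compact"))
builder.addStateStore(lastValueStore)  // builder = the StreamsBuilder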

Thank you everyone in advance

--
Alessandro Tagliapietra

On Thu, Jun 6, 2019 at 11:37 AM Alessandro Tagliapietra <
tagliapietra.alessan...@gmail.com> wrote:

> I'm not sure. One thing I know for sure is that on the cloud control
> panel, in the consumer lag page, the offset didn't reset on the input
> topic, so it was probably something after that.
>
> Anyway, thanks a lot for helping. If we experience this again, I'll try
> to add more verbose logging to better understand what's going on.
>
> Have a great day
>
> --
> Alessandro Tagliapietra
>
>
> On Thu, Jun 6, 2019 at 11:27 AM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> Honestly I cannot think of an issue that was fixed in 2.2.1 but not in
>> 2.2.0 which could be correlated to your observations:
>>
>>
>> https://issues.apache.org/jira/issues/?filter=-1&jql=project%20%3D%20KAFKA%20AND%20resolution%20%3D%20Fixed%20AND%20fixVersion%20%3D%202.2.1%20AND%20component%20%3D%20streams%20order%20by%20updated%20DESC
>>
>> If you observed on the cloud that both partitions of the source topic
>> get re-processed from the beginning, then it means the committed offsets
>> were somehow lost, and hence the app has to start reading the source
>> topic from scratch. If this is a reproducible issue, maybe there are
>> some lurking problems in 2.2.0.
>>
>> On Thu, Jun 6, 2019 at 11:10 AM Alessandro Tagliapietra <
>> tagliapietra.alessan...@gmail.com> wrote:
>>
>> > Yes, that's right.
>> >
>> > Could that be the problem? Anyway, so far after upgrading to 2.2.1 from
>> > 2.2.0 we haven't experienced that problem anymore.
>> >
>> > Regards
>> >
>> > --
>> > Alessandro Tagliapietra
>> >
>> >
>> > On Thu, Jun 6, 2019 at 10:50 AM Guozhang Wang <wangg...@gmail.com>
>> wrote:
>> >
>> > > That's right, but local state is used as a "materialized view" of
>> > > your changelog topics: if you have nothing locally, then it has to
>> > > bootstrap from the beginning of your changelog topic.
>> > >
>> > > But I think your question was about the source "sensors-input"
>> > > topic, not the changelog topic. I looked at the logs from two runs,
>> > > and it seems
>> > > locally your sensors-input has one partition, but on the cloud your
>> > > sensors-input has two partitions. Is that right?
>> > >
>> > >
>> > > Guozhang
>> > >
>> > >
>> > > On Thu, Jun 6, 2019 at 10:23 AM Alessandro Tagliapietra <
>> > > tagliapietra.alessan...@gmail.com> wrote:
>> > >
>> > > > Isn't the windowing state stored in the additional state store
>> > > > topics that I had to create?
>> > > >
>> > > > Like these I have here:
>> > > >
>> > > > sensors-pipeline-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog
>> > > > sensors-pipeline-KTABLE-SUPPRESS-STATE-STORE-0000000004-changelog
>> > > >
>> > > > Thank you
>> > > >
>> > > > --
>> > > > Alessandro Tagliapietra
>> > > >
>> > > >
>> > > > On Thu, Jun 6, 2019 at 10:13 AM Guozhang Wang <wangg...@gmail.com>
>> > > wrote:
>> > > >
>> > > > > If you deploy your streams app into a docker container, you'd
>> > > > > need to make sure local state directories are preserved, since
>> > > > > otherwise whenever you restart, all the state would be lost and
>> > > > > Streams then has to bootstrap from scratch. E.g. if you are using
>> > > > > K8s for cluster management, you'd better use stateful sets to
>> > > > > make sure local state is preserved across re-deployments.
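>> > > > >
>> > > > > Something along these lines, for example (a minimal sketch; the
>> > > > > path is only a placeholder for a persistent-volume mount):
>> > > > >
>> > > > > import org.apache.kafka.streams.StreamsConfig
>> > > > > import java.util.Properties
>> > > > >
>> > > > > val props = Properties()
>> > > > > // Keep the local RocksDB state on a volume that survives pod
>> > > > > // restarts ("/data/kafka-streams" is an example mount path).
>> > > > > props[StreamsConfig.STATE_DIR_CONFIG] = "/data/kafka-streams"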
>> > > > >
>> > > > >
>> > > > > Guozhang
>> > > > >
>> > > > > On Wed, Jun 5, 2019 at 4:52 PM Alessandro Tagliapietra <
>> > > > > tagliapietra.alessan...@gmail.com> wrote:
>> > > > >
>> > > > > > Hi Guozhang,
>> > > > > >
>> > > > > > Sorry, by "app" I mean the stream processor app, the one shown
>> > > > > > in pipeline.kt.
>> > > > > >
>> > > > > > The app reads a topic of data sent by a sensor every second and
>> > > > > > produces a 20-second windowed output to another topic.
>> > > > > > My "problem" is that when running locally with my local Kafka
>> > > > > > setup, if I stop it and start it again, it continues processing
>> > > > > > from the last window. When the app is deployed in a docker
>> > > > > > container using Confluent Cloud as the broker, every time I
>> > > > > > restart it, it starts processing again from the beginning of the
>> > > > > > input topic and regenerates old windows it has already
>> > > > > > processed.
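>> > > > > >
>> > > > > > Roughly, the topology looks like this (a simplified sketch, not
>> > > > > > the exact code from pipeline.kt; the topic names, serdes, and
>> > > > > > the count aggregation are placeholders):
>> > > > > >
>> > > > > > import java.time.Duration
>> > > > > > import org.apache.kafka.streams.KeyValue
>> > > > > > import org.apache.kafka.streams.kstream.TimeWindows
>> > > > > >
>> > > > > > // Sketch: 20-second tumbling windows per sensor key, results
>> > > > > > // emitted to an output topic. `builder` is the StreamsBuilder.
>> > > > > > builder.stream<String, String>("sensors-input")
>> > > > > >     .groupByKey()
>> > > > > >     .windowedBy(TimeWindows.of(Duration.ofSeconds(20)))
>> > > > > >     .count()
>> > > > > >     .toStream()
>> > > > > >     .map { key, count -> KeyValue(key.key(), count.toString()) }
>> > > > > >     .to("sensors-output")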
>> > > > > >
>> > > > > > In the meantime I'm trying to upgrade to Kafka 2.2.1 to see if
>> > > > > > I get any improvement.
>> > > > > >
>> > > > > > --
>> > > > > > Alessandro Tagliapietra
>> > > > > >
>> > > > > >
>> > > > > > On Wed, Jun 5, 2019 at 4:45 PM Guozhang Wang <
>> wangg...@gmail.com>
>> > > > wrote:
>> > > > > >
>> > > > > > > Hello Alessandro,
>> > > > > > >
>> > > > > > > What did you do for `restarting the app online`? I'm not sure
>> > > > > > > I follow the difference between "restart the streams app" and
>> > > > > > > "restart the app online" from your description.
>> > > > > > >
>> > > > > > >
>> > > > > > > Guozhang
>> > > > > > >
>> > > > > > >
>> > > > > > > On Wed, Jun 5, 2019 at 10:42 AM Alessandro Tagliapietra <
>> > > > > > > tagliapietra.alessan...@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > > Hello everyone,
>> > > > > > > >
>> > > > > > > > I have a small streams app; the configuration and part of
>> > > > > > > > the code I'm using can be found here:
>> > > > > > > > https://gist.github.com/alex88/6b7b31c2b008817a24f63246557099bc
>> > > > > > > > There's also the log from when the app is started locally
>> > > > > > > > and when it is started on our servers connecting to the
>> > > > > > > > Confluent Cloud Kafka broker.
>> > > > > > > >
>> > > > > > > > The problem is that locally everything works properly: if I
>> > > > > > > > restart the streams app it just continues where it left off,
>> > > > > > > > but if I restart the app online it reprocesses the whole
>> > > > > > > > topic.
>> > > > > > > >
>> > > > > > > > That shouldn't happen, right?
>> > > > > > > >
>> > > > > > > > Thanks in advance
>> > > > > > > >
>> > > > > > > > --
>> > > > > > > > Alessandro Tagliapietra
>> > > > > > >
>> > > > > > >
>> > > > > > >
>> > > > > > > --
>> > > > > > > -- Guozhang
>> > > > > > >
>> > > > > >
>> > > > >
>> > > > >
>> > > > > --
>> > > > > -- Guozhang
>> > > > >
>> > > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>>
>>
>> --
>> -- Guozhang
>>
>
