Hello Nayanjyoti, about this part you mentioned:

"Also, we had noticed that on restarts the downstream of the suppress
operator is *flooded* with events, which in the ideal case wouldn't have
come. I came across https://stackoverflow.com/a/54227156 where Matthias had
described the behaviour of the suppress buffer being in memory (for version
2.1) and that it reads the changelog to *recreate* the buffer, which should
actually *prevent* the behaviour (downstream being flooded) mentioned above.
Am I missing something?"

I was the one who asked that question on SO
(https://stackoverflow.com/a/54227156).
Yes, the bug is still present in version 2.2.0, but it has been fixed in
version 2.2.1 (https://issues.apache.org/jira/browse/KAFKA-7895), which is
currently under vote as 2.2.1-RC1
(https://home.apache.org/~vahid/kafka-2.2.1-rc1/RELEASE_NOTES.html).
I have tested the app that was suffering from that problem with the RC, and
the problem is now solved. Of course, you need to test your own app.
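
As a rough aid for that testing, here is a minimal Python sketch that counts
how often each key/window pair shows up downstream of the suppress operator.
It assumes suppress is configured to emit a single final result per key per
window, and that you have already consumed the downstream records into
(key, window start, value) tuples. That record shape and the function name
are hypothetical, not part of Kafka.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple

# Hypothetical record shape: (key, window_start_ms, value), as read from the
# topic downstream of suppress(). Adapt it to however your app serializes data.
Record = Tuple[str, int, int]


def find_duplicate_emissions(records: Iterable[Record]) -> Dict[Tuple[str, int], int]:
    """Return {(key, window_start_ms): count} for every pair seen more than once."""
    counts = Counter((key, window_start) for key, window_start, _ in records)
    return {pair: n for pair, n in counts.items() if n > 1}


if __name__ == "__main__":
    sample = [
        ("a", 0, 3),
        ("b", 0, 1),
        ("a", 0, 3),      # same key and window again, e.g. re-emitted after a restart
        ("a", 60000, 2),  # same key, different window: not a duplicate
    ]
    print(find_duplicate_emissions(sample))
```

If the result is non-empty after a restart, the same key/window pair reached
downstream more than once, which is the symptom to look for.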

I hope that helps.

Cheers!
--
Jonathan


On Tue, May 21, 2019 at 11:29 AM Nayanjyoti Deka <nayanjy...@ixigo.com>
wrote:

> Hi Guozhang,
>
> I have looked into it further. It seems that on restart the suppress
> changelog topic was being recreated, and during that time the application
> sent no heartbeats to the broker, hence causing it to behave this way. I
> could see log entries for reading the suppress changelog topic from offset
> 0 on restart.
>
> I'm trying to understand why it needs to read the entire changelog topic,
> since windows that have already passed should have been compacted
> (or maybe deleted) from the broker's topic data.
>
> Also, we had noticed that on restarts the downstream of the suppress
> operator is *flooded* with events, which in the ideal case wouldn't have
> come. I came across https://stackoverflow.com/a/54227156 where Matthias
> had described the behaviour of the suppress buffer being in memory (for
> version 2.1) and that it reads the changelog to *recreate* the buffer,
> which should actually *prevent* the behaviour (downstream being flooded)
> mentioned above. Am I missing something?
>
> We are using version 2.2, which probably has the same behaviour as well.
>
> Please correct me if my analysis is wrong somewhere.
>
> Also, if possible, could you please help me understand why suppress was
> first implemented with an in-memory buffer rather than spilling to disk?
> I have read the KIP document
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables),
> but it doesn't mention any specifics as to why the in-memory
> implementation was chosen, since on-disk spills would have provided the
> exact semantics described by the operator across restarts.
>
>
>
> On Mon, May 20, 2019 at 9:24 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hello Nayanjyoti,
> >
> > Did you find anything else from the streams log entries (is it enabled on
> > DEBUG or TRACE?), and what version of Kafka are you using?
> >
> >
> > Guozhang
> >
> > On Sun, May 19, 2019 at 1:04 PM Nayanjyoti Deka <nayanjy...@ixigo.com>
> > wrote:
> >
> > > Forgot to add that there is no transition to RUNNING state.
> > >
> > > On Mon, May 20, 2019 at 1:10 AM Nayanjyoti Deka <nayanjy...@ixigo.com>
> > > wrote:
> > >
> > > > Hey guys,
> > > >
> > > > > We are running a stream application in our production environment.
> > > > > On our latest restart, the application is consistently moving
> > > > > between these two states.
> > > > >
> > > > > From our logs:
> > > > >
> > > > > grep "State transition from " application.log | jq -r '.message' | sort | uniq -c | sort -n -r
> > > >
> > > > >      40 stream-thread [yyyy-StreamThread-9] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > >      40 stream-thread [yyyy-StreamThread-8] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > >      40 stream-thread [yyyy-StreamThread-7] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > >      40 stream-thread [yyyy-StreamThread-6] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > >      40 stream-thread [yyyy-StreamThread-5] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > >      40 stream-thread [yyyy-StreamThread-4] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > >      40 stream-thread [yyyy-StreamThread-3] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > >      40 stream-thread [yyyy-StreamThread-2] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > >      40 stream-thread [yyyy-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > >      40 stream-thread [yyyy-StreamThread-12] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > >      40 stream-thread [yyyy-StreamThread-11] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > >      40 stream-thread [yyyy-StreamThread-10] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > >      39 stream-thread [yyyy-StreamThread-9] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > >      39 stream-thread [yyyy-StreamThread-8] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > >      39 stream-thread [yyyy-StreamThread-7] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > >      39 stream-thread [yyyy-StreamThread-6] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > >      39 stream-thread [yyyy-StreamThread-5] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > >      39 stream-thread [yyyy-StreamThread-4] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > >      39 stream-thread [yyyy-StreamThread-3] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > >      39 stream-thread [yyyy-StreamThread-2] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > >      39 stream-thread [yyyy-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > >      39 stream-thread [yyyy-StreamThread-12] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > >      39 stream-thread [yyyy-StreamThread-11] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > >      39 stream-thread [yyyy-StreamThread-10] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > >
> > > >
> > > >
> > > > > As we can see, the stream threads are first revoked and then
> > > > > assigned again.
> > > > >
> > > > > We can also see offsets being reset continuously in the logs, as
> > > > > follows:
> > > >
> > > > Resetting offset for partition xxxx-2 to offset 9166288.
> > > >
> > > >
> > > > > We had actually deleted the consumer group on the broker before the
> > > > > restart, as there was considerable lag in the topic and processing
> > > > > the stale data was not intended. We had assumed that on deleting the
> > > > > group, the application would start processing from the latest
> > > > > offset, as specified by the auto.offset.reset policy in the config.
> > > >
> > > > > On describing the consumer group on the broker side, we receive
> > > > > output with the current offset and lag shown as "-" (example below):
> > > >
> > > >
> > > > > TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID
> > > > > xxx     10         -               129822997       -    yyyy-StreamThread-2-consumer-a-b-c-d
> > > >
> > > >
> > > >
> > > > > Please help us understand why this can be happening and how to
> > > > > solve this.
> > > >
> > > >
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
Santilli Jonathan
