Hi Guozhang,

Yes, indeed. We found that it happened whenever the changelog offsets
were very high.

For now, we are trying the 2.2.1-rc1 version in our staging environment.
We will update this thread on whether the fix that Jonathan mentioned
above (for the downstream being flooded by suppress) works for us as
well.
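For context, the suppress usage in question is roughly the following.
This is a minimal sketch, not our actual topology: the topic names,
serdes, window size, and grace period are all illustrative.

import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class SuppressedCountsSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        builder.<String, String>stream("events")
            .groupByKey()
            // 5-minute windows with a 1-minute grace period (illustrative).
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5))
                                   .grace(Duration.ofMinutes(1)))
            .count()
            // Buffer all updates and emit only one final result per window.
            // In 2.1/2.2 this buffer lives in memory and is rebuilt from its
            // changelog topic on restart (KIP-328 / KAFKA-7895).
            .suppress(Suppressed.untilWindowCloses(
                Suppressed.BufferConfig.unbounded()))
            .toStream((windowedKey, count) -> windowedKey.key())
            .to("final-counts", Produced.with(Serdes.String(), Serdes.Long()));

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                  Serdes.String().getClass());

        new KafkaStreams(builder.build(), props).start();
    }
}

With untilWindowCloses we expect exactly one final result per window
downstream, which is why the flood of updates after a restart surprised
us.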
On Fri, May 24, 2019 at 11:18 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Nayanjyoti,
>
> Regarding KIP-328: the on-disk buffer is indeed being implemented, but
> it has not been completed and unfortunately has to slip to the next
> release.
>
> Now, about the "PARTITIONS_REVOKED to PARTITIONS_ASSIGNED" issue: it is
> possible that if you are restoring tons of data from the changelog, the
> restore took a long time, and since Streams did not call
> consumer.poll() in time while doing it, it would be kicked out of the
> group again.
>
>
> Guozhang
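Thanks, that explains the restore behaviour. As a stopgap while large
changelogs are restoring, we are experimenting with giving the consumer
more headroom between poll() calls. A minimal sketch of the relevant
settings (the values are illustrative, not recommendations):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RestoreHeadroomConfigSketch {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "yyyy");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Give the thread more time between consumer.poll() calls, so a
        // long changelog restore does not get the member kicked out of
        // the group (the default is 5 minutes).
        props.put(
            StreamsConfig.consumerPrefix(
                ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
            600_000);

        // Fetch fewer records per poll so each loop iteration is shorter.
        props.put(
            StreamsConfig.consumerPrefix(
                ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
            500);

        return props;
    }
}

Raising max.poll.interval.ms only hides the symptom, of course; the long
restore itself is still there.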
> On Tue, May 21, 2019 at 5:50 AM Jonathan Santilli <
> jonathansanti...@gmail.com> wrote:
>
> > Hello Nayanjyoti, about this part you mentioned:
> >
> > "Also, we had noticed that on restarts the downstream of the suppress
> > operator is *flooded* with events, which in the ideal case wouldn't
> > have come. I came across https://stackoverflow.com/a/54227156 where
> > Matthias had responded about the behaviour of the suppress buffer
> > being in memory (for the 2.1 version) and that it reads the changelog
> > to *recreate* the buffer, which should actually *prevent* the
> > behaviour (downstream being flooded) mentioned above. Am I missing
> > something?"
> >
> > It was me who asked that question on SO (
> > https://stackoverflow.com/a/54227156).
> > Yes, in version 2.2.0 the bug is still there, but it has been solved
> > in version 2.2.1 (https://issues.apache.org/jira/browse/KAFKA-7895),
> > which is under voting right now as 2.2.1-RC1 (
> > https://home.apache.org/~vahid/kafka-2.2.1-rc1/RELEASE_NOTES.html).
> > I have tested the app that was suffering from that problem, and it is
> > now solved. Of course, you need to test your own app.
> >
> > I hope that helps.
> >
> > Cheers!
> > --
> > Jonathan
> >
> >
> > On Tue, May 21, 2019 at 11:29 AM Nayanjyoti Deka <nayanjy...@ixigo.com>
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > I had looked more into it. It seems that on restart the suppress
> > > buffer was being recreated from its changelog topic, and at that
> > > time there were no heartbeats to the broker from the application,
> > > hence causing it to behave this way. I could see the log of the
> > > suppress changelog topic being read from offset 0 on the restart.
> > >
> > > I'm trying to understand why it needs to read the entire changelog
> > > topic, since the windows which have passed (past time) should have
> > > been compacted (or maybe deleted) from the broker's topic data.
> > >
> > > Also, we had noticed that on restarts the downstream of the suppress
> > > operator is *flooded* with events, which in the ideal case wouldn't
> > > have come. I came across https://stackoverflow.com/a/54227156 where
> > > Matthias had responded about the behaviour of the suppress buffer
> > > being in memory (for the 2.1 version) and that it reads the
> > > changelog to *recreate* the buffer, which should actually *prevent*
> > > the behaviour (downstream being flooded) mentioned above. Am I
> > > missing something?
> > >
> > > We are using version 2.2, which probably has the same behaviour as
> > > well.
> > >
> > > Please correct me if I'm wrong on my analysis somewhere.
> > >
> > > Also, if possible, could you please provide me with an understanding
> > > of why suppress was implemented with an in-memory buffer in mind
> > > first and not spilling to disk. I have read the KIP document,
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables
> > > , but it doesn't mention any specifics as to why the in-memory
> > > implementation was chosen, since across restarts on-disk spills
> > > would have provided the exact semantics described by the operator.
> > >
> > >
> > >
> > > On Mon, May 20, 2019 at 9:24 PM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > > > Hello Nayanjyoti,
> > > >
> > > > Did you find anything else from the streams log entries (is it
> > > > enabled on DEBUG or TRACE?), and what version of Kafka are you
> > > > using?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Sun, May 19, 2019 at 1:04 PM Nayanjyoti Deka <nayanjy...@ixigo.com>
> > > > wrote:
> > > >
> > > > > Forgot to add that there is no transition to the RUNNING state.
> > > > >
> > > > > On Mon, May 20, 2019 at 1:10 AM Nayanjyoti Deka <nayanjy...@ixigo.com>
> > > > > wrote:
> > > > >
> > > > > > Hey guys,
> > > > > >
> > > > > > We are running a streams application in our production
> > > > > > environment. Since our latest restart, the application is
> > > > > > consistently moving between these two states.
> > > > > >
> > > > > > From our logs:
> > > > > >
> > > > > > grep "State transition from " application.log | jq -r '.message' | sort | uniq -c | sort -n -r
> > > > > >
> > > > > >      40 stream-thread [yyyy-StreamThread-9] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > > >      40 stream-thread [yyyy-StreamThread-8] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > > >      40 stream-thread [yyyy-StreamThread-7] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > > >      40 stream-thread [yyyy-StreamThread-6] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > > >      40 stream-thread [yyyy-StreamThread-5] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > > >      40 stream-thread [yyyy-StreamThread-4] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > > >      40 stream-thread [yyyy-StreamThread-3] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > > >      40 stream-thread [yyyy-StreamThread-2] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > > >      40 stream-thread [yyyy-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > > >      40 stream-thread [yyyy-StreamThread-12] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > > >      40 stream-thread [yyyy-StreamThread-11] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > > >      40 stream-thread [yyyy-StreamThread-10] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > > >      39 stream-thread [yyyy-StreamThread-9] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > > >      39 stream-thread [yyyy-StreamThread-8] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > > >      39 stream-thread [yyyy-StreamThread-7] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > > >      39 stream-thread [yyyy-StreamThread-6] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > > >      39 stream-thread [yyyy-StreamThread-5] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > > >      39 stream-thread [yyyy-StreamThread-4] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > > >      39 stream-thread [yyyy-StreamThread-3] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > > >      39 stream-thread [yyyy-StreamThread-2] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > > >      39 stream-thread [yyyy-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > > >      39 stream-thread [yyyy-StreamThread-12] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > > >      39 stream-thread [yyyy-StreamThread-11] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > > >      39 stream-thread [yyyy-StreamThread-10] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > > >
> > > > > > As we can see, the stream threads are first revoked and then
> > > > > > assigned again.
> > > > > >
> > > > > > We can also see offsets continuously being reset, as follows:
> > > > > >
> > > > > > Resetting offset for partition xxxx-2 to offset 9166288.
> > > > > >
> > > > > > We had actually deleted the consumer group on the broker
> > > > > > before the restart, as there was considerable lag on the topic
> > > > > > and processing the stale data was not intended. We had assumed
> > > > > > that on deleting the group, the application would start
> > > > > > processing from the latest offset, as per the
> > > > > > auto.offset.reset config.
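(For reference, the reset policy on our side is configured roughly as
below; a minimal sketch, assuming defaults everywhere else. As far as
we understand, auto.offset.reset only takes effect for partitions that
have no committed offsets, which is why we deleted the group first.)

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class ResetPolicyConfigSketch {
    public static Properties build() {
        Properties props = new Properties();
        // auto.offset.reset only applies to partitions for which the
        // group has no committed offset, e.g. after the group has been
        // deleted on the broker; otherwise the committed offset wins.
        props.put(
            StreamsConfig.consumerPrefix(
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
            "latest");
        return props;
    }
}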
> > > > > > On describing the consumer group on the broker side, we
> > > > > > receive output with the current offset and lag set as "-"
> > > > > > (example shown below):
> > > > > >
> > > > > > TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID
> > > > > > xxx    10         -               129822997       -    yyyy-StreamThread-2-consumer-a-b-c-d
> > > > > >
> > > > > > Please help us understand why this can be happening and how to
> > > > > > solve it.
> > > >
> > > > --
> > > > -- Guozhang
> >
> > --
> > Santilli Jonathan
>
> --
> -- Guozhang