Hello Nayanjyoti,

About this part you mentioned:

> Also, we had noticed that on restarts the downstream of the suppress operator is *flooded* with events, which in the ideal case wouldn't have come. I came across https://stackoverflow.com/a/54227156, where Matthias had responded about the behaviour of the suppress buffer being in memory (for version 2.1) and that it reads the changelog to *recreate* the buffer, which should actually *prevent* the behaviour (downstream being flooded) mentioned above. Am I missing something?
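The restoration step described in this question (rebuilding the suppress buffer by re-reading its changelog) can be illustrated with a toy model. This is a simplified sketch under stated assumptions, not Kafka's implementation: the changelog format (one `key value` record per line, with the value `null` standing for a tombstone) and the function name `restore_buffer` are hypothetical. Replaying the records keeps only the latest value per key and produces no per-record output while replaying. A compacted changelog, which retains only the latest record per key, therefore restores the same buffer as the full history. The model does not reproduce the duplicate-emission bug discussed in this thread.

```shell
# Toy model of restoring an in-memory buffer from a changelog.
# Hypothetical input format (not Kafka's): one "key value" record per line;
# the value "null" marks a tombstone, i.e. the key was evicted from the buffer.
restore_buffer() {
  awk '
    $2 == "null" { delete buf[$1]; next }  # tombstone: drop the key
    { buf[$1] = $2 }                       # later records overwrite earlier ones
    END { for (k in buf) print k, buf[k] } # print the restored buffer contents
  ' "$@" | sort                            # sort because awk iteration order is unspecified
}
```

For example, replaying `a 1`, `b 2`, `a 3`, `b null` leaves only `a 3` in the buffer, and so does replaying a compacted log that contains just `a 3`.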
I was the one who asked that question on SO (https://stackoverflow.com/a/54227156). Yes, the bug is still present in version 2.2.0, but it has been fixed in version 2.2.1 (https://issues.apache.org/jira/browse/KAFKA-7895), which is currently being voted on (2.2.1-RC1: https://home.apache.org/~vahid/kafka-2.2.1-rc1/RELEASE_NOTES.html). I have tested the app that was suffering from that problem, and it is now solved. Of course, you need to test your own app.

I hope that helps. Cheers!

-- Jonathan

On Tue, May 21, 2019 at 11:29 AM Nayanjyoti Deka <nayanjy...@ixigo.com> wrote:
> Hi Guozhang,
>
> I had looked more into it. It seems that on restart the suppress changelog topic was being recreated, and at that time there were no heartbeats from the application to the broker, which caused it to behave this way. I could see in the log that the suppress changelog topic was read from offset 0 on restart.
>
> I'm trying to understand why it needs to read the entire changelog topic, since the windows that have already passed (past time) should have been compacted (or maybe deleted) from the broker's topic data.
>
> Also, we had noticed that on restarts the downstream of the suppress operator is *flooded* with events, which in the ideal case wouldn't have come. I came across https://stackoverflow.com/a/54227156, where Matthias had responded about the behaviour of the suppress buffer being in memory (for version 2.1) and that it reads the changelog to *recreate* the buffer, which should actually *prevent* the behaviour (downstream being flooded) mentioned above. Am I missing something?
>
> We are using version 2.2, which probably has the same behaviour as well.
>
> Please correct me if I'm wrong somewhere in my analysis.
>
> Also, if possible, could you please help me understand why suppress was implemented with an in-memory buffer first rather than spilling to disk? I have read the KIP document, https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables, but it doesn't mention any specifics as to why the in-memory implementation was chosen, since on-disk spills would have provided the exact semantics described by the operator across restarts.
>
> On Mon, May 20, 2019 at 9:24 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > Hello Nayanjyoti,
> >
> > Did you find anything else in the streams log entries (is logging enabled at DEBUG or TRACE?), and what version of Kafka are you using?
> >
> > Guozhang
> >
> > On Sun, May 19, 2019 at 1:04 PM Nayanjyoti Deka <nayanjy...@ixigo.com> wrote:
> > > Forgot to add that there is no transition to the RUNNING state.
> > >
> > > On Mon, May 20, 2019 at 1:10 AM Nayanjyoti Deka <nayanjy...@ixigo.com> wrote:
> > > > Hey guys,
> > > >
> > > > We are running a streams application in our production environment. On our latest restart, the application is consistently moving between these two states.
> > > >
> > > > From our logs:
> > > >
> > > > grep "State transition from " application.log | jq -r '.message' | sort | uniq -c | sort -n -r
> > > >
> > > > 40 stream-thread [yyyy-StreamThread-9] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > 40 stream-thread [yyyy-StreamThread-8] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > 40 stream-thread [yyyy-StreamThread-7] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > 40 stream-thread [yyyy-StreamThread-6] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > 40 stream-thread [yyyy-StreamThread-5] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > 40 stream-thread [yyyy-StreamThread-4] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > 40 stream-thread [yyyy-StreamThread-3] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > 40 stream-thread [yyyy-StreamThread-2] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > 40 stream-thread [yyyy-StreamThread-1] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > 40 stream-thread [yyyy-StreamThread-12] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > 40 stream-thread [yyyy-StreamThread-11] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > 40 stream-thread [yyyy-StreamThread-10] State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
> > > > 39 stream-thread [yyyy-StreamThread-9] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > 39 stream-thread [yyyy-StreamThread-8] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > 39 stream-thread [yyyy-StreamThread-7] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > 39 stream-thread [yyyy-StreamThread-6] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > 39 stream-thread [yyyy-StreamThread-5] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > 39 stream-thread [yyyy-StreamThread-4] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > 39 stream-thread [yyyy-StreamThread-3] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > 39 stream-thread [yyyy-StreamThread-2] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > 39 stream-thread [yyyy-StreamThread-1] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > 39 stream-thread [yyyy-StreamThread-12] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > 39 stream-thread [yyyy-StreamThread-11] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > > 39 stream-thread [yyyy-StreamThread-10] State transition from PARTITIONS_ASSIGNED to PARTITIONS_REVOKED
> > > >
> > > > As we can see, the stream threads are first revoked and then assigned again.
> > > >
> > > > We can also see log entries for offset resets occurring continuously, for example:
> > > >
> > > > Resetting offset for partition xxxx-2 to offset 9166288.
> > > >
> > > > We had actually deleted the consumer group on the broker before the restart, as there was considerable lag in the topic and processing the stale data was not intended. We had assumed that, after deleting the group, the application would start processing from the latest offset, as configured by the auto.offset.reset policy.
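The `grep ... | jq -r '.message' | ... | uniq -c` pipeline in this report counts transitions per thread. A minimal sketch of two helper functions can aggregate the same information across all threads and check directly whether any thread ever reached RUNNING, which is the symptom reported in this thread. The function names are hypothetical, and the sketch assumes each log line contains the Kafka Streams `State transition from X to Y` message, whether or not it is wrapped in JSON. It uses only `grep`, `sort`, and `uniq`, so `jq` is not required.

```shell
# Count each distinct "from X to Y" transition across all threads.
# grep -o keeps only the matched phrase, so thread names (and any JSON
# wrapping around the message) are dropped before counting; -h suppresses
# file-name prefixes when several log files are given.
summarize_transitions() {
  grep -h -o 'State transition from [[:upper:]_]* to [[:upper:]_]*' "$@" |
    sort | uniq -c | sort -rn
}

# Succeeds (exit status 0) if any transition into RUNNING was logged.
reached_running() {
  grep -q 'State transition from [[:upper:]_]* to RUNNING' "$@"
}
```

Usage would be `summarize_transitions application.log` and `reached_running application.log || echo "no thread reached RUNNING"`. A rebalance loop like the one above shows up as two dominant transitions (REVOKED to ASSIGNED and back) with `reached_running` failing.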
> > > >
> > > > When describing the consumer group on the broker side, we get output with CURRENT-OFFSET and LAG shown as "-" (example below):
> > > >
> > > > TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID
> > > > xxx    10         -               129822997       -    yyyy-StreamThread-2-consumer-a-b-c-d
> > > >
> > > > Please help us understand why this could be happening and how to solve it.
> >
> > --
> > -- Guozhang

-- Santilli Jonathan