Hello Upesh,

These are super helpful logs, and I think I'm very close to the root cause.
You see, the written changelog record's timestamp is set to 0 (i.e. January
1st 1970 at midnight GMT), and hence, given any reasonable broker wall-clock
time (presumably in the 21st century), the retention time would always be
breached, causing the log deletion mechanism to trigger.
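
To make the arithmetic concrete, here is a minimal Java sketch of a time-based retention check. It assumes the broker treats a segment as deletable once the current time minus the segment's largest record timestamp exceeds retention.ms, falling back to the file's last-modified time only when no record carried a timestamp. `RetentionModel` and its methods are hypothetical; the numbers are the retention.ms and lastModifiedTime values quoted further down in this thread.

```java
// Hypothetical, simplified model of broker time-based retention (not Kafka's actual code).
final class RetentionModel {

    // Assumption: a segment's "largest timestamp" is its largest record timestamp,
    // or its last-modified time if no record carried a timestamp (modeled as -1).
    static long largestTimestamp(long maxRecordTimestampMs, long lastModifiedMs) {
        return maxRecordTimestampMs >= 0 ? maxRecordTimestampMs : lastModifiedMs;
    }

    // Assumption: the segment is deletable once now - largestTimestamp > retention.ms.
    static boolean isRetentionBreached(long nowMs, long largestTimestampMs, long retentionMs) {
        return nowMs - largestTimestampMs > retentionMs;
    }

    public static void main(String[] args) {
        long retentionMs = 259_200_000L;          // 3 days, the retention.ms used in the thread
        long lastModifiedMs = 1_617_050_940_118L; // lastModifiedTime from the broker log
        long nowMs = lastModifiedMs + 1_000L;     // one second after the write

        // Record stamped with timestamp 0 (the suspected bug): always "older" than 3 days.
        System.out.println(isRetentionBreached(nowMs, largestTimestamp(0L, lastModifiedMs), retentionMs)); // prints true

        // Record stamped with its real write time: well within retention.
        System.out.println(isRetentionBreached(nowMs, largestTimestamp(lastModifiedMs, lastModifiedMs), retentionMs)); // prints false
    }
}
```

With a timestamp of 0 the check is true for any present-day clock, which is why the segment becomes deletable immediately after the write.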

The timestamp is set with `context.timestamp()`, which uses the timestamp of
the record being processed; but I have myself seen and fixed a bug
(https://issues.apache.org/jira/browse/KAFKA-12323) where the timestamp was
not populated, and hence was set to 0, if the record was generated as part
of a punctuation. So my next key question is: is this changelog record
generated (i.e. its put call triggered) from processing an input record, or
from a punctuation call?
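
A toy Java model of the two code paths in question. It assumes, as described above, that the changelog record is stamped with `context.timestamp()`; for the fixed case it further assumes that a punctuation-triggered put sees the punctuation's own timestamp. `ChangelogTimestampModel` and its parameters are hypothetical and are not Kafka Streams API.

```java
// Hypothetical toy model of which timestamp a changelog write receives (not Kafka Streams code).
final class ChangelogTimestampModel {

    enum Trigger { INPUT_RECORD, PUNCTUATION }

    /**
     * @param trigger       what caused the store put() that produced the changelog record
     * @param inputRecordTs timestamp of the input record being processed (INPUT_RECORD case)
     * @param punctuationTs timestamp handed to the punctuator (PUNCTUATION case)
     * @param preFix        true to model the bug described above, false for the assumed fix
     */
    static long changelogTimestamp(Trigger trigger, long inputRecordTs, long punctuationTs, boolean preFix) {
        switch (trigger) {
            case INPUT_RECORD:
                // context.timestamp() reflects the record currently being processed.
                return inputRecordTs;
            case PUNCTUATION:
                // Pre-fix: no record timestamp is populated, so 0 (the epoch) is written.
                // Assumed fix: the punctuation's own timestamp is used instead.
                return preFix ? 0L : punctuationTs;
            default:
                throw new IllegalArgumentException("unknown trigger: " + trigger);
        }
    }

    public static void main(String[] args) {
        long now = 1_617_050_940_118L;
        System.out.println(changelogTimestamp(Trigger.INPUT_RECORD, now, now, true)); // prints 1617050940118
        System.out.println(changelogTimestamp(Trigger.PUNCTUATION, now, now, true));  // prints 0
    }
}
```

Only the punctuation path can produce a 0 timestamp in this model, which is why the origin of the put call matters.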


Guozhang

On Mon, Mar 29, 2021 at 2:01 PM Upesh Desai <ude...@itrsgroup.com> wrote:

> Hi Guozhang,
>
>
>
> When testing with a 2.6.1 broker and a 2.7 Streams application, I see the
> same behavior as described before with the 2.7 broker: just after a
> record is written to the changelog topic, the log segment is rolled and
> deleted, citing that the retention time has passed (the record was written
> to the state store at ~15:49):
>
>
>
> [2021-03-29 15:49:13,757] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Found deletable
> segments with base offsets [0] due to retention time 259200000ms breach
> (kafka.log.Log)
> [2021-03-29 15:49:13,761] INFO [ProducerStateManager
> partition=test-stream-store-changelog-4] Writing producer snapshot at
> offset 1 (kafka.log.ProducerStateManager)
> [2021-03-29 15:49:13,763] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Rolled new log
> segment at offset 1 in 5 ms. (kafka.log.Log)
> [2021-03-29 15:49:13,764] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Scheduling segments
> for deletion LogSegment(baseOffset=0, size=156,
> lastModifiedTime=1617050940118, largestTime=0) (kafka.log.Log)
> [2021-03-29 15:49:13,765] INFO [Log
> partition=test-stream-store-changelog-4,
> dir=/Users/udesai/Projects/kafka-2.6.1-src/logs/data] Incremented log start
> offset to 1 due to segment deletion (kafka.log.Log)
>
>
>
> Does this have anything to do with the *largestTime=0* mentioned in the
> log? This was the first and only record written to the store/changelog. Is
> there anything else we can try to resolve this issue, or to give us more
> insight into where it could originate from?
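
The sequence in the broker log above (segment 0 with largestTime=0 found deletable, new segment rolled at offset 1, log start offset incremented to 1) can be reproduced with a simplified Java model. Its assumptions are hypothetical simplifications, not Kafka's code: deletion removes whole segments starting from the oldest; if every segment is deletable, an empty active segment is rolled at the log end offset first; the log start offset then becomes the base offset of the first remaining segment. High-watermark checks and size-based retention are ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of whole-segment retention deletion (not Kafka's actual code).
final class SegmentDeletionModel {

    static final class Segment {
        final long baseOffset;
        final long largestTimestampMs;

        Segment(long baseOffset, long largestTimestampMs) {
            this.baseOffset = baseOffset;
            this.largestTimestampMs = largestTimestampMs;
        }
    }

    /** Applies time retention to {@code segments} (oldest first, mutated in place); returns the new log start offset. */
    static long applyRetention(List<Segment> segments, long logEndOffset, long nowMs, long retentionMs) {
        // Segments are deleted from the oldest, stopping at the first one still within retention.
        int deletable = 0;
        for (Segment s : segments) {
            if (nowMs - s.largestTimestampMs > retentionMs) {
                deletable++;
            } else {
                break;
            }
        }
        if (deletable == segments.size()) {
            // Every segment expired: roll an empty active segment at the log end offset first,
            // so the log still has a segment to append to after the deletion.
            segments.add(new Segment(logEndOffset, nowMs));
        }
        segments.subList(0, deletable).clear();
        return segments.get(0).baseOffset;
    }

    public static void main(String[] args) {
        List<Segment> segments = new ArrayList<>();
        segments.add(new Segment(0L, 0L)); // one record at offset 0, stamped with timestamp 0
        long logStart = applyRetention(segments, 1L, 1_617_050_941_118L, 259_200_000L);
        System.out.println(logStart); // prints 1: offset 0 is gone, log end offset is still 1
    }
}
```

Under these assumptions the earliest and latest offsets both end up at 1, the same pair of values reported elsewhere in this thread.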
>
>
>
> Thanks,
> Upesh
>
>
> Upesh Desai  |  Senior Software Developer  |  *ude...@itrsgroup.com*
> <ude...@itrsgroup.com>
> *www.itrsgroup.com* <https://www.itrsgroup.com/>
> <https://www.itrsgroup.com/>
>
> *From: *Upesh Desai <ude...@itrsgroup.com>
> *Date: *Thursday, March 25, 2021 at 6:46 PM
> *To: *users@kafka.apache.org <users@kafka.apache.org>
> *Cc: *Bart Lilje <bli...@itrsgroup.com>
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> We have not tried running 2.6 brokers with a 2.7 client; I will try that
> and get back to you.
>
>
>
> We are not enabling EOS on the Streams app; we have it set to AT_LEAST_ONCE.
> The shutdowns and restarts of the Streams app are clean each time.
>
>
>
> I see in the broker logs certain lines indicating that the log segment is
> being rolled and deleted, but I don’t see how or why this should be
> happening when the records were just written. See the log line snippets
> included in the attached file. Initially 8 records are added (offsets 0-8),
> followed by a single record (offset 9). They are rolled and deleted almost
> instantly.
>
>
>
> Best,
>
> Upesh
>
>
>
> *From: *Guozhang Wang <wangg...@gmail.com>
> *Date: *Thursday, March 25, 2021 at 6:31 PM
> *To: *Users <users@kafka.apache.org>
> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> changelog topics
>
> BTW, yes, that indicates the record in the changelog was already truncated
> (logically). But since we only physically truncate logs by whole segments,
> which are 1GB by default, it should still physically be in the log. Are you
> enabling EOS on Streams, and when you shut down the Streams app, is that a
> clean shutdown?
>
> On Thu, Mar 25, 2021 at 4:22 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > That's indeed weird.
> >
> > Have you tried running the Kafka brokers on 2.6 with the Kafka Streams
> > client on 2.7?
> >
> > On Thu, Mar 25, 2021 at 2:34 PM Upesh Desai <ude...@itrsgroup.com> wrote:
> >
> >> Hello Guozhang,
> >>
> >>
> >>
> >> FYI, I have tried your suggestions with an in-memory store and seen the
> >> following:
> >>
> >>    1. I added the record to the state store, stopped the application,
> >>    and checked the earliest and latest offsets via the command line
> >>    tools. This shows that the earliest offset is 1, and the latest offset
> >>    is also 1. Does this mean that the record has already been marked for
> >>    deletion? My retention.ms config is set to 3 days (259200000 ms), so
> >>    it should not be marked for deletion if it was added only a couple of
> >>    minutes prior, right?
> >>    2. This is consistent with the above: when logging the starting
> >>    offset, it is not 0, but rather 1:
> >>
> >>    *topic: streamapp-teststore-changelog, partition: 4, start offset: 1,
> >>    end offset: 1*
> >>
> >>
> >>
> >> I also confirmed different behavior when we change the changelog topic
> >> cleanup policy from “*compact,delete*” to just “*compact*”. We DO NOT
> >> see this issue when the changelog is just set to compact. We also
> >> confirmed that this does not happen when we run everything on Kafka
> >> version 2.6.
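
The difference between the two cleanup policies can be sketched as follows, assuming that only the `delete` policy enables time-based (retention.ms) segment deletion, while `compact` alone keeps the latest value for each key regardless of its timestamp (tombstone handling aside). `CleanupPolicyModel` is a hypothetical helper, not a Kafka API.

```java
import java.util.Arrays;

// Hypothetical helper: does a cleanup.policy value enable time-based (retention.ms) deletion?
final class CleanupPolicyModel {

    // Assumption: only the "delete" policy deletes whole segments by age; "compact" alone
    // keeps the latest value per key no matter how old its timestamp looks.
    static boolean timeRetentionApplies(String cleanupPolicy) {
        return Arrays.stream(cleanupPolicy.split(","))
                .map(String::trim)
                .anyMatch("delete"::equals);
    }

    public static void main(String[] args) {
        System.out.println(timeRetentionApplies("compact,delete")); // prints true
        System.out.println(timeRetentionApplies("compact"));        // prints false
    }
}
```

Under this assumption a record stamped with timestamp 0 is only at risk on the "compact,delete" changelog, which matches the observation above.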
> >>
> >>
> >>
> >> Thanks,
> >>
> >> Upesh
> >>
> >>
> >> *From: *Guozhang Wang <wangg...@gmail.com>
> >> *Date: *Thursday, March 25, 2021 at 4:01 PM
> >> *To: *Users <users@kafka.apache.org>
> >> *Cc: *Bart Lilje <bli...@itrsgroup.com>
> >> *Subject: *Re: Kafka Streams Processor API state stores not restored via
> >> changelog topics
> >>
> >> Hello Upesh,
> >>
> >> Could you confirm a few more things for me:
> >>
> >> 1. After you stop the application and wipe out the state dir, check
> >> whether the corresponding changelog topic indeed has one record at
> >> offset 0; this can be done via Admin#listOffsets (get the earliest and
> >> latest offsets, which should be 0 and 1 respectively).
> >> 2. After you resume the application, check from which starting position
> >> we are restoring the changelog; this can be done by implementing
> >> `stateRestoreListener.onRestoreStart(partition, storeName, startOffset,
> >> restoreEndOffset);`, and it should be 0.
> >>
> >> If both of them check out as expected, then from the code I think
> >> bufferedLimitIndex should be updated to 1.
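
The two checks above boil down to simple offset arithmetic. The hypothetical helper below assumes a non-transactional changelog with no offset gaps; in practice the earliest/latest values would come from `Admin#listOffsets` (with `OffsetSpec.earliest()` and `OffsetSpec.latest()`) and the start offset from the restore listener's `onRestoreStart` callback.

```java
// Hypothetical offset arithmetic for the two checks above (no Kafka client involved).
final class RestoreExpectation {

    // Assumption: non-transactional changelog without offset gaps, so the number of
    // readable records is simply latest - earliest.
    static long readableRecords(long earliestOffset, long latestOffset) {
        return Math.max(0L, latestOffset - earliestOffset);
    }

    // Records a restore starting at startOffset could apply; offsets below the log start
    // (earliestOffset) have already been deleted and cannot be read.
    static long recordsToRestore(long earliestOffset, long latestOffset, long startOffset) {
        return Math.max(0L, latestOffset - Math.max(earliestOffset, startOffset));
    }

    public static void main(String[] args) {
        // Expected in the healthy case: earliest 0, latest 1, restore from 0.
        System.out.println(recordsToRestore(0L, 1L, 0L)); // prints 1
        // Values reported in this thread: earliest 1, latest 1, restore from 1.
        System.out.println(recordsToRestore(1L, 1L, 1L)); // prints 0
    }
}
```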
> >>
> >>
> >> Guozhang
> >>
> >> On Wed, Mar 24, 2021 at 5:14 PM Upesh Desai <ude...@itrsgroup.com> wrote:
> >>
> >> > Hi Guozhang,
> >> >
> >> >
> >> >
> >> > Here are some answers to your questions from my testing:
> >> >
> >> >
> >> >
> >> >    1. ChangelogMetadata#restoreEndOffset == 1. This is expected, as in
> >> >    my test 1 record had been added to the store. However, the
> >> >    numRecords variable is still set to 0.
> >> >    2. For that particular test, `hasRestoredToEnd()` indeed returns
> >> >    true as well. But it is confusing, since the store is actually
> >> >    empty: the record I added does not exist in the store when I check
> >> >    for it.
> >> >    3. N/A
> >> >
> >> >
> >> >
> >> > A little more information: the records we add to this store/changelog
> >> > are of type <CustomKey,byte[]>, where the value is always set to an
> >> > empty byte array `new byte[0]`. A couple of other variations I have
> >> > tried are setting it to a non-empty static byte array such as
> >> > `new byte[1]` or `new byte[]{1}`.
> >> >
> >> >
> >> >
> >> > Hope this gives a little more clarity; I hope to hear from you soon.
> >> >
> >> >
> >> >
> >> > Thanks,
> >> >
> >> > Upesh
> >> >
> >> >
> >> > *From: *Guozhang Wang <wangg...@gmail.com>
> >> > *Date: *Wednesday, March 24, 2021 at 1:37 PM
> >> > *To: *Users <users@kafka.apache.org>
> >> > *Cc: *Bart Lilje <bli...@itrsgroup.com>
> >> > *Subject: *Re: Kafka Streams Processor API state stores not restored
> >> > via changelog topics
> >> >
> >> > Hello Upesh,
> >> >
> >> > Thanks for the detailed report. I looked through the code and tried to
> >> > reproduce the issue, but so far have not succeeded. I think I need
> >> > some further information from you to help with my investigation.
> >> >
> >> > 1) `bufferedLimitIndex == 0` by itself does not necessarily mean
> >> > there's an issue, as long as it could still be bumped later (i.e. it
> >> > is possible that the restore consumer has not fetched data yet). What's
> >> > key, though, is to check `ChangelogMetadata#restoreEndOffset`: for
> >> > active tasks, it is created with a null value and then initialized
> >> > once. ChangelogReader stops restoring once the current offset has
> >> > reached or passed this value, or if this value itself is 0.
> >> >
> >> > 2) If `restoreEndOffset` is initialized to a non-zero value, then check
> >> > whether the restoration indeed completed without applying any records;
> >> > this is indicated by `hasRestoredToEnd()` returning true.
> >> >
> >> > 3) If `restoreEndOffset` is initialized to 0, then we need to check
> >> > why: off the top of my head, I can only think of the consumer's end
> >> > offset request getting a response of 0, indicating the changelog is
> >> > now empty.
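
The stopping rule in 1) to 3) can be modeled in a few lines of Java. This is a hypothetical simplification, not `StoreChangelogReader` itself: it treats an uninitialized (null) `restoreEndOffset` as "not done yet" and compares the next position the restore consumer would read against `restoreEndOffset`.

```java
// Hypothetical simplification of the stopping rule described above (not StoreChangelogReader itself).
final class RestoreEndOffsetModel {

    /**
     * @param restoreEndOffset end offset captured for the changelog, or null if not yet initialized
     * @param nextPosition     next changelog offset the restore consumer would read
     */
    static boolean hasRestoredToEnd(Long restoreEndOffset, long nextPosition) {
        if (restoreEndOffset == null) {
            return false; // simplification: not initialized yet, so keep restoring
        }
        if (restoreEndOffset == 0L) {
            return true;  // empty changelog: nothing to restore
        }
        return nextPosition >= restoreEndOffset;
    }

    public static void main(String[] args) {
        System.out.println(hasRestoredToEnd(1L, 0L)); // prints false: offset 0 still to be applied
        System.out.println(hasRestoredToEnd(1L, 1L)); // prints true: nothing left, even if 0 records were applied
    }
}
```

With the values reported in this thread (restoreEndOffset == 1, restoration starting at offset 1 because offset 0 had already been deleted), the model reports the restore as complete without applying any record, which is consistent with the empty store that was observed.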
> >> >
> >> >
> >> > Guozhang
> >> >
> >> >
> >> > On Tue, Mar 23, 2021 at 8:44 AM Upesh Desai <ude...@itrsgroup.com> wrote:
> >> >
> >> > > Hi all,
> >> > >
> >> > >
> >> > >
> >> > > Our team thinks we discovered a bug over the weekend within the Kafka
> >> > > Streams / Processor API. We are running 2.7.0.
> >> > >
> >> > >
> >> > >
> >> > > When configuring a state store backed by a changelog topic with the
> >> > > cleanup policy configuration set to “compact,delete”:
> >> > >
> >> > >
> >> > >
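> >> > > import java.util.Map;
> >> > > import org.apache.kafka.streams.state.KeyValueStore;
> >> > > import org.apache.kafka.streams.state.StoreBuilder;
> >> > > import org.apache.kafka.streams.state.Stores;
> >> > > // Assumed source of the two topic-config constants:
> >> > > import static org.apache.kafka.common.config.TopicConfig.CLEANUP_POLICY_CONFIG;
> >> > > import static org.apache.kafka.common.config.TopicConfig.RETENTION_MS_CONFIG;
> >> > >
> >> > > // K/V: key and value types; kSerde/vSerde: their Serdes; STORE_ID: the store name.
> >> > > final StoreBuilder<KeyValueStore<K, V>> store = Stores
> >> > >   .keyValueStoreBuilder(
> >> > >     Stores.persistentKeyValueStore(STORE_ID),
> >> > >     kSerde,
> >> > >     vSerde)
> >> > >   .withLoggingEnabled(Map.of(
> >> > >     RETENTION_MS_CONFIG, "90000000",
> >> > >     CLEANUP_POLICY_CONFIG, "compact,delete"))
> >> > >   .withCachingEnabled();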
> >> > >
> >> > >
> >> > >
> >> > > Here is how we reproduced the problem:
> >> > >
> >> > >    1. Records are written to the state store, and subsequently
> >> > >    produced to the changelog topic.
> >> > >    2. Stop the Streams application.
> >> > >    3. Delete the state.dir directory.
> >> > >    4. Restart the Streams application.
> >> > >    5. Confirm the state store is initialized empty, with no records
> >> > >    restored from the changelog.
> >> > >
> >> > >
> >> > > We see this problem with both in-memory and RocksDB-backed state
> >> > > stores. For the persistent state store, if the Streams application is
> >> > > restarted without the state dir being deleted, the application still
> >> > > does not “restore” from the changelog, but the records are still seen
> >> > > in the state store.
> >> > >
> >> > >
> >> > >
> >> > > When rolling back to 2.6, we do not see this issue.
> >> > >
> >> > >
> >> > >
> >> > > Doing some debugging in the source code, in the StoreChangelogReader
> >> > > class I found that the number of records to restore is always 0,
> >> > > based on the snippet below:
> >> > >
> >> > >
> >> > >
> >> > > private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
> >> > >     final ProcessorStateManager stateManager = changelogMetadata.stateManager;
> >> > >     final StateStoreMetadata storeMetadata = changelogMetadata.storeMetadata;
> >> > >     final TopicPartition partition = storeMetadata.changelogPartition();
> >> > >     final String storeName = storeMetadata.store().name();
> >> > >     final int numRecords = changelogMetadata.bufferedLimitIndex;
> >> > >     // ... (rest of the method omitted)
> >> > >
> >> > >
> >> > >
> >> > > Here, `changelogMetadata.bufferedLimitIndex` is always 0.
> >> > >
> >> > >
> >> > >
> >> > > My question to you all is: 1) Is this expected behavior? 2) If not,
> >> > > is it a bug?
> >> > >
> >> > >
> >> > >
> >> > > Hope to get some clarity, and thanks in advance!
> >> > >
> >> > >
> >> > >
> >> > > Best,
> >> > > Upesh
> >> > > Internet communications are not secure and therefore the ITRS Group
> >> > > does not accept legal responsibility for the contents of this message.
> >> > > Any view or opinions presented are solely those of the author and do
> >> > > not necessarily represent those of the ITRS Group unless otherwise
> >> > > specifically stated.
> >> > >
> >> > >
> >> > > *Disclaimer*
> >> > >
> >> > > The information contained in this communication from the sender is
> >> > > confidential. It is intended solely for use by the recipient and
> >> > > others authorized to receive it. If you are not the recipient, you are
> >> > > hereby notified that any disclosure, copying, distribution or taking
> >> > > action in relation of the contents of this information is strictly
> >> > > prohibited and may be unlawful.
> >> > >
> >> > > This email has been scanned for viruses and malware, and may have been
> >> > > automatically archived by *Mimecast Ltd*, an innovator in Software as
> >> > > a Service (SaaS) for business. Providing a *safer* and *more useful*
> >> > > place for your human generated data. Specializing in; Security,
> >> > > archiving and compliance.
> >> > >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >> >
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
> > --
> > -- Guozhang
> >
>
>
> --
> -- Guozhang
>


-- 
-- Guozhang
