I've opened a PR which adds an integration test <https://github.com/apache/kafka/pull/17668/files#diff-a60b518846fc0f770164d55b6d7cd31e03a002514377578c80c6e22cc120af40R88> to show the impact of this change/KIP. PTAL.
-- 
Kamal

On Fri, Nov 1, 2024 at 3:31 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Thanks for the review!

> 5
Updated the KIP to include the `isEmpty` method in the transaction index.

> 6
You're right. The offset parameter will be equal to the next-segment-to-consider's base offset.
But the API introduced in RLMM is for *one* epoch, and the start offset of the next epoch may not be
a base offset. You can refer to this line
<https://github.com/apache/kafka/pull/17659/files#diff-380e4d8859ea9148f21794c09039425c82d9012a392c2dbbe1ce2ec8677a1970R1857>
in the draft PR.

On Fri, Nov 1, 2024 at 3:16 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:

Last few things:

#5
About setting the TrxIndexEmpty field, could we introduce an isEmpty() function in TransactionIndex with the following implementation:

    public boolean isEmpty() {
        // Empty when there are no aborted-transaction entries to iterate over.
        return !iterable().iterator().hasNext();
    }

The advantages of this approach are:
1. It works both when the file is not present and when the file is present but empty.
2. It avoids leaking the underlying implementation of TransactionIndex via the file() method. I think making file() public is an implementation leak (for example, what if the txn index is not file-based?).

#6
In the documentation for nextSegmentWithTxnIndex, the offset parameter should be equal to the next-segment-to-consider's base offset, no?
I assume that we will add a new fetch here with nextSegmentBaseOffset:
https://github.com/apache/kafka/blob/346fdbafc539bc48bb66eedae89a15e240007fd9/core/src/main/java/kafka/log/remote/RemoteLogManager.java#L1801
Is there a case where the parameter "offset" will not be equal to the baseOffset of a segment?
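Kamal's answer to #6 hinges on the lookup being scoped to a single epoch. The following is a minimal sketch of how a caller might walk the epochs with such a lookup: within an epoch the search resumes after the last segment found, and on moving to the next epoch it restarts from that epoch's start offset. All names here (EpochSegment, PerEpochTxnIndexLookup, ListLookup, TxnIndexWalker) are hypothetical, not Kafka classes, and each segment is assumed to belong to exactly one epoch, which is a simplification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;

// Hypothetical, simplified segment metadata (assumes one epoch per segment).
final class EpochSegment {
    final int epoch;
    final long baseOffset;
    final long endOffset;
    final boolean txnIdxEmpty;

    EpochSegment(int epoch, long baseOffset, long endOffset, boolean txnIdxEmpty) {
        this.epoch = epoch;
        this.baseOffset = baseOffset;
        this.endOffset = endOffset;
        this.txnIdxEmpty = txnIdxEmpty;
    }
}

// Hypothetical stand-in for the proposed per-epoch RLMM lookup: the first segment of
// `epoch` that has a transaction index and ends at or after `offset`.
interface PerEpochTxnIndexLookup {
    Optional<EpochSegment> nextWithTxnIndex(int epoch, long offset);
}

// Naive in-memory implementation, only to make the sketch runnable.
final class ListLookup implements PerEpochTxnIndexLookup {
    private final List<EpochSegment> segments; // sorted by baseOffset

    ListLookup(List<EpochSegment> segments) {
        this.segments = segments;
    }

    @Override
    public Optional<EpochSegment> nextWithTxnIndex(int epoch, long offset) {
        return segments.stream()
                .filter(s -> s.epoch == epoch && s.endOffset >= offset && !s.txnIdxEmpty)
                .findFirst();
    }
}

final class TxnIndexWalker {
    // Collects the segments whose transaction indexes must be read, up to upperBoundOffset.
    static List<EpochSegment> segmentsWithTxnIndex(PerEpochTxnIndexLookup lookup,
                                                   NavigableMap<Integer, Long> epochStartOffsets,
                                                   int startEpoch, long startOffset,
                                                   long upperBoundOffset) {
        List<EpochSegment> result = new ArrayList<>();
        for (Map.Entry<Integer, Long> entry : epochStartOffsets.tailMap(startEpoch, true).entrySet()) {
            int epoch = entry.getKey();
            // Later epochs restart from their start offset, which need not be a segment base offset.
            long offset = epoch == startEpoch ? startOffset : entry.getValue();
            if (offset > upperBoundOffset) {
                break;
            }
            Optional<EpochSegment> next = lookup.nextWithTxnIndex(epoch, offset);
            while (next.isPresent() && next.get().baseOffset <= upperBoundOffset) {
                result.add(next.get());
                next = lookup.nextWithTxnIndex(epoch, next.get().endOffset + 1);
            }
        }
        return result;
    }
}
```

With epochs starting at offsets 0, 100 and 250, for example, the search in epoch 1 begins at offset 100 regardless of where the last segment found in epoch 0 ended.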
-- 
Divij Vaidya

On Fri, Nov 1, 2024 at 10:26 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Hi Divij,

Thanks for the detailed review!

> 1, 2, 3, 4
Updated KIP-1058
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+collecting+aborted+transactions>
with the feedback received, and also opened draft PR #17659
<https://github.com/apache/kafka/pull/17659/files> for reference. PTAL.

> 5. How are we determining the value of the TrxIndexEmpty field on segment rotation?
The transaction index file is optional: the file does not exist when a segment has no aborted txn entries, so we will use a null check on the file. I have also updated this in the KIP.

Thanks,
Kamal

On Tue, Oct 29, 2024 at 8:47 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:

A few more points to discuss (please add them to the KIP as well):

5. How are we determining the value of the TrxIndexEmpty field on segment rotation?

One option is to do boolean txnIdxEmpty = segment.txnIndex().allAbortedTxns().isEmpty(), but this has the overhead of reading the contents of the file and storing them in memory when the index is non-empty.
The other (preferred) option is to add a public isEmpty() method to TransactionIndex and perform a segment.txnIndex().isEmpty() check, which would internally use the Files.size() Java API.

On Tue, Oct 29, 2024 at 1:21 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:

Let's get the ball rolling (again) on this one.

Kamal, could you please add the following to the KIP:
1. The API as discussed above.
   Please add the failure modes for this API as well, such as the exceptions thrown and a recommendation on how a caller is expected to handle them. I am assuming that the three parameters for this API will be topicPartition, epoch and offset.
2. Implementation details for the topic-based RLMM. I am assuming that the plugin will default the field to false if it is absent (the case of old metadata).
3. In the test plan section, we additionally need to assert that we don't read the metadata of all segments from the topic-based RLMM (i.e., that it is not a linear search).
4. In the compatibility section, please document how existing clusters with Tiered Storage metadata will work during and after a rolling upgrade to a version that contains this change.

-- 
Divij Vaidya

On Fri, Oct 11, 2024 at 12:26 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Bump for review.

If the additional proposal looks good, I'll append it to the KIP. PTAL.

New API in RLMM#nextRemoteLogSegmentMetadataWithTxnIndex

-- 
Kamal

On Sun, Oct 6, 2024 at 7:20 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Hi Christo,

Thanks for the review!

Adding the new API `nextRemoteLogSegmentMetadataWithTxnIndex` in RLMM helps us avoid the linear search. With this API, we have to:

1. Maintain one more skip-list [1] for each of the epochs of the partition in RLMM, which might increase the memory usage of the topic-based RLMM implementation.
   1a) The skip-list will be empty when a partition/epoch has no aborted txn entries, which is the predominant case.
   1b) The skip-list will be close to a duplicate of the existing one when *most* of the segments have aborted txn entries; since aborted txns are expected to be rare, this should be fine.
2. Change the logic that retrieves the aborted txns (we have to query nextRLSMWithTxnIndex for each leader epoch).
3. Accept that the logic diverges from how we retrieve the aborted txn entries from the local log.

The approach looks good to me. If everyone is aligned, we can proceed to add this API to RLMM.

Another option I was thinking of is to capture the `lastStableOffsetLag` [2] while rotating the segment, but that is a bigger change that we can take up later.

[1]: https://sourcegraph.com/github.com/apache/kafka/-/blob/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java?L43
[2]: https://sourcegraph.com/github.com/apache/kafka/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L432

Thanks,
Kamal

On Fri, Oct 4, 2024 at 4:21 PM Christo Lolov <christolo...@gmail.com> wrote:

Heya,

Apologies for the delay.
I have been thinking about this problem recently as well, and while I believe storing a boolean in the metadata is good, I think we can do better by introducing a new method to the RLMM along the lines of:

    Optional<RemoteLogSegmentMetadata> nextRemoteLogSegmentMetadataWithTxnIndex(TopicIdPartition topicIdPartition,
                                                                                int epochForOffset,
                                                                                long offset) throws RemoteStorageException;

This will help plugin implementers build optimisations such as skip lists, which will give them the next segment more quickly than a linear search.

I am keen to hear your thoughts!

Best,
Christo

On Fri, 4 Oct 2024 at 10:48, Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Hi Luke,

Thanks for the review!

> Do you think it is helpful if we store the "least abort start offset in the segment", and -1 means no txnIndex. So that we can have a way to know if we need to fetch this txn index or not.

1. No, this change won't have an effect. To find the upper-bound offset [1], we have to fetch that segment's offset index file. The RemoteIndexCache [2] fetches all 3 index files together and caches them for subsequent use, so the current segment's txn index gets downloaded anyway.
2. The reason for choosing a boolean is to make the change backward compatible. There can be existing RLM events for already-uploaded segments. The default value of `txnIdxEmpty` is false, so the *old* RLM events are assumed to have txn index files, and those files are downloaded if they exist.

[1]: https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/java/kafka/log/remote/RemoteLogManager.java?L1732
[2]: https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteIndexCache.java?L383

Thanks,
Kamal

On Thu, Oct 3, 2024 at 3:11 PM Luke Chen <show...@gmail.com> wrote:

Hi Kamal,

Sorry for the late review.
Thanks for the KIP; this will improve transactional reads from remote storage.
Overall LGTM, just one minor thought:

Currently, we only store the `TxnIndexEmpty` bool value in the segment metadata.
Would it be helpful to store the "least abort start offset in the segment", with -1 meaning no txnIndex? That way we would have a way to know whether we need to fetch this txn index or not.

Thanks.
Luke

On Mon, Sep 9, 2024 at 3:26 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Hi all,

If there are no more comments, I'll start a voting thread soon.

Thanks,
Kamal

On Fri, Sep 6, 2024 at 7:28 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Bumping this thread again for review!

I have reduced the scope of the proposal to the minimum. We will add only one field (txnIdxEmpty) to the RemoteLogSegmentMetadata event, which is backward compatible. PTAL.

Thanks,
Kamal

On Tue, Aug 13, 2024 at 6:33 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Bumping this thread for KIP review!

We can go with the simplest solution proposed in this KIP and improve it in a subsequent iteration. PTAL.

Thanks,
Kamal

On Fri, Aug 2, 2024 at 11:42 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Hi Divij,

Thanks for the review! And sorry for the late reply.
From the UnifiedLog.scala
<https://sourcegraph.com/github.com/apache/kafka@trunk/-/blob/core/src/main/scala/kafka/log/UnifiedLog.scala?L421-427>
doc:

"""
The last stable offset (LSO) is defined as the first offset such that all lower offsets have been "decided."
Non-transactional messages are considered decided immediately, but transactional messages are only decided when
the corresponding COMMIT or ABORT marker is written. This implies that the last stable offset will be equal
to the high watermark if there are no transactional messages in the log. Note also that the LSO cannot advance
beyond the high watermark.
"""

While rolling the active segment to passive, if the LSO equals the HW, then all the messages in that segment are decided and we can store the `lastStableOffsetLag` as an attribute of the rolled segment. We can then propagate the `lastStableOffsetLag` information in the RemoteLogMetadata events.
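A minimal sketch of this roll-time bookkeeping, together with the read-side check proposed in the same message, is below. It assumes the lag is computed as HW minus LSO (which is how UnifiedLog exposes `lastStableOffsetLag`); RolledSegment and LsoLagSketch are hypothetical names used only for illustration, not Kafka classes.

```java
// Hypothetical roll-time attribute that would be stored on the rolled segment and
// carried in its RemoteLogMetadata event. Not an existing Kafka class.
final class RolledSegment {
    final long baseOffset;
    final long lastStableOffsetLag;

    RolledSegment(long baseOffset, long lastStableOffsetLag) {
        this.baseOffset = baseOffset;
        this.lastStableOffsetLag = lastStableOffsetLag;
    }
}

final class LsoLagSketch {
    // Assumes lag = HW - LSO. The LSO cannot advance beyond the HW, so the lag is never negative.
    static RolledSegment onRoll(long baseOffset, long highWatermark, long lastStableOffset) {
        if (lastStableOffset > highWatermark) {
            throw new IllegalArgumentException("LSO cannot advance beyond the high watermark");
        }
        return new RolledSegment(baseOffset, highWatermark - lastStableOffset);
    }

    // Read side of the proposal: a lag of 0 means every offset was "decided" when the
    // segment was rolled, so subsequent segments need not be searched for aborted txns.
    static boolean mustSearchLaterSegments(RolledSegment segment) {
        return segment.lastStableOffsetLag != 0;
    }
}
```

A segment rolled with HW = LSO = 500 records a lag of 0 and stops the traversal; one rolled with HW = 900 and LSO = 850 records a lag of 50 and keeps today's behaviour.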
While reading the remote log segment, if the `lastStableOffsetLag` is 0, there is no need to traverse the subsequent segments for aborted transactions. This covers the dominant case where the partition had no transactions at all.

With log compaction, the shrunk segments might get merged. One option is to take the max of `lastStableOffsetLag` and store it in the new LogSegment. Since tiered storage does not support compacted topics / historically compacted topics, we can omit this case.

If this approach looks good, I can update the KIP with the details.

-- 
Kamal

On Tue, Jun 25, 2024 at 4:24 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:

Hi Kamal,

Thanks for the bump. I have been thinking about this passively for the past few days.

The simplest solution is to store a state in the segment-level metadata. The state should specify whether the trx index is empty or not. It would be populated during segment archival. We would then iterate over the metadata for future segments without having to make a remote call to download the trx index itself.

The other solution, storing state at the partition level, wouldn't work, as you mentioned, because we would have to change the state on every mutation to the log, i.e., on segment expiration and on append.

I have been thinking about whether we can do something better than the simple solution, hence the delay in replying. Let me share my half-baked train of thought; perhaps you can explore this as well. I have been thinking about using the LSO (last stable offset) to handle the case where the partition never had any transactions. For a partition which never had any transaction, I would assume that the LSO is never initialized (or is equal to the log start offset)? Or is it equal to the HW in that case? This is something that I am yet to verify. If this idea works, then we would not have to iterate through the metadata for the dominant case where the partition had no transactions at all.

-- 
Divij Vaidya

On Tue, Jun 25, 2024 at 11:42 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Bump. Please review this proposal.

On Mon, Jun 17, 2024 at 6:55 PM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Divij,

Thanks for the review! I have updated the KIP with review comments 1, 2, 3, and 4.

> 4. Potential alternative - Instead of having an algorithm where we traverse across segment metadata and looking for isTxnIdxEmpty flag, should we directly introduce a nextSegmentWithTrxInx() function? This would allow implementers to optimize the otherwise linear scan across metadata for all segments by using techniques such as skip list etc.
This is a good point for optimizing the scan. We would need to maintain the skip-list for each leader epoch. With unclean leader election, some brokers may not have the complete lineage. This would expand the scope of the work.

In this version, we plan to optimize only for the first 2 of the cases below:

1. A partition does not have a transaction index for any of the uploaded segments.
   The individual log segments' `isTxnIdxEmpty` flags can be reduced to a single flag in RLMM (using the AND operator) that answers the query "Are all the transaction indexes of this partition empty?".
   If yes, then we can directly scan the local log for aborted transactions.
2. A partition is written to by the transactional producer.
   The assumption is that a transaction will either commit or roll back within 15 minutes (the default transaction.max.timeout.ms), so we may have to search only a few consecutive remote log segments to collect the aborted transactions.
3. A partition is being written to by both normal and transactional producers. In this case, we will do a linear traversal. Maintaining a skip-list might improve performance, but the RLMM implementation is delegated to users; if implemented incorrectly, it can lead to aborted transaction records being delivered to the consumer.

I notice two drawbacks with the reduction method as proposed in the KIP:

1. Even if only one segment has a transaction index, we have to iterate over all the metadata events.
2. Assume that there are 10 segments and segment-5 has a txn index.
   Once the first 6 segments are deleted due to a time, size, or start-offset retention breach, we should return `true` for the "Are all the transaction indexes of this partition empty?" query, but it will keep returning `false` until the broker is restarted, so we have to resort to iterating over all the metadata events.

> 5. Potential alternative#2 - We know that we may want the indexes of multiple higher segments. Instead of fetching them sequentially, we could implement a parallel fetch or a pre-fetch for the indexes. This would help hide the latency of sequentially fetching the trx indexes.

We can implement parallel fetch/prefetch once tiered storage is GA. This feature would also be useful for prefetching the next remote log segment, and it expands the scope of the work.

> 6. Should the proposed API take "segmentId" as a parameter instead of "topicIdPartition"?
> Suggesting because isTxnIdxEmpty is not a property of a partition, instead it's a property of a specific segment.

We propose to use the `topicIdPartition` in RemoteLogMetadataManager. The implementation can fold/reduce the values of the individual log segments' `isTxnIdxEmpty` flags. This is added to avoid scanning all the metadata events when the partition does not have a transaction index in any of its segments.

On Mon, Jun 17, 2024 at 4:05 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:

Hi Kamal,

Thanks for bringing this up. This is a problem worth solving. We have faced this in situations where some Kafka clients default to read_committed mode and end up with high latencies for remote fetches due to this traversal across all segments.

First, some nits to clarify the KIP:
1. The motivation should make it clear that all segments are traversed only in the worst case. If I am not mistaken (please correct me if I'm wrong), the traversal stops when it has found a segment containing the LSO.
2. There is no such thing as a non-txn topic. A transaction may be started on any topic. Perhaps rephrase the statement in the KIP so that it is clear to the reader.
3. The hyperlink in "the broker has to traverse all the..." seems incorrect. Did you want to point to
https://github.com/apache/kafka/blob/21d60eabab8a14c8002611c65e092338bf584314/core/src/main/scala/kafka/log/LocalLog.scala#L444
?
4. In the testing section, could we add a test plan? For example, I would add a test which verifies the number of calls made to RLMM. This test would show a higher number of calls before this KIP than after it.
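A test along the lines of nit 4 could use a counting test double. The sketch below is hypothetical (SegmentView, CountingTxnIndexFetcher and FetchStrategies are not Kafka classes): it counts transaction-index fetches for a worst-case "fetch every subsequent index" strategy versus one that skips segments flagged `txnIdxEmpty`; the same double could count RLMM metadata lookups instead.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical view of one remote segment's metadata, for this test sketch only.
final class SegmentView {
    final long baseOffset;
    final boolean txnIdxEmpty;

    SegmentView(long baseOffset, boolean txnIdxEmpty) {
        this.baseOffset = baseOffset;
        this.txnIdxEmpty = txnIdxEmpty;
    }
}

// Test double that records each transaction-index fetch instead of calling remote storage.
final class CountingTxnIndexFetcher {
    final List<Long> fetchedBaseOffsets = new ArrayList<>();

    void fetch(SegmentView segment) {
        fetchedBaseOffsets.add(segment.baseOffset);
    }
}

final class FetchStrategies {
    // Worst case before the KIP: the index of every subsequent segment is fetched.
    static void fetchAll(List<SegmentView> segments, CountingTxnIndexFetcher fetcher) {
        for (SegmentView s : segments) {
            fetcher.fetch(s);
        }
    }

    // With the proposed flag: segments whose metadata says the index is empty are skipped.
    static void fetchNonEmptyOnly(List<SegmentView> segments, CountingTxnIndexFetcher fetcher) {
        for (SegmentView s : segments) {
            if (!s.txnIdxEmpty) {
                fetcher.fetch(s);
            }
        }
    }
}
```

With 10 segments of which only the one at base offset 500 has a transaction index, the first strategy records 10 fetches and the second records 1, which is the kind of before/after difference such a test would assert.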
Other thoughts:
4. Potential alternative: instead of having an algorithm where we traverse across segment metadata looking for the isTxnIdxEmpty flag, should we directly introduce a nextSegmentWithTrxInx() function? This would allow implementers to optimize the otherwise linear scan across the metadata of all segments by using techniques such as skip lists.
5. Potential alternative #2: we know that we may want the indexes of multiple higher segments. Instead of fetching them sequentially, we could implement a parallel fetch or a pre-fetch for the indexes. This would help hide the latency of sequentially fetching the trx indexes.
6. Should the proposed API take "segmentId" as a parameter instead of "topicIdPartition"? Suggesting because isTxnIdxEmpty is not a property of a partition; instead, it's a property of a specific segment.
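Alternative 4 (and the nextRemoteLogSegmentMetadataWithTxnIndex method later proposed in this thread) relies on an RLMM plugin finding the next segment with a transaction index faster than a linear scan. Below is a minimal sketch of the per-epoch skip-list idea using java.util.concurrent.ConcurrentSkipListMap; TxnSegmentMeta and TxnIndexSkipList are hypothetical names, and segments within an epoch are assumed not to overlap.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical minimal segment metadata; only the offsets matter here.
final class TxnSegmentMeta {
    final long baseOffset;
    final long endOffset;

    TxnSegmentMeta(long baseOffset, long endOffset) {
        this.baseOffset = baseOffset;
        this.endOffset = endOffset;
    }
}

// Per-epoch index of segments that have a non-empty transaction index.
final class TxnIndexSkipList {
    // epoch -> (endOffset -> segment). Only segments with a txn index are stored,
    // so the structure stays empty in the common case of no aborted transactions.
    private final Map<Integer, ConcurrentSkipListMap<Long, TxnSegmentMeta>> byEpoch = new ConcurrentHashMap<>();

    void onSegmentAdded(int epoch, TxnSegmentMeta segment, boolean txnIdxEmpty) {
        if (!txnIdxEmpty) {
            byEpoch.computeIfAbsent(epoch, e -> new ConcurrentSkipListMap<>()).put(segment.endOffset, segment);
        }
    }

    void onSegmentDeleted(int epoch, TxnSegmentMeta segment) {
        ConcurrentSkipListMap<Long, TxnSegmentMeta> segments = byEpoch.get(epoch);
        if (segments != null) {
            segments.remove(segment.endOffset);
        }
    }

    // O(log n): first segment in `epoch` with a txn index that ends at or after `offset`,
    // i.e. the segment containing `offset`, or the next one after it.
    Optional<TxnSegmentMeta> nextWithTxnIndex(int epoch, long offset) {
        ConcurrentSkipListMap<Long, TxnSegmentMeta> segments = byEpoch.get(epoch);
        if (segments == null) {
            return Optional.empty();
        }
        Map.Entry<Long, TxnSegmentMeta> entry = segments.ceilingEntry(offset);
        return entry == null ? Optional.empty() : Optional.of(entry.getValue());
    }
}
```

Keying by end offset lets `ceilingEntry` return the segment that contains the requested offset as well as later ones. Removing entries on deletion also avoids the stale "all indexes empty" answer raised in Kamal's drawback 2.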
Looking forward to hearing your thoughts about the alternatives. Let's get this fixed.

-- 
Divij Vaidya

On Mon, Jun 17, 2024 at 11:40 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:

Hi all,

I have opened KIP-1058
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic>
to reduce the pressure on remote storage when transactional consumers are reading non-txn topics from remote storage.

Feedback and suggestions are welcome.

Thanks,
Kamal
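This thread settles on a backward-compatible default: metadata events written before the upgrade carry no txnIdxEmpty field, and an absent field is read as false, so those segments' transaction indexes are still downloaded if they exist. A minimal sketch of that rule follows; DecodedSegmentEvent and CompatDefaults are hypothetical names, and a null Boolean stands in for "field absent in an old event".

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical decoded form of a segment-metadata event. Old events have no
// txnIdxEmpty field, modelled here as a null Boolean.
final class DecodedSegmentEvent {
    final long baseOffset;
    private final Boolean txnIdxEmptyField;

    DecodedSegmentEvent(long baseOffset, Boolean txnIdxEmptyField) {
        this.baseOffset = baseOffset;
        this.txnIdxEmptyField = txnIdxEmptyField;
    }

    // Absent field defaults to false: the segment is conservatively assumed to have a
    // txn index, which preserves the pre-upgrade behaviour for old metadata.
    boolean txnIdxEmpty() {
        return Boolean.TRUE.equals(txnIdxEmptyField);
    }
}

final class CompatDefaults {
    // Base offsets of the segments whose transaction index must still be fetched.
    static List<Long> segmentsToFetch(List<DecodedSegmentEvent> events) {
        return events.stream()
                .filter(e -> !e.txnIdxEmpty())
                .map(e -> e.baseOffset)
                .collect(Collectors.toList());
    }
}
```

Only segments explicitly marked empty are skipped, so a mixed set of old and new events during a rolling upgrade can never cause an index that exists to be ignored.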