Bump. Please review this proposal.
On Mon, Jun 17, 2024 at 6:55 PM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> Divij,
>
> Thanks for the review! Updated the KIP with review comments 1, 2, 3,
> and 4.
>
> > 4. Potential alternative - Instead of having an algorithm where we
> > traverse across segment metadata looking for the isTxnIdxEmpty flag,
> > should we directly introduce a nextSegmentWithTrxInx() function? This
> > would allow implementers to optimize the otherwise linear scan across
> > metadata for all segments by using techniques such as skip lists etc.
>
> This is a good point to optimize the scan. We would need to maintain the
> skip-list for each leader epoch. With unclean leader election, some
> brokers may not have the complete lineage, which expands the scope of
> the work.
>
> In this version, we plan to optimize only the first two of the cases
> below:
>
> 1. A partition does not have a transaction index for any of the uploaded
>    segments. The individual log segments' `isTxnIdxEmpty` flags can be
>    reduced to a single flag in RLMM (using the AND operator) that can
>    serve the query: "Are all the transaction indexes empty for a
>    partition?" If yes, then we can directly scan the local log for
>    aborted transactions.
> 2. A partition is produced using the transactional producer. The
>    assumption made is that a transaction will either commit or roll back
>    within 15 minutes (default transaction.max.timeout.ms = 15 mins), so
>    we may have to search only a few consecutive remote log segments to
>    collect the aborted transactions.
> 3. A partition is produced with both normal and transactional producers.
>    In this case, we will do a linear traversal. Maintaining a skip-list
>    might improve the performance, but we delegate the RLMM implementation
>    to users. If implemented incorrectly, it can lead to delivery of
>    aborted transaction records to the consumer.
>
> I notice two drawbacks with the reduction method as proposed in the KIP:
>
> 1. Even if only one segment has a transaction index, we have to iterate
>    over all the metadata events.
> 2. Assume that there are 10 segments and segment-5 has a txn index. Once
>    the first 6 segments are deleted due to a time/size/start-offset
>    breach, we should return `true` for the query "Are all the transaction
>    indexes empty for a partition?", but it will return `false` until the
>    broker gets restarted, and we have to resort to iterating over all the
>    metadata events.
>
> > 5. Potential alternative#2 - We know that we may want the indexes of
> > multiple higher segments. Instead of fetching them sequentially, we
> > could implement a parallel fetch or a pre-fetch for the indexes. This
> > would help hide the latency of sequentially fetching the trx indexes.
>
> We can implement parallel-fetch/prefetch once tiered storage is GAed,
> since that feature will also be useful to prefetch the next remote log
> segment, and it expands the scope of the work.
>
> > 6. Should the proposed API take "segmentId" as a parameter instead of
> > "topicIdPartition"? Suggesting because isTxnIdEmpty is not a property
> > of a partition, instead it's a property of a specific segment.
>
> We propose to use the `topicIdPartition` in RemoteLogMetadataManager.
> The implementation can fold/reduce the values of the individual log
> segments' `isTxnIdEmpty` flags. This is added to avoid scanning all the
> metadata events when the partition does not have a transaction index in
> any of the segments.
>
> On Mon, Jun 17, 2024 at 4:05 PM Divij Vaidya <divijvaidy...@gmail.com>
> wrote:
>
>> Hi Kamal
>>
>> Thanks for bringing this up. This is a problem worth solving. We have
>> faced this in situations where some Kafka clients default to
>> read_committed mode and end up having high latencies for remote fetches
>> due to this traversal across all segments.
>>
>> First some nits to clarify the KIP:
>> 1. The motivation should make it clear that traversal of all segments
>> is only in the worst case.
>> If I am not mistaken (please correct me if wrong), the traversal stops
>> when it has found a segment containing the LSO.
>> 2. There is nothing like a non-txn topic. A transaction may be started
>> on any topic. Perhaps rephrase the statement in the KIP so that it is
>> clear to the reader.
>> 3. The hyperlink in "the broker has to traverse all the..." seems
>> incorrect. Did you want to point to
>> https://github.com/apache/kafka/blob/21d60eabab8a14c8002611c65e092338bf584314/core/src/main/scala/kafka/log/LocalLog.scala#L444
>> ?
>> 4. In the testing section, could we add a test plan? For example, I
>> would add a test which verifies the number of calls made to RLMM. This
>> test would show a higher number of calls before this KIP vs. after it.
>>
>> Other thoughts:
>> 4. Potential alternative - Instead of having an algorithm where we
>> traverse across segment metadata looking for the isTxnIdxEmpty flag,
>> should we directly introduce a nextSegmentWithTrxInx() function? This
>> would allow implementers to optimize the otherwise linear scan across
>> metadata for all segments by using techniques such as skip lists etc.
>> 5. Potential alternative#2 - We know that we may want the indexes of
>> multiple higher segments. Instead of fetching them sequentially, we
>> could implement a parallel fetch or a pre-fetch for the indexes. This
>> would help hide the latency of sequentially fetching the trx indexes.
>> 6. Should the proposed API take "segmentId" as a parameter instead of
>> "topicIdPartition"? Suggesting because isTxnIdEmpty is not a property
>> of a partition, instead it's a property of a specific segment.
>>
>> Looking forward to hearing your thoughts about the alternatives. Let's
>> get this fixed.
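[To make alternative 4 in the quoted email concrete: a minimal sketch of how an RLMM implementation could answer a "next segment with a transaction index" query in O(log n) using a skip list instead of a linear scan over segment metadata. All class and method names here are hypothetical, not from the KIP or from Kafka's actual RemoteLogMetadataManager API.]

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch: index the base offsets of segments that DO have a
// transaction index in a skip list, so "next segment with a txn index at or
// above this base offset" is an O(log n) lookup rather than a linear scan.
public class TxnSegmentIndex {
    private final NavigableMap<Long, String> segmentsWithTxnIdx =
            new ConcurrentSkipListMap<>();

    // Called as each segment's metadata event is consumed.
    public void onSegmentMetadata(long baseOffset, String segmentId,
                                  boolean isTxnIdxEmpty) {
        if (!isTxnIdxEmpty) {
            segmentsWithTxnIdx.put(baseOffset, segmentId);
        }
    }

    // Returns the id of the first segment whose base offset is >= fromOffset
    // and which has a non-empty transaction index, or null if there is none.
    public String nextSegmentWithTxnIdx(long fromOffset) {
        Map.Entry<Long, String> e = segmentsWithTxnIdx.ceilingEntry(fromOffset);
        return e == null ? null : e.getValue();
    }

    public static void main(String[] args) {
        TxnSegmentIndex idx = new TxnSegmentIndex();
        idx.onSegmentMetadata(0L, "seg-0", true);    // no txn index
        idx.onSegmentMetadata(100L, "seg-1", false); // has a txn index
        idx.onSegmentMetadata(200L, "seg-2", true);  // no txn index
        System.out.println(idx.nextSegmentWithTxnIdx(0L));   // seg-1
        System.out.println(idx.nextSegmentWithTxnIdx(150L)); // null
    }
}
```

[As Kamal points out earlier in the thread, a real implementation would also have to scope such a structure per leader epoch and cope with brokers that lack the complete lineage after unclean leader election, which is why the KIP leaves this optimization to RLMM implementers.]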
>>
>> --
>> Divij Vaidya
>>
>>
>> On Mon, Jun 17, 2024 at 11:40 AM Kamal Chandraprakash <
>> kamal.chandraprak...@gmail.com> wrote:
>>
>> > Hi all,
>> >
>> > I have opened KIP-1058
>> > <
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
>> > >
>> > to reduce the pressure on remote storage when transactional consumers
>> > are reading non-txn topics from remote storage.
>> >
>> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
>> >
>> > Feedback and suggestions are welcome.
>> >
>> > Thanks,
>> > Kamal
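[For reference, the per-partition reduction Kamal describes at the top of the thread, folding each segment's isTxnIdxEmpty flag with AND to answer "are all the transaction indexes empty for a partition?", could look roughly like the sketch below. All names are hypothetical, not actual Kafka or RemoteLogMetadataManager APIs.]

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the AND-fold reduction described in the thread.
public class TxnIndexEmptyTracker {
    // true while every uploaded segment seen so far had an empty txn index
    private final Map<String, Boolean> allEmpty = new ConcurrentHashMap<>();

    // Fold in one segment's flag as its metadata event is consumed.
    public void onSegmentUploaded(String topicIdPartition, boolean isTxnIdxEmpty) {
        allEmpty.merge(topicIdPartition, isTxnIdxEmpty, Boolean::logicalAnd);
    }

    // "Are all the transaction indexes empty for this partition?"
    // Answered in O(1) instead of scanning every metadata event.
    public boolean isTxnIdxEmpty(String topicIdPartition) {
        return allEmpty.getOrDefault(topicIdPartition, true);
    }

    public static void main(String[] args) {
        TxnIndexEmptyTracker tracker = new TxnIndexEmptyTracker();
        tracker.onSegmentUploaded("p0", true);
        System.out.println(tracker.isTxnIdxEmpty("p0")); // still all empty
        tracker.onSegmentUploaded("p0", false);
        System.out.println(tracker.isTxnIdxEmpty("p0")); // a txn index was seen
    }
}
```

[Note that this fold is one-way, which is exactly the second drawback Kamal raises: once a segment with a transaction index has been folded in, the flag stays false even after that segment is later deleted.]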