Bump. Please review this proposal.

On Mon, Jun 17, 2024 at 6:55 PM Kamal Chandraprakash <
kamal.chandraprak...@gmail.com> wrote:

> Divij,
>
> Thanks for the review! Updated the KIP with 1, 2, 3, and 4 review
> comments.
>
> > 4. Potential alternative - Instead of having an algorithm where we
> traverse
> across segment metadata and looking for isTxnIdxEmpty flag, should we
> directly introduce a nextSegmentWithTrxInx() function? This would allow
> implementers to optimize the otherwise linear scan across metadata for all
> segments by using techniques such as skip list etc.
>
> This is a good point to optimize the scan. We would need to maintain a
> skip-list for each leader epoch. With unclean leader election, some brokers
> may not have the complete lineage, which expands the scope of this work.
>
> In this version, we plan to optimize only for the following 3 cases:
>
> 1. A partition does not have a transaction index for any of the uploaded
>    segments. The individual log segments' `isTxnIdxEmpty` flags can be
>    reduced to a single flag in RLMM (using the AND operator) that can serve
>    the query: "Are all the transaction indexes empty for this partition?"
>    If yes, then we can directly scan the local log for aborted transactions.
> 2. A partition is produced using the transactional producer. The assumption
>    is that a transaction will either commit or roll back within 15 minutes
>    (default transaction.max.timeout.ms = 15 mins), so we likely have to
>    search only a few consecutive remote log segments to collect the aborted
>    transactions.
> 3. A partition is produced with both normal and transactional producers. In
>    this case, we will do a linear traversal. Maintaining a skip-list might
>    improve the performance, but we delegate the RLMM implementation to
>    users. If implemented incorrectly, it can lead to delivery of aborted
>    transaction records to the consumer.
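The AND-reduction in case 1 can be sketched as follows (illustrative Java, not the KIP's actual RLMM code; the boolean list is a stand-in for per-segment metadata):

```java
import java.util.List;

public class TxnIdxReduction {
    // Fold the per-segment isTxnIdxEmpty flags into one per-partition answer.
    // true means: no uploaded segment of this partition has a transaction
    // index, so the broker can skip the remote scan and consult the local
    // log directly for aborted transactions.
    public static boolean allTxnIndexesEmpty(List<Boolean> isTxnIdxEmptyFlags) {
        boolean allEmpty = true;
        for (boolean empty : isTxnIdxEmptyFlags) {
            allEmpty = allEmpty && empty; // AND-reduction
        }
        return allEmpty;
    }
}
```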
>
> I notice two drawbacks with the reduction method as proposed in the KIP:
>
> 1. Even if only one segment has a transaction index, we have to iterate
>    over all the metadata events.
> 2. Assume that there are 10 segments and only segment-5 has a txn index.
>    Once the first 6 segments are deleted due to a time/size/start-offset
>    retention breach, the query "Are all the transaction indexes empty for
>    this partition?" should return `true`, but it will keep returning
>    `false` until the broker is restarted, and we have to resort to
>    iterating over all the metadata events.
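Drawback 2 can be illustrated with a small sketch (hypothetical Java; the segment numbering and flags are made up for the example). Recomputing the fold over only the live segments gives the right answer after deletion, whereas a fold accumulated over all historical metadata events stays stale:

```java
import java.util.Map;
import java.util.TreeMap;

public class StaleFold {
    // Fold over only the segments that are still live (not yet deleted).
    public static boolean allLiveTxnIndexesEmpty(Map<Integer, Boolean> liveFlags) {
        return liveFlags.values().stream().allMatch(Boolean::booleanValue);
    }

    // 10 segments where only segment 5 has a transaction index
    // (flag value false means "has a txn index").
    public static Map<Integer, Boolean> tenSegmentsOnlyFiveHasIndex() {
        Map<Integer, Boolean> flags = new TreeMap<>();
        for (int seg = 0; seg < 10; seg++) {
            flags.put(seg, seg != 5);
        }
        return flags;
    }
}
```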
>
> > 5. Potential alternative#2 - We know that we may want the indexes of
> multiple higher segments. Instead of fetching them sequentially, we could
> implement a parallel fetch or a pre-fetch for the indexes. This would help
> hide the latency of sequentially fetching the trx indexes.
>
> We can implement parallel-fetch/prefetch once tiered storage is GAed. This
> feature will also be useful for prefetching the next remote log segment,
> and it expands the scope of this work.
>
> > 6. Should the proposed API take "segmentId" as a parameter instead of
> "topicIdPartition"? Suggesting because isTxnIdEmpty is not a property of a
> partition, instead it's a property of a specific segment.
>
> We propose to use the `topicIdPartition` in RemoteLogMetadataManager.
> The implementation can fold/reduce the individual log segments'
> `isTxnIdxEmpty` flags. This is added to avoid scanning all the metadata
> events when the partition does not have a transaction index in any of its
> segments.
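As a sketch of the shape being discussed (hypothetical Java; the names here are illustrative, not the KIP's final signatures), an RLMM implementation could fold the flag incrementally as segment metadata events arrive, making the partition-level query O(1):

```java
import java.util.HashMap;
import java.util.Map;

public class PartitionTxnIdxState {
    // Reduced flag per topic-id-partition (String used as a stand-in key
    // for TopicIdPartition).
    private final Map<String, Boolean> reduced = new HashMap<>();

    // Apply one segment's metadata event: AND the segment flag into the fold.
    public void onSegmentMetadata(String topicIdPartition, boolean segmentTxnIdxEmpty) {
        reduced.merge(topicIdPartition, segmentTxnIdxEmpty, (a, b) -> a && b);
    }

    // Partition-level query: answers without scanning all metadata events.
    // A partition with no segments trivially has no transaction indexes.
    public boolean isTxnIdxEmpty(String topicIdPartition) {
        return reduced.getOrDefault(topicIdPartition, true);
    }
}
```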
>
> On Mon, Jun 17, 2024 at 4:05 PM Divij Vaidya <divijvaidy...@gmail.com>
> wrote:
>
>> Hi Kamal
>>
>> Thanks for bringing this up. This is a problem worth solving. We have
>> faced
>> this in situations where some Kafka clients default to read_committed mode
>> and end up having high latencies for remote fetches due to this traversal
>> across all segments.
>>
>> First some nits to clarify the KIP:
>> 1. The motivation should make it clear that traversal of all segments is
>> only in the worst case. If I am not mistaken (please correct me if wrong),
>> the traversal stops when it has found a segment containing LSO.
>> 2. There is no such thing as a non-txn topic. A transaction may be started on
>> any topic. Perhaps, rephrase the statement in the KIP so that it is clear
>> to the reader.
>> 3. The hyperlink in the "the broker has to traverse all the..." seems
>> incorrect. Did you want to point to
>>
>> https://github.com/apache/kafka/blob/21d60eabab8a14c8002611c65e092338bf584314/core/src/main/scala/kafka/log/LocalLog.scala#L444
>> ?
>> 4. In the testing section, could we add a test plan? For example, I would
>> list down adding a test which would verify the number of calls made to
>> RLMM. This test would have a higher number of calls earlier vs. after this
>> KIP.
>>
>> Other thoughts:
>> 4. Potential alternative - Instead of having an algorithm where we
>> traverse
>> across segment metadata and looking for isTxnIdxEmpty flag, should we
>> directly introduce a nextSegmentWithTrxInx() function? This would allow
>> implementers to optimize the otherwise linear scan across metadata for all
>> segments by using techniques such as skip list etc.
>> 5. Potential alternative#2 - We know that we may want the indexes of
>> multiple higher segments. Instead of fetching them sequentially, we could
>> implement a parallel fetch or a pre-fetch for the indexes. This would help
>> hide the latency of sequentially fetching the trx indexes.
>> 6. Should the proposed API take "segmentId" as a parameter instead of
>> "topicIdPartition"? Suggesting because isTxnIdEmpty is not a property of a
>> partition, instead it's a property of a specific segment.
>>
>> Looking forward to hearing your thoughts about the alternatives. Let's get
>> this fixed.
>>
>> --
>> Divij Vaidya
>>
>>
>>
>> On Mon, Jun 17, 2024 at 11:40 AM Kamal Chandraprakash <
>> kamal.chandraprak...@gmail.com> wrote:
>>
>> > Hi all,
>> >
>> > I have opened a KIP-1058
>> > <
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
>> > >
>> > to reduce the pressure on remote storage when transactional consumers
>> are
>> > reading non-txn topics from remote storage.
>> >
>> >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
>> >
>> > Feedback and suggestions are welcome.
>> >
>> > Thanks,
>> > Kamal
>> >
>>
>
