Divij,

Thanks for the review! I have updated the KIP to address review comments
1, 2, 3, and 4.

> 4. Potential alternative - Instead of having an algorithm where we
> traverse across segment metadata and looking for isTxnIdxEmpty flag,
> should we directly introduce a nextSegmentWithTrxInx() function? This
> would allow implementers to optimize the otherwise linear scan across
> metadata for all segments by using techniques such as skip list etc.

This is a good point for optimizing the scan. We would need to maintain
the skip-list for each leader epoch, and with unclean leader election some
brokers may not have the complete lineage, so this would expand the scope
of the work.
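
For illustration, a per-leader-epoch skip-list could look roughly like the
sketch below. All names here (`TxnIndexSkipList`, `onSegmentAdded`,
`nextSegmentWithTrxInx`) are hypothetical, and the per-epoch maps are
exactly what becomes hard to keep correct when an unclean election breaks
the lineage:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch: one skip-list per leader epoch, keyed by segment
// start offset, holding only segments whose txn index is non-empty.
public class TxnIndexSkipList {
    private final Map<Integer, ConcurrentSkipListMap<Long, String>> byEpoch =
            new ConcurrentHashMap<>();

    // Called for every segment-metadata event the RLMM consumes.
    public void onSegmentAdded(int leaderEpoch, long startOffset,
                               String segmentId, boolean isTxnIdxEmpty) {
        if (!isTxnIdxEmpty) {
            byEpoch.computeIfAbsent(leaderEpoch,
                    e -> new ConcurrentSkipListMap<>()).put(startOffset, segmentId);
        }
    }

    // O(log n) lookup: first txn-index-bearing segment starting at or after
    // fetchOffset. (The segment covering fetchOffset itself would need an
    // extra floorEntry check against its end offset, omitted here.)
    public String nextSegmentWithTrxInx(int leaderEpoch, long fetchOffset) {
        ConcurrentSkipListMap<Long, String> m = byEpoch.get(leaderEpoch);
        if (m == null) return null;
        Map.Entry<Long, String> e = m.ceilingEntry(fetchOffset);
        return e == null ? null : e.getValue();
    }
}
```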

In this version, we plan to optimize only the first two of the cases
below; the third falls back to a linear traversal:

1. A partition does not have a transaction index for any of the uploaded
   segments. The individual log segments' `isTxnIdxEmpty` flags can be
   reduced to a single flag in RLMM (using the AND operator) that serves
   the query: "Are all the transaction indexes empty for this partition?"
   If yes, then we can directly scan the local log for aborted
   transactions. (A sketch of this reduction follows the list.)
2. A partition is produced only by transactional producers. The assumption
   is that a transaction either commits or rolls back within 15 minutes
   (the default transaction.max.timeout.ms is 15 mins), so we likely have
   to search only a few consecutive remote log segments to collect the
   aborted transactions.
3. A partition is produced by both normal and transactional producers. In
   this case, we will do a linear traversal. Maintaining a skip-list might
   improve the performance, but we delegate the RLMM implementation to
   users. If implemented incorrectly, it can lead to delivering aborted
   transaction records to the consumer.

I notice two drawbacks with the reduction method as proposed in the KIP:

1. Even if only one segment has a transaction index, we have to iterate
   over all the metadata events.
2. Assume there are 10 segments and only segment-5 has a txn index. Once
   the first 6 segments are deleted due to a time/size/start-offset
   retention breach, the "Are all the transaction indexes empty for this
   partition?" query should return `true`, but it keeps returning `false`
   until the broker is restarted, and we have to resort to iterating over
   all the metadata events. (The sketch below shows why the fold cannot
   be undone in place.)
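
To make drawback 2 concrete: AND is not invertible, so when the only
segment that had a txn index is deleted, the folded flag cannot be
"un-flipped"; the only safe recovery is a rescan of the remaining
metadata. A minimal sketch (hypothetical names):

```java
import java.util.List;

public class TxnIdxRecompute {
    // Recomputing the folded flag requires a linear pass over the
    // metadata events of all remaining segments.
    public static boolean recompute(List<Boolean> remainingIsTxnIdxEmpty) {
        boolean all = true;
        for (boolean empty : remainingIsTxnIdxEmpty) {
            all &= empty;
        }
        return all;
    }

    public static void main(String[] args) {
        // 10 segments, only segment-5 had a txn index => folded flag is
        // false. After segments 0..5 are deleted, a rescan over 6..9
        // yields true, but the AND-fold alone keeps reporting false.
        System.out.println(recompute(List.of(true, true, true, true)));
    }
}
```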

> 5. Potential alternative#2 - We know that we may want the indexes of
> multiple higher segments. Instead of fetching them sequentially, we
> could implement a parallel fetch or a pre-fetch for the indexes. This
> would help hide the latency of sequentially fetching the trx indexes.

We can implement parallel-fetch/prefetch once tiered storage goes GA.
That feature would also be useful for prefetching the next remote log
segment, and it expands the scope of this work.

> 6. Should the proposed API take "segmentId" as a parameter instead of
> "topicIdPartition"? Suggesting because isTxnIdEmpty is not a property
> of a partition, instead it's a property of a specific segment.

We propose to use the `topicIdPartition` in RemoteLogMetadataManager.
The implementation can fold/reduce the individual log segments'
`isTxnIdxEmpty` flags. This is added to avoid scanning all the metadata
events when the partition does not have a transaction index in any of
its segments.
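
For what it's worth, a caller-side sketch of how the fetch path could
consult such a partition-level flag (the interface and method names are
made up for illustration, not the proposed API):

```java
// Hypothetical caller-side sketch.
public class AbortedTxnLookup {
    interface RlmmView {
        // Folded per-partition flag described above.
        boolean isTxnIdxEmpty(String topicIdPartition);
    }

    static boolean needsRemoteTxnIndexScan(RlmmView rlmm, String tp) {
        // If no uploaded segment has a txn index, skip the remote scan
        // entirely and resolve aborted transactions from the local log.
        return !rlmm.isTxnIdxEmpty(tp);
    }
}
```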

On Mon, Jun 17, 2024 at 4:05 PM Divij Vaidya <[email protected]>
wrote:

> Hi Kamal
>
> Thanks for bringing this up. This is a problem worth solving. We have faced
> this in situations where some Kafka clients default to read_committed mode
> and end up having high latencies for remote fetches due to this traversal
> across all segments.
>
> First some nits to clarify the KIP:
> 1. The motivation should make it clear that traversal of all segments is
> only in the worst case. If I am not mistaken (please correct me if wrong),
> the traversal stops when it has found a segment containing LSO.
> 2. There is nothing like a non-txn topic. A transaction may be started on
> any topic. Perhaps, rephrase the statement in the KIP so that it is clear
> to the reader.
> 3. The hyperlink in the "the broker has to traverse all the..." seems
> incorrect. Did you want to point to
>
> https://github.com/apache/kafka/blob/21d60eabab8a14c8002611c65e092338bf584314/core/src/main/scala/kafka/log/LocalLog.scala#L444
> ?
> 4. In the testing section, could we add a test plan? For example, I would
> list down adding a test which would verify the number of calls made to
> RLMM. This test would have a higher number of calls earlier vs. after this
> KIP.
>
> Other thoughts:
> 4. Potential alternative - Instead of having an algorithm where we traverse
> across segment metadata and looking for isTxnIdxEmpty flag, should we
> directly introduce a nextSegmentWithTrxInx() function? This would allow
> implementers to optimize the otherwise linear scan across metadata for all
> segments by using techniques such as skip list etc.
> 5. Potential alternative#2 - We know that we may want the indexes of
> multiple higher segments. Instead of fetching them sequentially, we could
> implement a parallel fetch or a pre-fetch for the indexes. This would help
> hide the latency of sequentially fetching the trx indexes.
> 6. Should the proposed API take "segmentId" as a parameter instead of
> "topicIdPartition"? Suggesting because isTxnIdEmpty is not a property of a
> partition, instead it's a property of a specific segment.
>
> Looking forward to hearing your thoughts about the alternatives. Let's get
> this fixed.
>
> --
> Divij Vaidya
>
>
>
> On Mon, Jun 17, 2024 at 11:40 AM Kamal Chandraprakash <
> [email protected]> wrote:
>
> > Hi all,
> >
> > I have opened a KIP-1058
> > <
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
> > >
> > to reduce the pressure on remote storage when transactional consumers are
> > reading non-txn topics from remote storage.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1058%3A+Txn+consumer+exerts+pressure+on+remote+storage+when+reading+non-txn+topic
> >
> > Feedbacks and suggestions are welcome.
> >
> > Thanks,
> > Kamal
> >
>
