Hey Jun, Yes, it is possible to maintain the log size in the cache (see rejected alternative #3 in the KIP), but I do not understand how it could be retrieved without the new API. The log size could be calculated on startup by scanning through the segments and maintained incrementally afterwards (though I would disagree that this is the right approach, since the scan itself takes on the order of minutes and hence delays the start of the archival process). Even then, we would need an API in RemoteLogMetadataManager so that RLM could fetch the cached size.
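To make the trade-off concrete, here is a minimal sketch of the "scan once on startup, then maintain incrementally" approach described above. Everything in it is hypothetical: RemoteLogSizeCache and its methods are illustration-only names, not part of Kafka or of the RLMM interface, partitions are keyed by a plain string, and segment sizes are passed in as numbers rather than read from RemoteLogSegmentMetadata.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch, not Kafka code: caches the total remote log size per partition. */
final class RemoteLogSizeCache {
    // Key is a partition name such as "topic-0" (an assumption made for brevity).
    private final Map<String, Long> sizeBytes = new ConcurrentHashMap<>();

    /** One-off full scan on startup: sums the sizes of all listed remote segments. */
    void initialize(String partition, List<Long> segmentSizesInBytes) {
        long total = 0L;
        for (long size : segmentSizesInBytes) {
            total += size;
        }
        sizeBytes.put(partition, total);
    }

    /** Incremental update after a segment has been copied to remote storage. */
    void onSegmentCopied(String partition, long segmentSizeInBytes) {
        sizeBytes.merge(partition, segmentSizeInBytes, Long::sum);
    }

    /** Incremental update after a remote segment has been deleted. */
    void onSegmentDeleted(String partition, long segmentSizeInBytes) {
        sizeBytes.merge(partition, -segmentSizeInBytes, Long::sum);
    }

    /** Accessor for the cached size; returns 0 for a partition that was never initialized. */
    long remoteLogSizeBytes(String partition) {
        return sizeBytes.getOrDefault(partition, 0L);
    }
}
```

Even in this sketch, the only expensive step is initialize(), which needs the full segment listing. If such a cache lived inside an RLMM plugin, RLM would still need some accessor along the lines of remoteLogSizeBytes() to read it, which is the point made above.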
If we wish to cache the size without adding a new API, then we need to cache the size in RLM itself (instead of in the RLMM implementation) and maintain it incrementally. The downside of a longer archival delay at startup (due to the initial scan) still remains valid in this situation. -- Divij Vaidya On Fri, Dec 16, 2022 at 12:43 AM Jun Rao <j...@confluent.io.invalid> wrote: > Hi, Divij, > > Thanks for the explanation. > > If there is in-memory cache, could we maintain the log size in the cache > with the existing API? For example, a replica could make a > listRemoteLogSegments(TopicIdPartition topicIdPartition) call on startup to > get the remote segment size before the current leaderEpoch. The leader > could then maintain the size incrementally afterwards. On leader change, > other replicas can make a listRemoteLogSegments(TopicIdPartition > topicIdPartition, int leaderEpoch) call to get the size of newly generated > segments. > > Thanks, > > Jun > > > On Wed, Dec 14, 2022 at 3:27 AM Divij Vaidya <divijvaidy...@gmail.com> > wrote: > > > > Is the new method enough for doing size-based retention? > > > > Yes. You are right in assuming that this API only provides the Remote > > storage size (for current epoch chain). We would use this API for size > > based retention along with a value of localOnlyLogSegmentSize which is > > computed as Log.sizeInBytes(logSegments.filter(_.baseOffset > > > highestOffsetWithRemoteIndex)). Hence, (total_log_size = > > remoteLogSizeBytes + log.localOnlyLogSegmentSize). I have updated the KIP > > with this information. You can also check an example implementation at > > > > > https://github.com/satishd/kafka/blob/2.8.x-tiered-storage/core/src/main/scala/kafka/log/Log.scala#L2077 > > > > > > > Do you imagine all accesses to remote metadata will be across the > network > > or will there be some local in-memory cache? 
> > > > I would expect a disk-less implementation to maintain a finite in-memory > > cache for segment metadata to optimize the number of network calls made > to > > fetch the data. In future, we can think about bringing this finite size > > cache into RLM itself but that's probably a conversation for a different > > KIP. There are many other things we would like to do to optimize the > Tiered > > storage interface such as introducing a circular buffer / streaming > > interface from RSM (so that we don't have to wait to fetch the entire > > segment before starting to send records to the consumer), caching the > > segments fetched from RSM locally (I would assume all RSM plugin > > implementations to do this, might as well add it to RLM) etc. > > > > -- > > Divij Vaidya > > > > > > > > On Mon, Dec 12, 2022 at 7:35 PM Jun Rao <j...@confluent.io.invalid> > wrote: > > > > > Hi, Divij, > > > > > > Thanks for the reply. > > > > > > Is the new method enough for doing size-based retention? It gives the > > total > > > size of the remote segments, but it seems that we still don't know the > > > exact total size for a log since there could be overlapping segments > > > between the remote and the local segments. > > > > > > You mentioned a disk-less implementation. Do you imagine all accesses > to > > > remote metadata will be across the network or will there be some local > > > in-memory cache? > > > > > > Thanks, > > > > > > Jun > > > > > > > > > > > > On Wed, Dec 7, 2022 at 3:10 AM Divij Vaidya <divijvaidy...@gmail.com> > > > wrote: > > > > > > > The method is needed for RLMM implementations which fetch the > > information > > > > over the network and not for the disk based implementations (such as > > the > > > > default topic based RLMM). > > > > > > > > I would argue that adding this API makes the interface more generic > > than > > > > what it is today. 
This is because, with the current APIs an > implementor > > > is > > > > restricted to use disk based RLMM solutions only (i.e. the default > > > > solution) whereas if we add this new API, we unblock usage of network > > > based > > > > RLMM implementations such as databases. > > > > > > > > > > > > > > > > On Wed 30. Nov 2022 at 20:40, Jun Rao <j...@confluent.io.invalid> > > wrote: > > > > > > > > > Hi, Divij, > > > > > > > > > > Thanks for the reply. > > > > > > > > > > Point#2. My high level question is that is the new method needed > for > > > > every > > > > > implementation of remote storage or just for a specific > > implementation. > > > > The > > > > > issues that you pointed out exist for the default implementation of > > > RLMM > > > > as > > > > > well and so far, the default implementation hasn't found a need > for a > > > > > similar new method. For public interface, ideally we want to make > it > > > more > > > > > general. > > > > > > > > > > Thanks, > > > > > > > > > > Jun > > > > > > > > > > On Mon, Nov 21, 2022 at 7:11 AM Divij Vaidya < > > divijvaidy...@gmail.com> > > > > > wrote: > > > > > > > > > > > Thank you Jun and Alex for your comments. > > > > > > > > > > > > Point#1: You are right Jun. As Alex mentioned, the "derived > > metadata" > > > > can > > > > > > increase the size of cached metadata by a factor of 10 but it > > should > > > be > > > > > ok > > > > > > to cache just the actual metadata. My point about size being a > > > > limitation > > > > > > for using cache is not valid anymore. > > > > > > > > > > > > Point#2: For a new replica, it would still have to fetch the > > metadata > > > > > over > > > > > > the network to initiate the warm up of the cache and hence, > > increase > > > > the > > > > > > start time of the archival process. Please also note the > > > repercussions > > > > of > > > > > > the warm up scan that Alex mentioned in this thread as part of > > > #102.2. > > > > > > > > > > > > 100#: Agreed Alex. 
Thanks for clarifying that. My point about > size > > > > being > > > > > a > > > > > > limitation for using cache is not valid anymore. > > > > > > > > > > > > 101#: Alex, if I understand correctly, you are suggesting to > cache > > > the > > > > > > total size at the leader and update it on archival. This wouldn't > > > work > > > > > for > > > > > > cases when the leader restarts where we would have to make a full > > > scan > > > > > > to update the total size entry on startup. We expect users to > store > > > > data > > > > > > over longer duration in remote storage which increases the > > likelihood > > > > of > > > > > > leader restarts / failovers. > > > > > > > > > > > > 102#.1: I don't think that the current design accommodates the > fact > > > > that > > > > > > data corruption could happen at the RLMM plugin (we don't have > > > checksum > > > > > as > > > > > > a field in metadata as part of KIP405). If data corruption > occurs, > > w/ > > > > or > > > > > > w/o the cache, it would be a different problem to solve. I would > > like > > > > to > > > > > > keep this outside the scope of this KIP. > > > > > > > > > > > > 102#.2: Agree. This remains as the main concern for using the > cache > > > to > > > > > > fetch total size. > > > > > > > > > > > > Regards, > > > > > > Divij Vaidya > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Nov 18, 2022 at 12:59 PM Alexandre Dupriez < > > > > > > alexandre.dupr...@gmail.com> wrote: > > > > > > > > > > > > > Hi Divij, > > > > > > > > > > > > > > Thanks for the KIP. Please find some comments based on what I > > read > > > on > > > > > > > this thread so far - apologies for the repeats and the late > > reply. > > > > > > > > > > > > > > If I understand correctly, one of the main elements of > discussion > > > is > > > > > > > about caching in Kafka versus delegation of providing the > remote > > > size > > > > > > > of a topic-partition to the plugin. 
> > > > > > > > > > > > > > A few comments: > > > > > > > > > > > > > > 100. The size of the “derived metadata” which is managed by the > > > > plugin > > > > > > > to represent an rlmMetadata can indeed be close to 1 kB on > > average > > > > > > > depending on its own internal structure, e.g. the redundancy it > > > > > > > enforces (unfortunately resulting to duplication), additional > > > > > > > information such as checksums and primary and secondary > indexable > > > > > > > keys. But indeed, the rlmMetadata is itself a lighter data > > > structure > > > > > > > by a factor of 10. And indeed, instead of caching the “derived > > > > > > > metadata”, only the rlmMetadata could be, which should address > > the > > > > > > > concern regarding the memory occupancy of the cache. > > > > > > > > > > > > > > 101. I am not sure I fully understand why we would need to > cache > > > the > > > > > > > list of rlmMetadata to retain the remote size of a > > topic-partition. > > > > > > > Since the leader of a topic-partition is, in non-degenerated > > cases, > > > > > > > the only actor which can mutate the remote part of the > > > > > > > topic-partition, hence its size, it could in theory only cache > > the > > > > > > > size of the remote log once it has calculated it? In which case > > > there > > > > > > > would not be any problem regarding the size of the caching > > > strategy. > > > > > > > Did I miss something there? > > > > > > > > > > > > > > 102. There may be a few challenges to consider with caching: > > > > > > > > > > > > > > 102.1) As mentioned above, the caching strategy assumes no > > mutation > > > > > > > outside the lifetime of a leader. While this is true in the > > normal > > > > > > > course of operation, there could be accidental mutation outside > > of > > > > the > > > > > > > leader and a loss of consistency between the cached state and > the > > > > > > > actual remote representation of the log. E.g. 
split-brain > > > scenarios, > > > > > > > bugs in the plugins, bugs in external systems with mutating > > access > > > on > > > > > > > the derived metadata. In the worst case, a drift between the > > cached > > > > > > > size and the actual size could lead to over-deleting remote > data > > > > which > > > > > > > is a durability risk. > > > > > > > > > > > > > > The alternative you propose, by making the plugin the source of > > > truth > > > > > > > w.r.t. to the size of the remote log, can make it easier to > avoid > > > > > > > inconsistencies between plugin-managed metadata and the remote > > log > > > > > > > from the perspective of Kafka. On the other hand, plugin > vendors > > > > would > > > > > > > have to implement it with the expected efficiency to have it > > yield > > > > > > > benefits. > > > > > > > > > > > > > > 102.2) As you mentioned, the caching strategy in Kafka would > > still > > > > > > > require one iteration over the list of rlmMetadata when the > > > > leadership > > > > > > > of a topic-partition is assigned to a broker, while the plugin > > can > > > > > > > offer alternative constant-time approaches. This calculation > > cannot > > > > be > > > > > > > put on the LeaderAndIsr path and would be performed in the > > > > background. > > > > > > > In case of bulk leadership migration, listing the rlmMetadata > > could > > > > a) > > > > > > > result in request bursts to any backend system the plugin may > use > > > > > > > [which shouldn’t be a problem for high-throughput data stores > but > > > > > > > could have cost implications] b) increase utilisation timespan > of > > > the > > > > > > > RLM threads for these calculations potentially leading to > > transient > > > > > > > starvation of tasks queued for, typically, offloading > operations > > c) > > > > > > > could have a non-marginal CPU footprint on hardware with strict > > > > > > > resource constraints. 
All these elements could have an impact > to > > > some > > > > > > > degree depending on the operational environment. > > > > > > > > > > > > > > From a design perspective, one question is where we want the > > source > > > > of > > > > > > > truth w.r.t. remote log size to be during the lifetime of a > > leader. > > > > > > > The responsibility of maintaining a consistent representation > of > > > the > > > > > > > remote log is shared by Kafka and the plugin. Which system is > > best > > > > > > > placed to maintain such a state while providing the highest > > > > > > > consistency guarantees is something both Kafka and plugin > > designers > > > > > > > could help understand better. > > > > > > > > > > > > > > Many thanks, > > > > > > > Alexandre > > > > > > > > > > > > > > > > > > > > > Le jeu. 17 nov. 2022 à 19:27, Jun Rao <j...@confluent.io.invalid > > > > a > > > > > > écrit : > > > > > > > > > > > > > > > > Hi, Divij, > > > > > > > > > > > > > > > > Thanks for the reply. > > > > > > > > > > > > > > > > Point #1. Is the average remote segment metadata really 1KB? > > > What's > > > > > > > listed > > > > > > > > in the public interface is probably well below 100 bytes. > > > > > > > > > > > > > > > > Point #2. I guess you are assuming that each broker only > caches > > > the > > > > > > > remote > > > > > > > > segment metadata in memory. An alternative approach is to > cache > > > > them > > > > > in > > > > > > > > both memory and local disk. That way, on broker restart, you > > just > > > > > need > > > > > > to > > > > > > > > fetch the new remote segments' metadata using the > > > > > > > > listRemoteLogSegments(TopicIdPartition topicIdPartition, int > > > > > > leaderEpoch) > > > > > > > > api. Will that work? > > > > > > > > > > > > > > > > Point #3. Thanks for the explanation and it sounds good. 
> > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > On Thu, Nov 17, 2022 at 7:31 AM Divij Vaidya < > > > > > divijvaidy...@gmail.com> > > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hi Jun > > > > > > > > > > > > > > > > > > There are three points that I would like to present here: > > > > > > > > > > > > > > > > > > 1. We would require a large cache size to efficiently cache > > all > > > > > > segment > > > > > > > > > metadata. > > > > > > > > > 2. Linear scan of all metadata at broker startup to > populate > > > the > > > > > > cache > > > > > > > will > > > > > > > > > be slow and will impact the archival process. > > > > > > > > > 3. There is no other use case where a full scan of segment > > > > metadata > > > > > > is > > > > > > > > > required. > > > > > > > > > > > > > > > > > > Let's start by quantifying 1. Here's my estimate for the > size > > > of > > > > > the > > > > > > > cache. > > > > > > > > > Average size of segment metadata = 1KB. This could be more > if > > > we > > > > > have > > > > > > > > > frequent leader failover with a large number of leader > epochs > > > > being > > > > > > > stored > > > > > > > > > per segment. > > > > > > > > > Segment size = 100MB. Users will prefer to reduce the > segment > > > > size > > > > > > > from the > > > > > > > > > default value of 1GB to ensure timely archival of data > since > > > data > > > > > > from > > > > > > > > > active segment is not archived. > > > > > > > > > Cache size = num segments * avg. segment metadata size = > > > > > > > (100TB/100MB)*1KB > > > > > > > > > = 1GB. 
> > > > > > > > > While 1GB for cache may not sound like a large number for larger machines, it does eat into the memory as an additional cache and makes use cases with large data retention and low throughput expensive (where such use cases could use smaller machines). > > > > > > > > > > > > > > > > > > About point#2: > > > > > > > > > Even if we say that all segment metadata can fit into the cache, we will need to populate the cache on broker startup. It would not be in the critical path of broker startup and hence won't impact the startup time. But it will impact the time when we could start the archival process since the RLM thread pool will be blocked on the first call to listRemoteLogSegments(). To scan metadata for 1MM segments (computed above) and transfer 1GB data over the network from a RLMM such as a remote database would be in the order of minutes (depending on how efficient the scan is with the RLMM implementation). Although I would concede that having RLM threads blocked for a few minutes is perhaps OK, if we introduce the new API proposed in the KIP, we would have a deterministic startup time for RLM. Adding the API comes at a low cost and I believe the trade off is worth it. 
> > > > > > > > > > > > > > > > > > About point#3: > > > > > > > > > We can use listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch) to calculate the segments eligible for deletion (based on size retention) where leader epoch(s) belong to the current leader epoch chain. I understand that it may lead to segments belonging to other epoch lineage not getting deleted and would require a separate mechanism to delete them. The separate mechanism would anyway be required to delete these "leaked" segments, as there are other cases which could lead to leaks, such as network problems with RSM midway through writing a segment, etc. > > > > > > > > > > > > > > > > > > Thank you for the replies so far. They have made me re-think my assumptions and this dialogue has been very constructive for me. > > > > > > > > > > > > > > > > > > Regards, > > > > > > > > > Divij Vaidya > > > > > > > > > > > > > > > > > > On Thu, Nov 10, 2022 at 10:49 PM Jun Rao <j...@confluent.io.invalid> wrote: > > > > > > > > > > > > > > > > > > > Hi, Divij, > > > > > > > > > > > > > > > > > > > > Thanks for the reply. > > > > > > > > > > > > > > > > > > > > It's true that the data in Kafka could be kept longer with KIP-405. How much data do you envision to have per broker? For 100TB data per broker, with 1GB segment and segment metadata of 100 bytes, it requires 100TB/1GB*100 = 10MB, which should fit in memory. 
> > > > > > > > > > > > > > > > > > > > RemoteLogMetadataManager has two listRemoteLogSegments() > > > > methods. > > > > > > > The one > > > > > > > > > > you listed listRemoteLogSegments(TopicIdPartition > > > > > topicIdPartition, > > > > > > > int > > > > > > > > > > leaderEpoch) does return data in offset order. However, > the > > > > other > > > > > > > > > > one listRemoteLogSegments(TopicIdPartition > > topicIdPartition) > > > > > > doesn't > > > > > > > > > > specify the return order. I assume that you need the > latter > > > to > > > > > > > calculate > > > > > > > > > > the segment size? > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > > > On Thu, Nov 10, 2022 at 10:25 AM Divij Vaidya < > > > > > > > divijvaidy...@gmail.com> > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > *Jun,* > > > > > > > > > > > > > > > > > > > > > > *"the default implementation of RLMM does local > caching, > > > > > right?"* > > > > > > > > > > > Yes, Jun. The default implementation of RLMM does > indeed > > > > cache > > > > > > the > > > > > > > > > > segment > > > > > > > > > > > metadata today, hence, it won't work for use cases when > > the > > > > > > number > > > > > > > of > > > > > > > > > > > segments in remote storage is large enough to exceed > the > > > size > > > > > of > > > > > > > cache. > > > > > > > > > > As > > > > > > > > > > > part of this KIP, I will implement the new proposed API > > in > > > > the > > > > > > > default > > > > > > > > > > > implementation of RLMM but the underlying > implementation > > > will > > > > > > > still be > > > > > > > > > a > > > > > > > > > > > scan. I will pick up optimizing that in a separate PR. > > > > > > > > > > > > > > > > > > > > > > *"we also cache all segment metadata in the brokers > > without > > > > > > > KIP-405. 
Do > > > > > > > > > > you > > > > > > > > > > > see a need to change that?"* > > > > > > > > > > > Please correct me if I am wrong here but we cache > > metadata > > > > for > > > > > > > segments > > > > > > > > > > > "residing in local storage". The size of the current > > cache > > > > > works > > > > > > > fine > > > > > > > > > for > > > > > > > > > > > the scale of the number of segments that we expect to > > store > > > > in > > > > > > > local > > > > > > > > > > > storage. After KIP-405, that cache will continue to > store > > > > > > metadata > > > > > > > for > > > > > > > > > > > segments which are residing in local storage and hence, > > we > > > > > don't > > > > > > > need > > > > > > > > > to > > > > > > > > > > > change that. For segments which have been offloaded to > > > remote > > > > > > > storage, > > > > > > > > > it > > > > > > > > > > > would rely on RLMM. Note that the scale of data stored > in > > > > RLMM > > > > > is > > > > > > > > > > different > > > > > > > > > > > from local cache because the number of segments is > > expected > > > > to > > > > > be > > > > > > > much > > > > > > > > > > > larger than what current implementation stores in local > > > > > storage. > > > > > > > > > > > > > > > > > > > > > > 2,3,4: RemoteLogMetadataManager.listRemoteLogSegments() > > > does > > > > > > > specify > > > > > > > > > the > > > > > > > > > > > order i.e. it returns the segments sorted by first > offset > > > in > > > > > > > ascending > > > > > > > > > > > order. 
I am copying the API docs for KIP-405 here for your reference: > > > > > > > > > > > > > > > > > > > > > > *Returns iterator of remote log segment metadata, sorted by {@link RemoteLogSegmentMetadata#startOffset()} in ascending order which contains the given leader epoch. This is used by remote log retention management subsystem to fetch the segment metadata for a given leader epoch. @param topicIdPartition topic partition @param leaderEpoch leader epoch @return Iterator of remote segments, sorted by start offset in ascending order.* > > > > > > > > > > > > > > > > > > > > > > *Luke,* > > > > > > > > > > > > > > > > > > > > > > 5. Note that we are trying to optimize the efficiency of size based retention for remote storage. KIP-405 does not introduce a new config similar to log.retention.check.interval.ms that is applicable to remote storage. Hence, the metric will be updated at the time of invoking the log retention check for the remote tier, which is pending implementation today. We can perhaps come back and update the metric description after the implementation of the log retention check in RemoteLogManager. 
> > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > > > > Divij Vaidya > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Nov 10, 2022 at 6:16 AM Luke Chen < > > > show...@gmail.com > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > Hi Divij, > > > > > > > > > > > > > > > > > > > > > > > > One more question about the metric: > > > > > > > > > > > > I think the metric will be updated when > > > > > > > > > > > > (1) each time we run the log retention check (that > is, > > > > > > > > > > > > log.retention.check.interval.ms) > > > > > > > > > > > > (2) When user explicitly call getRemoteLogSize > > > > > > > > > > > > > > > > > > > > > > > > Is that correct? > > > > > > > > > > > > Maybe we should add a note in metric description, > > > > otherwise, > > > > > > when > > > > > > > > > user > > > > > > > > > > > got, > > > > > > > > > > > > let's say 0 of RemoteLogSizeBytes, will be surprised. > > > > > > > > > > > > > > > > > > > > > > > > Otherwise, LGTM > > > > > > > > > > > > > > > > > > > > > > > > Thank you for the KIP > > > > > > > > > > > > Luke > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Nov 10, 2022 at 2:55 AM Jun Rao > > > > > > <j...@confluent.io.invalid > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > Hi, Divij, > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for the explanation. > > > > > > > > > > > > > > > > > > > > > > > > > > 1. Hmm, the default implementation of RLMM does > local > > > > > > caching, > > > > > > > > > right? > > > > > > > > > > > > > Currently, we also cache all segment metadata in > the > > > > > brokers > > > > > > > > > without > > > > > > > > > > > > > KIP-405. Do you see a need to change that? > > > > > > > > > > > > > > > > > > > > > > > > > > 2,3,4: Yes, your explanation makes sense. 
However, > > > > > > > > > > > > > currently, > > > > RemoteLogMetadataManager.listRemoteLogSegments() > > > > > > > doesn't > > > > > > > > > > > > specify > > > > > > > > > > > > > a particular order of the iterator. Do you intend > to > > > > change > > > > > > > that? > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Nov 8, 2022 at 3:31 AM Divij Vaidya < > > > > > > > > > divijvaidy...@gmail.com > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > Hey Jun > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thank you for your comments. > > > > > > > > > > > > > > > > > > > > > > > > > > > > *1. "RLMM implementor could ensure that > > > > > > > listRemoteLogSegments() > > > > > > > > > is > > > > > > > > > > > > fast"* > > > > > > > > > > > > > > This would be ideal but pragmatically, it is > > > difficult > > > > to > > > > > > > ensure > > > > > > > > > > that > > > > > > > > > > > > > > listRemoteLogSegments() is fast. This is because > of > > > the > > > > > > > > > possibility > > > > > > > > > > > of > > > > > > > > > > > > a > > > > > > > > > > > > > > large number of segments (much larger than what > > Kafka > > > > > > > currently > > > > > > > > > > > handles > > > > > > > > > > > > > > with local storage today) would make it > infeasible > > to > > > > > adopt > > > > > > > > > > > strategies > > > > > > > > > > > > > such > > > > > > > > > > > > > > as local caching to improve the performance of > > > > > > > > > > listRemoteLogSegments. 
> > > > > > > > > > > > > Apart > > > > > > > > > > > > > > from caching (which won't work due to size > > > > limitations) I > > > > > > > can't > > > > > > > > > > think > > > > > > > > > > > > of > > > > > > > > > > > > > > other strategies which may eliminate the need for > > IO > > > > > > > > > > > > > > operations proportional to the number of total > > > > segments. > > > > > > > Please > > > > > > > > > > > advise > > > > > > > > > > > > if > > > > > > > > > > > > > > you have something in mind. > > > > > > > > > > > > > > > > > > > > > > > > > > > > 2. "*If the size exceeds the retention size, we > > need > > > > to > > > > > > > > > determine > > > > > > > > > > > the > > > > > > > > > > > > > > subset of segments to delete to bring the size > > within > > > > the > > > > > > > > > retention > > > > > > > > > > > > > limit. > > > > > > > > > > > > > > Do we need to call > > > > > > > > > RemoteLogMetadataManager.listRemoteLogSegments() > > > > > > > > > > > to > > > > > > > > > > > > > > determine that?"* > > > > > > > > > > > > > > Yes, we need to call listRemoteLogSegments() to > > > > determine > > > > > > > which > > > > > > > > > > > > segments > > > > > > > > > > > > > > should be deleted. But there is a difference with > > the > > > > use > > > > > > > case we > > > > > > > > > > are > > > > > > > > > > > > > > trying to optimize with this KIP. To determine > the > > > > subset > > > > > > of > > > > > > > > > > segments > > > > > > > > > > > > > which > > > > > > > > > > > > > > would be deleted, we only read metadata for > > segments > > > > > which > > > > > > > would > > > > > > > > > be > > > > > > > > > > > > > deleted > > > > > > > > > > > > > > via the listRemoteLogSegments(). 
But to determine > > the > > > > > > > > > totalLogSize, > > > > > > > > > > > > which > > > > > > > > > > > > > > is required every time retention logic based on > > size > > > > > > > executes, we > > > > > > > > > > > read > > > > > > > > > > > > > > metadata of *all* the segments in remote storage. > > > > Hence, > > > > > > the > > > > > > > > > number > > > > > > > > > > > of > > > > > > > > > > > > > > results returned by > > > > > > > > > > *RemoteLogMetadataManager.listRemoteLogSegments() > > > > > > > > > > > > *is > > > > > > > > > > > > > > different when we are calculating totalLogSize > vs. > > > when > > > > > we > > > > > > > are > > > > > > > > > > > > > determining > > > > > > > > > > > > > > the subset of segments to delete. > > > > > > > > > > > > > > > > > > > > > > > > > > > > 3. > > > > > > > > > > > > > > *"Also, what about time-based retention? To make > > that > > > > > > > efficient, > > > > > > > > > do > > > > > > > > > > > we > > > > > > > > > > > > > need > > > > > > > > > > > > > > to make some additional interface changes?"*No. > > Note > > > > that > > > > > > > time > > > > > > > > > > > > complexity > > > > > > > > > > > > > > to determine the segments for retention is > > different > > > > for > > > > > > time > > > > > > > > > based > > > > > > > > > > > vs. > > > > > > > > > > > > > > size based. 
For time based, the time complexity > is > > a > > > > > > > function of > > > > > > > > > > the > > > > > > > > > > > > > number > > > > > > > > > > > > > > of segments which are "eligible for deletion" > > (since > > > we > > > > > > only > > > > > > > read > > > > > > > > > > > > > metadata > > > > > > > > > > > > > > for segments which would be deleted) whereas in > > size > > > > > based > > > > > > > > > > retention, > > > > > > > > > > > > the > > > > > > > > > > > > > > time complexity is a function of "all segments" > > > > available > > > > > > in > > > > > > > > > remote > > > > > > > > > > > > > storage > > > > > > > > > > > > > > (metadata of all segments needs to be read to > > > calculate > > > > > the > > > > > > > total > > > > > > > > > > > > size). > > > > > > > > > > > > > As > > > > > > > > > > > > > > you may observe, this KIP will bring the time > > > > complexity > > > > > > for > > > > > > > both > > > > > > > > > > > time > > > > > > > > > > > > > > based retention & size based retention to the > same > > > > > > function. > > > > > > > > > > > > > > > > > > > > > > > > > > > > 4. Also, please note that this new API introduced > > in > > > > this > > > > > > KIP > > > > > > > > > also > > > > > > > > > > > > > enables > > > > > > > > > > > > > > us to provide a metric for total size of data > > stored > > > in > > > > > > > remote > > > > > > > > > > > storage. > > > > > > > > > > > > > > Without the API, calculation of this metric will > > > become > > > > > > very > > > > > > > > > > > expensive > > > > > > > > > > > > > with > > > > > > > > > > > > > > *listRemoteLogSegments().* > > > > > > > > > > > > > > I understand that your motivation here is to > avoid > > > > > > polluting > > > > > > > the > > > > > > > > > > > > > interface > > > > > > > > > > > > > > with optimization specific APIs and I will agree > > with > > > > > that > > > > > > > goal. 
But I believe that the new API proposed in the KIP brings a significant improvement and there is no other workaround available to achieve the same performance.

Regards,
Divij Vaidya

On Tue, Nov 8, 2022 at 12:12 AM Jun Rao <j...@confluent.io.invalid> wrote:

Hi, Divij,

Thanks for the KIP. Sorry for the late reply.

The motivation of the KIP is to improve the efficiency of size based retention. I am not sure the proposed changes are enough. For example, if the size exceeds the retention size, we need to determine the subset of segments to delete to bring the size within the retention limit. Do we need to call RemoteLogMetadataManager.listRemoteLogSegments() to determine that? Also, what about time-based retention?
To make that efficient, do we need to make some additional interface changes?

An alternative approach is for the RLMM implementor to make sure that RemoteLogMetadataManager.listRemoteLogSegments() is fast (e.g., with local caching). This way, we could keep the interface simple. Have we considered that?

Thanks,

Jun

On Wed, Sep 28, 2022 at 6:28 AM Divij Vaidya <divijvaidy...@gmail.com> wrote:

Hey folks

Does anyone else have any thoughts on this before I propose this for a vote?

--
Divij Vaidya

On Mon, Sep 5, 2022 at 12:57 PM Satish Duggana <satish.dugg...@gmail.com> wrote:

Thanks for the KIP Divij!
This is a nice improvement to avoid recalculation of size. Customized RLMMs can implement the best possible approach by caching or maintaining the size in an efficient way. But this is not a big concern for the default topic based RLMM as mentioned in the KIP.

~Satish.

On Wed, 13 Jul 2022 at 18:48, Divij Vaidya <divijvaidy...@gmail.com> wrote:

Thank you for your review Luke.

> Reg: would the new `RemoteLogSizeBytes` metric be a performance overhead? Although we move the calculation to a separate API, we still can't assume users will implement a light-weight method, right?
This metric would be logged using the information that is already being calculated for handling remote retention logic, so no additional work is required to calculate this metric. More specifically, whenever RemoteLogManager calls the getRemoteLogSize API, this metric would be captured. This API call is made every time RemoteLogManager wants to handle expired remote log segments (which should be periodic). Does that address your concern?

Divij Vaidya

On Tue, Jul 12, 2022 at 11:01 AM Luke Chen <show...@gmail.com> wrote:

Hi Divij,

Thanks for the KIP!
I think it makes sense to delegate the responsibility of calculation to the specific RemoteLogMetadataManager implementation.
But one thing I'm not quite sure about: would the new `RemoteLogSizeBytes` metric be a performance overhead?
Although we move the calculation to a separate API, we still can't assume users will implement a light-weight method, right?

Thank you.
Luke

On Fri, Jul 1, 2022 at 5:47 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-852%3A+Optimize+calculation+of+size+for+log+in+remote+tier

Hey folks

Please take a look at this KIP which proposes an extension to KIP-405. This is my first KIP with the Apache Kafka community so any feedback would be highly appreciated.

Cheers!

--
Divij Vaidya
Sr. Software Engineer
Amazon
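
The complexity argument made in this thread (time-based retention reads only the segments eligible for deletion, while size-based retention must first read every segment to learn the total) can be illustrated with a small Java sketch. This is only an illustration under stated assumptions, not Kafka's implementation: `SegmentMeta`, `RetentionSketch`, and the `metadataReads` counter are hypothetical names, and the sketch assumes segments are listed oldest first with non-decreasing timestamps.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for one remote segment's metadata (not a Kafka class). */
final class SegmentMeta {
    final long sizeBytes;
    final long maxTimestampMs;

    SegmentMeta(long sizeBytes, long maxTimestampMs) {
        this.sizeBytes = sizeBytes;
        this.maxTimestampMs = maxTimestampMs;
    }
}

final class RetentionSketch {
    /** Number of metadata entries touched; exists only to make the cost visible. */
    static int metadataReads = 0;

    /** Time-based: walk oldest first and stop at the first segment still within retention. */
    static List<SegmentMeta> expiredByTime(List<SegmentMeta> oldestFirst, long nowMs, long retentionMs) {
        List<SegmentMeta> toDelete = new ArrayList<>();
        for (SegmentMeta s : oldestFirst) {
            metadataReads++;
            if (nowMs - s.maxTimestampMs <= retentionMs) {
                break; // everything after this one is assumed to be newer
            }
            toDelete.add(s);
        }
        return toDelete;
    }

    /** Size-based total without a dedicated size API: every segment must be read. */
    static long totalSizeByListing(List<SegmentMeta> all) {
        long total = 0;
        for (SegmentMeta s : all) {
            metadataReads++;
            total += s.sizeBytes;
        }
        return total;
    }

    /** Size-based selection once the total is already known (e.g. supplied by the RLMM). */
    static List<SegmentMeta> expiredBySize(List<SegmentMeta> oldestFirst, long totalBytes, long retentionBytes) {
        List<SegmentMeta> toDelete = new ArrayList<>();
        long remaining = totalBytes;
        for (SegmentMeta s : oldestFirst) {
            if (remaining <= retentionBytes) {
                break; // already within the limit, no further reads needed
            }
            metadataReads++;
            toDelete.add(s);
            remaining -= s.sizeBytes;
        }
        return toDelete;
    }
}
```

With ten 100-byte segments and a 750-byte limit, `totalSizeByListing` touches all ten entries, whereas `expiredBySize` given a precomputed total touches only the three segments it selects. That difference is the cost the proposed size API is meant to remove.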