Hi, Divij, Thanks for the explanation.
Good question.

Hi, Satish,

Could you explain how you exposed the log size in your KIP-405 implementation?

Thanks,

Jun

On Tue, Dec 20, 2022 at 4:59 AM Divij Vaidya <divijvaidy...@gmail.com> wrote:

> Hey Jun
>
> Yes, it is possible to maintain the log size in the cache (see rejected alternative#3 in the KIP), but I did not understand how it is possible to retrieve it without the new API. The log size could be calculated on startup by scanning through the segments (though I would disagree that this is the right approach, since the scan itself takes on the order of minutes and hence delays the start of the archive process) and incrementally maintained afterwards. Even then, we would need an API in RemoteLogMetadataManager so that RLM could fetch the cached size!
>
> If we wish to cache the size without adding a new API, then we need to cache the size in RLM itself (instead of the RLMM implementation) and incrementally manage it. The downside of longer archive time at startup (due to the initial scan) still remains valid in this situation.
>
> --
> Divij Vaidya
>
> On Fri, Dec 16, 2022 at 12:43 AM Jun Rao <j...@confluent.io.invalid> wrote:
>
> > Hi, Divij,
> >
> > Thanks for the explanation.
> >
> > If there is an in-memory cache, could we maintain the log size in the cache with the existing API? For example, a replica could make a listRemoteLogSegments(TopicIdPartition topicIdPartition) call on startup to get the remote segment size before the current leaderEpoch. The leader could then maintain the size incrementally afterwards. On leader change, other replicas can make a listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch) call to get the size of newly generated segments.
> >
> > Thanks,
> >
> > Jun
> >
> > On Wed, Dec 14, 2022 at 3:27 AM Divij Vaidya <divijvaidy...@gmail.com> wrote:
> >
> > > > Is the new method enough for doing size-based retention?
> > >
> > > Yes.
> > > You are right in assuming that this API only provides the Remote storage size (for current epoch chain). We would use this API for size based retention along with a value of localOnlyLogSegmentSize which is computed as Log.sizeInBytes(logSegments.filter(_.baseOffset > highestOffsetWithRemoteIndex)). Hence, (total_log_size = remoteLogSizeBytes + log.localOnlyLogSegmentSize). I have updated the KIP with this information. You can also check an example implementation at
> > >
> > > https://github.com/satishd/kafka/blob/2.8.x-tiered-storage/core/src/main/scala/kafka/log/Log.scala#L2077
> > >
> > > > Do you imagine all accesses to remote metadata will be across the network or will there be some local in-memory cache?
> > >
> > > I would expect a disk-less implementation to maintain a finite in-memory cache for segment metadata to optimize the number of network calls made to fetch the data. In future, we can think about bringing this finite size cache into RLM itself but that's probably a conversation for a different KIP. There are many other things we would like to do to optimize the Tiered storage interface such as introducing a circular buffer / streaming interface from RSM (so that we don't have to wait to fetch the entire segment before starting to send records to the consumer), caching the segments fetched from RSM locally (I would assume all RSM plugin implementations to do this, might as well add it to RLM) etc.
> > >
> > > --
> > > Divij Vaidya
> > >
> > > On Mon, Dec 12, 2022 at 7:35 PM Jun Rao <j...@confluent.io.invalid> wrote:
> > >
> > > > Hi, Divij,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > Is the new method enough for doing size-based retention?
> > > > It gives the total size of the remote segments, but it seems that we still don't know the exact total size for a log since there could be overlapping segments between the remote and the local segments.
> > > >
> > > > You mentioned a disk-less implementation. Do you imagine all accesses to remote metadata will be across the network or will there be some local in-memory cache?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Wed, Dec 7, 2022 at 3:10 AM Divij Vaidya <divijvaidy...@gmail.com> wrote:
> > > >
> > > > > The method is needed for RLMM implementations which fetch the information over the network and not for the disk based implementations (such as the default topic based RLMM).
> > > > >
> > > > > I would argue that adding this API makes the interface more generic than what it is today. This is because, with the current APIs, an implementor is restricted to using disk based RLMM solutions only (i.e. the default solution), whereas if we add this new API, we unblock usage of network based RLMM implementations such as databases.
> > > > >
> > > > > On Wed 30. Nov 2022 at 20:40, Jun Rao <j...@confluent.io.invalid> wrote:
> > > > >
> > > > > > Hi, Divij,
> > > > > >
> > > > > > Thanks for the reply.
> > > > > >
> > > > > > Point#2. My high level question is whether the new method is needed for every implementation of remote storage or just for a specific implementation. The issues that you pointed out exist for the default implementation of RLMM as well and so far, the default implementation hasn't found a need for a similar new method. For a public interface, ideally we want to make it more general.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Mon, Nov 21, 2022 at 7:11 AM Divij Vaidya <divijvaidy...@gmail.com> wrote:
> > > > > >
> > > > > > > Thank you Jun and Alex for your comments.
> > > > > > >
> > > > > > > Point#1: You are right Jun. As Alex mentioned, the "derived metadata" can increase the size of cached metadata by a factor of 10 but it should be ok to cache just the actual metadata. My point about size being a limitation for using cache is not valid anymore.
> > > > > > >
> > > > > > > Point#2: For a new replica, it would still have to fetch the metadata over the network to initiate the warm up of the cache and hence, increase the start time of the archival process. Please also note the repercussions of the warm up scan that Alex mentioned in this thread as part of #102.2.
> > > > > > >
> > > > > > > 100#: Agreed Alex. Thanks for clarifying that. My point about size being a limitation for using cache is not valid anymore.
> > > > > > >
> > > > > > > 101#: Alex, if I understand correctly, you are suggesting to cache the total size at the leader and update it on archival. This wouldn't work for cases when the leader restarts where we would have to make a full scan to update the total size entry on startup. We expect users to store data over longer duration in remote storage which increases the likelihood of leader restarts / failovers.
> > > > > > >
> > > > > > > 102#.1: I don't think that the current design accommodates the fact that data corruption could happen at the RLMM plugin (we don't have checksum as a field in metadata as part of KIP405). If data corruption occurs, w/ or w/o the cache, it would be a different problem to solve. I would like to keep this outside the scope of this KIP.
> > > > > > >
> > > > > > > 102#.2: Agree. This remains as the main concern for using the cache to fetch total size.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Divij Vaidya
> > > > > > >
> > > > > > > On Fri, Nov 18, 2022 at 12:59 PM Alexandre Dupriez <alexandre.dupr...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Divij,
> > > > > > > >
> > > > > > > > Thanks for the KIP. Please find some comments based on what I read on this thread so far - apologies for the repeats and the late reply.
> > > > > > > >
> > > > > > > > If I understand correctly, one of the main elements of discussion is about caching in Kafka versus delegation of providing the remote size of a topic-partition to the plugin.
> > > > > > > >
> > > > > > > > A few comments:
> > > > > > > >
> > > > > > > > 100. The size of the “derived metadata” which is managed by the plugin to represent an rlmMetadata can indeed be close to 1 kB on average depending on its own internal structure, e.g. the redundancy it enforces (unfortunately resulting in duplication), additional information such as checksums and primary and secondary indexable keys. But indeed, the rlmMetadata is itself a lighter data structure by a factor of 10.
> > > > > > > > And indeed, instead of caching the “derived metadata”, only the rlmMetadata could be, which should address the concern regarding the memory occupancy of the cache.
> > > > > > > >
> > > > > > > > 101. I am not sure I fully understand why we would need to cache the list of rlmMetadata to retain the remote size of a topic-partition. Since the leader of a topic-partition is, in non-degenerated cases, the only actor which can mutate the remote part of the topic-partition, hence its size, it could in theory only cache the size of the remote log once it has calculated it? In which case there would not be any problem regarding the size of the caching strategy. Did I miss something there?
> > > > > > > >
> > > > > > > > 102. There may be a few challenges to consider with caching:
> > > > > > > >
> > > > > > > > 102.1) As mentioned above, the caching strategy assumes no mutation outside the lifetime of a leader. While this is true in the normal course of operation, there could be accidental mutation outside of the leader and a loss of consistency between the cached state and the actual remote representation of the log. E.g. split-brain scenarios, bugs in the plugins, bugs in external systems with mutating access on the derived metadata. In the worst case, a drift between the cached size and the actual size could lead to over-deleting remote data which is a durability risk.
> > > > > > > >
> > > > > > > > The alternative you propose, by making the plugin the source of truth w.r.t.
> > > > > > > > the size of the remote log, can make it easier to avoid inconsistencies between plugin-managed metadata and the remote log from the perspective of Kafka. On the other hand, plugin vendors would have to implement it with the expected efficiency to have it yield benefits.
> > > > > > > >
> > > > > > > > 102.2) As you mentioned, the caching strategy in Kafka would still require one iteration over the list of rlmMetadata when the leadership of a topic-partition is assigned to a broker, while the plugin can offer alternative constant-time approaches. This calculation cannot be put on the LeaderAndIsr path and would be performed in the background. In case of bulk leadership migration, listing the rlmMetadata could a) result in request bursts to any backend system the plugin may use [which shouldn’t be a problem for high-throughput data stores but could have cost implications] b) increase utilisation timespan of the RLM threads for these calculations potentially leading to transient starvation of tasks queued for, typically, offloading operations c) have a non-marginal CPU footprint on hardware with strict resource constraints. All these elements could have an impact to some degree depending on the operational environment.
> > > > > > > >
> > > > > > > > From a design perspective, one question is where we want the source of truth w.r.t. remote log size to be during the lifetime of a leader.
> > > > > > > > The responsibility of maintaining a consistent representation of the remote log is shared by Kafka and the plugin. Which system is best placed to maintain such a state while providing the highest consistency guarantees is something both Kafka and plugin designers could help understand better.
> > > > > > > >
> > > > > > > > Many thanks,
> > > > > > > > Alexandre
> > > > > > > >
> > > > > > > > On Thu, Nov 17, 2022 at 19:27, Jun Rao <j...@confluent.io.invalid> wrote:
> > > > > > > >
> > > > > > > > > Hi, Divij,
> > > > > > > > >
> > > > > > > > > Thanks for the reply.
> > > > > > > > >
> > > > > > > > > Point #1. Is the average remote segment metadata really 1KB? What's listed in the public interface is probably well below 100 bytes.
> > > > > > > > >
> > > > > > > > > Point #2. I guess you are assuming that each broker only caches the remote segment metadata in memory. An alternative approach is to cache them in both memory and local disk. That way, on broker restart, you just need to fetch the new remote segments' metadata using the listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch) api. Will that work?
> > > > > > > > >
> > > > > > > > > Point #3. Thanks for the explanation and it sounds good.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Jun
> > > > > > > > >
> > > > > > > > > On Thu, Nov 17, 2022 at 7:31 AM Divij Vaidya <divijvaidy...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Jun
> > > > > > > > > >
> > > > > > > > > > There are three points that I would like to present here:
> > > > > > > > > >
> > > > > > > > > > 1. We would require a large cache size to efficiently cache all segment metadata.
> > > > > > > > > > 2. Linear scan of all metadata at broker startup to populate the cache will be slow and will impact the archival process.
> > > > > > > > > > 3. There is no other use case where a full scan of segment metadata is required.
> > > > > > > > > >
> > > > > > > > > > Let's start by quantifying 1. Here's my estimate for the size of the cache.
> > > > > > > > > > Average size of segment metadata = 1KB. This could be more if we have frequent leader failover with a large number of leader epochs being stored per segment.
> > > > > > > > > > Segment size = 100MB. Users will prefer to reduce the segment size from the default value of 1GB to ensure timely archival of data since data from active segment is not archived.
> > > > > > > > > > Cache size = num segments * avg. segment metadata size = (100TB/100MB)*1KB = 1GB.
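The cache-size estimate quoted above can be reproduced with a few lines of arithmetic. This is only a sketch, written in Python for brevity; it assumes decimal units (1 KB = 1000 bytes), and the function name is made up for illustration.

```python
# Back-of-the-envelope estimate of the per-broker metadata cache size.
# Assumption: decimal units (1 KB = 1000 bytes); all names are illustrative.
KB, MB, GB, TB = 10**3, 10**6, 10**9, 10**12


def metadata_cache_bytes(remote_data_bytes, segment_bytes, metadata_bytes_per_segment):
    """Return (number of segments, bytes needed to cache one metadata entry per segment)."""
    num_segments = remote_data_bytes // segment_bytes
    return num_segments, num_segments * metadata_bytes_per_segment


# Inputs taken from the estimate: 100TB per broker, 100MB segments, 1KB metadata.
segments, cache = metadata_cache_bytes(100 * TB, 100 * MB, 1 * KB)
print(segments, cache / GB)  # 1000000 segments, 1.0 GB
```

The same function can be fed other segment sizes or per-segment metadata sizes to see how sensitive the result is to those two inputs.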
> > > > > > > > > > While 1GB for cache may not sound like a large number for larger machines, it does eat into the memory as an additional cache and makes use cases with large data retention and low throughput expensive (where such use cases could otherwise use smaller machines).
> > > > > > > > > >
> > > > > > > > > > About point#2:
> > > > > > > > > > Even if we say that all segment metadata can fit into the cache, we will need to populate the cache on broker startup. It would not be in the critical path of broker startup and hence won't impact the startup time. But it will impact the time when we could start the archival process since the RLM thread pool will be blocked on the first call to listRemoteLogSegments(). To scan metadata for 1MM segments (computed above) and transfer 1GB data over the network from a RLMM such as a remote database would be in the order of minutes (depending on how efficient the scan is with the RLMM implementation). Although, I would concede that having RLM threads blocked for a few minutes is perhaps OK but if we introduce the new API proposed in the KIP, we would have a deterministic startup time for RLM. Adding the API comes at a low cost and I believe the trade off is worth it.
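The trade-off described under point#2 can be illustrated with a toy model. This is a sketch only: `ScanningStore` and `CountingStore` are hypothetical stand-ins for two kinds of RLMM implementation, and `remote_log_size` is a placeholder for the API proposed in the KIP, not the real Kafka interface.

```python
from dataclasses import dataclass, field


@dataclass
class ScanningStore:
    """Hypothetical metadata store that can only list segment metadata."""
    segment_sizes: list = field(default_factory=list)
    reads: int = 0  # number of metadata entries read so far

    def add_segment(self, size_bytes):
        self.segment_sizes.append(size_bytes)

    def remote_log_size(self):
        # Every metadata entry must be read: cost grows with the segment count.
        self.reads += len(self.segment_sizes)
        return sum(self.segment_sizes)


@dataclass
class CountingStore(ScanningStore):
    """Hypothetical store that keeps a running total next to the metadata."""
    total_bytes: int = 0

    def add_segment(self, size_bytes):
        super().add_segment(size_bytes)
        self.total_bytes += size_bytes

    def remote_log_size(self):
        # A single lookup, independent of how many segments exist.
        self.reads += 1
        return self.total_bytes


scanning, counting = ScanningStore(), CountingStore()
for store in (scanning, counting):
    for _ in range(1000):
        store.add_segment(100)
    assert store.remote_log_size() == 100_000
print(scanning.reads, counting.reads)  # 1000 1
```

Both stores return the same size; only the number of metadata reads differs, which is the cost the thread is debating.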
> > > > > > > > > >
> > > > > > > > > > About point#3:
> > > > > > > > > > We can use listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch) to calculate the segments eligible for deletion (based on size retention) where leader epoch(s) belong to the current leader epoch chain. I understand that it may lead to segments belonging to other epoch lineage not getting deleted and would require a separate mechanism to delete them. The separate mechanism would anyways be required to delete these "leaked" segments as there are other cases which could lead to leaks such as network problems with RSM midway through writing a segment, etc.
> > > > > > > > > >
> > > > > > > > > > Thank you for the replies so far. They have made me re-think my assumptions and this dialogue has been very constructive for me.
> > > > > > > > > >
> > > > > > > > > > Regards,
> > > > > > > > > > Divij Vaidya
> > > > > > > > > >
> > > > > > > > > > On Thu, Nov 10, 2022 at 10:49 PM Jun Rao <j...@confluent.io.invalid> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi, Divij,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the reply.
> > > > > > > > > > >
> > > > > > > > > > > It's true that the data in Kafka could be kept longer with KIP-405. How much data do you envision to have per broker?
> > > > > > > > > > > For 100TB data per broker, with 1GB segment and segment metadata of 100 bytes, it requires 100TB/1GB*100 = 10MB, which should fit in memory.
> > > > > > > > > > >
> > > > > > > > > > > RemoteLogMetadataManager has two listRemoteLogSegments() methods. The one you listed listRemoteLogSegments(TopicIdPartition topicIdPartition, int leaderEpoch) does return data in offset order. However, the other one listRemoteLogSegments(TopicIdPartition topicIdPartition) doesn't specify the return order. I assume that you need the latter to calculate the segment size?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > >
> > > > > > > > > > > Jun
> > > > > > > > > > >
> > > > > > > > > > > On Thu, Nov 10, 2022 at 10:25 AM Divij Vaidya <divijvaidy...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > *Jun,*
> > > > > > > > > > > >
> > > > > > > > > > > > *"the default implementation of RLMM does local caching, right?"*
> > > > > > > > > > > > Yes, Jun. The default implementation of RLMM does indeed cache the segment metadata today, hence, it won't work for use cases when the number of segments in remote storage is large enough to exceed the size of cache.
> > > > > > > > > > > > As part of this KIP, I will implement the new proposed API in the default implementation of RLMM but the underlying implementation will still be a scan. I will pick up optimizing that in a separate PR.
> > > > > > > > > > > >
> > > > > > > > > > > > *"we also cache all segment metadata in the brokers without KIP-405. Do you see a need to change that?"*
> > > > > > > > > > > > Please correct me if I am wrong here but we cache metadata for segments "residing in local storage". The size of the current cache works fine for the scale of the number of segments that we expect to store in local storage. After KIP-405, that cache will continue to store metadata for segments which are residing in local storage and hence, we don't need to change that. For segments which have been offloaded to remote storage, it would rely on RLMM. Note that the scale of data stored in RLMM is different from local cache because the number of segments is expected to be much larger than what current implementation stores in local storage.
> > > > > > > > > > > >
> > > > > > > > > > > > 2,3,4: RemoteLogMetadataManager.listRemoteLogSegments() does specify the order i.e. it returns the segments sorted by first offset in ascending order. I am copying the API docs for KIP-405 here for your reference
> > > > > > > > > > > >
> > > > > > > > > > > > *Returns iterator of remote log segment metadata, sorted by {@link RemoteLogSegmentMetadata#startOffset()} in ascending order which contains the given leader epoch. This is used by remote log retention management subsystem to fetch the segment metadata for a given leader epoch.*
> > > > > > > > > > > > *@param topicIdPartition topic partition*
> > > > > > > > > > > > *@param leaderEpoch leader epoch*
> > > > > > > > > > > > *@return Iterator of remote segments, sorted by start offset in ascending order.*
> > > > > > > > > > > >
> > > > > > > > > > > > *Luke,*
> > > > > > > > > > > >
> > > > > > > > > > > > 5. Note that we are trying to optimize the efficiency of size based retention for remote storage. KIP-405 does not introduce a new config for periodically checking remote similar to log.retention.check.interval.ms which is applicable for remote storage.
> > > > > > > > > > > > Hence, the metric will be updated at the time of invoking log retention check for remote tier which is pending implementation today. We can perhaps come back and update the metric description after the implementation of log retention check in RemoteLogManager.
> > > > > > > > > > > >
> > > > > > > > > > > > --
> > > > > > > > > > > > Divij Vaidya
> > > > > > > > > > > >
> > > > > > > > > > > > On Thu, Nov 10, 2022 at 6:16 AM Luke Chen <show...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Divij,
> > > > > > > > > > > > >
> > > > > > > > > > > > > One more question about the metric:
> > > > > > > > > > > > > I think the metric will be updated
> > > > > > > > > > > > > (1) each time we run the log retention check (that is, log.retention.check.interval.ms)
> > > > > > > > > > > > > (2) when the user explicitly calls getRemoteLogSize
> > > > > > > > > > > > >
> > > > > > > > > > > > > Is that correct?
> > > > > > > > > > > > > Maybe we should add a note in the metric description; otherwise, a user who gets, let's say, 0 for RemoteLogSizeBytes will be surprised.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Otherwise, LGTM
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thank you for the KIP
> > > > > > > > > > > > > Luke
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Thu, Nov 10, 2022 at 2:55 AM Jun Rao <j...@confluent.io.invalid> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi, Divij,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for the explanation.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 1. Hmm, the default implementation of RLMM does local caching, right? Currently, we also cache all segment metadata in the brokers without KIP-405. Do you see a need to change that?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 2,3,4: Yes, your explanation makes sense. However, currently, RemoteLogMetadataManager.listRemoteLogSegments() doesn't specify a particular order of the iterator. Do you intend to change that?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Jun
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Tue, Nov 8, 2022 at 3:31 AM Divij Vaidya <divijvaidy...@gmail.com> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hey Jun
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thank you for your comments.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > *1.
> > > > > > > > > > > > > > > "RLMM implementor could ensure that listRemoteLogSegments() is fast"*
> > > > > > > > > > > > > > > This would be ideal but pragmatically, it is difficult to ensure that listRemoteLogSegments() is fast. This is because the possibility of a large number of segments (much larger than what Kafka currently handles with local storage today) would make it infeasible to adopt strategies such as local caching to improve the performance of listRemoteLogSegments. Apart from caching (which won't work due to size limitations) I can't think of other strategies which may eliminate the need for IO operations proportional to the number of total segments. Please advise if you have something in mind.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 2. *"If the size exceeds the retention size, we need to determine the subset of segments to delete to bring the size within the retention limit.
> > > > > > > > > > > > > > > Do we need to call RemoteLogMetadataManager.listRemoteLogSegments() to determine that?"*
> > > > > > > > > > > > > > > Yes, we need to call listRemoteLogSegments() to determine which segments should be deleted. But there is a difference with the use case we are trying to optimize with this KIP. To determine the subset of segments which would be deleted, we only read metadata for segments which would be deleted via the listRemoteLogSegments(). But to determine the totalLogSize, which is required every time retention logic based on size executes, we read metadata of *all* the segments in remote storage. Hence, the number of results returned by *RemoteLogMetadataManager.listRemoteLogSegments()* is different when we are calculating totalLogSize vs. when we are determining the subset of segments to delete.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 3. *"Also, what about time-based retention?
To make that efficient, do we need to make some additional interface changes?"*

No. Note that time complexity to determine the segments for retention is different for time based vs. size based. For time based, the time complexity is a function of the number of segments which are "eligible for deletion" (since we only read metadata for segments which would be deleted) whereas in size based retention, the time complexity is a function of "all segments" available in remote storage (metadata of all segments needs to be read to calculate the total size). As you may observe, this KIP will bring the time complexity for both time based retention & size based retention to the same function.

4.
Also, please note that this new API introduced in this KIP also enables us to provide a metric for total size of data stored in remote storage. Without the API, calculation of this metric will become very expensive with *listRemoteLogSegments().*

I understand that your motivation here is to avoid polluting the interface with optimization specific APIs and I will agree with that goal. But I believe that this new API proposed in the KIP brings in significant improvement and there is no other work around available to achieve the same performance.

Regards,
Divij Vaidya

On Tue, Nov 8, 2022 at 12:12 AM Jun Rao <j...@confluent.io.invalid> wrote:

Hi, Divij,

Thanks for the KIP. Sorry for the late reply.
The motivation of the KIP is to improve the efficiency of size based retention. I am not sure the proposed changes are enough. For example, if the size exceeds the retention size, we need to determine the subset of segments to delete to bring the size within the retention limit. Do we need to call RemoteLogMetadataManager.listRemoteLogSegments() to determine that? Also, what about time-based retention? To make that efficient, do we need to make some additional interface changes?

An alternative approach is for the RLMM implementor to make sure that RemoteLogMetadataManager.listRemoteLogSegments() is fast (e.g., with local caching). This way, we could keep the interface simple. Have we considered that?
Thanks,

Jun

On Wed, Sep 28, 2022 at 6:28 AM Divij Vaidya <divijvaidy...@gmail.com> wrote:

Hey folks

Does anyone else have any thoughts on this before I propose this for a vote?

--
Divij Vaidya

On Mon, Sep 5, 2022 at 12:57 PM Satish Duggana <satish.dugg...@gmail.com> wrote:

Thanks for the KIP Divij!

This is a nice improvement to avoid recalculation of size. Customized RLMMs can implement the best possible approach by caching or maintaining the size in an efficient way. But this is not a big concern for the default topic based RLMM as mentioned in the KIP.
~Satish.

On Wed, 13 Jul 2022 at 18:48, Divij Vaidya <divijvaidy...@gmail.com> wrote:

Thank you for your review Luke.

> Reg: is that would the new `RemoteLogSizeBytes` metric be a performance overhead? Although we move the calculation to a separate API, we still can't assume users will implement a light-weight method, right?

This metric would be logged using the information that is already being calculated for handling remote retention logic, hence, no additional work is required to calculate this metric. More specifically, whenever RemoteLogManager calls getRemoteLogSize API, this metric would be captured.
This API call is made every time RemoteLogManager wants to handle expired remote log segments (which should be periodic). Does that address your concern?

Divij Vaidya

On Tue, Jul 12, 2022 at 11:01 AM Luke Chen <show...@gmail.com> wrote:

Hi Divij,

Thanks for the KIP!

I think it makes sense to delegate the responsibility of calculation to the specific RemoteLogMetadataManager implementation.
But one thing I'm not quite sure, is that would the new `RemoteLogSizeBytes` metric be a performance overhead?
Although we move the calculation to a separate API, we still can't assume users will implement a light-weight method, right?

Thank you.
Luke

On Fri, Jul 1, 2022 at 5:47 PM Divij Vaidya <divijvaidy...@gmail.com> wrote:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-852%3A+Optimize+calculation+of+size+for+log+in+remote+tier

Hey folks

Please take a look at this KIP which proposes an extension to KIP-405.
This is my first KIP with Apache Kafka community so any feedback would be highly appreciated.

Cheers!

--
Divij Vaidya
Sr. Software Engineer
Amazon
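
The complexity argument made in this thread (size-based retention has to read metadata for all remote segments just to learn the total, while time-based retention only reads the segments it will delete) can be illustrated with a minimal Java sketch. Everything here is hypothetical: RemoteSizeSketch, Segment and the three methods are illustration-only names, not the KIP-405 or KIP-852 interfaces. The sketch also assumes segments are listed oldest first with non-decreasing timestamps, and it uses a simplified "delete until within the limit" rule rather than Kafka's exact retention check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustration only: hypothetical names, not the KIP-405 / KIP-852 interfaces. */
final class RemoteSizeSketch {

    /** Minimal stand-in for the metadata of one remote log segment. */
    static final class Segment {
        final long baseOffset;
        final long sizeBytes;
        final long maxTimestampMs;

        Segment(long baseOffset, long sizeBytes, long maxTimestampMs) {
            this.baseOffset = baseOffset;
            this.sizeBytes = sizeBytes;
            this.maxTimestampMs = maxTimestampMs;
        }
    }

    /** Total size via listing: reads metadata of every segment, O(all segments). */
    static long totalSizeByListing(List<Segment> oldestFirst) {
        long total = 0L;
        for (Segment s : oldestFirst) {
            total += s.sizeBytes;
        }
        return total;
    }

    /**
     * Size-based retention once the total is already known (for example from a
     * dedicated size call): only the oldest segments are read, and the loop
     * stops as soon as the remaining size is within the limit.
     */
    static List<Segment> toDeleteBySize(List<Segment> oldestFirst, long totalSize, long retentionBytes) {
        List<Segment> toDelete = new ArrayList<>();
        long remaining = totalSize;
        for (Segment s : oldestFirst) {
            if (remaining <= retentionBytes) {
                break;
            }
            toDelete.add(s);
            remaining -= s.sizeBytes;
        }
        return toDelete;
    }

    /** Time-based retention: stops at the first segment still inside the retention window. */
    static List<Segment> toDeleteByTime(List<Segment> oldestFirst, long nowMs, long retentionMs) {
        List<Segment> toDelete = new ArrayList<>();
        for (Segment s : oldestFirst) {
            if (nowMs - s.maxTimestampMs <= retentionMs) {
                break;
            }
            toDelete.add(s);
        }
        return toDelete;
    }

    public static void main(String[] args) {
        List<Segment> segments = Arrays.asList(
                new Segment(0L, 100L, 1_000L),
                new Segment(100L, 200L, 2_000L),
                new Segment(200L, 300L, 3_000L));
        long total = totalSizeByListing(segments);
        System.out.println("total bytes: " + total);
        System.out.println("delete by size: " + toDeleteBySize(segments, total, 350L).size());
        System.out.println("delete by time: " + toDeleteByTime(segments, 3_500L, 2_000L).size());
    }
}
```

In this sketch, totalSizeByListing is the only step that must touch every segment. If the total comes from a cheaper source instead, both retention paths read only the segments they end up deleting, which is the sense in which the thread says the two complexities become "the same function".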