We already mentioned in the KIP that RemoteLogMetadataManager is pluggable. Users have the option to plug in their own implementation of RLMM instead of using the default implementation (which is based on topic storage). Such an implementation can be built on their remote storage environment, like AWS, GCP, Azure, etc.
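
To illustrate what "pluggable" could look like in practice, here is a minimal sketch. It is not the KIP's actual interface: SimpleRemoteLogMetadataManager, InMemoryMetadataManager, and RlmmLoader are hypothetical names invented for this example, and the two methods are a simplified stand-in for the real RLMM API. It assumes only that the implementation is selected by class name and that a listing returns segments in ascending baseOffset order starting at a minimum offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical, simplified stand-in for the KIP's RemoteLogMetadataManager (RLMM).
// The real interface has more methods and different signatures.
interface SimpleRemoteLogMetadataManager {
    void putSegmentMetadata(String topicPartition, long baseOffset, String remoteSegmentId);

    // Returns segment ids with baseOffset >= minOffset, in ascending baseOffset order.
    List<String> listSegments(String topicPartition, long minOffset);
}

// One possible plug-in: an in-memory store. A user-supplied implementation
// could instead delegate to an external key-value store.
final class InMemoryMetadataManager implements SimpleRemoteLogMetadataManager {
    private final Map<String, ConcurrentSkipListMap<Long, String>> byPartition =
            new ConcurrentHashMap<>();

    @Override
    public void putSegmentMetadata(String topicPartition, long baseOffset, String remoteSegmentId) {
        byPartition
                .computeIfAbsent(topicPartition, k -> new ConcurrentSkipListMap<>())
                .put(baseOffset, remoteSegmentId);
    }

    @Override
    public List<String> listSegments(String topicPartition, long minOffset) {
        ConcurrentSkipListMap<Long, String> segments = byPartition.get(topicPartition);
        if (segments == null) {
            return new ArrayList<>();
        }
        // tailMap(minOffset, true) keeps keys >= minOffset; values() iterates in key order.
        return new ArrayList<>(segments.tailMap(minOffset, true).values());
    }
}

// Hypothetical loader: instantiates whichever implementation is named in the configuration.
final class RlmmLoader {
    static SimpleRemoteLogMetadataManager load(String className) {
        try {
            return Class.forName(className)
                    .asSubclass(SimpleRemoteLogMetadataManager.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot load RLMM implementation: " + className, e);
        }
    }
}
```

With this shape, the broker code depends only on the interface, so swapping the topic-backed default for a store that fits the user's environment would be a configuration change rather than a code change.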

On Sun, Jul 12, 2020 at 6:36 AM Adam Bellemare <adam.bellem...@gmail.com> wrote:
>
> My 2 cents -
>
> I agree with Colin. I think that it's important that the metadata not grow unbounded without being delegated to external storage. Indefinite long-term storage of entity data in Kafka can result in extremely large datasets where the vast majority of data is stored in the external tier. I would be very disappointed to have the metadata storage be a limiting factor to exactly how much data I can store in Kafka. Additionally, and for example, I think it's very reasonable that an AWS metadata store could be implemented with DynamoDB (key-value store) paired with S3 - faster random-access metadata lookup than plain S3, but without needing to rebuild rocksDB state locally.
>
> On Fri, Jul 10, 2020 at 3:57 PM Colin McCabe <cmcc...@apache.org> wrote:
>
> > Hi all,
> >
> > Thanks for the KIP.
> >
> > I took a look and one thing that stood out to me is that the more metadata we have, the more storage we will need on local disk for the rocksDB database. This seems like it contradicts some of the goals of the project. Ideally the space we need on local disk should be related only to the size of the hot set, not the size of the cold set. It also seems like it could lead to extremely long rocksdb rebuild times if we somehow lose a broker's local storage and have to rebuild it.
> >
> > Instead, I think it would be more reasonable to store cold metadata in the "remote" storage (HDFS, s3, etc.). Not only does this free up space on the local and avoid long rebuild times, but it also gives us more control over the management of our cache. With rocksDB we are delegating cache management to an external library that doesn't really understand our use-case.
> >
> > To give a concrete example of how this is bad, imagine that we have 10 worker threads and we get 10 requests for something that requires us to fetch cold tiered storage metadata. Now every worker thread is blocked inside rocksDB and the broker can do nothing until it finishes fetching from disk. When accessing a remote service like HDFS or S3, in contrast, we would be able to check if the data was in our local cache first. If it wasn't, we could put the request in a purgatory and activate a background thread to fetch the needed data, and then release the worker thread to be used by some other request. Having control of our own caching strategy increases observability, maintainability, and performance.
> >
> > I can anticipate a possible counter-argument here: the size of the metadata should be small and usually fully resident in memory anyway. While this is true today, I don't think it will always be true. The current low limit of a few thousand partitions is not competitive in the long term and needs to be lifted. We'd like to get to at least a million partitions with KIP-500, and much more later. Also, when you give people the ability to have unlimited retention, they will want to make use of it. That means lots of historical log segments to track. This scenario is by no means hypothetical. Even with the current software, it's easy to think of cases where someone misconfigured the log segment roll settings and overwhelmed the system with segments. So overall, I like to understand why we want to store metadata on local disk rather than remote, and what the options are for the future.
> >
> > best,
> > Colin
> >
> > On Thu, Jul 9, 2020, at 09:55, Harsha Chintalapani wrote:
> > > Hi Jun,
> > > Thanks for the replies and feedback on design and giving input.
> > > We are coming close to finish the implementation.
> > > We also did several perf tests as well at our peak production loads and with tiered storage we didn't see any degradation on write throughputs and latencies.
> > > Ying already added some of the perf tests results in the KIP itself.
> > > It will be great if we can get design and code reviews from you and others in the community as we make progress.
> > > Thanks,
> > > Harsha
> > >
> > > On Tue, Jul 7, 2020 at 10:34 AM Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Hi, Ying,
> > > >
> > > > Thanks for the update. It's good to see the progress on this. Please let us know when you are done updating the KIP wiki.
> > > >
> > > > Jun
> > > >
> > > > On Tue, Jul 7, 2020 at 10:13 AM Ying Zheng <yi...@uber.com.invalid> wrote:
> > > >
> > > >> Hi Jun,
> > > >>
> > > >> Satish and I have added more design details in the KIP, including how to keep consistency between replicas (especially when there is leadership changes / log truncations) and new metrics. We also made some other minor changes in the doc. We will finish the KIP changes in the next couple of days. We will let you know when we are done. Most of the changes are already updated to the wiki KIP. You can take a look. But it's not the final version yet.
> > > >>
> > > >> As for the implementation, the code is mostly done and we already had some feature tests / system tests. I have added the performance test results in the KIP. However the recent design changes (e.g. leader epoch info management / log truncation / some of the new metrics) have not been implemented yet. It will take about 2 weeks for us to implement after you review and agree with those design changes.
> > > >>
> > > >> On Tue, Jul 7, 2020 at 9:23 AM Jun Rao <j...@confluent.io> wrote:
> > > >>
> > > >> > Hi, Satish, Harsha,
> > > >> >
> > > >> > Any new updates on the KIP? This feature is one of the most important and most requested features in Apache Kafka right now. It would be helpful if we can make sustained progress on this. Could you share how far along is the design/implementation right now? Is there anything that other people can help to get it across the line?
> > > >> >
> > > >> > As for "transactional support" and "follower requests/replication", no further comments from me as long as the producer state and leader epoch can be restored properly from the object store when needed.
> > > >> >
> > > >> > Thanks,
> > > >> >
> > > >> > Jun
> > > >> >
> > > >> > On Tue, Jun 9, 2020 at 3:39 AM Satish Duggana <satish.dugg...@gmail.com> wrote:
> > > >> >
> > > >> > > We did not want to add many implementation details in the KIP. But we decided to add them in the KIP as appendix or sub-sections(including follower fetch protocol) to describe the flow with the main cases. That will answer most of the queries. I will update on this mail thread when the respective sections are updated.
> > > >> > >
> > > >> > > Thanks,
> > > >> > > Satish.
> > > >> > >
> > > >> > > On Sat, Jun 6, 2020 at 7:49 PM Alexandre Dupriez <alexandre.dupr...@gmail.com> wrote:
> > > >> > > >
> > > >> > > > Hi Satish,
> > > >> > > >
> > > >> > > > A couple of questions specific to the section "Follower Requests/Replication", pages 16:17 in the design document [1].
> > > >> > > >
> > > >> > > > 900. It is mentioned that followers fetch auxiliary states from the remote storage.
> > > >> > > > > > > >> > > > 900.a Does the consistency model of the external storage impacts > > > >> reads > > > >> > > > of leader epochs and other auxiliary data? > > > >> > > > > > > >> > > > 900.b What are the benefits of using a mechanism to store and > > access > > > >> > > > the leader epochs which is different from other metadata > > associated > > > >> to > > > >> > > > tiered segments? What are the benefits of retrieving this > > > >> information > > > >> > > > on-demand from the follower rather than relying on propagation > > via > > > >> the > > > >> > > > topic __remote_log_metadata? What are the advantages over using > > a > > > >> > > > dedicated control structure (e.g. a new record type) propagated > > via > > > >> > > > this topic? Since in the document, different control paths are > > > >> > > > operating in the system, how are the metadata stored in > > > >> > > > __remote_log_metadata [which also include the epoch of the > > leader > > > >> > > > which offloaded a segment] and the remote auxiliary states, > > kept in > > > >> > > > sync? > > > >> > > > > > > >> > > > 900.c A follower can encounter an > > OFFSET_MOVED_TO_TIERED_STORAGE. Is > > > >> > > > this in response to a Fetch or OffsetForLeaderEpoch request? > > > >> > > > > > > >> > > > 900.d What happens if, after a follower encountered an > > > >> > > > OFFSET_MOVED_TO_TIERED_STORAGE response, its attempts to > > retrieve > > > >> > > > leader epochs fail (for instance, because the remote storage is > > > >> > > > temporarily unavailable)? Does the follower fallbacks to a mode > > > >> where > > > >> > > > it ignores tiered segments, and applies truncation using only > > > >> locally > > > >> > > > available information? What happens when access to the remote > > > >> storage > > > >> > > > is restored? How is the replica lineage inferred by the remote > > > >> leader > > > >> > > > epochs reconciled with the follower's replica lineage, which has > > > >> > > > evolved? 
Does the follower remember fetching auxiliary states > > failed > > > >> > > > in the past and attempt reconciliation? Is there a plan to offer > > > >> > > > different strategies in this scenario, configurable via > > > >> configuration? > > > >> > > > > > > >> > > > 900.e Is the leader epoch cache offloaded with every segment? Or > > > >> when > > > >> > > > a new checkpoint is detected? If that information is not always > > > >> > > > offloaded to avoid duplicating data, how does the remote storage > > > >> > > > satisfy the request to retrieve it? > > > >> > > > > > > >> > > > 900.f Since the leader epoch cache covers the entire replica > > > >> lineage, > > > >> > > > what happens if, after a leader epoch cache file is offloaded > > with a > > > >> > > > given segment, the local epoch cache is truncated [not > > necessarily > > > >> for > > > >> > > > a range of offset included in tiered segments]? How are remote > > and > > > >> > > > local leader epoch caches kept consistent? > > > >> > > > > > > >> > > > 900.g Consumer can also use leader epochs (e.g. to enable > > fencing to > > > >> > > > protect against stale leaders). What differences would there be > > > >> > > > between consumer and follower fetches? Especially, would > > consumers > > > >> > > > also fetch leader epoch information from the remote storage? > > > >> > > > > > > >> > > > 900.h Assume a newly elected leader of a topic-partition detects > > > >> more > > > >> > > > recent segments are available in the external storage, with > > epochs > > > > >> > > > its local epoch. Does it ignore these segments and their > > associated > > > >> > > > epoch-to-offset vectors? Or try to reconstruct its local replica > > > >> > > > lineage based on the data remotely available? 
> > > >> > > > > > > >> > > > Thanks, > > > >> > > > Alexandre > > > >> > > > > > > >> > > > [1] > > > >> > > > > > >> > > > > >> > > https://docs.google.com/document/d/18tnobSas3mKFZFr8oRguZoj_tkD_sGzivuLRlMloEMs/edit?usp=sharing > > > >> > > > > > > >> > > > Le jeu. 4 juin 2020 à 19:55, Satish Duggana < > > > >> satish.dugg...@gmail.com> > > > >> > > a écrit : > > > >> > > > > > > > >> > > > > Hi Jun, > > > >> > > > > Please let us know if you have any comments on "transactional > > > >> > support" > > > >> > > > > and "follower requests/replication" mentioned in the wiki. > > > >> > > > > > > > >> > > > > Thanks, > > > >> > > > > Satish. > > > >> > > > > > > > >> > > > > On Tue, Jun 2, 2020 at 9:25 PM Satish Duggana < > > > >> > > satish.dugg...@gmail.com> wrote: > > > >> > > > > > > > > >> > > > > > Thanks Jun for your comments. > > > >> > > > > > > > > >> > > > > > >100. It would be useful to provide more details on how > > those > > > >> apis > > > >> > > are used. Otherwise, it's kind of hard to really assess whether > > the > > > >> new > > > >> > > apis are sufficient/redundant. A few examples below. > > > >> > > > > > > > > >> > > > > > We will update the wiki and let you know. > > > >> > > > > > > > > >> > > > > > >100.1 deleteRecords seems to only advance the > > logStartOffset in > > > >> > > Log. How does that trigger the deletion of remote log segments? > > > >> > > > > > > > > >> > > > > > RLMTask for leader partition periodically checks whether > > there > > > >> are > > > >> > > > > > remote log segments earlier to logStartOffset and the > > respective > > > >> > > > > > remote log segment metadata and data are deleted by using > > RLMM > > > >> and > > > >> > > > > > RSM. > > > >> > > > > > > > > >> > > > > > >100.2 stopReplica with deletion is used in 2 cases (a) > > replica > > > >> > > reassignment; (b) topic deletion. We only want to delete the > > tiered > > > >> > > metadata in the second case. 
Also, in the second case, who > > initiates > > > >> the > > > >> > > deletion of the remote segment since the leader may not exist? > > > >> > > > > > > > > >> > > > > > Right, it is deleted only incase of topic deletion only. We > > will > > > >> > > cover > > > >> > > > > > the details in the KIP. > > > >> > > > > > > > > >> > > > > > >100.3 "LogStartOffset of a topic can be either in local or > > in > > > >> > > remote storage." If LogStartOffset exists in both places, which > > one is > > > >> > the > > > >> > > source of truth? > > > >> > > > > > > > > >> > > > > > I meant the logStartOffset can point to either of local > > segment > > > >> or > > > >> > > > > > remote segment but it is initialised and maintained in the > > Log > > > >> > class > > > >> > > > > > like now. > > > >> > > > > > > > > >> > > > > > >100.4 List<RemoteLogSegmentMetadata> > > > >> > > listRemoteLogSegments(TopicPartition topicPartition, long > > minOffset): > > > >> How > > > >> > > is minOffset supposed to be used? > > > >> > > > > > > > > >> > > > > > Returns list of remote segments, sorted by baseOffset in > > > >> ascending > > > >> > > > > > order that have baseOffset >= the given min Offset. > > > >> > > > > > > > > >> > > > > > >100.5 When copying a segment to remote storage, it seems > > we are > > > >> > > calling the same RLMM.putRemoteLogSegmentData() twice before and > > after > > > >> > > copyLogSegment(). Could you explain why? > > > >> > > > > > > > > >> > > > > > This is more about prepare/commit/rollback as you > > suggested. We > > > >> > will > > > >> > > > > > update the wiki with the new APIs. > > > >> > > > > > > > > >> > > > > > >100.6 LogSegmentData includes leaderEpochCache, but there > > is no > > > >> > api > > > >> > > in RemoteStorageManager to retrieve it. > > > >> > > > > > > > > >> > > > > > Nice catch, copy/paste issue. There is an API to retrieve > > it. > > > >> > > > > > > > > >> > > > > > >101. 
If the __remote_log_metadata is for production usage, > > > >> could > > > >> > > you provide more details? For example, what is the schema of the > > data > > > >> > (both > > > >> > > key and value)? How is the topic maintained,delete or compact? > > > >> > > > > > > > > >> > > > > > It is with delete config and it’s retention period is > > suggested > > > >> to > > > >> > be > > > >> > > > > > more than the remote retention period. > > > >> > > > > > > > > >> > > > > > >110. Is the cache implementation in > > RemoteLogMetadataManager > > > >> meant > > > >> > > for production usage? If so, could you provide more details on the > > > >> schema > > > >> > > and how/where the data is stored? > > > >> > > > > > > > > >> > > > > > The proposal is to have a cache (with default implementation > > > >> backed > > > >> > > by > > > >> > > > > > rocksdb) but it will be added in later versions. We will add > > > >> this > > > >> > to > > > >> > > > > > future work items. > > > >> > > > > > > > > >> > > > > > >111. "Committed offsets can be stored in a local file". > > Could > > > >> you > > > >> > > describe the format of the file and where it's stored? > > > >> > > > > > > > > >> > > > > > We will cover this in the KIP. > > > >> > > > > > > > > >> > > > > > >112. Truncation of remote segments under unclean leader > > > >> election: > > > >> > I > > > >> > > am not sure who figures out the truncated remote segments and how > > that > > > >> > > information is propagated to all replicas? > > > >> > > > > > > > > >> > > > > > We will add this in detail in the KIP. > > > >> > > > > > > > > >> > > > > > >113. "If there are any failures in removing remote log > > segments > > > >> > > then those are stored in a specific topic (default as > > > >> > > __remote_segments_to_be_deleted)". Is it necessary to add yet > > another > > > >> > > internal topic? Could we just keep retrying? 
> > > >> > > > > > > > > >> > > > > > This is not really an internal topic, it will be exposed as > > a > > > >> user > > > >> > > > > > configurable topic. After a few retries, we want user to > > know > > > >> about > > > >> > > > > > the failure so that they can take an action later by > > consuming > > > >> from > > > >> > > > > > this topic. We want to keep this simple instead of retrying > > > >> > > > > > continuously and maintaining the deletion state etc. > > > >> > > > > > > > > >> > > > > > >114. "We may not need to copy producer-id-snapshot as we > > are > > > >> > > copying only segments earlier to last-stable-offset." Hmm, not > > sure > > > >> about > > > >> > > that. The producer snapshot includes things like the last > > timestamp of > > > >> > each > > > >> > > open producer id and can affect when those producer ids are > > expired. > > > >> > > > > > > > > >> > > > > > Sure, this will be added as part of the LogSegmentData. > > > >> > > > > > > > > >> > > > > > Thanks, > > > >> > > > > > Satish. > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > On Fri, May 29, 2020 at 6:39 AM Jun Rao <j...@confluent.io> > > > >> wrote: > > > >> > > > > > > > > > >> > > > > > > Hi, Satish, > > > >> > > > > > > > > > >> > > > > > > Made another pass on the wiki. A few more comments below. > > > >> > > > > > > > > > >> > > > > > > 100. It would be useful to provide more details on how > > those > > > >> apis > > > >> > > are used. Otherwise, it's kind of hard to really assess whether > > the > > > >> new > > > >> > > apis are sufficient/redundant. A few examples below. > > > >> > > > > > > 100.1 deleteRecords seems to only advance the > > logStartOffset > > > >> in > > > >> > > Log. How does that trigger the deletion of remote log segments? > > > >> > > > > > > 100.2 stopReplica with deletion is used in 2 cases (a) > > replica > > > >> > > reassignment; (b) topic deletion. We only want to delete the > > tiered > > > >> > > metadata in the second case. 
Also, in the second case, who > > initiates > > > >> the > > > >> > > deletion of the remote segment since the leader may not exist? > > > >> > > > > > > 100.3 "LogStartOffset of a topic can be either in local > > or in > > > >> > > remote storage." If LogStartOffset exists in both places, which > > one is > > > >> > the > > > >> > > source of truth? > > > >> > > > > > > 100.4 List<RemoteLogSegmentMetadata> > > > >> > > listRemoteLogSegments(TopicPartition topicPartition, long > > minOffset): > > > >> How > > > >> > > is minOffset supposed to be used? > > > >> > > > > > > 100.5 When copying a segment to remote storage, it seems > > we > > > >> are > > > >> > > calling the same RLMM.putRemoteLogSegmentData() twice before and > > after > > > >> > > copyLogSegment(). Could you explain why? > > > >> > > > > > > 100.6 LogSegmentData includes leaderEpochCache, but there > > is > > > >> no > > > >> > > api in RemoteStorageManager to retrieve it. > > > >> > > > > > > > > > >> > > > > > > 101. If the __remote_log_metadata is for production usage, > > > >> could > > > >> > > you provide more details? For example, what is the schema of the > > data > > > >> > (both > > > >> > > key and value)? How is the topic maintained,delete or compact? > > > >> > > > > > > > > > >> > > > > > > 110. Is the cache implementation in > > RemoteLogMetadataManager > > > >> > meant > > > >> > > for production usage? If so, could you provide more details on the > > > >> schema > > > >> > > and how/where the data is stored? > > > >> > > > > > > > > > >> > > > > > > 111. "Committed offsets can be stored in a local file". > > Could > > > >> you > > > >> > > describe the format of the file and where it's stored? > > > >> > > > > > > > > > >> > > > > > > 112. Truncation of remote segments under unclean leader > > > >> election: > > > >> > > I am not sure who figures out the truncated remote segments and > > how > > > >> that > > > >> > > information is propagated to all replicas? 
> > > >> > > > > > > > > > >> > > > > > > 113. "If there are any failures in removing remote log > > > >> segments > > > >> > > then those are stored in a specific topic (default as > > > >> > > __remote_segments_to_be_deleted)". Is it necessary to add yet > > another > > > >> > > internal topic? Could we just keep retrying? > > > >> > > > > > > > > > >> > > > > > > 114. "We may not need to copy producer-id-snapshot as we > > are > > > >> > > copying only segments earlier to last-stable-offset." Hmm, not > > sure > > > >> about > > > >> > > that. The producer snapshot includes things like the last > > timestamp of > > > >> > each > > > >> > > open producer id and can affect when those producer ids are > > expired. > > > >> > > > > > > > > > >> > > > > > > Thanks, > > > >> > > > > > > > > > >> > > > > > > Jun > > > >> > > > > > > > > > >> > > > > > > On Thu, May 28, 2020 at 5:38 AM Satish Duggana < > > > >> > > satish.dugg...@gmail.com> wrote: > > > >> > > > > > >> > > > >> > > > > > >> Hi Jun, > > > >> > > > > > >> Gentle reminder. Please go through the updated wiki and > > let > > > >> us > > > >> > > know your comments. > > > >> > > > > > >> > > > >> > > > > > >> Thanks, > > > >> > > > > > >> Satish. > > > >> > > > > > >> > > > >> > > > > > >> On Tue, May 19, 2020 at 3:50 PM Satish Duggana < > > > >> > > satish.dugg...@gmail.com> wrote: > > > >> > > > > > >>> > > > >> > > > > > >>> Hi Jun, > > > >> > > > > > >>> Please go through the wiki which has the latest updates. > > > >> Google > > > >> > > doc is updated frequently to be in sync with wiki. > > > >> > > > > > >>> > > > >> > > > > > >>> Thanks, > > > >> > > > > > >>> Satish. > > > >> > > > > > >>> > > > >> > > > > > >>> On Tue, May 19, 2020 at 12:30 AM Jun Rao < > > j...@confluent.io> > > > >> > > wrote: > > > >> > > > > > >>>> > > > >> > > > > > >>>> Hi, Satish, > > > >> > > > > > >>>> > > > >> > > > > > >>>> Thanks for the update. Just to clarify. 
Which doc has > > the > > > >> > > latest updates, the wiki or the google doc? > > > >> > > > > > >>>> > > > >> > > > > > >>>> Jun > > > >> > > > > > >>>> > > > >> > > > > > >>>> On Thu, May 14, 2020 at 10:38 AM Satish Duggana < > > > >> > > satish.dugg...@gmail.com> wrote: > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> Hi Jun, > > > >> > > > > > >>>>> Thanks for your comments. We updated the KIP with > > more > > > >> > > details. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> >100. For each of the operations related to tiering, > > it > > > >> would > > > >> > > be useful to provide a description on how it works with the new > > API. > > > >> > These > > > >> > > include things like consumer fetch, replica fetch, > > offsetForTimestamp, > > > >> > > retention (remote and local) by size, time and logStartOffset, > > topic > > > >> > > deletion, etc. This will tell us if the proposed APIs are > > sufficient. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> We addressed most of these APIs in the KIP. We can add > > > >> more > > > >> > > details if needed. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> >101. For the default implementation based on internal > > > >> topic, > > > >> > > is it meant as a proof of concept or for production usage? I > > assume > > > >> that > > > >> > > it's the former. However, if it's the latter, then the KIP needs > > to > > > >> > > describe the design in more detail. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> It is production usage as was mentioned in an earlier > > > >> mail. > > > >> > We > > > >> > > plan to update this section in the next few days. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> >102. When tiering a segment, the segment is first > > > >> written to > > > >> > > the object store and then its metadata is written to RLMM using > > the > > > >> api > > > >> > > "void putRemoteLogSegmentData()". 
One potential issue with this > > > >> approach > > > >> > is > > > >> > > that if the system fails after the first operation, it leaves a > > > >> garbage > > > >> > in > > > >> > > the object store that's never reclaimed. One way to improve this > > is to > > > >> > have > > > >> > > two separate APIs, sth like preparePutRemoteLogSegmentData() and > > > >> > > commitPutRemoteLogSegmentData(). > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> That is a good point. We currently have a different > > way > > > >> using > > > >> > > markers in the segment but your suggestion is much better. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> >103. It seems that the transactional support and the > > > >> ability > > > >> > > to read from follower are missing. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> KIP is updated with transactional support, follower > > fetch > > > >> > > semantics, and reading from a follower. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> >104. It would be useful to provide a testing plan for > > > >> this > > > >> > > KIP. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> We added a few tests by introducing test util for > > tiered > > > >> > > storage in the PR. We will provide the testing plan in the next > > few > > > >> days. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> Thanks, > > > >> > > > > > >>>>> Satish. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> On Wed, Feb 26, 2020 at 9:43 PM Harsha Chintalapani < > > > >> > > ka...@harsha.io> wrote: > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> On Tue, Feb 25, 2020 at 12:46 PM, Jun Rao < > > > >> j...@confluent.io > > > >> > > > > > >> > > wrote: > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Hi, Satish, > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Thanks for the updated doc. 
The new API seems to be > > an > > > >> > > improvement overall. A few more comments below. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> 100. For each of the operations related to tiering, > > it > > > >> > would > > > >> > > be useful to provide a description on how it works with the new > > API. > > > >> > These > > > >> > > include things like consumer fetch, replica fetch, > > offsetForTimestamp, > > > >> > > retention > > > >> > > > > > >>>>>>> (remote and local) by size, time and logStartOffset, > > > >> topic > > > >> > > deletion, etc. This will tell us if the proposed APIs are > > sufficient. > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> Thanks for the feedback Jun. We will add more details > > > >> around > > > >> > > this. > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> 101. For the default implementation based on > > internal > > > >> > topic, > > > >> > > is it meant as a proof of concept or for production usage? I > > assume > > > >> that > > > >> > > it's the former. However, if it's the latter, then the KIP needs > > to > > > >> > > describe the design in more detail. > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> Yes it meant to be for production use. Ideally it > > would > > > >> be > > > >> > > good to merge this in as the default implementation for metadata > > > >> service. > > > >> > > We can add more details around design and testing. > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>>> 102. When tiering a segment, the segment is first > > > >> written > > > >> > to > > > >> > > the object store and then its metadata is written to RLMM using > > the > > > >> api > > > >> > > "void putRemoteLogSegmentData()". > > > >> > > > > > >>>>>>> One potential issue with this approach is that if > > the > > > >> > system > > > >> > > fails after the first operation, it leaves a garbage in the object > > > >> store > > > >> > > that's never reclaimed. 
One way to improve this is to have two > > > >> separate > > > >> > > APIs, sth like preparePutRemoteLogSegmentData() and > > > >> > > commitPutRemoteLogSegmentData(). > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> 103. It seems that the transactional support and the > > > >> > ability > > > >> > > to read from follower are missing. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> 104. It would be useful to provide a testing plan > > for > > > >> this > > > >> > > KIP. > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> We are working on adding more details around > > > >> transactional > > > >> > > support and coming up with test plan. > > > >> > > > > > >>>>>> Add system tests and integration tests. > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>>> Thanks, > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Jun > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> On Mon, Feb 24, 2020 at 8:10 AM Satish Duggana < > > > >> > > satish.dugg...@gmail.com> wrote: > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Hi Jun, > > > >> > > > > > >>>>>>> Please look at the earlier reply and let us know > > your > > > >> > > comments. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Thanks, > > > >> > > > > > >>>>>>> Satish. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> On Wed, Feb 12, 2020 at 4:06 PM Satish Duggana < > > > >> > > satish.dugg...@gmail.com> wrote: > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Hi Jun, > > > >> > > > > > >>>>>>> Thanks for your comments on the separation of > > remote log > > > >> > > metadata storage and remote log storage. > > > >> > > > > > >>>>>>> We had a few discussions since early Jan on how to > > > >> support > > > >> > > eventually consistent stores like S3 by uncoupling remote log > > segment > > > >> > > metadata and remote log storage. It is written with details in > > the doc > > > >> > > here(1). Below is the brief summary of the discussion from that > > doc. 
> > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> The current approach consists of pulling the remote > > log > > > >> > > segment metadata from remote log storage APIs. It worked fine for > > > >> > storages > > > >> > > like HDFS. But one of the problems of relying on the remote > > storage to > > > >> > > maintain metadata is that tiered-storage needs to be strongly > > > >> consistent, > > > >> > > with an impact not only on the metadata(e.g. LIST in S3) but also > > on > > > >> the > > > >> > > segment data(e.g. GET after a DELETE in S3). The cost of > > maintaining > > > >> > > metadata in remote storage needs to be factored in. This is true > > in > > > >> the > > > >> > > case of S3, LIST APIs incur huge costs as you raised earlier. > > > >> > > > > > >>>>>>> So, it is good to separate the remote storage from > > the > > > >> > > remote log metadata store. We refactored the existing > > > >> > RemoteStorageManager > > > >> > > and introduced RemoteLogMetadataManager. Remote log metadata store > > > >> should > > > >> > > give strong consistency semantics but remote log storage can be > > > >> > eventually > > > >> > > consistent. > > > >> > > > > > >>>>>>> We can have a default implementation for > > > >> > > RemoteLogMetadataManager which uses an internal topic(as > > mentioned in > > > >> one > > > >> > > of our earlier emails) as storage. But users can always plugin > > their > > > >> own > > > >> > > RemoteLogMetadataManager implementation based on their > > environment. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Please go through the updated KIP and let us know > > your > > > >> > > comments. We have started refactoring for the changes mentioned > > in the > > > >> > KIP > > > >> > > and there may be a few more updates to the APIs. 

[1] https://docs.google.com/document/d/1qfkBCWL1e7ZWkHU7brxKDBebq4ie9yK20XJnKbgAlew/edit?ts=5e208ec7#

On Fri, Dec 27, 2019 at 5:43 PM Ivan Yurchenko <ivan0yurche...@gmail.com> wrote:

Hi all,

Jun:

> (a) Cost: S3 list object requests cost $0.005 per 1000 requests. If you have 100,000 partitions and want to pull the metadata for each partition at the rate of 1/sec, it can cost $0.5/sec, which is roughly $40K per day.

I want to note here that no reasonably durable storage will be cheap at 100k RPS. For example, DynamoDB might give the same ballpark figures.
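
As a rough check of the figures quoted above, the arithmetic can be written out in a few lines of Java. This is only a back-of-the-envelope sketch: the class and method names are made up, the price of $0.005 per 1000 LIST requests and the 100,000-partition example are taken from this thread, and the 30-second interval is the listRemoteSegments default mentioned elsewhere in the thread.

```java
// Back-of-the-envelope estimate of S3 LIST request cost (illustrative only).
class ListCostEstimate {
    static final double SECONDS_PER_DAY = 86_400.0;

    // One LIST request per partition every intervalSeconds.
    static double requestsPerSecond(long partitions, double intervalSeconds) {
        return partitions / intervalSeconds;
    }

    // Daily cost in dollars for a steady request rate at a given price per 1000 requests.
    static double dollarsPerDay(double requestsPerSecond, double dollarsPerThousandRequests) {
        return requestsPerSecond * dollarsPerThousandRequests / 1000.0 * SECONDS_PER_DAY;
    }

    public static void main(String[] args) {
        double price = 0.005;      // dollars per 1000 LIST requests, as quoted in the thread
        long partitions = 100_000; // example partition count from the thread

        double everySecond = requestsPerSecond(partitions, 1.0);
        double every30Seconds = requestsPerSecond(partitions, 30.0);

        System.out.printf("1s interval:  %.0f RPS, $%.0f/day%n", everySecond, dollarsPerDay(everySecond, price));
        System.out.printf("30s interval: %.0f RPS, $%.0f/day%n", every30Seconds, dollarsPerDay(every30Seconds, price));
    }
}
```

Under these assumptions, polling every second works out to 100,000 RPS and about $43,200 per day (the "roughly $40K" above), while polling every 30 seconds works out to about 3,333 RPS and about $1,440 per day.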

If we want to keep the pull-based approach, we can try to reduce this number in several ways: doing listings less frequently (as Satish mentioned, with the current defaults it's ~3.33k RPS for your example), or batching listing operations in some way (depending on the storage; it might require a change to RSM's interface).

> There are different ways for doing push based metadata propagation. Some object stores may support that already. For example, S3 supports events notification

This sounds interesting. However, I see a couple of issues using it:
1. As I understand the documentation, notification delivery is not guaranteed and it's recommended to periodically do LIST to fill the gaps. Which brings us back to the same LIST consistency guarantees issue.
2. The same goes for the broker start: to get the current state, we need to LIST.
3. The dynamic set of multiple consumers (RSMs): AFAIK SQS and SNS aren't designed for such a case.

Alexandre:

> A.1 As commented on PR 7561, S3 consistency model [1][2] implies RSM cannot rely solely on S3 APIs to guarantee the expected strong consistency. The proposed implementation [3] would need to be updated to take this into account. Let’s talk more about this.

Thank you for the feedback. I clearly see the need for changing the S3 implementation to provide stronger consistency guarantees. As I see from this thread, there are several possible approaches to this. Let's discuss RemoteLogManager's contract and behavior (like pull vs push model) further before picking one (or several?) of them.
I'm going to do some evaluation of DynamoDB for the pull-based approach, if it's possible to apply it paying a reasonable bill.
Also, of the push-based approach with a Kafka topic as the medium.

> A.2.3 Atomicity – what does an implementation of RSM need to provide with respect to atomicity of the APIs copyLogSegment, cleanupLogUntil and deleteTopicPartition? If a partial failure happens in any of those (e.g. in the S3 implementation, if one of the multiple uploads fails [4]),

The S3 implementation is going to change, but it's worth clarifying anyway.
The segment log file is uploaded after S3 has acked the upload of all other files associated with the segment, and only after this does the whole segment file set become visible remotely for operations like listRemoteSegments [1].
In case of upload failure, the files that have been successfully uploaded stay as invisible garbage that is collected by cleanupLogUntil (or overwritten successfully later).
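
The upload ordering described above, together with the reverse ordering for deletion that this message mentions next, can be sketched as follows. This is an illustrative model, not the S3 RSM code: the class SegmentUploadSketch, the in-memory set standing in for the bucket, the injected failures, and the choice of .index and .timeindex as the auxiliary files are all assumptions made for the example.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of "upload the log file last, delete it first" (illustrative only).
class SegmentUploadSketch {
    private final Set<String> remoteObjects = new HashSet<>(); // stands in for the bucket
    private final Set<String> failingKeys;                     // keys whose upload fails in this demo

    SegmentUploadSketch(Set<String> failingKeys) {
        this.failingKeys = failingKeys;
    }

    private void put(String key) {
        if (failingKeys.contains(key)) {
            throw new IllegalStateException("upload failed: " + key);
        }
        remoteObjects.add(key);
    }

    // Auxiliary files go first; the log file goes last and acts as the visibility marker.
    boolean copySegment(String segment) {
        try {
            put(segment + ".index");
            put(segment + ".timeindex");
            put(segment + ".log");
            return true;
        } catch (IllegalStateException e) {
            // Files uploaded before the failure stay behind as invisible garbage.
            return false;
        }
    }

    // A segment counts as visible only once its log file exists remotely.
    boolean isVisible(String segment) {
        return remoteObjects.contains(segment + ".log");
    }

    // Deletion removes the log file first, so the segment disappears before its auxiliary files.
    void deleteSegment(String segment) {
        remoteObjects.remove(segment + ".log");
        remoteObjects.remove(segment + ".index");
        remoteObjects.remove(segment + ".timeindex");
    }

    Set<String> remoteObjects() {
        return new HashSet<>(remoteObjects);
    }

    public static void main(String[] args) {
        SegmentUploadSketch store = new SegmentUploadSketch(Set.of("seg-1.timeindex"));
        System.out.println(store.copySegment("seg-0") + " " + store.isVisible("seg-0")); // true true
        System.out.println(store.copySegment("seg-1") + " " + store.isVisible("seg-1")); // false false
        System.out.println(store.remoteObjects().contains("seg-1.index"));               // true (garbage)
    }
}
```

The point of the ordering is that a reader never needs a multi-object transaction: checking for the single marker object is enough to decide whether the whole file set is usable.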
And the opposite happens during the deletion: log files are deleted first.

This approach should generally work when we solve consistency issues by adding a strongly consistent storage: a segment's uploaded files remain invisible garbage until some metadata about them is written.

> A.3 Caching – storing locally the segments retrieved from the remote storage is excluded as it does not align with the original intent and even defeats some of its purposes (save disk space etc.). That said, could there be other types of use cases where the pattern of access to the remotely stored segments would benefit from local caching (and potentially read-ahead)? Consider the use case of a large pool of consumers which start a backfill at the same time for one day worth of data from one year ago stored remotely.
> Caching the segments locally would allow to uncouple the load on the remote storage from the load on the Kafka cluster. Maybe the RLM could expose a configuration parameter to switch that feature on/off?

I tend to agree here, caching remote segments locally and making this configurable sounds pretty practical to me. We should implement this, maybe not in the first iteration.

Br,
Ivan

[1] https://github.com/harshach/kafka/pull/18/files#diff-4d73d01c16caed6f2548fc3063550ef0R152

On Thu, 19 Dec 2019 at 19:49, Alexandre Dupriez <alexandre.dupr...@gmail.com> wrote:

Hi Jun,

Thank you for the feedback. I am trying to understand how a push-based approach would work.
In order for the metadata to be propagated (under the assumption you stated), would you plan to add a new API in Kafka to allow the metadata store to send them directly to the brokers?

Thanks,
Alexandre

On Wed, Dec 18, 2019 at 20:14, Jun Rao <j...@confluent.io> wrote:

Hi, Satish and Ivan,

There are different ways for doing push based metadata propagation. Some object stores may support that already. For example, S3 supports events notification (https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html). Otherwise one could use a separate metadata store that supports push-based change propagation. Other people have mentioned using a Kafka topic. The best approach may depend on the object store and the operational environment (e.g. whether an external metadata store is already available).

The above discussion is based on the assumption that we need to cache the object metadata locally in every broker. I mentioned earlier that an alternative is to just store/retrieve those metadata in an external metadata store. That may simplify the implementation in some cases.

Thanks,

Jun

On Thu, Dec 5, 2019 at 7:01 AM Satish Duggana <satish.dugg...@gmail.com> wrote:

Hi Jun,
Thanks for your reply.

Currently, `listRemoteSegments` is called at the configured interval (not every second; it defaults to 30 secs). Storing remote log metadata in a strongly consistent store for S3 RSM is raised in PR-comment[1].
RLM invokes RSM at regular intervals and RSM can give remote segment metadata if it is available. RSM is responsible for maintaining and fetching those entries.
It should be based on whatever mechanism is consistent and efficient with the respective remote storage.

Can you give more details about push based mechanism from RSM?

1. https://github.com/apache/kafka/pull/7561#discussion_r344576223

Thanks,
Satish.

On Thu, Dec 5, 2019 at 4:23 AM Jun Rao <j...@confluent.io> wrote:

Hi, Harsha,

Thanks for the reply.

40/41. I am curious which block storages you have tested. S3 seems to be one of the popular block stores. The concerns that I have with pull based approach are the following.
(a) Cost: S3 list object requests cost $0.005 per 1000 requests.
If you have 100,000 partitions and want to pull the metadata for each partition at the rate of 1/sec, it can cost $0.5/sec, which is roughly $40K per day.
(b) Semantics: S3 list objects are eventually consistent. So, when you do a list object request, there is no guarantee that you can see all uploaded objects. This could impact the correctness of subsequent logic.
(c) Efficiency: Blindly pulling metadata when there is no change adds unnecessary overhead in the broker as well as in the block store.

So, have you guys tested S3? If so, could you share your experience in terms of cost, semantics and efficiency?

Jun

On Tue, Dec 3, 2019 at 10:11 PM Harsha Chintalapani <ka...@harsha.io> wrote:

Hi Jun,
Thanks for the reply.

On Tue, Nov 26, 2019 at 3:46 PM, Jun Rao <j...@confluent.io> wrote:

> Hi, Satish and Ying,
>
> Thanks for the reply.
>
> 40/41. There are two different ways that we can approach this. One is what you said. We can have an opinionated way of storing and populating the tier metadata that we think is good enough for everyone. I am not sure if this is the case based on what's currently proposed in the KIP.
> For example, I am not sure that (1) everyone always needs local metadata; (2) the current local storage format is general enough and (3) everyone wants to use the pull based approach to propagate the metadata. Another approach is to make this pluggable and let the implementor implement the best approach for a particular block storage. I haven't seen any comments from Slack/AirBnb in the mailing list on this topic. It would be great if they can provide feedback directly here.

The current interfaces are designed with the most popular block storages available today in mind. We did 2 implementations with these interfaces and they both are yielding good results as we go through the testing of it.
If there is ever a need for pull based approach we can definitely evolve the interface.
In the past we did mark interfaces to be evolving to make room for unknowns in the future.
If you have any suggestions around the current interfaces, please propose them; we are happy to see if we can work them into it.

> 43. To offer tier storage as a general feature, ideally all existing capabilities should still be supported.
> It's fine if the Uber implementation doesn't support all capabilities for internal usage. However, the framework should be general enough.

We agree on that as a principle. But all of these major features are mostly coming right now, and to have a new big feature such as tiered storage support all the new features will be a big ask. We can document how we approach solving these in future iterations.
Our goal is to make this tiered storage feature work for everyone.

> 43.3 This is more than just serving the tier-ed data from block storage.
> With KIP-392, the consumer now can resolve the conflicts with the replica based on leader epoch. So, we need to make sure that leader epoch can be recovered properly from tier storage.

We are working on testing our approach and we will update the KIP with design details.

> 43.4 For JBOD, if tier storage stores the tier metadata locally, we need to support moving such metadata across disk directories since JBOD supports moving data across disks.

KIP is updated with JBOD details. Having said that JBOD tooling needs to evolve to support production loads.
Most of the users will be interested in using tiered storage without JBOD support on day 1.

Thanks,
Harsha

> As for meeting, we could have a KIP e-meeting on this if needed, but it will be open to everyone and will be recorded and shared. Often, the details are still resolved through the mailing list.
>
> Jun

On Tue, Nov 19, 2019 at 6:48 PM Ying Zheng <yi...@uber.com.invalid> wrote:

Please ignore my previous email
I didn't know Apache requires all the discussions to be "open"

On Tue, Nov 19, 2019, 5:40 PM Ying Zheng <yi...@uber.com> wrote:

Hi Jun,

Thank you very much for your feedback!

Can we schedule a meeting in your Palo Alto office in December?
I think a face to face discussion is much more efficient than emails. Both Harsha and I can visit you. Satish may be able to join us remotely.

On Fri, Nov 15, 2019 at 11:04 AM Jun Rao <j...@confluent.io> wrote:

Hi, Satish and Harsha,

The following is a more detailed high level feedback for the KIP. Overall, the KIP seems useful. The challenge is how to design it such that it’s general enough to support different ways of implementing this feature and support existing features.

40. Local segment metadata storage: The KIP makes the assumption that the metadata for the archived log segments are cached locally in every broker and provides a specific implementation for the local storage in the framework. We probably should discuss this more. For example, some tier storage providers may not want to cache the metadata locally and just rely upon a remote key/value store if such a store is already present.
If a local store is used, there could be different ways of implementing it (e.g., based on customized local files, an embedded local store like RocksDB, etc). An alternative way of designing this is to just provide an interface for retrieving the tier segment metadata and leave the details of how to get the metadata outside of the framework.

41. RemoteStorageManager interface and the usage of the interface in the framework: I am not sure if the interface is general enough. For example, it seems that RemoteLogIndexEntry is tied to a specific way of storing the metadata in remote storage.
The framework uses the listRemoteSegments() api in a pull based approach. However, in some other implementations, a push based approach may be preferable. I don’t have a concrete proposal yet. But it would be useful to give this area some more thought and see if we can make the interface more general.

42. In the diagram, the RemoteLogManager is side by side with LogManager. This KIP only discussed how the fetch request is handled between the two layers.
However, we should also consider how other requests that touch the log can be handled, e.g., list offsets by timestamp, delete records, etc. Also, in this model, it's not clear which component is responsible for managing the log start offset. It seems that the log start offset could be changed by both RemoteLogManager and LogManager.

43. There are quite a few existing features not covered by the KIP. It would be useful to discuss each of those.
43.1 I won’t say that compacted topics are rarely used and always small.
For example, KStreams uses compacted topics for storing the states and sometimes the size of the topic could be large. While it might be ok to not support compacted topics initially, it would be useful to have a high level idea on how this might be supported down the road so that we don’t have to make incompatible API changes in the future.
43.2 We need to discuss how EOS is supported. In particular, how is the producer state integrated with the remote storage.
43.3 Now that KIP-392 (allow consumers to fetch from closest replica) is implemented, we need to discuss how reading from a follower replica is supported with tier storage.
43.4 We need to discuss how JBOD is supported with tier storage.

Thanks,

Jun

On Fri, Nov 8, 2019 at 12:06 AM Tom Bentley <tbent...@redhat.com> wrote:

Thanks for those insights Ying.

On Thu, Nov 7, 2019 at 9:26 PM Ying Zheng <yi...@uber.com.invalid> wrote:

> Thanks, I missed that point.
> However, there's still a point at which the consumer fetches start getting served from remote storage (even if that point isn't as soon as the local log retention time/size). This represents a kind of performance cliff edge and what I'm really interested in is how easy it is for a consumer which falls off that cliff to catch up so that its fetches again come from local storage.
> Obviously this can depend on all sorts of factors (like production rate, consumption rate), so it's not guaranteed (just like it's not guaranteed for Kafka today), but this would represent a new failure mode.

As I have explained in the last mail, it's a very rare case that a consumer needs to read remote data. With our experience at Uber, this only happens when the consumer service had an outage for several hours.

There is not a "performance cliff" as you assume. The remote storage is even faster than local disks in terms of bandwidth.
Reading from remote storage is going to have higher latency than local disk. But since the consumer is catching up on several hours of data, it's not sensitive to sub-second latency, and each remote read request will read a large amount of data to make the overall performance better than reading from local disks.

> Another aspect I'd like to understand better is the effect that serving fetch requests from remote storage has on the broker's network utilization.
> If we're just trimming the amount of data held locally (without increasing the overall local+remote retention), then we're effectively trading disk bandwidth for network bandwidth when serving fetch requests from remote storage (which I understand to be a good thing, since brokers are often/usually disk bound). But if we're increasing the overall local+remote retention then it's more likely that network itself becomes the bottleneck.
>
> I appreciate this is all rather hand wavy, I'm just trying to understand how this would affect broker performance, so I'd be grateful for any insights you can offer.

Network bandwidth is a function of produce speed; it has nothing to do with remote retention. As long as the data is shipped to remote storage, you can keep the data there for 1 day or 1 year or 100 years; it doesn't consume any network resources.
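
A back-of-the-envelope sketch of the retention argument above, under assumed numbers. The function names and the 50 MB/s produce rate are hypothetical and chosen only for illustration; nothing here comes from the KIP or from measurements. It models only the upload path: each produced byte is shipped to the remote tier once, so the steady-state upload rate tracks the produce rate, while the amount of data sitting in remote storage grows with retention.

```python
# Illustrative model only: the rate and the function names are assumptions,
# not values taken from the KIP or from any production cluster.

SECONDS_PER_DAY = 24 * 60 * 60


def upload_bandwidth_mb_s(produce_rate_mb_s: float) -> float:
    """Steady-state bandwidth used to ship segments to remote storage.

    Each produced byte is copied to the remote tier once, so this equals
    the produce rate. Retention is deliberately not a parameter.
    """
    return produce_rate_mb_s


def remote_data_stored_mb(produce_rate_mb_s: float, retention_days: float) -> float:
    """Data held in the remote tier once the retention window has filled."""
    return produce_rate_mb_s * SECONDS_PER_DAY * retention_days


if __name__ == "__main__":
    rate = 50.0  # assumed produce rate in MB/s
    for days in (1, 365, 36500):
        print(
            f"retention={days} days: "
            f"upload={upload_bandwidth_mb_s(rate)} MB/s, "
            f"stored={remote_data_stored_mb(rate, days)} MB"
        )
```

The sketch leaves out fetches served from the remote tier. Those do use network, but in this model their volume depends on how often consumers fall behind, not on how long the data is kept.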