Hi, I want to unsubscribe from this list. Can I do it? Please :) Thank you! Regards!
On Sat, Jul 11, 2020 at 22:06, Adam Bellemare (<adam.bellem...@gmail.com>) wrote:

> My 2 cents -
>
> I agree with Colin. I think that it's important that the metadata not grow unbounded without being delegated to external storage. Indefinite long-term storage of entity data in Kafka can result in extremely large datasets where the vast majority of data is stored in the external tier. I would be very disappointed to have the metadata storage be a limiting factor to exactly how much data I can store in Kafka. Additionally, and for example, I think it's very reasonable that an AWS metadata store could be implemented with DynamoDB (key-value store) paired with S3 - faster random-access metadata lookup than plain S3, but without needing to rebuild rocksDB state locally.
>
> On Fri, Jul 10, 2020 at 3:57 PM Colin McCabe <cmcc...@apache.org> wrote:
>
> > Hi all,
> >
> > Thanks for the KIP.
> >
> > I took a look, and one thing that stood out to me is that the more metadata we have, the more storage we will need on local disk for the rocksDB database. This seems like it contradicts some of the goals of the project. Ideally the space we need on local disk should be related only to the size of the hot set, not the size of the cold set. It also seems like it could lead to extremely long rocksDB rebuild times if we somehow lose a broker's local storage and have to rebuild it.
> >
> > Instead, I think it would be more reasonable to store cold metadata in the "remote" storage (HDFS, S3, etc.). Not only does this free up space on the local disk and avoid long rebuild times, but it also gives us more control over the management of our cache. With rocksDB we are delegating cache management to an external library that doesn't really understand our use case.
> >
> > To give a concrete example of how this is bad, imagine that we have 10 worker threads and we get 10 requests for something that requires us to fetch cold tiered storage metadata. Now every worker thread is blocked inside rocksDB and the broker can do nothing until it finishes fetching from disk. When accessing a remote service like HDFS or S3, in contrast, we would be able to check if the data was in our local cache first. If it wasn't, we could put the request in a purgatory and activate a background thread to fetch the needed data, and then release the worker thread to be used by some other request. Having control of our own caching strategy increases observability, maintainability, and performance.
> >
> > I can anticipate a possible counter-argument here: the size of the metadata should be small and usually fully resident in memory anyway. While this is true today, I don't think it will always be true. The current low limit of a few thousand partitions is not competitive in the long term and needs to be lifted. We'd like to get to at least a million partitions with KIP-500, and much more later. Also, when you give people the ability to have unlimited retention, they will want to make use of it. That means lots of historical log segments to track. This scenario is by no means hypothetical. Even with the current software, it's easy to think of cases where someone misconfigured the log segment roll settings and overwhelmed the system with segments.
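To make the purgatory idea Colin describes above concrete, here is a minimal sketch of a non-blocking cold-metadata lookup. This is only an illustration, not code from the KIP: the ColdMetadataFetcher and RemoteMetadataStore names are made up, and a real broker would park the request in the existing request purgatory rather than in a bare CompletableFuture.

  import java.util.Map;
  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;

  // Hypothetical sketch: serve hot metadata from a local cache and push cold
  // lookups to a background thread, so request-handler threads never block.
  public class ColdMetadataFetcher {
      private final Map<String, String> localCache = new ConcurrentHashMap<>();
      private final ExecutorService background = Executors.newSingleThreadExecutor();
      private final RemoteMetadataStore remoteStore; // e.g. an HDFS/S3-backed store (assumed interface)

      public ColdMetadataFetcher(RemoteMetadataStore remoteStore) {
          this.remoteStore = remoteStore;
      }

      // Called from a request-handler ("worker") thread.
      public CompletableFuture<String> fetchSegmentMetadata(String segmentId) {
          String cached = localCache.get(segmentId);
          if (cached != null) {
              // Hot path: already resident locally, answer immediately.
              return CompletableFuture.completedFuture(cached);
          }
          // Cold path: "park" the request (the returned future plays the role of the
          // purgatory entry) and let a background thread do the remote fetch, so the
          // worker thread is released to serve other requests.
          return CompletableFuture.supplyAsync(() -> {
              String value = remoteStore.fetch(segmentId);
              localCache.put(segmentId, value);
              return value;
          }, background);
      }

      // Assumed interface standing in for whatever remote store is used.
      public interface RemoteMetadataStore {
          String fetch(String segmentId);
      }
  }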
So overall, I like to understand > why > > we want to store metadata on local disk rather than remote, and what the > > options are for the future. > > > > best, > > Colin > > > > > > On Thu, Jul 9, 2020, at 09:55, Harsha Chintalapani wrote: > > > Hi Jun, > > > Thanks for the replies and feedback on design and giving > input. > > > We are coming close to finish the implementation. > > > We also did several perf tests as well at our peak production loads and > > > with tiered storage we didn't see any degradation on write throughputs > > and > > > latencies. > > > Ying already added some of the perf tests results in the KIP itself. > > > It will be great if we can get design and code reviews from > you > > > and others in the community as we make progress. > > > Thanks, > > > Harsha > > > > > > On Tue, Jul 7, 2020 at 10:34 AM Jun Rao <j...@confluent.io> wrote: > > > > > > > Hi, Ying, > > > > > > > > Thanks for the update. It's good to see the progress on this. Please > > let > > > > us know when you are done updating the KIP wiki. > > > > > > > > Jun > > > > > > > > On Tue, Jul 7, 2020 at 10:13 AM Ying Zheng <yi...@uber.com.invalid> > > wrote: > > > > > > > >> Hi Jun, > > > >> > > > >> Satish and I have added more design details in the KIP, including > how > > to > > > >> keep consistency between replicas (especially when there is > leadership > > > >> changes / log truncations) and new metrics. We also made some other > > minor > > > >> changes in the doc. We will finish the KIP changes in the next > couple > > of > > > >> days. We will let you know when we are done. Most of the changes are > > > >> already updated to the wiki KIP. You can take a look. But it's not > the > > > >> final version yet. > > > >> > > > >> As for the implementation, the code is mostly done and we already > had > > some > > > >> feature tests / system tests. I have added the performance test > > results in > > > >> the KIP. However the recent design changes (e.g. leader epoch info > > > >> management / log truncation / some of the new metrics) have not been > > > >> implemented yet. It will take about 2 weeks for us to implement > after > > you > > > >> review and agree with those design changes. > > > >> > > > >> > > > >> > > > >> On Tue, Jul 7, 2020 at 9:23 AM Jun Rao <j...@confluent.io> wrote: > > > >> > > > >> > Hi, Satish, Harsha, > > > >> > > > > >> > Any new updates on the KIP? This feature is one of the most > > important > > > >> and > > > >> > most requested features in Apache Kafka right now. It would be > > helpful > > > >> if > > > >> > we can make sustained progress on this. Could you share how far > > along is > > > >> > the design/implementation right now? Is there anything that other > > people > > > >> > can help to get it across the line? > > > >> > > > > >> > As for "transactional support" and "follower > requests/replication", > > no > > > >> > further comments from me as long as the producer state and leader > > epoch > > > >> can > > > >> > be restored properly from the object store when needed. > > > >> > > > > >> > Thanks, > > > >> > > > > >> > Jun > > > >> > > > > >> > On Tue, Jun 9, 2020 at 3:39 AM Satish Duggana < > > satish.dugg...@gmail.com > > > >> > > > > >> > wrote: > > > >> > > > > >> > > We did not want to add many implementation details in the KIP. > > But we > > > >> > > decided to add them in the KIP as appendix or > > sub-sections(including > > > >> > > follower fetch protocol) to describe the flow with the main > cases. > > > >> > > That will answer most of the queries. 
I will update on this mail > > > >> > > thread when the respective sections are updated. > > > >> > > > > > >> > > Thanks, > > > >> > > Satish. > > > >> > > > > > >> > > On Sat, Jun 6, 2020 at 7:49 PM Alexandre Dupriez > > > >> > > <alexandre.dupr...@gmail.com> wrote: > > > >> > > > > > > >> > > > Hi Satish, > > > >> > > > > > > >> > > > A couple of questions specific to the section "Follower > > > >> > > > Requests/Replication", pages 16:17 in the design document [1]. > > > >> > > > > > > >> > > > 900. It is mentioned that followers fetch auxiliary states > from > > the > > > >> > > > remote storage. > > > >> > > > > > > >> > > > 900.a Does the consistency model of the external storage > impacts > > > >> reads > > > >> > > > of leader epochs and other auxiliary data? > > > >> > > > > > > >> > > > 900.b What are the benefits of using a mechanism to store and > > access > > > >> > > > the leader epochs which is different from other metadata > > associated > > > >> to > > > >> > > > tiered segments? What are the benefits of retrieving this > > > >> information > > > >> > > > on-demand from the follower rather than relying on propagation > > via > > > >> the > > > >> > > > topic __remote_log_metadata? What are the advantages over > using > > a > > > >> > > > dedicated control structure (e.g. a new record type) > propagated > > via > > > >> > > > this topic? Since in the document, different control paths are > > > >> > > > operating in the system, how are the metadata stored in > > > >> > > > __remote_log_metadata [which also include the epoch of the > > leader > > > >> > > > which offloaded a segment] and the remote auxiliary states, > > kept in > > > >> > > > sync? > > > >> > > > > > > >> > > > 900.c A follower can encounter an > > OFFSET_MOVED_TO_TIERED_STORAGE. Is > > > >> > > > this in response to a Fetch or OffsetForLeaderEpoch request? > > > >> > > > > > > >> > > > 900.d What happens if, after a follower encountered an > > > >> > > > OFFSET_MOVED_TO_TIERED_STORAGE response, its attempts to > > retrieve > > > >> > > > leader epochs fail (for instance, because the remote storage > is > > > >> > > > temporarily unavailable)? Does the follower fallbacks to a > mode > > > >> where > > > >> > > > it ignores tiered segments, and applies truncation using only > > > >> locally > > > >> > > > available information? What happens when access to the remote > > > >> storage > > > >> > > > is restored? How is the replica lineage inferred by the remote > > > >> leader > > > >> > > > epochs reconciled with the follower's replica lineage, which > has > > > >> > > > evolved? Does the follower remember fetching auxiliary states > > failed > > > >> > > > in the past and attempt reconciliation? Is there a plan to > offer > > > >> > > > different strategies in this scenario, configurable via > > > >> configuration? > > > >> > > > > > > >> > > > 900.e Is the leader epoch cache offloaded with every segment? > Or > > > >> when > > > >> > > > a new checkpoint is detected? If that information is not > always > > > >> > > > offloaded to avoid duplicating data, how does the remote > storage > > > >> > > > satisfy the request to retrieve it? > > > >> > > > > > > >> > > > 900.f Since the leader epoch cache covers the entire replica > > > >> lineage, > > > >> > > > what happens if, after a leader epoch cache file is offloaded > > with a > > > >> > > > given segment, the local epoch cache is truncated [not > > necessarily > > > >> for > > > >> > > > a range of offset included in tiered segments]? 
How are remote > > and > > > >> > > > local leader epoch caches kept consistent? > > > >> > > > > > > >> > > > 900.g Consumer can also use leader epochs (e.g. to enable > > fencing to > > > >> > > > protect against stale leaders). What differences would there > be > > > >> > > > between consumer and follower fetches? Especially, would > > consumers > > > >> > > > also fetch leader epoch information from the remote storage? > > > >> > > > > > > >> > > > 900.h Assume a newly elected leader of a topic-partition > detects > > > >> more > > > >> > > > recent segments are available in the external storage, with > > epochs > > > > >> > > > its local epoch. Does it ignore these segments and their > > associated > > > >> > > > epoch-to-offset vectors? Or try to reconstruct its local > replica > > > >> > > > lineage based on the data remotely available? > > > >> > > > > > > >> > > > Thanks, > > > >> > > > Alexandre > > > >> > > > > > > >> > > > [1] > > > >> > > > > > >> > > > > >> > > > https://docs.google.com/document/d/18tnobSas3mKFZFr8oRguZoj_tkD_sGzivuLRlMloEMs/edit?usp=sharing > > > >> > > > > > > >> > > > Le jeu. 4 juin 2020 à 19:55, Satish Duggana < > > > >> satish.dugg...@gmail.com> > > > >> > > a écrit : > > > >> > > > > > > > >> > > > > Hi Jun, > > > >> > > > > Please let us know if you have any comments on > "transactional > > > >> > support" > > > >> > > > > and "follower requests/replication" mentioned in the wiki. > > > >> > > > > > > > >> > > > > Thanks, > > > >> > > > > Satish. > > > >> > > > > > > > >> > > > > On Tue, Jun 2, 2020 at 9:25 PM Satish Duggana < > > > >> > > satish.dugg...@gmail.com> wrote: > > > >> > > > > > > > > >> > > > > > Thanks Jun for your comments. > > > >> > > > > > > > > >> > > > > > >100. It would be useful to provide more details on how > > those > > > >> apis > > > >> > > are used. Otherwise, it's kind of hard to really assess whether > > the > > > >> new > > > >> > > apis are sufficient/redundant. A few examples below. > > > >> > > > > > > > > >> > > > > > We will update the wiki and let you know. > > > >> > > > > > > > > >> > > > > > >100.1 deleteRecords seems to only advance the > > logStartOffset in > > > >> > > Log. How does that trigger the deletion of remote log segments? > > > >> > > > > > > > > >> > > > > > RLMTask for leader partition periodically checks whether > > there > > > >> are > > > >> > > > > > remote log segments earlier to logStartOffset and the > > respective > > > >> > > > > > remote log segment metadata and data are deleted by using > > RLMM > > > >> and > > > >> > > > > > RSM. > > > >> > > > > > > > > >> > > > > > >100.2 stopReplica with deletion is used in 2 cases (a) > > replica > > > >> > > reassignment; (b) topic deletion. We only want to delete the > > tiered > > > >> > > metadata in the second case. Also, in the second case, who > > initiates > > > >> the > > > >> > > deletion of the remote segment since the leader may not exist? > > > >> > > > > > > > > >> > > > > > Right, it is deleted only incase of topic deletion only. > We > > will > > > >> > > cover > > > >> > > > > > the details in the KIP. > > > >> > > > > > > > > >> > > > > > >100.3 "LogStartOffset of a topic can be either in local > or > > in > > > >> > > remote storage." If LogStartOffset exists in both places, which > > one is > > > >> > the > > > >> > > source of truth? 
> > > >> > > > > > > > > >> > > > > > I meant the logStartOffset can point to either of local > > segment > > > >> or > > > >> > > > > > remote segment but it is initialised and maintained in the > > Log > > > >> > class > > > >> > > > > > like now. > > > >> > > > > > > > > >> > > > > > >100.4 List<RemoteLogSegmentMetadata> > > > >> > > listRemoteLogSegments(TopicPartition topicPartition, long > > minOffset): > > > >> How > > > >> > > is minOffset supposed to be used? > > > >> > > > > > > > > >> > > > > > Returns list of remote segments, sorted by baseOffset in > > > >> ascending > > > >> > > > > > order that have baseOffset >= the given min Offset. > > > >> > > > > > > > > >> > > > > > >100.5 When copying a segment to remote storage, it seems > > we are > > > >> > > calling the same RLMM.putRemoteLogSegmentData() twice before and > > after > > > >> > > copyLogSegment(). Could you explain why? > > > >> > > > > > > > > >> > > > > > This is more about prepare/commit/rollback as you > > suggested. We > > > >> > will > > > >> > > > > > update the wiki with the new APIs. > > > >> > > > > > > > > >> > > > > > >100.6 LogSegmentData includes leaderEpochCache, but there > > is no > > > >> > api > > > >> > > in RemoteStorageManager to retrieve it. > > > >> > > > > > > > > >> > > > > > Nice catch, copy/paste issue. There is an API to retrieve > > it. > > > >> > > > > > > > > >> > > > > > >101. If the __remote_log_metadata is for production > usage, > > > >> could > > > >> > > you provide more details? For example, what is the schema of the > > data > > > >> > (both > > > >> > > key and value)? How is the topic maintained,delete or compact? > > > >> > > > > > > > > >> > > > > > It is with delete config and it’s retention period is > > suggested > > > >> to > > > >> > be > > > >> > > > > > more than the remote retention period. > > > >> > > > > > > > > >> > > > > > >110. Is the cache implementation in > > RemoteLogMetadataManager > > > >> meant > > > >> > > for production usage? If so, could you provide more details on > the > > > >> schema > > > >> > > and how/where the data is stored? > > > >> > > > > > > > > >> > > > > > The proposal is to have a cache (with default > implementation > > > >> backed > > > >> > > by > > > >> > > > > > rocksdb) but it will be added in later versions. We will > add > > > >> this > > > >> > to > > > >> > > > > > future work items. > > > >> > > > > > > > > >> > > > > > >111. "Committed offsets can be stored in a local file". > > Could > > > >> you > > > >> > > describe the format of the file and where it's stored? > > > >> > > > > > > > > >> > > > > > We will cover this in the KIP. > > > >> > > > > > > > > >> > > > > > >112. Truncation of remote segments under unclean leader > > > >> election: > > > >> > I > > > >> > > am not sure who figures out the truncated remote segments and > how > > that > > > >> > > information is propagated to all replicas? > > > >> > > > > > > > > >> > > > > > We will add this in detail in the KIP. > > > >> > > > > > > > > >> > > > > > >113. "If there are any failures in removing remote log > > segments > > > >> > > then those are stored in a specific topic (default as > > > >> > > __remote_segments_to_be_deleted)". Is it necessary to add yet > > another > > > >> > > internal topic? Could we just keep retrying? > > > >> > > > > > > > > >> > > > > > This is not really an internal topic, it will be exposed > as > > a > > > >> user > > > >> > > > > > configurable topic. 
After a few retries, we want user to > > know > > > >> about > > > >> > > > > > the failure so that they can take an action later by > > consuming > > > >> from > > > >> > > > > > this topic. We want to keep this simple instead of > retrying > > > >> > > > > > continuously and maintaining the deletion state etc. > > > >> > > > > > > > > >> > > > > > >114. "We may not need to copy producer-id-snapshot as we > > are > > > >> > > copying only segments earlier to last-stable-offset." Hmm, not > > sure > > > >> about > > > >> > > that. The producer snapshot includes things like the last > > timestamp of > > > >> > each > > > >> > > open producer id and can affect when those producer ids are > > expired. > > > >> > > > > > > > > >> > > > > > Sure, this will be added as part of the LogSegmentData. > > > >> > > > > > > > > >> > > > > > Thanks, > > > >> > > > > > Satish. > > > >> > > > > > > > > >> > > > > > > > > >> > > > > > On Fri, May 29, 2020 at 6:39 AM Jun Rao <j...@confluent.io > > > > > >> wrote: > > > >> > > > > > > > > > >> > > > > > > Hi, Satish, > > > >> > > > > > > > > > >> > > > > > > Made another pass on the wiki. A few more comments > below. > > > >> > > > > > > > > > >> > > > > > > 100. It would be useful to provide more details on how > > those > > > >> apis > > > >> > > are used. Otherwise, it's kind of hard to really assess whether > > the > > > >> new > > > >> > > apis are sufficient/redundant. A few examples below. > > > >> > > > > > > 100.1 deleteRecords seems to only advance the > > logStartOffset > > > >> in > > > >> > > Log. How does that trigger the deletion of remote log segments? > > > >> > > > > > > 100.2 stopReplica with deletion is used in 2 cases (a) > > replica > > > >> > > reassignment; (b) topic deletion. We only want to delete the > > tiered > > > >> > > metadata in the second case. Also, in the second case, who > > initiates > > > >> the > > > >> > > deletion of the remote segment since the leader may not exist? > > > >> > > > > > > 100.3 "LogStartOffset of a topic can be either in local > > or in > > > >> > > remote storage." If LogStartOffset exists in both places, which > > one is > > > >> > the > > > >> > > source of truth? > > > >> > > > > > > 100.4 List<RemoteLogSegmentMetadata> > > > >> > > listRemoteLogSegments(TopicPartition topicPartition, long > > minOffset): > > > >> How > > > >> > > is minOffset supposed to be used? > > > >> > > > > > > 100.5 When copying a segment to remote storage, it seems > > we > > > >> are > > > >> > > calling the same RLMM.putRemoteLogSegmentData() twice before and > > after > > > >> > > copyLogSegment(). Could you explain why? > > > >> > > > > > > 100.6 LogSegmentData includes leaderEpochCache, but > there > > is > > > >> no > > > >> > > api in RemoteStorageManager to retrieve it. > > > >> > > > > > > > > > >> > > > > > > 101. If the __remote_log_metadata is for production > usage, > > > >> could > > > >> > > you provide more details? For example, what is the schema of the > > data > > > >> > (both > > > >> > > key and value)? How is the topic maintained,delete or compact? > > > >> > > > > > > > > > >> > > > > > > 110. Is the cache implementation in > > RemoteLogMetadataManager > > > >> > meant > > > >> > > for production usage? If so, could you provide more details on > the > > > >> schema > > > >> > > and how/where the data is stored? > > > >> > > > > > > > > > >> > > > > > > 111. "Committed offsets can be stored in a local file". > > Could > > > >> you > > > >> > > describe the format of the file and where it's stored? 
> > > >> > > > > > > > > > >> > > > > > > 112. Truncation of remote segments under unclean leader > > > >> election: > > > >> > > I am not sure who figures out the truncated remote segments and > > how > > > >> that > > > >> > > information is propagated to all replicas? > > > >> > > > > > > > > > >> > > > > > > 113. "If there are any failures in removing remote log > > > >> segments > > > >> > > then those are stored in a specific topic (default as > > > >> > > __remote_segments_to_be_deleted)". Is it necessary to add yet > > another > > > >> > > internal topic? Could we just keep retrying? > > > >> > > > > > > > > > >> > > > > > > 114. "We may not need to copy producer-id-snapshot as we > > are > > > >> > > copying only segments earlier to last-stable-offset." Hmm, not > > sure > > > >> about > > > >> > > that. The producer snapshot includes things like the last > > timestamp of > > > >> > each > > > >> > > open producer id and can affect when those producer ids are > > expired. > > > >> > > > > > > > > > >> > > > > > > Thanks, > > > >> > > > > > > > > > >> > > > > > > Jun > > > >> > > > > > > > > > >> > > > > > > On Thu, May 28, 2020 at 5:38 AM Satish Duggana < > > > >> > > satish.dugg...@gmail.com> wrote: > > > >> > > > > > >> > > > >> > > > > > >> Hi Jun, > > > >> > > > > > >> Gentle reminder. Please go through the updated wiki and > > let > > > >> us > > > >> > > know your comments. > > > >> > > > > > >> > > > >> > > > > > >> Thanks, > > > >> > > > > > >> Satish. > > > >> > > > > > >> > > > >> > > > > > >> On Tue, May 19, 2020 at 3:50 PM Satish Duggana < > > > >> > > satish.dugg...@gmail.com> wrote: > > > >> > > > > > >>> > > > >> > > > > > >>> Hi Jun, > > > >> > > > > > >>> Please go through the wiki which has the latest > updates. > > > >> Google > > > >> > > doc is updated frequently to be in sync with wiki. > > > >> > > > > > >>> > > > >> > > > > > >>> Thanks, > > > >> > > > > > >>> Satish. > > > >> > > > > > >>> > > > >> > > > > > >>> On Tue, May 19, 2020 at 12:30 AM Jun Rao < > > j...@confluent.io> > > > >> > > wrote: > > > >> > > > > > >>>> > > > >> > > > > > >>>> Hi, Satish, > > > >> > > > > > >>>> > > > >> > > > > > >>>> Thanks for the update. Just to clarify. Which doc has > > the > > > >> > > latest updates, the wiki or the google doc? > > > >> > > > > > >>>> > > > >> > > > > > >>>> Jun > > > >> > > > > > >>>> > > > >> > > > > > >>>> On Thu, May 14, 2020 at 10:38 AM Satish Duggana < > > > >> > > satish.dugg...@gmail.com> wrote: > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> Hi Jun, > > > >> > > > > > >>>>> Thanks for your comments. We updated the KIP with > > more > > > >> > > details. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> >100. For each of the operations related to tiering, > > it > > > >> would > > > >> > > be useful to provide a description on how it works with the new > > API. > > > >> > These > > > >> > > include things like consumer fetch, replica fetch, > > offsetForTimestamp, > > > >> > > retention (remote and local) by size, time and logStartOffset, > > topic > > > >> > > deletion, etc. This will tell us if the proposed APIs are > > sufficient. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> We addressed most of these APIs in the KIP. We can > add > > > >> more > > > >> > > details if needed. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> >101. For the default implementation based on > internal > > > >> topic, > > > >> > > is it meant as a proof of concept or for production usage? I > > assume > > > >> that > > > >> > > it's the former. 
However, if it's the latter, then the KIP needs > > to > > > >> > > describe the design in more detail. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> It is production usage as was mentioned in an > earlier > > > >> mail. > > > >> > We > > > >> > > plan to update this section in the next few days. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> >102. When tiering a segment, the segment is first > > > >> written to > > > >> > > the object store and then its metadata is written to RLMM using > > the > > > >> api > > > >> > > "void putRemoteLogSegmentData()". One potential issue with this > > > >> approach > > > >> > is > > > >> > > that if the system fails after the first operation, it leaves a > > > >> garbage > > > >> > in > > > >> > > the object store that's never reclaimed. One way to improve this > > is to > > > >> > have > > > >> > > two separate APIs, sth like preparePutRemoteLogSegmentData() and > > > >> > > commitPutRemoteLogSegmentData(). > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> That is a good point. We currently have a different > > way > > > >> using > > > >> > > markers in the segment but your suggestion is much better. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> >103. It seems that the transactional support and > the > > > >> ability > > > >> > > to read from follower are missing. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> KIP is updated with transactional support, follower > > fetch > > > >> > > semantics, and reading from a follower. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> >104. It would be useful to provide a testing plan > for > > > >> this > > > >> > > KIP. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> We added a few tests by introducing test util for > > tiered > > > >> > > storage in the PR. We will provide the testing plan in the next > > few > > > >> days. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> Thanks, > > > >> > > > > > >>>>> Satish. > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> > > > >> > > > > > >>>>> On Wed, Feb 26, 2020 at 9:43 PM Harsha Chintalapani > < > > > >> > > ka...@harsha.io> wrote: > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> On Tue, Feb 25, 2020 at 12:46 PM, Jun Rao < > > > >> j...@confluent.io > > > >> > > > > > >> > > wrote: > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Hi, Satish, > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Thanks for the updated doc. The new API seems to > be > > an > > > >> > > improvement overall. A few more comments below. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> 100. For each of the operations related to > tiering, > > it > > > >> > would > > > >> > > be useful to provide a description on how it works with the new > > API. > > > >> > These > > > >> > > include things like consumer fetch, replica fetch, > > offsetForTimestamp, > > > >> > > retention > > > >> > > > > > >>>>>>> (remote and local) by size, time and > logStartOffset, > > > >> topic > > > >> > > deletion, etc. This will tell us if the proposed APIs are > > sufficient. > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> Thanks for the feedback Jun. We will add more > details > > > >> around > > > >> > > this. > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> 101. For the default implementation based on > > internal > > > >> > topic, > > > >> > > is it meant as a proof of concept or for production usage? 
I > > assume > > > >> that > > > >> > > it's the former. However, if it's the latter, then the KIP needs > > to > > > >> > > describe the design in more detail. > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> Yes it meant to be for production use. Ideally it > > would > > > >> be > > > >> > > good to merge this in as the default implementation for metadata > > > >> service. > > > >> > > We can add more details around design and testing. > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>>> 102. When tiering a segment, the segment is first > > > >> written > > > >> > to > > > >> > > the object store and then its metadata is written to RLMM using > > the > > > >> api > > > >> > > "void putRemoteLogSegmentData()". > > > >> > > > > > >>>>>>> One potential issue with this approach is that if > > the > > > >> > system > > > >> > > fails after the first operation, it leaves a garbage in the > object > > > >> store > > > >> > > that's never reclaimed. One way to improve this is to have two > > > >> separate > > > >> > > APIs, sth like preparePutRemoteLogSegmentData() and > > > >> > > commitPutRemoteLogSegmentData(). > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> 103. It seems that the transactional support and > the > > > >> > ability > > > >> > > to read from follower are missing. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> 104. It would be useful to provide a testing plan > > for > > > >> this > > > >> > > KIP. > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>> We are working on adding more details around > > > >> transactional > > > >> > > support and coming up with test plan. > > > >> > > > > > >>>>>> Add system tests and integration tests. > > > >> > > > > > >>>>>> > > > >> > > > > > >>>>>>> Thanks, > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Jun > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> On Mon, Feb 24, 2020 at 8:10 AM Satish Duggana < > > > >> > > satish.dugg...@gmail.com> wrote: > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Hi Jun, > > > >> > > > > > >>>>>>> Please look at the earlier reply and let us know > > your > > > >> > > comments. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Thanks, > > > >> > > > > > >>>>>>> Satish. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> On Wed, Feb 12, 2020 at 4:06 PM Satish Duggana < > > > >> > > satish.dugg...@gmail.com> wrote: > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Hi Jun, > > > >> > > > > > >>>>>>> Thanks for your comments on the separation of > > remote log > > > >> > > metadata storage and remote log storage. > > > >> > > > > > >>>>>>> We had a few discussions since early Jan on how to > > > >> support > > > >> > > eventually consistent stores like S3 by uncoupling remote log > > segment > > > >> > > metadata and remote log storage. It is written with details in > > the doc > > > >> > > here(1). Below is the brief summary of the discussion from that > > doc. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> The current approach consists of pulling the > remote > > log > > > >> > > segment metadata from remote log storage APIs. It worked fine > for > > > >> > storages > > > >> > > like HDFS. But one of the problems of relying on the remote > > storage to > > > >> > > maintain metadata is that tiered-storage needs to be strongly > > > >> consistent, > > > >> > > with an impact not only on the metadata(e.g. LIST in S3) but > also > > on > > > >> the > > > >> > > segment data(e.g. GET after a DELETE in S3). 
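As an aside on point 102 above (prepare/commit instead of a single putRemoteLogSegmentData call), a rough sketch of what a two-phase metadata API could look like is below. Only preparePutRemoteLogSegmentData and commitPutRemoteLogSegmentData were suggested in the thread; the abort method and all signatures here are assumptions for illustration, not the KIP's API.

  import java.io.IOException;

  // Hypothetical two-phase variant of putRemoteLogSegmentData(): the metadata entry
  // is first recorded as "pending", the segment bytes are uploaded to the object
  // store, and only a successful commit makes the segment visible to readers.
  // A failed upload leaves a pending record that a janitor task can later reclaim.
  public interface TwoPhaseRemoteLogMetadataManager {

      // Phase 1: record intent before any bytes reach the object store.
      // Returns an identifier for the pending entry.
      String preparePutRemoteLogSegmentData(String topicPartition, long baseOffset) throws IOException;

      // Phase 2: called only after the object store has acknowledged the upload;
      // flips the entry from "pending" to "committed" so readers can see it.
      void commitPutRemoteLogSegmentData(String pendingId) throws IOException;

      // Rollback path: mark the pending entry as failed so its partial upload
      // can be garbage-collected from the object store.
      void abortPutRemoteLogSegmentData(String pendingId) throws IOException;
  }

The ordering matters: nothing uploaded to the object store becomes visible to readers until the commit step records it, so a crash between the two phases leaves only reclaimable garbage, which is the same ordering Ivan describes later in the thread for the S3 implementation.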
The cost of > > maintaining > > > >> > > metadata in remote storage needs to be factored in. This is true > > in > > > >> the > > > >> > > case of S3, LIST APIs incur huge costs as you raised earlier. > > > >> > > > > > >>>>>>> So, it is good to separate the remote storage from > > the > > > >> > > remote log metadata store. We refactored the existing > > > >> > RemoteStorageManager > > > >> > > and introduced RemoteLogMetadataManager. Remote log metadata > store > > > >> should > > > >> > > give strong consistency semantics but remote log storage can be > > > >> > eventually > > > >> > > consistent. > > > >> > > > > > >>>>>>> We can have a default implementation for > > > >> > > RemoteLogMetadataManager which uses an internal topic(as > > mentioned in > > > >> one > > > >> > > of our earlier emails) as storage. But users can always plugin > > their > > > >> own > > > >> > > RemoteLogMetadataManager implementation based on their > > environment. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Please go through the updated KIP and let us know > > your > > > >> > > comments. We have started refactoring for the changes mentioned > > in the > > > >> > KIP > > > >> > > and there may be a few more updates to the APIs. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> [1] > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> > > > >> > > > > > >> > > > > >> > > > https://docs.google.com/document/d/1qfkBCWL1e7ZWkHU7brxKDBebq4ie9yK20XJnKbgAlew/edit?ts=5e208ec7# > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> On Fri, Dec 27, 2019 at 5:43 PM Ivan Yurchenko < > > > >> > > ivan0yurche...@gmail.com> > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> wrote: > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Hi all, > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Jun: > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> (a) Cost: S3 list object requests cost $0.005 per > > 1000 > > > >> > > requests. If > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> you > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> have 100,000 partitions and want to pull the > > metadata > > > >> for > > > >> > > each > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> partition > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> at > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> the rate of 1/sec. It can cost $0.5/sec, which is > > > >> roughly > > > >> > > $40K per > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> day. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> I want to note here, that no reasonably durable > > storage > > > >> > will > > > >> > > be cheap at 100k RPS. For example, DynamoDB might give the same > > > >> ballpark > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> figures. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> If we want to keep the pull-based approach, we can > > try > > > >> to > > > >> > > reduce this > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> number > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> in several ways: doing listings less frequently > (as > > > >> Satish > > > >> > > mentioned, with the current defaults it's ~3.33k RPS for your > > > >> example), > > > >> > > batching listing operations in some way (depending on the > > storage; it > > > >> > might > > > >> > > require the change of RSM's interface). > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> There are different ways for doing push based > > metadata > > > >> > > propagation. 
> > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Some > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> object stores may support that already. For > > example, S3 > > > >> > > supports > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> events > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> notification > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> This sounds interesting. However, I see a couple > of > > > >> issues > > > >> > > using it: > > > >> > > > > > >>>>>>> 1. As I understand the documentation, notification > > > >> delivery > > > >> > > is not guaranteed > > > >> > > > > > >>>>>>> and it's recommended to periodically do LIST to > > fill the > > > >> > > gaps. Which brings us back to the same LIST consistency > guarantees > > > >> issue. > > > >> > > > > > >>>>>>> 2. The same goes for the broker start: to get the > > > >> current > > > >> > > state, we > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> need > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> to LIST. > > > >> > > > > > >>>>>>> 3. The dynamic set of multiple consumers (RSMs): > > AFAIK > > > >> SQS > > > >> > > and SNS > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> aren't > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> designed for such a case. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Alexandre: > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> A.1 As commented on PR 7561, S3 consistency model > > [1][2] > > > >> > > implies RSM > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> cannot > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> relies solely on S3 APIs to guarantee the expected > > > >> strong > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> consistency. The > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> proposed implementation [3] would need to be > > updated to > > > >> > take > > > >> > > this > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> into > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> account. Let’s talk more about this. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Thank you for the feedback. I clearly see the need > > for > > > >> > > changing the S3 implementation > > > >> > > > > > >>>>>>> to provide stronger consistency guarantees. As it > > see > > > >> from > > > >> > > this thread, there are > > > >> > > > > > >>>>>>> several possible approaches to this. Let's discuss > > > >> > > RemoteLogManager's contract and > > > >> > > > > > >>>>>>> behavior (like pull vs push model) further before > > > >> picking > > > >> > > one (or > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> several - > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> ?) of them. > > > >> > > > > > >>>>>>> I'm going to do some evaluation of DynamoDB for > the > > > >> > > pull-based > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> approach, > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> if it's possible to apply it paying a reasonable > > bill. > > > >> > Also, > > > >> > > of the push-based approach > > > >> > > > > > >>>>>>> with a Kafka topic as the medium. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> A.2.3 Atomicity – what does an implementation of > RSM > > > >> need > > > >> > to > > > >> > > provide > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> with > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> respect to atomicity of the APIs copyLogSegment, > > > >> > > cleanupLogUntil and deleteTopicPartition? 
If a partial failure > > > >> happens in > > > >> > > any of those > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> (e.g. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> in > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> the S3 implementation, if one of the multiple > > uploads > > > >> fails > > > >> > > [4]), > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> The S3 implementation is going to change, but it's > > worth > > > >> > > clarifying > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> anyway. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> The segment log file is being uploaded after S3 > has > > > >> acked > > > >> > > uploading of all other files associated with the segment and > only > > > >> after > > > >> > > this the > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> whole > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> segment file set becomes visible remotely for > > operations > > > >> > > like listRemoteSegments [1]. > > > >> > > > > > >>>>>>> In case of upload failure, the files that has been > > > >> > > successfully > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> uploaded > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> stays > > > >> > > > > > >>>>>>> as invisible garbage that is collected by > > > >> cleanupLogUntil > > > >> > (or > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> overwritten > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> successfully later). > > > >> > > > > > >>>>>>> And the opposite happens during the deletion: log > > files > > > >> are > > > >> > > deleted > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> first. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> This approach should generally work when we solve > > > >> > > consistency issues by adding a strongly consistent storage: a > > > >> segment's > > > >> > > uploaded files > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> remain > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> invisible garbage until some metadata about them > is > > > >> > written. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> A.3 Caching – storing locally the segments > retrieved > > > >> from > > > >> > > the remote storage is excluded as it does not align with the > > original > > > >> > intent > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> and even > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> defeat some of its purposes (save disk space > etc.). > > That > > > >> > > said, could > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> there > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> be other types of use cases where the pattern of > > access > > > >> to > > > >> > > the > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> remotely > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> stored segments would benefit from local caching > > (and > > > >> > > potentially read-ahead)? Consider the use case of a large pool > of > > > >> > consumers > > > >> > > which > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> start > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> a backfill at the same time for one day worth of > > data > > > >> from > > > >> > > one year > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> ago > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> stored remotely. 
Caching the segments locally > would > > > >> allow > > > >> > to > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> uncouple the > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> load on the remote storage from the load on the > > Kafka > > > >> > > cluster. Maybe > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> the > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> RLM could expose a configuration parameter to > switch > > > >> that > > > >> > > feature > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> on/off? > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> I tend to agree here, caching remote segments > > locally > > > >> and > > > >> > > making this configurable sounds pretty practical to me. We > should > > > >> > implement > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> this, > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> maybe not in the first iteration. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Br, > > > >> > > > > > >>>>>>> Ivan > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> [1] > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> > > > >> > > > > > >> > > > > >> > > > https://github.com/harshach/kafka/pull/18/files#diff-4d73d01c16caed6f2548fc3063550ef0R152 > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> On Thu, 19 Dec 2019 at 19:49, Alexandre Dupriez < > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> alexandre.dupr...@gmail.com> > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> wrote: > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Hi Jun, > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Thank you for the feedback. I am trying to > > understand > > > >> how a > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> push-based > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> approach would work. > > > >> > > > > > >>>>>>> In order for the metadata to be propagated (under > > the > > > >> > > assumption you stated), would you plan to add a new API in Kafka > > to > > > >> allow > > > >> > > the metadata store to send them directly to the brokers? > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Thanks, > > > >> > > > > > >>>>>>> Alexandre > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Le mer. 18 déc. 2019 à 20:14, Jun Rao < > > j...@confluent.io> > > > >> a > > > >> > > écrit : > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Hi, Satish and Ivan, > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> There are different ways for doing push based > > metadata > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> propagation. Some > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> object stores may support that already. For > > example, S3 > > > >> > > supports > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> events > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> notification ( > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> > > > >> > > > > > >> > > https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html). > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Otherwise one could use a separate metadata store > > that > > > >> > > supports > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> push-based > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> change propagation. Other people have mentioned > > using a > > > >> > Kafka > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> topic. The > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> best approach may depend on the object store and > the > > > >> > > operational environment (e.g. 
whether an external metadata store > > is > > > >> > already > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> available). > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> The above discussion is based on the assumption > > that we > > > >> > need > > > >> > > to > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> cache the > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> object metadata locally in every broker. I > mentioned > > > >> > earlier > > > >> > > that > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> an > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> alternative is to just store/retrieve those > > metadata in > > > >> an > > > >> > > external metadata store. That may simplify the implementation in > > some > > > >> > cases. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Thanks, > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Jun > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> On Thu, Dec 5, 2019 at 7:01 AM Satish Duggana < > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> satish.dugg...@gmail.com> > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> wrote: > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Hi Jun, > > > >> > > > > > >>>>>>> Thanks for your reply. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Currently, `listRemoteSegments` is called at the > > > >> configured > > > >> > > interval(not every second, defaults to 30secs). Storing remote > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> log > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> metadata in a strongly consistent store for S3 RSM > > is > > > >> > raised > > > >> > > in PR-comment[1]. > > > >> > > > > > >>>>>>> RLM invokes RSM at regular intervals and RSM can > > give > > > >> > remote > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> segment > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> metadata if it is available. RSM is responsible > for > > > >> > > maintaining > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> and > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> fetching those entries. It should be based on > > whatever > > > >> > > mechanism > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> is > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> consistent and efficient with the respective > remote > > > >> > storage. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Can you give more details about push based > mechanism > > > >> from > > > >> > > RSM? > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> 1. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> > > > >> > > https://github.com/apache/kafka/pull/7561#discussion_r344576223 > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Thanks, > > > >> > > > > > >>>>>>> Satish. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> On Thu, Dec 5, 2019 at 4:23 AM Jun Rao < > > > >> j...@confluent.io> > > > >> > > wrote: > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Hi, Harsha, > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Thanks for the reply. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> 40/41. I am curious which block storages you have > > > >> tested. > > > >> > S3 > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> seems > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> to be > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> one of the popular block stores. 
The concerns > that I > > > >> have > > > >> > > with > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> pull > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> based > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> approach are the following. > > > >> > > > > > >>>>>>> (a) Cost: S3 list object requests cost $0.005 per > > 1000 > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> requests. If > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> you > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> have 100,000 partitions and want to pull the > > metadata > > > >> for > > > >> > > each > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> partition > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> at > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> the rate of 1/sec. It can cost $0.5/sec, which is > > > >> roughly > > > >> > > $40K > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> per > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> day. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> (b) Semantics: S3 list objects are eventually > > > >> consistent. > > > >> > So, > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> when > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> you > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> do a > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> list object request, there is no guarantee that > you > > can > > > >> see > > > >> > > all > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> uploaded > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> objects. This could impact the correctness of > > subsequent > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> logics. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> (c) Efficiency: Blindly pulling metadata when > there > > is > > > >> no > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> change adds > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> unnecessary overhead in the broker as well as in > the > > > >> block > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> store. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> So, have you guys tested S3? If so, could you > share > > your > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> experience > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> in > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> terms of cost, semantics and efficiency? > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Jun > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> On Tue, Dec 3, 2019 at 10:11 PM Harsha > Chintalapani > > < > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> ka...@harsha.io > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> wrote: > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Hi Jun, > > > >> > > > > > >>>>>>> Thanks for the reply. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> On Tue, Nov 26, 2019 at 3:46 PM, Jun Rao < > > > >> j...@confluent.io > > > >> > > > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> wrote: > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Hi, Satish and Ying, > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Thanks for the reply. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> 40/41. There are two different ways that we can > > approach > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> this. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> One is > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> what > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> you said. 
We can have an opinionated way of > storing > > and > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> populating > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> the > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> tier > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> metadata that we think is good enough for > everyone. > > I am > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> not > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> sure if > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> this > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> is the case based on what's currently proposed in > > the > > > >> KIP. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> For > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> example, I > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> am not sure that (1) everyone always needs local > > > >> metadata; > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> (2) > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> the > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> current > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> local storage format is general enough and (3) > > everyone > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> wants to > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> use > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> the > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> pull based approach to propagate the metadata. > > Another > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> approach > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> is to > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> make > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> this pluggable and let the implementor implements > > the > > > >> best > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> approach > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> for a > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> particular block storage. I haven't seen any > > comments > > > >> from > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> Slack/AirBnb > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> in > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> the mailing list on this topic. It would be great > if > > > >> they > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> can > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> provide > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> feedback directly here. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> The current interfaces are designed with most > > popular > > > >> block > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> storages > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> available today and we did 2 implementations with > > these > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> interfaces and > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> they both are yielding good results as we going > > through > > > >> the > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> testing of > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> it. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> If there is ever a need for pull based approach we > > can > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> definitely > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> evolve > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> the interface. 
> > > >> > > > > > >>>>>>> In the past we did mark interfaces to be evolving > to > > > >> make > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> room for > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> unknowns > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> in the future. > > > >> > > > > > >>>>>>> If you have any suggestions around the current > > > >> interfaces > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> please > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> propose we > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> are happy to see if we can work them into it. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> 43. To offer tier storage as a general feature, > > ideally > > > >> all > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> existing > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> capabilities should still be supported. It's fine > > if the > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> uber > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> implementation doesn't support all capabilities > for > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> internal > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> usage. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> However, the framework should be general enough. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> We agree on that as a principle. But all of these > > major > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> features > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> mostly > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> coming right now and to have a new big feature > such > > as > > > >> > tiered > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> storage > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> to > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> support all the new features will be a big ask. We > > can > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> document on > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> how > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> do > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> we approach solving these in future iterations. > > > >> > > > > > >>>>>>> Our goal is to make this tiered storage feature > > work for > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> everyone. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> 43.3 This is more than just serving the tier-ed > data > > > >> from > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> block > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> storage. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> With KIP-392, the consumer now can resolve the > > conflicts > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> with the > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> replica > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> based on leader epoch. So, we need to make sure > that > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> leader epoch > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> can be > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> recovered properly from tier storage. > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> We are working on testing our approach and we will > > > >> update > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> the KIP > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> with > > > >> > > > > > >>>>>>> > > > >> > > > > > >>>>>>> design details. 
> 43.4 For JBOD, if tier storage stores the tier metadata locally, we need to support moving such metadata across disk
> directories, since JBOD supports moving data across disks.

The KIP is updated with JBOD details. Having said that, JBOD tooling needs to evolve to support production loads. Most users will be interested in using tiered storage without JBOD support on day 1.

Thanks,
Harsha

As for a meeting, we could have a KIP e-meeting on this if needed, but it will be open to everyone and will be recorded and shared. Often, the details are still resolved through the mailing list.

Jun

On Tue, Nov 19, 2019 at 6:48 PM Ying Zheng <yi...@uber.com.invalid> wrote:

Please ignore my previous email. I didn't know Apache requires all the discussions to be "open".

On Tue, Nov 19, 2019, 5:40 PM Ying Zheng <yi...@uber.com> wrote:

Hi Jun,

Thank you very much for your feedback!

Can we schedule a meeting in your Palo Alto office in December? I think a face-to-face discussion is much more efficient than emails. Both Harsha and I can visit you. Satish may be able to join us remotely.
On Fri, Nov 15, 2019 at 11:04 AM Jun Rao <j...@confluent.io> wrote:

Hi, Satish and Harsha,

The following is a more detailed high level feedback for the KIP. Overall, the KIP seems useful. The challenge is how to design it such that it's general enough to support different ways of implementing this feature and support existing features.

40. Local segment metadata storage: The KIP makes the assumption that the metadata for the archived log segments is cached locally in every broker and provides a specific implementation for the local storage in the framework. We probably should discuss this more. For example, some tier storage providers may not want to cache the metadata locally and would just rely upon a remote key/value store if such a store is already present. If a local store is used, there could be different ways of implementing it (e.g., based on customized local files, an embedded local store like RocksDB, etc).
An alternative way of designing this is to just provide an interface for retrieving the tier segment metadata and leave the details of how to get the metadata outside of the framework.

41. RemoteStorageManager interface and the usage of the interface in the framework: I am not sure if the interface is general enough. For example, it seems that RemoteLogIndexEntry is tied to a specific way of storing the metadata in remote storage. The framework uses the listRemoteSegments() api in a pull based approach. However, in some other implementations, a push based approach may be preferred. I don't have a concrete proposal yet. But it would be useful to give this area some more thought and see if we can make the interface more general [a sketch contrasting the two styles follows below].
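To make the pull-versus-push distinction in 41 a bit more tangible, here is a minimal sketch. Only listRemoteSegments() is named in the discussion above; its real signature, and everything else in this snippet, is invented for illustration.

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch contrasting pull- and push-style metadata propagation.
public final class MetadataPropagation {

    // Pull side: roughly the current proposal -- the framework owns a polling
    // loop and periodically asks the plugin what segments exist.
    public interface RemoteSegmentLister {
        List<String> listRemoteSegments(String topic, int partition); // signature illustrative
    }

    // Push side: the plugin calls back into the framework whenever its own
    // metadata channel (an internal topic, a notification service, ...) changes.
    public interface SegmentListener {
        void onSegmentsChanged(List<String> segments);
    }

    // Pull style in action: the framework decides the cadence.
    public static ScheduledExecutorService startPolling(RemoteSegmentLister lister,
                                                        SegmentListener listener,
                                                        String topic, int partition) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(
            () -> listener.onSegmentsChanged(lister.listRemoteSegments(topic, partition)),
            0, 30, TimeUnit.SECONDS);
        return scheduler;
    }
}

In a push-based design the plugin, not the framework, would decide when onSegmentsChanged() fires, which is essentially the flexibility a propagation-agnostic interface would have to leave open.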
42. In the diagram, the RemoteLogManager is side by side with the LogManager. This KIP only discusses how a fetch request is handled between the two layers. However, we should also consider how other requests that touch the log are handled, e.g., list offsets by timestamp, delete records, etc. Also, in this model, it's not clear which component is responsible for managing the log start offset. It seems that the log start offset could be changed by both the RemoteLogManager and the LogManager.

43. There are quite a few existing features not covered by the KIP. It would be useful to discuss each of those.
43.1 I won't say that compacted topics are rarely used and always small. For example, KStreams uses compacted topics for storing its state, and sometimes the size of such a topic could be large. While it might be ok not to support compacted topics initially, it would be useful to have a high-level idea of how this might be supported down the road, so that we don't have to make incompatible API changes in the future.
43.2 We need to discuss how EOS is supported. In particular, how is the producer state integrated with the remote storage [see the sketch after this email].
43.3 Now that KIP-392 (allow consumers to fetch from the closest replica) is implemented, we need to discuss how reading from a follower replica is supported with tier storage.
43.4 We need to discuss how JBOD is supported with tier storage.

Thanks,

Jun
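For the producer-state question in 43.2, here is a rough sketch of one possible shape, under the assumption that the per-segment producer-state snapshot is simply shipped together with the archived segment and pulled back when a replica has to rebuild state it no longer holds locally. All names and signatures are hypothetical, not part of the KIP.

import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch: upload the per-segment producer-state snapshot next to
// the archived segment so EOS state can be rebuilt from remote storage later.
public final class ProducerStateArchiver {

    // Illustrative stand-in for whatever remote store the plugin talks to.
    public interface RemoteObjectStore {
        void put(String key, byte[] bytes) throws Exception;
        byte[] get(String key) throws Exception;
    }

    private final RemoteObjectStore store;

    public ProducerStateArchiver(RemoteObjectStore store) {
        this.store = store;
    }

    // Called when a segment is copied to remote storage.
    public void archiveSnapshot(String topicPartition, long baseOffset, Path snapshotFile) throws Exception {
        store.put(key(topicPartition, baseOffset), Files.readAllBytes(snapshotFile));
    }

    // Called when a replica must restore producer state for offsets it no longer has locally.
    public void restoreSnapshot(String topicPartition, long baseOffset, Path destination) throws Exception {
        Files.write(destination, store.get(key(topicPartition, baseOffset)));
    }

    private static String key(String topicPartition, long baseOffset) {
        return topicPartition + "/" + baseOffset + ".producer-snapshot";
    }
}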
On Fri, Nov 8, 2019 at 12:06 AM Tom Bentley <tbent...@redhat.com> wrote:

Thanks for those insights Ying.

On Thu, Nov 7, 2019 at 9:26 PM Ying Zheng <yi...@uber.com.invalid> wrote:

> Thanks, I missed that point. However, there's still a point at which the consumer fetches start getting served from
> remote storage (even if that point isn't as soon as the local log retention time/size). This represents a kind of
> performance cliff edge, and what I'm really interested in is how easy it is for a consumer which falls off that cliff
> to catch up so that its fetches again come from local storage.
> Obviously this can depend on all sorts of factors (like production rate and consumption rate), so it's not guaranteed
> (just as it's not guaranteed for Kafka today), but this would represent a new failure mode.

As I explained in the last mail, it's a very rare case that a consumer needs to read remote data. In our experience at Uber, this only happens when the consumer service has had an outage for several hours.

There is no "performance cliff" as you assume. The remote storage is even faster than local disks in terms of bandwidth. Reading from remote storage is going to have higher latency than local disk, but since the consumer is catching up on several hours of data, it's not sensitive to sub-second latency, and each remote read request reads a large amount of data, making the overall performance better than reading from local disks.

> Another aspect I'd like to understand better is the effect that serving fetch requests from remote storage has on the
> broker's network utilization.
> If we're just trimming the amount of data held locally (without increasing the overall local+remote retention), then
> we're effectively trading disk bandwidth for network bandwidth when serving fetch requests from remote storage (which
> I understand to be a good thing, since brokers are often/usually disk bound). But if we're increasing the overall
> local+remote retention, then it's more likely that the network itself becomes the bottleneck. I appreciate this is all
> rather hand-wavy; I'm just trying to understand how this would affect broker performance, so I'd be grateful for any
> insights you can offer.

Network bandwidth is a function of produce speed; it has nothing to do with remote retention. As long as the data is shipped to remote storage, you can keep the data there for 1 day or 1 year or 100 years, and it doesn't consume any network resources.
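A quick back-of-envelope illustration of that point, with entirely made-up numbers:

// Made-up numbers, only to illustrate that tiering traffic tracks the produce
// rate, while retention affects remote storage capacity rather than the network.
public final class TieringBackOfEnvelope {
    public static void main(String[] args) {
        double produceMBps = 100.0;        // assumed produce rate into the cluster
        double uploadMBps  = produceMBps;  // every byte is shipped to remote storage once
        double retentionDays = 365.0;      // remote retention: 1 day or 1 year...

        // ...changes only how much data sits in remote storage, not network usage:
        double remoteTB = uploadMBps * 86400 * retentionDays / 1e6;
        System.out.printf("upload ~%.0f MB/s regardless of retention;%n", uploadMBps);
        System.out.printf("remote capacity ~%.1f TB for %.0f days of retention%n", remoteTB, retentionDays);
    }
}

Extra network is consumed only in the rare catch-up case described above, when a consumer actually has to read from the remote tier.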