Initially, we did not want to add many implementation details to the
KIP. However, we have decided to add them as an appendix or
sub-sections (including the follower fetch protocol) to describe the
flow for the main cases. That should answer most of these queries. I
will update this mail thread when the respective sections are updated.

Thanks,
Satish.

On Sat, Jun 6, 2020 at 7:49 PM Alexandre Dupriez
<alexandre.dupr...@gmail.com> wrote:
>
> Hi Satish,
>
> A couple of questions specific to the section "Follower
> Requests/Replication", pages 16-17 in the design document [1].
>
> 900. It is mentioned that followers fetch auxiliary states from the
> remote storage.
>
> 900.a Does the consistency model of the external storage impact reads
> of leader epochs and other auxiliary data?
>
> 900.b What are the benefits of using a mechanism to store and access
> the leader epochs which is different from other metadata associated with
> tiered segments? What are the benefits of retrieving this information
> on-demand from the follower rather than relying on propagation via the
> topic __remote_log_metadata? What are the advantages over using a
> dedicated control structure (e.g. a new record type) propagated via
> this topic? Since in the document, different control paths are
> operating in the system, how are the metadata stored in
> __remote_log_metadata [which also include the epoch of the leader
> which offloaded a segment] and the remote auxiliary states, kept in
> sync?
>
> 900.c A follower can encounter an OFFSET_MOVED_TO_TIERED_STORAGE. Is
> this in response to a Fetch or OffsetForLeaderEpoch request?
>
> 900.d What happens if, after a follower encountered an
> OFFSET_MOVED_TO_TIERED_STORAGE response, its attempts to retrieve
> leader epochs fail (for instance, because the remote storage is
> temporarily unavailable)? Does the follower fall back to a mode where
> it ignores tiered segments, and applies truncation using only locally
> available information? What happens when access to the remote storage
> is restored? How is the replica lineage inferred by the remote leader
> epochs reconciled with the follower's replica lineage, which has
> evolved? Does the follower remember fetching auxiliary states failed
> in the past and attempt reconciliation? Is there a plan to offer
> different strategies in this scenario, configurable via configuration?
>
> 900.e Is the leader epoch cache offloaded with every segment? Or when
> a new checkpoint is detected? If that information is not always
> offloaded to avoid duplicating data, how does the remote storage
> satisfy the request to retrieve it?
>
> 900.f Since the leader epoch cache covers the entire replica lineage,
> what happens if, after a leader epoch cache file is offloaded with a
> given segment, the local epoch cache is truncated [not necessarily for
> a range of offset included in tiered segments]? How are remote and
> local leader epoch caches kept consistent?
>
> 900.g Consumers can also use leader epochs (e.g. to enable fencing to
> protect against stale leaders). What differences would there be
> between consumer and follower fetches? Especially, would consumers
> also fetch leader epoch information from the remote storage?
>
> 900.h Assume a newly elected leader of a topic-partition detects more
> recent segments are available in the external storage, with epochs >
> its local epoch. Does it ignore these segments and their associated
> epoch-to-offset vectors? Or try to reconstruct its local replica
> lineage based on the data remotely available?
>
> Thanks,
> Alexandre
>
> [1] 
> https://docs.google.com/document/d/18tnobSas3mKFZFr8oRguZoj_tkD_sGzivuLRlMloEMs/edit?usp=sharing
>
> On Thu, Jun 4, 2020 at 7:55 PM Satish Duggana <satish.dugg...@gmail.com> 
> wrote:
> >
> > Hi Jun,
> > Please let us know if you have any comments on "transactional support"
> > and "follower requests/replication" mentioned in the wiki.
> >
> > Thanks,
> > Satish.
> >
> > On Tue, Jun 2, 2020 at 9:25 PM Satish Duggana <satish.dugg...@gmail.com> 
> > wrote:
> > >
> > > Thanks Jun for your comments.
> > >
> > > >100. It would be useful to provide more details on how those apis are 
> > > >used. Otherwise, it's kind of hard to really assess whether the new apis 
> > > >are sufficient/redundant. A few examples below.
> > >
> > > We will update the wiki and let you know.
> > >
> > > >100.1 deleteRecords seems to only advance the logStartOffset in Log. How 
> > > >does that trigger the deletion of remote log segments?
> > >
> > > The RLMTask for a leader partition periodically checks whether there
> > > are remote log segments earlier than logStartOffset; if so, the
> > > respective remote log segment metadata and data are deleted using
> > > RLMM and RSM.
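As a rough illustration of the periodic check described above, here is a
minimal sketch. It is not the KIP's code: the class and method names are
hypothetical, and it assumes "earlier than logStartOffset" means that a
segment's last offset is below logStartOffset, so a segment that still
contains logStartOffset is kept.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: pick the remote segments that an RLMTask-like job could delete.
class RemoteRetentionSketch {
    // Stand-in for remote log segment metadata; both offsets are inclusive.
    static final class Segment {
        final long baseOffset;
        final long endOffset;

        Segment(long baseOffset, long endOffset) {
            this.baseOffset = baseOffset;
            this.endOffset = endOffset;
        }
    }

    // Assumption: a segment is deletable only if all of its offsets are below logStartOffset.
    static List<Segment> segmentsToDelete(List<Segment> remote, long logStartOffset) {
        List<Segment> deletable = new ArrayList<>();
        for (Segment s : remote) {
            if (s.endOffset < logStartOffset) {
                deletable.add(s);
            }
        }
        return deletable;
    }

    public static void main(String[] args) {
        List<Segment> remote = Arrays.asList(
            new Segment(0, 99), new Segment(100, 199), new Segment(200, 299));
        // deleteRecords advanced logStartOffset to 150: only the first segment is fully below it.
        for (Segment s : segmentsToDelete(remote, 150)) {
            System.out.println("delete segment [" + s.baseOffset + ", " + s.endOffset + "]");
        }
    }
}
```

In the real flow the selected segments would then be removed through RSM
(data) and RLMM (metadata); the sketch only covers the selection step.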
> > >
> > > >100.2 stopReplica with deletion is used in 2 cases (a) replica 
> > > >reassignment; (b) topic deletion. We only want to delete the tiered 
> > > >metadata in the second case. Also, in the second case, who initiates the 
> > > >deletion of the remote segment since the leader may not exist?
> > >
> > > Right, it is deleted only in the case of topic deletion. We will
> > > cover the details in the KIP.
> > >
> > > >100.3 "LogStartOffset of a topic can be either in local or in remote 
> > > >storage." If LogStartOffset exists in both places, which one is the 
> > > >source of truth?
> > >
> > > I meant that logStartOffset can point to either a local segment or a
> > > remote segment, but it is initialised and maintained in the Log class,
> > > as it is now.
> > >
> > > >100.4 List<RemoteLogSegmentMetadata> 
> > > >listRemoteLogSegments(TopicPartition topicPartition, long minOffset): 
> > > >How is minOffset supposed to be used?
> > >
> > > It returns the list of remote segments that have baseOffset >= the
> > > given minOffset, sorted by baseOffset in ascending order.
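The minOffset semantics described above can be sketched as follows. This
is only an illustration under stated assumptions: SegmentMetaSketch is a
hypothetical stand-in for RemoteLogSegmentMetadata, the TopicPartition
argument is dropped, and the segments come from an in-memory list rather
than from a real metadata store.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for RemoteLogSegmentMetadata; only baseOffset matters here.
final class SegmentMetaSketch {
    final String id;
    final long baseOffset;

    SegmentMetaSketch(String id, long baseOffset) {
        this.id = id;
        this.baseOffset = baseOffset;
    }
}

class RemoteSegmentListingSketch {
    // Keeps segments with baseOffset >= minOffset and returns them in ascending baseOffset order.
    static List<SegmentMetaSketch> listRemoteLogSegments(List<SegmentMetaSketch> all, long minOffset) {
        List<SegmentMetaSketch> result = new ArrayList<>();
        for (SegmentMetaSketch s : all) {
            if (s.baseOffset >= minOffset) {
                result.add(s);
            }
        }
        result.sort(Comparator.comparingLong((SegmentMetaSketch s) -> s.baseOffset));
        return result;
    }

    public static void main(String[] args) {
        List<SegmentMetaSketch> all = Arrays.asList(
            new SegmentMetaSketch("c", 200L),
            new SegmentMetaSketch("a", 0L),
            new SegmentMetaSketch("b", 100L));
        for (SegmentMetaSketch s : listRemoteLogSegments(all, 100L)) {
            System.out.println(s.id + " baseOffset=" + s.baseOffset);
        }
    }
}
```

Note that, with this definition, a segment whose baseOffset is below
minOffset is excluded even if it still contains minOffset.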
> > >
> > > >100.5 When copying a segment to remote storage, it seems we are calling 
> > > >the same RLMM.putRemoteLogSegmentData() twice before and after 
> > > >copyLogSegment(). Could you explain why?
> > >
> > > This is more about prepare/commit/rollback as you suggested.  We will
> > > update the wiki with the new APIs.
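To make the prepare/commit idea concrete, here is a minimal sketch. It is
an assumption-laden illustration, not the new API: the class, the State
enum, and the in-memory map are hypothetical, and the Runnable stands in
for the RSM copy call. The point is only that a failed copy leaves a
PREPARED entry behind, so the partial upload can be found and cleaned up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the prepare/commit flow around a segment copy.
class TwoPhaseCopySketch {
    enum State { PREPARED, COMMITTED }

    // In-memory stand-in for the metadata store (RLMM).
    final Map<String, State> metadata = new HashMap<>();

    void preparePut(String segmentId) {
        metadata.put(segmentId, State.PREPARED);
    }

    void commitPut(String segmentId) {
        metadata.put(segmentId, State.COMMITTED);
    }

    // 'copy' stands in for the RSM copy call and may throw on failure.
    boolean tierSegment(String segmentId, Runnable copy) {
        preparePut(segmentId);
        try {
            copy.run();
        } catch (RuntimeException e) {
            // The entry stays PREPARED, so a cleaner can locate and remove the partial upload.
            return false;
        }
        commitPut(segmentId);
        return true;
    }

    public static void main(String[] args) {
        TwoPhaseCopySketch sketch = new TwoPhaseCopySketch();
        sketch.tierSegment("seg-0", () -> { });
        sketch.tierSegment("seg-1", () -> { throw new RuntimeException("upload failed"); });
        System.out.println(sketch.metadata);
    }
}
```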
> > >
> > > >100.6 LogSegmentData includes leaderEpochCache, but there is no api in 
> > > >RemoteStorageManager to retrieve it.
> > >
> > > Nice catch, copy/paste issue. There is an API to retrieve it.
> > >
> > > >101. If the __remote_log_metadata is for production usage, could you 
> > > >provide more details? For example, what is the schema of the data (both 
> > > >key and value)? How is the topic maintained,delete or compact?
> > >
> > > It is configured with the delete policy, and its retention period is
> > > suggested to be longer than the remote retention period.
> > >
> > > >110. Is the cache implementation in RemoteLogMetadataManager meant for 
> > > >production usage? If so, could you provide more details on the schema 
> > > >and how/where the data is stored?
> > >
> > > The proposal is to have a cache (with a default implementation backed
> > > by RocksDB), but it will be added in later versions. We will add this
> > > to the future work items.
> > >
> > > >111. "Committed offsets can be stored in a local file". Could you 
> > > >describe the format of the file and where it's stored?
> > >
> > > We will cover this in the KIP.
> > >
> > > >112. Truncation of remote segments under unclean leader election: I am 
> > > >not sure who figures out the truncated remote segments and how that 
> > > >information is propagated to all replicas?
> > >
> > > We will add this in detail in the KIP.
> > >
> > > >113. "If there are any failures in removing remote log segments then 
> > > >those are stored in a specific topic (default as 
> > > >__remote_segments_to_be_deleted)". Is it necessary to add yet another 
> > > >internal topic? Could we just keep retrying?
> > >
> > > This is not really an internal topic; it will be exposed as a
> > > user-configurable topic. After a few retries, we want the user to know
> > > about the failure so that they can take action later by consuming
> > > from this topic. We want to keep this simple instead of retrying
> > > continuously and maintaining the deletion state etc.
> > >
> > > >114. "We may not need to copy producer-id-snapshot as we are copying 
> > > >only segments earlier to last-stable-offset." Hmm, not sure about that. 
> > > >The producer snapshot includes things like the last timestamp of each 
> > > >open producer id and can affect when those producer ids are expired.
> > >
> > > Sure, this will be added as part of the LogSegmentData.
> > >
> > > Thanks,
> > > Satish.
> > >
> > >
> > > On Fri, May 29, 2020 at 6:39 AM Jun Rao <j...@confluent.io> wrote:
> > > >
> > > > Hi, Satish,
> > > >
> > > > Made another pass on the wiki. A few more comments below.
> > > >
> > > > 100. It would be useful to provide more details on how those apis are 
> > > > used. Otherwise, it's kind of hard to really assess whether the new 
> > > > apis are sufficient/redundant. A few examples below.
> > > > 100.1 deleteRecords seems to only advance the logStartOffset in Log. 
> > > > How does that trigger the deletion of remote log segments?
> > > > 100.2 stopReplica with deletion is used in 2 cases (a) replica 
> > > > reassignment; (b) topic deletion. We only want to delete the tiered 
> > > > metadata in the second case. Also, in the second case, who initiates 
> > > > the deletion of the remote segment since the leader may not exist?
> > > > 100.3 "LogStartOffset of a topic can be either in local or in remote 
> > > > storage." If LogStartOffset exists in both places, which one is the 
> > > > source of truth?
> > > > 100.4 List<RemoteLogSegmentMetadata> 
> > > > listRemoteLogSegments(TopicPartition topicPartition, long minOffset): 
> > > > How is minOffset supposed to be used?
> > > > 100.5 When copying a segment to remote storage, it seems we are calling 
> > > > the same RLMM.putRemoteLogSegmentData() twice before and after 
> > > > copyLogSegment(). Could you explain why?
> > > > 100.6 LogSegmentData includes leaderEpochCache, but there is no api in 
> > > > RemoteStorageManager to retrieve it.
> > > >
> > > > 101. If the __remote_log_metadata is for production usage, could you 
> > > > provide more details? For example, what is the schema of the data (both 
> > > > key and value)? How is the topic maintained,delete or compact?
> > > >
> > > > 110. Is the cache implementation in RemoteLogMetadataManager meant for 
> > > > production usage? If so, could you provide more details on the schema 
> > > > and how/where the data is stored?
> > > >
> > > > 111. "Committed offsets can be stored in a local file". Could you 
> > > > describe the format of the file and where it's stored?
> > > >
> > > > 112. Truncation of remote segments under unclean leader election: I am 
> > > > not sure who figures out the truncated remote segments and how that 
> > > > information is propagated to all replicas?
> > > >
> > > > 113. "If there are any failures in removing remote log segments then 
> > > > those are stored in a specific topic (default as 
> > > > __remote_segments_to_be_deleted)". Is it necessary to add yet another 
> > > > internal topic? Could we just keep retrying?
> > > >
> > > > 114. "We may not need to copy producer-id-snapshot as we are copying 
> > > > only segments earlier to last-stable-offset." Hmm, not sure about that. 
> > > > The producer snapshot includes things like the last timestamp of each 
> > > > open producer id and can affect when those producer ids are expired.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, May 28, 2020 at 5:38 AM Satish Duggana 
> > > > <satish.dugg...@gmail.com> wrote:
> > > >>
> > > >> Hi Jun,
> > > >> Gentle reminder. Please go through the updated wiki and let us know 
> > > >> your comments.
> > > >>
> > > >> Thanks,
> > > >> Satish.
> > > >>
> > > >> On Tue, May 19, 2020 at 3:50 PM Satish Duggana 
> > > >> <satish.dugg...@gmail.com> wrote:
> > > >>>
> > > >>> Hi Jun,
> > > > >>> Please go through the wiki, which has the latest updates. The 
> > > > >>> Google doc is updated frequently to stay in sync with the wiki.
> > > >>>
> > > >>> Thanks,
> > > >>> Satish.
> > > >>>
> > > >>> On Tue, May 19, 2020 at 12:30 AM Jun Rao <j...@confluent.io> wrote:
> > > >>>>
> > > >>>> Hi, Satish,
> > > >>>>
> > > >>>> Thanks for the update. Just to clarify. Which doc has the latest 
> > > >>>> updates, the wiki or the google doc?
> > > >>>>
> > > >>>> Jun
> > > >>>>
> > > >>>> On Thu, May 14, 2020 at 10:38 AM Satish Duggana 
> > > >>>> <satish.dugg...@gmail.com> wrote:
> > > >>>>>
> > > >>>>> Hi Jun,
> > > >>>>> Thanks for your comments.  We updated the KIP with more details.
> > > >>>>>
> > > >>>>> >100. For each of the operations related to tiering, it would be 
> > > >>>>> >useful to provide a description on how it works with the new API. 
> > > >>>>> >These include things like consumer fetch, replica fetch, 
> > > >>>>> >offsetForTimestamp, retention (remote and local) by size, time and 
> > > >>>>> >logStartOffset, topic deletion, etc. This will tell us if the 
> > > >>>>> >proposed APIs are sufficient.
> > > >>>>>
> > > >>>>> We addressed most of these APIs in the KIP. We can add more details 
> > > >>>>> if needed.
> > > >>>>>
> > > >>>>> >101. For the default implementation based on internal topic, is it 
> > > >>>>> >meant as a proof of concept or for production usage? I assume that 
> > > >>>>> >it's the former. However, if it's the latter, then the KIP needs 
> > > >>>>> >to describe the design in more detail.
> > > >>>>>
> > > > >>>>> It is meant for production usage, as mentioned in an earlier 
> > > > >>>>> mail. We plan to update this section in the next few days.
> > > >>>>>
> > > >>>>> >102. When tiering a segment, the segment is first written to the 
> > > >>>>> >object store and then its metadata is written to RLMM using the 
> > > >>>>> >api "void putRemoteLogSegmentData()". One potential issue with 
> > > >>>>> >this approach is that if the system fails after the first 
> > > >>>>> >operation, it leaves a garbage in the object store that's never 
> > > >>>>> >reclaimed. One way to improve this is to have two separate APIs, 
> > > >>>>> >sth like preparePutRemoteLogSegmentData() and 
> > > >>>>> >commitPutRemoteLogSegmentData().
> > > >>>>>
> > > > >>>>> That is a good point. We currently use a different approach 
> > > > >>>>> based on markers in the segment, but your suggestion is much 
> > > > >>>>> better.
> > > >>>>>
> > > >>>>> >103. It seems that the transactional support and the ability to 
> > > >>>>> >read from follower are missing.
> > > >>>>>
> > > > >>>>> The KIP is updated with transactional support, follower fetch 
> > > > >>>>> semantics, and reading from a follower.
> > > >>>>>
> > > >>>>> >104. It would be useful to provide a testing plan for this KIP.
> > > >>>>>
> > > > >>>>> We added a few tests by introducing a test util for tiered 
> > > > >>>>> storage in the PR. We will provide the testing plan in the next 
> > > > >>>>> few days.
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>> Satish.
> > > >>>>>
> > > >>>>>
> > > >>>>> On Wed, Feb 26, 2020 at 9:43 PM Harsha Chintalapani 
> > > >>>>> <ka...@harsha.io> wrote:
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Tue, Feb 25, 2020 at 12:46 PM, Jun Rao <j...@confluent.io> 
> > > >>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>> Hi, Satish,
> > > >>>>>>>
> > > >>>>>>> Thanks for the updated doc. The new API seems to be an 
> > > >>>>>>> improvement overall. A few more comments below.
> > > >>>>>>>
> > > >>>>>>> 100. For each of the operations related to tiering, it would be 
> > > >>>>>>> useful to provide a description on how it works with the new API. 
> > > >>>>>>> These include things like consumer fetch, replica fetch, 
> > > >>>>>>> offsetForTimestamp, retention
> > > >>>>>>> (remote and local) by size, time and logStartOffset, topic 
> > > >>>>>>> deletion, etc. This will tell us if the proposed APIs are 
> > > >>>>>>> sufficient.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> Thanks for the feedback Jun. We will add more details around this.
> > > >>>>>>
> > > >>>>>>>
> > > >>>>>>> 101. For the default implementation based on internal topic, is 
> > > >>>>>>> it meant as a proof of concept or for production usage? I assume 
> > > >>>>>>> that it's the former. However, if it's the latter, then the KIP 
> > > >>>>>>> needs to describe the design in more detail.
> > > >>>>>>
> > > >>>>>>
> > > > >>>>>> Yes, it is meant to be for production use. Ideally, it would be 
> > > > >>>>>> good to merge this in as the default implementation for the 
> > > > >>>>>> metadata service. We can add more details around design and 
> > > > >>>>>> testing.
> > > >>>>>>
> > > >>>>>>> 102. When tiering a segment, the segment is first written to the 
> > > >>>>>>> object store and then its metadata is written to RLMM using the 
> > > >>>>>>> api "void putRemoteLogSegmentData()".
> > > >>>>>>> One potential issue with this approach is that if the system 
> > > >>>>>>> fails after the first operation, it leaves a garbage in the 
> > > >>>>>>> object store that's never reclaimed. One way to improve this is 
> > > >>>>>>> to have two separate APIs, sth like 
> > > >>>>>>> preparePutRemoteLogSegmentData() and 
> > > >>>>>>> commitPutRemoteLogSegmentData().
> > > >>>>>>>
> > > >>>>>>> 103. It seems that the transactional support and the ability to 
> > > >>>>>>> read from follower are missing.
> > > >>>>>>>
> > > >>>>>>> 104. It would be useful to provide a testing plan for this KIP.
> > > >>>>>>
> > > >>>>>>
> > > > >>>>>> We are working on adding more details around transactional support 
> > > > >>>>>> and coming up with a test plan.
> > > > >>>>>> We will add system tests and integration tests.
> > > >>>>>>
> > > >>>>>>> Thanks,
> > > >>>>>>>
> > > >>>>>>> Jun
> > > >>>>>>>
> > > >>>>>>> On Mon, Feb 24, 2020 at 8:10 AM Satish Duggana 
> > > >>>>>>> <satish.dugg...@gmail.com> wrote:
> > > >>>>>>>
> > > >>>>>>> Hi Jun,
> > > >>>>>>> Please look at the earlier reply and let us know your comments.
> > > >>>>>>>
> > > >>>>>>> Thanks,
> > > >>>>>>> Satish.
> > > >>>>>>>
> > > >>>>>>> On Wed, Feb 12, 2020 at 4:06 PM Satish Duggana 
> > > >>>>>>> <satish.dugg...@gmail.com> wrote:
> > > >>>>>>>
> > > >>>>>>> Hi Jun,
> > > >>>>>>> Thanks for your comments on the separation of remote log metadata 
> > > >>>>>>> storage and remote log storage.
> > > > >>>>>>> We had a few discussions since early Jan on how to support 
> > > > >>>>>>> eventually consistent stores like S3 by uncoupling remote log 
> > > > >>>>>>> segment metadata and remote log storage. It is written up in 
> > > > >>>>>>> detail in the doc here [1]. Below is a brief summary of the 
> > > > >>>>>>> discussion from that doc.
> > > > >>>>>>>
> > > > >>>>>>> The current approach consists of pulling the remote log segment 
> > > > >>>>>>> metadata from remote log storage APIs. It worked fine for 
> > > > >>>>>>> storages like HDFS. But one of the problems of relying on the 
> > > > >>>>>>> remote storage to maintain metadata is that tiered storage needs 
> > > > >>>>>>> to be strongly consistent, with an impact not only on the 
> > > > >>>>>>> metadata (e.g. LIST in S3) but also on the segment data (e.g. GET 
> > > > >>>>>>> after a DELETE in S3). The cost of maintaining metadata in remote 
> > > > >>>>>>> storage also needs to be factored in. This is true in the case of 
> > > > >>>>>>> S3, where LIST APIs incur huge costs, as you raised earlier.
> > > > >>>>>>> So, it is good to separate the remote storage from the remote log 
> > > > >>>>>>> metadata store. We refactored the existing RemoteStorageManager 
> > > > >>>>>>> and introduced RemoteLogMetadataManager. The remote log metadata 
> > > > >>>>>>> store should give strong consistency semantics, but remote log 
> > > > >>>>>>> storage can be eventually consistent.
> > > > >>>>>>> We can have a default implementation for RemoteLogMetadataManager 
> > > > >>>>>>> which uses an internal topic (as mentioned in one of our earlier 
> > > > >>>>>>> emails) as storage. But users can always plug in their own 
> > > > >>>>>>> RemoteLogMetadataManager implementation based on their 
> > > > >>>>>>> environment.
> > > >>>>>>>
> > > >>>>>>> Please go through the updated KIP and let us know your comments. 
> > > >>>>>>> We have started refactoring for the changes mentioned in the KIP 
> > > >>>>>>> and there may be a few more updates to the APIs.
> > > >>>>>>>
> > > >>>>>>> [1]
> > > >>>>>>>
> > > >>>>>>> https://docs.google.com/document/d/1qfkBCWL1e7ZWkHU7brxKDBebq4ie9yK20XJnKbgAlew/edit?ts=5e208ec7#
> > > >>>>>>>
> > > > >>>>>>> On Fri, Dec 27, 2019 at 5:43 PM Ivan Yurchenko 
> > > > >>>>>>> <ivan0yurche...@gmail.com> wrote:
> > > >>>>>>>
> > > >>>>>>> Hi all,
> > > >>>>>>>
> > > >>>>>>> Jun:
> > > >>>>>>>
> > > > >>>>>>> (a) Cost: S3 list object requests cost $0.005 per 1000 requests. 
> > > > >>>>>>> If you have 100,000 partitions and want to pull the metadata for 
> > > > >>>>>>> each partition at the rate of 1/sec, it can cost $0.5/sec, which 
> > > > >>>>>>> is roughly $40K per day.
> > > >>>>>>>
> > > > >>>>>>> I want to note here that no reasonably durable storage will be 
> > > > >>>>>>> cheap at 100k RPS. For example, DynamoDB might give the same 
> > > > >>>>>>> ballpark figures.
> > > > >>>>>>>
> > > > >>>>>>> If we want to keep the pull-based approach, we can try to reduce 
> > > > >>>>>>> this number in several ways: doing listings less frequently (as 
> > > > >>>>>>> Satish mentioned, with the current defaults it's ~3.33k RPS for 
> > > > >>>>>>> your example), batching listing operations in some way (depending 
> > > > >>>>>>> on the storage; it might require the change of RSM's interface).
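The figures in this exchange can be reproduced with a few lines of
arithmetic. The sketch below only uses the numbers quoted in the thread
($0.005 per 1000 LIST requests, 100,000 partitions, a 1-second versus a
30-second listing interval); the class and method names are hypothetical,
and it assumes one LIST request per partition per interval.

```java
// Hypothetical sketch: daily LIST cost as a function of the polling interval.
class ListCostSketch {
    // Price quoted in the thread: $0.005 per 1000 LIST requests.
    static final double PRICE_PER_REQUEST_USD = 0.005 / 1000.0;
    static final long SECONDS_PER_DAY = 86_400L;

    // Assumption: one LIST request per partition per polling interval.
    static double requestsPerSecond(long partitions, double pollIntervalSeconds) {
        return partitions / pollIntervalSeconds;
    }

    static double costPerDayUsd(long partitions, double pollIntervalSeconds) {
        return requestsPerSecond(partitions, pollIntervalSeconds)
            * PRICE_PER_REQUEST_USD * SECONDS_PER_DAY;
    }

    public static void main(String[] args) {
        long partitions = 100_000L;
        System.out.printf("1s interval:  %.0f req/s, $%.0f/day%n",
            requestsPerSecond(partitions, 1.0), costPerDayUsd(partitions, 1.0));
        System.out.printf("30s interval: %.0f req/s, $%.0f/day%n",
            requestsPerSecond(partitions, 30.0), costPerDayUsd(partitions, 30.0));
    }
}
```

At 1 request per partition per second this gives 100,000 req/s and
$43,200 per day (the "roughly $40K" above); at the 30-second default it
gives about 3,333 req/s and $1,440 per day.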
> > > >>>>>>>
> > > > >>>>>>> There are different ways for doing push based metadata 
> > > > >>>>>>> propagation. Some object stores may support that already. For 
> > > > >>>>>>> example, S3 supports events notification
> > > >>>>>>>
> > > > >>>>>>> This sounds interesting. However, I see a couple of issues using 
> > > > >>>>>>> it:
> > > > >>>>>>> 1. As I understand the documentation, notification delivery is 
> > > > >>>>>>> not guaranteed and it's recommended to periodically do LIST to 
> > > > >>>>>>> fill the gaps, which brings us back to the same LIST consistency 
> > > > >>>>>>> guarantees issue.
> > > > >>>>>>> 2. The same goes for the broker start: to get the current state, 
> > > > >>>>>>> we need to LIST.
> > > > >>>>>>> 3. The dynamic set of multiple consumers (RSMs): AFAIK SQS and SNS 
> > > > >>>>>>> aren't designed for such a case.
> > > >>>>>>>
> > > >>>>>>> Alexandre:
> > > >>>>>>>
> > > > >>>>>>> A.1 As commented on PR 7561, S3 consistency model [1][2] implies 
> > > > >>>>>>> RSM cannot rely solely on S3 APIs to guarantee the expected 
> > > > >>>>>>> strong consistency. The proposed implementation [3] would need to 
> > > > >>>>>>> be updated to take this into account. Let's talk more about this.
> > > >>>>>>>
> > > > >>>>>>> Thank you for the feedback. I clearly see the need for changing 
> > > > >>>>>>> the S3 implementation to provide stronger consistency guarantees. 
> > > > >>>>>>> As I see from this thread, there are several possible approaches 
> > > > >>>>>>> to this. Let's discuss RemoteLogManager's contract and behavior 
> > > > >>>>>>> (like pull vs push model) further before picking one (or 
> > > > >>>>>>> several?) of them.
> > > > >>>>>>> I'm going to do some evaluation of DynamoDB for the pull-based 
> > > > >>>>>>> approach, to see if it's possible to apply it while paying a 
> > > > >>>>>>> reasonable bill, and also of the push-based approach with a Kafka 
> > > > >>>>>>> topic as the medium.
> > > >>>>>>>
> > > > >>>>>>> A.2.3 Atomicity – what does an implementation of RSM need to 
> > > > >>>>>>> provide with respect to atomicity of the APIs copyLogSegment, 
> > > > >>>>>>> cleanupLogUntil and deleteTopicPartition? If a partial failure 
> > > > >>>>>>> happens in any of those (e.g. in the S3 implementation, if one of 
> > > > >>>>>>> the multiple uploads fails [4]),
> > > >>>>>>>
> > > > >>>>>>> The S3 implementation is going to change, but it's worth 
> > > > >>>>>>> clarifying anyway.
> > > > >>>>>>>
> > > > >>>>>>> The segment log file is uploaded after S3 has acked the upload of 
> > > > >>>>>>> all other files associated with the segment, and only after this 
> > > > >>>>>>> does the whole segment file set become visible remotely for 
> > > > >>>>>>> operations like listRemoteSegments [1].
> > > > >>>>>>> In case of upload failure, the files that have been successfully 
> > > > >>>>>>> uploaded stay as invisible garbage that is collected by 
> > > > >>>>>>> cleanupLogUntil (or overwritten successfully later).
> > > > >>>>>>> And the opposite happens during the deletion: log files are 
> > > > >>>>>>> deleted first.
> > > > >>>>>>>
> > > > >>>>>>> This approach should generally work when we solve consistency 
> > > > >>>>>>> issues by adding a strongly consistent storage: a segment's 
> > > > >>>>>>> uploaded files remain invisible garbage until some metadata about 
> > > > >>>>>>> them is written.
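The upload ordering described above (auxiliary files first, the log file
last, visibility tied to the log file) can be sketched as follows. This is
a simplified model, not the S3 implementation: the in-memory set stands in
for the object store, and the class, file suffixes and the failAt
parameter are hypothetical names used only to simulate a failed upload.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: a segment becomes visible only once its log file is uploaded.
class UploadOrderSketch {
    // In-memory stand-in for the object store: the set of uploaded object names.
    final Set<String> store = new HashSet<>();

    // Uploads auxiliary files first and the log file last.
    // failAt simulates a failure when uploading the file with that suffix (null = no failure).
    boolean upload(String segment, List<String> auxSuffixes, String failAt) {
        for (String suffix : auxSuffixes) {
            if (suffix.equals(failAt)) {
                return false; // earlier auxiliary files remain as invisible garbage
            }
            store.add(segment + suffix);
        }
        if (".log".equals(failAt)) {
            return false;
        }
        store.add(segment + ".log");
        return true;
    }

    // Listing treats a segment as present only if its log file exists.
    boolean isVisible(String segment) {
        return store.contains(segment + ".log");
    }

    public static void main(String[] args) {
        UploadOrderSketch sketch = new UploadOrderSketch();
        List<String> aux = Arrays.asList(".index", ".timeindex");
        sketch.upload("0000", aux, null);
        sketch.upload("0100", aux, ".timeindex");
        System.out.println("0000 visible: " + sketch.isVisible("0000"));
        System.out.println("0100 visible: " + sketch.isVisible("0100"));
    }
}
```

Deleting in the opposite order (log file first) makes the segment
invisible immediately, so a partially deleted segment is also never listed.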
> > > >>>>>>>
> > > > >>>>>>> A.3 Caching – storing locally the segments retrieved from the 
> > > > >>>>>>> remote storage is excluded as it does not align with the original 
> > > > >>>>>>> intent and even defeats some of its purposes (save disk space 
> > > > >>>>>>> etc.). That said, could there be other types of use cases where 
> > > > >>>>>>> the pattern of access to the remotely stored segments would 
> > > > >>>>>>> benefit from local caching (and potentially read-ahead)? Consider 
> > > > >>>>>>> the use case of a large pool of consumers which start a backfill 
> > > > >>>>>>> at the same time for one day's worth of data from one year ago 
> > > > >>>>>>> stored remotely. Caching the segments locally would allow 
> > > > >>>>>>> uncoupling the load on the remote storage from the load on the 
> > > > >>>>>>> Kafka cluster. Maybe the RLM could expose a configuration 
> > > > >>>>>>> parameter to switch that feature on/off?
> > > >>>>>>>
> > > > >>>>>>> I tend to agree here, caching remote segments locally and making 
> > > > >>>>>>> this configurable sounds pretty practical to me. We should 
> > > > >>>>>>> implement this, maybe not in the first iteration.
> > > >>>>>>>
> > > >>>>>>> Br,
> > > >>>>>>> Ivan
> > > >>>>>>>
> > > >>>>>>> [1]
> > > >>>>>>>
> > > >>>>>>> https://github.com/harshach/kafka/pull/18/files#diff-4d73d01c16caed6f2548fc3063550ef0R152
> > > >>>>>>>
> > > >>>>>>> On Thu, 19 Dec 2019 at 19:49, Alexandre Dupriez <
> > > >>>>>>>
> > > >>>>>>> alexandre.dupr...@gmail.com>
> > > >>>>>>>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>> Hi Jun,
> > > >>>>>>>
> > > > >>>>>>> Thank you for the feedback. I am trying to understand how a 
> > > > >>>>>>> push-based approach would work.
> > > > >>>>>>> In order for the metadata to be propagated (under the assumption 
> > > > >>>>>>> you stated), would you plan to add a new API in Kafka to allow 
> > > > >>>>>>> the metadata store to send them directly to the brokers?
> > > >>>>>>>
> > > >>>>>>> Thanks,
> > > >>>>>>> Alexandre
> > > >>>>>>>
> > > > >>>>>>> On Wed, Dec 18, 2019 at 8:14 PM Jun Rao <j...@confluent.io> wrote:
> > > >>>>>>>
> > > >>>>>>> Hi, Satish and Ivan,
> > > >>>>>>>
> > > > >>>>>>> There are different ways for doing push based metadata 
> > > > >>>>>>> propagation. Some object stores may support that already. For 
> > > > >>>>>>> example, S3 supports events notification (
> > > > >>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html).
> > > > >>>>>>> Otherwise one could use a separate metadata store that supports 
> > > > >>>>>>> push-based change propagation. Other people have mentioned using 
> > > > >>>>>>> a Kafka topic. The best approach may depend on the object store 
> > > > >>>>>>> and the operational environment (e.g. whether an external 
> > > > >>>>>>> metadata store is already available).
> > > > >>>>>>>
> > > > >>>>>>> The above discussion is based on the assumption that we need to 
> > > > >>>>>>> cache the object metadata locally in every broker. I mentioned 
> > > > >>>>>>> earlier that an alternative is to just store/retrieve those 
> > > > >>>>>>> metadata in an external metadata store. That may simplify the 
> > > > >>>>>>> implementation in some cases.
> > > >>>>>>>
> > > >>>>>>> Thanks,
> > > >>>>>>>
> > > >>>>>>> Jun
> > > >>>>>>>
> > > > >>>>>>> On Thu, Dec 5, 2019 at 7:01 AM Satish Duggana 
> > > > >>>>>>> <satish.dugg...@gmail.com> wrote:
> > > >>>>>>>
> > > >>>>>>> Hi Jun,
> > > >>>>>>> Thanks for your reply.
> > > >>>>>>>
> > > > >>>>>>> Currently, `listRemoteSegments` is called at the configured 
> > > > >>>>>>> interval (not every second; it defaults to 30 secs). Storing 
> > > > >>>>>>> remote log metadata in a strongly consistent store for the S3 RSM 
> > > > >>>>>>> was raised in a PR comment [1].
> > > > >>>>>>> RLM invokes RSM at regular intervals and RSM can give remote 
> > > > >>>>>>> segment metadata if it is available. RSM is responsible for 
> > > > >>>>>>> maintaining and fetching those entries. It should be based on 
> > > > >>>>>>> whatever mechanism is consistent and efficient with the 
> > > > >>>>>>> respective remote storage.
> > > > >>>>>>>
> > > > >>>>>>> Can you give more details about the push-based mechanism from 
> > > > >>>>>>> RSM?
> > > > >>>>>>>
> > > > >>>>>>> [1] 
> > > > >>>>>>> https://github.com/apache/kafka/pull/7561#discussion_r344576223
> > > >>>>>>>
> > > >>>>>>> Thanks,
> > > >>>>>>> Satish.
> > > >>>>>>>
> > > >>>>>>> On Thu, Dec 5, 2019 at 4:23 AM Jun Rao <j...@confluent.io> wrote:
> > > >>>>>>>
> > > >>>>>>> Hi, Harsha,
> > > >>>>>>>
> > > >>>>>>> Thanks for the reply.
> > > >>>>>>>
> > > >>>>>>> 40/41. I am curious which block storages you have tested. S3
> > > >>>>>>>
> > > >>>>>>> seems
> > > >>>>>>>
> > > >>>>>>> to be
> > > >>>>>>>
> > > >>>>>>> one of the popular block stores. The concerns that I have with
> > > >>>>>>>
> > > >>>>>>> pull
> > > >>>>>>>
> > > >>>>>>> based
> > > >>>>>>>
> > > >>>>>>> approach are the following.
> > > >>>>>>> (a) Cost: S3 list object requests cost $0.005 per 1000 requests.
> > > >>>>>>> If you have 100,000 partitions and want to pull the metadata for
> > > >>>>>>> each partition at the rate of 1/sec, it can cost $0.5/sec, which
> > > >>>>>>> is roughly $40K per day.
> > > >>>>>>>
> > > >>>>>>> (b) Semantics: S3 list objects are eventually consistent. So,
> > > >>>>>>> when you do a list object request, there is no guarantee that you
> > > >>>>>>> can see all uploaded objects. This could impact the correctness
> > > >>>>>>> of subsequent logic.
> > > >>>>>>>
> > > >>>>>>> (c) Efficiency: Blindly pulling metadata when there is no change
> > > >>>>>>> adds unnecessary overhead in the broker as well as in the block
> > > >>>>>>> store.
> > > >>>>>>>
> > > >>>>>>> So, have you guys tested S3? If so, could you share your
> > > >>>>>>> experience in terms of cost, semantics and efficiency?
> > > >>>>>>>
> > > >>>>>>> Jun
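
The estimate in (a) can be reproduced with a few lines of arithmetic. This is a rough sketch: the price, partition count, and 1/sec rate are the figures quoted in (a), the 30-second interval is the default mentioned in Satish's reply, and the function name is made up for illustration. It assumes one list request per partition per poll and ignores any other request or transfer charges.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def list_cost_per_day(partitions: int, interval_secs: float,
                      price_per_1000: float = 0.005) -> float:
    """Daily cost in USD of one list request per partition every interval_secs."""
    requests_per_day = partitions * SECONDS_PER_DAY / interval_secs
    return requests_per_day / 1000 * price_per_1000


if __name__ == "__main__":
    # Polling every second, as in the estimate in (a).
    print(f"1s interval:  ${list_cost_per_day(100_000, 1):,.0f} per day")
    # Polling at the 30-second default interval.
    print(f"30s interval: ${list_cost_per_day(100_000, 30):,.0f} per day")
```

At a 1-second interval this comes to about $43,200 per day, which matches the "roughly $40K" figure; at a 30-second interval the same model gives about $1,440 per day.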
> > > >>>>>>>
> > > >>>>>>> On Tue, Dec 3, 2019 at 10:11 PM Harsha Chintalapani
> > > >>>>>>> <ka...@harsha.io> wrote:
> > > >>>>>>>
> > > >>>>>>> Hi Jun,
> > > >>>>>>> Thanks for the reply.
> > > >>>>>>>
> > > >>>>>>> On Tue, Nov 26, 2019 at 3:46 PM, Jun Rao <j...@confluent.io> wrote:
> > > >>>>>>>
> > > >>>>>>> Hi, Satish and Ying,
> > > >>>>>>>
> > > >>>>>>> Thanks for the reply.
> > > >>>>>>>
> > > >>>>>>> 40/41. There are two different ways that we can approach this.
> > > >>>>>>> One is what you said. We can have an opinionated way of storing
> > > >>>>>>> and populating the tier metadata that we think is good enough for
> > > >>>>>>> everyone. I am not sure if this is the case based on what's
> > > >>>>>>> currently proposed in the KIP. For example, I am not sure that
> > > >>>>>>> (1) everyone always needs local metadata; (2) the current local
> > > >>>>>>> storage format is general enough; and (3) everyone wants to use
> > > >>>>>>> the pull-based approach to propagate the metadata. Another
> > > >>>>>>> approach is to make this pluggable and let the implementor
> > > >>>>>>> implement the best approach for a particular block storage. I
> > > >>>>>>> haven't seen any comments from Slack/AirBnb in the mailing list
> > > >>>>>>> on this topic. It would be great if they can provide feedback
> > > >>>>>>> directly here.
> > > >>>>>>>
> > > >>>>>>> The current interfaces are designed with the most popular block
> > > >>>>>>> storages available today in mind. We did 2 implementations with
> > > >>>>>>> these interfaces, and both are yielding good results as we go
> > > >>>>>>> through testing.
> > > >>>>>>>
> > > >>>>>>> If there is ever a need for pull based approach we can definitely
> > > >>>>>>> evolve the interface. In the past we did mark interfaces as
> > > >>>>>>> evolving to make room for unknowns in the future. If you have any
> > > >>>>>>> suggestions around the current interfaces, please propose them;
> > > >>>>>>> we are happy to see if we can work them in.
> > > >>>>>>>
> > > >>>>>>> 43. To offer tier storage as a general feature, ideally all
> > > >>>>>>> existing capabilities should still be supported. It's fine if the
> > > >>>>>>> Uber implementation doesn't support all capabilities for internal
> > > >>>>>>> usage. However, the framework should be general enough.
> > > >>>>>>>
> > > >>>>>>> We agree on that as a principle. But most of these major features
> > > >>>>>>> are arriving right now, and asking a big new feature such as
> > > >>>>>>> tiered storage to support all of them is a big ask. We can
> > > >>>>>>> document how we plan to approach these in future iterations. Our
> > > >>>>>>> goal is to make this tiered storage feature work for everyone.
> > > >>>>>>>
> > > >>>>>>> 43.3 This is more than just serving the tiered data from block
> > > >>>>>>> storage. With KIP-392, the consumer now can resolve the conflicts
> > > >>>>>>> with the replica based on leader epoch. So, we need to make sure
> > > >>>>>>> that leader epoch can be recovered properly from tier storage.
> > > >>>>>>>
> > > >>>>>>> We are working on testing our approach and we will update the KIP
> > > >>>>>>> with design details.
> > > >>>>>>>
> > > >>>>>>> 43.4 For JBOD, if tier storage stores the tier metadata locally,
> > > >>>>>>> we need to support moving such metadata across disk directories
> > > >>>>>>> since JBOD supports moving data across disks.
> > > >>>>>>>
> > > >>>>>>> The KIP is updated with JBOD details. Having said that, JBOD
> > > >>>>>>> tooling needs to evolve to support production loads. Most users
> > > >>>>>>> will be interested in using tiered storage without JBOD support
> > > >>>>>>> on day 1.
> > > >>>>>>>
> > > >>>>>>> Thanks,
> > > >>>>>>> Harsha
> > > >>>>>>>
> > > >>>>>>> As for meeting, we could have a KIP e-meeting on this if needed,
> > > >>>>>>> but it will be open to everyone and will be recorded and shared.
> > > >>>>>>> Often, the details are still resolved through the mailing list.
> > > >>>>>>>
> > > >>>>>>> Jun
> > > >>>>>>>
> > > >>>>>>> On Tue, Nov 19, 2019 at 6:48 PM Ying Zheng
> > > >>>>>>> <yi...@uber.com.invalid> wrote:
> > > >>>>>>>
> > > >>>>>>> Please ignore my previous email.
> > > >>>>>>> I didn't know Apache requires all the discussions to be "open".
> > > >>>>>>>
> > > >>>>>>> On Tue, Nov 19, 2019, 5:40 PM Ying Zheng <yi...@uber.com> wrote:
> > > >>>>>>>
> > > >>>>>>> Hi Jun,
> > > >>>>>>>
> > > >>>>>>> Thank you very much for your feedback!
> > > >>>>>>>
> > > >>>>>>> Can we schedule a meeting in your Palo Alto office in December? I
> > > >>>>>>> think a face to face discussion is much more efficient than
> > > >>>>>>> emails. Both Harsha and I can visit you. Satish may be able to
> > > >>>>>>> join us remotely.
> > > >>>>>>>
> > > >>>>>>> On Fri, Nov 15, 2019 at 11:04 AM Jun Rao <j...@confluent.io> wrote:
> > > >>>>>>>
> > > >>>>>>> Hi, Satish and Harsha,
> > > >>>>>>>
> > > >>>>>>> The following is a more detailed high level feedback for the KIP.
> > > >>>>>>> Overall, the KIP seems useful. The challenge is how to design it
> > > >>>>>>> such that it’s general enough to support different ways of
> > > >>>>>>> implementing this feature and support existing features.
> > > >>>>>>>
> > > >>>>>>> 40. Local segment metadata storage: The KIP makes the assumption
> > > >>>>>>> that the metadata for the archived log segments is cached locally
> > > >>>>>>> in every broker and provides a specific implementation for the
> > > >>>>>>> local storage in the framework. We probably should discuss this
> > > >>>>>>> more. For example, some tier storage providers may not want to
> > > >>>>>>> cache the metadata locally and just rely upon a remote key/value
> > > >>>>>>> store if such a store is already present. If a local store is
> > > >>>>>>> used, there could be different ways of implementing it (e.g.,
> > > >>>>>>> based on customized local files, an embedded local store like
> > > >>>>>>> RocksDB, etc). An alternative way of designing this is to just
> > > >>>>>>> provide an interface for retrieving the tier segment metadata and
> > > >>>>>>> leave the details of how to get the metadata outside of the
> > > >>>>>>> framework.
> > > >>>>>>>
> > > >>>>>>> 41. RemoteStorageManager interface and the usage of the interface
> > > >>>>>>> in the framework: I am not sure if the interface is general
> > > >>>>>>> enough. For example, it seems that RemoteLogIndexEntry is tied to
> > > >>>>>>> a specific way of storing the metadata in remote storage. The
> > > >>>>>>> framework uses the listRemoteSegments() api in a pull-based
> > > >>>>>>> approach. However, in some other implementations, a push-based
> > > >>>>>>> approach may be preferred. I don’t have a concrete proposal yet.
> > > >>>>>>> But, it would be useful to give this area some more thought and
> > > >>>>>>> see if we can make the interface more general.
> > > >>>>>>>
> > > >>>>>>> 42. In the diagram, the RemoteLogManager is side by side with
> > > >>>>>>> LogManager. This KIP only discussed how the fetch request is
> > > >>>>>>> handled between the two layers. However, we should also consider
> > > >>>>>>> how other requests that touch the log can be handled, e.g., list
> > > >>>>>>> offsets by timestamp, delete records, etc. Also, in this model,
> > > >>>>>>> it's not clear which component is responsible for managing the
> > > >>>>>>> log start offset. It seems that the log start offset could be
> > > >>>>>>> changed by both RemoteLogManager and LogManager.
> > > >>>>>>>
> > > >>>>>>> 43. There are quite a few existing features not covered by the
> > > >>>>>>> KIP. It would be useful to discuss each of those.
> > > >>>>>>> 43.1 I won’t say that compacted topics are rarely used and always
> > > >>>>>>> small. For example, KStreams uses compacted topics for storing
> > > >>>>>>> the states and sometimes the size of the topic could be large.
> > > >>>>>>> While it might be ok to not support compacted topics initially,
> > > >>>>>>> it would be useful to have a high level idea on how this might be
> > > >>>>>>> supported down the road so that we don’t have to make
> > > >>>>>>> incompatible API changes in the future.
> > > >>>>>>> 43.2 We need to discuss how EOS is supported. In particular, how
> > > >>>>>>> is the producer state integrated with the remote storage?
> > > >>>>>>> 43.3 Now that KIP-392 (allow consumers to fetch from closest
> > > >>>>>>> replica) is implemented, we need to discuss how reading from a
> > > >>>>>>> follower replica is supported with tier storage.
> > > >>>>>>>
> > > >>>>>>> 43.4 We need to discuss how JBOD is supported with tier storage.
> > > >>>>>>>
> > > >>>>>>> Thanks,
> > > >>>>>>>
> > > >>>>>>> Jun
> > > >>>>>>>
> > > >>>>>>> On Fri, Nov 8, 2019 at 12:06 AM Tom Bentley <tbent...@redhat.com>
> > > >>>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>> Thanks for those insights Ying.
> > > >>>>>>>
> > > >>>>>>> On Thu, Nov 7, 2019 at 9:26 PM Ying Zheng
> > > >>>>>>> <yi...@uber.com.invalid> wrote:
> > > >>>>>>>
> > > >>>>>>> Thanks, I missed that point. However, there's still a point at
> > > >>>>>>> which the consumer fetches start getting served from remote
> > > >>>>>>> storage (even if that point isn't as soon as the local log
> > > >>>>>>> retention time/size). This represents a kind of performance cliff
> > > >>>>>>> edge and what I'm really interested in is how easy it is for a
> > > >>>>>>> consumer which falls off that cliff to catch up and so its
> > > >>>>>>> fetches again come from local storage. Obviously this can depend
> > > >>>>>>> on all sorts of factors (like production rate, consumption rate),
> > > >>>>>>> so it's not guaranteed (just like it's not guaranteed for Kafka
> > > >>>>>>> today), but this would represent a new failure mode.
> > > >>>>>>>
> > > >>>>>>> As I have explained in the last mail, it's a very rare case that
> > > >>>>>>> a consumer needs to read remote data. In our experience at Uber,
> > > >>>>>>> this only happens when the consumer service has had an outage of
> > > >>>>>>> several hours.
> > > >>>>>>>
> > > >>>>>>> There is not a "performance cliff" as you assume. The remote
> > > >>>>>>> storage is even faster than local disks in terms of bandwidth.
> > > >>>>>>> Reading from remote storage is going to have higher latency than
> > > >>>>>>> local disk. But since the consumer is catching up on several
> > > >>>>>>> hours of data, it's not sensitive to sub-second latency, and each
> > > >>>>>>> remote read request will read a large amount of data, which makes
> > > >>>>>>> the overall performance better than reading from local disks.
> > > >>>>>>>
> > > >>>>>>> Another aspect I'd like to understand better is the effect that
> > > >>>>>>> serving fetch requests from remote storage has on the broker's
> > > >>>>>>> network utilization. If we're just trimming the amount of data
> > > >>>>>>> held locally (without increasing the overall local+remote
> > > >>>>>>> retention), then we're effectively trading disk bandwidth for
> > > >>>>>>> network bandwidth when serving fetch requests from remote storage
> > > >>>>>>> (which I understand to be a good thing, since brokers are
> > > >>>>>>> often/usually disk bound). But if we're increasing the overall
> > > >>>>>>> local+remote retention then it's more likely that network itself
> > > >>>>>>> becomes the bottleneck.
> > > >>>>>>>
> > > >>>>>>> I appreciate this is all rather hand wavy, I'm just trying to
> > > >>>>>>> understand how this would affect broker performance, so I'd be
> > > >>>>>>> grateful for any insights you can offer.
> > > >>>>>>>
> > > >>>>>>> Network bandwidth is a function of produce speed; it has nothing
> > > >>>>>>> to do with remote retention. As long as the data is shipped to
> > > >>>>>>> remote storage, you can keep the data there for 1 day or 1 year
> > > >>>>>>> or 100 years; it doesn't consume any network resources.
> > > >>>>>>
> > > >>>>>>
