We did not want to add many implementation details in the KIP. But we decided to add them in the KIP as appendix or sub-sections(including follower fetch protocol) to describe the flow with the main cases. That will answer most of the queries. I will update on this mail thread when the respective sections are updated.
Thanks, Satish. On Sat, Jun 6, 2020 at 7:49 PM Alexandre Dupriez <alexandre.dupr...@gmail.com> wrote: > > Hi Satish, > > A couple of questions specific to the section "Follower > Requests/Replication", pages 16:17 in the design document [1]. > > 900. It is mentioned that followers fetch auxiliary states from the > remote storage. > > 900.a Does the consistency model of the external storage impacts reads > of leader epochs and other auxiliary data? > > 900.b What are the benefits of using a mechanism to store and access > the leader epochs which is different from other metadata associated to > tiered segments? What are the benefits of retrieving this information > on-demand from the follower rather than relying on propagation via the > topic __remote_log_metadata? What are the advantages over using a > dedicated control structure (e.g. a new record type) propagated via > this topic? Since in the document, different control paths are > operating in the system, how are the metadata stored in > __remote_log_metadata [which also include the epoch of the leader > which offloaded a segment] and the remote auxiliary states, kept in > sync? > > 900.c A follower can encounter an OFFSET_MOVED_TO_TIERED_STORAGE. Is > this in response to a Fetch or OffsetForLeaderEpoch request? > > 900.d What happens if, after a follower encountered an > OFFSET_MOVED_TO_TIERED_STORAGE response, its attempts to retrieve > leader epochs fail (for instance, because the remote storage is > temporarily unavailable)? Does the follower fallbacks to a mode where > it ignores tiered segments, and applies truncation using only locally > available information? What happens when access to the remote storage > is restored? How is the replica lineage inferred by the remote leader > epochs reconciled with the follower's replica lineage, which has > evolved? Does the follower remember fetching auxiliary states failed > in the past and attempt reconciliation? Is there a plan to offer > different strategies in this scenario, configurable via configuration? > > 900.e Is the leader epoch cache offloaded with every segment? Or when > a new checkpoint is detected? If that information is not always > offloaded to avoid duplicating data, how does the remote storage > satisfy the request to retrieve it? > > 900.f Since the leader epoch cache covers the entire replica lineage, > what happens if, after a leader epoch cache file is offloaded with a > given segment, the local epoch cache is truncated [not necessarily for > a range of offset included in tiered segments]? How are remote and > local leader epoch caches kept consistent? > > 900.g Consumer can also use leader epochs (e.g. to enable fencing to > protect against stale leaders). What differences would there be > between consumer and follower fetches? Especially, would consumers > also fetch leader epoch information from the remote storage? > > 900.h Assume a newly elected leader of a topic-partition detects more > recent segments are available in the external storage, with epochs > > its local epoch. Does it ignore these segments and their associated > epoch-to-offset vectors? Or try to reconstruct its local replica > lineage based on the data remotely available? > > Thanks, > Alexandre > > [1] > https://docs.google.com/document/d/18tnobSas3mKFZFr8oRguZoj_tkD_sGzivuLRlMloEMs/edit?usp=sharing > > Le jeu. 4 juin 2020 à 19:55, Satish Duggana <satish.dugg...@gmail.com> a > écrit : > > > > Hi Jun, > > Please let us know if you have any comments on "transactional support" > > and "follower requests/replication" mentioned in the wiki. > > > > Thanks, > > Satish. > > > > On Tue, Jun 2, 2020 at 9:25 PM Satish Duggana <satish.dugg...@gmail.com> > > wrote: > > > > > > Thanks Jun for your comments. > > > > > > >100. It would be useful to provide more details on how those apis are > > > >used. Otherwise, it's kind of hard to really assess whether the new apis > > > >are sufficient/redundant. A few examples below. > > > > > > We will update the wiki and let you know. > > > > > > >100.1 deleteRecords seems to only advance the logStartOffset in Log. How > > > >does that trigger the deletion of remote log segments? > > > > > > RLMTask for leader partition periodically checks whether there are > > > remote log segments earlier to logStartOffset and the respective > > > remote log segment metadata and data are deleted by using RLMM and > > > RSM. > > > > > > >100.2 stopReplica with deletion is used in 2 cases (a) replica > > > >reassignment; (b) topic deletion. We only want to delete the tiered > > > >metadata in the second case. Also, in the second case, who initiates the > > > >deletion of the remote segment since the leader may not exist? > > > > > > Right, it is deleted only incase of topic deletion only. We will cover > > > the details in the KIP. > > > > > > >100.3 "LogStartOffset of a topic can be either in local or in remote > > > >storage." If LogStartOffset exists in both places, which one is the > > > >source of truth? > > > > > > I meant the logStartOffset can point to either of local segment or > > > remote segment but it is initialised and maintained in the Log class > > > like now. > > > > > > >100.4 List<RemoteLogSegmentMetadata> > > > >listRemoteLogSegments(TopicPartition topicPartition, long minOffset): > > > >How is minOffset supposed to be used? > > > > > > Returns list of remote segments, sorted by baseOffset in ascending > > > order that have baseOffset >= the given min Offset. > > > > > > >100.5 When copying a segment to remote storage, it seems we are calling > > > >the same RLMM.putRemoteLogSegmentData() twice before and after > > > >copyLogSegment(). Could you explain why? > > > > > > This is more about prepare/commit/rollback as you suggested. We will > > > update the wiki with the new APIs. > > > > > > >100.6 LogSegmentData includes leaderEpochCache, but there is no api in > > > >RemoteStorageManager to retrieve it. > > > > > > Nice catch, copy/paste issue. There is an API to retrieve it. > > > > > > >101. If the __remote_log_metadata is for production usage, could you > > > >provide more details? For example, what is the schema of the data (both > > > >key and value)? How is the topic maintained,delete or compact? > > > > > > It is with delete config and it’s retention period is suggested to be > > > more than the remote retention period. > > > > > > >110. Is the cache implementation in RemoteLogMetadataManager meant for > > > >production usage? If so, could you provide more details on the schema > > > >and how/where the data is stored? > > > > > > The proposal is to have a cache (with default implementation backed by > > > rocksdb) but it will be added in later versions. We will add this to > > > future work items. > > > > > > >111. "Committed offsets can be stored in a local file". Could you > > > >describe the format of the file and where it's stored? > > > > > > We will cover this in the KIP. > > > > > > >112. Truncation of remote segments under unclean leader election: I am > > > >not sure who figures out the truncated remote segments and how that > > > >information is propagated to all replicas? > > > > > > We will add this in detail in the KIP. > > > > > > >113. "If there are any failures in removing remote log segments then > > > >those are stored in a specific topic (default as > > > >__remote_segments_to_be_deleted)". Is it necessary to add yet another > > > >internal topic? Could we just keep retrying? > > > > > > This is not really an internal topic, it will be exposed as a user > > > configurable topic. After a few retries, we want user to know about > > > the failure so that they can take an action later by consuming from > > > this topic. We want to keep this simple instead of retrying > > > continuously and maintaining the deletion state etc. > > > > > > >114. "We may not need to copy producer-id-snapshot as we are copying > > > >only segments earlier to last-stable-offset." Hmm, not sure about that. > > > >The producer snapshot includes things like the last timestamp of each > > > >open producer id and can affect when those producer ids are expired. > > > > > > Sure, this will be added as part of the LogSegmentData. > > > > > > Thanks, > > > Satish. > > > > > > > > > On Fri, May 29, 2020 at 6:39 AM Jun Rao <j...@confluent.io> wrote: > > > > > > > > Hi, Satish, > > > > > > > > Made another pass on the wiki. A few more comments below. > > > > > > > > 100. It would be useful to provide more details on how those apis are > > > > used. Otherwise, it's kind of hard to really assess whether the new > > > > apis are sufficient/redundant. A few examples below. > > > > 100.1 deleteRecords seems to only advance the logStartOffset in Log. > > > > How does that trigger the deletion of remote log segments? > > > > 100.2 stopReplica with deletion is used in 2 cases (a) replica > > > > reassignment; (b) topic deletion. We only want to delete the tiered > > > > metadata in the second case. Also, in the second case, who initiates > > > > the deletion of the remote segment since the leader may not exist? > > > > 100.3 "LogStartOffset of a topic can be either in local or in remote > > > > storage." If LogStartOffset exists in both places, which one is the > > > > source of truth? > > > > 100.4 List<RemoteLogSegmentMetadata> > > > > listRemoteLogSegments(TopicPartition topicPartition, long minOffset): > > > > How is minOffset supposed to be used? > > > > 100.5 When copying a segment to remote storage, it seems we are calling > > > > the same RLMM.putRemoteLogSegmentData() twice before and after > > > > copyLogSegment(). Could you explain why? > > > > 100.6 LogSegmentData includes leaderEpochCache, but there is no api in > > > > RemoteStorageManager to retrieve it. > > > > > > > > 101. If the __remote_log_metadata is for production usage, could you > > > > provide more details? For example, what is the schema of the data (both > > > > key and value)? How is the topic maintained,delete or compact? > > > > > > > > 110. Is the cache implementation in RemoteLogMetadataManager meant for > > > > production usage? If so, could you provide more details on the schema > > > > and how/where the data is stored? > > > > > > > > 111. "Committed offsets can be stored in a local file". Could you > > > > describe the format of the file and where it's stored? > > > > > > > > 112. Truncation of remote segments under unclean leader election: I am > > > > not sure who figures out the truncated remote segments and how that > > > > information is propagated to all replicas? > > > > > > > > 113. "If there are any failures in removing remote log segments then > > > > those are stored in a specific topic (default as > > > > __remote_segments_to_be_deleted)". Is it necessary to add yet another > > > > internal topic? Could we just keep retrying? > > > > > > > > 114. "We may not need to copy producer-id-snapshot as we are copying > > > > only segments earlier to last-stable-offset." Hmm, not sure about that. > > > > The producer snapshot includes things like the last timestamp of each > > > > open producer id and can affect when those producer ids are expired. > > > > > > > > Thanks, > > > > > > > > Jun > > > > > > > > On Thu, May 28, 2020 at 5:38 AM Satish Duggana > > > > <satish.dugg...@gmail.com> wrote: > > > >> > > > >> Hi Jun, > > > >> Gentle reminder. Please go through the updated wiki and let us know > > > >> your comments. > > > >> > > > >> Thanks, > > > >> Satish. > > > >> > > > >> On Tue, May 19, 2020 at 3:50 PM Satish Duggana > > > >> <satish.dugg...@gmail.com> wrote: > > > >>> > > > >>> Hi Jun, > > > >>> Please go through the wiki which has the latest updates. Google doc > > > >>> is updated frequently to be in sync with wiki. > > > >>> > > > >>> Thanks, > > > >>> Satish. > > > >>> > > > >>> On Tue, May 19, 2020 at 12:30 AM Jun Rao <j...@confluent.io> wrote: > > > >>>> > > > >>>> Hi, Satish, > > > >>>> > > > >>>> Thanks for the update. Just to clarify. Which doc has the latest > > > >>>> updates, the wiki or the google doc? > > > >>>> > > > >>>> Jun > > > >>>> > > > >>>> On Thu, May 14, 2020 at 10:38 AM Satish Duggana > > > >>>> <satish.dugg...@gmail.com> wrote: > > > >>>>> > > > >>>>> Hi Jun, > > > >>>>> Thanks for your comments. We updated the KIP with more details. > > > >>>>> > > > >>>>> >100. For each of the operations related to tiering, it would be > > > >>>>> >useful to provide a description on how it works with the new API. > > > >>>>> >These include things like consumer fetch, replica fetch, > > > >>>>> >offsetForTimestamp, retention (remote and local) by size, time and > > > >>>>> >logStartOffset, topic deletion, etc. This will tell us if the > > > >>>>> >proposed APIs are sufficient. > > > >>>>> > > > >>>>> We addressed most of these APIs in the KIP. We can add more details > > > >>>>> if needed. > > > >>>>> > > > >>>>> >101. For the default implementation based on internal topic, is it > > > >>>>> >meant as a proof of concept or for production usage? I assume that > > > >>>>> >it's the former. However, if it's the latter, then the KIP needs > > > >>>>> >to describe the design in more detail. > > > >>>>> > > > >>>>> It is production usage as was mentioned in an earlier mail. We plan > > > >>>>> to update this section in the next few days. > > > >>>>> > > > >>>>> >102. When tiering a segment, the segment is first written to the > > > >>>>> >object store and then its metadata is written to RLMM using the > > > >>>>> >api "void putRemoteLogSegmentData()". One potential issue with > > > >>>>> >this approach is that if the system fails after the first > > > >>>>> >operation, it leaves a garbage in the object store that's never > > > >>>>> >reclaimed. One way to improve this is to have two separate APIs, > > > >>>>> >sth like preparePutRemoteLogSegmentData() and > > > >>>>> >commitPutRemoteLogSegmentData(). > > > >>>>> > > > >>>>> That is a good point. We currently have a different way using > > > >>>>> markers in the segment but your suggestion is much better. > > > >>>>> > > > >>>>> >103. It seems that the transactional support and the ability to > > > >>>>> >read from follower are missing. > > > >>>>> > > > >>>>> KIP is updated with transactional support, follower fetch > > > >>>>> semantics, and reading from a follower. > > > >>>>> > > > >>>>> >104. It would be useful to provide a testing plan for this KIP. > > > >>>>> > > > >>>>> We added a few tests by introducing test util for tiered storage in > > > >>>>> the PR. We will provide the testing plan in the next few days. > > > >>>>> > > > >>>>> Thanks, > > > >>>>> Satish. > > > >>>>> > > > >>>>> > > > >>>>> On Wed, Feb 26, 2020 at 9:43 PM Harsha Chintalapani > > > >>>>> <ka...@harsha.io> wrote: > > > >>>>>> > > > >>>>>> > > > >>>>>> > > > >>>>>> > > > >>>>>> > > > >>>>>> On Tue, Feb 25, 2020 at 12:46 PM, Jun Rao <j...@confluent.io> > > > >>>>>> wrote: > > > >>>>>>> > > > >>>>>>> Hi, Satish, > > > >>>>>>> > > > >>>>>>> Thanks for the updated doc. The new API seems to be an > > > >>>>>>> improvement overall. A few more comments below. > > > >>>>>>> > > > >>>>>>> 100. For each of the operations related to tiering, it would be > > > >>>>>>> useful to provide a description on how it works with the new API. > > > >>>>>>> These include things like consumer fetch, replica fetch, > > > >>>>>>> offsetForTimestamp, retention > > > >>>>>>> (remote and local) by size, time and logStartOffset, topic > > > >>>>>>> deletion, etc. This will tell us if the proposed APIs are > > > >>>>>>> sufficient. > > > >>>>>> > > > >>>>>> > > > >>>>>> Thanks for the feedback Jun. We will add more details around this. > > > >>>>>> > > > >>>>>>> > > > >>>>>>> 101. For the default implementation based on internal topic, is > > > >>>>>>> it meant as a proof of concept or for production usage? I assume > > > >>>>>>> that it's the former. However, if it's the latter, then the KIP > > > >>>>>>> needs to describe the design in more detail. > > > >>>>>> > > > >>>>>> > > > >>>>>> Yes it meant to be for production use. Ideally it would be good > > > >>>>>> to merge this in as the default implementation for metadata > > > >>>>>> service. We can add more details around design and testing. > > > >>>>>> > > > >>>>>>> 102. When tiering a segment, the segment is first written to the > > > >>>>>>> object store and then its metadata is written to RLMM using the > > > >>>>>>> api "void putRemoteLogSegmentData()". > > > >>>>>>> One potential issue with this approach is that if the system > > > >>>>>>> fails after the first operation, it leaves a garbage in the > > > >>>>>>> object store that's never reclaimed. One way to improve this is > > > >>>>>>> to have two separate APIs, sth like > > > >>>>>>> preparePutRemoteLogSegmentData() and > > > >>>>>>> commitPutRemoteLogSegmentData(). > > > >>>>>>> > > > >>>>>>> 103. It seems that the transactional support and the ability to > > > >>>>>>> read from follower are missing. > > > >>>>>>> > > > >>>>>>> 104. It would be useful to provide a testing plan for this KIP. > > > >>>>>> > > > >>>>>> > > > >>>>>> We are working on adding more details around transactional support > > > >>>>>> and coming up with test plan. > > > >>>>>> Add system tests and integration tests. > > > >>>>>> > > > >>>>>>> Thanks, > > > >>>>>>> > > > >>>>>>> Jun > > > >>>>>>> > > > >>>>>>> On Mon, Feb 24, 2020 at 8:10 AM Satish Duggana > > > >>>>>>> <satish.dugg...@gmail.com> wrote: > > > >>>>>>> > > > >>>>>>> Hi Jun, > > > >>>>>>> Please look at the earlier reply and let us know your comments. > > > >>>>>>> > > > >>>>>>> Thanks, > > > >>>>>>> Satish. > > > >>>>>>> > > > >>>>>>> On Wed, Feb 12, 2020 at 4:06 PM Satish Duggana > > > >>>>>>> <satish.dugg...@gmail.com> wrote: > > > >>>>>>> > > > >>>>>>> Hi Jun, > > > >>>>>>> Thanks for your comments on the separation of remote log metadata > > > >>>>>>> storage and remote log storage. > > > >>>>>>> We had a few discussions since early Jan on how to support > > > >>>>>>> eventually consistent stores like S3 by uncoupling remote log > > > >>>>>>> segment metadata and remote log storage. It is written with > > > >>>>>>> details in the doc here(1). Below is the brief summary of the > > > >>>>>>> discussion from that doc. > > > >>>>>>> > > > >>>>>>> The current approach consists of pulling the remote log segment > > > >>>>>>> metadata from remote log storage APIs. It worked fine for > > > >>>>>>> storages like HDFS. But one of the problems of relying on the > > > >>>>>>> remote storage to maintain metadata is that tiered-storage needs > > > >>>>>>> to be strongly consistent, with an impact not only on the > > > >>>>>>> metadata(e.g. LIST in S3) but also on the segment data(e.g. GET > > > >>>>>>> after a DELETE in S3). The cost of maintaining metadata in remote > > > >>>>>>> storage needs to be factored in. This is true in the case of S3, > > > >>>>>>> LIST APIs incur huge costs as you raised earlier. > > > >>>>>>> So, it is good to separate the remote storage from the remote log > > > >>>>>>> metadata store. We refactored the existing RemoteStorageManager > > > >>>>>>> and introduced RemoteLogMetadataManager. Remote log metadata > > > >>>>>>> store should give strong consistency semantics but remote log > > > >>>>>>> storage can be eventually consistent. > > > >>>>>>> We can have a default implementation for RemoteLogMetadataManager > > > >>>>>>> which uses an internal topic(as mentioned in one of our earlier > > > >>>>>>> emails) as storage. But users can always plugin their own > > > >>>>>>> RemoteLogMetadataManager implementation based on their > > > >>>>>>> environment. > > > >>>>>>> > > > >>>>>>> Please go through the updated KIP and let us know your comments. > > > >>>>>>> We have started refactoring for the changes mentioned in the KIP > > > >>>>>>> and there may be a few more updates to the APIs. > > > >>>>>>> > > > >>>>>>> [1] > > > >>>>>>> > > > >>>>>>> https://docs.google.com/document/d/1qfkBCWL1e7ZWkHU7brxKDBebq4ie9yK20XJnKbgAlew/edit?ts=5e208ec7# > > > >>>>>>> > > > >>>>>>> On Fri, Dec 27, 2019 at 5:43 PM Ivan Yurchenko > > > >>>>>>> <ivan0yurche...@gmail.com> > > > >>>>>>> > > > >>>>>>> wrote: > > > >>>>>>> > > > >>>>>>> Hi all, > > > >>>>>>> > > > >>>>>>> Jun: > > > >>>>>>> > > > >>>>>>> (a) Cost: S3 list object requests cost $0.005 per 1000 requests. > > > >>>>>>> If > > > >>>>>>> > > > >>>>>>> you > > > >>>>>>> > > > >>>>>>> have 100,000 partitions and want to pull the metadata for each > > > >>>>>>> > > > >>>>>>> partition > > > >>>>>>> > > > >>>>>>> at > > > >>>>>>> > > > >>>>>>> the rate of 1/sec. It can cost $0.5/sec, which is roughly $40K per > > > >>>>>>> > > > >>>>>>> day. > > > >>>>>>> > > > >>>>>>> I want to note here, that no reasonably durable storage will be > > > >>>>>>> cheap at 100k RPS. For example, DynamoDB might give the same > > > >>>>>>> ballpark > > > >>>>>>> > > > >>>>>>> figures. > > > >>>>>>> > > > >>>>>>> If we want to keep the pull-based approach, we can try to reduce > > > >>>>>>> this > > > >>>>>>> > > > >>>>>>> number > > > >>>>>>> > > > >>>>>>> in several ways: doing listings less frequently (as Satish > > > >>>>>>> mentioned, with the current defaults it's ~3.33k RPS for your > > > >>>>>>> example), batching listing operations in some way (depending on > > > >>>>>>> the storage; it might require the change of RSM's interface). > > > >>>>>>> > > > >>>>>>> There are different ways for doing push based metadata > > > >>>>>>> propagation. > > > >>>>>>> > > > >>>>>>> Some > > > >>>>>>> > > > >>>>>>> object stores may support that already. For example, S3 supports > > > >>>>>>> > > > >>>>>>> events > > > >>>>>>> > > > >>>>>>> notification > > > >>>>>>> > > > >>>>>>> This sounds interesting. However, I see a couple of issues using > > > >>>>>>> it: > > > >>>>>>> 1. As I understand the documentation, notification delivery is > > > >>>>>>> not guaranteed > > > >>>>>>> and it's recommended to periodically do LIST to fill the gaps. > > > >>>>>>> Which brings us back to the same LIST consistency guarantees > > > >>>>>>> issue. > > > >>>>>>> 2. The same goes for the broker start: to get the current state, > > > >>>>>>> we > > > >>>>>>> > > > >>>>>>> need > > > >>>>>>> > > > >>>>>>> to LIST. > > > >>>>>>> 3. The dynamic set of multiple consumers (RSMs): AFAIK SQS and SNS > > > >>>>>>> > > > >>>>>>> aren't > > > >>>>>>> > > > >>>>>>> designed for such a case. > > > >>>>>>> > > > >>>>>>> Alexandre: > > > >>>>>>> > > > >>>>>>> A.1 As commented on PR 7561, S3 consistency model [1][2] implies > > > >>>>>>> RSM > > > >>>>>>> > > > >>>>>>> cannot > > > >>>>>>> > > > >>>>>>> relies solely on S3 APIs to guarantee the expected strong > > > >>>>>>> > > > >>>>>>> consistency. The > > > >>>>>>> > > > >>>>>>> proposed implementation [3] would need to be updated to take this > > > >>>>>>> > > > >>>>>>> into > > > >>>>>>> > > > >>>>>>> account. Let’s talk more about this. > > > >>>>>>> > > > >>>>>>> Thank you for the feedback. I clearly see the need for changing > > > >>>>>>> the S3 implementation > > > >>>>>>> to provide stronger consistency guarantees. As it see from this > > > >>>>>>> thread, there are > > > >>>>>>> several possible approaches to this. Let's discuss > > > >>>>>>> RemoteLogManager's contract and > > > >>>>>>> behavior (like pull vs push model) further before picking one (or > > > >>>>>>> > > > >>>>>>> several - > > > >>>>>>> > > > >>>>>>> ?) of them. > > > >>>>>>> I'm going to do some evaluation of DynamoDB for the pull-based > > > >>>>>>> > > > >>>>>>> approach, > > > >>>>>>> > > > >>>>>>> if it's possible to apply it paying a reasonable bill. Also, of > > > >>>>>>> the push-based approach > > > >>>>>>> with a Kafka topic as the medium. > > > >>>>>>> > > > >>>>>>> A.2.3 Atomicity – what does an implementation of RSM need to > > > >>>>>>> provide > > > >>>>>>> > > > >>>>>>> with > > > >>>>>>> > > > >>>>>>> respect to atomicity of the APIs copyLogSegment, cleanupLogUntil > > > >>>>>>> and deleteTopicPartition? If a partial failure happens in any of > > > >>>>>>> those > > > >>>>>>> > > > >>>>>>> (e.g. > > > >>>>>>> > > > >>>>>>> in > > > >>>>>>> > > > >>>>>>> the S3 implementation, if one of the multiple uploads fails [4]), > > > >>>>>>> > > > >>>>>>> The S3 implementation is going to change, but it's worth > > > >>>>>>> clarifying > > > >>>>>>> > > > >>>>>>> anyway. > > > >>>>>>> > > > >>>>>>> The segment log file is being uploaded after S3 has acked > > > >>>>>>> uploading of all other files associated with the segment and only > > > >>>>>>> after this the > > > >>>>>>> > > > >>>>>>> whole > > > >>>>>>> > > > >>>>>>> segment file set becomes visible remotely for operations like > > > >>>>>>> listRemoteSegments [1]. > > > >>>>>>> In case of upload failure, the files that has been successfully > > > >>>>>>> > > > >>>>>>> uploaded > > > >>>>>>> > > > >>>>>>> stays > > > >>>>>>> as invisible garbage that is collected by cleanupLogUntil (or > > > >>>>>>> > > > >>>>>>> overwritten > > > >>>>>>> > > > >>>>>>> successfully later). > > > >>>>>>> And the opposite happens during the deletion: log files are > > > >>>>>>> deleted > > > >>>>>>> > > > >>>>>>> first. > > > >>>>>>> > > > >>>>>>> This approach should generally work when we solve consistency > > > >>>>>>> issues by adding a strongly consistent storage: a segment's > > > >>>>>>> uploaded files > > > >>>>>>> > > > >>>>>>> remain > > > >>>>>>> > > > >>>>>>> invisible garbage until some metadata about them is written. > > > >>>>>>> > > > >>>>>>> A.3 Caching – storing locally the segments retrieved from the > > > >>>>>>> remote storage is excluded as it does not align with the original > > > >>>>>>> intent > > > >>>>>>> > > > >>>>>>> and even > > > >>>>>>> > > > >>>>>>> defeat some of its purposes (save disk space etc.). That said, > > > >>>>>>> could > > > >>>>>>> > > > >>>>>>> there > > > >>>>>>> > > > >>>>>>> be other types of use cases where the pattern of access to the > > > >>>>>>> > > > >>>>>>> remotely > > > >>>>>>> > > > >>>>>>> stored segments would benefit from local caching (and potentially > > > >>>>>>> read-ahead)? Consider the use case of a large pool of consumers > > > >>>>>>> which > > > >>>>>>> > > > >>>>>>> start > > > >>>>>>> > > > >>>>>>> a backfill at the same time for one day worth of data from one > > > >>>>>>> year > > > >>>>>>> > > > >>>>>>> ago > > > >>>>>>> > > > >>>>>>> stored remotely. Caching the segments locally would allow to > > > >>>>>>> > > > >>>>>>> uncouple the > > > >>>>>>> > > > >>>>>>> load on the remote storage from the load on the Kafka cluster. > > > >>>>>>> Maybe > > > >>>>>>> > > > >>>>>>> the > > > >>>>>>> > > > >>>>>>> RLM could expose a configuration parameter to switch that feature > > > >>>>>>> > > > >>>>>>> on/off? > > > >>>>>>> > > > >>>>>>> I tend to agree here, caching remote segments locally and making > > > >>>>>>> this configurable sounds pretty practical to me. We should > > > >>>>>>> implement > > > >>>>>>> > > > >>>>>>> this, > > > >>>>>>> > > > >>>>>>> maybe not in the first iteration. > > > >>>>>>> > > > >>>>>>> Br, > > > >>>>>>> Ivan > > > >>>>>>> > > > >>>>>>> [1] > > > >>>>>>> > > > >>>>>>> https://github.com/harshach/kafka/pull/18/files#diff-4d73d01c16caed6f2548fc3063550ef0R152 > > > >>>>>>> > > > >>>>>>> On Thu, 19 Dec 2019 at 19:49, Alexandre Dupriez < > > > >>>>>>> > > > >>>>>>> alexandre.dupr...@gmail.com> > > > >>>>>>> > > > >>>>>>> wrote: > > > >>>>>>> > > > >>>>>>> Hi Jun, > > > >>>>>>> > > > >>>>>>> Thank you for the feedback. I am trying to understand how a > > > >>>>>>> > > > >>>>>>> push-based > > > >>>>>>> > > > >>>>>>> approach would work. > > > >>>>>>> In order for the metadata to be propagated (under the assumption > > > >>>>>>> you stated), would you plan to add a new API in Kafka to allow > > > >>>>>>> the metadata store to send them directly to the brokers? > > > >>>>>>> > > > >>>>>>> Thanks, > > > >>>>>>> Alexandre > > > >>>>>>> > > > >>>>>>> Le mer. 18 déc. 2019 à 20:14, Jun Rao <j...@confluent.io> a écrit > > > >>>>>>> : > > > >>>>>>> > > > >>>>>>> Hi, Satish and Ivan, > > > >>>>>>> > > > >>>>>>> There are different ways for doing push based metadata > > > >>>>>>> > > > >>>>>>> propagation. Some > > > >>>>>>> > > > >>>>>>> object stores may support that already. For example, S3 supports > > > >>>>>>> > > > >>>>>>> events > > > >>>>>>> > > > >>>>>>> notification ( > > > >>>>>>> > > > >>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html). > > > >>>>>>> > > > >>>>>>> Otherwise one could use a separate metadata store that supports > > > >>>>>>> > > > >>>>>>> push-based > > > >>>>>>> > > > >>>>>>> change propagation. Other people have mentioned using a Kafka > > > >>>>>>> > > > >>>>>>> topic. The > > > >>>>>>> > > > >>>>>>> best approach may depend on the object store and the operational > > > >>>>>>> environment (e.g. whether an external metadata store is already > > > >>>>>>> > > > >>>>>>> available). > > > >>>>>>> > > > >>>>>>> The above discussion is based on the assumption that we need to > > > >>>>>>> > > > >>>>>>> cache the > > > >>>>>>> > > > >>>>>>> object metadata locally in every broker. I mentioned earlier that > > > >>>>>>> > > > >>>>>>> an > > > >>>>>>> > > > >>>>>>> alternative is to just store/retrieve those metadata in an > > > >>>>>>> external metadata store. That may simplify the implementation in > > > >>>>>>> some cases. > > > >>>>>>> > > > >>>>>>> Thanks, > > > >>>>>>> > > > >>>>>>> Jun > > > >>>>>>> > > > >>>>>>> On Thu, Dec 5, 2019 at 7:01 AM Satish Duggana < > > > >>>>>>> > > > >>>>>>> satish.dugg...@gmail.com> > > > >>>>>>> > > > >>>>>>> wrote: > > > >>>>>>> > > > >>>>>>> Hi Jun, > > > >>>>>>> Thanks for your reply. > > > >>>>>>> > > > >>>>>>> Currently, `listRemoteSegments` is called at the configured > > > >>>>>>> interval(not every second, defaults to 30secs). Storing remote > > > >>>>>>> > > > >>>>>>> log > > > >>>>>>> > > > >>>>>>> metadata in a strongly consistent store for S3 RSM is raised in > > > >>>>>>> PR-comment[1]. > > > >>>>>>> RLM invokes RSM at regular intervals and RSM can give remote > > > >>>>>>> > > > >>>>>>> segment > > > >>>>>>> > > > >>>>>>> metadata if it is available. RSM is responsible for maintaining > > > >>>>>>> > > > >>>>>>> and > > > >>>>>>> > > > >>>>>>> fetching those entries. It should be based on whatever mechanism > > > >>>>>>> > > > >>>>>>> is > > > >>>>>>> > > > >>>>>>> consistent and efficient with the respective remote storage. > > > >>>>>>> > > > >>>>>>> Can you give more details about push based mechanism from RSM? > > > >>>>>>> > > > >>>>>>> 1. > > > >>>>>>> > > > >>>>>>> https://github.com/apache/kafka/pull/7561#discussion_r344576223 > > > >>>>>>> > > > >>>>>>> Thanks, > > > >>>>>>> Satish. > > > >>>>>>> > > > >>>>>>> On Thu, Dec 5, 2019 at 4:23 AM Jun Rao <j...@confluent.io> wrote: > > > >>>>>>> > > > >>>>>>> Hi, Harsha, > > > >>>>>>> > > > >>>>>>> Thanks for the reply. > > > >>>>>>> > > > >>>>>>> 40/41. I am curious which block storages you have tested. S3 > > > >>>>>>> > > > >>>>>>> seems > > > >>>>>>> > > > >>>>>>> to be > > > >>>>>>> > > > >>>>>>> one of the popular block stores. The concerns that I have with > > > >>>>>>> > > > >>>>>>> pull > > > >>>>>>> > > > >>>>>>> based > > > >>>>>>> > > > >>>>>>> approach are the following. > > > >>>>>>> (a) Cost: S3 list object requests cost $0.005 per 1000 > > > >>>>>>> > > > >>>>>>> requests. If > > > >>>>>>> > > > >>>>>>> you > > > >>>>>>> > > > >>>>>>> have 100,000 partitions and want to pull the metadata for each > > > >>>>>>> > > > >>>>>>> partition > > > >>>>>>> > > > >>>>>>> at > > > >>>>>>> > > > >>>>>>> the rate of 1/sec. It can cost $0.5/sec, which is roughly $40K > > > >>>>>>> > > > >>>>>>> per > > > >>>>>>> > > > >>>>>>> day. > > > >>>>>>> > > > >>>>>>> (b) Semantics: S3 list objects are eventually consistent. So, > > > >>>>>>> > > > >>>>>>> when > > > >>>>>>> > > > >>>>>>> you > > > >>>>>>> > > > >>>>>>> do a > > > >>>>>>> > > > >>>>>>> list object request, there is no guarantee that you can see all > > > >>>>>>> > > > >>>>>>> uploaded > > > >>>>>>> > > > >>>>>>> objects. This could impact the correctness of subsequent > > > >>>>>>> > > > >>>>>>> logics. > > > >>>>>>> > > > >>>>>>> (c) Efficiency: Blindly pulling metadata when there is no > > > >>>>>>> > > > >>>>>>> change adds > > > >>>>>>> > > > >>>>>>> unnecessary overhead in the broker as well as in the block > > > >>>>>>> > > > >>>>>>> store. > > > >>>>>>> > > > >>>>>>> So, have you guys tested S3? If so, could you share your > > > >>>>>>> > > > >>>>>>> experience > > > >>>>>>> > > > >>>>>>> in > > > >>>>>>> > > > >>>>>>> terms of cost, semantics and efficiency? > > > >>>>>>> > > > >>>>>>> Jun > > > >>>>>>> > > > >>>>>>> On Tue, Dec 3, 2019 at 10:11 PM Harsha Chintalapani < > > > >>>>>>> > > > >>>>>>> ka...@harsha.io > > > >>>>>>> > > > >>>>>>> wrote: > > > >>>>>>> > > > >>>>>>> Hi Jun, > > > >>>>>>> Thanks for the reply. > > > >>>>>>> > > > >>>>>>> On Tue, Nov 26, 2019 at 3:46 PM, Jun Rao <j...@confluent.io> > > > >>>>>>> > > > >>>>>>> wrote: > > > >>>>>>> > > > >>>>>>> Hi, Satish and Ying, > > > >>>>>>> > > > >>>>>>> Thanks for the reply. > > > >>>>>>> > > > >>>>>>> 40/41. There are two different ways that we can approach > > > >>>>>>> > > > >>>>>>> this. > > > >>>>>>> > > > >>>>>>> One is > > > >>>>>>> > > > >>>>>>> what > > > >>>>>>> > > > >>>>>>> you said. We can have an opinionated way of storing and > > > >>>>>>> > > > >>>>>>> populating > > > >>>>>>> > > > >>>>>>> the > > > >>>>>>> > > > >>>>>>> tier > > > >>>>>>> > > > >>>>>>> metadata that we think is good enough for everyone. I am > > > >>>>>>> > > > >>>>>>> not > > > >>>>>>> > > > >>>>>>> sure if > > > >>>>>>> > > > >>>>>>> this > > > >>>>>>> > > > >>>>>>> is the case based on what's currently proposed in the KIP. > > > >>>>>>> > > > >>>>>>> For > > > >>>>>>> > > > >>>>>>> example, I > > > >>>>>>> > > > >>>>>>> am not sure that (1) everyone always needs local metadata; > > > >>>>>>> > > > >>>>>>> (2) > > > >>>>>>> > > > >>>>>>> the > > > >>>>>>> > > > >>>>>>> current > > > >>>>>>> > > > >>>>>>> local storage format is general enough and (3) everyone > > > >>>>>>> > > > >>>>>>> wants to > > > >>>>>>> > > > >>>>>>> use > > > >>>>>>> > > > >>>>>>> the > > > >>>>>>> > > > >>>>>>> pull based approach to propagate the metadata. Another > > > >>>>>>> > > > >>>>>>> approach > > > >>>>>>> > > > >>>>>>> is to > > > >>>>>>> > > > >>>>>>> make > > > >>>>>>> > > > >>>>>>> this pluggable and let the implementor implements the best > > > >>>>>>> > > > >>>>>>> approach > > > >>>>>>> > > > >>>>>>> for a > > > >>>>>>> > > > >>>>>>> particular block storage. I haven't seen any comments from > > > >>>>>>> > > > >>>>>>> Slack/AirBnb > > > >>>>>>> > > > >>>>>>> in > > > >>>>>>> > > > >>>>>>> the mailing list on this topic. It would be great if they > > > >>>>>>> > > > >>>>>>> can > > > >>>>>>> > > > >>>>>>> provide > > > >>>>>>> > > > >>>>>>> feedback directly here. > > > >>>>>>> > > > >>>>>>> The current interfaces are designed with most popular block > > > >>>>>>> > > > >>>>>>> storages > > > >>>>>>> > > > >>>>>>> available today and we did 2 implementations with these > > > >>>>>>> > > > >>>>>>> interfaces and > > > >>>>>>> > > > >>>>>>> they both are yielding good results as we going through the > > > >>>>>>> > > > >>>>>>> testing of > > > >>>>>>> > > > >>>>>>> it. > > > >>>>>>> > > > >>>>>>> If there is ever a need for pull based approach we can > > > >>>>>>> > > > >>>>>>> definitely > > > >>>>>>> > > > >>>>>>> evolve > > > >>>>>>> > > > >>>>>>> the interface. > > > >>>>>>> In the past we did mark interfaces to be evolving to make > > > >>>>>>> > > > >>>>>>> room for > > > >>>>>>> > > > >>>>>>> unknowns > > > >>>>>>> > > > >>>>>>> in the future. > > > >>>>>>> If you have any suggestions around the current interfaces > > > >>>>>>> > > > >>>>>>> please > > > >>>>>>> > > > >>>>>>> propose we > > > >>>>>>> > > > >>>>>>> are happy to see if we can work them into it. > > > >>>>>>> > > > >>>>>>> 43. To offer tier storage as a general feature, ideally all > > > >>>>>>> > > > >>>>>>> existing > > > >>>>>>> > > > >>>>>>> capabilities should still be supported. It's fine if the > > > >>>>>>> > > > >>>>>>> uber > > > >>>>>>> > > > >>>>>>> implementation doesn't support all capabilities for > > > >>>>>>> > > > >>>>>>> internal > > > >>>>>>> > > > >>>>>>> usage. > > > >>>>>>> > > > >>>>>>> However, the framework should be general enough. > > > >>>>>>> > > > >>>>>>> We agree on that as a principle. But all of these major > > > >>>>>>> > > > >>>>>>> features > > > >>>>>>> > > > >>>>>>> mostly > > > >>>>>>> > > > >>>>>>> coming right now and to have a new big feature such as tiered > > > >>>>>>> > > > >>>>>>> storage > > > >>>>>>> > > > >>>>>>> to > > > >>>>>>> > > > >>>>>>> support all the new features will be a big ask. We can > > > >>>>>>> > > > >>>>>>> document on > > > >>>>>>> > > > >>>>>>> how > > > >>>>>>> > > > >>>>>>> do > > > >>>>>>> > > > >>>>>>> we approach solving these in future iterations. > > > >>>>>>> Our goal is to make this tiered storage feature work for > > > >>>>>>> > > > >>>>>>> everyone. > > > >>>>>>> > > > >>>>>>> 43.3 This is more than just serving the tier-ed data from > > > >>>>>>> > > > >>>>>>> block > > > >>>>>>> > > > >>>>>>> storage. > > > >>>>>>> > > > >>>>>>> With KIP-392, the consumer now can resolve the conflicts > > > >>>>>>> > > > >>>>>>> with the > > > >>>>>>> > > > >>>>>>> replica > > > >>>>>>> > > > >>>>>>> based on leader epoch. So, we need to make sure that > > > >>>>>>> > > > >>>>>>> leader epoch > > > >>>>>>> > > > >>>>>>> can be > > > >>>>>>> > > > >>>>>>> recovered properly from tier storage. > > > >>>>>>> > > > >>>>>>> We are working on testing our approach and we will update > > > >>>>>>> > > > >>>>>>> the KIP > > > >>>>>>> > > > >>>>>>> with > > > >>>>>>> > > > >>>>>>> design details. > > > >>>>>>> > > > >>>>>>> 43.4 For JBOD, if tier storage stores the tier metadata > > > >>>>>>> > > > >>>>>>> locally, we > > > >>>>>>> > > > >>>>>>> need to > > > >>>>>>> > > > >>>>>>> support moving such metadata across disk directories since > > > >>>>>>> > > > >>>>>>> JBOD > > > >>>>>>> > > > >>>>>>> supports > > > >>>>>>> > > > >>>>>>> moving data across disks. > > > >>>>>>> > > > >>>>>>> KIP is updated with JBOD details. Having said that JBOD > > > >>>>>>> > > > >>>>>>> tooling > > > >>>>>>> > > > >>>>>>> needs > > > >>>>>>> > > > >>>>>>> to > > > >>>>>>> > > > >>>>>>> evolve to support production loads. Most of the users will be > > > >>>>>>> > > > >>>>>>> interested in > > > >>>>>>> > > > >>>>>>> using tiered storage without JBOD support support on day 1. > > > >>>>>>> > > > >>>>>>> Thanks, > > > >>>>>>> Harsha > > > >>>>>>> > > > >>>>>>> As for meeting, we could have a KIP e-meeting on this if > > > >>>>>>> > > > >>>>>>> needed, > > > >>>>>>> > > > >>>>>>> but it > > > >>>>>>> > > > >>>>>>> will be open to everyone and will be recorded and shared. > > > >>>>>>> > > > >>>>>>> Often, > > > >>>>>>> > > > >>>>>>> the > > > >>>>>>> > > > >>>>>>> details are still resolved through the mailing list. > > > >>>>>>> > > > >>>>>>> Jun > > > >>>>>>> > > > >>>>>>> On Tue, Nov 19, 2019 at 6:48 PM Ying Zheng > > > >>>>>>> > > > >>>>>>> <yi...@uber.com.invalid> > > > >>>>>>> > > > >>>>>>> wrote: > > > >>>>>>> > > > >>>>>>> Please ignore my previous email > > > >>>>>>> I didn't know Apache requires all the discussions to be > > > >>>>>>> > > > >>>>>>> "open" > > > >>>>>>> > > > >>>>>>> On Tue, Nov 19, 2019, 5:40 PM Ying Zheng <yi...@uber.com> > > > >>>>>>> > > > >>>>>>> wrote: > > > >>>>>>> > > > >>>>>>> Hi Jun, > > > >>>>>>> > > > >>>>>>> Thank you very much for your feedback! > > > >>>>>>> > > > >>>>>>> Can we schedule a meeting in your Palo Alto office in > > > >>>>>>> > > > >>>>>>> December? I > > > >>>>>>> > > > >>>>>>> think a > > > >>>>>>> > > > >>>>>>> face to face discussion is much more efficient than > > > >>>>>>> > > > >>>>>>> emails. Both > > > >>>>>>> > > > >>>>>>> Harsha > > > >>>>>>> > > > >>>>>>> and > > > >>>>>>> > > > >>>>>>> I can visit you. Satish may be able to join us remotely. > > > >>>>>>> > > > >>>>>>> On Fri, Nov 15, 2019 at 11:04 AM Jun Rao <j...@confluent.io > > > >>>>>>> > > > >>>>>>> wrote: > > > >>>>>>> > > > >>>>>>> Hi, Satish and Harsha, > > > >>>>>>> > > > >>>>>>> The following is a more detailed high level feedback for > > > >>>>>>> > > > >>>>>>> the KIP. > > > >>>>>>> > > > >>>>>>> Overall, > > > >>>>>>> > > > >>>>>>> the KIP seems useful. The challenge is how to design it > > > >>>>>>> > > > >>>>>>> such that > > > >>>>>>> > > > >>>>>>> it’s > > > >>>>>>> > > > >>>>>>> general enough to support different ways of implementing > > > >>>>>>> > > > >>>>>>> this > > > >>>>>>> > > > >>>>>>> feature > > > >>>>>>> > > > >>>>>>> and > > > >>>>>>> > > > >>>>>>> support existing features. > > > >>>>>>> > > > >>>>>>> 40. Local segment metadata storage: The KIP makes the > > > >>>>>>> > > > >>>>>>> assumption > > > >>>>>>> > > > >>>>>>> that > > > >>>>>>> > > > >>>>>>> the > > > >>>>>>> > > > >>>>>>> metadata for the archived log segments are cached locally > > > >>>>>>> > > > >>>>>>> in > > > >>>>>>> > > > >>>>>>> every > > > >>>>>>> > > > >>>>>>> broker > > > >>>>>>> > > > >>>>>>> and provides a specific implementation for the local > > > >>>>>>> > > > >>>>>>> storage in > > > >>>>>>> > > > >>>>>>> the > > > >>>>>>> > > > >>>>>>> framework. We probably should discuss this more. For > > > >>>>>>> > > > >>>>>>> example, > > > >>>>>>> > > > >>>>>>> some > > > >>>>>>> > > > >>>>>>> tier > > > >>>>>>> > > > >>>>>>> storage providers may not want to cache the metadata > > > >>>>>>> > > > >>>>>>> locally and > > > >>>>>>> > > > >>>>>>> just > > > >>>>>>> > > > >>>>>>> rely > > > >>>>>>> > > > >>>>>>> upon a remote key/value store if such a store is already > > > >>>>>>> > > > >>>>>>> present. If > > > >>>>>>> > > > >>>>>>> a > > > >>>>>>> > > > >>>>>>> local store is used, there could be different ways of > > > >>>>>>> > > > >>>>>>> implementing it > > > >>>>>>> > > > >>>>>>> (e.g., based on customized local files, an embedded local > > > >>>>>>> > > > >>>>>>> store > > > >>>>>>> > > > >>>>>>> like > > > >>>>>>> > > > >>>>>>> RocksDB, etc). An alternative of designing this is to just > > > >>>>>>> > > > >>>>>>> provide an > > > >>>>>>> > > > >>>>>>> interface for retrieving the tier segment metadata and > > > >>>>>>> > > > >>>>>>> leave the > > > >>>>>>> > > > >>>>>>> details > > > >>>>>>> > > > >>>>>>> of > > > >>>>>>> > > > >>>>>>> how to get the metadata outside of the framework. > > > >>>>>>> > > > >>>>>>> 41. RemoteStorageManager interface and the usage of the > > > >>>>>>> > > > >>>>>>> interface in > > > >>>>>>> > > > >>>>>>> the > > > >>>>>>> > > > >>>>>>> framework: I am not sure if the interface is general > > > >>>>>>> > > > >>>>>>> enough. For > > > >>>>>>> > > > >>>>>>> example, > > > >>>>>>> > > > >>>>>>> it seems that RemoteLogIndexEntry is tied to a specific > > > >>>>>>> > > > >>>>>>> way of > > > >>>>>>> > > > >>>>>>> storing > > > >>>>>>> > > > >>>>>>> the > > > >>>>>>> > > > >>>>>>> metadata in remote storage. The framework uses > > > >>>>>>> > > > >>>>>>> listRemoteSegments() > > > >>>>>>> > > > >>>>>>> api > > > >>>>>>> > > > >>>>>>> in > > > >>>>>>> > > > >>>>>>> a pull based approach. However, in some other > > > >>>>>>> > > > >>>>>>> implementations, a > > > >>>>>>> > > > >>>>>>> push > > > >>>>>>> > > > >>>>>>> based > > > >>>>>>> approach may be more preferred. I don’t have a concrete > > > >>>>>>> > > > >>>>>>> proposal > > > >>>>>>> > > > >>>>>>> yet. > > > >>>>>>> > > > >>>>>>> But, > > > >>>>>>> > > > >>>>>>> it would be useful to give this area some more thoughts > > > >>>>>>> > > > >>>>>>> and see > > > >>>>>>> > > > >>>>>>> if we > > > >>>>>>> > > > >>>>>>> can > > > >>>>>>> > > > >>>>>>> make the interface more general. > > > >>>>>>> > > > >>>>>>> 42. In the diagram, the RemoteLogManager is side by side > > > >>>>>>> > > > >>>>>>> with > > > >>>>>>> > > > >>>>>>> LogManager. > > > >>>>>>> > > > >>>>>>> This KIP only discussed how the fetch request is handled > > > >>>>>>> > > > >>>>>>> between > > > >>>>>>> > > > >>>>>>> the > > > >>>>>>> > > > >>>>>>> two > > > >>>>>>> > > > >>>>>>> layer. However, we should also consider how other requests > > > >>>>>>> > > > >>>>>>> that > > > >>>>>>> > > > >>>>>>> touch > > > >>>>>>> > > > >>>>>>> the > > > >>>>>>> > > > >>>>>>> log can be handled. e.g., list offsets by timestamp, delete > > > >>>>>>> > > > >>>>>>> records, > > > >>>>>>> > > > >>>>>>> etc. > > > >>>>>>> > > > >>>>>>> Also, in this model, it's not clear which component is > > > >>>>>>> > > > >>>>>>> responsible > > > >>>>>>> > > > >>>>>>> for > > > >>>>>>> > > > >>>>>>> managing the log start offset. It seems that the log start > > > >>>>>>> > > > >>>>>>> offset > > > >>>>>>> > > > >>>>>>> could > > > >>>>>>> > > > >>>>>>> be > > > >>>>>>> > > > >>>>>>> changed by both RemoteLogManager and LogManager. > > > >>>>>>> > > > >>>>>>> 43. There are quite a few existing features not covered by > > > >>>>>>> > > > >>>>>>> the > > > >>>>>>> > > > >>>>>>> KIP. > > > >>>>>>> > > > >>>>>>> It > > > >>>>>>> > > > >>>>>>> would be useful to discuss each of those. > > > >>>>>>> 43.1 I won’t say that compacted topics are rarely used and > > > >>>>>>> > > > >>>>>>> always > > > >>>>>>> > > > >>>>>>> small. > > > >>>>>>> > > > >>>>>>> For example, KStreams uses compacted topics for storing the > > > >>>>>>> > > > >>>>>>> states > > > >>>>>>> > > > >>>>>>> and > > > >>>>>>> > > > >>>>>>> sometimes the size of the topic could be large. While it > > > >>>>>>> > > > >>>>>>> might > > > >>>>>>> > > > >>>>>>> be ok > > > >>>>>>> > > > >>>>>>> to > > > >>>>>>> > > > >>>>>>> not > > > >>>>>>> > > > >>>>>>> support compacted topics initially, it would be useful to > > > >>>>>>> > > > >>>>>>> have a > > > >>>>>>> > > > >>>>>>> high > > > >>>>>>> > > > >>>>>>> level > > > >>>>>>> idea on how this might be supported down the road so that > > > >>>>>>> > > > >>>>>>> we > > > >>>>>>> > > > >>>>>>> don’t > > > >>>>>>> > > > >>>>>>> have > > > >>>>>>> > > > >>>>>>> to > > > >>>>>>> > > > >>>>>>> make incompatible API changes in the future. > > > >>>>>>> 43.2 We need to discuss how EOS is supported. In > > > >>>>>>> > > > >>>>>>> particular, how > > > >>>>>>> > > > >>>>>>> is > > > >>>>>>> > > > >>>>>>> the > > > >>>>>>> > > > >>>>>>> producer state integrated with the remote storage. 43.3 > > > >>>>>>> > > > >>>>>>> Now that > > > >>>>>>> > > > >>>>>>> KIP-392 > > > >>>>>>> > > > >>>>>>> (allow consumers to fetch from closest replica) is > > > >>>>>>> > > > >>>>>>> implemented, > > > >>>>>>> > > > >>>>>>> we > > > >>>>>>> > > > >>>>>>> need > > > >>>>>>> > > > >>>>>>> to > > > >>>>>>> > > > >>>>>>> discuss how reading from a follower replica is supported > > > >>>>>>> > > > >>>>>>> with > > > >>>>>>> > > > >>>>>>> tier > > > >>>>>>> > > > >>>>>>> storage. > > > >>>>>>> > > > >>>>>>> 43.4 We need to discuss how JBOD is supported with tier > > > >>>>>>> > > > >>>>>>> storage. > > > >>>>>>> > > > >>>>>>> Thanks, > > > >>>>>>> > > > >>>>>>> Jun > > > >>>>>>> > > > >>>>>>> On Fri, Nov 8, 2019 at 12:06 AM Tom Bentley < > > > >>>>>>> > > > >>>>>>> tbent...@redhat.com > > > >>>>>>> > > > >>>>>>> wrote: > > > >>>>>>> > > > >>>>>>> Thanks for those insights Ying. > > > >>>>>>> > > > >>>>>>> On Thu, Nov 7, 2019 at 9:26 PM Ying Zheng > > > >>>>>>> > > > >>>>>>> <yi...@uber.com.invalid > > > >>>>>>> > > > >>>>>>> wrote: > > > >>>>>>> > > > >>>>>>> Thanks, I missed that point. However, there's still a > > > >>>>>>> > > > >>>>>>> point at > > > >>>>>>> > > > >>>>>>> which > > > >>>>>>> > > > >>>>>>> the > > > >>>>>>> > > > >>>>>>> consumer fetches start getting served from remote storage > > > >>>>>>> > > > >>>>>>> (even > > > >>>>>>> > > > >>>>>>> if > > > >>>>>>> > > > >>>>>>> that > > > >>>>>>> > > > >>>>>>> point isn't as soon as the local log retention time/size). > > > >>>>>>> > > > >>>>>>> This > > > >>>>>>> > > > >>>>>>> represents > > > >>>>>>> > > > >>>>>>> a kind of performance cliff edge and what I'm really > > > >>>>>>> > > > >>>>>>> interested > > > >>>>>>> > > > >>>>>>> in > > > >>>>>>> > > > >>>>>>> is > > > >>>>>>> > > > >>>>>>> how > > > >>>>>>> > > > >>>>>>> easy it is for a consumer which falls off that cliff to > > > >>>>>>> > > > >>>>>>> catch up > > > >>>>>>> > > > >>>>>>> and so > > > >>>>>>> > > > >>>>>>> its > > > >>>>>>> > > > >>>>>>> fetches again come from local storage. Obviously this can > > > >>>>>>> > > > >>>>>>> depend > > > >>>>>>> > > > >>>>>>> on > > > >>>>>>> > > > >>>>>>> all > > > >>>>>>> > > > >>>>>>> sorts of factors (like production rate, consumption rate), > > > >>>>>>> > > > >>>>>>> so > > > >>>>>>> > > > >>>>>>> it's > > > >>>>>>> > > > >>>>>>> not > > > >>>>>>> > > > >>>>>>> guaranteed (just like it's not guaranteed for Kafka > > > >>>>>>> > > > >>>>>>> today), but > > > >>>>>>> > > > >>>>>>> this > > > >>>>>>> > > > >>>>>>> would > > > >>>>>>> > > > >>>>>>> represent a new failure mode. > > > >>>>>>> > > > >>>>>>> As I have explained in the last mail, it's a very rare > > > >>>>>>> > > > >>>>>>> case that > > > >>>>>>> > > > >>>>>>> a > > > >>>>>>> > > > >>>>>>> consumer > > > >>>>>>> need to read remote data. With our experience at Uber, > > > >>>>>>> > > > >>>>>>> this only > > > >>>>>>> > > > >>>>>>> happens > > > >>>>>>> > > > >>>>>>> when the consumer service had an outage for several hours. > > > >>>>>>> > > > >>>>>>> There is not a "performance cliff" as you assume. The > > > >>>>>>> > > > >>>>>>> remote > > > >>>>>>> > > > >>>>>>> storage > > > >>>>>>> > > > >>>>>>> is > > > >>>>>>> > > > >>>>>>> even faster than local disks in terms of bandwidth. > > > >>>>>>> > > > >>>>>>> Reading from > > > >>>>>>> > > > >>>>>>> remote > > > >>>>>>> > > > >>>>>>> storage is going to have higher latency than local disk. > > > >>>>>>> > > > >>>>>>> But > > > >>>>>>> > > > >>>>>>> since > > > >>>>>>> > > > >>>>>>> the > > > >>>>>>> > > > >>>>>>> consumer > > > >>>>>>> is catching up several hours data, it's not sensitive to > > > >>>>>>> > > > >>>>>>> the > > > >>>>>>> > > > >>>>>>> sub-second > > > >>>>>>> > > > >>>>>>> level > > > >>>>>>> latency, and each remote read request will read a large > > > >>>>>>> > > > >>>>>>> amount of > > > >>>>>>> > > > >>>>>>> data to > > > >>>>>>> > > > >>>>>>> make the overall performance better than reading from local > > > >>>>>>> > > > >>>>>>> disks. > > > >>>>>>> > > > >>>>>>> Another aspect I'd like to understand better is the effect > > > >>>>>>> > > > >>>>>>> of > > > >>>>>>> > > > >>>>>>> serving > > > >>>>>>> > > > >>>>>>> fetch > > > >>>>>>> > > > >>>>>>> request from remote storage has on the broker's network > > > >>>>>>> > > > >>>>>>> utilization. If > > > >>>>>>> > > > >>>>>>> we're just trimming the amount of data held locally > > > >>>>>>> > > > >>>>>>> (without > > > >>>>>>> > > > >>>>>>> increasing > > > >>>>>>> > > > >>>>>>> the > > > >>>>>>> > > > >>>>>>> overall local+remote retention), then we're effectively > > > >>>>>>> > > > >>>>>>> trading > > > >>>>>>> > > > >>>>>>> disk > > > >>>>>>> > > > >>>>>>> bandwidth for network bandwidth when serving fetch > > > >>>>>>> > > > >>>>>>> requests from > > > >>>>>>> > > > >>>>>>> remote > > > >>>>>>> > > > >>>>>>> storage (which I understand to be a good thing, since > > > >>>>>>> > > > >>>>>>> brokers are > > > >>>>>>> > > > >>>>>>> often/usually disk bound). But if we're increasing the > > > >>>>>>> > > > >>>>>>> overall > > > >>>>>>> > > > >>>>>>> local+remote > > > >>>>>>> > > > >>>>>>> retention then it's more likely that network itself > > > >>>>>>> > > > >>>>>>> becomes the > > > >>>>>>> > > > >>>>>>> bottleneck. > > > >>>>>>> > > > >>>>>>> I appreciate this is all rather hand wavy, I'm just trying > > > >>>>>>> > > > >>>>>>> to > > > >>>>>>> > > > >>>>>>> understand > > > >>>>>>> > > > >>>>>>> how this would affect broker performance, so I'd be > > > >>>>>>> > > > >>>>>>> grateful for > > > >>>>>>> > > > >>>>>>> any > > > >>>>>>> > > > >>>>>>> insights you can offer. > > > >>>>>>> > > > >>>>>>> Network bandwidth is a function of produce speed, it has > > > >>>>>>> > > > >>>>>>> nothing > > > >>>>>>> > > > >>>>>>> to > > > >>>>>>> > > > >>>>>>> do > > > >>>>>>> > > > >>>>>>> with > > > >>>>>>> > > > >>>>>>> remote retention. As long as the data is shipped to remote > > > >>>>>>> > > > >>>>>>> storage, > > > >>>>>>> > > > >>>>>>> you > > > >>>>>>> > > > >>>>>>> can > > > >>>>>>> > > > >>>>>>> keep the data there for 1 day or 1 year or 100 years, it > > > >>>>>>> > > > >>>>>>> doesn't > > > >>>>>>> > > > >>>>>>> consume > > > >>>>>>> > > > >>>>>>> any > > > >>>>>>> network resources. > > > >>>>>> > > > >>>>>>