Hi Jun, Please let us know if you have any comments on "transactional support" and "follower requests/replication" mentioned in the wiki.
Thanks, Satish. On Tue, Jun 2, 2020 at 9:25 PM Satish Duggana <satish.dugg...@gmail.com> wrote: > > Thanks Jun for your comments. > > >100. It would be useful to provide more details on how those apis are used. > >Otherwise, it's kind of hard to really assess whether the new apis are > >sufficient/redundant. A few examples below. > > We will update the wiki and let you know. > > >100.1 deleteRecords seems to only advance the logStartOffset in Log. How > >does that trigger the deletion of remote log segments? > > The RLMTask for a leader partition periodically checks whether there are > remote log segments with offsets earlier than logStartOffset, and the respective > remote log segment metadata and data are deleted using RLMM and > RSM. > > >100.2 stopReplica with deletion is used in 2 cases (a) replica reassignment; > >(b) topic deletion. We only want to delete the tiered metadata in the second > >case. Also, in the second case, who initiates the deletion of the remote > >segment since the leader may not exist? > > Right, it is deleted only in the case of topic deletion. We will cover > the details in the KIP. > > >100.3 "LogStartOffset of a topic can be either in local or in remote > >storage." If LogStartOffset exists in both places, which one is the source > >of truth? > > I meant that logStartOffset can point to either a local segment or a > remote segment, but it is initialised and maintained in the Log class > as it is now. > > >100.4 List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition > >topicPartition, long minOffset): How is minOffset supposed to be used? > > It returns the list of remote segments that have baseOffset >= the given > minOffset, sorted by baseOffset in ascending order. > > >100.5 When copying a segment to remote storage, it seems we are calling the > >same RLMM.putRemoteLogSegmentData() twice before and after copyLogSegment(). > >Could you explain why? > > This is more about prepare/commit/rollback as you suggested.
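For illustration, a minimal sketch of what the prepare/commit split discussed in 100.5/102 could look like. Class, method, and state names here are hypothetical, not the final KIP API: the metadata entry is written once before the copy and once after it succeeds, so a crash mid-copy leaves a detectable COPY_STARTED marker rather than silently orphaned data.

```java
// Hypothetical sketch of the two-phase metadata write from comments 100.5/102.
// Names are illustrative only; the actual RLMM API may differ.
import java.util.HashMap;
import java.util.Map;

public class TwoPhaseCopySketch {
    enum SegmentState { COPY_STARTED, COPY_FINISHED }

    // In-memory stand-in for the RLMM-backed metadata store.
    private final Map<String, SegmentState> metadata = new HashMap<>();

    // Phase 1: record the intent before uploading. If the broker crashes
    // during the upload, a COPY_STARTED entry remains, so a cleaner can
    // later reclaim the partial data left in the object store.
    void preparePutRemoteLogSegmentData(String segmentId) {
        metadata.put(segmentId, SegmentState.COPY_STARTED);
    }

    // Phase 2: mark the segment visible only after the upload is acked.
    void commitPutRemoteLogSegmentData(String segmentId) {
        metadata.put(segmentId, SegmentState.COPY_FINISHED);
    }

    // Readers only see segments whose copy was committed.
    boolean isVisible(String segmentId) {
        return metadata.get(segmentId) == SegmentState.COPY_FINISHED;
    }

    public static void main(String[] args) {
        TwoPhaseCopySketch rlmm = new TwoPhaseCopySketch();
        rlmm.preparePutRemoteLogSegmentData("tp0-100");
        // RSM.copyLogSegment(...) would run here; commit only on success.
        rlmm.commitPutRemoteLogSegmentData("tp0-100");
        System.out.println(rlmm.isVisible("tp0-100"));
    }
}
```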
We will > update the wiki with the new APIs. > > >100.6 LogSegmentData includes leaderEpochCache, but there is no api in > >RemoteStorageManager to retrieve it. > > Nice catch, copy/paste issue. There is an API to retrieve it. > > >101. If the __remote_log_metadata is for production usage, could you provide > >more details? For example, what is the schema of the data (both key and > >value)? How is the topic maintained, delete or compact? > > It is configured with the delete cleanup policy, and its retention period > is suggested to be longer than the remote retention period. > > >110. Is the cache implementation in RemoteLogMetadataManager meant for > >production usage? If so, could you provide more details on the schema and > >how/where the data is stored? > > The proposal is to have a cache (with a default implementation backed by > RocksDB), but it will be added in later versions. We will add this to > the future work items. > > >111. "Committed offsets can be stored in a local file". Could you describe > >the format of the file and where it's stored? > > We will cover this in the KIP. > > >112. Truncation of remote segments under unclean leader election: I am not > >sure who figures out the truncated remote segments and how that information > >is propagated to all replicas? > > We will add this in detail in the KIP. > > >113. "If there are any failures in removing remote log segments then those > >are stored in a specific topic (default as > >__remote_segments_to_be_deleted)". Is it necessary to add yet another > >internal topic? Could we just keep retrying? > > This is not really an internal topic; it will be exposed as a > user-configurable topic. After a few retries, we want users to know about > the failure so that they can take action later by consuming from > this topic. We want to keep this simple instead of retrying > continuously and maintaining the deletion state etc. > > >114.
"We may not need to copy producer-id-snapshot as we are copying only > >segments earlier to last-stable-offset." Hmm, not sure about that. The > >producer snapshot includes things like the last timestamp of each open > >producer id and can affect when those producer ids are expired. > > Sure, this will be added as part of the LogSegmentData. > > Thanks, > Satish. > > > On Fri, May 29, 2020 at 6:39 AM Jun Rao <j...@confluent.io> wrote: > > > > Hi, Satish, > > > > Made another pass on the wiki. A few more comments below. > > > > 100. It would be useful to provide more details on how those apis are used. > > Otherwise, it's kind of hard to really assess whether the new apis are > > sufficient/redundant. A few examples below. > > 100.1 deleteRecords seems to only advance the logStartOffset in Log. How > > does that trigger the deletion of remote log segments? > > 100.2 stopReplica with deletion is used in 2 cases (a) replica > > reassignment; (b) topic deletion. We only want to delete the tiered > > metadata in the second case. Also, in the second case, who initiates the > > deletion of the remote segment since the leader may not exist? > > 100.3 "LogStartOffset of a topic can be either in local or in remote > > storage." If LogStartOffset exists in both places, which one is the source > > of truth? > > 100.4 List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition > > topicPartition, long minOffset): How is minOffset supposed to be used? > > 100.5 When copying a segment to remote storage, it seems we are calling the > > same RLMM.putRemoteLogSegmentData() twice before and after > > copyLogSegment(). Could you explain why? > > 100.6 LogSegmentData includes leaderEpochCache, but there is no api in > > RemoteStorageManager to retrieve it. > > > > 101. If the __remote_log_metadata is for production usage, could you > > provide more details? For example, what is the schema of the data (both key > > and value)? How is the topic maintained,delete or compact? 
> > > > 110. Is the cache implementation in RemoteLogMetadataManager meant for > > production usage? If so, could you provide more details on the schema and > > how/where the data is stored? > > > > 111. "Committed offsets can be stored in a local file". Could you describe > > the format of the file and where it's stored? > > > > 112. Truncation of remote segments under unclean leader election: I am not > > sure who figures out the truncated remote segments and how that information > > is propagated to all replicas? > > > > 113. "If there are any failures in removing remote log segments then those > > are stored in a specific topic (default as > > __remote_segments_to_be_deleted)". Is it necessary to add yet another > > internal topic? Could we just keep retrying? > > > > 114. "We may not need to copy producer-id-snapshot as we are copying only > > segments earlier to last-stable-offset." Hmm, not sure about that. The > > producer snapshot includes things like the last timestamp of each open > > producer id and can affect when those producer ids are expired. > > > > Thanks, > > > > Jun > > > > On Thu, May 28, 2020 at 5:38 AM Satish Duggana <satish.dugg...@gmail.com> > > wrote: > >> > >> Hi Jun, > >> Gentle reminder. Please go through the updated wiki and let us know your > >> comments. > >> > >> Thanks, > >> Satish. > >> > >> On Tue, May 19, 2020 at 3:50 PM Satish Duggana <satish.dugg...@gmail.com> > >> wrote: > >>> > >>> Hi Jun, > >>> Please go through the wiki which has the latest updates. Google doc is > >>> updated frequently to be in sync with wiki. > >>> > >>> Thanks, > >>> Satish. > >>> > >>> On Tue, May 19, 2020 at 12:30 AM Jun Rao <j...@confluent.io> wrote: > >>>> > >>>> Hi, Satish, > >>>> > >>>> Thanks for the update. Just to clarify. Which doc has the latest > >>>> updates, the wiki or the google doc? 
> >>>> > >>>> Jun > >>>> > >>>> On Thu, May 14, 2020 at 10:38 AM Satish Duggana > >>>> <satish.dugg...@gmail.com> wrote: > >>>>> > >>>>> Hi Jun, > >>>>> Thanks for your comments. We updated the KIP with more details. > >>>>> > >>>>> >100. For each of the operations related to tiering, it would be useful > >>>>> >to provide a description on how it works with the new API. These > >>>>> >include things like consumer fetch, replica fetch, offsetForTimestamp, > >>>>> >retention (remote and local) by size, time and logStartOffset, topic > >>>>> >deletion, etc. This will tell us if the proposed APIs are sufficient. > >>>>> > >>>>> We addressed most of these APIs in the KIP. We can add more details if > >>>>> needed. > >>>>> > >>>>> >101. For the default implementation based on internal topic, is it > >>>>> >meant as a proof of concept or for production usage? I assume that > >>>>> >it's the former. However, if it's the latter, then the KIP needs to > >>>>> >describe the design in more detail. > >>>>> > >>>>> It is meant for production usage, as mentioned in an earlier mail. We plan to > >>>>> update this section in the next few days. > >>>>> > >>>>> >102. When tiering a segment, the segment is first written to the > >>>>> >object store and then its metadata is written to RLMM using the api > >>>>> >"void putRemoteLogSegmentData()". One potential issue with this > >>>>> >approach is that if the system fails after the first operation, it > >>>>> >leaves a garbage in the object store that's never reclaimed. One way > >>>>> >to improve this is to have two separate APIs, sth like > >>>>> >preparePutRemoteLogSegmentData() and commitPutRemoteLogSegmentData(). > >>>>> > >>>>> That is a good point. We currently have a different way using markers > >>>>> in the segment, but your suggestion is much better. > >>>>> > >>>>> >103. It seems that the transactional support and the ability to read > >>>>> >from follower are missing.
> >>>>> > >>>>> The KIP is updated with transactional support, follower fetch semantics, > >>>>> and reading from a follower. > >>>>> > >>>>> >104. It would be useful to provide a testing plan for this KIP. > >>>>> > >>>>> We added a few tests by introducing a test util for tiered storage in the > >>>>> PR. We will provide the testing plan in the next few days. > >>>>> > >>>>> Thanks, > >>>>> Satish. > >>>>> > >>>>> > >>>>> On Wed, Feb 26, 2020 at 9:43 PM Harsha Chintalapani <ka...@harsha.io> > >>>>> wrote: > >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> > >>>>>> On Tue, Feb 25, 2020 at 12:46 PM, Jun Rao <j...@confluent.io> wrote: > >>>>>>> > >>>>>>> Hi, Satish, > >>>>>>> > >>>>>>> Thanks for the updated doc. The new API seems to be an improvement > >>>>>>> overall. A few more comments below. > >>>>>>> > >>>>>>> 100. For each of the operations related to tiering, it would be > >>>>>>> useful to provide a description on how it works with the new API. > >>>>>>> These include things like consumer fetch, replica fetch, > >>>>>>> offsetForTimestamp, retention > >>>>>>> (remote and local) by size, time and logStartOffset, topic deletion, > >>>>>>> etc. This will tell us if the proposed APIs are sufficient. > >>>>>> > >>>>>> > >>>>>> Thanks for the feedback Jun. We will add more details around this. > >>>>>> > >>>>>>> > >>>>>>> 101. For the default implementation based on internal topic, is it > >>>>>>> meant as a proof of concept or for production usage? I assume that > >>>>>>> it's the former. However, if it's the latter, then the KIP needs to > >>>>>>> describe the design in more detail. > >>>>>> > >>>>>> > >>>>>> Yes, it is meant for production use. Ideally it would be good to > >>>>>> merge this in as the default implementation for the metadata service. We > >>>>>> can add more details around design and testing. > >>>>>> > >>>>>>> 102.
When tiering a segment, the segment is first written to the > >>>>>>> object store and then its metadata is written to RLMM using the api > >>>>>>> "void putRemoteLogSegmentData()". > >>>>>>> One potential issue with this approach is that if the system fails > >>>>>>> after the first operation, it leaves a garbage in the object store > >>>>>>> that's never reclaimed. One way to improve this is to have two > >>>>>>> separate APIs, sth like preparePutRemoteLogSegmentData() and > >>>>>>> commitPutRemoteLogSegmentData(). > >>>>>>> > >>>>>>> 103. It seems that the transactional support and the ability to read > >>>>>>> from follower are missing. > >>>>>>> > >>>>>>> 104. It would be useful to provide a testing plan for this KIP. > >>>>>> > >>>>>> > >>>>>> We are working on adding more details around transactional support and > >>>>>> coming up with a test plan, > >>>>>> including system tests and integration tests. > >>>>>> > >>>>>>> Thanks, > >>>>>>> > >>>>>>> Jun > >>>>>>> > >>>>>>> On Mon, Feb 24, 2020 at 8:10 AM Satish Duggana > >>>>>>> <satish.dugg...@gmail.com> wrote: > >>>>>>> > >>>>>>> Hi Jun, > >>>>>>> Please look at the earlier reply and let us know your comments. > >>>>>>> > >>>>>>> Thanks, > >>>>>>> Satish. > >>>>>>> > >>>>>>> On Wed, Feb 12, 2020 at 4:06 PM Satish Duggana > >>>>>>> <satish.dugg...@gmail.com> wrote: > >>>>>>> > >>>>>>> Hi Jun, > >>>>>>> Thanks for your comments on the separation of remote log metadata > >>>>>>> storage and remote log storage. > >>>>>>> We had a few discussions since early Jan on how to support eventually > >>>>>>> consistent stores like S3 by uncoupling remote log segment metadata > >>>>>>> and remote log storage. It is written up in detail in the doc > >>>>>>> here(1). Below is a brief summary of the discussion from that doc. > >>>>>>> > >>>>>>> The current approach consists of pulling the remote log segment > >>>>>>> metadata from remote log storage APIs. It worked fine for storages > >>>>>>> like HDFS.
But one of the problems of relying on the remote storage > >>>>>>> to maintain metadata is that tiered-storage needs to be strongly > >>>>>>> consistent, with an impact not only on the metadata (e.g. LIST in S3) > >>>>>>> but also on the segment data (e.g. GET after a DELETE in S3). The cost > >>>>>>> of maintaining metadata in remote storage needs to be factored in. > >>>>>>> This is true in the case of S3: LIST APIs incur huge costs, as you > >>>>>>> raised earlier. > >>>>>>> So, it is good to separate the remote storage from the remote log > >>>>>>> metadata store. We refactored the existing RemoteStorageManager and > >>>>>>> introduced RemoteLogMetadataManager. The remote log metadata store should > >>>>>>> give strong consistency semantics, but remote log storage can be > >>>>>>> eventually consistent. > >>>>>>> We can have a default implementation for RemoteLogMetadataManager > >>>>>>> which uses an internal topic (as mentioned in one of our earlier > >>>>>>> emails) as storage. But users can always plug in their own > >>>>>>> RemoteLogMetadataManager implementation based on their environment. > >>>>>>> > >>>>>>> Please go through the updated KIP and let us know your comments. We > >>>>>>> have started refactoring for the changes mentioned in the KIP and > >>>>>>> there may be a few more updates to the APIs. > >>>>>>> > >>>>>>> [1] > >>>>>>> > >>>>>>> https://docs.google.com/document/d/1qfkBCWL1e7ZWkHU7brxKDBebq4ie9yK20XJnKbgAlew/edit?ts=5e208ec7# > >>>>>>> > >>>>>>> On Fri, Dec 27, 2019 at 5:43 PM Ivan Yurchenko > >>>>>>> <ivan0yurche...@gmail.com> > >>>>>>> > >>>>>>> wrote: > >>>>>>> > >>>>>>> Hi all, > >>>>>>> > >>>>>>> Jun: > >>>>>>> > >>>>>>> (a) Cost: S3 list object requests cost $0.005 per 1000 requests. If > >>>>>>> > >>>>>>> you > >>>>>>> > >>>>>>> have 100,000 partitions and want to pull the metadata for each > >>>>>>> > >>>>>>> partition > >>>>>>> > >>>>>>> at > >>>>>>> > >>>>>>> the rate of 1/sec.
It can cost $0.5/sec, which is roughly $40K per > >>>>>>> > >>>>>>> day. > >>>>>>> > >>>>>>> I want to note here that no reasonably durable storage will be cheap > >>>>>>> at 100k RPS. For example, DynamoDB might give the same ballpark > >>>>>>> > >>>>>>> figures. > >>>>>>> > >>>>>>> If we want to keep the pull-based approach, we can try to reduce this > >>>>>>> > >>>>>>> number > >>>>>>> > >>>>>>> in several ways: doing listings less frequently (as Satish mentioned, > >>>>>>> with the current defaults it's ~3.33k RPS for your example), batching > >>>>>>> listing operations in some way (depending on the storage; it might > >>>>>>> require the change of RSM's interface). > >>>>>>> > >>>>>>> There are different ways for doing push based metadata propagation. > >>>>>>> > >>>>>>> Some > >>>>>>> > >>>>>>> object stores may support that already. For example, S3 supports > >>>>>>> > >>>>>>> events > >>>>>>> > >>>>>>> notification > >>>>>>> > >>>>>>> This sounds interesting. However, I see a couple of issues using it: > >>>>>>> 1. As I understand the documentation, notification delivery is not > >>>>>>> guaranteed > >>>>>>> and it's recommended to periodically do LIST to fill the gaps. Which > >>>>>>> brings us back to the same LIST consistency guarantees issue. > >>>>>>> 2. The same goes for the broker start: to get the current state, we > >>>>>>> > >>>>>>> need > >>>>>>> > >>>>>>> to LIST. > >>>>>>> 3. The dynamic set of multiple consumers (RSMs): AFAIK SQS and SNS > >>>>>>> > >>>>>>> aren't > >>>>>>> > >>>>>>> designed for such a case. > >>>>>>> > >>>>>>> Alexandre: > >>>>>>> > >>>>>>> A.1 As commented on PR 7561, the S3 consistency model [1][2] implies RSM > >>>>>>> > >>>>>>> cannot > >>>>>>> > >>>>>>> rely solely on S3 APIs to guarantee the expected strong > >>>>>>> > >>>>>>> consistency. The > >>>>>>> > >>>>>>> proposed implementation [3] would need to be updated to take this > >>>>>>> > >>>>>>> into > >>>>>>> > >>>>>>> account. Let’s talk more about this.
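As an aside, the LIST cost figures quoted earlier in this message (Jun's $0.5/sec and roughly $40K/day, and the ~3.33k RPS at the default 30-second poll interval) follow directly from the stated numbers; a quick check:

```java
// Reproducing the S3 LIST cost arithmetic quoted in this thread:
// 100,000 partitions polled once per second at $0.005 per 1,000 LIST
// requests, compared with the default 30-second poll interval.
public class ListCostCheck {
    public static void main(String[] args) {
        double costPerRequest = 0.005 / 1000.0;         // $ per LIST request
        int partitions = 100_000;
        double perSecond = partitions * costPerRequest; // polling at 1/sec
        double perDay = perSecond * 86_400;             // seconds in a day
        System.out.printf("$%.2f/sec, ~$%.0f/day%n", perSecond, perDay);
        // With the default 30s interval, the request rate drops to ~3,333 RPS.
        System.out.printf("%.0f RPS at a 30s interval%n", partitions / 30.0);
    }
}
```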
> >>>>>>> > >>>>>>> Thank you for the feedback. I clearly see the need for changing the > >>>>>>> S3 implementation > >>>>>>> to provide stronger consistency guarantees. As I see from this > >>>>>>> thread, there are > >>>>>>> several possible approaches to this. Let's discuss RemoteLogManager's > >>>>>>> contract and > >>>>>>> behavior (like pull vs push model) further before picking one (or > >>>>>>> > >>>>>>> several - > >>>>>>> > >>>>>>> ?) of them. > >>>>>>> I'm going to do some evaluation of DynamoDB for the pull-based > >>>>>>> > >>>>>>> approach, > >>>>>>> > >>>>>>> to see if it's possible to apply it while paying a reasonable bill. I will also evaluate the > >>>>>>> push-based approach > >>>>>>> with a Kafka topic as the medium. > >>>>>>> > >>>>>>> A.2.3 Atomicity – what does an implementation of RSM need to provide > >>>>>>> > >>>>>>> with > >>>>>>> > >>>>>>> respect to atomicity of the APIs copyLogSegment, cleanupLogUntil and > >>>>>>> deleteTopicPartition? If a partial failure happens in any of those > >>>>>>> > >>>>>>> (e.g. > >>>>>>> > >>>>>>> in > >>>>>>> > >>>>>>> the S3 implementation, if one of the multiple uploads fails [4]), > >>>>>>> > >>>>>>> The S3 implementation is going to change, but it's worth clarifying > >>>>>>> > >>>>>>> anyway. > >>>>>>> > >>>>>>> The segment log file is uploaded after S3 has acked the upload > >>>>>>> of all other files associated with the segment, and only after this does the > >>>>>>> > >>>>>>> whole > >>>>>>> > >>>>>>> segment file set become visible remotely for operations like > >>>>>>> listRemoteSegments [1]. > >>>>>>> In case of upload failure, the files that have been successfully > >>>>>>> > >>>>>>> uploaded > >>>>>>> > >>>>>>> stay > >>>>>>> as invisible garbage that is collected by cleanupLogUntil (or > >>>>>>> > >>>>>>> overwritten > >>>>>>> > >>>>>>> successfully later). > >>>>>>> And the opposite happens during the deletion: log files are deleted > >>>>>>> > >>>>>>> first.
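For illustration, the ordering Ivan describes can be sketched as follows (file and method names here are hypothetical, not the real layout): the segment log file is uploaded last and deleted first, so its presence acts as the visibility marker for the whole file set.

```java
// Sketch of the upload/delete ordering described above. The point is the
// ordering only; file names and the in-memory store are illustrative.
import java.util.ArrayList;
import java.util.List;

public class SegmentUploadOrder {
    private final List<String> remoteObjects = new ArrayList<>();

    // Upload auxiliary files first; a failure at that stage leaves only
    // invisible garbage, to be collected later (e.g. by cleanupLogUntil).
    void upload(String base) {
        remoteObjects.add(base + ".index");
        remoteObjects.add(base + ".timeindex");
        remoteObjects.add(base + ".log"); // last: segment becomes visible
    }

    // Delete the log file first so a partially deleted segment is never
    // visible to operations like listRemoteSegments.
    void delete(String base) {
        remoteObjects.remove(base + ".log");
        remoteObjects.remove(base + ".index");
        remoteObjects.remove(base + ".timeindex");
    }

    boolean isVisible(String base) {
        return remoteObjects.contains(base + ".log");
    }

    public static void main(String[] args) {
        SegmentUploadOrder store = new SegmentUploadOrder();
        store.upload("00000000000000000100");
        System.out.println(store.isVisible("00000000000000000100"));
        store.delete("00000000000000000100");
        System.out.println(store.isVisible("00000000000000000100"));
    }
}
```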
> >>>>>>> > >>>>>>> This approach should generally work when we solve consistency issues > >>>>>>> by adding a strongly consistent storage: a segment's uploaded files > >>>>>>> remain invisible garbage until some metadata about them is written. > >>>>>>> > >>>>>>> A.3 Caching – storing locally the segments retrieved from the remote > >>>>>>> storage is excluded as it does not align with the original intent > >>>>>>> and even defeats some of its purposes (save disk space etc.). That said, could > >>>>>>> there be other types of use cases where the pattern of access to the > >>>>>>> remotely stored segments would benefit from local caching (and potentially > >>>>>>> read-ahead)? Consider the use case of a large pool of consumers which > >>>>>>> start a backfill at the same time for one day's worth of data from one year > >>>>>>> ago stored remotely. Caching the segments locally would allow uncoupling the > >>>>>>> load on the remote storage from the load on the Kafka cluster. Maybe > >>>>>>> the RLM could expose a configuration parameter to switch that feature > >>>>>>> on/off? > >>>>>>> > >>>>>>> I tend to agree here: caching remote segments locally and making this > >>>>>>> configurable sounds pretty practical to me. We should implement > >>>>>>> this, maybe not in the first iteration. > >>>>>>> > >>>>>>> Br, > >>>>>>> Ivan > >>>>>>> > >>>>>>> [1] > >>>>>>> > >>>>>>> https://github.com/harshach/kafka/pull/18/files#diff-4d73d01c16caed6f2548fc3063550ef0R152 > >>>>>>> > >>>>>>> On Thu, 19 Dec 2019 at 19:49, Alexandre Dupriez < > >>>>>>> > >>>>>>> alexandre.dupr...@gmail.com> > >>>>>>> > >>>>>>> wrote: > >>>>>>> > >>>>>>> Hi Jun, > >>>>>>> > >>>>>>> Thank you for the feedback.
I am trying to understand how a > >>>>>>> > >>>>>>> push-based > >>>>>>> > >>>>>>> approach would work. > >>>>>>> In order for the metadata to be propagated (under the assumption you > >>>>>>> stated), would you plan to add a new API in Kafka to allow the > >>>>>>> metadata store to send them directly to the brokers? > >>>>>>> > >>>>>>> Thanks, > >>>>>>> Alexandre > >>>>>>> > >>>>>>> Le mer. 18 déc. 2019 à 20:14, Jun Rao <j...@confluent.io> a écrit : > >>>>>>> > >>>>>>> Hi, Satish and Ivan, > >>>>>>> > >>>>>>> There are different ways for doing push based metadata > >>>>>>> > >>>>>>> propagation. Some > >>>>>>> > >>>>>>> object stores may support that already. For example, S3 supports > >>>>>>> > >>>>>>> events > >>>>>>> > >>>>>>> notification ( > >>>>>>> > >>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html). > >>>>>>> > >>>>>>> Otherwise one could use a separate metadata store that supports > >>>>>>> > >>>>>>> push-based > >>>>>>> > >>>>>>> change propagation. Other people have mentioned using a Kafka > >>>>>>> > >>>>>>> topic. The > >>>>>>> > >>>>>>> best approach may depend on the object store and the operational > >>>>>>> environment (e.g. whether an external metadata store is already > >>>>>>> > >>>>>>> available). > >>>>>>> > >>>>>>> The above discussion is based on the assumption that we need to > >>>>>>> > >>>>>>> cache the > >>>>>>> > >>>>>>> object metadata locally in every broker. I mentioned earlier that > >>>>>>> > >>>>>>> an > >>>>>>> > >>>>>>> alternative is to just store/retrieve those metadata in an external > >>>>>>> metadata store. That may simplify the implementation in some cases. > >>>>>>> > >>>>>>> Thanks, > >>>>>>> > >>>>>>> Jun > >>>>>>> > >>>>>>> On Thu, Dec 5, 2019 at 7:01 AM Satish Duggana < > >>>>>>> > >>>>>>> satish.dugg...@gmail.com> > >>>>>>> > >>>>>>> wrote: > >>>>>>> > >>>>>>> Hi Jun, > >>>>>>> Thanks for your reply. 
> >>>>>>> > >>>>>>> Currently, `listRemoteSegments` is called at the configured > >>>>>>> interval(not every second, defaults to 30secs). Storing remote > >>>>>>> > >>>>>>> log > >>>>>>> > >>>>>>> metadata in a strongly consistent store for S3 RSM is raised in > >>>>>>> PR-comment[1]. > >>>>>>> RLM invokes RSM at regular intervals and RSM can give remote > >>>>>>> > >>>>>>> segment > >>>>>>> > >>>>>>> metadata if it is available. RSM is responsible for maintaining > >>>>>>> > >>>>>>> and > >>>>>>> > >>>>>>> fetching those entries. It should be based on whatever mechanism > >>>>>>> > >>>>>>> is > >>>>>>> > >>>>>>> consistent and efficient with the respective remote storage. > >>>>>>> > >>>>>>> Can you give more details about push based mechanism from RSM? > >>>>>>> > >>>>>>> 1. > >>>>>>> > >>>>>>> https://github.com/apache/kafka/pull/7561#discussion_r344576223 > >>>>>>> > >>>>>>> Thanks, > >>>>>>> Satish. > >>>>>>> > >>>>>>> On Thu, Dec 5, 2019 at 4:23 AM Jun Rao <j...@confluent.io> wrote: > >>>>>>> > >>>>>>> Hi, Harsha, > >>>>>>> > >>>>>>> Thanks for the reply. > >>>>>>> > >>>>>>> 40/41. I am curious which block storages you have tested. S3 > >>>>>>> > >>>>>>> seems > >>>>>>> > >>>>>>> to be > >>>>>>> > >>>>>>> one of the popular block stores. The concerns that I have with > >>>>>>> > >>>>>>> pull > >>>>>>> > >>>>>>> based > >>>>>>> > >>>>>>> approach are the following. > >>>>>>> (a) Cost: S3 list object requests cost $0.005 per 1000 > >>>>>>> > >>>>>>> requests. If > >>>>>>> > >>>>>>> you > >>>>>>> > >>>>>>> have 100,000 partitions and want to pull the metadata for each > >>>>>>> > >>>>>>> partition > >>>>>>> > >>>>>>> at > >>>>>>> > >>>>>>> the rate of 1/sec. It can cost $0.5/sec, which is roughly $40K > >>>>>>> > >>>>>>> per > >>>>>>> > >>>>>>> day. > >>>>>>> > >>>>>>> (b) Semantics: S3 list objects are eventually consistent. 
So, > >>>>>>> > >>>>>>> when > >>>>>>> > >>>>>>> you > >>>>>>> > >>>>>>> do a > >>>>>>> > >>>>>>> list object request, there is no guarantee that you can see all > >>>>>>> > >>>>>>> uploaded > >>>>>>> > >>>>>>> objects. This could impact the correctness of subsequent > >>>>>>> > >>>>>>> logics. > >>>>>>> > >>>>>>> (c) Efficiency: Blindly pulling metadata when there is no > >>>>>>> > >>>>>>> change adds > >>>>>>> > >>>>>>> unnecessary overhead in the broker as well as in the block > >>>>>>> > >>>>>>> store. > >>>>>>> > >>>>>>> So, have you guys tested S3? If so, could you share your > >>>>>>> > >>>>>>> experience > >>>>>>> > >>>>>>> in > >>>>>>> > >>>>>>> terms of cost, semantics and efficiency? > >>>>>>> > >>>>>>> Jun > >>>>>>> > >>>>>>> On Tue, Dec 3, 2019 at 10:11 PM Harsha Chintalapani < > >>>>>>> > >>>>>>> ka...@harsha.io > >>>>>>> > >>>>>>> wrote: > >>>>>>> > >>>>>>> Hi Jun, > >>>>>>> Thanks for the reply. > >>>>>>> > >>>>>>> On Tue, Nov 26, 2019 at 3:46 PM, Jun Rao <j...@confluent.io> > >>>>>>> > >>>>>>> wrote: > >>>>>>> > >>>>>>> Hi, Satish and Ying, > >>>>>>> > >>>>>>> Thanks for the reply. > >>>>>>> > >>>>>>> 40/41. There are two different ways that we can approach > >>>>>>> > >>>>>>> this. > >>>>>>> > >>>>>>> One is > >>>>>>> > >>>>>>> what > >>>>>>> > >>>>>>> you said. We can have an opinionated way of storing and > >>>>>>> > >>>>>>> populating > >>>>>>> > >>>>>>> the > >>>>>>> > >>>>>>> tier > >>>>>>> > >>>>>>> metadata that we think is good enough for everyone. I am > >>>>>>> > >>>>>>> not > >>>>>>> > >>>>>>> sure if > >>>>>>> > >>>>>>> this > >>>>>>> > >>>>>>> is the case based on what's currently proposed in the KIP. 
> >>>>>>> > >>>>>>> For example, I am not sure that (1) everyone always needs local metadata; > >>>>>>> (2) the current local storage format is general enough; and (3) everyone > >>>>>>> wants to use the pull based approach to propagate the metadata. Another > >>>>>>> approach is to make this pluggable and let the implementor implement the > >>>>>>> best approach for a particular block storage. I haven't seen any comments > >>>>>>> from Slack/AirBnb in the mailing list on this topic. It would be great if > >>>>>>> they can provide feedback directly here. > >>>>>>> > >>>>>>> The current interfaces are designed with the most popular block storages > >>>>>>> available today in mind, and we did 2 implementations with these interfaces; > >>>>>>> both are yielding good results as we go through the testing of them. > >>>>>>> If there is ever a need for a pull-based approach, we can definitely > >>>>>>> evolve the interface. > >>>>>>> In the past we have marked interfaces as evolving to make room for > >>>>>>> unknowns in the future. > >>>>>>> If you have any suggestions around the current interfaces, please propose > >>>>>>> them; we are happy to see if we can work them into it. > >>>>>>> > >>>>>>> 43.
To offer tier storage as a general feature, ideally all > >>>>>>> existing capabilities should still be supported. It's fine if the > >>>>>>> Uber implementation doesn't support all capabilities for internal usage. > >>>>>>> However, the framework should be general enough. > >>>>>>> > >>>>>>> We agree on that as a principle. But most of these major features are > >>>>>>> only arriving now, and having a new big feature such as tiered storage > >>>>>>> support all of the new features would be a big ask. We can document > >>>>>>> how we approach solving these in future iterations. > >>>>>>> Our goal is to make this tiered storage feature work for everyone. > >>>>>>> > >>>>>>> 43.3 This is more than just serving the tier-ed data from block storage. > >>>>>>> With KIP-392, the consumer now can resolve the conflicts with the replica > >>>>>>> based on leader epoch. So, we need to make sure that leader epoch can be > >>>>>>> recovered properly from tier storage. > >>>>>>> > >>>>>>> We are working on testing our approach and we will update the KIP with > >>>>>>> design details. > >>>>>>> > >>>>>>> 43.4 For JBOD, if tier storage stores the tier metadata locally, we need to > >>>>>>> support moving such metadata across disk directories since JBOD supports > >>>>>>> moving data across disks. > >>>>>>> > >>>>>>> The KIP is updated with JBOD details.
Having said that, JBOD tooling needs > >>>>>>> to evolve to support production loads. Most of the users will be > >>>>>>> interested in using tiered storage without JBOD support on day 1. > >>>>>>> > >>>>>>> Thanks, > >>>>>>> Harsha > >>>>>>> > >>>>>>> As for a meeting, we could have a KIP e-meeting on this if needed, > >>>>>>> but it will be open to everyone and will be recorded and shared. Often, > >>>>>>> the details are still resolved through the mailing list. > >>>>>>> > >>>>>>> Jun > >>>>>>> > >>>>>>> On Tue, Nov 19, 2019 at 6:48 PM Ying Zheng > >>>>>>> <yi...@uber.com.invalid> wrote: > >>>>>>> > >>>>>>> Please ignore my previous email. > >>>>>>> I didn't know Apache requires all the discussions to be "open". > >>>>>>> > >>>>>>> On Tue, Nov 19, 2019, 5:40 PM Ying Zheng <yi...@uber.com> > >>>>>>> wrote: > >>>>>>> > >>>>>>> Hi Jun, > >>>>>>> > >>>>>>> Thank you very much for your feedback! > >>>>>>> > >>>>>>> Can we schedule a meeting in your Palo Alto office in December? I > >>>>>>> think a face-to-face discussion is much more efficient than emails. Both > >>>>>>> Harsha and I can visit you. Satish may be able to join us remotely. > >>>>>>> > >>>>>>> On Fri, Nov 15, 2019 at 11:04 AM Jun Rao <j...@confluent.io> > >>>>>>> wrote: > >>>>>>> > >>>>>>> Hi, Satish and Harsha, > >>>>>>> > >>>>>>> The following is more detailed high-level feedback for the KIP. > >>>>>>> > >>>>>>> Overall, the KIP seems useful.
> The challenge is how to design it such that it's general enough to
> support different ways of implementing this feature, while also
> supporting existing features.
>
> 40. Local segment metadata storage: The KIP makes the assumption that
> the metadata for the archived log segments is cached locally in every
> broker, and provides a specific implementation for the local storage in
> the framework. We probably should discuss this more. For example, some
> tier storage providers may not want to cache the metadata locally and
> may just rely upon a remote key/value store if such a store is already
> present. If a local store is used, there could be different ways of
> implementing it (e.g., based on customized local files, an embedded
> local store like RocksDB, etc.). An alternative design is to just
> provide an interface for retrieving the tier segment metadata and leave
> the details of how to get the metadata outside of the framework.
>
> 41. RemoteStorageManager interface and the usage of the interface in
> the framework: I am not sure if the interface is general enough.
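The alternative suggested in point 40, an interface for retrieving tier segment metadata with the storage details kept outside the framework, could look roughly like the following sketch. All names here are hypothetical and not from the KIP; the in-memory sorted map merely stands in for whatever backing store (custom local files, RocksDB, or a remote key/value store) an implementation chooses.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Hypothetical sketch: the framework depends only on this contract, not on
// any particular metadata store.
interface RemoteSegmentMetadataView {
    // Base offsets of segments with baseOffset >= minOffset, ascending.
    List<Long> listSegmentBaseOffsets(String topicPartition, long minOffset);
}

// One possible backing implementation: an in-memory sorted map, standing in
// for an embedded store such as RocksDB.
class InMemoryMetadataView implements RemoteSegmentMetadataView {
    private final Map<String, ConcurrentSkipListMap<Long, String>> segments = new HashMap<>();

    void addSegment(String topicPartition, long baseOffset, String segmentId) {
        segments.computeIfAbsent(topicPartition, tp -> new ConcurrentSkipListMap<>())
                .put(baseOffset, segmentId);
    }

    @Override
    public List<Long> listSegmentBaseOffsets(String topicPartition, long minOffset) {
        ConcurrentSkipListMap<Long, String> byOffset =
                segments.getOrDefault(topicPartition, new ConcurrentSkipListMap<>());
        // tailMap(minOffset, true) yields keys >= minOffset in ascending order.
        return new ArrayList<>(byOffset.tailMap(minOffset, true).keySet());
    }
}

public class MetadataViewDemo {
    public static void main(String[] args) {
        InMemoryMetadataView view = new InMemoryMetadataView();
        view.addSegment("logs-0", 0L, "seg-a");
        view.addSegment("logs-0", 1000L, "seg-b");
        view.addSegment("logs-0", 2000L, "seg-c");
        // Only segments at or beyond the requested minimum offset come back.
        System.out.println(view.listSegmentBaseOffsets("logs-0", 1000L)); // prints [1000, 2000]
    }
}
```

A broker-local cache and a purely remote lookup would both satisfy the same contract, which is the point of keeping the retrieval interface in the framework and the storage choice outside it.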
> For example, it seems that RemoteLogIndexEntry is tied to a specific
> way of storing the metadata in remote storage. The framework uses the
> listRemoteSegments() api in a pull-based approach. However, in some
> other implementations, a push-based approach may be preferred. I don't
> have a concrete proposal yet. But it would be useful to give this area
> some more thought and see if we can make the interface more general.
>
> 42. In the diagram, the RemoteLogManager is side by side with
> LogManager. This KIP only discussed how the fetch request is handled
> between the two layers. However, we should also consider how other
> requests that touch the log can be handled, e.g., list offsets by
> timestamp, delete records, etc. Also, in this model, it's not clear
> which component is responsible for managing the log start offset. It
> seems that the log start offset could be changed by both
> RemoteLogManager and LogManager.
>
> 43. There are quite a few existing features not covered by the KIP. It
> would be useful to discuss each of those.
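To make the pull-vs-push distinction in point 41 concrete: instead of the framework periodically polling listRemoteSegments(), a storage plugin could push segment-metadata events to a listener registered by the framework. The sketch below is purely illustrative (these names are not from the KIP, and this is not a concrete proposal from the thread), but it shows the shape such a contract might take.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical event describing a newly available remote segment.
class SegmentEvent {
    final String topicPartition;
    final long baseOffset;
    SegmentEvent(String topicPartition, long baseOffset) {
        this.topicPartition = topicPartition;
        this.baseOffset = baseOffset;
    }
}

// Hypothetical push-based source: the plugin notifies listeners as it learns
// about segments (e.g. from a remote key/value store's change feed), instead
// of the framework polling a list API.
class PushBasedMetadataSource {
    private final List<Consumer<SegmentEvent>> listeners = new ArrayList<>();

    // The framework registers its interest once...
    void onNewSegment(Consumer<SegmentEvent> listener) {
        listeners.add(listener);
    }

    // ...and the plugin pushes events as they occur.
    void publish(SegmentEvent event) {
        for (Consumer<SegmentEvent> l : listeners) {
            l.accept(event);
        }
    }
}

public class PushVsPullDemo {
    public static void main(String[] args) {
        PushBasedMetadataSource source = new PushBasedMetadataSource();
        List<Long> seen = new ArrayList<>();
        source.onNewSegment(e -> seen.add(e.baseOffset));
        source.publish(new SegmentEvent("logs-0", 0L));
        source.publish(new SegmentEvent("logs-0", 1000L));
        System.out.println(seen); // prints [0, 1000]
    }
}
```

A pull API is simpler for the plugin; a push API avoids polling latency and wasted list calls. An interface general enough for both would let each implementation pick its natural mode.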
> 43.1 I won't say that compacted topics are rarely used and always
> small. For example, Kafka Streams uses compacted topics for storing its
> state, and sometimes the size of the topic could be large. While it
> might be ok to not support compacted topics initially, it would be
> useful to have a high-level idea of how this might be supported down
> the road so that we don't have to make incompatible API changes in the
> future.
>
> 43.2 We need to discuss how EOS is supported. In particular, how is the
> producer state integrated with the remote storage?
>
> 43.3 Now that KIP-392 (allow consumers to fetch from the closest
> replica) is implemented, we need to discuss how reading from a follower
> replica is supported with tier storage.
>
> 43.4 We need to discuss how JBOD is supported with tier storage.
>
> Thanks,
>
> Jun

On Fri, Nov 8, 2019 at 12:06 AM Tom Bentley <tbent...@redhat.com> wrote:

> Thanks for those insights Ying.

On Thu, Nov 7, 2019 at 9:26 PM Ying Zheng <yi...@uber.com.invalid> wrote:

> Thanks, I missed that point.
> However, there's still a point at which the consumer fetches start
> getting served from remote storage (even if that point isn't as soon as
> the local log retention time/size). This represents a kind of
> performance cliff edge, and what I'm really interested in is how easy
> it is for a consumer which falls off that cliff to catch up, so that
> its fetches again come from local storage. Obviously this can depend on
> all sorts of factors (like production rate, consumption rate), so it's
> not guaranteed (just like it's not guaranteed for Kafka today), but
> this would represent a new failure mode.

As I explained in the last mail, it's a very rare case that a consumer needs to read remote data. In our experience at Uber, this only happens when the consumer service has had an outage for several hours.

There is not a "performance cliff" as you assume. The remote storage is even faster than local disks in terms of bandwidth. Reading from remote storage is going to have higher latency than local disk.
But since the consumer is catching up on several hours of data, it's not sensitive to sub-second latency, and each remote read request will read a large amount of data, making the overall performance better than reading from local disks.

> Another aspect I'd like to understand better is the effect that serving
> fetch requests from remote storage has on the broker's network
> utilization. If we're just trimming the amount of data held locally
> (without increasing the overall local+remote retention), then we're
> effectively trading disk bandwidth for network bandwidth when serving
> fetch requests from remote storage (which I understand to be a good
> thing, since brokers are often/usually disk bound). But if we're
> increasing the overall local+remote retention, then it's more likely
> that the network itself becomes the bottleneck. I appreciate this is
> all rather hand-wavy; I'm just trying to understand how this would
> affect broker performance, so I'd be grateful for any insights you can
> offer.
Network bandwidth is a function of produce speed; it has nothing to do with remote retention. As long as the data has been shipped to remote storage, you can keep it there for 1 day, 1 year, or 100 years, and it doesn't consume any network resources.
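The earlier point, that a catch-up consumer reading large chunks is insensitive to per-request latency, can be made concrete with a back-of-the-envelope model: effective throughput is roughly chunkSize / (latency + chunkSize / bandwidth). The numbers below are illustrative assumptions, not measurements from this thread.

```java
// Back-of-the-envelope model of remote-read throughput: a fixed per-request
// latency is amortized over the bytes fetched per request, so larger reads
// approach the raw remote bandwidth. All numbers are assumed for illustration.
public class RemoteReadThroughput {
    static double effectiveMBps(double chunkMB, double latencySec, double bandwidthMBps) {
        // Time per request = fixed latency + transfer time at full bandwidth.
        return chunkMB / (latencySec + chunkMB / bandwidthMBps);
    }

    public static void main(String[] args) {
        double latency = 0.05;    // assumed 50 ms per remote request
        double bandwidth = 400.0; // assumed 400 MB/s remote read bandwidth
        // A tiny 1 MB read is dominated by request latency...
        System.out.printf("1 MB reads:  %.0f MB/s%n", effectiveMBps(1, latency, bandwidth));
        // ...while a 64 MB read approaches the full remote bandwidth.
        System.out.printf("64 MB reads: %.0f MB/s%n", effectiveMBps(64, latency, bandwidth));
    }
}
```

Under these assumed numbers, 1 MB reads achieve only a small fraction of the remote bandwidth while 64 MB reads achieve most of it, which is why fetching large batches during catch-up can beat local disks whose sequential bandwidth is lower than the remote store's.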