Hi Harsha/Satish,

Thanks for the great KIP. Below is the first set of questions/suggestions I had after making a pass on the KIP.

5001. Under the section "Follower fetch protocol in detail", the next-local-offset is the offset up to which the segments are copied to remote storage. Would last-tiered-offset be a better name than next-local-offset? last-tiered-offset seems to align naturally with the definition provided in the KIP.

5002. After leadership is established for a partition, the leader would begin uploading a segment to remote storage. If successful, the leader would write the updated RemoteLogSegmentMetadata to the metadata topic (via RLMM.putRemoteLogSegmentData). However, for defensive reasons, it seems useful that before uploading the first segment for a partition, the leader should catch up to all the metadata events written so far in the metadata topic for that partition (e.g. by the previous leader). To achieve this, the leader could start a lease (using an establish_leader metadata event) before commencing tiering, and wait until the event is read back. For example, this seems useful to avoid cases where zombie leaders can be active for the same partition. It can also help the leader avoid deciding which segments to upload for a partition until it has caught up to a complete view of all segments uploaded for the partition so far (otherwise the same segment may be uploaded twice, once by the previous leader and then by the new leader).

5003. There is a natural interleaving between uploading a segment to the remote store and writing a metadata event for the same (via RLMM.putRemoteLogSegmentData). There can be cases where a remote segment is uploaded, then the leader fails and the corresponding metadata event never gets written. In such cases, the orphaned remote segment has to be eventually deleted (since there is no confirmation of the upload). To handle this, we could use 2 separate metadata events, viz. copy_initiated and copy_completed, so that copy_initiated events that don't have a corresponding copy_completed event can be treated as garbage and deleted from the remote object store by the broker.

5004. In the default implementation of RLMM (using the internal topic __remote_log_metadata), a separate topic called __remote_segments_to_be_deleted is going to be used just to track failures in removing remote log segments. A separate topic (effectively another metadata stream) introduces some maintenance overhead and design complexity. It seems to me that the same can be achieved using just the __remote_log_metadata topic with the following steps: 1) the leader writes a delete_initiated metadata event, 2) the leader deletes the segment and 3) the leader writes a delete_completed metadata event. Tiered segments that have a delete_initiated message but no delete_completed message can be considered a failure and retried.

5005. When a Kafka cluster is provisioned for the first time with KIP-405 tiered storage enabled, could you explain in the KIP how the bootstrap for the __remote_log_metadata topic will be performed in the default RLMM implementation?

5006. I currently do not see details in the KIP on why RocksDB was chosen as the default cache implementation, and how it is going to be used. Were alternatives compared/considered? For example, it would be useful to explain/evaluate the following: 1) debuggability of the RocksDB JNI interface, 2) performance, 3) portability across platforms and 4) interface parity of RocksDB's JNI API with its underlying C/C++ API.

5007. For the RocksDB cache (the default implementation of RLMM), what is the relationship/mapping between the following: 1) # of tiered partitions, 2) # of partitions of the metadata topic __remote_log_metadata and 3) # of RocksDB instances? I.e., is the plan to have a RocksDB instance per tiered partition, or per metadata topic partition, or just 1 per broker?

5008. The system-wide configuration 'remote.log.storage.enable' is used to enable tiered storage. Can this be made a topic-level configuration, so that the user can enable/disable tiered storage at a topic level rather than as a system-wide default for an entire Kafka cluster?

5009. Whenever a topic with tiered storage enabled is deleted, the underlying actions require the topic data to be deleted in the local store as well as the remote store, and eventually the topic metadata needs to be deleted too. What is the role of the controller in deleting a topic and its contents while the topic has tiered storage enabled?

5010. RLMM APIs are currently synchronous; for example, RLMM.putRemoteLogSegmentData waits until the put operation is completed in the remote metadata store. It may also block until the leader has caught up to the metadata (not sure). Could we make these APIs asynchronous (e.g. based on java.util.concurrent.Future) to provide room for tapping performance improvements such as non-blocking I/O?

5011. The same question as 5010, on sync vs. async APIs, applies to RSM. Have we considered the pros/cons of making the RSM APIs asynchronous?

Cheers,
Kowshik

On Thu, Aug 6, 2020 at 11:02 AM Satish Duggana <satish.dugg...@gmail.com> wrote:

> Hi Jun,
> Thanks for your comments.
>
> > At the high level, that approach sounds reasonable to me. It would be useful to document how RLMM handles overlapping archived offset ranges and how those overlapping segments are deleted through retention.
>
> Sure, we will document that in the KIP.
>
> > How is the remaining part of the KIP coming along? To me, the two biggest missing items are (1) more detailed documentation on how all the new APIs are being used and (2) metadata format and usage in the internal topic __remote_log_metadata.
>
> We are working on updating APIs based on the recent discussions and getting the perf numbers by plugging in RocksDB as a cache store for RLMM.
> We will update the KIP with the updated APIs and with the above requested details in a few days and let you know.
>
> Thanks,
> Satish.
>
> On Wed, Aug 5, 2020 at 12:49 AM Jun Rao <j...@confluent.io> wrote:
> >
> > Hi, Ying, Satish,
> >
> > Thanks for the reply. At the high level, that approach sounds reasonable to me. It would be useful to document how RLMM handles overlapping archived offset ranges and how those overlapping segments are deleted through retention.
> >
> > How is the remaining part of the KIP coming along? To me, the two biggest missing items are (1) more detailed documentation on how all the new APIs are being used and (2) metadata format and usage in the internal topic __remote_log_metadata.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Aug 4, 2020 at 8:32 AM Satish Duggana <satish.dugg...@gmail.com> wrote:
> >
> > > Hi Jun,
> > > Thanks for your comment.
> > >
> > > 1001. Using the new leader as the source of truth may be fine too. What's not clear to me is when a follower takes over as the new leader, from which offset does it start archiving to the block storage. I assume that the new leader starts from the latest archived offset by the previous leader, but it seems that's not the case. It would be useful to document this in the Wiki.
> > >
> > > When a follower becomes a leader, it needs to find out the offset from which segments are to be copied to remote storage. This is found by traversing the leader epoch history from the latest leader epoch and finding the highest offset of a segment with that epoch copied into remote storage, using the respective RLMM APIs. If it cannot find an entry, it checks the previous leader epoch, and so on until it finds an entry. If there are no entries down to the earliest leader epoch in the leader epoch cache, it starts copying the segments from the earliest epoch entry's offset.
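
A minimal sketch of the epoch traversal described in the quoted reply above. It is not code from the KIP: the function name, the in-memory `highest_copied_offset` map (a hypothetical stand-in for the RLMM lookup) and the choice to resume at "highest copied offset + 1" are all assumptions made for illustration.

```python
from typing import Dict, List, Tuple


def find_copy_start_offset(
    epoch_history: List[Tuple[int, int]],
    highest_copied_offset: Dict[int, int],
) -> int:
    """Return the offset from which a new leader resumes copying segments.

    epoch_history: (leader_epoch, start_offset) entries, oldest first.
    highest_copied_offset: hypothetical stand-in for the RLMM lookup; maps a
        leader epoch to the highest offset already copied for that epoch.
    """
    if not epoch_history:
        raise ValueError("leader epoch history must not be empty")

    # Walk the history from the latest epoch back to the earliest one.
    for epoch, _start_offset in reversed(epoch_history):
        highest = highest_copied_offset.get(epoch)
        if highest is not None:
            # Assumption: the stored offset is inclusive, so resume right after it.
            return highest + 1

    # Nothing was copied for any known epoch: start at the earliest entry.
    return epoch_history[0][1]


history = [(0, 0), (1, 100), (2, 250)]
print(find_copy_start_offset(history, {1: 180}))  # epoch 2 has no entry, epoch 1 does
print(find_copy_start_offset(history, {}))        # no remote entries at all
```

The two calls cover the fallback to a previous epoch and the case where nothing has been copied yet.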
> > > Added an example in the KIP here[1]. We will update RLMM APIs in the KIP.
> > >
> > > [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Followertoleadertransition
> > >
> > > Satish.
> > >
> > > On Tue, Aug 4, 2020 at 9:00 PM Satish Duggana <satish.dugg...@gmail.com> wrote:
> > > >
> > > > Hi Ying,
> > > > Thanks for your comment.
> > > >
> > > > 1001. Using the new leader as the source of truth may be fine too. What's not clear to me is when a follower takes over as the new leader, from which offset does it start archiving to the block storage. I assume that the new leader starts from the latest archived offset by the previous leader, but it seems that's not the case. It would be useful to document this in the Wiki.
> > > >
> > > > When a follower becomes a leader, it needs to find out the offset from which segments are to be copied to remote storage. This is found by traversing the leader epoch history from the latest leader epoch and finding the highest offset of a segment with that epoch copied into remote storage, using the respective RLMM APIs. If it cannot find an entry, it checks the previous leader epoch, and so on until it finds an entry. If there are no entries down to the earliest leader epoch in the leader epoch cache, it starts copying the segments from the earliest epoch entry's offset.
> > > > Added an example in the KIP here[1]. We will update RLMM APIs in the KIP.
> > > >
> > > > [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-Followertoleadertransition
> > > >
> > > > Satish.
> > > >
> > > > On Tue, Aug 4, 2020 at 10:28 AM Ying Zheng <yi...@uber.com.invalid> wrote:
> > > > >
> > > > > Hi Jun,
> > > > >
> > > > > Thank you for the comment!
> > > > > The current KIP is not very clear about this part.
> > > > >
> > > > > 1001. The new leader will start archiving from the earliest local segment that is not fully covered by the "valid" remote data. "valid" means the (offset, leader epoch) pair is valid based on the leader-epoch history.
> > > > >
> > > > > There are some edge cases where the same offset range (with the same leader epoch) can be copied to the remote storage more than once. But this kind of duplication shouldn't be a problem.
> > > > >
> > > > > Satish is going to explain the details in the KIP with examples.
> > > > >
> > > > > On Fri, Jul 31, 2020 at 2:55 PM Jun Rao <j...@confluent.io> wrote:
> > > > >
> > > > > > Hi, Ying,
> > > > > >
> > > > > > Thanks for the reply.
> > > > > >
> > > > > > 1001. Using the new leader as the source of truth may be fine too. What's not clear to me is when a follower takes over as the new leader, from which offset does it start archiving to the block storage. I assume that the new leader starts from the latest archived offset by the previous leader, but it seems that's not the case. It would be useful to document this in the wiki.
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Tue, Jul 28, 2020 at 12:11 PM Ying Zheng <yi...@uber.com.invalid> wrote:
> > > > > >
> > > > > > > 1001.
> > > > > > >
> > > > > > > We did consider this approach. The concerns are
> > > > > > > 1) This makes unclean-leader-election rely on remote storage. In case the remote storage is unavailable, Kafka will not be able to finish the unclean-leader-election.
> > > > > > > 2) Since the user set local retention time (or local retention > > > bytes), I > > > > > > > think we are expected to > > > > > > > keep that much local data when possible (avoid truncating all > the > > > local > > > > > > > data). But, as you said, > > > > > > > unclean leader elections are very rare, this may not be a big > > > problem. > > > > > > > > > > > > > > The current design uses the leader broker as source-of-truth. > This > > > is > > > > > > > consistent with the > > > > > > > existing Kafka behavior. > > > > > > > > > > > > > > By using remote storage as the source-of-truth, the follower > logic > > > can > > > > > > be a > > > > > > > little simpler, > > > > > > > but the leader logic is going to be more complex. Overall, I > don't > > > see > > > > > > > there many benefits > > > > > > > of using remote storage as the source-of-truth. > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Jul 28, 2020 at 10:25 AM Jun Rao <j...@confluent.io> > wrote: > > > > > > > > > > > > > > > Hi, Satish, > > > > > > > > > > > > > > > > Thanks for the reply. > > > > > > > > > > > > > > > > 1001. In your example, I was thinking that you could just > > > download the > > > > > > > > latest leader epoch from the object store. After that you > know > > > the > > > > > > leader > > > > > > > > should end with offset 1100. The leader will delete all its > > > local data > > > > > > > > before offset 1000 and start accepting new messages at offset > > > 1100. > > > > > > > > Consumer requests for messages before offset 1100 will be > served > > > from > > > > > > the > > > > > > > > object store. The benefit with this approach is that it's > > > simpler to > > > > > > > reason > > > > > > > > about who is the source of truth. The downside is slightly > > > increased > > > > > > > > unavailability window during unclean leader election. 
Since > > > unclean > > > > > > > leader > > > > > > > > elections are rare, I am not sure if this is a big concern. > > > > > > > > > > > > > > > > 1008. Yes, I think introducing sth like local.retention.ms > > > seems more > > > > > > > > consistent. > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > On Tue, Jul 28, 2020 at 2:30 AM Satish Duggana < > > > > > > satish.dugg...@gmail.com > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > HI Jun, > > > > > > > > > Thanks for your comments. We put our inline replies below. > > > > > > > > > > > > > > > > > > 1001. I was thinking that you could just use the tiered > > > metadata to > > > > > > do > > > > > > > > the > > > > > > > > > reconciliation. The tiered metadata contains offset ranges > and > > > epoch > > > > > > > > > history. Those should be enough for reconciliation > purposes. > > > > > > > > > > > > > > > > > > If we use remote storage as the source-of-truth during > > > > > > > > > unclean-leader-election, it's possible that after > > > reconciliation the > > > > > > > > > remote storage will have more recent data than the new > > > leader's local > > > > > > > > > storage. For example, the new leader's latest message is > > > offset 1000, > > > > > > > > > while the remote storage has message 1100. In such a case, > the > > > new > > > > > > > > > leader will have to download the messages from 1001 to > 1100, > > > before > > > > > > > > > accepting new messages from producers. Otherwise, there > would > > > be a > > > > > > gap > > > > > > > > > in the local data between 1000 and 1101. > > > > > > > > > > > > > > > > > > Moreover, with the current design, leader epoch history is > > > stored in > > > > > > > > > remote storage, rather than the metadata topic. We did > consider > > > > > > saving > > > > > > > > > epoch history in remote segment metadata. But the concern > is > > > that > > > > > > > > > there is currently no limit for the epoch history size. 
> > > > > > Theoretically, > > > > > > > > > if a user has a very long remote retention time and there > are > > > very > > > > > > > > > frequent leadership changes, the leader epoch history can > > > become too > > > > > > > > > long to fit into a regular Kafka message. > > > > > > > > > > > > > > > > > > > > > > > > > > > 1003.3 Having just a serverEndpoint string is probably not > > > enough. > > > > > > > > > Connecting to a Kafka cluster may need various security > > > credentials. > > > > > > We > > > > > > > > can > > > > > > > > > make RLMM configurable and pass in the properties through > the > > > > > > > configure() > > > > > > > > > method. Ditto for RSM. > > > > > > > > > > > > > > > > > > RLMM and RSM are already configurable and they take > > > properties which > > > > > > > > > start with "remote.log.metadata." and "remote.log.storage." > > > > > > > > > respectively and a few others. We have listener-name as the > > > config > > > > > > for > > > > > > > > > RLMM and other properties(like security) can be sent as you > > > > > > suggested. > > > > > > > > > We will update the KIP with the details. > > > > > > > > > > > > > > > > > > > > > > > > > > > 1008.1 We started with log.retention.hours and > > > log.retention.minutes, > > > > > > > and > > > > > > > > > added log.retention.ms later. If we are adding a new > > > configuration, > > > > > > ms > > > > > > > > > level config alone is enough and is simpler. We can build > > > tools to > > > > > > make > > > > > > > > the > > > > > > > > > configuration at different granularities easier. The > > > definition of > > > > > > > > > log.retention.ms is "The number of milliseconds to keep a > log > > > file > > > > > > > > before > > > > > > > > > deleting it". The deletion is independent of whether > tiering is > > > > > > enabled > > > > > > > > or > > > > > > > > > not. 
If this changes to just the local portion of the > data, we > > > are > > > > > > > > changing > > > > > > > > > the meaning of an existing configuration. > > > > > > > > > > > > > > > > > > We are fine with either way. We can go with > log.retention.xxxx > > > as the > > > > > > > > > effective log retention instead of local log retention. > With > > > this > > > > > > > > > convention, we need to introduce local.log.retention > instead > > > of > > > > > > > > > remote.log.retention.ms that we proposed. If > log.retention.ms > > > as -1 > > > > > > > > > then remote retention is also considered as unlimited but > user > > > should > > > > > > > > > be able to set the local.retention.ms. > > > > > > > > > So, we need to introduce local.log.retention.ms and > > > > > > > > > local.log.retention.bytes which should always be <= > > > > > > > > > log.retention.ms/bytes respectively. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Jul 24, 2020 at 3:37 AM Jun Rao <j...@confluent.io> > > > wrote: > > > > > > > > > > > > > > > > > > > > Hi, Satish, > > > > > > > > > > > > > > > > > > > > Thanks for the reply. A few quick comments below. > > > > > > > > > > > > > > > > > > > > 1001. I was thinking that you could just use the tiered > > > metadata to > > > > > > > do > > > > > > > > > the > > > > > > > > > > reconciliation. The tiered metadata contains offset > ranges > > > and > > > > > > epoch > > > > > > > > > > history. Those should be enough for reconciliation > purposes. > > > > > > > > > > > > > > > > > > > > 1003.3 Having just a serverEndpoint string is probably > not > > > enough. > > > > > > > > > > Connecting to a Kafka cluster may need various security > > > > > > credentials. > > > > > > > We > > > > > > > > > can > > > > > > > > > > make RLMM configurable and pass in the properties > through the > > > > > > > > configure() > > > > > > > > > > method. Ditto for RSM. 
> > > > > > > > > > > > > > > > > > > > 1008.1 We started with log.retention.hours and > > > > > > log.retention.minutes, > > > > > > > > and > > > > > > > > > > added log.retention.ms later. If we are adding a new > > > > > > configuration, > > > > > > > ms > > > > > > > > > > level config alone is enough and is simpler. We can build > > > tools to > > > > > > > make > > > > > > > > > the > > > > > > > > > > configuration at different granularities easier. The > > > definition of > > > > > > > > > > log.retention.ms is "The number of milliseconds to keep > a > > > log file > > > > > > > > > before > > > > > > > > > > deleting it". The deletion is independent of whether > tiering > > > is > > > > > > > enabled > > > > > > > > > or > > > > > > > > > > not. If this changes to just the local portion of the > data, > > > we are > > > > > > > > > changing > > > > > > > > > > the meaning of an existing configuration. > > > > > > > > > > > > > > > > > > > > Jun > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Thu, Jul 23, 2020 at 11:04 AM Satish Duggana < > > > > > > > > > satish.dugg...@gmail.com> > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > Hi Jun, > > > > > > > > > > > > > > > > > > > > > > Thank you for the comments! Ying, Harsha and I > discussed > > > and put > > > > > > > our > > > > > > > > > > > comments below. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1001. The KIP described a few scenarios of unclean > leader > > > > > > > elections. > > > > > > > > > This > > > > > > > > > > > is very useful, but I am wondering if this is the best > > > approach. > > > > > > My > > > > > > > > > > > understanding of the proposed approach is to allow the > new > > > > > > > (unclean) > > > > > > > > > leader > > > > > > > > > > > to take new messages immediately. 
While this increases > > > > > > > availability, > > > > > > > > it > > > > > > > > > > > creates the problem that there could be multiple > > > conflicting > > > > > > > segments > > > > > > > > > in > > > > > > > > > > > the remote store for the same offset range. This seems > to > > > make it > > > > > > > > > harder > > > > > > > > > > > for RLMM to determine which archived log segments > contain > > > the > > > > > > > correct > > > > > > > > > data. > > > > > > > > > > > For example, an archived log segment could at one time > be > > > the > > > > > > > correct > > > > > > > > > data, > > > > > > > > > > > but be changed to incorrect data after an unclean > leader > > > > > > election. > > > > > > > An > > > > > > > > > > > alternative approach is to let the unclean leader use > the > > > > > > archived > > > > > > > > > data as > > > > > > > > > > > the source of truth. So, when the new (unclean) leader > > > takes > > > > > > over, > > > > > > > it > > > > > > > > > first > > > > > > > > > > > reconciles the local data based on the archived data > before > > > > > > taking > > > > > > > > new > > > > > > > > > > > messages. This makes the job of RLMM a bit easier > since all > > > > > > > archived > > > > > > > > > data > > > > > > > > > > > are considered correct. This increases availability a > bit. > > > > > > However, > > > > > > > > > since > > > > > > > > > > > unclean leader elections are rare, this may be ok. > > > > > > > > > > > > > > > > > > > > > > Firstly, We don't want to assume the remote storage is > more > > > > > > > reliable > > > > > > > > > than > > > > > > > > > > > Kafka. Kafka unclean leader election usually happens > when > > > there > > > > > > is > > > > > > > a > > > > > > > > > large > > > > > > > > > > > scale outage that impacts multiple racks (or even > multiple > > > > > > > > availability > > > > > > > > > > > zones). In such a case, the remote storage may be > > > unavailable or > > > > > > > > > unstable. 
> > > > > > > > > > > Pulling a large amount of data from the remote storage > to > > > > > > reconcile > > > > > > > > the > > > > > > > > > > > local data may also exacerbate the outage. With the > current > > > > > > design, > > > > > > > > > the new > > > > > > > > > > > leader can start working even when the remote storage > is > > > > > > > temporarily > > > > > > > > > > > unavailable. > > > > > > > > > > > > > > > > > > > > > > Secondly, it is not easier to implement the reconciling > > > logic at > > > > > > > the > > > > > > > > > leader > > > > > > > > > > > side. It can take a long time for the new leader to > > > download the > > > > > > > > remote > > > > > > > > > > > data and rebuild local producer id / leader epoch > > > information. > > > > > > > During > > > > > > > > > this > > > > > > > > > > > period, the leader cannot accept any requests from the > > > clients > > > > > > and > > > > > > > > > > > followers. We have to introduce a new state for the > > > leader, and a > > > > > > > new > > > > > > > > > error > > > > > > > > > > > code to let the clients / followers know what is > happening. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1002. RemoteStorageManager. > > > > > > > > > > > 1002.1 There seems to be some inconsistencies in > > > > > > > > RemoteStorageManager. > > > > > > > > > We > > > > > > > > > > > pass in RemoteLogSegmentId copyLogSegment(). For all > other > > > > > > methods, > > > > > > > > we > > > > > > > > > pass > > > > > > > > > > > in RemoteLogSegmentMetadata. > > > > > > > > > > > > > > > > > > > > > > Nice catch, we can have the RemoteLogSegmentMetadata > for > > > > > > > > copyLogSegment > > > > > > > > > > > too. > > > > > > > > > > > > > > > > > > > > > > 1002.2 Is endOffset in RemoteLogSegmentMetadata > inclusive > > > or > > > > > > > > exclusive? > > > > > > > > > > > > > > > > > > > > > > It is inclusive. 
> > > > > > > > > > >
> > > > > > > > > > > 1002.3 It seems that we need an api to get the leaderEpoch history for a partition.
> > > > > > > > > > >
> > > > > > > > > > > Yes, updated the KIP with the new method.
> > > > > > > > > > >
> > > > > > > > > > > 1002.4 Could you define the type of RemoteLogSegmentContext?
> > > > > > > > > > >
> > > > > > > > > > > This is removed in the latest code and it is not needed.
> > > > > > > > > > >
> > > > > > > > > > > 1003 RemoteLogMetadataManager
> > > > > > > > > > >
> > > > > > > > > > > 1003.1 I am not sure why we need both of the following methods in RemoteLogMetadataManager. Could we combine them into one that takes in offset and returns RemoteLogSegmentMetadata?
> > > > > > > > > > > RemoteLogSegmentId getRemoteLogSegmentId(TopicPartition topicPartition, long offset) throws IOException;
> > > > > > > > > > > RemoteLogSegmentMetadata getRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws IOException;
> > > > > > > > > > >
> > > > > > > > > > > Good point, these can be merged for now. I guess we needed them in an earlier version of the implementation, but they are not needed now.
> > > > > > > > > > >
> > > > > > > > > > > 1003.2 There seem to be some inconsistencies in the methods below. I am not sure why one takes RemoteLogSegmentMetadata and the other takes RemoteLogSegmentId.
> > > > > > > > > > > void putRemoteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException;
> > > > > > > > > > > void deleteRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws IOException;
> > > > > > > > > > >
> > > > > > > > > > > RLMM stores RemoteLogSegmentMetadata, which is identified by RemoteLogSegmentId. So, when it is added it takes RemoteLogSegmentMetadata. The `delete` operation needs only RemoteLogSegmentId, as RemoteLogSegmentMetadata can be identified with RemoteLogSegmentId.
> > > > > > > > > > >
> > > > > > > > > > > 1003.3 In void onServerStarted(final String serverEndpoint), what is serverEndpoint used for?
> > > > > > > > > > >
> > > > > > > > > > > This can be used by the RLMM implementation to connect to the local Kafka cluster. In the case of the default implementation, it is used in initializing Kafka clients connecting to the local cluster.
> > > > > > > > > > >
> > > > > > > > > > > 1004. It would be useful to document how all the new APIs are being used. For example, when is RemoteLogSegmentMetadata.markedForDeletion being set and used? How are RemoteLogMetadataManager.earliestLogOffset/highestLogOffset being used?
> > > > > > > > > > >
> > > > > > > > > > > RLMM APIs are going through changes and they should be ready in a few days. I will update the KIP and the mail thread once they are ready.
> > > > > > > > > > >
> > > > > > > > > > > 1005.
Handling partition deletion: The KIP says "RLMM > will > > > > > > > eventually > > > > > > > > > > > delete these segments by using RemoteStorageManager." > Which > > > > > > replica > > > > > > > > > does > > > > > > > > > > > this logic? > > > > > > > > > > > > > > > > > > > > > > This is a good point. When a topic is deleted, it will > not > > > have > > > > > > any > > > > > > > > > > > leader/followers to do the cleanup. We will have a > cleaner > > > agent > > > > > > > on a > > > > > > > > > > > single broker in the cluster to do this cleanup, we > plan > > > to add > > > > > > > that > > > > > > > > in > > > > > > > > > > > controller broker. > > > > > > > > > > > > > > > > > > > > > > 1006. "If there are any failures in removing remote log > > > segments > > > > > > > then > > > > > > > > > those > > > > > > > > > > > are stored in a specific topic (default as > > > > > > > > > __remote_segments_to_be_deleted) > > > > > > > > > > > and user can consume the events(which contain > > > > > > > remote-log-segment-id) > > > > > > > > > from > > > > > > > > > > > that topic and clean them up from remote storage. " > Not > > > sure if > > > > > > > it's > > > > > > > > > worth > > > > > > > > > > > the complexity of adding another topic. Could we just > > > retry? > > > > > > > > > > > > > > > > > > > > > > Sure, we can keep this simpler for now by logging an > error > > > after > > > > > > > > > retries. > > > > > > > > > > > We can give users a better way to process this in > future. > > > Oneway > > > > > > > can > > > > > > > > > be a > > > > > > > > > > > dead letter topic which can be configured by the user. > > > > > > > > > > > > > > > > > > > > > > 1007. RemoteFetchPurgatory: Could we just reuse the > > > existing > > > > > > > > > > > fetchPurgatory? > > > > > > > > > > > > > > > > > > > > > > We have 2 types of delayed operations waiting for 2 > > > different > > > > > > > events. 
> > > > > > > > > > > DelayedFetch waits for new messages from producers. > > > > > > > > DelayedRemoteFetch > > > > > > > > > > > waits for the remote-storage-read-task to finish. When > > > either of > > > > > > > the > > > > > > > > 2 > > > > > > > > > > > events happens, we only want to notify one type of the > > > delayed > > > > > > > > > operations. > > > > > > > > > > > It would be inefficient to put 2 types of delayed > > > operations in > > > > > > one > > > > > > > > > > > purgatory, as the tryComplete() methods of the delayed > > > operations > > > > > > > can > > > > > > > > > be > > > > > > > > > > > triggered by irrelevant events. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1008. Configurations: > > > > > > > > > > > 1008.1 remote.log.retention.ms, > > > remote.log.retention.minutes, > > > > > > > > > > > remote.log.retention.hours: It seems that we just need > the > > > ms > > > > > > one. > > > > > > > > > Also, > > > > > > > > > > > are we changing the meaning of existing config > > > log.retention.ms > > > > > > to > > > > > > > > > mean > > > > > > > > > > > the > > > > > > > > > > > local retention? For backward compatibility, it's > better > > > to not > > > > > > > > change > > > > > > > > > the > > > > > > > > > > > meaning of existing configurations. > > > > > > > > > > > > > > > > > > > > > > We agree that we only need remote.log.retention.ms. > But, > > > the > > > > > > > > existing > > > > > > > > > > > Kafka > > > > > > > > > > > configuration > > > > > > > > > > > has 3 properties (log.retention.ms, > log.retention.minutes, > > > > > > > > > > > log.retention.hours). We just > > > > > > > > > > > want to keep consistent with the existing properties. > > > > > > > > > > > Existing log.retention.xxxx config is about log > retention > > > in > > > > > > > broker’s > > > > > > > > > > > storage which is local. 
It should be easy for users to configure partition storage with local retention and remote retention based on their usage.

1008.2 Should remote.log.storage.enable be at the topic level?

We can introduce topic level config for the same remote.log settings. User can set the desired config while creating the topic. remote.log.storage.enable property is not allowed to be updated after the topic is created. Other remote.log.* properties can be modified. We will support flipping remote.log.storage.enable in next versions.

1009. It would be useful to list all limitations in a separate section: compacted topic, JBOD, etc. Also, is changing a topic from delete to compact and vice versa allowed when tiering is enabled?

+1 to have limitations in a separate section. We will update the KIP with that.
Topic created with effective value for remote.log.enabled as true, can not change its retention policy from delete to compact.

1010. Thanks for performance numbers. Are those with RocksDB as the cache?

No, We have not yet added RocksDB support. This is based on in-memory map representation.
We will add that support and update this thread after updating the KIP with the numbers.

Thanks,
Satish.

On Tue, Jul 21, 2020 at 6:49 AM Jun Rao <j...@confluent.io> wrote:

Hi, Satish, Ying, Harsha,

Thanks for the updated KIP. A few more comments below.

1000. Regarding Colin's question on querying the metadata directly in the remote block store. One issue is that not all block stores offer the needed api to query the metadata. For example, S3 only offers an api to list objects under a prefix and this api has the eventual consistency semantic.

1001. The KIP described a few scenarios of unclean leader elections. This is very useful, but I am wondering if this is the best approach. My understanding of the proposed approach is to allow the new (unclean) leader to take new messages immediately. While this increases availability, it creates the problem that there could be multiple conflicting segments in the remote store for the same offset range.
This seems to make it harder for RLMM to determine which archived log segments contain the correct data. For example, an archived log segment could at one time be the correct data, but be changed to incorrect data after an unclean leader election. An alternative approach is to let the unclean leader use the archived data as the source of truth. So, when the new (unclean) leader takes over, it first reconciles the local data based on the archived data before taking new messages. This makes the job of RLMM a bit easier since all archived data are considered correct. This reduces availability a bit. However, since unclean leader elections are rare, this may be ok.

1002. RemoteStorageManager.
1002.1 There seems to be some inconsistencies in RemoteStorageManager. We pass in RemoteLogSegmentId to copyLogSegment(). For all other methods, we pass in RemoteLogSegmentMetadata.
1002.2 Is endOffset in RemoteLogSegmentMetadata inclusive or exclusive?
1002.3 It seems that we need an api to get the leaderEpoch history for a partition.
1002.4 Could you define the type of RemoteLogSegmentContext?

1003 RemoteLogMetadataManager
1003.1 I am not sure why we need both of the following methods in RemoteLogMetadataManager. Could we combine them into one that takes in offset and returns RemoteLogSegmentMetadata?
    RemoteLogSegmentId getRemoteLogSegmentId(TopicPartition topicPartition, long offset) throws IOException;
    RemoteLogSegmentMetadata getRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws IOException;
1003.2 There seems to be some inconsistencies in the methods below. I am not sure why one takes RemoteLogSegmentMetadata and the other takes RemoteLogSegmentId.
    void putRemoteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException;
    void deleteRemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId) throws IOException;
1003.3 In void onServerStarted(final String serverEndpoint), what is serverEndpoint used for?

1004. It would be useful to document how all the new APIs are being used. For example, when is RemoteLogSegmentMetadata.markedForDeletion being set and used?
How are RemoteLogMetadataManager.earliestLogOffset/highestLogOffset being used?

1005. Handling partition deletion: The KIP says "RLMM will eventually delete these segments by using RemoteStorageManager." Which replica does this logic?

1006. "If there are any failures in removing remote log segments then those are stored in a specific topic (default as __remote_segments_to_be_deleted) and user can consume the events(which contain remote-log-segment-id) from that topic and clean them up from remote storage. " Not sure if it's worth the complexity of adding another topic. Could we just retry?

1007. RemoteFetchPurgatory: Could we just reuse the existing fetchPurgatory?

1008. Configurations:
1008.1 remote.log.retention.ms, remote.log.retention.minutes, remote.log.retention.hours: It seems that we just need the ms one. Also, are we changing the meaning of existing config log.retention.ms to mean the local retention? For backward compatibility, it's better to not change the meaning of existing configurations.
1008.2 Should remote.log.storage.enable be at the topic level?

1009. It would be useful to list all limitations in a separate section: compacted topic, JBOD, etc. Also, is changing a topic from delete to compact and vice versa allowed when tiering is enabled?

1010. Thanks for performance numbers. Are those with RocksDB as the cache?

Thanks,

Jun

On Wed, Jul 15, 2020 at 6:12 PM Harsha Ch <harsha...@gmail.com> wrote:

Hi Colin,
That's not what we said in the previous email. RLMM is pluggable storage and by running numbers even 1PB data you do not need more than 10GB local storage.
If in future this becomes a blocker for any users we can revisit but this does not warrant another implementation at this point to push the data to remote storage.
We can of course implement another RLMM that is optional for users to configure to push to remote. But that doesn't need to be addressed in this KIP.

Thanks,
Harsha

On Wed, Jul 15, 2020 at 5:50 PM Colin McCabe <cmcc...@apache.org> wrote:

Hi Ying,

Thanks for the response.

It sounds like you agree that storing the metadata in the remote storage would be a better design overall. Given that that's true, is there any reason to include the worse implementation based on RocksDB?

Choosing a long-term metadata store is not something that we should do lightly. It can take users years to migrate from one metadata store to another. I also don't think it's realistic or desirable for users to write their own metadata stores. Even assuming that they could do a good job at this, it would create huge fragmentation in the Kafka ecosystem.

best,
Colin

On Tue, Jul 14, 2020, at 09:39, Ying Zheng wrote:

Hi Jun,
Hi Colin,

Satish and I are still discussing some details about how to handle transactions / producer ids. Satish is going to make some minor changes to RLMM API and other parts. Other than that, we have finished updating the KIP.

I agree with Colin that the current design of using rocksDB is not optimal. But this design is simple and should work for almost all the existing Kafka users. RLMM is a plugin. Users can replace rocksDB with their own RLMM implementation, if needed. So, I think we can keep rocksDB for now. What do you think?

Thanks,
Ying

On Tue, Jul 7, 2020 at 10:35 AM Jun Rao <j...@confluent.io> wrote:

Hi, Ying,

Thanks for the update. It's good to see the progress on this. Please let us know when you are done updating the KIP wiki.

Jun

On Tue, Jul 7, 2020 at 10:13 AM Ying Zheng <yi...@uber.com.invalid> wrote:

Hi Jun,

Satish and I have added more design details in the KIP, including how to keep consistency between replicas (especially when there are leadership changes / log truncations) and new metrics. We also made some other minor changes in the doc. We will finish the KIP changes in the next couple of days. We will let you know when we are done.
Most of the changes are already updated to the wiki KIP. You can take a look. But it's not the final version yet.

As for the implementation, the code is mostly done and we already had some feature tests / system tests. I have added the performance test results in the KIP. However the recent design changes (e.g. leader epoch info management / log truncation / some of the new metrics) have not been implemented yet. It will take about 2 weeks for us to implement after you review and agree with those design changes.

On Tue, Jul 7, 2020 at 9:23 AM Jun Rao <j...@confluent.io> wrote:

Hi, Satish, Harsha,

Any new updates on the KIP?
This feature is one of the most important and most requested features in Apache Kafka right now. It would be helpful if we can make sustained progress on this. Could you share how far along is the design/implementation right now? Is there anything that other people can help to get it across the line?

As for "transactional support" and "follower requests/replication", no further comments from me as long as the producer state and leader epoch can be restored properly from the object store when needed.

Thanks,

Jun

On Tue, Jun 9, 2020 at 3:39 AM Satish Duggana <satish.dugg...@gmail.com> wrote:

We did not want to add many implementation details in the KIP.
But we decided to add them in the KIP as appendix or sub-sections (including follower fetch protocol) to describe the flow with the main cases. That will answer most of the queries. I will update on this mail thread when the respective sections are updated.

Thanks,
Satish.

On Sat, Jun 6, 2020 at 7:49 PM Alexandre Dupriez <alexandre.dupr...@gmail.com> wrote:

Hi Satish,

A couple of questions specific to the section "Follower Requests/Replication", pages 16:17 in the design document [1].

900. It is mentioned that followers fetch auxiliary states from the remote storage.

900.a Does the consistency model of the external storage impact reads of leader epochs and other auxiliary data?

900.b What are the benefits of using a mechanism to store and access the leader epochs which is different from other metadata associated to tiered segments? What are the benefits of retrieving this information on-demand from the follower rather than relying on propagation via the topic __remote_log_metadata? What are the advantages over using a dedicated control structure (e.g. a new record type) propagated via this topic? Since in the document, different control paths are operating in the system, how are the metadata stored in __remote_log_metadata [which also include the epoch of the leader which offloaded a segment] and the remote auxiliary states, kept in sync?

900.c A follower can encounter an OFFSET_MOVED_TO_TIERED_STORAGE. Is this in response to a Fetch or OffsetForLeaderEpoch request?

900.d What happens if, after a follower encountered an OFFSET_MOVED_TO_TIERED_STORAGE response, its attempts to retrieve leader epochs fail (for instance, because the remote storage is temporarily unavailable)? Does the follower fall back to a mode where it ignores tiered segments, and applies truncation using only locally available information? What happens when access to the remote storage is restored? How is the replica lineage inferred by the remote leader epochs reconciled with the follower's replica lineage, which has evolved?
Does the follower remember fetching auxiliary states failed in the past and attempt reconciliation? Is there a plan to offer different strategies in this scenario, configurable via configuration?

900.e Is the leader epoch cache offloaded with every segment? Or when a new checkpoint is detected? If that information is not always offloaded to avoid duplicating data, how does the remote storage satisfy the request to retrieve it?

900.f Since the leader epoch cache covers the entire replica lineage, what happens if, after a leader epoch cache file is offloaded with a given segment, the local epoch cache is truncated [not necessarily for a range of offset included in tiered segments]?
How are remote and local leader epoch caches kept consistent?

900.g Consumer can also use leader epochs (e.g. to enable fencing to protect against stale leaders). What differences would there be between consumer and follower fetches? Especially, would consumers also fetch leader epoch information from the remote storage?

900.h Assume a newly elected leader of a topic-partition detects more recent segments are available in the external storage, with epochs > its local epoch. Does it ignore these segments and their associated epoch-to-offset vectors? Or try to reconstruct its local replica lineage based on the data remotely available?
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > > > Alexandre > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://docs.google.com/document/d/18tnobSas3mKFZFr8oRguZoj_tkD_sGzivuLRlMloEMs/edit?usp=sharing > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Le jeu. 4 juin 2020 à 19:55, Satish > > > Duggana < > > > > > > > > > > > > > > > > > satish.dugg...@gmail.com> > > > > > > > > > > > > > > > > > > > a écrit : > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Jun, > > > > > > > > > > > > > > > > > > > > > Please let us know if you have any > > > comments > > > > > > on > > > > > > > > > > > > > "transactional > > > > > > > > > > > > > > > > > > support" > > > > > > > > > > > > > > > > > > > > > and "follower requests/replication" > > > mentioned > > > > > > > in > > > > > > > > > the > > > > > > > > > > > > wiki. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > > > > Satish. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Jun 2, 2020 at 9:25 PM > Satish > > > > > > Duggana < > > > > > > > > > > > > > > > > > > > satish.dugg...@gmail.com> wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks Jun for your comments. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >100. 
It would be useful to provide more details on how those APIs are used. Otherwise, it's kind of hard to really assess whether the new APIs are sufficient/redundant. A few examples below.

We will update the wiki and let you know.

>100.1 deleteRecords seems to only advance the logStartOffset in Log. How does that trigger the deletion of remote log segments?

The RLMTask for a leader partition periodically checks whether there are remote log segments earlier than logStartOffset; the metadata and data of those segments are then deleted using RLMM and RSM.
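
For illustration only, here is a minimal sketch of the periodic check described in the reply to 100.1. It is not the KIP's implementation: `RemoteSegment`, `InMemoryRemoteStore` and `cleanup_below_log_start_offset` are hypothetical names, a single in-memory object stands in for both RLMM (metadata) and RSM (data), and the rule that a segment is deleted only when it lies wholly below logStartOffset is an assumption, as is the data-before-metadata deletion order.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class RemoteSegment:
    """Hypothetical stand-in for a remote log segment's metadata."""
    segment_id: str
    start_offset: int
    end_offset: int  # inclusive


@dataclass
class InMemoryRemoteStore:
    """Toy stand-in for both RLMM (metadata) and RSM (segment data)."""
    metadata: dict = field(default_factory=dict)  # segment_id -> RemoteSegment
    data: dict = field(default_factory=dict)      # segment_id -> bytes

    def put(self, segment, payload):
        self.data[segment.segment_id] = payload
        self.metadata[segment.segment_id] = segment

    def list_segments(self):
        # Return segments ordered by their start offset.
        return sorted(self.metadata.values(), key=lambda s: s.start_offset)

    def delete(self, segment):
        # Assumed order: remove the data first, then the metadata entry.
        self.data.pop(segment.segment_id, None)
        self.metadata.pop(segment.segment_id, None)


def cleanup_below_log_start_offset(store, log_start_offset):
    """One pass of the periodic check; returns the ids of deleted segments."""
    deleted = []
    for segment in store.list_segments():
        # Assumption: delete only segments that end before logStartOffset.
        if segment.end_offset < log_start_offset:
            store.delete(segment)
            deleted.append(segment.segment_id)
    return deleted
```

In this sketch, a leader-side task would call `cleanup_below_log_start_offset(store, log_start_offset)` on a timer after deleteRecords has advanced logStartOffset; a segment that straddles logStartOffset is left in place until the offset moves past its end.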

>100.2 stopReplica with deletion is used in 2 cases (a) replica reassignment; (b) topic deletion. We only want to delete the tiered metadata in the second case. Also, in the second case, who