Hi Jun,
Thanks for your comments on the separation of remote log metadata
storage and remote log storage.
We had a few discussions since early Jan on how to support eventually
consistent stores like S3 by uncoupling remote log segment metadata
and remote log storage. It is written with details in the doc here(1).
Below is the brief summary of the discussion from that doc.

The current approach consists of pulling the remote log segment
metadata from remote log storage APIs. It worked fine for storages
like HDFS. But one of the problems of relying on the remote storage to
maintain metadata is that tiered-storage needs to be strongly
consistent, with an impact not only on the metadata(e.g. LIST in S3)
but also on the segment data(e.g. GET after a DELETE in S3). The cost
of maintaining metadata in remote storage needs to be factored in.
This is true in the case of S3, LIST APIs incur huge costs as you
raised earlier.
So, it is good to separate the remote storage from the remote log
metadata store. We refactored the existing RemoteStorageManager and
introduced RemoteLogMetadataManager. Remote log metadata store should
give strong consistency semantics but remote log storage can be
eventually consistent.
We can have a default implementation for RemoteLogMetadataManager
which uses an internal topic(as mentioned in one of our earlier
emails) as storage. But users can always plugin their own
RemoteLogMetadataManager implementation based on their environment.

Please go through the updated KIP and let us know your comments. We
have started refactoring for the changes mentioned in the KIP and
there may be a few more updates to the APIs.

[1] 
https://docs.google.com/document/d/1qfkBCWL1e7ZWkHU7brxKDBebq4ie9yK20XJnKbgAlew/edit?ts=5e208ec7#

On Fri, Dec 27, 2019 at 5:43 PM Ivan Yurchenko <ivan0yurche...@gmail.com> wrote:
>
> Hi all,
>
>
> Jun:
> > (a) Cost: S3 list object requests cost $0.005 per 1000 requests. If you
> > have 100,000 partitions and want to pull the metadata for each partition
> at
> > the rate of 1/sec. It can cost $0.5/sec, which is roughly $40K per day.
>
> I want to note here, that no reasonably durable storage will be cheap
> at 100k RPS. For example, DynamoDB might give the same ballpark figures.
> If we want to keep the pull-based approach, we can try to reduce this number
> in several ways: doing listings less frequently (as Satish mentioned,
> with the current defaults it's ~3.33k RPS for your example),
> batching listing operations in some way (depending on the storage;
> it might require the change of RSM's interface).
>
>
> > There are different ways for doing push based metadata propagation. Some
> > object stores may support that already. For example, S3 supports events
> > notification
> This sounds interesting. However, I see a couple of issues using it:
>   1. As I understand the documentation, notification delivery is not
> guaranteed
>     and it's recommended to periodically do LIST to fill the gaps.
>     Which brings us back to the same LIST consistency guarantees issue.
>   2. The same goes for the broker start: to get the current state, we need
> to LIST.
>   3. The dynamic set of multiple consumers (RSMs): AFAIK SQS and SNS aren't
>     designed for such a case.
>
>
> Alexandre:
> > A.1 As commented on PR 7561, S3 consistency model [1][2] implies RSM
> cannot
> > relies solely on S3 APIs to guarantee the expected strong consistency. The
> > proposed implementation [3] would need to be updated to take this into
> > account. Let’s talk more about this.
>
> Thank you for the feedback. I clearly see the need for changing the S3
> implementation
> to provide stronger consistency guarantees. As it see from this thread,
> there are
> several possible approaches to this. Let's discuss RemoteLogManager's
> contract and
> behavior (like pull vs push model) further before picking one (or several -
> ?) of them.
> I'm going to do some evaluation of DynamoDB for the pull-based approach,
> if it's possible to apply it paying a reasonable bill. Also, of the
> push-based approach
> with a Kafka topic as the medium.
>
>
> > A.2.3 Atomicity – what does an implementation of RSM need to provide with
> > respect to atomicity of the APIs copyLogSegment, cleanupLogUntil and
> > deleteTopicPartition? If a partial failure happens in any of those (e.g.
> in
> > the S3 implementation, if one of the multiple uploads fails [4]),
>
> The S3 implementation is going to change, but it's worth clarifying anyway.
> The segment log file is being uploaded after S3 has acked uploading of
> all other files associated with the segment and only after this the whole
> segment file set becomes visible remotely for operations like
> listRemoteSegments [1].
> In case of upload failure, the files that has been successfully uploaded
> stays
> as invisible garbage that is collected by cleanupLogUntil (or overwritten
> successfully later).
> And the opposite happens during the deletion: log files are deleted first.
> This approach should generally work when we solve consistency issues
> by adding a strongly consistent storage: a segment's uploaded files remain
> invisible garbage until some metadata about them is written.
>
>
> > A.3 Caching – storing locally the segments retrieved from the remote
> > storage is excluded as it does not align with the original intent and even
> > defeat some of its purposes (save disk space etc.). That said, could there
> > be other types of use cases where the pattern of access to the remotely
> > stored segments would benefit from local caching (and potentially
> > read-ahead)? Consider the use case of a large pool of consumers which
> start
> > a backfill at the same time for one day worth of data from one year ago
> > stored remotely. Caching the segments locally would allow to uncouple the
> > load on the remote storage from the load on the Kafka cluster. Maybe the
> > RLM could expose a configuration parameter to switch that feature on/off?
>
> I tend to agree here, caching remote segments locally and making
> this configurable sounds pretty practical to me. We should implement this,
> maybe not in the first iteration.
>
>
> Br,
> Ivan
>
> [1]
> https://github.com/harshach/kafka/pull/18/files#diff-4d73d01c16caed6f2548fc3063550ef0R152
>
> On Thu, 19 Dec 2019 at 19:49, Alexandre Dupriez <alexandre.dupr...@gmail.com>
> wrote:
>
> > Hi Jun,
> >
> > Thank you for the feedback. I am trying to understand how a push-based
> > approach would work.
> > In order for the metadata to be propagated (under the assumption you
> > stated), would you plan to add a new API in Kafka to allow the
> > metadata store to send them directly to the brokers?
> >
> > Thanks,
> > Alexandre
> >
> >
> > Le mer. 18 déc. 2019 à 20:14, Jun Rao <j...@confluent.io> a écrit :
> > >
> > > Hi, Satish and Ivan,
> > >
> > > There are different ways for doing push based metadata propagation. Some
> > > object stores may support that already. For example, S3 supports events
> > > notification (
> > > https://docs.aws.amazon.com/AmazonS3/latest/dev/NotificationHowTo.html).
> > > Otherwise one could use a separate metadata store that supports
> > push-based
> > > change propagation. Other people have mentioned using a Kafka topic. The
> > > best approach may depend on the object store and the operational
> > > environment (e.g. whether an external metadata store is already
> > available).
> > >
> > > The above discussion is based on the assumption that we need to cache the
> > > object metadata locally in every broker. I mentioned earlier that an
> > > alternative is to just store/retrieve those metadata in an external
> > > metadata store. That may simplify the implementation in some cases.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Dec 5, 2019 at 7:01 AM Satish Duggana <satish.dugg...@gmail.com>
> > > wrote:
> > >
> > > > Hi Jun,
> > > > Thanks for your reply.
> > > >
> > > > Currently, `listRemoteSegments` is called at the configured
> > > > interval(not every second, defaults to 30secs). Storing remote log
> > > > metadata in a strongly consistent store for S3 RSM is raised in
> > > > PR-comment[1].
> > > > RLM invokes RSM at regular intervals and RSM can give remote segment
> > > > metadata if it is available. RSM is responsible for maintaining and
> > > > fetching those entries. It should be based on whatever mechanism is
> > > > consistent and efficient with the respective remote storage.
> > > >
> > > > Can you give more details about push based mechanism from RSM?
> > > >
> > > > 1. https://github.com/apache/kafka/pull/7561#discussion_r344576223
> > > >
> > > > Thanks,
> > > > Satish.
> > > >
> > > > On Thu, Dec 5, 2019 at 4:23 AM Jun Rao <j...@confluent.io> wrote:
> > > > >
> > > > > Hi, Harsha,
> > > > >
> > > > > Thanks for the reply.
> > > > >
> > > > > 40/41. I am curious which block storages you have tested. S3 seems
> > to be
> > > > > one of the popular block stores. The concerns that I have with pull
> > based
> > > > > approach are the following.
> > > > > (a) Cost: S3 list object requests cost $0.005 per 1000 requests. If
> > you
> > > > > have 100,000 partitions and want to pull the metadata for each
> > partition
> > > > at
> > > > > the rate of 1/sec. It can cost $0.5/sec, which is roughly $40K per
> > day.
> > > > > (b) Semantics: S3 list objects are eventually consistent. So, when
> > you
> > > > do a
> > > > > list object request, there is no guarantee that you can see all
> > uploaded
> > > > > objects. This could impact the correctness of subsequent logics.
> > > > > (c) Efficiency: Blindly pulling metadata when there is no change adds
> > > > > unnecessary overhead in the broker as well as in the block store.
> > > > >
> > > > > So, have you guys tested S3? If so, could you share your experience
> > in
> > > > > terms of cost, semantics and efficiency?
> > > > >
> > > > > Jun
> > > > >
> > > > >
> > > > > On Tue, Dec 3, 2019 at 10:11 PM Harsha Chintalapani <ka...@harsha.io
> > >
> > > > wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >       Thanks for the reply.
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Tue, Nov 26, 2019 at 3:46 PM, Jun Rao <j...@confluent.io> wrote:
> > > > > >
> > > > > > > Hi, Satish and Ying,
> > > > > > >
> > > > > > > Thanks for the reply.
> > > > > > >
> > > > > > > 40/41. There are two different ways that we can approach this.
> > One is
> > > > > > what
> > > > > > > you said. We can have an opinionated way of storing and
> > populating
> > > > the
> > > > > > tier
> > > > > > > metadata that we think is good enough for everyone. I am not
> > sure if
> > > > this
> > > > > > > is the case based on what's currently proposed in the KIP. For
> > > > example, I
> > > > > > > am not sure that (1) everyone always needs local metadata; (2)
> > the
> > > > > > current
> > > > > > > local storage format is general enough and (3) everyone wants to
> > use
> > > > the
> > > > > > > pull based approach to propagate the metadata. Another approach
> > is to
> > > > > > make
> > > > > > > this pluggable and let the implementor implements the best
> > approach
> > > > for a
> > > > > > > particular block storage. I haven't seen any comments from
> > > > Slack/AirBnb
> > > > > > in
> > > > > > > the mailing list on this topic. It would be great if they can
> > provide
> > > > > > > feedback directly here.
> > > > > > >
> > > > > >
> > > > > > The current interfaces are designed with most popular block
> > storages
> > > > > > available today  and we did 2 implementations with these
> > interfaces and
> > > > > > they both are yielding good results as we going through the
> > testing of
> > > > it.
> > > > > > If there is ever a need for pull based approach  we can definitely
> > > > evolve
> > > > > > the interface.
> > > > > > In the past we did mark interfaces to be evolving to make room for
> > > > unknowns
> > > > > > in the future.
> > > > > > If you have any suggestions around the current interfaces please
> > > > propose we
> > > > > > are happy to see if we can work them into it.
> > > > > >
> > > > > >
> > > > > > 43. To offer tier storage as a general feature, ideally all
> > existing
> > > > > > > capabilities should still be supported. It's fine if the uber
> > > > > > > implementation doesn't support all capabilities for internal
> > usage.
> > > > > > > However, the framework should be general enough.
> > > > > > >
> > > > > >
> > > > > > We agree on that as a principle. But all of these major features
> > mostly
> > > > > > coming right now and to have a new big feature such as tiered
> > storage
> > > > to
> > > > > > support all the new features will be a big ask. We can document on
> > how
> > > > do
> > > > > > we approach solving these in future iterations.
> > > > > > Our goal is to make this tiered storage feature work for everyone.
> > > > > >
> > > > > > 43.3 This is more than just serving the tier-ed data from block
> > > > storage.
> > > > > > > With KIP-392, the consumer now can resolve the conflicts with the
> > > > replica
> > > > > > > based on leader epoch. So, we need to make sure that leader epoch
> > > > can be
> > > > > > > recovered properly from tier storage.
> > > > > > >
> > > > > >
> > > > > > We are working on testing our approach and we will update the KIP
> > with
> > > > > > design details.
> > > > > >
> > > > > > 43.4 For JBOD, if tier storage stores the tier metadata locally, we
> > > > need to
> > > > > > > support moving such metadata across disk directories since JBOD
> > > > supports
> > > > > > > moving data across disks.
> > > > > > >
> > > > > >
> > > > > > KIP is updated with JBOD details. Having said that JBOD tooling
> > needs
> > > > to
> > > > > > evolve to support production loads. Most of the users will be
> > > > interested in
> > > > > > using tiered storage without JBOD support support on day 1.
> > > > > >
> > > > > > Thanks,
> > > > > > Harsha
> > > > > >
> > > > > > As for meeting, we could have a KIP e-meeting on this if needed,
> > but it
> > > > > > > will be open to everyone and will be recorded and shared. Often,
> > the
> > > > > > > details are still resolved through the mailing list.
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Tue, Nov 19, 2019 at 6:48 PM Ying Zheng
> > <yi...@uber.com.invalid>
> > > > > > > wrote:
> > > > > > >
> > > > > > >
> > > > > > > Please ignore my previous email
> > > > > > > I didn't know Apache requires all the discussions to be "open"
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Nov 19, 2019, 5:40 PM Ying Zheng <yi...@uber.com> wrote:
> > > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > Thank you very much for your feedback!
> > > > > > >
> > > > > > > Can we schedule a meeting in your Palo Alto office in December? I
> > > > think a
> > > > > > > face to face discussion is much more efficient than emails. Both
> > > > Harsha
> > > > > > >
> > > > > > > and
> > > > > > >
> > > > > > > I can visit you. Satish may be able to join us remotely.
> > > > > > >
> > > > > > > On Fri, Nov 15, 2019 at 11:04 AM Jun Rao <j...@confluent.io>
> > wrote:
> > > > > > >
> > > > > > > Hi, Satish and Harsha,
> > > > > > >
> > > > > > > The following is a more detailed high level feedback for the KIP.
> > > > > > >
> > > > > > > Overall,
> > > > > > >
> > > > > > > the KIP seems useful. The challenge is how to design it such that
> > > > it’s
> > > > > > > general enough to support different ways of implementing this
> > feature
> > > > > > >
> > > > > > > and
> > > > > > >
> > > > > > > support existing features.
> > > > > > >
> > > > > > > 40. Local segment metadata storage: The KIP makes the assumption
> > that
> > > > > > >
> > > > > > > the
> > > > > > >
> > > > > > > metadata for the archived log segments are cached locally in
> > every
> > > > > > >
> > > > > > > broker
> > > > > > >
> > > > > > > and provides a specific implementation for the local storage in
> > the
> > > > > > > framework. We probably should discuss this more. For example,
> > some
> > > > tier
> > > > > > > storage providers may not want to cache the metadata locally and
> > just
> > > > > > >
> > > > > > > rely
> > > > > > >
> > > > > > >
> > > > > > > upon a remote key/value store if such a store is already
> > present. If
> > > > a
> > > > > > > local store is used, there could be different ways of
> > implementing it
> > > > > > > (e.g., based on customized local files, an embedded local store
> > like
> > > > > > > RocksDB, etc). An alternative of designing this is to just
> > provide an
> > > > > > > interface for retrieving the tier segment metadata and leave the
> > > > details
> > > > > > of
> > > > > > > how to get the metadata outside of the framework.
> > > > > > >
> > > > > > >
> > > > > > > 41. RemoteStorageManager interface and the usage of the
> > interface in
> > > > the
> > > > > > > framework: I am not sure if the interface is general enough. For
> > > > > > >
> > > > > > > example,
> > > > > > >
> > > > > > > it seems that RemoteLogIndexEntry is tied to a specific way of
> > > > storing
> > > > > > >
> > > > > > > the
> > > > > > >
> > > > > > > metadata in remote storage. The framework uses
> > listRemoteSegments()
> > > > api
> > > > > > >
> > > > > > > in
> > > > > > >
> > > > > > >
> > > > > > > a pull based approach. However, in some other implementations, a
> > push
> > > > > > > based
> > > > > > > approach may be more preferred. I don’t have a concrete proposal
> > yet.
> > > > > > >
> > > > > > >
> > > > > > > But,
> > > > > > >
> > > > > > > it would be useful to give this area some more thoughts and see
> > if we
> > > > > > >
> > > > > > > can
> > > > > > >
> > > > > > > make the interface more general.
> > > > > > >
> > > > > > > 42. In the diagram, the RemoteLogManager is side by side with
> > > > > > >
> > > > > > > LogManager.
> > > > > > >
> > > > > > > This KIP only discussed how the fetch request is handled between
> > the
> > > > two
> > > > > > > layer. However, we should also consider how other requests that
> > touch
> > > > > > >
> > > > > > > the
> > > > > > >
> > > > > > > log can be handled. e.g., list offsets by timestamp, delete
> > records,
> > > > > > >
> > > > > > > etc.
> > > > > > >
> > > > > > > Also, in this model, it's not clear which component is
> > responsible
> > > > for
> > > > > > > managing the log start offset. It seems that the log start offset
> > > > could
> > > > > > >
> > > > > > > be
> > > > > > >
> > > > > > > changed by both RemoteLogManager and LogManager.
> > > > > > >
> > > > > > >
> > > > > > > 43. There are quite a few existing features not covered by the
> > KIP.
> > > > It
> > > > > > > would be useful to discuss each of those.
> > > > > > > 43.1 I won’t say that compacted topics are rarely used and always
> > > > small.
> > > > > > > For example, KStreams uses compacted topics for storing the
> > states
> > > > and
> > > > > > > sometimes the size of the topic could be large. While it might
> > be ok
> > > > to
> > > > > > not
> > > > > > > support compacted topics initially, it would be useful to have a
> > high
> > > > > > > level
> > > > > > > idea on how this might be supported down the road so that we
> > don’t
> > > > have
> > > > > > >
> > > > > > >
> > > > > > > to
> > > > > > >
> > > > > > >
> > > > > > > make incompatible API changes in the future.
> > > > > > > 43.2 We need to discuss how EOS is supported. In particular, how
> > is
> > > > the
> > > > > > > producer state integrated with the remote storage. 43.3 Now that
> > > > KIP-392
> > > > > > > (allow consumers to fetch from closest replica) is implemented,
> > we
> > > > need
> > > > > > to
> > > > > > > discuss how reading from a follower replica is supported with
> > tier
> > > > > > storage.
> > > > > > > 43.4 We need to discuss how JBOD is supported with tier storage.
> > > > > > >
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Fri, Nov 8, 2019 at 12:06 AM Tom Bentley <tbent...@redhat.com
> > >
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > Thanks for those insights Ying.
> > > > > > >
> > > > > > > On Thu, Nov 7, 2019 at 9:26 PM Ying Zheng <yi...@uber.com.invalid
> > >
> > > > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > Thanks, I missed that point. However, there's still a point at
> > > > > > >
> > > > > > > which
> > > > > > >
> > > > > > > the
> > > > > > >
> > > > > > > consumer fetches start getting served from remote storage (even
> > if
> > > > > > >
> > > > > > > that
> > > > > > >
> > > > > > > point isn't as soon as the local log retention time/size). This
> > > > > > >
> > > > > > > represents
> > > > > > >
> > > > > > > a kind of performance cliff edge and what I'm really interested
> > in
> > > > > > >
> > > > > > > is
> > > > > > >
> > > > > > > how
> > > > > > >
> > > > > > > easy it is for a consumer which falls off that cliff to catch up
> > > > > > >
> > > > > > > and so
> > > > > > >
> > > > > > > its
> > > > > > >
> > > > > > > fetches again come from local storage. Obviously this can depend
> > > > > > >
> > > > > > > on
> > > > > > >
> > > > > > > all
> > > > > > >
> > > > > > > sorts of factors (like production rate, consumption rate), so
> > it's
> > > > > > >
> > > > > > > not
> > > > > > >
> > > > > > > guaranteed (just like it's not guaranteed for Kafka today), but
> > > > > > >
> > > > > > > this
> > > > > > >
> > > > > > > would
> > > > > > >
> > > > > > > represent a new failure mode.
> > > > > > >
> > > > > > >
> > > > > > > As I have explained in the last mail, it's a very rare case that
> > a
> > > > > > > consumer
> > > > > > > need to read remote data. With our experience at Uber, this only
> > > > > > >
> > > > > > >
> > > > > > > happens
> > > > > > >
> > > > > > > when the consumer service had an outage for several hours.
> > > > > > >
> > > > > > > There is not a "performance cliff" as you assume. The remote
> > storage
> > > > > > >
> > > > > > > is
> > > > > > >
> > > > > > > even faster than local disks in terms of bandwidth. Reading from
> > > > > > >
> > > > > > > remote
> > > > > > >
> > > > > > > storage is going to have higher latency than local disk. But
> > since
> > > > > > >
> > > > > > > the
> > > > > > >
> > > > > > >
> > > > > > > consumer
> > > > > > > is catching up several hours data, it's not sensitive to the
> > > > > > >
> > > > > > >
> > > > > > > sub-second
> > > > > > >
> > > > > > >
> > > > > > > level
> > > > > > > latency, and each remote read request will read a large amount of
> > > > > > >
> > > > > > >
> > > > > > > data to
> > > > > > >
> > > > > > > make the overall performance better than reading from local
> > disks.
> > > > > > >
> > > > > > > Another aspect I'd like to understand better is the effect of
> > > > > > >
> > > > > > > serving
> > > > > > >
> > > > > > > fetch
> > > > > > >
> > > > > > > request from remote storage has on the broker's network
> > > > > > >
> > > > > > > utilization. If
> > > > > > >
> > > > > > > we're just trimming the amount of data held locally (without
> > > > > > >
> > > > > > > increasing
> > > > > > >
> > > > > > > the
> > > > > > >
> > > > > > > overall local+remote retention), then we're effectively trading
> > > > > > >
> > > > > > > disk
> > > > > > >
> > > > > > > bandwidth for network bandwidth when serving fetch requests from
> > > > > > >
> > > > > > > remote
> > > > > > >
> > > > > > > storage (which I understand to be a good thing, since brokers are
> > > > > > > often/usually disk bound). But if we're increasing the overall
> > > > > > >
> > > > > > > local+remote
> > > > > > >
> > > > > > > retention then it's more likely that network itself becomes the
> > > > > > >
> > > > > > > bottleneck.
> > > > > > >
> > > > > > > I appreciate this is all rather hand wavy, I'm just trying to
> > > > > > >
> > > > > > > understand
> > > > > > >
> > > > > > > how this would affect broker performance, so I'd be grateful for
> > > > > > >
> > > > > > > any
> > > > > > >
> > > > > > > insights you can offer.
> > > > > > >
> > > > > > > Network bandwidth is a function of produce speed, it has nothing
> > to
> > > > > > >
> > > > > > > do
> > > > > > >
> > > > > > > with
> > > > > > >
> > > > > > > remote retention. As long as the data is shipped to remote
> > storage,
> > > > > > >
> > > > > > > you
> > > > > > >
> > > > > > > can
> > > > > > >
> > > > > > > keep the data there for 1 day or 1 year or 100 years, it doesn't
> > > > > > >
> > > > > > > consume
> > > > > > >
> > > > > > >
> > > > > > > any
> > > > > > > network resources.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > >
> > > >
> >

Reply via email to