Hi all,

A.1 As commented on PR 7561, the S3 consistency model [1][2] implies that the
RSM cannot rely solely on S3 APIs to guarantee the expected strong
consistency. The proposed implementation [3] would need to be updated to take
this into account. Let’s talk more about this.

A.2 Contract for the RSM API (an API call is loosely defined as an
“operation” here):

A.2.1 The KIP mentions “*If the process of a topic-partition is failed due
to remote storage error, its scheduled processing time is set to ( now() +
rlm_retry_interval_ms ). rlm_retry_interval_ms can be configured in broker
config file.*”. Do you still plan to implement such retries?
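
To make the question concrete, here is a minimal sketch in Python of the
rescheduling rule quoted above. Only rlm_retry_interval_ms comes from the KIP;
the function names and the heap-based queue are assumptions made for
illustration and are not the RLM's actual scheduler.

```python
import heapq


def schedule_retry(queue, partition, now_ms, rlm_retry_interval_ms):
    """Re-enqueue a failed topic-partition at now + retry interval.

    `queue` is a plain list used as a min-heap of (run_at_ms, partition).
    """
    next_run_ms = now_ms + rlm_retry_interval_ms
    heapq.heappush(queue, (next_run_ms, partition))
    return next_run_ms


def due_partitions(queue, now_ms):
    """Pop every partition whose scheduled processing time has been reached."""
    due = []
    while queue and queue[0][0] <= now_ms:
        due.append(heapq.heappop(queue)[1])
    return due
```

With a retry interval of 500 ms, a partition that fails at t=1000 ms is not
picked up again before t=1500 ms.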

A.2.2 Idempotency – Are the operations exposed by the RSM idempotent? What is
the risk of an operation being retried with the same input (irrespective of
the state of the remote storage)? If a successful operation is retried with
the same input, should an error be propagated? How would this error be
distinguished from I/O or other types of failure?
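
One possible answer, sketched in Python under stated assumptions: a retried
copy with identical content is reported as a no-op rather than an error, and
only a genuine conflict raises. The class names, the in-memory store and the
boolean return value are all hypothetical; none of this is the proposed RSM
API.

```python
class RemoteStorageError(Exception):
    """Hypothetical error type for failures surfaced by an RSM."""


class InMemoryRemoteStore:
    """Stand-in for a remote store, used only to illustrate idempotency."""

    def __init__(self):
        self._objects = {}

    def copy_log_segment(self, key, data):
        """Return True if the segment was written, False if an identical
        copy was already present (i.e. a retry of a successful call)."""
        existing = self._objects.get(key)
        if existing is None:
            self._objects[key] = data
            return True
        if existing == data:
            return False  # same input retried: not an error
        raise RemoteStorageError(
            f"conflicting content already stored under {key!r}"
        )
```

With such a contract the caller can tell "already done" apart from a real
failure without inspecting an I/O exception.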

A.2.3 Atomicity – What does an implementation of RSM need to provide with
respect to atomicity of the APIs copyLogSegment, cleanupLogUntil and
deleteTopicPartition? If a partial failure happens in any of those (e.g. in
the S3 implementation, if one of the multiple uploads fails [4]), what
guarantees are to be provided to the RLM on the state of the remote
storage, and what if it is left in an inconsistent state? If the operation
is meant to be retried from the RLM, does this mean the RSM is expected to
recover from partial failures? What if an unrecoverable failure affects the
RSM? In the RLMTask an exception is logged [5], but it seems the task
continues to be scheduled. Is there a mode where a topic partition stops
being transferred to the remote storage?

A.2.4 Consistency – already discussed.

A.2.5 Failure modes – Currently the RSM propagates failures as IOExceptions.
Wouldn’t we need a slightly different contract for the RSM? As opposed to
the I/O errors which Kafka handles in its innermost layers when accessing
the file system, should the implementations of RSM deal with low-level
errors and retries (wherever they can), and not expose them via the API?
Since the RLM is agnostic of the implementation behind the RSM, it is
virtually impossible for it to know how to deal with an I/O type of
exception without prior assumptions about the RSM implementation in use.
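
As an illustration of what such a contract could look like, here is a small
Python sketch in which the RSM side retries transient errors itself and only
surfaces a single "give up" error to the caller. The two exception types and
the helper are assumptions for discussion, not existing Kafka classes.

```python
class RetriableRemoteStorageError(Exception):
    """Hypothetical: transient failure the RSM may retry internally."""


class FatalRemoteStorageError(Exception):
    """Hypothetical: failure the RSM could not recover from."""


def call_with_retries(operation, max_attempts=3):
    """Run `operation`, retrying transient errors up to `max_attempts` times.

    Only FatalRemoteStorageError ever reaches the caller, so the caller
    needs no knowledge of the storage backend's low-level error codes.
    """
    last_error = None
    for _ in range(max_attempts):
        try:
            return operation()
        except RetriableRemoteStorageError as error:
            last_error = error
    raise FatalRemoteStorageError(
        f"gave up after {max_attempts} attempts"
    ) from last_error
```

The RLM would then only have to decide what to do on a fatal error, e.g.
stop shipping the partition or reschedule it.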

A.3 Caching – Storing the segments retrieved from the remote storage locally
is excluded, as it does not align with the original intent and even defeats
some of its purposes (saving disk space etc.). That said, could there be
other types of use cases where the pattern of access to the remotely stored
segments would benefit from local caching (and potentially read-ahead)?
Consider the use case of a large pool of consumers which start a backfill at
the same time for one day’s worth of data from one year ago stored remotely.
Caching the segments locally would decouple the load on the remote storage
from the load on the Kafka cluster. Maybe the RLM could expose a
configuration parameter to switch that feature on/off?
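
A minimal sketch of the idea in Python, assuming a bounded LRU cache keyed by
segment id and a boolean switch. The class, its parameters and the
fetch_remote callback are hypothetical and only meant to show how the
backfill case would hit the remote storage once per segment instead of once
per consumer.

```python
from collections import OrderedDict


class SegmentCache:
    """Bounded LRU cache in front of a remote fetch function."""

    def __init__(self, fetch_remote, max_segments, enabled=True):
        self._fetch_remote = fetch_remote  # callable: segment_id -> bytes
        self._max_segments = max_segments
        self._enabled = enabled            # the on/off configuration switch
        self._segments = OrderedDict()

    def read(self, segment_id):
        if not self._enabled:
            return self._fetch_remote(segment_id)
        if segment_id in self._segments:
            # Cache hit: mark the segment as most recently used.
            self._segments.move_to_end(segment_id)
            return self._segments[segment_id]
        data = self._fetch_remote(segment_id)
        self._segments[segment_id] = data
        if len(self._segments) > self._max_segments:
            # Evict the least recently used segment.
            self._segments.popitem(last=False)
        return data
```

With the cache enabled, 100 consumers reading the same segment trigger a
single remote fetch; with it disabled, every read goes to the remote storage.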

[1]
https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel
[2]
https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#Warning_.231:_S3_Consistency_model
[3] https://github.com/harshach/kafka/pull/18
[4]
https://github.com/harshach/kafka/pull/18/files#diff-39e2143514ed06d5d066708309263424R124
[5]
https://github.com/apache/kafka/pull/7561/files#diff-a597bd0c7d627789e73d1fa38eb1abfaR278

On Thu, Dec 5, 2019 at 15:01, Satish Duggana <satish.dugg...@gmail.com>
wrote:

> Hi Jun,
> Thanks for your reply.
>
> Currently, `listRemoteSegments` is called at the configured interval
> (not every second; it defaults to 30 seconds). Storing remote log
> metadata in a strongly consistent store for the S3 RSM was raised in a
> PR comment [1].
> The RLM invokes the RSM at regular intervals, and the RSM can return
> remote segment metadata if it is available. The RSM is responsible for
> maintaining and fetching those entries. It should be based on whatever
> mechanism is consistent and efficient for the respective remote storage.
>
> Can you give more details about the push-based mechanism from the RSM?
>
> 1. https://github.com/apache/kafka/pull/7561#discussion_r344576223
>
> Thanks,
> Satish.
>
> On Thu, Dec 5, 2019 at 4:23 AM Jun Rao <j...@confluent.io> wrote:
> >
> > Hi, Harsha,
> >
> > Thanks for the reply.
> >
> > 40/41. I am curious which block storages you have tested. S3 seems to be
> > one of the popular block stores. The concerns that I have with the
> > pull-based approach are the following.
> > (a) Cost: S3 list object requests cost $0.005 per 1000 requests. If you
> > have 100,000 partitions and want to pull the metadata for each partition
> > at the rate of 1/sec, it can cost $0.5/sec, which is roughly $40K per day.
> > (b) Semantics: S3 list objects are eventually consistent. So, when you do
> > a list object request, there is no guarantee that you can see all uploaded
> > objects. This could impact the correctness of subsequent logic.
> > (c) Efficiency: Blindly pulling metadata when there is no change adds
> > unnecessary overhead in the broker as well as in the block store.
> >
> > So, have you guys tested S3? If so, could you share your experience in
> > terms of cost, semantics and efficiency?
> >
> > Jun
> >
> >
> > On Tue, Dec 3, 2019 at 10:11 PM Harsha Chintalapani <ka...@harsha.io>
> > wrote:
> >
> > > Hi Jun,
> > >       Thanks for the reply.
> > >
> > >
> > >
> > > On Tue, Nov 26, 2019 at 3:46 PM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Hi, Satish and Ying,
> > > >
> > > > Thanks for the reply.
> > > >
> > > > 40/41. There are two different ways that we can approach this. One is
> > > > what you said. We can have an opinionated way of storing and
> > > > populating the tier metadata that we think is good enough for
> > > > everyone. I am not sure if this is the case based on what's currently
> > > > proposed in the KIP. For example, I am not sure that (1) everyone
> > > > always needs local metadata; (2) the current local storage format is
> > > > general enough and (3) everyone wants to use the pull based approach
> > > > to propagate the metadata. Another approach is to make this pluggable
> > > > and let the implementor implement the best approach for a particular
> > > > block storage. I haven't seen any comments from Slack/AirBnb in the
> > > > mailing list on this topic. It would be great if they can provide
> > > > feedback directly here.
> > > >
> > >
> > > The current interfaces are designed with the most popular block storages
> > > available today in mind. We did 2 implementations with these interfaces,
> > > and both are yielding good results as we go through testing them.
> > > If there is ever a need for a pull based approach, we can definitely
> > > evolve the interface.
> > > In the past we did mark interfaces as evolving to make room for unknowns
> > > in the future.
> > > If you have any suggestions around the current interfaces, please
> > > propose them; we are happy to see if we can work them in.
> > >
> > >
> > > > 43. To offer tier storage as a general feature, ideally all existing
> > > > capabilities should still be supported. It's fine if the uber
> > > > implementation doesn't support all capabilities for internal usage.
> > > > However, the framework should be general enough.
> > > >
> > >
> > > We agree on that as a principle. But most of these major features are
> > > arriving only now, and having a new big feature such as tiered storage
> > > support all of them is a big ask. We can document how we will approach
> > > solving these in future iterations.
> > > Our goal is to make this tiered storage feature work for everyone.
> > >
> > > > 43.3 This is more than just serving the tiered data from block
> > > > storage. With KIP-392, the consumer can now resolve conflicts with
> > > > the replica based on leader epoch. So, we need to make sure that the
> > > > leader epoch can be recovered properly from tier storage.
> > > >
> > >
> > > We are working on testing our approach and we will update the KIP with
> > > design details.
> > >
> > > > 43.4 For JBOD, if tier storage stores the tier metadata locally, we
> > > > need to support moving such metadata across disk directories since
> > > > JBOD supports moving data across disks.
> > > >
> > >
> > > The KIP is updated with JBOD details. Having said that, JBOD tooling
> > > needs to evolve to support production loads. Most users will be
> > > interested in using tiered storage without JBOD support on day 1.
> > >
> > > Thanks,
> > > Harsha
> > >
> > > > As for meeting, we could have a KIP e-meeting on this if needed, but it
> > > > will be open to everyone and will be recorded and shared. Often, the
> > > > details are still resolved through the mailing list.
> > > >
> > > > Jun
> > > >
> > > > On Tue, Nov 19, 2019 at 6:48 PM Ying Zheng <yi...@uber.com.invalid>
> > > > wrote:
> > > >
> > > >
> > > > Please ignore my previous email
> > > > I didn't know Apache requires all the discussions to be "open"
> > > >
> > > >
> > > > On Tue, Nov 19, 2019, 5:40 PM Ying Zheng <yi...@uber.com> wrote:
> > > >
> > > > Hi Jun,
> > > >
> > > > Thank you very much for your feedback!
> > > >
> > > > Can we schedule a meeting in your Palo Alto office in December? I
> > > > think a face to face discussion is much more efficient than emails.
> > > > Both Harsha and I can visit you. Satish may be able to join us
> > > > remotely.
> > > >
> > > > On Fri, Nov 15, 2019 at 11:04 AM Jun Rao <j...@confluent.io> wrote:
> > > >
> > > > Hi, Satish and Harsha,
> > > >
> > > > The following is more detailed high-level feedback on the KIP.
> > > > Overall, the KIP seems useful. The challenge is how to design it such
> > > > that it’s general enough to support different ways of implementing
> > > > this feature and support existing features.
> > > >
> > > > 40. Local segment metadata storage: The KIP makes the assumption that
> > > > the metadata for the archived log segments is cached locally in every
> > > > broker and provides a specific implementation for the local storage in
> > > > the framework. We probably should discuss this more. For example, some
> > > > tier storage providers may not want to cache the metadata locally and
> > > > just rely upon a remote key/value store if such a store is already
> > > > present. If a local store is used, there could be different ways of
> > > > implementing it (e.g., based on customized local files, an embedded
> > > > local store like RocksDB, etc). An alternative design is to just
> > > > provide an interface for retrieving the tier segment metadata and
> > > > leave the details of how to get the metadata outside of the framework.
> > > >
> > > >
> > > > 41. RemoteStorageManager interface and the usage of the interface in
> > > > the framework: I am not sure if the interface is general enough. For
> > > > example, it seems that RemoteLogIndexEntry is tied to a specific way
> > > > of storing the metadata in remote storage. The framework uses the
> > > > listRemoteSegments() API in a pull based approach. However, in some
> > > > other implementations, a push based approach may be preferred. I
> > > > don’t have a concrete proposal yet. But it would be useful to give
> > > > this area some more thought and see if we can make the interface more
> > > > general.
> > > >
> > > > 42. In the diagram, the RemoteLogManager is side by side with
> > > > LogManager. This KIP only discusses how the fetch request is handled
> > > > between the two layers. However, we should also consider how other
> > > > requests that touch the log can be handled, e.g., list offsets by
> > > > timestamp, delete records, etc. Also, in this model, it's not clear
> > > > which component is responsible for managing the log start offset. It
> > > > seems that the log start offset could be changed by both
> > > > RemoteLogManager and LogManager.
> > > >
> > > >
> > > > 43. There are quite a few existing features not covered by the KIP.
> > > > It would be useful to discuss each of those.
> > > > 43.1 I wouldn’t say that compacted topics are rarely used and always
> > > > small. For example, KStreams uses compacted topics for storing the
> > > > states and sometimes the size of the topic could be large. While it
> > > > might be ok to not support compacted topics initially, it would be
> > > > useful to have a high level idea on how this might be supported down
> > > > the road so that we don’t have to make incompatible API changes in
> > > > the future.
> > > > 43.2 We need to discuss how EOS is supported. In particular, how is
> > > > the producer state integrated with the remote storage?
> > > > 43.3 Now that KIP-392 (allow consumers to fetch from closest replica)
> > > > is implemented, we need to discuss how reading from a follower
> > > > replica is supported with tier storage.
> > > > 43.4 We need to discuss how JBOD is supported with tier storage.
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Fri, Nov 8, 2019 at 12:06 AM Tom Bentley <tbent...@redhat.com>
> > > > wrote:
> > > >
> > > > Thanks for those insights Ying.
> > > >
> > > > On Thu, Nov 7, 2019 at 9:26 PM Ying Zheng <yi...@uber.com.invalid>
> > > > wrote:
> > > >
> > > > Thanks, I missed that point. However, there's still a point at which
> > > > the consumer fetches start getting served from remote storage (even
> > > > if that point isn't as soon as the local log retention time/size).
> > > > This represents a kind of performance cliff edge and what I'm really
> > > > interested in is how easy it is for a consumer which falls off that
> > > > cliff to catch up and so its fetches again come from local storage.
> > > > Obviously this can depend on all sorts of factors (like production
> > > > rate, consumption rate), so it's not guaranteed (just like it's not
> > > > guaranteed for Kafka today), but this would represent a new failure
> > > > mode.
> > > >
> > > >
> > > > As I have explained in the last mail, it's a very rare case that a
> > > > consumer needs to read remote data. In our experience at Uber, this
> > > > only happens when the consumer service had an outage for several
> > > > hours.
> > > >
> > > > There is not a "performance cliff" as you assume. The remote storage
> > > > is even faster than local disks in terms of bandwidth. Reading from
> > > > remote storage is going to have higher latency than local disk. But
> > > > since the consumer is catching up on several hours of data, it's not
> > > > sensitive to sub-second level latency, and each remote read request
> > > > will read a large amount of data to make the overall performance
> > > > better than reading from local disks.
> > > >
> > > > Another aspect I'd like to understand better is the effect that
> > > > serving fetch requests from remote storage has on the broker's
> > > > network utilization. If we're just trimming the amount of data held
> > > > locally (without increasing the overall local+remote retention),
> > > > then we're effectively trading disk bandwidth for network bandwidth
> > > > when serving fetch requests from remote storage (which I understand
> > > > to be a good thing, since brokers are often/usually disk bound). But
> > > > if we're increasing the overall local+remote retention then it's more
> > > > likely that network itself becomes the bottleneck.
> > > >
> > > > I appreciate this is all rather hand wavy, I'm just trying to
> > > > understand how this would affect broker performance, so I'd be
> > > > grateful for any insights you can offer.
> > > >
> > > > Network bandwidth is a function of produce speed; it has nothing to
> > > > do with remote retention. As long as the data is shipped to remote
> > > > storage, you can keep the data there for 1 day or 1 year or 100
> > > > years; it doesn't consume any network resources.
> > > >
> > > >
> > > >
> > >
>
