Hi All,

Thanks for your initial feedback. We updated the KIP. Please take a look and let us know if you have any questions.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage
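Since a few of the questions in the thread below are about the configuration surface, here is a rough sketch of how per-topic enablement could look if "remote.log.storage.enable" ends up being exposed at the topic level, as suggested below. The topic name, partition counts and the use of the admin client at creation time are illustrative only; none of this is finalized in the KIP.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Map;
import java.util.Properties;

// Illustrative sketch only: opting a single topic into tiered storage while
// other topics stay on local storage. "remote.log.storage.enable" is the config
// name discussed in this thread; exposing it per topic is a suggestion, not a
// decision the KIP has made.
public class EnableTieredStorageForTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic clickstream = new NewTopic("clickstream", 12, (short) 3)
                    .configs(Map.of("remote.log.storage.enable", "true")); // per-topic opt-in (proposed)
            admin.createTopics(Collections.singletonList(clickstream)).all().get();
        }
    }
}

The same property could presumably also be set on an existing topic through the usual topic config path; how it interacts with a cluster-level switch is one of the open questions below.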
Thanks,
Harsha

On Wed, Feb 6, 2019, at 10:30 AM, Harsha wrote:
> Thanks Eno, Adam & Satish for your review and questions. I'll address these in the KIP and update the thread here.
>
> Thanks,
> Harsha
>
> On Wed, Feb 6, 2019, at 7:09 AM, Satish Duggana wrote:
> > Thanks, Harsha for the KIP. It is a good start for tiered storage in Kafka. I have a few comments/questions.
> >
> > It may be good to have a configuration to keep a number of local segments instead of keeping only the active segment. This config can be exposed at the cluster and topic levels with a default value of 1. In some use cases a few consumers may lag by more than one segment, and it would be better to serve them from local storage instead of remote storage.
> >
> > It may be better to keep "remote.log.storage.enable" and the respective configuration at the topic level along with the cluster level. That will be helpful in environments where a few topics are configured with local storage and other topics are configured with remote storage.
> >
> > Each topic-partition leader pushes its log segments with the respective index files to remote storage whenever the active log rolls over, and it updates the remote log index file for the respective remote log segment. A second option is to also keep offset index files for each segment, so that consumer fetch requests for old segments can be served from a local log segment instead of directly from the remote log, which may cause high latencies. There can be different strategies for when a remote segment is copied to a local segment.
> >
> > What is the "remote.log.manager.scheduler.interval.ms" config about?
> >
> > How do followers sync RemoteLogSegmentIndex files? Do they request them from the leader replica? This looks important, as a failed-over leader should have the RemoteLogSegmentIndex updated and ready to avoid high latencies in serving old data stored in remote logs.
> >
> > Thanks,
> > Satish.
> >
> > On Tue, Feb 5, 2019 at 10:42 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
> > >
> > > Thanks Harsha, makes sense.
> > >
> > > Ryanne
> > >
> > > On Mon, Feb 4, 2019 at 5:53 PM Harsha Chintalapani <ka...@harsha.io> wrote:
> > > >
> > > > "I think you are saying that this enables additional (potentially cheaper) storage options without *requiring* an existing ETL pipeline."
> > > > Yes.
> > > >
> > > > "But it's not really a replacement for the sort of pipelines people build with Connect, Gobblin etc."
> > > >
> > > > It is not. But assuming that everyone runs these pipelines to store raw Kafka data in HDFS or S3 is also a wrong assumption. The aim of this KIP is to provide tiered storage as a whole package, not to ask users to ship the data on their own using existing ETL, which means running a consumer and maintaining those pipelines.
> > > >
> > > > "My point was that, if you are already offloading records in an ETL pipeline, why do you need a new pipeline built into the broker to ship the same data to the same place?"
> > > >
> > > > As you said, it's an ETL pipeline, which means users of these pipelines are reading the data from the broker, transforming its state and storing it somewhere. The point of this KIP is to store log segments as they are, without changing their structure, so that we can use the existing offset mechanisms to look them up when the consumer needs to read old data.
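To make the "copy segments as they are" point above concrete: a rolled-over segment is just a small set of files on local disk, named by the segment's base offset, and the proposal is to ship exactly those files unchanged. A minimal sketch (the file suffixes follow Kafka's on-disk layout; the class and method here are illustrative, not part of the KIP):

import java.nio.file.Path;
import java.util.List;

// Illustrative sketch: the local files that make up one rolled-over segment.
// Kafka names segment files by the 20-digit, zero-padded base offset, e.g.
// 00000000000000120500.log. Copying these files unchanged is what lets the
// existing offset/time index lookups keep working for data read back from
// remote storage.
public final class SegmentFiles {
    static List<Path> forSegment(Path partitionDir, long baseOffset) {
        String name = String.format("%020d", baseOffset);
        return List.of(
                partitionDir.resolve(name + ".log"),       // record batches
                partitionDir.resolve(name + ".index"),     // offset index
                partitionDir.resolve(name + ".timeindex")  // time index
        );
    }
}

Because the offset and time indexes travel with the log file, a fetch for an old offset can be resolved the same way it is today, just against a copy of these files.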
> > > > When you load it via your existing pipelines you are reading the topic as a whole, which doesn't guarantee that you'll produce this data back into HDFS or S3 in the same order, and who is going to generate the index files again?
> > > >
> > > > "So you'd end up with one of 1) cold segments are only useful to Kafka; 2) you have the same data written to HDFS/etc twice, once for Kafka and once for everything else, in two separate formats"
> > > >
> > > > You are talking about two different use cases. If someone is storing raw data out of Kafka for long-term access, storing the data as it is in HDFS through Kafka will solve this issue. They do not need to run another pipeline to ship these logs.
> > > >
> > > > If they are running pipelines to store data in HDFS in a different format, that's a different use case. Maybe they are transforming Kafka logs to ORC so that they can query through Hive. Once you transform the log segment, it loses the ability to use the existing offset index. The main objective here is not to change the existing protocol and still be able to write and read logs from remote storage.
> > > >
> > > > -Harsha
> > > >
> > > > On Feb 4, 2019, 2:53 PM -0800, Ryanne Dolan <ryannedo...@gmail.com>, wrote:
> > > > > Thanks Harsha, makes sense for the most part.
> > > > >
> > > > > > tiered storage is to get away from this and make this transparent to the user
> > > > >
> > > > > I think you are saying that this enables additional (potentially cheaper) storage options without *requiring* an existing ETL pipeline. But it's not really a replacement for the sort of pipelines people build with Connect, Gobblin etc. My point was that, if you are already offloading records in an ETL pipeline, why do you need a new pipeline built into the broker to ship the same data to the same place? I think in most cases this will be an additional pipeline, not a replacement, because the segments written to cold storage won't be useful outside Kafka. So you'd end up with one of 1) cold segments are only useful to Kafka; 2) you have the same data written to HDFS/etc twice, once for Kafka and once for everything else, in two separate formats; 3) you use your existing ETL pipeline and read cold data directly.
> > > > >
> > > > > To me, an ideal solution would let me spool segments from Kafka to any sink I would like, and then let Kafka clients seamlessly access that cold data. Today I can do that in the client, but ideally the broker would do it for me via some HDFS/Hive/S3 plugin. The KIP seems to accomplish that -- just without leveraging anything I've currently got in place.
> > > > >
> > > > > Ryanne
> > > > >
> > > > > On Mon, Feb 4, 2019 at 3:34 PM Harsha <ka...@harsha.io> wrote:
> > > > > >
> > > > > > Hi Eric,
> > > > > > Thanks for your questions. Answers are in-line.
> > > > > >
> > > > > > "The high-level design seems to indicate that all of the logic for when and how to copy log segments to remote storage lives in the RLM class.
> > > > > > The default implementation is then HDFS specific with additional implementations being left to the community. This seems like it would require anyone implementing a new RLM to also re-implement the logic for when to ship data to remote storage."
> > > > > >
> > > > > > RLM will be responsible for shipping log segments, and it will decide when a log segment is ready to be shipped over. Once log segments are identified as rolled over, RLM will delegate this responsibility to a pluggable remote storage implementation. Users who are looking to add their own implementation to enable other storage systems only need to implement the copy and read mechanisms, not re-implement RLM itself.
> > > > > >
> > > > > > "Would it not be better for the Remote Log Manager implementation to be non-configurable, and instead have an interface for the remote storage layer? That way the "when" of the logic is consistent across all implementations and it's only a matter of "how," similar to how the Streams StateStores are managed."
> > > > > >
> > > > > > It's possible that we can make RLM non-configurable. But for the initial release, and to keep backward compatibility, we want to make this configurable, so that any users who are not interested in having log segments shipped to remote storage don't need to worry about it.
> > > > > >
> > > > > > Hi Ryanne,
> > > > > > Thanks for your questions.
> > > > > >
> > > > > > "How could this be used to leverage fast key-value stores, e.g. Couchbase, which can serve individual records but maybe not entire segments? Or is the idea to only support writing and fetching entire segments? Would it make sense to support both?"
> > > > > >
> > > > > > Log segments, once rolled over, are immutable objects, and we want to keep the current structure of log segments and the corresponding index files. It will be easier to copy the whole segment as it is, instead of re-reading each file and using a key/value store.
> > > > > >
> > > > > > "- Instead of defining a new interface and/or mechanism to ETL segment files from brokers to cold storage, can we just leverage Kafka itself? In particular, we can already ETL records to HDFS via Kafka Connect, Gobblin etc -- we really just need a way for brokers to read these records back. I'm wondering whether the new API could be limited to the fetch, and then existing ETL pipelines could be more easily leveraged. For example, if you already have an ETL pipeline from Kafka to HDFS, you could leave that in place and just tell Kafka how to read these records/segments from cold storage when necessary."
> > > > > >
> > > > > > This is pretty much what everyone does, and it has the additional overhead of keeping these pipelines operating and monitoring them. What's proposed in the KIP is not ETL.
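To make the pluggable piece above concrete: the RLM keeps the "when to ship" logic, and a storage plugin only supplies the copy and read mechanics. A rough sketch of what such an interface could look like; the name and signatures below are illustrative and not taken from the KIP:

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.List;

// Illustrative sketch only. The KIP's actual interface may differ; the point is
// that implementers supply copy/read against their storage system and do not
// re-implement the RLM's decision about when a segment is ready to ship.
public interface RemoteSegmentStore {

    // Copy a rolled-over segment (log plus its index files) to remote storage.
    // Returns an opaque identifier the broker can later use to read it back.
    String copySegment(String topic, int partition, long baseOffset, List<Path> segmentFiles)
            throws IOException;

    // Read `length` bytes of segment data starting at `startPosition`, so the
    // broker can serve fetches for offsets that are no longer on local disk.
    InputStream readSegment(String remoteSegmentId, long startPosition, int length)
            throws IOException;

    // Remove remote data once retention has expired.
    void deleteSegment(String remoteSegmentId) throws IOException;
}

Returning an opaque identifier from copySegment is only one way to let the broker find the data again; how the remote log index and segment metadata are actually tracked is part of the ongoing discussion above.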
> > > > > > It's just looking at the logs that are written and rolled over and copying the files as they are.
> > > > > > With a traditional ETL pipeline, each new topic needs to be added (sure, we can do so via a wildcard or another mechanism), but new topics need to be onboarded to ship their data into remote storage. Once the data lands somewhere like HDFS/Hive etc., users need to write another processing pipeline to re-process this data, similar to how they are doing it in their stream processing pipelines. Tiered storage is meant to get away from this and make it transparent to the user. They don't need to run another ETL process to ship the logs.
> > > > > >
> > > > > > "I'm wondering if we could just add support for loading segments from remote URIs instead of from file, i.e. via plugins for s3://, hdfs:// etc. I suspect less broker logic would change in that case -- the broker wouldn't necessarily care if it reads from file:// or s3:// to load a given segment."
> > > > > >
> > > > > > Yes, this is what we are discussing in the KIP. We are leaving the details of loading segments to the RLM read path instead of directly exposing this in the broker. This way we can keep the current Kafka code as it is, without changing the assumptions around the local disk, and let the RLM handle the remote storage part.
> > > > > >
> > > > > > Thanks,
> > > > > > Harsha
> > > > > >
> > > > > > On Mon, Feb 4, 2019, at 12:54 PM, Ryanne Dolan wrote:
> > > > > > > Harsha, Sriharsha, Suresh, a couple thoughts:
> > > > > > >
> > > > > > > - How could this be used to leverage fast key-value stores, e.g. Couchbase, which can serve individual records but maybe not entire segments? Or is the idea to only support writing and fetching entire segments? Would it make sense to support both?
> > > > > > >
> > > > > > > - Instead of defining a new interface and/or mechanism to ETL segment files from brokers to cold storage, can we just leverage Kafka itself? In particular, we can already ETL records to HDFS via Kafka Connect, Gobblin etc -- we really just need a way for brokers to read these records back. I'm wondering whether the new API could be limited to the fetch, and then existing ETL pipelines could be more easily leveraged. For example, if you already have an ETL pipeline from Kafka to HDFS, you could leave that in place and just tell Kafka how to read these records/segments from cold storage when necessary.
> > > > > > >
> > > > > > > - I'm wondering if we could just add support for loading segments from remote URIs instead of from file, i.e. via plugins for s3://, hdfs:// etc.
> > > > > > > I suspect less broker logic would change in that case -- the broker wouldn't necessarily care if it reads from file:// or s3:// to load a given segment.
> > > > > > >
> > > > > > > Combining the previous two comments, I can imagine a URI resolution chain for segments. For example, first try file:///logs/{topic}/{segment}.log, then s3://mybucket/{topic}/{date}/{segment}.log, etc, leveraging your existing ETL pipeline(s).
> > > > > > >
> > > > > > > Ryanne
> > > > > > >
> > > > > > > On Mon, Feb 4, 2019 at 12:01 PM Harsha <ka...@harsha.io> wrote:
> > > > > > > >
> > > > > > > > Hi All,
> > > > > > > > We are interested in adding tiered storage to Kafka. More details about motivation and design are in the KIP. We are working towards an initial POC. Any feedback or questions on this KIP are welcome.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Harsha
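For what it's worth, the URI resolution chain Ryanne describes above could sit behind the same plugin boundary: try local storage first, then fall back through a configured list of remote URI patterns. A small sketch of that idea; the template format and the existence check are placeholders, not a proposed API:

import java.net.URI;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Illustrative sketch of the "URI resolution chain" idea from this thread:
// resolve a segment by trying file:// first, then remote schemes such as s3://.
// The URI templates and the exists() check are placeholders, not a real API.
public final class SegmentUriResolver {

    private final List<String> templates;   // e.g. "file:///logs/{topic}/{segment}.log"
    private final Predicate<URI> exists;    // pluggable existence check per scheme

    public SegmentUriResolver(List<String> templates, Predicate<URI> exists) {
        this.templates = templates;
        this.exists = exists;
    }

    public Optional<URI> resolve(String topic, String segment) {
        for (String template : templates) {
            URI candidate = URI.create(template
                    .replace("{topic}", topic)
                    .replace("{segment}", segment));
            if (exists.test(candidate)) {
                return Optional.of(candidate);   // first match wins
            }
        }
        return Optional.empty();                 // segment not found anywhere
    }
}

Configured with something like file:///logs/{topic}/{segment}.log followed by s3://mybucket/{topic}/{segment}.log, this would give the first-match-wins fall-through behaviour described above; in the KIP's current shape that fall-through would live inside the RLM read path rather than in the broker itself.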