one more example of complicated config - mirror maker. we definitely can't trust each and every topic owner to configure their topics not to purge before they've been mirrored. which would mean there's a per-topic config (set by the owner) and a "global" config (where mirror makers are specified) and they need to be "merged" - for those topics that _are_ mirrored. which is a changing set of topics that's stored in an external system outside of kafka. if a topic is taken out of the mirror set the MM offset would be "frozen" at that point and prevent clean-up for all eternity, unless it's cleaned up itself.
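a minimal sketch of the merge described above - the per-topic owner config combined with the externally managed mirror set, including the "frozen offset" hazard. all names here are made up for illustration; none of this is an actual Kafka API:

```java
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: merging a per-topic owner config with the global
// mirror-maker config when computing how far a topic may safely be purged.
// All names are illustrative assumptions, not Kafka code.
public class PurgeBoundSketch {

    /**
     * @param topic             the topic being considered for clean-up
     * @param ownerSafeOffset   offset the topic owner considers safe to purge up to
     * @param mirroredTopics    the (externally managed, changing) set of mirrored topics
     * @param mirrorMakerOffset last offset mirrored by MM per topic, if any
     */
    public static long safePurgeOffset(String topic,
                                       long ownerSafeOffset,
                                       Set<String> mirroredTopics,
                                       Map<String, Long> mirrorMakerOffset) {
        if (!mirroredTopics.contains(topic)) {
            // topic no longer in the mirror set: a stale ("frozen") MM offset
            // must be ignored here, or it would block clean-up forever
            return ownerSafeOffset;
        }
        long mmOffset = mirrorMakerOffset.getOrDefault(topic, 0L);
        // never purge past what the mirror maker has already replicated
        return Math.min(ownerSafeOffset, mmOffset);
    }
}
```

the point of the sketch is only that the merge logic lives outside any single topic owner's config, which is exactly the kind of "brains" that is hard to bake into the broker.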
... complexity :-)

On Wed, Jan 4, 2017 at 8:04 AM, radai <radai.rosenbl...@gmail.com> wrote:

> in summary - i'm not opposed to the idea of a per-topic clean up config
> that tracks some set of consumer groups' offsets (which would probably work
> for 80% of use cases), but i definitely see a need to expose a simple API
> for the more advanced/obscure/custom use cases (the other 20%).
>
> On Wed, Jan 4, 2017 at 7:54 AM, radai <radai.rosenbl...@gmail.com> wrote:
>
>> a major motivation for this KIP is cost savings.
>>
>> lots of internal systems at LI use kafka as an intermediate pipe, and set
>> the topic retention period to a "safe enough" amount of time to be able to
>> recover from crashes/downtime and catch up to "now". this results in a few
>> days' worth of retention typically.
>>
>> however, under normal operating conditions the consumers are mostly
>> caught-up and so early clean-up enables a big cost savings in storage.
>>
>> as for my points:
>>
>> 1. when discussing implementation options for automatic clean-up we
>> realized that cleaning up by keeping track of offsets stored in kafka
>> requires some per-topic config - you need to specify which groups to track.
>> this becomes a problem because:
>> 1.1 - relatively complicated code, to be written in the broker.
>> 1.2 - configuration needs to be maintained up to date by topic
>> "owners" - of which we have thousands. failure to do so would decrease the
>> cost benefit.
>> 1.3 - some applications have a "reconsume" / "reinit" / "bootstrap"
>> workflow where they will reset their offsets to an earlier value than the
>> one stored. this means that a stored offset of X does not always mean you
>> can clean up to X-1. think of it as video encoding - some apps have "key
>> frames" they may seek back to which are before their current offset.
>> 1.4 - there are multiple possible strategies - you could clean up
>> aggressively, retain some "time distance" from latest, some "offset
>> distance", etc.
>> this we think would have made it very hard to agree on a
>> single "correct" implementation that everyone would be happy with. it would
>> be better to include the raw functionality in the API and leave the
>> "brains" to an external monitoring system where people could custom-tailor
>> their logic.
>>
>> 2. ad-hoc consumer groups: it's common practice for devs to spin up
>> console consumers and connect to a topic as a debug aid. SREs may also do
>> this. there are also various other eco-system applications that may
>> consume from topics (unknown to topic owners as those are infra monitoring
>> tools). obviously such consumer groups' offsets should be ignored for
>> purposes of clean-up, but coming up with a bullet-proof way to do this is
>> non-trivial and again ties into the implementation complexity and inflexibility
>> of a "one size fits all" solution in 1.4 above.
>>
>> 3. forceful clean-up: we have workflows that use kafka to move gigantic
>> blobs from offline hadoop processing flows into online systems. the data being
>> "loaded" into such an online system can be several GBs in size and take a
>> long time to consume (they are sliced into many small msgs). sometimes the
>> sender wants to abort and start a new blob before the current load process
>> has completed - meaning the consumer's offsets are not yet caught up.
>>
>> 4. offsets outside of kafka: yes, you could force applications to store
>> their offsets twice, but that's inefficient. it's better to expose a raw,
>> simple API and let such applications manage their own clean-up logic (this
>> again ties into 1.4 and no "one size fits all" solution).
>>
>> On Tue, Jan 3, 2017 at 11:49 PM, Dong Lin <lindon...@gmail.com> wrote:
>>
>>> On Tue, Jan 3, 2017 at 11:01 PM, Ewen Cheslack-Postava <e...@confluent.io>
>>> wrote:
>>>
>>> > On Tue, Jan 3, 2017 at 6:14 PM, Dong Lin <lindon...@gmail.com> wrote:
>>> >
>>> > > Hey Ewen,
>>> > >
>>> > > Thanks for the review.
>>> > > As Radai explained, it would be complex in terms of
>>> > > user configuration if we were to use committed offset to decide data
>>> > > deletion. We need a way to specify which groups need to consume data of
>>> > > this partition. The broker will also need to consume the entire offsets
>>> > > topic in that approach which has some overhead. I don't think it is that
>>> > > hard to implement. But it will likely take more time to discuss that
>>> > > approach due to the new config and the server side overhead.
>>> > >
>>> > > We choose to put this API in AdminClient because the API is more like an
>>> > > administrative operation (such as listGroups, deleteTopics) than a
>>> > > consumer operation. It is not necessarily called by consumers only. For
>>> > > example, we can implement the "delete data before committed offset"
>>> > > approach by running an external service which calls purgeDataBefore()
>>> > > API based on committed offsets of consumer groups.
>>> > >
>>> > > I am not aware that AdminClient is not a public API. Suppose it is not
>>> > > public now, I assume we plan to make it public in the future as part of
>>> > > KIP-4. Are we not making it public because its interface is not stable? If
>>> > > so, can we just tag this new API as not stable in the code?
>>> > >
>>> >
>>> > The AdminClient planned for KIP-4 is a new Java-based implementation. It's
>>> > definitely confusing that both will be (could be?) named AdminClient, but
>>> > we've kept the existing Scala AdminClient out of the public API and have
>>> > not required KIPs for changes to it.
>>> >
>>> > That said, I agree that if this type of API makes it into Kafka, having a
>>> > (new, Java-based) AdminClient method would definitely be a good idea. An
>>> > alternative path might be to have a Consumer-based implementation since
>>> > that seems like a very intuitive, natural way to use the protocol.
>>> > I think
>>> > optimizing for the expected use case would be a good idea.
>>> >
>>> > -Ewen
>>> >
>>>
>>> Are you saying that the Scala AdminClient is not a public API and we
>>> discourage addition of any new feature to this class?
>>>
>>> I still prefer to add it to AdminClient (Java version in the future and
>>> Scala version in the short term) because I feel it belongs to admin
>>> operation instead of the KafkaConsumer interface. For example, if in the
>>> future we implement the "delete data before committed offset" strategy in
>>> an external service, I feel it is a bit awkward if the service has to
>>> instantiate a KafkaConsumer and call KafkaConsumer.purgeDataBefore(...) to
>>> purge data. In other words, our expected use-case doesn't necessarily bind
>>> this API to the consumer.
>>>
>>> I am not strong on this issue. Let's see what other committers/developers
>>> think about this.
>>>
>>> >
>>> > >
>>> > > Thanks,
>>> > > Dong
>>> > >
>>> > > On Tue, Jan 3, 2017 at 3:56 PM, Ewen Cheslack-Postava <e...@confluent.io>
>>> > > wrote:
>>> > >
>>> > > > Dong,
>>> > > >
>>> > > > Looks like that's an internal link,
>>> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-107%3A+Add+purgeDataBefore%28%29+API+in+AdminClient
>>> > > > is the right one.
>>> > > >
>>> > > > I have a question about one of the rejected alternatives:
>>> > > >
>>> > > > > Using committed offset instead of an extra API to trigger data purge
>>> > > > > operation.
>>> > > >
>>> > > > The KIP says this would be more complicated to implement. Why is that? I
>>> > > > think brokers would have to consume the entire offsets topic, but the
>>> > > > data stored in memory doesn't seem to change, and applying this when
>>> > > > updated offsets are seen seems basically the same.
>>> > > > It might also be possible to
>>> > > > make it work even with multiple consumer groups if that was desired
>>> > > > (although that'd require tracking more data in memory) as a
>>> > > > generalization, without requiring coordination between the consumer
>>> > > > groups. Given the motivation, I'm assuming this was considered
>>> > > > unnecessary since this specifically targets intermediate stream
>>> > > > processing topics.
>>> > > >
>>> > > > Another question is why expose this via AdminClient (which isn't public
>>> > > > API afaik)? Why not, for example, expose it on the Consumer, which is
>>> > > > presumably where you'd want access to it since the functionality
>>> > > > depends on the consumer actually having consumed the data?
>>> > > >
>>> > > > -Ewen
>>> > > >
>>> > > > On Tue, Jan 3, 2017 at 2:45 PM, Dong Lin <lindon...@gmail.com> wrote:
>>> > > >
>>> > > > > Hi all,
>>> > > > >
>>> > > > > We created KIP-107 to propose addition of purgeDataBefore() API in
>>> > > > > AdminClient.
>>> > > > >
>>> > > > > Please find the KIP wiki in the link
>>> > > > > https://iwww.corp.linkedin.com/wiki/cf/display/ENGS/Kafka+purgeDataBefore%28%29+API+design+proposal.
>>> > > > > We would love to hear your comments and suggestions.
>>> > > > >
>>> > > > > Thanks,
>>> > > > > Dong
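For context, the external "brains" radai argues for (points 1.4 and 2 in his mail above) might select purge offsets roughly like this. This is only a sketch under stated assumptions: the class and method names are invented, the "ignore console consumers by name prefix" rule is deliberately naive (radai's point is precisely that a bullet-proof rule is hard), and a real service would fetch committed offsets from the broker before issuing a purgeDataBefore()-style call:

```java
import java.util.Map;

// Rough sketch of the offset-selection logic an external clean-up service
// might run before calling something like the proposed purgeDataBefore().
// All names, the console-consumer filter, and the offset-distance margin
// are illustrative assumptions, not Kafka APIs.
public class CleanupPolicySketch {

    /**
     * Pick the highest offset that is safe to purge up to for one partition:
     * the minimum committed offset across tracked groups, minus a safety
     * margin (an "offset distance" strategy), ignoring ad-hoc debug groups.
     */
    public static long safePurgeOffset(Map<String, Long> committedByGroup,
                                       long offsetDistanceMargin) {
        long min = Long.MAX_VALUE;
        for (Map.Entry<String, Long> e : committedByGroup.entrySet()) {
            // naive rule for skipping ad-hoc console consumers (point 2 above);
            // a production service would need something far more robust
            if (e.getKey().startsWith("console-consumer-"))
                continue;
            min = Math.min(min, e.getValue());
        }
        if (min == Long.MAX_VALUE)
            return 0L; // no tracked groups: purge nothing
        return Math.max(0L, min - offsetDistanceMargin);
    }
}
```

Keeping this policy in an external service, with only the raw purge primitive in the broker, is what lets each team swap in an aggressive, time-distance, or offset-distance strategy without a one-size-fits-all broker config.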