Hello,

Thanks for taking the time to review my KIP!
I will describe some production scenarios I have faced, to better explain the reasons for this KIP.

* Use case 1: batch processing of files. A batch job produces huge files that must be processed. Each line of a file becomes a message produced to a topic, so the lag on that topic goes from 0 to, let's say, 5 million within a few seconds. I adjust the retention time of the topic based on the processing rate of its consumer. For example, 5 million messages at 100 TPS need about 14 hours of retention (5,000,000 / 100 = 50,000 seconds, about 13.9 hours). In practice we set a larger retention time, just in case. If a second file arrives before the first one has been processed, and the processing rate is slower than I expected, I will lose the end of the first file without any notice.

* Use case 2: an application facing network errors. The application consumes messages from an input topic, processes them and pushes them to an external system (e.g. a web service). If there are connectivity problems between my Kafka consumer and the external web service, the lag of the application grows. Since I have alerting rules on records-lag-max, I will know that the backlog of the topic is above a limit. I will then act as in the previous example and adjust the retention time of the topic based on the processing rate. If the processing rate is not constant, because of the network connectivity problems, the retention time may not be enough and I will lose messages.

In both cases, I don't know whether I have lost messages. I suspect so, but I cannot give an accurate number of lost messages, or guarantee that I have not lost any of them.

I could solve both use cases by setting oversized retention times on the topics, but in practice I am limited by hardware resources.

One of the reasons I opened this KIP is that I think the implementation should be doable: the broker has all the information needed (the expired offset and the last consumed offset).
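To make the last point concrete, here is a minimal sketch of the arithmetic, assuming we know the group's committed offset plus the partition's log start offset and high watermark. The class and method names are hypothetical (this is not a Kafka API), and the sketch assumes Kafka's convention that a committed offset is the offset of the next message the group will read. If retention has moved the log start offset past the committed offset, every offset in between was deleted unread (and with auto.offset.reset=earliest the consumer simply resets to the log start offset and carries on). A lag computed as high watermark minus position still counts those deleted messages, which is why a high lag alone does not tell me whether anything was lost. The retention estimate from use case 1 is included for completeness.

```java
// Minimal sketch, not the Kafka API: class and method names are hypothetical.
// Offsets follow Kafka's convention: a committed offset is the offset of the
// next message the group will read.
final class MissedMessages {

    private MissedMessages() {}

    /** Messages deleted by retention before the group read them. */
    static long missed(long logStartOffset, long committedOffset) {
        // Offsets in [committedOffset, logStartOffset) no longer exist in the log.
        return Math.max(0L, logStartOffset - committedOffset);
    }

    /** Lag as high watermark minus position; it still counts deleted messages. */
    static long lag(long highWatermark, long position) {
        return highWatermark - position;
    }

    /** Part of the lag that can still be read from the log. */
    static long readableLag(long highWatermark, long position, long logStartOffset) {
        return highWatermark - Math.max(position, logStartOffset);
    }

    /** Retention needed to drain a backlog at a constant processing rate. */
    static double requiredRetentionHours(long backlog, double messagesPerSecond) {
        return backlog / messagesPerSecond / 3600.0;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: 5 million messages produced, the group has
        // committed up to offset 3,000,000, and retention has already deleted
        // everything below offset 3,200,000.
        long highWatermark = 5_000_000L;
        long committed = 3_000_000L;
        long logStart = 3_200_000L;

        System.out.println("lag:          " + lag(highWatermark, committed));
        System.out.println("missed:       " + missed(logStart, committed));
        System.out.println("readable lag: " + readableLag(highWatermark, committed, logStart));
        System.out.printf("retention for 5M messages at 100 TPS: %.1f hours%n",
                requiredRetentionHours(5_000_000L, 100.0));
    }
}
```

With these numbers the lag is 2,000,000, but 200,000 of those messages are already gone and only 1,800,000 can still be read; the 5-million-message backlog at 100 TPS needs about 13.9 hours of retention.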
However, I have questions about the impact on performance, which is why I hesitate between proposing this new metric by default for all consumers or only for the consumers that request it through configuration.

As you said, the motivation is to know when, and which, consumer has missed messages because they have been deleted, and how many. I think it is possible to return the exact number of messages missed due to the retention policy.

I think the metric should go on the broker side, in case the consumer is not even instantiated, or is crashing in a loop.

Please let me know what you think.

Thanks,
Jose M

On Sat, Jul 27, 2019 at 4:22 PM Stanislav Kozlovski <stanis...@confluent.io> wrote:
>
> Hey Jose,
>
> Thanks for the KIP.
>
> I think that Colin was referring to an existing client metric called
> "kafka.consumer:type=consumer-fetch-manager-metrics,client-id="{client-id}",name=records-lag-max",
> exposed on the consumer application.
> You should be able to use that to get a sense of how far behind your consumer is.
>
> That may not solve the motivation for the KIP, though, which as far as I understand is to have a way to know when a (and which) consumer has missed messages because they have been deleted.
> I also assume that this is happening because consumers are not long-running but rather started every once in a while. Let me know if that is correct.
> The consumer code does
> ```
> return topicPartitionState.highWatermark == null ? null :
>     topicPartitionState.highWatermark - topicPartitionState.position.offset;
> ```
> to calculate the partition lag. I believe `topicPartitionState.position.offset` is the latest offset consumed by the consumer group.
> In the case of a start-up of a consumer group that has lagged behind and lost messages due to retention, the very first reading of that metric may be very high and perhaps misleading, iff `TopicPartitionState#logStartOffset > TopicPartitionState#position`.
>
> It would be good to expand on the motivation on when/why a consumer would miss messages due to retention taking effect.
>
> In any case, I think this might be better suited as a client metric. The consumer should have all the necessary information, as it has the start offset of the log and the group's latest offset.
>
> Thanks,
> Stanislav
>
> On Wed, Jul 24, 2019 at 6:00 PM Jose M <yoz...@gmail.com> wrote:
>
> > Hello Kamal,
> >
> > Compacted topics are excluded from the KIP, because users of compacted topics are mainly interested in the last state for a given key, and can afford to miss intermediate states. Technically, it is possible to know whether a topic is compacted through the "log.config.compact" attribute. Thanks a lot for your feedback!
> >
> > I've updated the KIP to specify that:
> >
> > - compacted topics are excluded from the KIP.
> > - instead of logging on the broker, I propose to create a new metric, following Colin's comment (thanks a lot!)
> >
> > Thanks,
> >
> > Jose
> >
> > On Tue, Jul 23, 2019 at 11:45 AM Kamal Chandraprakash <kamal.chandraprak...@gmail.com> wrote:
> >
> > > Jose,
> > >
> > > How do you differentiate the compaction topics from the time retention topics? Deleting a message due to compaction policy is a valid case and users won't be interested in monitoring/reading those deleted messages.
> > >
> > > Thanks,
> > > Kamal
> > >
> > > On Tue, Jul 23, 2019 at 4:00 AM Jose M <yoz...@gmail.com> wrote:
> > >
> > > > Hi Colin,
> > > >
> > > > Thanks a lot for your feedback.
> > > > Please note that I only propose to log when a message is lost for a set of consumer groups, not as the default behaviour for all consumer groups.
> > > > But in fact, I agree with you that logging a line per expired message can be quite a lot, and that it is not the best way to do it. I can propose adding a dedicated JMX metric of type counter, "expired messages", per consumer group.
> > > > What do you think?
> > > >
> > > > About monitoring the lag to ensure that messages are not lost: I know that is what clients do, setting up alerting when the lag is above a threshold.
> > > > But even if the alert is triggered, we don't know whether messages have been lost or not. With this KIP implemented, clients would know whether something has been missed.
> > > >
> > > > Thanks,
> > > >
> > > > Jose
> > > >
> > > > On Mon, Jul 22, 2019 at 5:51 PM Colin McCabe <cmcc...@apache.org> wrote:
> > > >
> > > > > Hi Jose,
> > > > >
> > > > > One issue that I see here is that the number of log messages could be huge. I've seen people create tens of thousands of consumer groups. People can also have settings that create pretty small log files. A message per log file per group could be quite a lot of messages.
> > > > >
> > > > > A log message on the broker is also not that useful for detecting bad client behavior. People generally only look at the server logs after they become aware that something is wrong through some other means.
> > > > >
> > > > > Perhaps the clients should just monitor their lag? There is a JMX metric for this, which means it can be hooked into traditional metrics / reporting systems.
> > > > >
> > > > > best,
> > > > > Colin
> > > > >
> > > > > On Mon, Jul 22, 2019, at 03:12, Jose M wrote:
> > > > > > Hello,
> > > > > >
> > > > > > I didn't get any feedback on this small KIP-490
> > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-490%3A+log+when+consumer+groups+lose+a+message+because+offset+has+been+deleted>.
> > > > > > In summary, I propose a way to be notified when messages are removed due to the retention policy without having been consumed by a given consumer group.
> > > > > > It will be useful to realize that some important messages have been lost.
> > > > > >
> > > > > > As I'm new to the codebase, I have technical questions about how to achieve this, but before going deeper, I would like your feedback on the feature.
> > > > > >
> > > > > > Thanks a lot,
> > > > > >
> > > > > > Jose Morales
> > > > > >
> > > > > > On Sun, Jul 14, 2019 at 12:51 AM Jose M <yoz...@gmail.com> wrote:
> > > > > >
> > > > > > > Hello,
> > > > > > >
> > > > > > > I would like to know what you think of KIP-490:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-490%3A+log+when+consumer+groups+lose+a+message+because+offset+has+been+deleted
> > > > > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-490%3A+log+when+consumer+groups+lose+a+message+because+offset+has+expired>
> > > > > > >
> > > > > > > Thanks a lot!
> > > > > > > --
> > > > > > > Jose M
> > > > > > >
> > > > > > --
> > > > > > J
> > > > > >
> > > > --
> > > > J
> > > >
> > --
> > J
> >
--
J