[ https://issues.apache.org/jira/browse/KAFKA-260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jay Kreps resolved KAFKA-260.
-----------------------------
    Resolution: Won't Fix

> Add audit trail to kafka
> ------------------------
>
>                 Key: KAFKA-260
>                 URL: https://issues.apache.org/jira/browse/KAFKA-260
>             Project: Kafka
>          Issue Type: New Feature
>    Affects Versions: 0.8.0
>            Reporter: Jay Kreps
>            Assignee: Jay Kreps
>         Attachments: Picture 18.png, kafka-audit-trail-draft.patch
>
>
> LinkedIn has a system that does monitoring on top of our data flow to ensure all data is delivered to all consumers of data. This works by having each logical "tier" through which data passes produce messages to a central "audit-trail" topic; these messages give a time period and the number of messages that passed through that tier in that time period. Examples of tiers for data might be "producer", "broker", "hadoop-etl", etc. This makes it possible to compare the total events for a given time period to ensure that all events that are produced are consumed by all consumers.
> This turns out to be extremely useful. We also have an application that "balances the books" and checks that all data is consumed in a timely fashion. This gives graphs for each topic and shows any data loss and the lag at which the data is consumed (if any).
> This would be an optional feature that would allow you to do this kind of reconciliation automatically for all the topics kafka hosts, against all the tiers of applications that interact with the data.
> Some details: the proposed format of the data is JSON, using the following format for messages:
> {
>   "time": 1301727060032,            // the timestamp at which this audit message is sent
>   "topic": "my_topic_name",         // the topic this audit data is for
>   "tier": "producer",               // a user-defined "tier" name
>   "bucket_start": 1301726400000,    // the beginning of the time bucket this data applies to
>   "bucket_end": 1301727000000,      // the end of the time bucket this data applies to
>   "host": "my_host_name.datacenter.linkedin.com",  // the server that this was sent from
>   "datacenter": "hlx32",            // the datacenter this occurred in
>   "application": "newsfeed_service",  // a user-defined application name
>   "guid": "51656274-a86a-4dff-b824-8e8e20a6348f",  // a unique identifier for this message
>   "count": 43634
> }
> DISCUSSION
> Time is complex:
> 1. The audit data must be based on a timestamp in the events, not the time on the machine processing the event. Using this timestamp means that all downstream consumers will report audit data on the right time bucket. This means that there must be a timestamp in the event, which we don't currently require. Arguably we should just add a timestamp to the events, but I think it is sufficient for now just to allow the user to provide a function to extract the time from their events.
> 2. For counts to reconcile exactly we can only do analysis at a granularity based on the least common multiple of the bucket sizes used by all tiers. The simplest is just to configure them all to use the same bucket size. We currently use a bucket size of 10 mins, but anything from 1-60 mins is probably reasonable.
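>
> Purely as an illustration of the bucketing and the message format above (this sketch is not part of the attached patch), a tier could round each event timestamp down to its 10-minute bucket and periodically publish one audit record per (topic, bucket). The "kafka-audit" topic name and the use of the newer Java producer client are assumptions made here for the example:
>
>   import java.util.Properties;
>   import java.util.UUID;
>   import org.apache.kafka.clients.producer.KafkaProducer;
>   import org.apache.kafka.clients.producer.ProducerRecord;
>
>   public class AuditEmitter {
>       static final long BUCKET_MS = 10 * 60 * 1000L;  // 10-minute buckets, as discussed above
>
>       // Round an event's timestamp down to the start of its time bucket
>       static long bucketStart(long eventTimestampMs) {
>           return eventTimestampMs - (eventTimestampMs % BUCKET_MS);
>       }
>
>       public static void main(String[] args) {
>           long eventTs = 1301726400123L;      // taken from the event itself, not the local clock
>           long start = bucketStart(eventTs);  // 1301726400000
>           long end = start + BUCKET_MS;       // 1301727000000
>
>           // Build the audit message in the JSON format proposed above
>           String audit = String.format(
>               "{\"time\":%d,\"topic\":\"my_topic_name\",\"tier\":\"producer\","
>               + "\"bucket_start\":%d,\"bucket_end\":%d,\"host\":\"my_host\","
>               + "\"datacenter\":\"hlx32\",\"application\":\"newsfeed_service\","
>               + "\"guid\":\"%s\",\"count\":%d}",
>               System.currentTimeMillis(), start, end, UUID.randomUUID(), 43634);
>
>           Properties props = new Properties();
>           props.put("bootstrap.servers", "localhost:9092");
>           props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>           props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>           try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
>               // "kafka-audit" is a hypothetical name for the central audit-trail topic
>               producer.send(new ProducerRecord<>("kafka-audit", "my_topic_name", audit));
>           }
>       }
>   }
>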
> For analysis purposes one tier is designated as the source tier and we do reconciliation against this count (e.g. if another tier has less, that is treated as loss; if another tier has more, that is duplication); a sketch of this check appears at the end of this message.
> Note that this system makes false positives possible, since you can lose an audit message. It also makes false negatives possible, since if you lose both normal messages and the associated audit messages it will appear that everything adds up. The latter problem is astronomically unlikely to happen exactly, though.
> This would integrate into the client (producer and consumer both) in the following way:
> 1. The user provides a way to get timestamps from messages (required)
> 2. The user configures the tier name, host name, datacenter name, and application name as part of the consumer and producer config. We can provide reasonable defaults if not supplied (e.g. if it is a Producer then set the tier to "producer" and get the hostname from the OS).
> The application that processes this data is currently a Java Jetty app that talks to mysql. It feeds off the audit topic in kafka and runs both automatic monitoring checks and graphical displays of the data against it. The data layer is not terribly scalable, but because the audit data is sent only periodically this is enough to allow us to audit thousands of servers on very modest hardware, and having sql access makes diving into the data to trace problems to particular hosts easier.
> LOGISTICS
> I would recommend the following steps:
> 1. Add the audit application; the proposal would be to add a new top-level directory, equivalent to core or perf, called "audit" to house this application. At this point it would just be sitting there, not really being used.
> 2. Integrate these capabilities into the producer as part of the refactoring we are doing now
> 3. Integrate into the consumer when possible
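>
> For concreteness, here is a hypothetical sketch of the "balancing the books" check described above; the real Jetty/mysql application is not part of this ticket, and all names below are invented for illustration. Given per-tier counts for a single (topic, time-bucket) pair, each tier is compared against the designated source tier, with a lower count read as loss and a higher count as duplication:
>
>   import java.util.Map;
>
>   public class AuditReconciler {
>       // Compare each tier's count for one (topic, time-bucket) pair against
>       // the source tier: lower suggests loss, higher suggests duplication.
>       static void reconcile(String sourceTier, Map<String, Long> countsByTier) {
>           long expected = countsByTier.getOrDefault(sourceTier, 0L);
>           for (Map.Entry<String, Long> e : countsByTier.entrySet()) {
>               if (e.getKey().equals(sourceTier)) continue;
>               long delta = e.getValue() - expected;
>               if (delta < 0) {
>                   System.out.printf("%s: %d messages missing%n", e.getKey(), -delta);
>               } else if (delta > 0) {
>                   System.out.printf("%s: %d duplicate messages%n", e.getKey(), delta);
>               } else {
>                   System.out.printf("%s: reconciled%n", e.getKey());
>               }
>           }
>       }
>
>       public static void main(String[] args) {
>           // Example: the hadoop-etl tier saw 14 fewer messages than the producer tier
>           reconcile("producer", Map.of(
>               "producer", 43634L, "broker", 43634L, "hadoop-etl", 43620L));
>       }
>   }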