Hope everyone had a good long weekend. Any updates/comments? -Lokesh
On Mon, Aug 30, 2021 at 2:43 PM Lokesh Lingarajan <llingara...@confluent.io> wrote:

Motivation

Today we ingest a number of high-cardinality metrics into Druid across dimensions. These metrics are rolled up on a per-minute basis and are very useful when looking at metrics on a partition or client basis. Events are another class of data that provides useful information about a particular incident/scenario inside a Kafka cluster. The events themselves are carried inside the Kafka payload, but there is also some very useful metadata carried in the Kafka headers that can serve as dimensions for aggregation and, in turn, provide better insights.

PR #10730 <https://github.com/apache/druid/pull/10730> introduced support for Kafka headers in InputFormats.

We still need an input format to parse out the headers and translate them into relevant columns in Druid. Until that is implemented, none of the information available in the Kafka message headers is exposed. So, first, there is a need to implement an input format that can parse headers in any given format (provided we support the format), just as we parse payloads today. Apart from the headers, there is also some useful information present in the key portion of the Kafka record, so we also need a way to expose the data present in the key as Druid columns. We need a generic way to express at configuration time which attributes from the headers, key and payload should be ingested into Druid, and the design should be generic enough that users can specify different parsers for the headers, key and payload.

The proposal is to design an input format that solves the above by providing a wrapper around any existing input format and merging the data into a single unified Druid row.

Proposed changes

Let's look at a sample input format from the above discussion:

"inputFormat":
{
  "type": "kafka",                        // New input format type
  "headerLabelPrefix": "kafka.header.",   // Label prefix for header columns; this will avoid collisions while merging columns
  "recordTimestampLabelPrefix": "kafka.", // Kafka record's timestamp is made available in case the payload does not carry a timestamp
  "headerFormat":                         // Header parser specifying that values are of type string
  {
    "type": "string"
  },
  "valueFormat":                          // Value parser, here using json parsing
  {
    "type": "json",
    "flattenSpec":
    {
      "useFieldDiscovery": true,
      "fields": [...]
    }
  },
  "keyFormat":                            // Key parser, also using json parsing
  {
    "type": "json"
  }
}

Since we have independent sections for the header, key and payload, each section can be parsed with its own parser, e.g., headers coming in as strings and the payload as JSON.

KafkaInputFormat (the new InputFormat class) will be the umbrella class implementing the InputFormat interface. It will be responsible for creating the individual parsers for the header, key and payload, blending the data while resolving column conflicts, and generating a single unified InputRow for Druid ingestion.

"headerFormat" will allow users to plug in a parser type for the header values and will add the default header prefix "kafka.header." (which can be overridden) to the attribute names, to avoid collisions while merging them with the payload attributes.

The Kafka payload parser will be responsible for parsing the Value portion of the Kafka record. This is where most of the data will come from, and we should be able to plug in existing parsers. One thing to note here is that if batching is performed, the code should augment every record in the batch with the header and key values.

The Kafka key parser will handle parsing the Key portion of the Kafka record and will ingest the key under the dimension name "kafka.key".
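To make the merge behavior concrete, here is a small, self-contained Java sketch. This is not the actual KafkaInputFormat code; the sample record and the "kafka.timestamp" column name are assumptions for illustration (the latter based on the "recordTimestampLabelPrefix" default of "kafka."). It only shows how attributes parsed from the header, key and payload could be blended into one flat row using the prefixes from the config above:

import java.util.LinkedHashMap;
import java.util.Map;

// Simplified illustration of the merge described above, using plain maps
// instead of the real Druid classes.
public class KafkaRecordMergeSketch
{
  static Map<String, Object> mergeToRow(
      Map<String, String> headerAttributes,   // produced by the headerFormat parser
      Object key,                              // produced by the keyFormat parser
      Map<String, Object> payloadAttributes,   // produced/flattened by the valueFormat parser
      long recordTimestampMillis)
  {
    // Start with the payload columns; most of the data comes from here.
    Map<String, Object> row = new LinkedHashMap<>(payloadAttributes);

    // Header attributes get the configurable prefix ("kafka.header." by default)
    // so they do not collide with payload column names.
    headerAttributes.forEach((name, value) -> row.put("kafka.header." + name, value));

    // The record key is exposed under a fixed dimension name.
    row.put("kafka.key", key);

    // The record timestamp is exposed as well, useful when the payload itself
    // carries no timestamp. (Column name assumed from the "kafka." prefix.)
    row.put("kafka.timestamp", recordTimestampMillis);

    return row;
  }

  public static void main(String[] args)
  {
    Map<String, Object> payload = new LinkedHashMap<>();
    payload.put("metric", "bytes-in");   // hypothetical flattened payload fields
    payload.put("count", 42);

    Map<String, Object> row = mergeToRow(
        Map.of("environment", "prod"),   // hypothetical header attribute
        "tenant-123",                    // hypothetical record key
        payload,
        1630359720000L);                 // Kafka record timestamp (millis)

    // Prints: {metric=bytes-in, count=42, kafka.header.environment=prod,
    //          kafka.key=tenant-123, kafka.timestamp=1630359720000}
    System.out.println(row);
  }
}

The actual input format of course builds Druid InputRows rather than plain maps, but the column naming should follow the same pattern.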
Operational impact, Test plan & Future work

Since we had an immediate need to ingest blended data from the header and payload, we have implemented the above proposal in PR #11630 <https://github.com/apache/druid/pull/11630>.

-Lokesh Lingarajan