Hey Lokesh, Thanks for the details. To me it makes more sense to have the user specify the entire timestamp and key field name (it seems weird to have a "timestamp prefix" and "key prefix" that are only used for single fields). I just wrote that + a few comments on the PR itself: https://github.com/apache/druid/pull/11630#pullrequestreview-760351816
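For illustration, here is roughly the shape I have in mind. The field names below ("timestampColumnName" and "keyColumnName") are hypothetical placeholders for whatever we settle on, not parameters the current patch exposes:

    "inputFormat": {
      "type": "kafka",
      "headerLabelPrefix": "kafka.header.",      // a prefix still makes sense for headers, since there can be many of them
      "timestampColumnName": "kafka.timestamp",  // full name for the record-timestamp column (hypothetical parameter)
      "keyColumnName": "kafka.key",              // full name for the key column (hypothetical parameter)
      "headerFormat": { "type": "string" },
      "keyFormat": { "type": "json" },
      "valueFormat": { "type": "json" }
    }

With full names there is no prefix that only ever applies to a single field.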
On Fri, Sep 17, 2021 at 9:43 AM Lokesh Lingarajan <llingara...@confluent.io> wrote:
> Hi Gian,
>
> Thanks for your reply; please find my comments below.
>
> 1) How is the timestamp exposed exactly? I see there is a recordTimestampLabelPrefix, but what is that a prefix to? Also: what do you think about accepting the entire name of the timestamp field instead? Finally: in the docs it would be good to have an example of how people can write a timestampSpec that refers to the Kafka timestamp, and also how they can load the Kafka timestamp as a long-typed dimension storing millis since the epoch (our convention for secondary timestamps).
>
> >>> The input format allows users to pick and choose the timestamp value from the header, key, or value portion of the Kafka record. If the timestamp is missing in both the key and value parts, users can always default to the timestamp that is available in the header. The code defaults this column to the name "kafka.timestamp"; recordTimestampLabelPrefix allows users to change the "kafka" part to something else. If this model deviates from what we currently have in Druid, then I agree we should change it to take a full name. Currently the timestamp is loaded directly from the ConsumerRecord<K, V> data structure as follows:
>
> // Add the Kafka record timestamp to the merge list; we will skip the record timestamp if the same key already exists in the header list
> mergeMap.putIfAbsent(recordTimestampColumn, record.getRecord().timestamp());
>
> 2) You mention that the key will show up as "kafka.key", and in the example you provide I don't see a parameter enabling a choice of what that field is called. Is it hard-coded or is it configurable somehow?
>
> >>> This behavior is exactly the same as for the timestamp discussed above. If nothing is done, we will have a column named "kafka.key"; users have the choice to change the "kafka" part to something else. We can make the change uniform here as well, based on the decision above.
>
> 3) Could you write up some user-facing docs too, like an addition to development/extensions-core/kafka-ingestion.md? That way, people will know how to use this feature. And it'll help us better understand how it's supposed to work. (Perhaps it could have answered the two questions above.)
>
> >>> Absolutely agree with you; I will do that along with the other review comments on the code.
>
> Thanks again for looking into this :)
>
> -Lokesh
>
> On Thu, Sep 16, 2021 at 9:34 AM Gian Merlino <g...@apache.org> wrote:
>
>> Lokesh, it looks like you got dropped from the thread, so I'm adding you back. Please check out the previous message for some comments.
>>
>> By the way, by default, replies to the dev list go back to the dev list only, which can cause you to miss some replies. If you join the list you will be sure to get all your replies 🙂
>>
>> On Tue, Sep 14, 2021 at 10:10 PM Gian Merlino <g...@apache.org> wrote:
>>
>>> Hey Lokesh,
>>>
>>> The concept and API look solid to me! Thank you for writing this up. I agree with Ben's comment. This will be really useful functionality.
>>>
>>> I have a few questions about how it would work:
>>>
>>> 1) How is the timestamp exposed exactly? I see there is a recordTimestampLabelPrefix, but what is that a prefix to? Also: what do you think about accepting the entire name of the timestamp field instead?
>>> Finally: in the docs it would be good to have an example of how people can write a timestampSpec that refers to the Kafka timestamp, and also how they can load the Kafka timestamp as a long-typed dimension storing millis since the epoch (our convention for secondary timestamps).
>>>
>>> 2) You mention that the key will show up as "kafka.key", and in the example you provide I don't see a parameter enabling a choice of what that field is called. Is it hard-coded or is it configurable somehow?
>>>
>>> 3) Could you write up some user-facing docs too, like an addition to development/extensions-core/kafka-ingestion.md? That way, people will know how to use this feature. And it'll help us better understand how it's supposed to work. (Perhaps it could have answered the two questions above.)
>>>
>>> Full disclosure: I haven't reviewed the patch yet; these questions are just based on your writeup.
>>>
>>> On Mon, Aug 30, 2021 at 3:00 PM Lokesh Lingarajan <llingara...@confluent.io.invalid> wrote:
>>>
>>>> Motivation
>>>>
>>>> Today we ingest a number of high-cardinality metrics into Druid across dimensions. These metrics are rolled up on a per-minute basis and are very useful when looking at metrics on a per-partition or per-client basis. Events are another class of data that provide useful information about a particular incident or scenario inside a Kafka cluster. The events themselves are carried inside the Kafka payload, but there is also some very useful metadata carried in Kafka headers that can serve as dimensions for aggregation and in turn bring better insights.
>>>>
>>>> PR #10730 <https://github.com/apache/druid/pull/10730> introduced support for Kafka headers in InputFormats.
>>>>
>>>> We still need an input format to parse the headers and translate them into relevant columns in Druid. Until that is implemented, none of the information available in the Kafka message headers is exposed. So first there is a need to implement an input format that can parse headers in any given format (provided we support the format), just as we parse payloads today. Apart from the headers, there is also useful information present in the key portion of the Kafka record, so we also need a way to expose the data in the key as Druid columns. We need a generic way to express at configuration time which attributes from the headers, key, and payload should be ingested into Druid, and we need to keep the design generic enough that users can specify different parsers for the headers, key, and payload.
>>>>
>>>> The proposal is to design an input format that solves the above by providing a wrapper around any existing input formats and merging the data into a single unified Druid row.
>>>> Proposed changes
>>>>
>>>> Let's look at a sample input format from the above discussion:
>>>>
>>>> "inputFormat": {
>>>>   "type": "kafka",                        // New input format type
>>>>   "headerLabelPrefix": "kafka.header.",   // Label prefix for header columns; this avoids collisions while merging columns
>>>>   "recordTimestampLabelPrefix": "kafka.", // Kafka record's timestamp is made available in case the payload does not carry a timestamp
>>>>   "headerFormat": {                       // Header parser specifying that values are of type string
>>>>     "type": "string"
>>>>   },
>>>>   "valueFormat": {                        // Value parser, here using JSON parsing
>>>>     "type": "json",
>>>>     "flattenSpec": {
>>>>       "useFieldDiscovery": true,
>>>>       "fields": [...]
>>>>     }
>>>>   },
>>>>   "keyFormat": {                          // Key parser, also using JSON parsing
>>>>     "type": "json"
>>>>   }
>>>> }
>>>>
>>>> Since we have independent sections for the header, key and payload, this also enables parsing each section with its own parser, e.g., headers coming in as string and payload as JSON.
>>>>
>>>> KafkaInputFormat (the new InputFormat class) will be the uber class extending the InputFormat interface. It will be responsible for creating the individual parsers for header, key and payload, blending the data while resolving column conflicts, and generating a single unified InputRow for Druid ingestion.
>>>>
>>>> "headerFormat" will allow users to plug in a parser type for the header values and will add the default header prefix "kafka.header." (which can be overridden) to the attributes, to avoid collisions when merging them with the payload.
>>>>
>>>> The Kafka payload parser will be responsible for parsing the Value portion of the Kafka record. This is where most of the data will come from, and we should be able to plug in existing parsers. One thing to note here is that if batching is performed, the code should augment every record in the batch with the header and key values.
>>>>
>>>> The Kafka key parser will handle parsing the Key portion of the Kafka record and will ingest the Key under the dimension name "kafka.key".
>>>>
>>>> Operational impact, Test plan & Future work
>>>>
>>>> Since we had an immediate need to ingest blended data from header and payload, we have implemented the above proposal in a PR here: <https://github.com/apache/druid/pull/11630>
>>>>
>>>> -Lokesh Lingarajan
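A note on the docs example from question 1: once the record timestamp is exposed as a column, the spec snippets would look something like the sketch below. I'm assuming the default name "kafka.timestamp" produced by the sample config above; adjust to whatever the final parameter names end up being.

Using the Kafka record timestamp as the primary timestamp:

    "timestampSpec": {
      "column": "kafka.timestamp",
      "format": "millis"
    }

Or, when the payload supplies the primary timestamp, loading the Kafka timestamp as a long-typed dimension storing millis since the epoch (our convention for secondary timestamps):

    "dimensionsSpec": {
      "dimensions": [
        { "type": "long", "name": "kafka.timestamp" }
      ]
    }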