Hi Hemant,

what you described is an aggregation: you aggregate 15 small records into one large record. The concept of aggregation goes beyond purely numeric operations; for example, forming one large string with CONCAT is also a type of aggregation.

In your case, I'd still follow the approach I outlined before. You have two options:
* Use an infinite window that fires on each new element. You need to ensure that old metrics are evicted [1].
* Alternatively, implement a KeyedProcessFunction [2] and manage the state directly. It might initially be a bit more complex (= more code), but it should be easier to understand and maintain.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/windows.html#evictors
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
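To make the second option a bit more concrete, here is a minimal sketch of such a KeyedProcessFunction, not your actual job: the MetricEvent and DeviceSnapshot types and their fields are made up for illustration. It assumes the stream is keyed by device id and that each element carries one metric name and value.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class LatestMetricsFunction extends
        KeyedProcessFunction<String, LatestMetricsFunction.MetricEvent, LatestMetricsFunction.DeviceSnapshot> {

    /** Hypothetical input type: one metric reading coming from one event stream. */
    public static class MetricEvent {
        public String deviceId;
        public String metricName;
        public double value;
    }

    /** Hypothetical output type: the latest value of every metric seen so far for one device. */
    public static class DeviceSnapshot {
        public String deviceId;
        public Map<String, Double> metrics;
    }

    // Latest value per metric name; scoped to the current key (the device id).
    private transient MapState<String, Double> latestMetrics;

    @Override
    public void open(Configuration parameters) {
        latestMetrics = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("latest-metrics", String.class, Double.class));
    }

    @Override
    public void processElement(MetricEvent event, Context ctx, Collector<DeviceSnapshot> out)
            throws Exception {
        // Overwrite the previous reading for this metric type ...
        latestMetrics.put(event.metricName, event.value);

        // ... and emit a fresh snapshot of all metrics known for this device so far.
        DeviceSnapshot snapshot = new DeviceSnapshot();
        snapshot.deviceId = ctx.getCurrentKey();
        snapshot.metrics = new HashMap<>();
        for (Map.Entry<String, Double> entry : latestMetrics.entries()) {
            snapshot.metrics.put(entry.getKey(), entry.getValue());
        }
        out.collect(snapshot);
    }
}

You would wire it up roughly as events.keyBy(e -> e.deviceId).process(new LatestMetricsFunction()), where events is the (possibly unioned) stream of metric events. If old metrics should expire, a timer or state TTL on the MapState plays the role that the evictor has in the window variant.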
On Wed, May 13, 2020 at 12:06 AM hemant singh <hemant2...@gmail.com> wrote:

> Hi Arvid,
>
> I don't want to aggregate all events; rather, I want to create a record for a device combining data from multiple events. Each of these events gives me a metric for a device, so for example if I want one record for device-id=1, the record will look like metric1, metric2, metric3..., where metric1 comes from event1, metric2 from event2, and so on. From each event I take the latest data to form a kind of snapshot of device performance across the metrics.
>
> Thanks,
> Hemant
>
> On Wed, May 13, 2020 at 1:38 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Hemant,
>>
>> In general, you want to keep all data coming from one device in one Kafka partition, such that the timestamps of that device are monotonically increasing. Thus, when processing data from one device, you have ensured that no out-of-order events with respect to this device happen.
>>
>> If you now want to aggregate all events of a given timestamp for a device, it is a matter of keying by device id and applying a custom window. There is no need for joins.
>>
>> On Tue, May 12, 2020 at 9:05 PM hemant singh <hemant2...@gmail.com> wrote:
>>
>>> Hello Flink Users,
>>>
>>> Any views on this question of mine?
>>>
>>> Thanks,
>>> Hemant
>>>
>>> On Tue, May 12, 2020 at 7:00 PM hemant singh <hemant2...@gmail.com> wrote:
>>>
>>>> Hello Roman,
>>>>
>>>> Thanks for your response.
>>>>
>>>> I think the partitioning you described (event type + protocol type) is subject to data skew. Including a device ID should solve this problem. Also, including "protocol_type" in the key and having a topic per protocol_type seems redundant.
>>>> Each protocol is in a single topic, and event_type is the key used to distribute data to a specific partition.
>>>>
>>>> Furthermore, do you have any particular reason to maintain multiple topics? I could imagine protocols have different speeds or other characteristics, so you can tune Flink accordingly. Otherwise, having a single topic partitioned only by device ID would simplify deployment and reduce data skew.
>>>> Yes, you are right. These protocols have separate characteristics like speed and data format. If I have only one topic with data partitioned by device_id, it could be that events from a faster protocol are processed faster and the joins I want to do will not have enough matching data. I have a question here: how would you tune Flink to handle different characteristics like stream speed, given that reading from Kafka could result in uneven processing of data?
>>>>
>>>> > By consume do you mean the downstream system?
>>>> My downstream is a TSDB and other DBs where the data will be written to. All of this is time-series data.
>>>>
>>>> Thanks,
>>>> Hemant
>>>>
>>>> On Tue, May 12, 2020 at 5:28 PM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>
>>>>> Hello Hemant,
>>>>>
>>>>> Thanks for your reply.
>>>>>
>>>>> I think the partitioning you described (event type + protocol type) is subject to data skew. Including a device ID should solve this problem. Also, including "protocol_type" in the key and having a topic per protocol_type seems redundant.
>>>>>
>>>>> Furthermore, do you have any particular reason to maintain multiple topics? I could imagine protocols have different speeds or other characteristics, so you can tune Flink accordingly. Otherwise, having a single topic partitioned only by device ID would simplify deployment and reduce data skew.
>>>>>
>>>>> > By consume do you mean the downstream system?
>>>>> Yes.
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>> On Mon, May 11, 2020 at 11:30 PM hemant singh <hemant2...@gmail.com> wrote:
>>>>>
>>>>>> Hello Roman,
>>>>>>
>>>>>> PFB my response -
>>>>>>
>>>>>> As I understand, each protocol has a distinct set of event types (where event type == metrics type) and a distinct set of devices. Is this correct?
>>>>>> Yes, correct: distinct events and devices. Each device emits these events.
>>>>>>
>>>>>> > Based on data protocol I have 4-5 topics. Currently the data for a single event is being pushed to a partition of the kafka topic (producer key -> event_type + data_protocol).
>>>>>> Here you are talking about the source (to the Flink job), right?
>>>>>> Yes, you are right.
>>>>>>
>>>>>> Can you also share how you are going to consume this data?
>>>>>> By consume do you mean the downstream system? If yes, then this data will be written to a DB; some metrics go to a TSDB (Influx) as well.
>>>>>>
>>>>>> Thanks,
>>>>>> Hemant
>>>>>>
>>>>>> On Tue, May 12, 2020 at 2:08 AM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Hemant,
>>>>>>>
>>>>>>> As I understand, each protocol has a distinct set of event types (where event type == metrics type) and a distinct set of devices. Is this correct?
>>>>>>>
>>>>>>> > Based on data protocol I have 4-5 topics. Currently the data for a single event is being pushed to a partition of the kafka topic (producer key -> event_type + data_protocol).
>>>>>>> Here you are talking about the source (to the Flink job), right?
>>>>>>>
>>>>>>> Can you also share how you are going to consume this data?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Roman
>>>>>>>
>>>>>>> On Mon, May 11, 2020 at 8:57 PM hemant singh <hemant2...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I have different events from a device which constitute different metrics for the same device. Each of these events is produced by the device at intervals of a few milliseconds to a minute.
>>>>>>>>
>>>>>>>> Event1(Device1) -> Stream1 -> Metric1
>>>>>>>> Event2(Device1) -> Stream2 -> Metric2
>>>>>>>> ...
>>>>>>>> Event100(Device1) -> Stream100 -> Metric100
>>>>>>>>
>>>>>>>> The number of events can go up to a few hundred for each data protocol, and we have around 4-5 data protocols.
>>>>>>>> Metrics from different streams make up a record, for example for device 1:
>>>>>>>>
>>>>>>>> Device1 -> Metric1, Metric2, Metric15 forms a single record for the device. Currently, in the development phase, I am using an interval join to achieve this, that is, to create a record with the latest data from the different streams (events).
>>>>>>>>
>>>>>>>> Based on data protocol I have 4-5 topics. Currently the data for a single event is being pushed to a partition of the kafka topic (producer key -> event_type + data_protocol). So essentially one topic is made up of many streams. I am filtering on the key to define the streams.
>>>>>>>>
>>>>>>>> My question is: is this the correct way to stream the data? I had thought of maintaining a separate topic per event; however, in that case the number of topics could go to a few thousand, which becomes a little challenging to maintain, and I am not sure Kafka handles that well.
>>>>>>>>
>>>>>>>> I know there are traditional ways to do this, like pushing it to a time-series DB and then joining data for the different metrics, but that will never scale; also, this processing should be as real-time as possible.
>>>>>>>>
>>>>>>>> Are there better ways to handle this use case, or am I on the correct path?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Hemant

--
Arvid Heise | Senior Java Developer
<https://www.ververica.com/>
Follow us @VervericaData
--
Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng