Hi Hemant,

what you described is an aggregation. You aggregate 15 small records into
one large record. The concept of aggregation goes beyond pure numeric
operations; for example, forming one large string with CONCAT is also a
type of aggregation.

In your case, I'd still follow my general outlined approach. You have two
options:
* Use an infinite window that fires at each new element. You need to ensure
that old metrics are evicted [1].
* You can also implement a KeyedProcessFunction [2] and manage the state
directly. It might be initially a bit more complex (=more code), but it
should easier to understand and maintain.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/windows.html#evictors
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html



On Wed, May 13, 2020 at 12:06 AM hemant singh <hemant2...@gmail.com> wrote:

> Hi Arvid,
>
> I don't want to aggregate all events, rather I want to create a record for
> a device combining data from multiple events. Each of this event gives me a
> metric for a device, so for example if I want one record for device-id=1
> the metric will look like metric1, metric2, metric3....  where metric1
> comes from a event1, metric2 from event2 and likewise....
> From each event get latest data to form a kind of snapshot of device
> performance across the metrics.
>
> Thanks,
> Hemant
>
> On Wed, May 13, 2020 at 1:38 AM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Hemant,
>>
>> In general, you want to keep all data coming from one device in one Kafka
>> partition, such that the timestamps of that device are monotonically
>> increasing. Thus, when processing data from one device, you have ensured
>> that no out-of-order events with respect to this device happen.
>>
>> If you now want to aggregate all events of a given timestamp for a
>> device, it is a matter of keying by device id and applying a custom window.
>> There is no need for joins.
>>
>> On Tue, May 12, 2020 at 9:05 PM hemant singh <hemant2...@gmail.com>
>> wrote:
>>
>>> Hello Flink Users,
>>>
>>> Any views on this question of mine.
>>>
>>> Thanks,
>>> Hemant
>>>
>>> On Tue, May 12, 2020 at 7:00 PM hemant singh <hemant2...@gmail.com>
>>> wrote:
>>>
>>>> Hello Roman,
>>>>
>>>> Thanks for your response.
>>>>
>>>> I think partitioning you described (event type + protocol type) is
>>>> subject to data skew. Including a device ID should solve this problem.
>>>> Also, including "protocol_type" into the key and having topic per
>>>> protocol_type seems redundant.
>>>> Each protocol is in single topic and event_type is key to distribute
>>>> data to a specific partition.
>>>>
>>>> Furthermore, do you have any particular reason to maintain multiple
>>>> topics?
>>>> I could imagine protocols have different speeds or other
>>>> characteristics, so you can tune Flink accordingly.
>>>> Otherwise, having a single topic partitioned only by device ID would
>>>> simplify deployment and reduce data skew.
>>>> Yes, you are right. These protocols have separate characteristics like
>>>> speed, data format. If I do have only one topic with data partitioned by
>>>> device_id then it could be that events from faster protocol is processed
>>>> faster and the joins which I want to do will not have enough matching data.
>>>> I have a question here how are you referring to tune Flink to handle
>>>> different characteristics like speed of streams as reading from kafka could
>>>> result in uneven processing of data?
>>>>
>>>> > By consume do you mean the downstream system?
>>>> My downstream is TSDB and other DBs where the data will be written to.
>>>> All these is time-series data.
>>>>
>>>> Thanks,
>>>> Hemant
>>>>
>>>>
>>>>
>>>> On Tue, May 12, 2020 at 5:28 PM Khachatryan Roman <
>>>> khachatryan.ro...@gmail.com> wrote:
>>>>
>>>>> Hello Hemant,
>>>>>
>>>>> Thanks for your reply.
>>>>>
>>>>> I think partitioning you described (event type + protocol type) is
>>>>> subject to data skew. Including a device ID should solve this problem.
>>>>> Also, including "protocol_type" into the key and having topic per
>>>>> protocol_type seems redundant.
>>>>>
>>>>> Furthermore, do you have any particular reason to maintain multiple
>>>>> topics?
>>>>> I could imagine protocols have different speeds or other
>>>>> characteristics, so you can tune Flink accordingly.
>>>>> Otherwise, having a single topic partitioned only by device ID would
>>>>> simplify deployment and reduce data skew.
>>>>>
>>>>> > By consume do you mean the downstream system?
>>>>> Yes.
>>>>>
>>>>> Regards,
>>>>> Roman
>>>>>
>>>>>
>>>>> On Mon, May 11, 2020 at 11:30 PM hemant singh <hemant2...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello Roman,
>>>>>>
>>>>>> PFB my response -
>>>>>>
>>>>>> As I understand, each protocol has a distinct set of event types
>>>>>> (where event type == metrics type); and a distinct set of devices. Is 
>>>>>> this
>>>>>> correct?
>>>>>> Yes, correct. distinct events and devices. Each device emits these
>>>>>> event.
>>>>>>
>>>>>> > Based on data protocol I have 4-5 topics. Currently the data for a
>>>>>> single event is being pushed to a partition of the kafka topic(producer 
>>>>>> key
>>>>>> -> event_type + data_protocol).
>>>>>> Here you are talking about the source (to Flink job), right?
>>>>>> Yes, you are right.
>>>>>>
>>>>>> Can you also share how are you going to consume these data?
>>>>>> By consume do you mean the downstream system?
>>>>>> If yes then this data will be written to a DB, some metrics goes to
>>>>>> TSDB(Influx) as well.
>>>>>>
>>>>>> Thanks,
>>>>>> Hemant
>>>>>>
>>>>>> On Tue, May 12, 2020 at 2:08 AM Khachatryan Roman <
>>>>>> khachatryan.ro...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Hemant,
>>>>>>>
>>>>>>> As I understand, each protocol has a distinct set of event types
>>>>>>> (where event type == metrics type); and a distinct set of devices. Is 
>>>>>>> this
>>>>>>> correct?
>>>>>>>
>>>>>>> > Based on data protocol I have 4-5 topics. Currently the data for a
>>>>>>> single event is being pushed to a partition of the kafka topic(producer 
>>>>>>> key
>>>>>>> -> event_type + data_protocol).
>>>>>>> Here you are talking about the source (to Flink job), right?
>>>>>>>
>>>>>>> Can you also share how are you going to consume these data?
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> Roman
>>>>>>>
>>>>>>>
>>>>>>> On Mon, May 11, 2020 at 8:57 PM hemant singh <hemant2...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I have different events from a device which constitutes different
>>>>>>>> metrics for same device. Each of these event is produced by the device 
>>>>>>>> in
>>>>>>>> interval of few milli seconds to a minute.
>>>>>>>>
>>>>>>>> Event1(Device1) -> Stream1 -> Metric 1
>>>>>>>> Event2 (Device1) -> Stream2 -> Metric 2 ...
>>>>>>>> ..............
>>>>>>>> .......
>>>>>>>> Event100(Device1) -> Stream100 -> Metric100
>>>>>>>>
>>>>>>>> The number of events can go up to few 100s for each data protocol
>>>>>>>> and we have around 4-5 data protocols. Metrics from different streams 
>>>>>>>> makes
>>>>>>>> up a records
>>>>>>>> like for example from above example for device 1 -
>>>>>>>>
>>>>>>>> Device1 -> Metric1, Metric 2, Metric15 forms a single record for
>>>>>>>> the device. Currently in development phase I am using interval join to
>>>>>>>> achieve this, that is to create a record with latest data from 
>>>>>>>> different
>>>>>>>> streams(events).
>>>>>>>>
>>>>>>>> Based on data protocol I have 4-5 topics. Currently the data for a
>>>>>>>> single event is being pushed to a partition of the kafka 
>>>>>>>> topic(producer key
>>>>>>>> -> event_type + data_protocol). So essentially one topic is made up of 
>>>>>>>> many
>>>>>>>> streams. I am filtering on the key to define the streams.
>>>>>>>>
>>>>>>>> My question is - Is this correct way to stream the data, I had
>>>>>>>> thought of maintaining different topic for an event, however in that 
>>>>>>>> case
>>>>>>>> number of topics could go to few thousands and that is something which
>>>>>>>> becomes little challenging to maintain and not sure if kafka handles 
>>>>>>>> that
>>>>>>>> well.
>>>>>>>>
>>>>>>>> I know there are traditional ways to do this like pushing it to
>>>>>>>> timeseries db and then joining data for different metric but that is
>>>>>>>> something which will never scale, also this processing should be as
>>>>>>>> realtime as possible.
>>>>>>>>
>>>>>>>> Are there better ways to handle this use case or I am on correct
>>>>>>>> path.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Hemant
>>>>>>>>
>>>>>>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> <https://www.ververica.com/>
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Reply via email to