Hi Nirmalya,

if you want to calculate the running average over all measurements, independent of the probe ID, then you cannot parallelize the computation, because every window needs to see the readings of all probes. In this case you have to use a global window.
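To make the difference concrete, here is a minimal sketch in plain Scala collections (no Flink involved) that replays the six sample readings from the quoted mail below. The names `Reading`, `slidingAverages` and `RunningAverageSketch` are made up for illustration, and the sketch assumes that readings are processed in arrival order and that only full windows of 3 readings produce a result. In Flink, I believe the non-keyed counterpart would be something like `countWindowAll(3, 1)`, but its firing behaviour for the first, incomplete windows may differ from this sketch.

```scala
// Hypothetical reading type; the field names are assumptions for this sketch.
final case class Reading(probeId: String, timestamp: Int, temperature: Double)

object RunningAverageSketch {
  // Sample data from the thread, in arrival order (T1 < T2 < T3 encoded as 1, 2, 3).
  val readings: List[Reading] = List(
    Reading("P1", 1, 20.0),
    Reading("P1", 2, 30.0),
    Reading("P2", 2, 30.0),
    Reading("P1", 3, 50.0),
    Reading("P2", 3, 20.0),
    Reading("P3", 3, 10.0)
  )

  // Average of every full window of `size` readings, sliding by `slide`.
  // Incomplete windows are dropped (an assumption of this sketch).
  def slidingAverages(rs: List[Reading], size: Int, slide: Int): List[Double] =
    rs.sliding(size, slide)
      .filter(_.size == size)
      .map(window => window.map(_.temperature).sum / size)
      .toList

  // "Global window": one sequence of windows over all probes together.
  val global: List[Double] = slidingAverages(readings, 3, 1)

  // Keyed by probe ID: each probe gets its own, independent windows.
  val perProbe: Map[String, List[Double]] =
    readings.groupBy(_.probeId).map { case (id, rs) => id -> slidingAverages(rs, 3, 1) }

  def main(args: Array[String]): Unit = {
    println(s"global:    $global")
    println(s"per probe: $perProbe")
  }
}
```

The global variant yields four averages, one per window of 3 consecutive readings regardless of probe. The keyed variant yields a single average for P1 and nothing for P2 and P3, because those probes never accumulate 3 readings of their own.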
Cheers,
Till

On Feb 19, 2016 6:30 PM, "Nirmalya Sengupta" <sengupta.nirma...@gmail.com> wrote:

> Hello Aljoscha <aljos...@apache.org>,
>
> My sincere apologies at the beginning, if I seem to repeat the same
> question, almost interminably. If it is frustrating you, I seek your
> patience but I really want to nail it down in mind. :-)
>
> The point about parallelizing is well taken. I understand why the stream
> should be broken into multiple partitions and how. The understanding that
> is still evading me is how is the use-case of computing a (sliding)
> average temperature achieved if the stream is scattered.
>
> I want the *running* average temperature for every 3 readings, sliding by
> 1 reading. I am monitoring the average temperature; if it goes beyond a
> certain threshold for 3 consecutive readings, I throw an alarm.
>
> Let's take the following set of data (fields are: probeID, timestamp,
> temperature ; 'timestamp' field is used for assignAscendingTimestamps()
> function):
>
> P1,T1,20
> P1,T2,30
> P2,T2,30
> P1,T3,50
> P2,T3,20
> P3,T3,10
>
> Assumption: T1 < T2 < T3
>
> Now, if we partition on the probeID, we get three partitions, thus:
>
> P1 -> (T1,20) | (T2,30) | (T3,50)
> P2 -> (T2,30) | (T3,20)
> P3 -> (T3,10)
>
> Computing the average temperature will give me *three distinct averages*
> here, one for each partition. I get average per probe, not per every 3
> readings [assuming a slidingWindow(3,1)] irrespective of which probe gives
> them.
>
> Is it even correct to expect a running average if we partition the input
> stream?
>
> Hope I am making my understanding (or the lack of it), quite clear here!
> :-)
>
> -- Nirmalya
>
> ------------------------------------------------------------------------
> To: user@flink.apache.org
> Cc:
> Date: Fri, 19 Feb 2016 10:41:52 +0100
> Subject:
> Hi,
> as I understand it the “temp_reading_timestamp” field is not a key on
> which you can partition your data. This is a field that would be used for
> assigning the elements to timestamps. In your data you also have the
> “probeID” field. This is a field that could be used to parallelize
> computation, for example you could
> do the following:
>
> val inputStream = <define some source>
>
> val result = inputStream
>   .assignAscendingTimestamps { e => e.temp_reading_timestamp }
>   .keyBy { e => e.probeID }
>   .timeWindow(Time.minutes(10))
>   .apply(new SumFunction(), new ComputeAverageFunction())
>
> result.print()
>
> (Where SumFunction() would sum up temperatures and keep a count and
> ComputeAverageFunction() would divide the sum by the count.)
>
> In this way, computation is parallelized because it can be spread across
> several machines and partitioned by the key. Without such a key everything
> has to be computed on one machine because a global view of the data is
> required.
>
> Cheers,
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is
> where they should be.
> Now put the foundation under them."