Hi Nirmalya,

If you want to calculate the running average over all measurements,
independent of the probe ID, then you cannot parallelize the computation.
In that case you have to use a global (non-keyed) window, which is
evaluated by a single parallel instance.
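
To make the difference concrete, here is a minimal sketch in plain Scala
collections (no Flink involved) using the six readings from your mail. The
names Reading, globalAverages and perProbeAverages are made up for this
illustration, T1..T3 are encoded as 1..3, and readings with equal timestamps
are assumed to arrive in the order you listed them. In Flink, the global
variant would roughly correspond to a non-keyed count window such as
countWindowAll(3, 1), but please treat that mapping as a pointer rather than
a tested job.

```scala
// Plain Scala collections stand in for the stream; this is not Flink code.
final case class Reading(probeId: String, timestamp: Int, temperature: Double)

object RunningAverageSketch {
  // Sample data from the question (T1, T2, T3 encoded as 1, 2, 3).
  val readings: List[Reading] = List(
    Reading("P1", 1, 20.0),
    Reading("P1", 2, 30.0),
    Reading("P2", 2, 30.0),
    Reading("P1", 3, 50.0),
    Reading("P2", 3, 20.0),
    Reading("P3", 3, 10.0)
  )

  // Mean temperature of one window.
  def average(window: Seq[Reading]): Double =
    window.map(_.temperature).sum / window.size

  // Global view: every 3 consecutive readings, sliding by 1, regardless of
  // which probe produced them. Incomplete windows are dropped.
  def globalAverages(rs: Seq[Reading]): List[Double] =
    rs.sliding(3, 1).filter(_.size == 3).map(average).toList

  // Keyed view: the same window logic, but applied to each probe's own
  // readings separately (what partitioning by probeID gives you).
  def perProbeAverages(rs: Seq[Reading]): Map[String, List[Double]] =
    rs.groupBy(_.probeId).map { case (id, own) =>
      id -> own.sliding(3, 1).filter(_.size == 3).map(average).toList
    }
}
```

With this data the global view yields four averages, one per window of three
consecutive readings, whereas the keyed view yields a single average for P1
and none for P2 and P3, because those probes never accumulate three readings.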

Cheers,
Till
On Feb 19, 2016 6:30 PM, "Nirmalya Sengupta" <sengupta.nirma...@gmail.com>
wrote:

> Hello Aljoscha <aljos...@apache.org>,
>
> My sincere apologies at the outset if I seem to be repeating the same
> question almost interminably. If it is frustrating you, I ask for your
> patience, but I really want to nail it down in my mind. :-)
>
> The point about parallelizing is well taken. I understand why the stream
> should be broken into multiple partitions, and how. What still evades me
> is how the use case of computing a (sliding) average temperature is
> achieved if the stream is scattered across partitions.
>
> I want the *running* average temperature for every 3 readings, sliding by
> 1 reading. I am monitoring the average temperature; if it goes beyond a
> certain threshold for 3 consecutive readings, I throw an alarm.
>
> Let's take the following set of data (fields are: probeID, timestamp,
> temperature; the 'timestamp' field is used for the
> assignAscendingTimestamps() function):
>
> P1,T1,20
> P1,T2,30
> P2,T2,30
> P1,T3,50
> P2,T3,20
> P3,T3,10
>
> Assumption: T1 < T2 < T3
>
> Now, if we partition on the probeID, we get three partitions, thus:
>
> P1 -> (T1,20) | (T2,30) | (T3,50)
> P2 -> (T2,30) | (T3,20)
> P3 -> (T3,10)
>
> Computing the average temperature will give me *three distinct averages*
> here, one for each partition. I get the average per probe, not the average
> over every 3 readings [assuming a slidingWindow(3,1)] irrespective of
> which probe produced them.
>
> Is it even correct to expect a running average if we partition the input
> stream?
>
> I hope I am making my understanding (or the lack of it) quite clear here!
> :-)
>
> -- Nirmalya
>
> -------------------------------------------------------------------------------------------------------------------------------------------------------------
> To: user@flink.apache.org
> Cc:
> Date: Fri, 19 Feb 2016 10:41:52 +0100
> Subject:
> Hi,
> as I understand it, the “temp_reading_timestamp” field is not a key on
> which you can partition your data. It is a field that would be used for
> assigning timestamps to the elements. In your data you also have the
> “probeID” field. That is a field that could be used to parallelize the
> computation. For example, you could do the following:
>
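> val inputStream = <define some source>
>
> val result = inputStream
>   // use the reading's timestamp field as the element timestamp
>   .assignAscendingTimestamps { e => e.temp_reading_timestamp }
>   // partition by probe so that windows are evaluated per probe, in parallel
>   .keyBy { e => e.probeID }
>   // 10-minute time windows per key
>   .timeWindow(Time.minutes(10))
>   // pre-aggregate with SumFunction, then emit the average per window
>   .apply(new SumFunction(), new ComputeAverageFunction())
>
> result.print()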
>
> (Where SumFunction() would sum up temperatures and keep a count and
> ComputeAverageFunction() would divide the sum by the count.)
>
> In this way, computation is parallelized because it can be spread across
> several machines and partitioned by the key. Without such a key everything
> has to be computed on one machine because a global view of the data is
> required.
>
> Cheers,
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is
> where they should be.
> Now put the foundation under them."
>
