Hi Robert,

I believe that I cannot use a "ProcessFunction" because I key the stream and use TumblingEventTimeWindows, and that setup does not allow a "ProcessFunction" to be used.
I compute the averages with a ProcessWindowFunction. I am going to follow up on this question in a new thread with more information.

Thank you.

Sincerely,
Marco Villalobos

> On Jun 15, 2020, at 11:13 AM, Robert Metzger <rmetz...@apache.org> wrote:
>
> Hi Marco,
>
> I'm not 100% sure I understood the problem. Let me repeat it: you want a stream of 15-minute averages for each unique "name". If there's no data available for a 15m average, use the data from the previous 15m time window?
>
> If that's the problem, you can probably build this using a ProcessFunction and a timer. For each key, you just store the average in Flink state. You set a timer which outputs the last stored average and sets a new timer.
>
> Hope that is some useful inspiration!
>
> Best,
> Robert
>
> On Mon, Jun 15, 2020 at 4:59 AM Marco Villalobos <mvillalo...@kineteque.com> wrote:
> Hello Flink community. I need help. Thus far, Flink has proven very useful to me.
>
> I am using it for stream processing of time-series data.
>
> For the scope of this mailing list, let's say the time series has the fields: name: String, value: double, and timestamp: Instant.
>
> I named the time series: timeSeriesDataStream.
>
> My first task was to average the time series by name within a 15-minute tumbling event-time window.
>
> I was able to solve this with a ProcessWindowFunction (I had to use this approach because the watermark is not keyed), named the resultant stream aggregateTimeSeriesDataStream, and then sent the values to a sink.
>
> My next task is to backfill the per-name averages into subsequent windows. This means that if a time series does not appear in a subsequent window, then its previous average value will be used in that window.
>
> How do I do this?
>
> I started by applying a Map function to aggregateTimeSeriesDataStream to shift the timestamp back 15 minutes, and named the resultant stream backfilledDataStream.
>
> Now, I am stuck. I suspect that I need to either:
>
> 1) call timeSeriesDataStream.coGroup(backfilledDataStream) and add a CoGroupWindowFunction to process the backfill, or
> 2) use "iterate" to somehow jury-rig a backfill.
>
> I really don't know. That's why I am asking this group for advice.
>
> What's the common solution for this problem? I am quite sure that this is a very common use case.
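The behaviour discussed in this thread (a 15-minute tumbling average per name, with a missing window filled from that name's previous average, as in Robert's state-plus-timer suggestion) can be illustrated without Flink. The following is a minimal plain-Java simulation, not Flink code: it groups readings into epoch-aligned 15-minute windows, keeps the last average per name in a map standing in for keyed state, and at each window boundary (where an event-time timer would fire) emits either the fresh average or the carried-forward one. The names BackfillSketch, Reading, WindowAverage, windowStart and averageWithBackfill are hypothetical, and the sketch assumes the whole input is available up front, which a real stream is not.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (no Flink dependency). All names are hypothetical.
class BackfillSketch {
    // Mirrors the fields described in the question: name, value, timestamp.
    record Reading(String name, double value, Instant timestamp) {}

    // One output row; backfilled == true means the value was carried forward.
    record WindowAverage(String name, Instant windowStart, double average, boolean backfilled) {}

    static final long WINDOW_MILLIS = Duration.ofMinutes(15).toMillis();

    // Start of the epoch-aligned 15-minute tumbling window containing ts.
    static long windowStart(Instant ts) {
        long millis = ts.toEpochMilli();
        return millis - Math.floorMod(millis, WINDOW_MILLIS);
    }

    // For every window in [from, to), emits one row per name seen so far.
    static List<WindowAverage> averageWithBackfill(List<Reading> readings, Instant from, Instant to) {
        // windowStart -> name -> {sum, count}
        Map<Long, Map<String, double[]>> sums = new TreeMap<>();
        for (Reading r : readings) {
            double[] acc = sums
                .computeIfAbsent(windowStart(r.timestamp()), k -> new TreeMap<>())
                .computeIfAbsent(r.name(), k -> new double[2]);
            acc[0] += r.value();
            acc[1] += 1;
        }

        // Stands in for per-key state: the last known average for each name.
        Map<String, Double> lastAverage = new TreeMap<>();
        List<WindowAverage> out = new ArrayList<>();
        for (long start = windowStart(from); start < to.toEpochMilli(); start += WINDOW_MILLIS) {
            Map<String, double[]> current = sums.getOrDefault(start, Map.of());
            for (Map.Entry<String, double[]> e : current.entrySet()) {
                lastAverage.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
            }
            // Acts like a timer firing at the window end: emit the fresh average
            // if the name had data in this window, otherwise re-emit the last one.
            for (Map.Entry<String, Double> e : lastAverage.entrySet()) {
                boolean backfilled = !current.containsKey(e.getKey());
                out.add(new WindowAverage(e.getKey(), Instant.ofEpochMilli(start), e.getValue(), backfilled));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2020-06-15T00:00:00Z");
        List<Reading> readings = List.of(
            new Reading("a", 1.0, t0.plusSeconds(60)),
            new Reading("a", 3.0, t0.plusSeconds(120)),
            new Reading("b", 10.0, t0.plusSeconds(300)),
            new Reading("b", 20.0, t0.plus(Duration.ofMinutes(20))));
        for (WindowAverage w : averageWithBackfill(readings, t0, t0.plus(Duration.ofMinutes(30)))) {
            System.out.println(w);
        }
    }
}
```

With the sample in main, name "a" averages 2.0 in the first window and is backfilled with 2.0 in the second, while "b" gets 10.0 and then 20.0 from its own data. A name is only emitted from the first window in which it appears. In Flink, Robert's suggestion would place this per-name state and timer logic in a keyed process function; the sketch only illustrates the intended output.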