Hi Stephan,
I was going to suggest that using a flatMap and tracking the timestamp of
each key yourself is a bit like having a per-key watermark. I wanted to
wait a bit before answering because I'm currently working on a new type of
Function that will be release with Flink 1.2: ProcessFunction. This is
somewhat like a FlatMap but also allows to access the element timestamp,
query current processing time/event time and set (per key) timers for
processing time and event time. With this you should be able to easily
implement your per-key tracking, I hope.

Cheers,
Aljoscha

P.S. ProcessFunction is already in the Flink repository but it's called
TimelyFlatMapFunction right now, because I was working on it under that
working title.

On Mon, 14 Nov 2016 at 15:47 kaelumania <stephan.epp...@zweitag.de> wrote:

> Hey Fabian,
>
> thank you very much.
>
> - yes, I would window by event time and fire/purge by processing time
> - Cheaper in the end meant, that having too much state in the flink
> cluster would be more expensive, as we store all data in cassandra too.I
> think the fault tolerance would be okay, as we would make a compare and set
> with cassandra.
>
> With the flatMap Operator wouldn’t it be like running my own windowing
> mechanism? I need to keep the aggregate window per sensor open (with
> checkpointing and state management) until I receive an element for a sensor
> that is later in time than the windows time and then purge the state and
> emit a new event (which is like having a watermark per sensor). Further, I
> need a timer that fires like after 24 hours, in case a sensor dies and
> doesn’t send more data which might is possible with window
> assigner/trigger, right? But not inside normal functions, e.g. flatMap? We
> can guarantee that all sensor data per sensor comes almost in order (might
> be out of order within a few seconds), but there might be gaps of several
> hours after network partitions.
>
> There is now way to define/redefine the watermark per keyed stream? Or
> adjust the window assigner + trigger to achieve the desired behaviour? I am
> a bit reserved in implementing the whole state management. Do you plan to
> support such use cases on keyed streams? Maybe the WatermarkAssigner could
> also receive information about the key for wich the watermark should be
> calculated etc.
>
> best, Stephan
>
>
> On 14 Nov 2016, at 15:17, Fabian Hueske-2 [via Apache Flink User Mailing
> List archive.] <[hidden email]
> <http:///user/SendEmail.jtp?type=node&node=10098&i=0>> wrote:
>
> Hi Stephan,
>
> I'm skeptical about two things:
> - using processing time will result in inaccurately bounded aggregates (or
> do you want to group by event time in a processing time window?)
> - writing to and reading from Cassandra might be expensive (not sure what
> you mean by cheaper in the end) and it is not integrated with Flink's
> checkpointing mechanism for fault-tolerance.
>
> To me, the stateful FlatMapOperator looks like the best approach. There is
> an upcoming feature for registering timers in user-functions, i.e., a
> function is called after the timer exceeds. This could be helpful to
> overcome the problem of closing the window without new data.
>
> Best,
> Fabian
>
>
> 2016-11-14 8:39 GMT+01:00 Stephan Epping <<a
> href="x-msg://10/user/SendEmail.jtp?type=node&amp;node=10094&amp;i=0"
> target="_top" rel="nofollow" link="external" class="">[hidden email]>:
>
> Hello Fabian,
>
> Thank you very much. What is your opinion on the following solution:
>
> - Window data per time window, e.g. 15 minutes
> - using processing time as trigger, e.g. 15 minutes
> - which results in an aggregate over sensor values
> - then use cassandra to select the previous aggregate (as there can be
> multiple for the time window due to processing time)
> - then update the aggregate and put it into a cassandra sink again
>
> The cassandra select will be a bit slower than using an in memory/flink
> state, but will be cheaper in the end. Further, what does this have for
> consequences?
> For example, replaying events will be more difficult, right? Also, what
> about Snapshots? Will they work with the mentioned design?
>
> kind regards,
> Stephan
>
> On 11 Nov 2016, at 00:39, Fabian Hueske <<a
> href="x-msg://10/user/SendEmail.jtp?type=node&amp;node=10094&amp;i=1"
> target="_top" rel="nofollow" link="external" class="">[hidden email]> wrote:
>
> Hi Stephan,
>
> I just wrote an answer to your SO question.
>
> Best, Fabian
>
>
> 2016-11-10 11:01 GMT+01:00 Stephan Epping <<a
> href="x-msg://10/user/SendEmail.jtp?type=node&amp;node=10094&amp;i=2"
> target="_top" rel="nofollow" link="external" class="">[hidden email]>:
>
>
> Hello,
>
> I found this question in the Nabble archive (
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html)
> but was unable/dont know how to reply.
>
> Here is my question regarding the mentioned thread:
>
> Hello,
>
> I have similar requirements (see StackOverflor
> http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
> I am pretty new to flink, could you elaborate on a possible solution? We
> can guarantee good ordering by sensor_id, thus watermarking by key would be
> the only reasonable way for us (
> *sensorData.keyBy('id').timeWindow(1.minute).sum('value')*), could I do
> my own watermarking after*sensorData.keyBy('id').overwriteWatermarking()*...
> per key? Or maybe using custom state plus a custom trigger? What happens if
> a sensor dies or is being removed completely, how can this be detected as
> watermarks would be ignored for window garbage collection. Or could we
> dynamically schedule a job of each sensor? Which would result in 1000 Jobs.
>
>
> Thanks,
> Stephan
>
>
>
>
>
>
>
> ------------------------------
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10094.html
> To unsubscribe from Maintaining watermarks per key, instead of per
> operator instance, click here.
> NAML
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>
>
>
>
> ------------------------------
> View this message in context: Re: Maintaining watermarks per key, instead
> of per operator instance
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288p10098.html>
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/> at
> Nabble.com.
>

Reply via email to