I created this Jira issue for per-window state:
https://issues.apache.org/jira/browse/FLINK-5929. Would that suit your
needs? If yes, please comment on the issue. I think it would be a very nice
addition that opens up a lot of possibilities.
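
To make the idea concrete, per-window state could behave roughly like the
sketch below. This is plain Scala, not Flink code: Window, WindowScopedStore,
and clearWindow are hypothetical names made up for illustration, and the
actual API that comes out of the Jira issue may look quite different.

```scala
import scala.collection.mutable

// Hypothetical model, not Flink API: state is addressed by (key, window)
// instead of by key alone, so it can be dropped when the window expires.
final case class Window(start: Long, end: Long)

final class WindowScopedStore[K, V] {
  private val state = mutable.Map.empty[(K, Window), mutable.ListBuffer[V]]

  // Append a value to the state belonging to one key and one window.
  def add(key: K, window: Window, value: V): Unit =
    state.getOrElseUpdate((key, window), mutable.ListBuffer.empty[V]) += value

  // Read the state for one key and one window (empty if nothing was stored).
  def get(key: K, window: Window): List[V] =
    state.get((key, window)).map(_.toList).getOrElse(Nil)

  // Called when a window expires: drops only that window's state.
  def clearWindow(key: K, window: Window): Unit =
    state.remove((key, window))

  // Number of (key, window) entries currently held.
  def size: Int = state.size
}

object WindowScopedStoreDemo {
  def main(args: Array[String]): Unit = {
    val store = new WindowScopedStore[String, Int]
    val w1 = Window(0L, 3600000L)
    val w2 = Window(3600000L, 7200000L)
    store.add("a", w1, 1)
    store.add("a", w2, 2)
    store.clearWindow("a", w1)  // w1 expired, w2 is untouched
    println(store.get("a", w1)) // List()
    println(store.get("a", w2)) // List(2)
  }
}
```

With state that is scoped only to the key, the entry written during w1 would
stay around after the window is gone, which is the leak described below.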

Regarding access to the job id in the RuntimeContext, I think that's not
possible right now. Please open an issue for that if you like. It will look
better coming from a user, I think. :-)

Best,
Aljoscha

On Fri, 24 Feb 2017 at 21:01 Seth Wiesman <swies...@mediamath.com> wrote:

> Also while I’ve got you, is it possible to get the job id from the runtime
> context?
>
>
>
> Seth Wiesman
>
>
>
> *From: *Seth Wiesman <swies...@mediamath.com>
> *Reply-To: *"user@flink.apache.org" <user@flink.apache.org>
> *Date: *Friday, February 24, 2017 at 2:51 PM
>
>
> *To: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: List State in RichWindowFunction leads to RocksDb memory
> leak
>
>
>
> Appreciate you getting back to me.
>
>
>
> ProcessWindowFunction does look interesting, and I expect that it will be
> what I move to in the future. However, even if it currently had the
> functionality that I need, I don’t think I would be comfortable moving
> to a snapshot version so soon after migrating to 1.2.
>
>
>
> With the count window: I was actually using a time window with a count
> trigger (stream.timeWindow().allowedLateness().trigger(CountTrigger.of(1))). The
> issue appeared to have less to do with state size expanding and more to do
> with checkpoint buffers being blocked somewhere along the pipeline. I
> decided to move away from this idea shortly after sending my last email so
> I don’t have any real insight into what was wrong.
>
>
>
> I understand not wanting to break things for people who expect state to be
> global and do not expect to see any APIs change :-).
>
>
>
> The solution I ended up settling on was copying the window operator and
> giving the window function access to the trigger context; luckily it was a
> fairly trivial change to make. With that I am able to keep everything
> scoped to the correct namespace and clean everything up when the window is
> discarded. Is the plan for the context in ProcessWindowFunction to eventually
> have access to scoped partitioned state, or just timing? There are several
> things I have coming down the pipeline that require coordination between
> window evaluations.
>
>
>
> Thank you again for all the help.
>
>
>
> Seth Wiesman
>
>
>
>
>
> *From: *Aljoscha Krettek <aljos...@apache.org>
> *Reply-To: *"user@flink.apache.org" <user@flink.apache.org>
> *Date: *Friday, February 24, 2017 at 12:09 PM
> *To: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: List State in RichWindowFunction leads to RocksDb memory
> leak
>
>
>
> Hi Seth,
>
> yes, this is a thorny problem, but I actually see one additional possible
> solution (that will, however, break other possible use cases).
>
>
>
> First, regarding your solution 1):
>
> We are working on adding this for ProcessWindowFunction:
> https://issues.apache.org/jira/browse/FLINK-4953. ProcessWindowFunction
> is a more powerful interface that allows querying more context about a
> window firing. This will replace the current WindowFunction in the future.
> Unfortunately this doesn't help you with your current situation.
>
>
>
> About 2), do you have any idea why the state is getting so big? Do you see
> the state of the second (count) window operator growing very large? The
> problem with count windows is that they never get garbage collected if you
> don't reach the count required by a Trigger. If you have an evolving key
> space this means that your state will possibly grow forever.
>
>
>
> The third solution that I can think of is to make state of a window
> function implicitly scoped to both the key and window. Right now, state is
> "global" across time and only scoped to a key. If we also scoped to the
> window we could keep track of all state created for a window and then
> garbage collect that once the window expires. This, however, will break
> things for people that rely on this state being global. I'll bring this up
> on the dev mailing list to see what people think about it. Are you also
> following that one, so that you could chime in?
>
>
>
> I'm afraid I don't have a good solution for you before Flink 1.3 comes out,
> other than writing your own custom operator or copying the WindowOperator.
>
>
>
> What do you think?
>
>
>
> Best,
>
> Aljoscha
>
> On Thu, 23 Feb 2017 at 16:12 Seth Wiesman <swies...@mediamath.com> wrote:
>
> I am working on a program that uses a complex window and have run into
> some issues. It is a 1 hour window with 7 days allowed lateness including a
> custom trigger that gives us intermediate results every 5 minutes of
> processing time until the end of 7 days event time when a final fire is
> triggered and the window is purged. The window functions are an incremental
> reduce function as well as a RichWindowFunction which performs some final
> computation before outputting each result. I am building up a collection of
> objects so each time the RichWindowFunction is run I want to take a diff
> with the previous set to only output elements that have changed.
>
>
>
> Example:
>
>
>
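> // In reality I am working with more complex objects than ints.
> import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.scala.function.RichWindowFunction
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.util.Collector
> import scala.collection.JavaConverters._
>
> class CustomRichWindowFunction
>     extends RichWindowFunction[Collection[Int], Int, Key, TimeWindow] {
>
>   @transient var state: ListState[Int] = _
>
>   override def open(parameters: Configuration): Unit = {
>     val info = new ListStateDescriptor("previous", createTypeInformation[Int])
>     state = getRuntimeContext.getListState(info)
>   }
>
>   override def apply(key: Key, window: TimeWindow,
>       input: Iterable[Collection[Int]], out: Collector[Int]): Unit = {
>     val current = input.iterator.next
>     // Snapshot the elements stored by the previous firing.
>     val previous = state.get().iterator.asScala.toSet
>     // Clear the stored state (not the local copy) before re-adding.
>     state.clear()
>
>     for (elem <- current) {
>       if (!previous.contains(elem)) {
>         out.collect(elem)
>       }
>       state.add(elem) // store for the next run
>     }
>   }
> }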
>
>
>
> The issue with this is that it causes a memory leak with RocksDb. When the
> WindowOperator executes clearAllState
> <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L527>
> at the end of the window's lifetime it does not clear the ListState or any
> other type of custom partitioned state that may have been created. This
> causes my state size to grow indefinitely. It appears to me that a
> RichWindowFunction should have a clear method, similar to triggers, for
> cleaning up state when the window is destroyed.
>
>
>
> Barring that, I can envision two ways of solving this problem but have
> fallen short of successfully implementing them.
>
>
>
> 1) If I had access to the watermark from within apply, I could use
> that in conjunction with the TimeWindow passed in to tell whether it
> was my final EventTimeTimer that had gone off, allowing me to manually clear
> the state:
>
>
>
> i.e.: if (watermark < window.getEnd + Time.days(7).toMilliseconds) {
>         // I know that my window is not finished so I can store state.
>         state.add(elem)
>       }
>
>
>
> 2) Pass my elements into a second window with a count trigger of 1
> and a custom evictor which always keeps the two most recent elements and
> then do my diff there.
>
> Semantically this seems to work but in practice it causes my checkpoint
> times to grow 10x and I seem to fail every 5th-7th checkpoint.
>
>
>
> I am curious if anyone here has any ideas of what I might be able to do to
> solve this problem.
>
>
>
> Thank you,
>
>
>
> Seth Wiesman
>
>
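
For reference, the diff logic and the retention check discussed in this
thread can be sketched in plain Scala without any Flink dependency. This is
only an illustration under stated assumptions: DiffFiring, fire, and
shouldRetain are hypothetical names, and the watermark is passed in as a
plain parameter because it is not available inside apply.

```scala
// Hypothetical helper, not part of Flink: models one window firing.
object DiffFiring {

  // Returns the elements to emit (those absent from the previous firing)
  // and the set to store for the next firing. Replacing the stored set
  // corresponds to calling state.clear() and then state.add for each element.
  def fire[A](previous: Set[A], current: Iterable[A]): (List[A], Set[A]) = {
    val changed = current.filterNot(previous.contains).toList
    (changed, current.toSet)
  }

  // Mirrors the check from option 1: state is kept only while the watermark
  // has not yet passed the window end plus the allowed lateness.
  def shouldRetain(watermark: Long, windowEnd: Long, allowedLatenessMs: Long): Boolean =
    watermark < windowEnd + allowedLatenessMs

  def main(args: Array[String]): Unit = {
    val (out1, s1) = fire(Set.empty[Int], List(1, 2, 3))
    val (out2, _) = fire(s1, List(2, 3, 4))
    println(out1) // List(1, 2, 3)
    println(out2) // List(4)
  }
}
```

If shouldRetain returns false, the function would skip storing the new set,
so nothing is left behind once the window is purged.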
