I created this Jira issue for per-window state: https://issues.apache.org/jira/browse/FLINK-5929 Would that suit your needs? If yes, please comment on the issue. I think it would be a very nice addition that opens up a lot of possibilities.
Regarding access to the job ID in the RuntimeContext, I think that's not possible right now. Please open an issue for that if you like. It will look better coming from a user, I think. :-)

Best,
Aljoscha

On Fri, 24 Feb 2017 at 21:01 Seth Wiesman <swies...@mediamath.com> wrote:

> Also, while I’ve got you, is it possible to get the job ID from the runtime context?
>
> Seth Wiesman
>
> *From: *Seth Wiesman <swies...@mediamath.com>
> *Reply-To: *"user@flink.apache.org" <user@flink.apache.org>
> *Date: *Friday, February 24, 2017 at 2:51 PM
> *To: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: List State in RichWindowFunction leads to RocksDb memory leak
>
> Appreciate you getting back to me.
>
> ProcessWindowFunction does look interesting, and I expect that it will be what I move to in the future. However, even if it currently had the functionality that I need today, I don’t think I would be comfortable moving to a snapshot version so soon after migrating to 1.2.
>
> With the count window: I was actually using a time window with a count trigger (stream.timeWindow().allowedLateness().trigger(Count.of(1))). The issue appeared to have less to do with state size expanding and more to do with checkpoint buffers being blocked somewhere along the pipeline. I decided to move away from this idea shortly after sending my last email, so I don’t have any real insight into what was wrong.
>
> I understand not wanting to break things for people who expect state to be global and do not expect to see any APIs change :-).
>
> The solution I ended up settling on was copying the window operator and giving the window function access to the trigger context; luckily it was a fairly trivial change to make. With that I am able to keep everything scoped to the correct namespace and clean everything up when the window is discarded.
> Is the plan for the context in ProcessWindowFunction to eventually have access to scoped partitioned state, or just timing? There are several things I have coming down the pipeline that require coordination between window evaluations.
>
> Thank you again for all the help.
>
> Seth Wiesman
>
> *From: *Aljoscha Krettek <aljos...@apache.org>
> *Reply-To: *"user@flink.apache.org" <user@flink.apache.org>
> *Date: *Friday, February 24, 2017 at 12:09 PM
> *To: *"user@flink.apache.org" <user@flink.apache.org>
> *Subject: *Re: List State in RichWindowFunction leads to RocksDb memory leak
>
> Hi Seth,
>
> yes, this is a thorny problem but I actually see one additional possible solution (that will, however, break other possible use cases).
>
> First, regarding your solution 1):
>
> We are working on adding this for ProcessWindowFunction: https://issues.apache.org/jira/browse/FLINK-4953. ProcessWindowFunction is a more powerful interface that allows querying more context about a window firing. This will replace the current WindowFunction in the future. Unfortunately this doesn't help you with your current situation.
>
> About 2), do you have any idea why the state is getting so big? Do you see the state of the second (count) window operator growing very large? The problem with count windows is that they never get garbage collected if you don't reach the count required by a Trigger. If you have an evolving key space this means that your state will possibly grow forever.
>
> The third solution that I can think of is to make state of a window function implicitly scoped to both the key and window. Right now, state is "global" across time and only scoped to a key. If we also scoped to the window we could keep track of all state created for a window and then garbage collect that once the window expires. This, however, will break things for people that rely on this state being global.
> I'll bring this up on the dev mailing list to see what people think about it. Are you also following that one, so that you could chime in?
>
> I'm afraid I don't have a good solution for you before Flink 1.3 comes out, other than writing your own custom operator or copying the WindowOperator.
>
> What do you think?
>
> Best,
> Aljoscha
>
> On Thu, 23 Feb 2017 at 16:12 Seth Wiesman <swies...@mediamath.com> wrote:
>
> I am working on a program that uses a complex window and have run into some issues. It is a 1-hour window with 7 days of allowed lateness and a custom trigger that gives us intermediate results every 5 minutes of processing time until the end of 7 days of event time, when a final fire is triggered and the window is purged. The window functions are an incremental reduce function as well as a RichWindowFunction, which performs some final computation before outputting each result. I am building up a collection of objects, so each time the RichWindowFunction runs I want to take a diff with the previous set and output only the elements that have changed.
>
> Example:
>
> // In reality I am working with more complex objects than ints.
> import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.scala.function.RichWindowFunction
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.util.Collector
> import scala.collection.JavaConverters._
>
> class CustomRichWindowFunction extends RichWindowFunction[Collection[Int], Int, Key, TimeWindow] {
>
>   @transient var state: ListState[Int] = _
>
>   override def open(parameters: Configuration): Unit = {
>     val info = new ListStateDescriptor("previous", createTypeInformation[Int])
>     state = getRuntimeContext.getListState(info)
>   }
>
>   override def apply(key: Key, window: TimeWindow, input: Iterable[Collection[Int]], out: Collector[Int]): Unit = {
>     val current = input.iterator.next
>     // Snapshot what the previous firing stored, then reset the state.
>     val previous = state.get().iterator.asScala.toSet
>     state.clear()
>
>     for (elem <- current) {
>       if (!previous.contains(elem)) {
>         out.collect(elem) // emit only elements that are new since the last firing
>       }
>
>       state.add(elem) // store for the next run
>     }
>   }
> }
>
> The issue with this is that it causes a memory leak with RocksDB. When the WindowOperator executes clearAllState <https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L527> at the end of the window's lifetime, it does not clear the ListState or any other type of custom partitioned state that may have been created. This causes my state size to grow indefinitely. It appears to me that a RichWindowFunction should have a clear method, similar to triggers, for cleaning up state when the window is destroyed.
>
> Barring that, I can envision two ways of solving this problem, but I have fallen short of successfully implementing them.
>
> 1) If I had access to the watermark from within apply, I could use that in conjunction with the TimeWindow passed in to tell whether it was my final EventTimeTimer that had gone off, allowing me to manually clear the state:
>
> i.e.:
>     if (watermark < window.getEnd + Time.days(7).toMilliseconds) {
>       state.add(elem) // I know that my window is not finished so I can store state.
>     }
>
> 2) Pass my elements into a second window with a count trigger of 1 and a custom evictor which always keeps the two most recent elements, and then do my diff there.
>
> Semantically this seems to work, but in practice it causes my checkpoint times to grow 10x and I seem to fail every 5th-7th checkpoint.
>
> I am curious if anyone here has any ideas of what I might be able to do to solve this problem.
>
> Thank you,
>
> Seth Wiesman
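
For readers following the thread, the idea of scoping the "previous" snapshot to both the key and the window, and dropping it when the window is purged, can be modelled in plain Scala without any Flink dependency. This is only a rough sketch under stated assumptions: `Window`, `PerWindowDiff`, `fire`, `clear` and `retainedEntries` are hypothetical names made up for illustration and are not Flink API, and an in-memory map stands in for the state backend.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a time window; not a Flink class.
final case class Window(start: Long, end: Long)

// Toy model of state scoped to (key, window) instead of to the key only.
final class PerWindowDiff[K] {
  private val previous = mutable.Map.empty[(K, Window), Set[Int]]

  // Called on every trigger firing: returns the elements that were not
  // present in the previous firing of the same key and window.
  def fire(key: K, window: Window, current: Iterable[Int]): List[Int] = {
    val seen = previous.getOrElse((key, window), Set.empty[Int])
    val changed = current.filterNot(seen.contains).toList
    previous((key, window)) = current.toSet // replace the stored snapshot
    changed
  }

  // Called once when the window is purged, so nothing is left behind.
  def clear(key: K, window: Window): Unit = previous.remove((key, window))

  // Number of (key, window) snapshots still held.
  def retainedEntries: Int = previous.size
}

object PerWindowDiffDemo {
  def main(args: Array[String]): Unit = {
    val diff = new PerWindowDiff[String]
    val w = Window(0L, 3600000L)
    println(diff.fire("a", w, List(1, 2)))    // prints List(1, 2)
    println(diff.fire("a", w, List(1, 2, 3))) // prints List(3)
    diff.clear("a", w)                        // final firing: drop the snapshot
    println(diff.retainedEntries)             // prints 0
  }
}
```

Because every snapshot is addressed by the pair (key, window), a single `clear` call at purge time removes everything the window ever stored, which is the cleanup that key-only state cannot offer.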