Re: Clean GlobalWidnow state

2017-09-29 Thread Fabian Hueske
Thanks for creating the JIRA issue!

Best, Fabian

2017-09-20 12:26 GMT+02:00 gerardg:
> I have prepared a repo that reproduces the issue:
> https://github.com/GerardGarcia/flink-global-window-growing-state
>
> Maybe this way it is easier to spot the error or we can determine if it is a bug.

Re: Clean GlobalWidnow state

2017-09-20 Thread gerardg
I have prepared a repo that reproduces the issue: https://github.com/GerardGarcia/flink-global-window-growing-state Maybe this way it is easier to spot the error or we can determine if it is a bug. Gerard -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Clean GlobalWidnow state

2017-09-19 Thread gerardg
The UUIDs are assigned. As far as I can see (inspecting the metrics and how the task behaves) the mergeElements apply function receives all the elements (the main element and the other elements that it expects) so it seems that the correlation is correct. Also, nothing indicates that there are el

Re: Clean GlobalWidnow state

2017-09-19 Thread Aljoscha Krettek
Hi, Are the UUIDs randomly generated each time .uuid is called, or are they assigned once so that .uuid returns the same UUID on every call? The former would be problematic because we would not correctly assign state. Best, Aljoscha

> On 19. Sep 2017, at 11:41, Fabian Hueske wrote:
>
>

Re: Clean GlobalWidnow state

2017-09-19 Thread Fabian Hueske
If that were the case, it would be a bug in Flink. As I said before, your implementation looked good to me. All state of the window and the trigger should be wiped if the trigger returns FIRE_AND_PURGE (or PURGE) and its clear() method is correctly implemented. I'll CC Aljoscha again for his opinio
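For readers following along, here is a minimal sketch of the kind of trigger being discussed. It is not Gerard's actual code: the names AllReceivedTrigger, SumLongs and expected are hypothetical, and it assumes the window should fire once a fixed number of elements has arrived for a key. It keeps a per-key counter in partitioned state and clears that counter both right before returning FIRE_AND_PURGE and in clear().

```scala
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.ReducingStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

// Hypothetical helper: sums boxed longs for the ReducingState counter.
class SumLongs extends ReduceFunction[java.lang.Long] {
  override def reduce(a: java.lang.Long, b: java.lang.Long): java.lang.Long =
    java.lang.Long.valueOf(a.longValue + b.longValue)
}

// Hypothetical trigger: fires and purges once `expected` elements have arrived for a key.
class AllReceivedTrigger[T](expected: Long) extends Trigger[T, GlobalWindow] {

  private val countDesc =
    new ReducingStateDescriptor[java.lang.Long]("count", new SumLongs, classOf[java.lang.Long])

  override def onElement(element: T, timestamp: Long, window: GlobalWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    val count = ctx.getPartitionedState(countDesc)
    count.add(java.lang.Long.valueOf(1L))
    if (count.get().longValue >= expected) {
      // Drop the trigger's own state before purging the window contents.
      count.clear()
      TriggerResult.FIRE_AND_PURGE
    } else {
      TriggerResult.CONTINUE
    }
  }

  override def onProcessingTime(time: Long, window: GlobalWindow,
                                ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  override def onEventTime(time: Long, window: GlobalWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  // Remove whatever trigger state is still registered for this key and window.
  override def clear(window: GlobalWindow, ctx: Trigger.TriggerContext): Unit =
    ctx.getPartitionedState(countDesc).clear()
}
```

Clearing the counter in both places is a defensive choice in this sketch, so that no per-key trigger state is left behind regardless of when clear() is invoked. It would be attached with .trigger(new AllReceivedTrigger[...](n)) on a GlobalWindows stream.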

Re: Clean GlobalWidnow state

2017-09-19 Thread gerardg
Thanks Fabian, I'll take a look at these improvements. I was wondering if the increasing state size could be due to the fact that the UUIDs used in the keyBy are randomly generated. Maybe even if I correctly delete all the state related to a given key there is still some metadata related to the key wanderin

Re: Clean GlobalWidnow state

2017-09-19 Thread Fabian Hueske
Hi Gerard, I had a look at your Trigger implementation but did not spot anything suspicious that would cause the state size to grow. However, I noticed a few things that can be improved: - use ctx.getCurrentProcessingTime instead of System.currentTimeMillis to make the Trigger easier to test (th
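To illustrate the first suggestion, here is a minimal sketch of a timeout-style trigger that takes the current time from the TriggerContext rather than from System.currentTimeMillis. It is not the trigger from this thread: the names TimeoutTrigger, timeoutMs and the "deadline" state are hypothetical, and the timeout behaviour itself is an assumption made for the example.

```scala
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

// Hypothetical trigger: fires and purges `timeoutMs` after the first element of a key.
class TimeoutTrigger[T](timeoutMs: Long) extends Trigger[T, GlobalWindow] {

  private val deadlineDesc =
    new ValueStateDescriptor[java.lang.Long]("deadline", classOf[java.lang.Long])

  override def onElement(element: T, timestamp: Long, window: GlobalWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    val deadline = ctx.getPartitionedState(deadlineDesc)
    if (deadline.value() == null) {
      // The clock comes from the context, so a test can control it.
      val fireAt = ctx.getCurrentProcessingTime + timeoutMs
      deadline.update(java.lang.Long.valueOf(fireAt))
      ctx.registerProcessingTimeTimer(fireAt)
    }
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: GlobalWindow,
                                ctx: Trigger.TriggerContext): TriggerResult = {
    // The timer has fired; drop the stored deadline before purging.
    ctx.getPartitionedState(deadlineDesc).clear()
    TriggerResult.FIRE_AND_PURGE
  }

  override def onEventTime(time: Long, window: GlobalWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    TriggerResult.CONTINUE

  // Delete a still-pending timer and the stored deadline.
  override def clear(window: GlobalWindow, ctx: Trigger.TriggerContext): Unit = {
    val deadline = ctx.getPartitionedState(deadlineDesc)
    val fireAt = deadline.value()
    if (fireAt != null) ctx.deleteProcessingTimeTimer(fireAt.longValue)
    deadline.clear()
  }
}
```

Because the trigger never reads the wall clock directly, a test can supply its own TriggerContext (or a test harness) and advance processing time deterministically.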

Re: Clean GlobalWidnow state

2017-09-18 Thread gerardg
I might understand better what is happening if I could inspect what is being stored in the state. Is there any way to read the RocksDB state? Gerard

Re: Clean GlobalWidnow state

2017-09-15 Thread gerardg
I'm using Nabble and it seems that it has removed the code between raw tags. Here it is again:

import com.typesafe.scalalogging.LazyLogging
import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.state.{ReducingStateDescriptor, ValueStateDescriptor}
import org.a

Re: Clean GlobalWidnow state

2017-09-15 Thread Aljoscha Krettek
Sure, but how does the Trigger actually work?

> On 15. Sep 2017, at 12:20, gerardg wrote:
>
> Sure:
>
> The application is configured to use processing time.
>
> Thanks,
>
> Gerard

Re: Clean GlobalWidnow state

2017-09-15 Thread gerardg
Sure: The application is configured to use processing time. Thanks, Gerard

Re: Clean GlobalWidnow state

2017-09-15 Thread Aljoscha Krettek
Hi, Could you maybe show the code of your trigger? Best, Aljoscha

> On 15. Sep 2017, at 11:39, gerardg wrote:
>
> Hi,
>
> I have the following operator:
>
> mainStream
>   .coGroup(coStream)
>   .where(_.uuid).equalTo(_.uuid)
>   .window(GlobalWindows.create())
>   .trigger(trigg

Clean GlobalWidnow state

2017-09-15 Thread gerardg
Hi, I have the following operator:

mainStream
  .coGroup(coStream)
  .where(_.uuid).equalTo(_.uuid)
  .window(GlobalWindows.create())
  .trigger(triggerWhenAllReceived)
  .apply(mergeElements)

TL;DR: It seems that the checkpointed state of the operator keeps growing forever ev