Oh sorry..  Logically, since the ContinuousProcessingTimeTrigger never
PURGES but only FIRES what I said is semantically true.  The window
contents are never cleared.

What I missed is that in this case since you're using a function that
incrementally reduces on the fly rather than processing all the data when
it's triggered your state is always kept to one element per key.  Your'e
correct but in general with non-incremental window functions the state
would grow unbounded in this configuration.

So it looks like your approach should work just fine.

-Jamie



On Thu, Jan 17, 2019 at 10:18 PM knur <cristian.k...@gmail.com> wrote:

> Hello Jamie.
>
> Thanks for taking a look at this. So, yes, I want to write only the last
> data for each key every X minutes. In other words, I want a snapshot of the
> whole database every X minutes.
>
> >  The issue is that the window never get's PURGED so the data just
> > continues to accumulate in the window.  This will grow without bound.
>
> The window not being purged does not necessarily mean that the data will be
> accumulated indefinitely. How so? Well, Flink has two mechanisms to remove
> data from a window: triggering a FIRE/FIRE_AND_PURGE or using an evictor.
>
> The reduce function has an implicit evictor that automatically removes
> events from the window pane that are no longer needed. i.e. it keeps in
> state only the element that was reduced. Here is an example:
>
>     env.socketTextStream("localhost", 9999)
>       .keyBy { it.first().toString() }
>       .window(GlobalWindows.create())
>
> .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
>       .reduce { left, right ->
>         println("left: $left, right: $right")
>         if (left.length > right.length) {
>           left
>         } else {
>           right
>         }
>       }
>       .printToErr()
>
> For your claim to hold true, every time the trigger fires one would expect
> to see ALL the elements by a key being printed over and over again in the
> reduce function. However, if you run a job similar to this one in your lang
> of choice, you will notice that the print statement is effectively called
> only once per event per key.
>
> In fact, not using purge is intentional. Because I want to hold every
> record
> (the last one by its primary key) of the database in state so that I can
> write a snapshot of the whole database.
>
> So for instance, let's say my table has two columns: id and time. And I
> have
> the following events:
>
> 1,January
> 2,February
> 1,March
>
> I want to write to S3 two records: "1,March", and "2,February".
>
> Now, let's say two more events come into the stream:
>
> 3,April
> 1,June
>
> Then I want to write to S3 three records: "1,June", "2,February" and
> "3,April".
>
> In other words, I can't just purge the windows, because I would lose the
> record with id 2.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>

Reply via email to