Sorry, my earlier comment should read: "It would just read all the files in order and NOT worry about which data rows are in which files"
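For illustration, a minimal sketch of such an order-insensitive consumer (the helper name, directory layout, and local-filesystem reading are assumptions, not taken from this thread). It simply reads every finished part file under the sink's output directory in path order and treats the concatenation as one stream of rows, without caring which checkpoint interval produced which file:

    import java.io.File

    // Sketch only: collect all rows written by the sink, in part-file path order.
    // Files whose names start with "." are assumed to be in-progress/pending
    // part files and are skipped. "outputDir" is a hypothetical local path.
    fun readAllRows(outputDir: String): List<String> =
        File(outputDir)
            .walkTopDown()
            .filter { it.isFile && !it.name.startsWith(".") }
            .sortedBy { it.path }
            .toList()
            .flatMap { it.readLines() }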
On Fri, Jan 18, 2019 at 10:00 AM Jamie Grier <jgr...@lyft.com> wrote:

> Hmm.. I would have to look into the code for the StreamingFileSink more
> closely to understand the concern, but typically you should not be
> concerned at all with *when* checkpoints happen. They are meant to be a
> completely asynchronous background process that has absolutely no bearing
> on application semantics. The output should be thought of as a stream
> rather than a snapshot.
>
> Can you rework the downstream consumer of the output data such that you
> don't have to worry about this? It would just read all the files in order
> and worry about which data rows are in which files.
>
> Anyway, maybe Kostas can add more since he wrote the StreamingFileSink
> code. I've cc'd him directly.
>
> -Jamie
>
>
> On Fri, Jan 18, 2019 at 9:44 AM Cristian C <cristian.k...@gmail.com>
> wrote:
>
>> Well, the problem is that, conceptually, the way I'm trying to approach
>> this is OK. But in practice, it has some edge cases.
>>
>> So back to my original premise: if the trigger and a checkpoint happen
>> around the same time, there is a chance that the StreamingFileSink rolls
>> the bucket BEFORE it has received all the data. In other words, it would
>> create incomplete snapshots of the table.
>>
>> Keep in mind that every snapshot is written to a different folder. And
>> they are supposed to represent the state of the whole table at a point
>> in time.
>>
>> On Fri, Jan 18, 2019, 8:26 AM Jamie Grier <jgr...@lyft.com> wrote:
>>
>>> Oh sorry.. Logically, since the ContinuousProcessingTimeTrigger never
>>> PURGES but only FIRES, what I said is semantically true. The window
>>> contents are never cleared.
>>>
>>> What I missed is that in this case, since you're using a function that
>>> incrementally reduces on the fly rather than processing all the data
>>> when it's triggered, your state is always kept to one element per key.
>>> You're correct, but in general, with non-incremental window functions,
>>> the state would grow unbounded in this configuration.
>>>
>>> So it looks like your approach should work just fine.
>>>
>>> -Jamie
>>>
>>>
>>>
>>> On Thu, Jan 17, 2019 at 10:18 PM knur <cristian.k...@gmail.com> wrote:
>>>
>>>> Hello Jamie.
>>>>
>>>> Thanks for taking a look at this. So, yes, I want to write only the
>>>> last data for each key every X minutes. In other words, I want a
>>>> snapshot of the whole database every X minutes.
>>>>
>>>> > The issue is that the window never gets PURGED so the data just
>>>> > continues to accumulate in the window. This will grow without bound.
>>>>
>>>> The window not being purged does not necessarily mean that the data
>>>> will be accumulated indefinitely. How so? Well, Flink has two
>>>> mechanisms to remove data from a window: triggering a
>>>> PURGE/FIRE_AND_PURGE or using an evictor.
>>>>
>>>> The reduce function has an implicit evictor that automatically removes
>>>> events from the window pane that are no longer needed, i.e. it keeps
>>>> in state only the element that was reduced.
>>>>
>>>> Here is an example:
>>>>
>>>>     env.socketTextStream("localhost", 9999)
>>>>         .keyBy { it.first().toString() }
>>>>         .window(GlobalWindows.create())
>>>>         .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
>>>>         .reduce { left, right ->
>>>>             println("left: $left, right: $right")
>>>>             if (left.length > right.length) {
>>>>                 left
>>>>             } else {
>>>>                 right
>>>>             }
>>>>         }
>>>>         .printToErr()
>>>>
>>>> For your claim to hold true, every time the trigger fires one would
>>>> expect to see ALL the elements for a key being printed over and over
>>>> again in the reduce function. However, if you run a job similar to this
>>>> one in your language of choice, you will notice that the print
>>>> statement is effectively called only once per event per key.
>>>>
>>>> In fact, not using purge is intentional, because I want to hold every
>>>> record (the last one by its primary key) of the database in state so
>>>> that I can write a snapshot of the whole database.
>>>>
>>>> So for instance, let's say my table has two columns: id and time. And
>>>> I have the following events:
>>>>
>>>> 1,January
>>>> 2,February
>>>> 1,March
>>>>
>>>> I want to write to S3 two records: "1,March" and "2,February".
>>>>
>>>> Now, let's say two more events come into the stream:
>>>>
>>>> 3,April
>>>> 1,June
>>>>
>>>> Then I want to write to S3 three records: "1,June", "2,February" and
>>>> "3,April".
>>>>
>>>> In other words, I can't just purge the windows, because I would lose
>>>> the record with id 2.
>>>>
>>>>
>>>>
>>>> --
>>>> Sent from:
>>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
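For reference, a minimal sketch of the pipeline described in the worked example above (the port, the five-minute interval, and the job name are assumptions, not taken from the thread). It keys rows like "1,March" by their id column, keeps only the most recent row per key through the incremental reduce, and re-emits every retained row each time the continuous trigger fires; print() stands in for the StreamingFileSink that would actually write the snapshot folders:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger

    fun main() {
        val env = StreamExecutionEnvironment.getExecutionEnvironment()

        env.socketTextStream("localhost", 9999)          // rows like "1,March"
            .keyBy { it.substringBefore(",") }           // key = primary key column
            .window(GlobalWindows.create())              // never purged, so no key is ever lost
            .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))
            .reduce { _, newer -> newer }                // keep only the incoming (latest) row per key
            .print()                                     // stand-in for the StreamingFileSink

        env.execute("table-snapshot-sketch")
    }

With the events from the example, the retained state after "1,March" is {"1,March", "2,February"}, and after "3,April" and "1,June" it is {"1,June", "2,February", "3,April"}, which is exactly what each trigger fire would emit.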