Sorry, my earlier comment should read: "It would just read all the files in
order and NOT worry about which data rows are in which files"
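
A minimal sketch of such a consumer, in plain Kotlin rather than Flink. It
assumes each output file is a list of "id,time" lines (the row format from
the example further down the thread) and that the files can be listed in
write order. `Row`, `parseRow` and `latestById` are hypothetical names used
only for illustration.

```kotlin
// Hypothetical row type for "id,time" lines.
data class Row(val id: String, val time: String)

// Parse one "id,time" line into a Row.
fun parseRow(line: String): Row {
    val (id, time) = line.split(",", limit = 2)
    return Row(id, time)
}

// Replay every file in order and keep only the last row seen per id.
// Where one file ends and the next begins does not affect the result.
fun latestById(filesInOrder: List<List<String>>): Map<String, Row> {
    val latest = LinkedHashMap<String, Row>()
    for (file in filesInOrder) {
        for (line in file) {
            val row = parseRow(line)
            latest[row.id] = row
        }
    }
    return latest
}

fun main() {
    // The same five events, split across files in two different ways.
    val splitA = listOf(
        listOf("1,January", "2,February", "1,March"),
        listOf("3,April", "1,June")
    )
    val splitB = listOf(
        listOf("1,January"),
        listOf("2,February", "1,March", "3,April"),
        listOf("1,June")
    )
    println(latestById(splitA).values.map { "${it.id},${it.time}" })
    // Both splits yield the same table state.
    println(latestById(splitA) == latestById(splitB))
}
```

Because the consumer only depends on the order of the rows, it does not
matter whether a checkpoint rolled a file in the middle of a trigger firing.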

On Fri, Jan 18, 2019 at 10:00 AM Jamie Grier <jgr...@lyft.com> wrote:

> Hmm..  I would have to look into the code for the StreamingFileSink more
> closely to understand the concern but typically you should not be concerned
> at all with *when* checkpoints happen.  They are meant to be a completely
> asynchronous background process that has absolutely no bearing on
> application semantics.  The output should be thought of as a stream rather
> than a snapshot.
>
> Can you rework the downstream consumer of the output data such that you
> don't have to worry about this?  It would just read all the files in order
> and worry about which data rows are in which files.
>
> Anyway, maybe Kostas can add more since he wrote the StreamingFileSink
> code.  I've cc'd him directly.
>
> -Jamie
>
>
> On Fri, Jan 18, 2019 at 9:44 AM Cristian C <cristian.k...@gmail.com>
> wrote:
>
>> Well, the problem is that, conceptually, the way I'm trying to approach
>> this is ok. But in practice, it has some edge cases.
>>
>> So back to my original premise: if both the trigger and the checkpoint
>> happen around the same time, there is a chance that the streaming file sink
>> rolls the bucket BEFORE it has received all the data. In other words, it
>> would create incomplete snapshots of the table.
>>
>> Keep in mind that every snapshot is written to a different folder. And
>> they are supposed to represent the state of the whole table at a point in
>> time.
>>
>> On Fri, Jan 18, 2019, 8:26 AM Jamie Grier <jgr...@lyft.com> wrote:
>>
>>> Oh sorry..  Logically, since the ContinuousProcessingTimeTrigger never
>>> PURGES but only FIRES what I said is semantically true.  The window
>>> contents are never cleared.
>>>
>>> What I missed is that in this case, since you're using a function that
>>> incrementally reduces on the fly rather than processing all the data when
>>> it's triggered, your state is always kept to one element per key.  You're
>>> correct, but in general, with non-incremental window functions, the state
>>> would grow unbounded in this configuration.
>>>
>>> So it looks like your approach should work just fine.
>>>
>>> -Jamie
>>>
>>>
>>>
>>> On Thu, Jan 17, 2019 at 10:18 PM knur <cristian.k...@gmail.com> wrote:
>>>
>>>> Hello Jamie.
>>>>
>>>> Thanks for taking a look at this. So, yes, I want to write only the last
>>>> data for each key every X minutes. In other words, I want a snapshot of
>>>> the whole database every X minutes.
>>>>
>>>> >  The issue is that the window never gets PURGED so the data just
>>>> > continues to accumulate in the window.  This will grow without bound.
>>>>
>>>> The window not being purged does not necessarily mean that the data
>>>> will accumulate indefinitely. How so? Well, Flink has two mechanisms to
>>>> remove data from a window: a trigger that returns PURGE or
>>>> FIRE_AND_PURGE, or an evictor.
>>>>
>>>> The reduce function effectively acts as an implicit evictor: elements
>>>> that are no longer needed are not retained in the window pane, i.e. it
>>>> keeps in state only the reduced element. Here is an example:
>>>>
>>>>     env.socketTextStream("localhost", 9999)
>>>>       .keyBy { it.first().toString() }
>>>>       .window(GlobalWindows.create())
>>>>       .trigger(ContinuousProcessingTimeTrigger.of(WindowTime.seconds(seconds)))
>>>>       .reduce { left, right ->
>>>>         println("left: $left, right: $right")
>>>>         if (left.length > right.length) {
>>>>           left
>>>>         } else {
>>>>           right
>>>>         }
>>>>       }
>>>>       .printToErr()
>>>>
>>>> For your claim to hold true, every time the trigger fires one would expect
>>>> to see ALL the elements for a key being printed over and over again in the
>>>> reduce function. However, if you run a job similar to this one in your
>>>> language of choice, you will notice that the print statement is effectively
>>>> called only once per event per key.
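
The behaviour described above can be imitated without Flink. The following
is a simplified simulation under stated assumptions, not Flink's actual
implementation: `IncrementalReducer` is a hypothetical class whose map
stands in for the per-key window state, `add` plays the role of an arriving
element, and `fire` plays the role of the trigger firing without purging.

```kotlin
// Simulates how an incrementally reduced window keeps one value per key.
// Plain Kotlin, not Flink: `state` stands in for the keyed window state.
class IncrementalReducer<K : Any, V : Any>(private val reduce: (V, V) -> V) {
    private val state = HashMap<K, V>()

    // Number of times the reduce function has been invoked so far.
    var reduceCalls = 0
        private set

    // Called once per incoming element; the stored value is replaced,
    // so the state never holds more than one element per key.
    fun add(key: K, value: V) {
        val current = state[key]
        state[key] = if (current == null) {
            value
        } else {
            reduceCalls++
            reduce(current, value)
        }
    }

    // Firing only reads the state: nothing is purged, nothing is re-reduced.
    fun fire(): Map<K, V> = state.toMap()
}

fun main() {
    // Same reduce logic as the job above: keep the longer string.
    val window = IncrementalReducer<String, String> { left, right ->
        if (left.length > right.length) left else right
    }
    listOf("apple", "avocado", "banana", "ab")
        .forEach { window.add(it.first().toString(), it) }

    println(window.fire())      // one entry per key
    println(window.fire())      // firing again returns the same contents
    println(window.reduceCalls) // 2: one call per element after the first for key "a"
}
```

Firing repeatedly does not increase `reduceCalls`, which mirrors the
observation that the print statement runs once per event rather than once
per firing.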
>>>>
>>>> In fact, not using purge is intentional, because I want to hold every
>>>> record (the last one by its primary key) of the database in state so that
>>>> I can write a snapshot of the whole database.
>>>>
>>>> So for instance, let's say my table has two columns: id and time. And I
>>>> have the following events:
>>>>
>>>> 1,January
>>>> 2,February
>>>> 1,March
>>>>
>>>> I want to write to S3 two records: "1,March", and "2,February".
>>>>
>>>> Now, let's say two more events come into the stream:
>>>>
>>>> 3,April
>>>> 1,June
>>>>
>>>> Then I want to write to S3 three records: "1,June", "2,February" and
>>>> "3,April".
>>>>
>>>> In other words, I can't just purge the windows, because I would lose the
>>>> record with id 2.
>>>>
>>>>
>>>>
>>>
