I think I can turn my problem into a simpler one.

Effectively what I need - I need way to checkpoint certain events in input
stream and once this checkpoint reaches end of DAG take some action. So I
need a signal at the sink which can tell "all events in source before
checkpointed event are now processed".

As far as I understand flagged record don't quite work since DAG doesn't
propagate source events one-to-one. Some transformations might create 3
child events out of 1 source. If I want to make sure I fully processed
source event, I need to wait till all childs are processed.



On Sun, Nov 29, 2015 at 4:12 PM, Anton Polyakov <polyakov.an...@gmail.com>
wrote:

> Hi Fabian
>
> Defining a special flag for record seems like a checkpoint barrier. I
> think I will end up re-implementing checkpointing myself. I found the
> discussion in flink-dev:
> mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/…
> <http://mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/%3CCA+faj9xDFAUG_zi==e2h8s-8r4cn8zbdon_hf+1rud5pjqv...@mail.gmail.com%3E>
>  which
> seems to solve my task. Essentially they want to have a mechanism which
> will mark record produced by job as “last” and then wait until it’s fully
> propagated through DAG. Similarly to what I need. Essentially my job which
> produces trades can also thought as being finished once it produced all
> trades, then I just need to wait till latest trade produced by this job is
> processed.
>
> So although windows can probably also be applied, I think propagating
> barrier through DAG and checkpointing at final job is what I need.
>
> Can I possibly utilize internal Flink’s checkpoint barriers (i.e. like
> triggering a custom checkoint or finishing streaming job)?
>
> On 24 Nov 2015, at 21:53, Fabian Hueske <fhue...@gmail.com> wrote:
>
> Hi Anton,
>
> If I got your requirements right, you are looking for a solution that
> continuously produces updated partial aggregates in a streaming fashion.
> When a  special event (no more trades) is received, you would like to store
> the last update as a final result. Is that correct?
>
> You can compute continuous updates using a reduce() or fold() function.
> These will produce a new update for each incoming event.
> For example:
>
> val s: DataStream[(Int, Long)] = ...
> s.keyBy(_._1)
>   .reduce( (x,y) => (x._1, y._2 + y._2) )
>
> would continuously compute a sum for every key (_._1) and produce an
> update for each incoming record.
>
> You could add a flag to the record and implement a ReduceFunction that
> marks a record as final when the no-more-trades event is received.
> With a filter and a data sink you could emit such final records to a
> persistent data store.
>
> Btw.: You can also define custom trigger policies for windows. A custom
> trigger is called for each element that is added to a window and when
> certain timers expire. For example with a custom trigger, you can evaluate
> a window for every second element that is added. You can also define
> whether the elements in the window should be retained or removed after the
> evaluation.
>
> Best, Fabian
>
>
>
> 2015-11-24 21:32 GMT+01:00 Anton Polyakov <polyakov.an...@gmail.com>:
>
>> Hi Max
>>
>> thanks for reply. From what I understand window works in a way that it
>> buffers records while window is open, then apply transformation once window
>> close is triggered and pass transformed result.
>> In my case then window will be open for few hours, then the whole amount
>> of trades will be processed once window close is triggered. Actually I want
>> to process events as they are produced without buffering them. It is more
>> like a stream with some special mark versus windowing seems more like a
>> batch (if I understand it correctly).
>>
>> In other words - buffering and waiting for window to close, then
>> processing will be equal to simply doing one-off processing when all events
>> are produced. I am looking for a solution when I am processing events as
>> they are produced and when source signals "done" my processing is also
>> nearly done.
>>
>>
>> On Tue, Nov 24, 2015 at 2:41 PM, Maximilian Michels <m...@apache.org>
>> wrote:
>>
>>> Hi Anton,
>>>
>>> You should be able to model your problem using the Flink Streaming
>>> API. The actions you want to perform on the streamed records
>>> correspond to transformations on Windows. You can indeed use
>>> Watermarks to signal the window that a threshold for an action has
>>> been reached. Otherwise an eviction policy should also do it.
>>>
>>> Without more details about what you want to do I can only refer you to
>>> the streaming API documentation:
>>> Please see
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html
>>>
>>> Thanks,
>>> Max
>>>
>>> On Sun, Nov 22, 2015 at 8:53 PM, Anton Polyakov
>>> <polyakov.an...@gmail.com> wrote:
>>> > Hi
>>> >
>>> > I am very new to Flink and in fact never used it. My task (which I
>>> currently solve using home grown Redis-based solution) is quite simple - I
>>> have a system which produces some events (trades, it is a financial system)
>>> and computational chain which computes some measure accumulatively over
>>> these events. Those events form a long but finite stream, they are produced
>>> as a result of end of day flow. Computational logic forms a processing DAG
>>> which computes some measure over these events (VaR). Each trade is
>>> processed through DAG and at different stages might produce different set
>>> of subsequent events (like return vectors), eventually they all arrive into
>>> some aggregator which computes accumulated measure (reducer).
>>> >
>>> > Ideally I would like to process trades as they appear (i.e. stream
>>> them) and once producer reaches end of portfolio (there will be no more
>>> trades), I need to write final resulting measure and mark it as “end of day
>>> record”. Of course I also could use a classical batch - i.e. wait until all
>>> trades are produced and then batch process them, but this will be too
>>> inefficient.
>>> >
>>> > If I use Flink, I will need a sort of watermark saying - “done, no
>>> more trades” and once this watermark reaches end of DAG, final measure can
>>> be saved. More generally would be cool to have an indication at the end of
>>> DAG telling to which input stream position current measure corresponds.
>>> >
>>> > I feel my problem is very typical yet I can’t find any solution. All
>>> examples operate either on infinite streams where nobody cares about
>>> completion or classical batch examples which rely on fact all input data is
>>> ready.
>>> >
>>> > Can you please hint me.
>>> >
>>> > Thank you vm
>>> > Anton
>>>
>>
>>
>
>

Reply via email to