Hi Stephan, thanks, that looks super. But the source then needs to emit the checkpoint. While reading source events, the source can determine "this is the event I want to take action after". So if the source could emit a checkpoint at that point and I could catch it at the end of the DAG, that would solve my problem (well, I also need to somehow distinguish my checkpoint from Flink's auto-generated ones).

Sorry for being too chatty; this is a topic where I need an expert opinion and can't find the answer by just googling.
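(For readers of the archive: a minimal sketch of the checkpoint-notification approach discussed in this thread, written against the Flink 0.10-era Checkpointed/CheckpointNotifier interfaces that Stephan mentions below. The Trade type and the marker flag are hypothetical stand-ins, not part of the original discussion.)

    import org.apache.flink.streaming.api.checkpoint.{Checkpointed, CheckpointNotifier}
    import org.apache.flink.streaming.api.functions.sink.SinkFunction

    // Hypothetical record type; the source emits one record with
    // endOfDayMarker = true after the last real trade.
    case class Trade(id: Long, endOfDayMarker: Boolean = false)

    // Sink that remembers whether the marker has passed through, snapshots
    // that flag with every checkpoint, and acts once a checkpoint taken
    // *after* the marker completes, i.e. once everything up to the marker
    // has flowed through the DAG.
    class MarkerAwareSink extends SinkFunction[Trade]
        with Checkpointed[java.lang.Boolean] with CheckpointNotifier {

      private var markerSeen = false
      // checkpointId -> whether the marker had been seen when it was taken
      private var pending = Map.empty[Long, Boolean]

      override def invoke(trade: Trade): Unit = {
        if (trade.endOfDayMarker) markerSeen = true
      }

      override def snapshotState(checkpointId: Long, timestamp: Long): java.lang.Boolean = {
        pending += (checkpointId -> markerSeen)
        markerSeen
      }

      override def restoreState(state: java.lang.Boolean): Unit = {
        markerSeen = state
      }

      override def notifyCheckpointComplete(checkpointId: Long): Unit = {
        if (pending.getOrElse(checkpointId, false)) {
          // Everything up to (and including) the marker has flowed through
          // the DAG: persist the final end-of-day measure here.
        }
        pending -= checkpointId
      }
    }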
On Mon, Nov 30, 2015 at 11:07 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi Anton!
>
> That you can do!
>
> You can look at the interfaces "Checkpointed" and "CheckpointNotifier". There you will get a call at every checkpoint (and can see which records are before that checkpoint). You also get a call once the checkpoint is complete, which corresponds to the point when everything has flowed through the DAG.
>
> I think it is nice to implement it like that, because it works non-blocking: the stream continues while the records-you-wait-for flow through the DAG, and you get an asynchronous notification once they have flowed all the way through.
>
> Greetings,
> Stephan
>
> On Mon, Nov 30, 2015 at 11:03 AM, Anton Polyakov <polyakov.an...@gmail.com> wrote:
>
>> I think I can turn my problem into a simpler one.
>>
>> Effectively what I need is a way to checkpoint certain events in the input stream and, once this checkpoint reaches the end of the DAG, take some action. So I need a signal at the sink which can tell "all events in the source before the checkpointed event are now processed".
>>
>> As far as I understand, flagged records don't quite work, since the DAG doesn't propagate source events one-to-one. Some transformations might create three child events out of one source event. If I want to make sure I have fully processed a source event, I need to wait until all of its children are processed.
>>
>> On Sun, Nov 29, 2015 at 4:12 PM, Anton Polyakov <polyakov.an...@gmail.com> wrote:
>>
>>> Hi Fabian
>>>
>>> Defining a special flag for a record seems like a checkpoint barrier; I think I would end up re-implementing checkpointing myself. I found this discussion on flink-dev:
>>> mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/…
>>> <http://mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/%3CCA+faj9xDFAUG_zi==e2h8s-8r4cn8zbdon_hf+1rud5pjqv...@mail.gmail.com%3E>
>>> which seems to address my task. Essentially they want a mechanism which marks a record produced by a job as "last" and then waits until it is fully propagated through the DAG, similar to what I need. My job which produces trades can likewise be thought of as finished once it has produced all trades; then I just need to wait until the latest trade produced by this job is processed.
>>>
>>> So although windows could probably also be applied, I think propagating a barrier through the DAG and checkpointing at the final job is what I need.
>>>
>>> Can I possibly utilize Flink's internal checkpoint barriers (i.e. trigger a custom checkpoint, or finish the streaming job)?
>>>
>>> On 24 Nov 2015, at 21:53, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>> Hi Anton,
>>>
>>> If I got your requirements right, you are looking for a solution that continuously produces updated partial aggregates in a streaming fashion. When a special event (no more trades) is received, you would like to store the last update as a final result. Is that correct?
>>>
>>> You can compute continuous updates using a reduce() or fold() function. These will produce a new update for each incoming event. For example:
>>>
>>> val s: DataStream[(Int, Long)] = ...
>>> s.keyBy(_._1)
>>>  .reduce( (x, y) => (x._1, x._2 + y._2) )
>>>
>>> would continuously compute a sum for every key (_._1) and produce an update for each incoming record.
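(For reference, Fabian's snippet expanded into a self-contained runnable program; the environment setup, sample data, and job name are illustrative.)

    import org.apache.flink.streaming.api.scala._

    object ContinuousSum {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Illustrative stand-in for the real trade stream: (key, amount) pairs.
        val s: DataStream[(Int, Long)] = env.fromElements(
          (1, 10L), (2, 7L), (1, 5L), (2, 3L))

        // Emits a new running sum per key for every incoming record.
        s.keyBy(_._1)
          .reduce((x, y) => (x._1, x._2 + y._2))
          .print()

        env.execute("continuous per-key sum")
      }
    }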
>>> You could add a flag to the record and implement a ReduceFunction that marks a record as final when the no-more-trades event is received. With a filter and a data sink you could emit such final records to a persistent data store.
>>>
>>> Btw.: You can also define custom trigger policies for windows. A custom trigger is called for each element that is added to a window and when certain timers expire. For example, with a custom trigger you can evaluate a window for every second element that is added. You can also define whether the elements in the window should be retained or removed after the evaluation.
>>>
>>> Best, Fabian
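(A sketch of the custom-trigger idea Fabian describes above: fire, without purging, on every second element. It is written against the later Flink 1.x Trigger class for readability; the 0.10-era signatures differ slightly, and a production version would keep the counter in checkpointed state rather than a plain field.)

    import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
    import org.apache.flink.streaming.api.windowing.windows.GlobalWindow

    // Fires the window on every second element, retaining its contents.
    class EverySecondElementTrigger[T] extends Trigger[T, GlobalWindow] {
      // Simplification: a plain field, not checkpointed state.
      private var count = 0L

      override def onElement(element: T, timestamp: Long,
                             window: GlobalWindow, ctx: Trigger.TriggerContext): TriggerResult = {
        count += 1
        if (count % 2 == 0) TriggerResult.FIRE else TriggerResult.CONTINUE
      }

      override def onProcessingTime(time: Long, window: GlobalWindow,
                                    ctx: Trigger.TriggerContext): TriggerResult =
        TriggerResult.CONTINUE

      override def onEventTime(time: Long, window: GlobalWindow,
                               ctx: Trigger.TriggerContext): TriggerResult =
        TriggerResult.CONTINUE

      override def clear(window: GlobalWindow, ctx: Trigger.TriggerContext): Unit = ()
    }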
>>> 2015-11-24 21:32 GMT+01:00 Anton Polyakov <polyakov.an...@gmail.com>:
>>>
>>>> Hi Max
>>>>
>>>> Thanks for the reply. From what I understand, a window works by buffering records while the window is open, then applying the transformation once window close is triggered, and passing on the transformed result. In my case the window would be open for a few hours, and the whole set of trades would be processed only once window close is triggered. What I actually want is to process events as they are produced, without buffering them. It is more like a stream with a special mark, whereas windowing seems more like a batch (if I understand it correctly).
>>>>
>>>> In other words: buffering, waiting for the window to close, and then processing is equivalent to simply doing one-off processing once all events are produced. I am looking for a solution where I process events as they are produced, so that when the source signals "done" my processing is also nearly done.
>>>>
>>>> On Tue, Nov 24, 2015 at 2:41 PM, Maximilian Michels <m...@apache.org> wrote:
>>>>
>>>>> Hi Anton,
>>>>>
>>>>> You should be able to model your problem using the Flink Streaming API. The actions you want to perform on the streamed records correspond to transformations on windows. You can indeed use watermarks to signal the window that a threshold for an action has been reached. Otherwise an eviction policy should also do it.
>>>>>
>>>>> Without more details about what you want to do, I can only refer you to the streaming API documentation:
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html
>>>>>
>>>>> Thanks,
>>>>> Max
>>>>>
>>>>> On Sun, Nov 22, 2015 at 8:53 PM, Anton Polyakov <polyakov.an...@gmail.com> wrote:
>>>>> > Hi
>>>>> >
>>>>> > I am very new to Flink and in fact have never used it. My task (which I currently solve using a home-grown Redis-based solution) is quite simple: I have a system which produces some events (trades; it is a financial system) and a computational chain which computes some measure accumulatively over these events. The events form a long but finite stream; they are produced as the result of an end-of-day flow. The computational logic forms a processing DAG which computes a measure (VaR) over these events. Each trade is processed through the DAG and at different stages might produce different sets of subsequent events (like return vectors); eventually they all arrive at some aggregator which computes the accumulated measure (a reducer).
>>>>> >
>>>>> > Ideally I would like to process trades as they appear (i.e. stream them), and once the producer reaches the end of the portfolio (there will be no more trades), I need to write the final resulting measure and mark it as the "end of day record". Of course I could also use a classical batch, i.e. wait until all trades are produced and then batch-process them, but this would be too inefficient.
>>>>> >
>>>>> > If I use Flink, I will need a sort of watermark saying "done, no more trades", and once this watermark reaches the end of the DAG, the final measure can be saved. More generally, it would be cool to have an indication at the end of the DAG telling which input stream position the current measure corresponds to.
>>>>> >
>>>>> > I feel my problem is very typical, yet I can't find any solution. All examples operate either on infinite streams where nobody cares about completion, or are classical batch examples which rely on the fact that all input data is ready.
>>>>> >
>>>>> > Can you please give me a hint?
>>>>> >
>>>>> > Thank you vm
>>>>> > Anton
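(Tying the thread together: Anton's "done, no more trades" watermark and Fabian's flag-and-filter suggestion can be combined by folding a sentinel flag into the running aggregate and filtering for it at the sink. A minimal sketch with illustrative types and data; note that fold was deprecated in later Flink versions in favour of AggregateFunction.)

    import org.apache.flink.streaming.api.scala._

    // Illustrative types: a trade event and a running aggregate that
    // carries a "final" flag once the end-of-day sentinel is folded in.
    case class Trade(portfolio: Int, amount: Double, endOfDay: Boolean = false)
    case class Measure(portfolio: Int, value: Double, isFinal: Boolean)

    object EndOfDayMeasure {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Stand-in source; the real job would read the trade feed and emit
        // one Trade(endOfDay = true) sentinel after the last real trade.
        val trades: DataStream[Trade] = env.fromElements(
          Trade(1, 10.0), Trade(1, 5.0), Trade(1, 0.0, endOfDay = true))

        val measures: DataStream[Measure] = trades
          .keyBy(_.portfolio)
          .fold(Measure(0, 0.0, isFinal = false)) { (acc, t) =>
            Measure(t.portfolio, acc.value + t.amount, acc.isFinal || t.endOfDay)
          }

        // Only the final, end-of-day measure reaches the persistent sink.
        measures.filter(_.isFinal).print() // stand-in for a real sink

        env.execute("end-of-day measure")
      }
    }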