You cannot force a barrier at a specific point in time. When checkpoints are triggered is decided by the master node.
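To make the timing concrete, here is a rough, Flink-free sketch in plain Java. All class and method names in it are made up for illustration and are not Flink API. It assumes a single source, a barrier after every N records as a stand-in for the master's timer, and checkpoints that complete immediately. The source only remembers how many records it had emitted when each barrier passed, and treats the first completed checkpoint whose barrier came after the "no more trades" record as the signal that this record has gone through the whole DAG.

```java
import java.util.HashMap;
import java.util.Map;

// Flink-free simulation; every name here is hypothetical, none of it is Flink API.
class MarkerTrackingSource {
    private long emitted = 0;        // records emitted so far
    private long markerOffset = -1;  // offset of the "no more trades" record, -1 until emitted
    // checkpoint id -> number of records emitted before that checkpoint's barrier
    private final Map<Long, Long> emittedBeforeBarrier = new HashMap<>();

    void emit(boolean isMarker) {
        emitted++;
        if (isMarker) {
            markerOffset = emitted;
        }
    }

    // Plays the role of snapshotState(): a barrier triggered by the master passes the source.
    void onBarrier(long checkpointId) {
        emittedBeforeBarrier.put(checkpointId, emitted);
    }

    // Plays the role of the checkpoint-complete notification.
    // Returns true once the marker and everything before it has passed all sinks.
    boolean onCheckpointComplete(long checkpointId) {
        Long covered = emittedBeforeBarrier.remove(checkpointId);
        return covered != null && markerOffset >= 0 && covered >= markerOffset;
    }

    // Simplified driver (assumes recordsPerCheckpoint >= 1). The last record is the marker.
    // Returns the id of the first checkpoint that covers the marker, or -1 if there is none.
    static long firstCheckpointCoveringMarker(int totalRecords, int recordsPerCheckpoint) {
        MarkerTrackingSource source = new MarkerTrackingSource();
        long checkpointId = 0;
        for (int i = 1; i <= totalRecords; i++) {
            source.emit(i == totalRecords);
            if (i % recordsPerCheckpoint == 0) {
                checkpointId++;
                source.onBarrier(checkpointId);
                if (source.onCheckpointComplete(checkpointId)) {
                    return checkpointId;
                }
            }
        }
        // The marker is out, but the source cannot force a barrier:
        // it has to wait for the next checkpoint the master triggers.
        checkpointId++;
        source.onBarrier(checkpointId);
        return source.onCheckpointComplete(checkpointId) ? checkpointId : -1;
    }

    public static void main(String[] args) {
        System.out.println(firstCheckpointCoveringMarker(10, 4));
        System.out.println(firstCheckpointCoveringMarker(8, 4));
    }
}
```

The point of the sketch is that the source never asks for a checkpoint. It only compares offsets when the notification comes in, so the end-of-day action is delayed by at most one checkpoint interval after the last trade.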
I think in your case you can use the checkpoint and notification calls to figure out when data has flowed through the DAG, but you cannot force a barrier at a specific point.

On Mon, Nov 30, 2015 at 3:33 PM, Anton Polyakov <polyakov.an...@gmail.com> wrote:

> Hi Stephan
>
> sorry for misunderstanding, but how do I make sure barrier is placed at
> the proper time? How does my source "force" checkpoint to start happening
> once it finds that all needed elements are now produced?
>
> On Mon, Nov 30, 2015 at 2:13 PM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi!
>>
>> If you implement the "Checkpointed" interface, you get the function calls
>> to "snapshotState()" at the point when the checkpoint barrier arrives at an
>> operator. So, the call to "snapshotState()" in the sink is when the barrier
>> reaches the sink. The call to "notifyCheckpointComplete()" in the sources
>> comes after all barriers have reached all sinks.
>>
>> Have a look here for an illustration about barriers flowing with the
>> stream:
>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/internals/stream_checkpointing.html
>>
>> Stephan
>>
>>
>> On Mon, Nov 30, 2015 at 11:51 AM, Anton Polyakov <polyakov.an...@gmail.com> wrote:
>>
>>> Hi Stephan
>>>
>>> thanks that looks super. But source needs then to emit checkpoint. At
>>> the source, while reading source events I can find out that - this is the
>>> source event I want to take actions after. So if at source I can then emit
>>> checkpoint and catch it at the end of the DAG that would solve my problem
>>> (well, I also need to somehow distinguish my checkpoint from Flink's
>>> auto-generated ones).
>>>
>>> Sorry for being too chatty, this is the topic where I need expert
>>> opinion, can't find out the answer by just googling.
>>>
>>>
>>> On Mon, Nov 30, 2015 at 11:07 AM, Stephan Ewen <se...@apache.org> wrote:
>>>
>>>> Hi Anton!
>>>>
>>>> That you can do!
>>>>
>>>> You can look at the interfaces "Checkpointed" and "CheckpointNotifier".
>>>> There you will get a call at every checkpoint (and can look at what records
>>>> are before that checkpoint). You also get a call once the checkpoint is
>>>> complete, which corresponds to the point when everything has flowed through
>>>> the DAG.
>>>>
>>>> I think it is nice to implement it like that, because it works
>>>> non-blocking: The stream continues while the records-you-wait-for flow
>>>> through the DAG, and you get an asynchronous notification once they have
>>>> flowed all the way through.
>>>>
>>>> Greetings,
>>>> Stephan
>>>>
>>>>
>>>> On Mon, Nov 30, 2015 at 11:03 AM, Anton Polyakov <polyakov.an...@gmail.com> wrote:
>>>>
>>>>> I think I can turn my problem into a simpler one.
>>>>>
>>>>> Effectively what I need - I need way to checkpoint certain events in
>>>>> input stream and once this checkpoint reaches end of DAG take some action.
>>>>> So I need a signal at the sink which can tell "all events in source before
>>>>> checkpointed event are now processed".
>>>>>
>>>>> As far as I understand flagged record don't quite work since DAG
>>>>> doesn't propagate source events one-to-one. Some transformations might
>>>>> create 3 child events out of 1 source. If I want to make sure I fully
>>>>> processed source event, I need to wait till all children are processed.
>>>>>
>>>>>
>>>>>
>>>>> On Sun, Nov 29, 2015 at 4:12 PM, Anton Polyakov <polyakov.an...@gmail.com> wrote:
>>>>>
>>>>>> Hi Fabian
>>>>>>
>>>>>> Defining a special flag for record seems like a checkpoint barrier. I
>>>>>> think I will end up re-implementing checkpointing myself. I found the
>>>>>> discussion in flink-dev:
>>>>>> mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/…
>>>>>> <http://mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/%3CCA+faj9xDFAUG_zi==e2h8s-8r4cn8zbdon_hf+1rud5pjqv...@mail.gmail.com%3E>
>>>>>> which seems to solve my task.
>>>>>> Essentially they want to have a mechanism which
>>>>>> will mark record produced by job as “last” and then wait until it’s fully
>>>>>> propagated through DAG. Similarly to what I need. Essentially my job which
>>>>>> produces trades can also be thought of as finished once it produced all
>>>>>> trades, then I just need to wait till latest trade produced by this job is
>>>>>> processed.
>>>>>>
>>>>>> So although windows can probably also be applied, I think propagating
>>>>>> barrier through DAG and checkpointing at final job is what I need.
>>>>>>
>>>>>> Can I possibly utilize internal Flink’s checkpoint barriers (i.e.
>>>>>> like triggering a custom checkpoint or finishing streaming job)?
>>>>>>
>>>>>> On 24 Nov 2015, at 21:53, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Anton,
>>>>>>
>>>>>> If I got your requirements right, you are looking for a solution that
>>>>>> continuously produces updated partial aggregates in a streaming fashion.
>>>>>> When a special event (no more trades) is received, you would like to store
>>>>>> the last update as a final result. Is that correct?
>>>>>>
>>>>>> You can compute continuous updates using a reduce() or fold()
>>>>>> function. These will produce a new update for each incoming event.
>>>>>> For example:
>>>>>>
>>>>>> val s: DataStream[(Int, Long)] = ...
>>>>>> s.keyBy(_._1)
>>>>>>   .reduce( (x,y) => (x._1, x._2 + y._2) )
>>>>>>
>>>>>> would continuously compute a sum for every key (_._1) and produce an
>>>>>> update for each incoming record.
>>>>>>
>>>>>> You could add a flag to the record and implement a ReduceFunction
>>>>>> that marks a record as final when the no-more-trades event is received.
>>>>>> With a filter and a data sink you could emit such final records to a
>>>>>> persistent data store.
>>>>>>
>>>>>> Btw.: You can also define custom trigger policies for windows. A
>>>>>> custom trigger is called for each element that is added to a window and
>>>>>> when certain timers expire.
>>>>>> For example with a custom trigger, you can
>>>>>> evaluate a window for every second element that is added. You can also
>>>>>> define whether the elements in the window should be retained or removed
>>>>>> after the evaluation.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>>
>>>>>>
>>>>>> 2015-11-24 21:32 GMT+01:00 Anton Polyakov <polyakov.an...@gmail.com>:
>>>>>>
>>>>>>> Hi Max
>>>>>>>
>>>>>>> thanks for reply. From what I understand window works in a way that
>>>>>>> it buffers records while window is open, then apply transformation once
>>>>>>> window close is triggered and pass transformed result.
>>>>>>> In my case then window will be open for few hours, then the whole
>>>>>>> amount of trades will be processed once window close is triggered.
>>>>>>> Actually I want to process events as they are produced without buffering
>>>>>>> them. It is more like a stream with some special mark versus windowing
>>>>>>> seems more like a batch (if I understand it correctly).
>>>>>>>
>>>>>>> In other words - buffering and waiting for window to close, then
>>>>>>> processing will be equal to simply doing one-off processing when all
>>>>>>> events are produced. I am looking for a solution when I am processing
>>>>>>> events as they are produced and when source signals "done" my processing
>>>>>>> is also nearly done.
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Nov 24, 2015 at 2:41 PM, Maximilian Michels <m...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Anton,
>>>>>>>>
>>>>>>>> You should be able to model your problem using the Flink Streaming
>>>>>>>> API. The actions you want to perform on the streamed records
>>>>>>>> correspond to transformations on Windows. You can indeed use
>>>>>>>> Watermarks to signal the window that a threshold for an action has
>>>>>>>> been reached. Otherwise an eviction policy should also do it.
>>>>>>>>
>>>>>>>> Without more details about what you want to do I can only refer you to
>>>>>>>> the streaming API documentation:
>>>>>>>> Please see
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Max
>>>>>>>>
>>>>>>>> On Sun, Nov 22, 2015 at 8:53 PM, Anton Polyakov <polyakov.an...@gmail.com> wrote:
>>>>>>>> > Hi
>>>>>>>> >
>>>>>>>> > I am very new to Flink and in fact never used it. My task (which I
>>>>>>>> > currently solve using home grown Redis-based solution) is quite simple -
>>>>>>>> > I have a system which produces some events (trades, it is a financial
>>>>>>>> > system) and computational chain which computes some measure accumulatively
>>>>>>>> > over these events. Those events form a long but finite stream, they are
>>>>>>>> > produced as a result of end of day flow. Computational logic forms a
>>>>>>>> > processing DAG which computes some measure over these events (VaR). Each
>>>>>>>> > trade is processed through DAG and at different stages might produce
>>>>>>>> > different set of subsequent events (like return vectors), eventually they
>>>>>>>> > all arrive into some aggregator which computes accumulated measure
>>>>>>>> > (reducer).
>>>>>>>> >
>>>>>>>> > Ideally I would like to process trades as they appear (i.e. stream
>>>>>>>> > them) and once producer reaches end of portfolio (there will be no
>>>>>>>> > more trades), I need to write final resulting measure and mark it as “end
>>>>>>>> > of day record”. Of course I also could use a classical batch - i.e. wait
>>>>>>>> > until all trades are produced and then batch process them, but this will be
>>>>>>>> > too inefficient.
>>>>>>>> >
>>>>>>>> > If I use Flink, I will need a sort of watermark saying - “done,
>>>>>>>> > no more trades” and once this watermark reaches end of DAG, final measure
>>>>>>>> > can be saved.
>>>>>>>> > More generally would be cool to have an indication at the end
>>>>>>>> > of DAG telling to which input stream position current measure corresponds.
>>>>>>>> >
>>>>>>>> > I feel my problem is very typical yet I can’t find any solution.
>>>>>>>> > All examples operate either on infinite streams where nobody cares about
>>>>>>>> > completion or classical batch examples which rely on fact all input data is
>>>>>>>> > ready.
>>>>>>>> >
>>>>>>>> > Can you please hint me.
>>>>>>>> >
>>>>>>>> > Thank you vm
>>>>>>>> > Anton