Hi Josh,

For the first part of your question you might be interested in our ongoing
work on adding side inputs to Flink. I started this design doc:
https://docs.google.com/document/d/1hIgxi2Zchww_5fWUHLoYiXwSBXjv-M5eOv-MKQYN3m4/edit?usp=sharing
It's still somewhat rough around the edges, but could you see this being
useful for your case? I also have some more material that I will shortly add
to the document.

Cheers,
Aljoscha

On Tue, 24 May 2016 at 14:34 Maximilian Michels <m...@apache.org> wrote:
> Hi Josh,
>
> You can trigger an occasional refresh, e.g. on every 100 elements
> received. Or, you could start a thread that does the refresh every 100
> seconds (with a lock involved to prevent processing in the meantime).
>
> Cheers,
> Max
>
> On Mon, May 23, 2016 at 7:36 PM, Josh <jof...@gmail.com> wrote:
> >
> > Hi Max,
> >
> > Thanks, that's very helpful re the REST API sink. For now I don't need
> > exactly-once guarantees for the sink, so I'll just write a simple HTTP
> > sink implementation. But I may need to move to the idempotent version
> > in future!
> >
> > For 1), that sounds like a simple/easy solution, but how would I handle
> > occasional updates in that case, since I guess the open() function is
> > only called once? Do I need to periodically restart the job, or
> > periodically trigger tasks to restart and refresh their data? Ideally I
> > would want this job to be running constantly.
> >
> > Josh
> >
> > On Mon, May 23, 2016 at 5:56 PM, Maximilian Michels <m...@apache.org> wrote:
> >>
> >> Hi Josh,
> >>
> >> 1) Use a RichFunction, which has an open() method to load data (e.g.
> >> from a database) at runtime before the processing starts.
> >>
> >> 2) No, that's fine. If you want your REST API sink to interplay with
> >> checkpointing (for fault tolerance), this is a bit tricky though,
> >> depending on the guarantees you want to have. Typically, you would
> >> have "at least once" or "exactly once" semantics on the state. In
> >> Flink, this is easy to achieve; it's a bit harder for outside systems.
> >>
> >> "At Least Once"
> >>
> >> For example, if you increment a counter in a database, this count will
> >> be off if you recover your job in the case of a failure.
> >> You can checkpoint the current value of the counter and restore this
> >> value on a failure (using the Checkpointed interface). However, your
> >> counter might decrease temporarily when you resume from a checkpoint
> >> (until the counter has caught up again).
> >>
> >> "Exactly Once"
> >>
> >> If you want "exactly once" semantics on outside systems (e.g. REST
> >> API), you'll need idempotent updates. An idempotent variant of this
> >> would be a count with a checkpoint id (cid) in your database:
> >>
> >> | cid | count |
> >> |-----+-------|
> >> |   0 |     3 |
> >> |   1 |    11 |
> >> |   2 |    20 |
> >> |   3 |   120 |
> >> |   4 |   137 |
> >> |   5 |   158 |
> >>
> >> You would then always read the newest cid value for presentation. You
> >> would only write to the database once you know you have completed the
> >> checkpoint (CheckpointListener). You can still fail while doing that,
> >> so you need to keep the confirmation around in the checkpoint such
> >> that you can confirm again after restore. It is important that the
> >> confirmation can be done multiple times without affecting the result
> >> (idempotence). On recovery from a checkpoint, you want to delete all
> >> rows with a cid higher than the one you resume from. For example, if
> >> you fail after checkpoint 3 has been created, you'll confirm 3
> >> (because you might have failed before you could confirm) and then
> >> delete 4 and 5 before starting the computation again.
> >>
> >> You see that strong consistency guarantees can be a bit tricky. If you
> >> don't need strong guarantees and undercounting is ok for you,
> >> implement simple checkpointing for "at least once" using the
> >> Checkpointed interface, or the key/value state if your counter is
> >> scoped by key.
> >>
> >> Cheers,
> >> Max
> >>
> >>
> >> On Mon, May 23, 2016 at 3:22 PM, Josh <jof...@gmail.com> wrote:
> >> > Hi all,
> >> >
> >> > I am new to Flink and have a couple of questions which I've had
> >> > trouble finding answers to online. Any advice would be much
> >> > appreciated!
> >> >
> >> > 1) What's a typical way of handling the scenario where you want to
> >> > join streaming data with a (relatively) static data source? For
> >> > example, if I have a stream 'orders' where each order has an
> >> > 'item_id', and I want to join this stream with my database of
> >> > 'items'. The database of items is mostly static (with perhaps a few
> >> > new items added every day). The database can be retrieved either
> >> > directly from a standard SQL database (Postgres) or via a REST call.
> >> > I guess one way to handle this would be to distribute the database
> >> > of items with the Flink tasks, and to redeploy the entire job if the
> >> > items database changes. But I think there's probably a better way to
> >> > do it?
> >> >
> >> > 2) I'd like my Flink job to output state to a REST API (i.e. using
> >> > the REST API as a sink). Updates would be incremental, e.g. the job
> >> > would output tumbling window counts which need to be added to some
> >> > property on a REST resource, so I'd probably implement this as a
> >> > PATCH. I haven't found much evidence that anyone else has used a
> >> > REST API as a Flink sink - is there a reason why this might be a bad
> >> > idea?
> >> >
> >> > Thanks for any advice on these,
> >> >
> >> > Josh
> >> >
> >