@Aljoscha - The N-input operator way sounds very nice. For now I think I'll try to get something quick running the hacky way, then if we decide to make this a permanent solution maybe I can work on the proper approach.

I was wondering about your suggestion for "warming up" the state, then taking a savepoint and switching sources - since the Kafka sources are stateful and are part of Flink's internal state, wouldn't this break when trying to restore the job with a different source? Would I need to assign the replay source a UID, and when switching from replay to live, remove the replay source and replace it with a dummy operator with the same UID?
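In other words, something roughly like this - just a sketch of what I mean (the topic names, SimpleStringSchema, kafkaProps and the NoOpSource are placeholders, and I haven't actually tested whether the restore behaves this way):

    // Warm-up job: the replay source gets an explicit UID; take a savepoint
    // once it has caught up. (kafkaProps / topic names / schema are placeholders.)
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> replay = env
        .addSource(new FlinkKafkaConsumer08<>("updates-replay", new SimpleStringSchema(), kafkaProps))
        .uid("replay-source");

    // Live job, restored from that savepoint: same UID, but on a source that emits
    // nothing, so the savepoint's operator state still has somewhere to map to.
    DataStream<String> dummyReplay = env
        .addSource(new NoOpSource())
        .uid("replay-source");

    DataStream<String> live = env
        .addSource(new FlinkKafkaConsumer08<>("updates-live", new SimpleStringSchema(), kafkaProps))
        .uid("live-source");

    DataStream<String> merged = dummyReplay.union(live);

    // Placeholder source that never emits an element
    public static class NoOpSource implements SourceFunction<String> {
        private volatile boolean running = true;
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            while (running) {
                Thread.sleep(1000);
            }
        }
        @Override
        public void cancel() {
            running = false;
        }
    }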
@Jason - I see what you mean now, with the historical and live Flink jobs. That's an interesting approach - I guess it's solving a slightly different problem to my 'rebuilding Flink state upon starting the job', as you're rebuilding state as part of the main job when it comes across events that require historical data. Actually I think we'll need to do something very similar in the future, but right now I can probably get away with something simpler!

Thanks for the replies!

Josh

On Fri, Jul 29, 2016 at 2:35 PM, Jason Brelloch <jb.bc....@gmail.com> wrote:

> Aljoscha's approach is probably better, but to answer your questions...
>
> >How do you send a request from one Flink job to another?
> All of our different Flink jobs communicate over Kafka. So the main Flink job would be listening to both a "live" Kafka source and a "historical" Kafka source. The historical Flink job would listen to a "request" Kafka source. When the main job gets an event that it does not have state for, it writes to the "request" topic. The historical job would read the request, grab the relevant old events from GCS, and write them to the "historical" Kafka topic. The "historical" source and the "live" source are merged and proceed through the main Flink job as one stream.
>
> >How do you handle the switchover between the live stream and the historical stream? Do you somehow block the live stream source and detect when the historical data source is no longer emitting new elements?
> When the main job sends a request to the historical job, the main job starts storing any events that come in for that key. As the historical events come in, they are processed immediately. The historical Flink job flags the last event it sends. When the main Flink job sees the flagged event, it knows it is caught up to where it was when it sent the request. You can then process the events that the main job stored, and when that is done you are caught up to the live stream, and can stop storing events for that key and just process them as normal.
>
> Keep in mind that this is the dangerous part that I was talking about, where memory in the main job would continue to build until the "historical" events are all processed.
>
> >In my case I would want the Flink state to always contain the latest state of every item (except when the job first starts and there's a period of time where it's rebuilding its state from the Kafka log).
> You could absolutely do it by reading from the beginning of a Kafka topic. The reason we do it with GCS is that it is really cheap storage, and we are not planning on storing forever on the Kafka topic.
>
> >Since I would have everything needed to rebuild the state persisted in a Kafka topic, I don't think I would need a second Flink job for this?
> The reason for the second Flink job in our case is that we didn't really want to block the Flink task slot while a single key gets caught up. We have a much larger key domain than we have task slots, so there would be multiple keys on a single task slot. If you go with the single-job approach (which might be the right approach for you guys), any other keys on that task slot will be blocked while the one key is getting its state built up.
>
> Hope that helps,
>
> On Fri, Jul 29, 2016 at 5:27 AM, Josh <jof...@gmail.com> wrote:
>
>> Hi Jason,
>>
>> Thanks for the reply - I didn't quite understand all of it though!
>>
>> > it sends a request to the historical flink job for the old data
>> How do you send a request from one Flink job to another?
>>
>> > It continues storing the live events until all the events from the historical job have been processed, then it processes the stored events, and finally starts processing the live stream again.
>> How do you handle the switchover between the live stream and the historical stream? Do you somehow block the live stream source and detect when the historical data source is no longer emitting new elements?
>>
>> > So in your case it looks like what you could do is send a request to the "historical" job whenever you get an item that you don't yet have the current state of.
>> In my case I would want the Flink state to always contain the latest state of every item (except when the job first starts and there's a period of time where it's rebuilding its state from the Kafka log). Since I would have everything needed to rebuild the state persisted in a Kafka topic, I don't think I would need a second Flink job for this?
>>
>> Thanks,
>> Josh
>>
>> On Thu, Jul 28, 2016 at 6:57 PM, Jason Brelloch <jb.bc....@gmail.com> wrote:
>>
>>> Hey Josh,
>>>
>>> The way we replay historical data is we have a second Flink job that listens to the same live stream and stores every single event in Google Cloud Storage.
>>>
>>> When the main Flink job that is processing the live stream gets a request for a specific data set that it has not been processing yet, it sends a request to the historical Flink job for the old data. The live job then starts storing relevant events from the live stream in state. It continues storing the live events until all the events from the historical job have been processed, then it processes the stored events, and finally starts processing the live stream again.
>>>
>>> As long as it's properly keyed (we key on the specific data set), it doesn't block anything, keeps everything ordered, and eventually catches up. It also allows us to completely blow away state and rebuild it from scratch.
>>>
>>> So in your case it looks like what you could do is send a request to the "historical" job whenever you get an item that you don't yet have the current state of.
>>>
>>> The potential problems you may have are that it may not be possible to store every single historical event, and that you need to make sure there is enough memory to handle the ever-increasing state size while the historical events are being replayed (and make sure to clear the state when it is done).
>>>
>>> It's a little complicated, and pretty expensive, but it works. Let me know if something doesn't make sense.
>>>
>>> On Thu, Jul 28, 2016 at 1:14 PM, Josh <jof...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I was wondering what approaches people usually take with reprocessing data with Flink - specifically the case where you want to upgrade a Flink job and make it reprocess historical data before continuing to process a live stream.
>>>>
>>>> I'm wondering if we can do something similar to the 'simple rewind' or 'parallel rewind' which Samza uses to solve this problem, discussed here: https://samza.apache.org/learn/documentation/0.10/jobs/reprocessing.html
>>>>
>>>> Having used Flink over the past couple of months, the main issue I've had involves Flink's internal state - from my experience it seems easy to break the state when upgrading a job or when changing the parallelism of operators, plus there's no easy way to view/access an internal key-value state from outside Flink.
>>>>
>>>> For an example of what I mean, consider a Flink job which consumes a stream of 'updates' to items, and maintains a key-value store of items within Flink's internal state (e.g. in RocksDB). The job also writes the updated items to a Kafka topic:
>>>>
>>>> http://oi64.tinypic.com/34q5opf.jpg
>>>>
>>>> My worry with this is that the state in RocksDB could be lost or become incompatible with an updated version of the job. If this happens, we need to be able to rebuild Flink's internal key-value store in RocksDB. So I'd like to be able to do something like this (which I believe is the Samza solution):
>>>>
>>>> http://oi67.tinypic.com/219ri95.jpg
>>>>
>>>> Has anyone done something like this already with Flink? If so, are there any examples of how to do this replay & switchover (rebuild state by consuming from a historical log, then switch over to processing the live stream)?
>>>>
>>>> Thanks for any insights,
>>>> Josh
>>>>
>>>
>>> --
>>> *Jason Brelloch* | Product Developer
>>
>
> --
> *Jason Brelloch* | Product Developer
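
P.S. Jason - just to check I've understood the buffering/switchover part: is it roughly equivalent to the keyed function below? (Completely untested sketch, the Event type and its methods are invented - the point is just buffering live events per key until the flagged end-of-history event arrives.)

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    // Applied after historical.union(live).keyBy(Event::getKey)
    public class SwitchoverFunction extends RichFlatMapFunction<Event, Event> {

        // true once the flagged "last historical event" has been seen for this key
        private transient ValueState<Boolean> caughtUp;
        // live events buffered while the historical replay for this key is still in flight
        private transient ListState<Event> buffered;

        @Override
        public void open(Configuration parameters) {
            caughtUp = getRuntimeContext().getState(
                new ValueStateDescriptor<>("caughtUp", Boolean.class));
            buffered = getRuntimeContext().getListState(
                new ListStateDescriptor<>("buffered", Event.class));
        }

        @Override
        public void flatMap(Event e, Collector<Event> out) throws Exception {
            if (Boolean.TRUE.equals(caughtUp.value())) {
                out.collect(e);                       // already caught up: plain live processing
            } else if (e.isHistorical()) {
                out.collect(e);                       // replayed event: process immediately
                if (e.isEndOfHistory()) {             // flagged last event from the historical job
                    Iterable<Event> pending = buffered.get();
                    if (pending != null) {
                        for (Event p : pending) {
                            out.collect(p);           // drain the events buffered meanwhile
                        }
                    }
                    buffered.clear();
                    caughtUp.update(true);
                }
            } else {
                buffered.add(e);                      // live event arriving before catch-up: buffer it
            }
        }
    }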