Cool, thanks - I've tried out the approach where we replay data from the Kafka
compacted log, then take a savepoint and switch to the live stream.

It works, but I did have to add a dummy operator for every operator that was
removed. Without doing this, I got an exception:

java.lang.IllegalStateException: Failed to rollback to savepoint Checkpoint 1 @
1470059433553 for cb321c233dfd28f73c565030481657cd. Cannot map old state for
task 02ea922553bc7522bdea373f52a702d6 to the new program. This indicates that
the program has been changed in a non-compatible way after the savepoint.

I had a Kafka source and a flat mapper chained together when replaying, so to
make it work I had to add two dummy operators and assign the same UID I used
when replaying, like this:

stream.map(x => x).uid("kafka-replay").name("dummy-1").startNewChain().map(x => x).name("dummy-2")
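For reference, the replay job looked something like this - a minimal sketch,
where the topic name and the trivial flat mapper are made up, but the chained
source + flat mapper carrying the "kafka-replay" UID mirror what I ran:

import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object ReplayJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val props = new Properties() // bootstrap.servers, zookeeper.connect, group.id

    env
      // source for the compacted history topic
      // (auto.offset.reset=smallest in props to read from the start)
      .addSource(new FlinkKafkaConsumer08[String]("items-compacted", new SimpleStringSchema, props))
      // the UID that the dummy operators above carry after the switch
      .uid("kafka-replay")
      // stand-in for the real flat mapper chained to the source
      .flatMap(line => Seq(line))
      .print()

    env.execute("state-warmup-replay")
  }
}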
I guess it would be nice if Flink could recover from removed tasks/operators
without needing to add dummy operators like this.

Josh

On Fri, Jul 29, 2016 at 5:46 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
> I have to try this to verify, but I think the approach works if you give
> the two sources different UIDs. The reason is that Flink will ignore state
> for which it doesn't have an operator to assign it to. Therefore, the state
> of the "historical Kafka source" should be silently discarded.
>
> Cheers,
> Aljoscha
>
> On Fri, 29 Jul 2016 at 18:12 Josh <jof...@gmail.com> wrote:
>
>> @Aljoscha - The N-input operator way sounds very nice; for now I think
>> I'll try to get something quick running the hacky way, then if we decide
>> to make this a permanent solution maybe I can work on the proper solution.
>> I was wondering about your suggestion for "warming up" the state and then
>> taking a savepoint and switching sources - since the Kafka sources are
>> stateful and are part of Flink's internal state, wouldn't this break when
>> trying to restore the job with a different source? Would I need to assign
>> the replay source a UID, and when switching from replay to live, remove the
>> replay source and replace it with a dummy operator with the same UID?
>>
>> @Jason - I see what you mean now, with the historical and live Flink
>> jobs. That's an interesting approach - I guess it's solving a slightly
>> different problem from my 'rebuilding Flink state upon starting a job', as
>> you're rebuilding state as part of the main job when it comes across events
>> that require historical data. Actually I think we'll need to do something
>> very similar in the future, but right now I can probably get away with
>> something simpler!
>>
>> Thanks for the replies!
>>
>> Josh
>>
>> On Fri, Jul 29, 2016 at 2:35 PM, Jason Brelloch <jb.bc....@gmail.com>
>> wrote:
>>
>>> Aljoscha's approach is probably better, but to answer your questions...
>>>
>>> > How do you send a request from one Flink job to another?
>>> All of our different Flink jobs communicate over Kafka. So the main
>>> Flink job would be listening to both a "live" Kafka source and a
>>> "historical" Kafka source. The historical Flink job would listen to a
>>> "request" Kafka source. When the main job gets an event that it does not
>>> have state for, it writes to the "request" topic. The historical job would
>>> read the request, grab the relevant old events from GCS, and write them to
>>> the "historical" Kafka topic. The "historical" source and the "live"
>>> source are merged and proceed through the main Flink job as one stream.
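>>>
>>> In code, the source wiring in the main job looks roughly like this - a
>>> sketch only, where the Event type and the parse/needsReplay stand-ins
>>> are made up:
>>>
>>> import java.util.Properties
>>> import org.apache.flink.streaming.api.scala._
>>> import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer08, FlinkKafkaProducer08}
>>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema
>>>
>>> object MainJobWiring {
>>>   case class Event(key: String, payload: String)
>>>
>>>   // crude stand-ins for real deserialization and for the
>>>   // "do we have state for this key yet?" check
>>>   def parse(s: String): Event = {
>>>     val parts = s.split('|')
>>>     Event(parts(0), parts(1))
>>>   }
>>>   def needsReplay(e: Event): Boolean = false
>>>
>>>   def main(args: Array[String]): Unit = {
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>     val props = new Properties() // bootstrap.servers, zookeeper.connect, group.id
>>>
>>>     def source(topic: String): DataStream[Event] =
>>>       env.addSource(new FlinkKafkaConsumer08[String](topic, new SimpleStringSchema, props))
>>>         .map(parse _)
>>>
>>>     // the "historical" and "live" sources are merged and proceed
>>>     // through the main job as one keyed stream
>>>     val merged = source("live").union(source("historical")).keyBy(_.key)
>>>
>>>     // keys we have no state for yet are written to the "request"
>>>     // topic that the historical job listens on
>>>     merged.filter(needsReplay _).map(_.key)
>>>       .addSink(new FlinkKafkaProducer08[String]("request", new SimpleStringSchema, props))
>>>
>>>     env.execute("main-job")
>>>   }
>>> }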
>>>
>>> > How do you handle the switchover between the live stream and the
>>> historical stream? Do you somehow block the live stream source and detect
>>> when the historical data source is no longer emitting new elements?
>>> When the main job sends a request to the historical job, the main job
>>> starts storing any events that come in for that key. As the historical
>>> events come in, they are processed immediately. The historical Flink job
>>> flags the last event it sends. When the main Flink job sees the flagged
>>> event, it knows it is caught up to where it was when it sent the request.
>>> You can then process the events that the main job stored, and when that is
>>> done you are caught up to the live stream, and can stop storing events for
>>> that key and just process them as normal.
>>>
>>> Keep in mind that this is the dangerous part that I was talking about,
>>> where memory in the main job would continue to build until the "historical"
>>> events are all processed.
>>>
>>> > In my case I would want the Flink state to always contain the latest
>>> state of every item (except when the job first starts and there's a period
>>> of time where it's rebuilding its state from the Kafka log).
>>> You could absolutely do it by reading from the beginning of a Kafka
>>> topic. The reason we do it with GCS is that it is really cheap storage, and
>>> we are not planning on storing forever on the Kafka topic.
>>>
>>> > Since I would have everything needed to rebuild the state persisted in
>>> a Kafka topic, I don't think I would need a second Flink job for this?
>>> The reason for the second Flink job in our case is that we didn't really
>>> want to block the Flink task slot while a single key gets caught up. We
>>> have a much larger key domain than we have task slots, so there would be
>>> multiple keys on a single task slot. If you go with the single-job approach
>>> (which might be the right approach for you guys), any other keys on that
>>> task slot will be blocked while the one key gets its state built up.
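>>>
>>> To make the switchover part above more concrete, here is roughly what
>>> the buffering in the main job looks like - a simplified sketch, where
>>> the Event type and its flag fields are made up and the state API
>>> details depend on your Flink version:
>>>
>>> import org.apache.flink.api.common.functions.RichFlatMapFunction
>>> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
>>> import org.apache.flink.configuration.Configuration
>>> import org.apache.flink.util.Collector
>>>
>>> case class Event(key: String, payload: String,
>>>                  fromHistorical: Boolean, endOfReplay: Boolean)
>>>
>>> // runs downstream of keyBy(_.key), so all state below is per key
>>> class SwitchoverFn extends RichFlatMapFunction[Event, Event] {
>>>   @transient private var waiting: ValueState[java.lang.Boolean] = _
>>>   @transient private var buffer: ValueState[List[Event]] = _
>>>
>>>   override def open(conf: Configuration): Unit = {
>>>     waiting = getRuntimeContext.getState(
>>>       new ValueStateDescriptor("waiting", classOf[java.lang.Boolean]))
>>>     buffer = getRuntimeContext.getState(
>>>       new ValueStateDescriptor("buffer", classOf[List[Event]]))
>>>   }
>>>
>>>   override def flatMap(e: Event, out: Collector[Event]): Unit = {
>>>     if (e.fromHistorical) {
>>>       // historical events are processed immediately
>>>       out.collect(e)
>>>       if (e.endOfReplay) {
>>>         // the flagged last event from the historical job: drain the
>>>         // stored live events, then release the memory
>>>         Option(buffer.value).getOrElse(Nil).foreach(out.collect)
>>>         buffer.clear()
>>>         waiting.clear()
>>>       }
>>>     } else if (waiting.value != null && waiting.value) {
>>>       // a replay is in flight for this key: store, don't process yet
>>>       buffer.update(Option(buffer.value).getOrElse(Nil) :+ e)
>>>     } else {
>>>       // normal processing; when we find we have no state for a key,
>>>       // this is where we'd write to the "request" topic and set
>>>       // waiting to true to start buffering (elided here)
>>>       out.collect(e)
>>>     }
>>>   }
>>> }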
>>>
>>> Hope that helps,
>>>
>>> On Fri, Jul 29, 2016 at 5:27 AM, Josh <jof...@gmail.com> wrote:
>>>
>>>> Hi Jason,
>>>>
>>>> Thanks for the reply - I didn't quite understand all of it though!
>>>>
>>>> > it sends a request to the historical flink job for the old data
>>>> How do you send a request from one Flink job to another?
>>>>
>>>> > It continues storing the live events until all the events from the
>>>> historical job have been processed, then it processes the stored events,
>>>> and finally starts processing the live stream again.
>>>> How do you handle the switchover between the live stream and the
>>>> historical stream? Do you somehow block the live stream source and detect
>>>> when the historical data source is no longer emitting new elements?
>>>>
>>>> > So in your case it looks like what you could do is send a request to
>>>> the "historical" job whenever you get an item that you don't yet have the
>>>> current state of.
>>>> In my case I would want the Flink state to always contain the latest
>>>> state of every item (except when the job first starts and there's a period
>>>> of time where it's rebuilding its state from the Kafka log). Since I would
>>>> have everything needed to rebuild the state persisted in a Kafka topic, I
>>>> don't think I would need a second Flink job for this?
>>>>
>>>> Thanks,
>>>> Josh
>>>>
>>>> On Thu, Jul 28, 2016 at 6:57 PM, Jason Brelloch <jb.bc....@gmail.com>
>>>> wrote:
>>>>
>>>>> Hey Josh,
>>>>>
>>>>> The way we replay historical data is we have a second Flink job that
>>>>> listens to the same live stream, and stores every single event in Google
>>>>> Cloud Storage.
>>>>>
>>>>> When the main Flink job that is processing the live stream gets a
>>>>> request for a specific data set that it has not been processing yet, it
>>>>> sends a request to the historical Flink job for the old data. The live job
>>>>> then starts storing relevant events from the live stream in state. It
>>>>> continues storing the live events until all the events from the historical
>>>>> job have been processed, then it processes the stored events, and finally
>>>>> starts processing the live stream again.
>>>>>
>>>>> As long as it's properly keyed (we key on the specific data set), it
>>>>> doesn't block anything, keeps everything ordered, and eventually catches
>>>>> up. It also allows us to completely blow away state and rebuild it from
>>>>> scratch.
>>>>>
>>>>> So in your case it looks like what you could do is send a request to
>>>>> the "historical" job whenever you get an item that you don't yet have the
>>>>> current state of.
>>>>>
>>>>> The potential problems you may have are that it may not be possible to
>>>>> store every single historical event, and that you need to make sure there
>>>>> is enough memory to handle the ever-increasing state size while the
>>>>> historical events are being replayed (and make sure to clear the state
>>>>> when it is done).
>>>>>
>>>>> It's a little complicated, and pretty expensive, but it works. Let me
>>>>> know if something doesn't make sense.
>>>>>
>>>>> On Thu, Jul 28, 2016 at 1:14 PM, Josh <jof...@gmail.com> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> I was wondering what approaches people usually take with reprocessing
>>>>>> data with Flink - specifically the case where you want to upgrade a Flink
>>>>>> job and make it reprocess historical data before continuing to process the
>>>>>> live stream.
>>>>>>
>>>>>> I'm wondering if we can do something similar to the 'simple rewind'
>>>>>> or 'parallel rewind' which Samza uses to solve this problem, discussed
>>>>>> here:
>>>>>> https://samza.apache.org/learn/documentation/0.10/jobs/reprocessing.html
>>>>>>
>>>>>> Having used Flink over the past couple of months, the main issue I've
>>>>>> had involves Flink's internal state - from my experience it seems it is
>>>>>> easy to break the state when upgrading a job or changing the parallelism
>>>>>> of operators, plus there's no easy way to view/access an internal
>>>>>> key-value state from outside Flink.
>>>>>>
>>>>>> For an example of what I mean, consider a Flink job which consumes a
>>>>>> stream of 'updates' to items, and maintains a key-value store of items
>>>>>> within Flink's internal state (e.g. in RocksDB). The job also writes the
>>>>>> updated items to a Kafka topic:
>>>>>>
>>>>>> http://oi64.tinypic.com/34q5opf.jpg
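>>>>>>
>>>>>> i.e. something like this (a rough sketch with made-up types - the
>>>>>> real job has more going on):
>>>>>>
>>>>>> import org.apache.flink.api.common.functions.RichMapFunction
>>>>>> import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
>>>>>> import org.apache.flink.configuration.Configuration
>>>>>>
>>>>>> case class ItemUpdate(itemId: String, field: String, value: String)
>>>>>> case class Item(itemId: String, fields: Map[String, String])
>>>>>>
>>>>>> // runs downstream of keyBy(_.itemId); the ValueState below is the
>>>>>> // per-item key-value store, held in RocksDB when the job uses the
>>>>>> // RocksDB state backend
>>>>>> class UpdateItem extends RichMapFunction[ItemUpdate, Item] {
>>>>>>   @transient private var item: ValueState[Item] = _
>>>>>>
>>>>>>   override def open(conf: Configuration): Unit =
>>>>>>     item = getRuntimeContext.getState(
>>>>>>       new ValueStateDescriptor("item", classOf[Item]))
>>>>>>
>>>>>>   override def map(u: ItemUpdate): Item = {
>>>>>>     val current = Option(item.value).getOrElse(Item(u.itemId, Map.empty))
>>>>>>     val updated = current.copy(fields = current.fields + (u.field -> u.value))
>>>>>>     item.update(updated)
>>>>>>     updated // each updated item is then written to the Kafka topic
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> // wiring: updates.keyBy(_.itemId).map(new UpdateItem).addSink(kafkaSink)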
>>>>>>
>>>>>> My worry with this is that the state in RocksDB could be lost or
>>>>>> become incompatible with an updated version of the job. If this happens,
>>>>>> we need to be able to rebuild Flink's internal key-value store in RocksDB.
>>>>>> So I'd like to be able to do something like this (which I believe is the
>>>>>> Samza solution):
>>>>>>
>>>>>> http://oi67.tinypic.com/219ri95.jpg
>>>>>>
>>>>>> Has anyone done something like this already with Flink? If so, are
>>>>>> there any examples of how to do this replay & switchover (rebuild state
>>>>>> by consuming from a historical log, then switch over to processing the
>>>>>> live stream)?
>>>>>>
>>>>>> Thanks for any insights,
>>>>>> Josh
>>>>>
>>>>> --
>>>>> *Jason Brelloch* | Product Developer
>>>>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
>>>>> <http://www.bettercloud.com/>
>>>
>>> --
>>> *Jason Brelloch* | Product Developer
>>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
>>> <http://www.bettercloud.com/>