Hi, I think it would probably be a good idea to make these tunable from the command line. Otherwise we might run into the problem of accidentally restoring a job that should instead fail, as it does now.
Gyula

On Tue, Aug 2, 2016 at 17:17, Stephan Ewen <se...@apache.org> wrote:

+1 to ignore unmatched state.

Also +1 to allow programs that resume partially (add some new state that starts empty).

Both are quite important for program evolution.

On Tue, Aug 2, 2016 at 2:58 PM, Ufuk Celebi <u...@apache.org> wrote:

No, unfortunately this is the same for 1.1. The idea was to be explicit about what works and what doesn't. I see that this is actually a pain for this use case (which is very nice and reasonable ;)). I think we can either always ignore state that does not match the new job, or, if that is too aggressive, we can add a flag to ignore unmatched state.

On Mon, Aug 1, 2016 at 6:39 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

+Ufuk, looping him in directly

Hmm, I think this changed for the 1.1 release. Ufuk, could you please comment?

On Mon, 1 Aug 2016 at 08:07, Josh <jof...@gmail.com> wrote:

Cool, thanks - I've tried out the approach where we replay data from the Kafka compacted log, then take a savepoint and switch to the live stream.

It works, but I did have to add a dummy operator for every operator that was removed. Without doing this, I got an exception:

    java.lang.IllegalStateException: Failed to rollback to savepoint Checkpoint 1 @ 1470059433553 for cb321c233dfd28f73c565030481657cd. Cannot map old state for task 02ea922553bc7522bdea373f52a702d6 to the new program. This indicates that the program has been changed in a non-compatible way after the savepoint.

I had a Kafka source and a flat mapper chained together when replaying, so to make it work I had to add two dummy operators and assign the same UID I used when replaying, like this:

    stream.map(x => x).uid("kafka-replay").name("dummy-1").startNewChain().map(x => x).name("dummy-2")

I guess it would be nice if Flink could recover from removed tasks/operators without needing to add dummy operators like this.

Josh

On Fri, Jul 29, 2016 at 5:46 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

Hi,
I have to try this to verify, but I think the approach works if you give the two sources different UIDs. The reason is that Flink will ignore state for which it doesn't have an operator to assign it to. Therefore, the state of the "historical Kafka source" should be silently discarded.

Cheers,
Aljoscha
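To make the workaround concrete, here is a minimal sketch of the restored "live" job in the Scala DataStream API. Only the uid/dummy-operator wiring comes from Josh's snippet; the 0.8 connector, topic names, and String events are illustrative assumptions:

    import java.util.Properties

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    object LiveAfterReplay {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        val props = new Properties()
        props.setProperty("bootstrap.servers", "localhost:9092") // illustrative
        props.setProperty("zookeeper.connect", "localhost:2181") // required by the 0.8 consumer
        props.setProperty("group.id", "live-job")

        // The replay source and its chained flat mapper were removed, so two
        // stateless identity maps stand in for them; the first carries the old
        // "kafka-replay" UID so the savepoint state can be mapped to the new
        // program instead of failing the restore.
        env
          .addSource(new FlinkKafkaConsumer08[String]("live-stream", new SimpleStringSchema, props))
          .map(x => x).uid("kafka-replay").name("dummy-1").startNewChain()
          .map(x => x).name("dummy-2")
          .print() // stand-in for the rest of the pipeline

        env.execute("live-after-replay")
      }
    }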
On Fri, 29 Jul 2016 at 18:12, Josh <jof...@gmail.com> wrote:

@Aljoscha - The N-input operator way sounds very nice. For now I think I'll try and get something quick running the hacky way; then, if we decide to make this a permanent solution, maybe I can work on the proper solution. I was wondering about your suggestion for "warming up" the state and then taking a savepoint and switching sources - since the Kafka sources are stateful and are part of Flink's internal state, wouldn't this break when trying to restore the job with a different source? Would I need to assign the replay source a UID, and when switching from replay to live, remove the replay source and replace it with a dummy operator with the same UID?

@Jason - I see what you mean now, with the historical and live Flink jobs. That's an interesting approach - I guess it's solving a slightly different problem to my 'rebuilding Flink state upon starting the job', since you're rebuilding state as part of the main job when it comes across events that require historical data. Actually, I think we'll need to do something very similar in the future, but right now I can probably get away with something simpler!

Thanks for the replies!

Josh

On Fri, Jul 29, 2016 at 2:35 PM, Jason Brelloch <jb.bc....@gmail.com> wrote:

Aljoscha's approach is probably better, but to answer your questions...

> How do you send a request from one Flink job to another?

All of our different Flink jobs communicate over Kafka. The main Flink job listens to both a "live" Kafka source and a "historical" Kafka source, while the historical Flink job listens to a "request" Kafka source. When the main job gets an event that it does not have state for, it writes to the "request" topic. The historical job reads the request, grabs the relevant old events from GCS, and writes them to the "historical" Kafka topic. The "historical" source and the "live" source are merged and proceed through the main Flink job as one stream.

> How do you handle the switchover between the live stream and the historical stream? Do you somehow block the live stream source and detect when the historical data source is no longer emitting new elements?

When the main job sends a request to the historical job, the main job starts storing any events that come in for that key. As the historical events come in, they are processed immediately. The historical Flink job flags the last event it sends, so when the main Flink job sees the flagged event it knows it is caught up to where it was when it sent the request. You can then process the events that the main job stored, and when that is done you are caught up to the live stream and can stop storing events for that key, processing them as normal.
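A minimal sketch of this buffer-until-flagged pattern, as a keyed function on the merged stream. The Event shape, field names, and start-in-replay-mode default are invented for illustration, and writing the request to the "request" topic is left out:

    import org.apache.flink.api.common.functions.RichFlatMapFunction
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.util.Collector

    // Hypothetical event shape: `lastHistorical` is the flag Jason describes
    // on the final replayed event for a key.
    case class Event(key: String, payload: String,
                     historical: Boolean, lastHistorical: Boolean)

    // Usage: liveSource.union(historicalSource).keyBy(_.key).flatMap(new BufferUntilCaughtUp)
    class BufferUntilCaughtUp extends RichFlatMapFunction[Event, Event] {
      // Live events parked while this key is still being replayed.
      private var buffer: ValueState[List[Event]] = _
      // True while a historical request is outstanding for this key; a brand-new
      // key starts in replay mode, i.e. a request has just been sent for it.
      private var replaying: ValueState[Boolean] = _

      override def open(conf: Configuration): Unit = {
        buffer = getRuntimeContext.getState(
          new ValueStateDescriptor("buffer", classOf[List[Event]], List.empty[Event]))
        replaying = getRuntimeContext.getState(
          new ValueStateDescriptor("replaying", classOf[Boolean], true))
      }

      override def flatMap(e: Event, out: Collector[Event]): Unit = {
        if (e.historical) {
          out.collect(e)          // historical events are processed immediately
          if (e.lastHistorical) { // caught up: drain the parked live events in order
            buffer.value().reverse.foreach(out.collect)
            buffer.update(List.empty)
            replaying.update(false)
          }
        } else if (replaying.value()) {
          buffer.update(e :: buffer.value()) // park live events until replay finishes
        } else {
          out.collect(e)          // normal live processing
        }
      }
    }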
Keep in mind that this is the dangerous part I was talking about, where memory in the main job would continue to build until the "historical" events are all processed.

> In my case I would want the Flink state to always contain the latest state of every item (except when the job first starts and there's a period of time where it's rebuilding its state from the Kafka log).

You could absolutely do it by reading from the beginning of a Kafka topic. The reason we do it with GCS is that it is really cheap storage, and we are not planning on storing forever on the Kafka topic.

> Since I would have everything needed to rebuild the state persisted in a Kafka topic, I don't think I would need a second Flink job for this?

The reason for the second Flink job in our case is that we didn't really want to block the Flink task slot while a single key gets caught up. We have a much larger key domain than we have task slots, so there would be multiple keys on a single task slot. If you go with the single job approach (which might be the right approach for you guys), any other keys on that task slot will be blocked until the one key has its state built up.

Hope that helps,

On Fri, Jul 29, 2016 at 5:27 AM, Josh <jof...@gmail.com> wrote:

Hi Jason,

Thanks for the reply - I didn't quite understand all of it though!

> it sends a request to the historical flink job for the old data

How do you send a request from one Flink job to another?

> It continues storing the live events until all the events from the historical job have been processed, then it processes the stored events, and finally starts processing the live stream again.

How do you handle the switchover between the live stream and the historical stream? Do you somehow block the live stream source and detect when the historical data source is no longer emitting new elements?

> So in your case it looks like what you could do is send a request to the "historical" job whenever you get an item that you don't yet have the current state of.

In my case I would want the Flink state to always contain the latest state of every item (except when the job first starts and there's a period of time where it's rebuilding its state from the Kafka log). Since I would have everything needed to rebuild the state persisted in a Kafka topic, I don't think I would need a second Flink job for this?

Thanks,
Josh

On Thu, Jul 28, 2016 at 6:57 PM, Jason Brelloch <jb.bc....@gmail.com> wrote:

Hey Josh,

The way we replay historical data is that we have a second Flink job that listens to the same live stream and stores every single event in Google Cloud Storage.

When the main Flink job that is processing the live stream gets a request for a specific data set that it has not been processing yet, it sends a request to the historical Flink job for the old data. The live job then starts storing relevant events from the live stream in state. It continues storing the live events until all the events from the historical job have been processed, then it processes the stored events, and finally starts processing the live stream again.

As long as it's properly keyed (we key on the specific data set), it doesn't block anything, keeps everything ordered, and eventually catches up. It also allows us to completely blow away state and rebuild it from scratch.

So in your case, it looks like what you could do is send a request to the "historical" job whenever you get an item that you don't yet have the current state of.

The potential problems you may have are that it may not be possible to store every single historical event, and that you need to make sure there is enough memory to handle the ever-increasing state size while the historical events are being replayed (and make sure to clear the state when it is done).

It's a little complicated, and pretty expensive, but it works. Let me know if something doesn't make sense.
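For concreteness, the two-source merge described above might be wired up like this; the topic names, String events, and the 0.8 connector are assumptions of the sketch:

    import java.util.Properties

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    object MainJob {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        val props = new Properties()
        props.setProperty("bootstrap.servers", "localhost:9092") // illustrative
        props.setProperty("zookeeper.connect", "localhost:2181")
        props.setProperty("group.id", "main-job")

        // The main job's two sources: the live stream, and the topic the
        // historical job writes replayed events to.
        val live = env.addSource(
          new FlinkKafkaConsumer08[String]("live", new SimpleStringSchema, props))
        val historical = env.addSource(
          new FlinkKafkaConsumer08[String]("historical", new SimpleStringSchema, props))

        // union merges the two streams into one; keying it afterwards (e.g. on
        // the data set, as Jason describes) keeps ordering and catch-up concerns
        // local to each key.
        val merged: DataStream[String] = live.union(historical)
        merged.keyBy(x => x).print() // stand-in for the real keyed pipeline

        env.execute("main-job")
      }
    }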
On Thu, Jul 28, 2016 at 1:14 PM, Josh <jof...@gmail.com> wrote:

Hi all,

I was wondering what approaches people usually take with reprocessing data with Flink - specifically the case where you want to upgrade a Flink job and make it reprocess historical data before continuing to process the live stream.

I'm wondering if we can do something similar to the 'simple rewind' or 'parallel rewind' which Samza uses to solve this problem, discussed here: https://samza.apache.org/learn/documentation/0.10/jobs/reprocessing.html

Having used Flink over the past couple of months, the main issue I've had involves Flink's internal state - from my experience it seems easy to break the state when upgrading a job or when changing the parallelism of operators, and there's no easy way to view/access an internal key-value state from outside Flink.

For an example of what I mean, consider a Flink job which consumes a stream of 'updates' to items, maintains a key-value store of items within Flink's internal state (e.g. in RocksDB), and also writes the updated items to a Kafka topic:

http://oi64.tinypic.com/34q5opf.jpg

My worry with this is that the state in RocksDB could be lost or become incompatible with an updated version of the job. If this happens, we need to be able to rebuild Flink's internal key-value store in RocksDB. So I'd like to be able to do something like this (which I believe is the Samza solution):

http://oi67.tinypic.com/219ri95.jpg

Has anyone done something like this already with Flink? If so, are there any examples of how to do this replay & switchover (rebuild state by consuming from a historical log, then switch over to processing the live stream)?

Thanks for any insights,
Josh
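As a sketch of the job in the first diagram - an 'updates' stream keyed by item, the latest version of each item kept in Flink's keyed state (RocksDB if that backend is configured), and every updated item written back out to Kafka. The "itemId|change" wire format, the merge logic, and the topic names are invented for the example:

    import java.util.Properties

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer08, FlinkKafkaProducer08}
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema

    object ItemStateJob {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        val props = new Properties()
        props.setProperty("bootstrap.servers", "localhost:9092") // illustrative
        props.setProperty("zookeeper.connect", "localhost:2181")
        props.setProperty("group.id", "item-state-job")

        env
          .addSource(new FlinkKafkaConsumer08[String]("item-updates", new SimpleStringSchema, props))
          .keyBy(_.split('|')(0)) // key on the itemId part of "itemId|change"
          .map(new RichMapFunction[String, String] {
            // Latest state of each item, held in Flink's keyed state and restored
            // from checkpoints/savepoints -- this is exactly the state that would
            // have to be rebuilt by replaying the log if it were lost.
            private var item: ValueState[String] = _

            override def open(conf: Configuration): Unit =
              item = getRuntimeContext.getState(
                new ValueStateDescriptor("item", classOf[String], ""))

            override def map(update: String): String = {
              val merged = item.value() + "|" + update // toy merge of update into item
              item.update(merged)
              merged
            }
          })
          .addSink(new FlinkKafkaProducer08[String]("items", new SimpleStringSchema, props))

        env.execute("item-state-job")
      }
    }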