+1 to ignoring unmatched state. Also +1 to allowing programs that resume partially (adding some new state that starts empty).
Both are quite important for program evolution.

On Tue, Aug 2, 2016 at 2:58 PM, Ufuk Celebi <u...@apache.org> wrote:

> No, unfortunately this is the same for 1.1. The idea was to be explicit
> about what works and what not. I see that this is actually a pain for this
> use case (which is very nice and reasonable ;)). I think we can either
> always ignore state that does not match to the new job or if that is too
> aggressive we can add a flag to ignore unmatched state.
>
> On Mon, Aug 1, 2016 at 6:39 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> +Ufuk, looping him in directly
>>
>> Hmm, I think this is changed for the 1.1 release. Ufuk could you please
>> comment?
>>
>> On Mon, 1 Aug 2016 at 08:07 Josh <jof...@gmail.com> wrote:
>>
>>> Cool, thanks - I've tried out the approach where we replay data from the
>>> Kafka compacted log, then take a savepoint and switch to the live stream.
>>>
>>> It works but I did have to add in a dummy operator for every operator
>>> that was removed. Without doing this, I got an exception:
>>> java.lang.IllegalStateException: Failed to rollback to savepoint
>>> Checkpoint 1 @ 1470059433553 for cb321c233dfd28f73c565030481657cd. Cannot
>>> map old state for task 02ea922553bc7522bdea373f52a702d6 to the new program.
>>> This indicates that the program has been changed in a non-compatible way
>>> after the savepoint.
>>>
>>> I had a Kafka source and a flat mapper chained together when replaying,
>>> so to make it work I had to add two dummy operators and assign the same UID
>>> I used when replaying, like this:
>>> stream.map(x => x).uid("kafka-replay").name("dummy-1").startNewChain().map(x => x).name("dummy-2")
>>>
>>> I guess it would be nice if Flink could recover from removed
>>> tasks/operators without needing to add dummy operators like this.
>>>
>>> Josh
>>>
>>> On Fri, Jul 29, 2016 at 5:46 PM, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>> I have to try this to verify but I think the approach works if you give
>>>> the two sources different UIDs. The reason is that Flink will ignore state
>>>> for which it doesn't have an operator to assign it to. Therefore, the state
>>>> of the "historical Kafka source" should be silently discarded.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Fri, 29 Jul 2016 at 18:12 Josh <jof...@gmail.com> wrote:
>>>>
>>>>> @Aljoscha - The N-input operator way sounds very nice, for now I think
>>>>> I'll try and get something quick running the hacky way, then if we decide
>>>>> to make this a permanent solution maybe I can work on the proper solution.
>>>>> I was wondering about your suggestion for "warming up" the state and then
>>>>> taking a savepoint and switching sources - since the Kafka sources are
>>>>> stateful and are part of Flink's internal state, wouldn't this break when
>>>>> trying to restore the job with a different source? Would I need to assign
>>>>> the replay source a UID, and when switching from replay to live, remove
>>>>> the replay source and replace it with a dummy operator with the same UID?
>>>>>
>>>>> @Jason - I see what you mean now, with the historical and live Flink
>>>>> jobs. That's an interesting approach - I guess it's solving a slightly
>>>>> different problem to my 'rebuilding Flink state upon starting job' - as
>>>>> you're rebuilding state as part of the main job when it comes across
>>>>> events that require historical data. Actually I think we'll need to do
>>>>> something very similar in the future but right now I can probably get away
>>>>> with something simpler!
>>>>>
>>>>> Thanks for the replies!
>>>>>
>>>>> Josh
>>>>>
>>>>> On Fri, Jul 29, 2016 at 2:35 PM, Jason Brelloch <jb.bc....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Aljoscha's approach is probably better, but to answer your
>>>>>> questions...
>>>>>>
>>>>>> >How do you send a request from one Flink job to another?
>>>>>> All of our different flink jobs communicate over kafka. So the main
>>>>>> flink job would be listening to both a "live" kafka source, and a
>>>>>> "historical" kafka source. The historical flink job would listen to a
>>>>>> "request" kafka source. When the main job gets an event that it does not
>>>>>> have state for it writes to the "request" topic. The historical job
>>>>>> would read the request, grab the relevant old events from GCS, and write
>>>>>> them to the "historical" kafka topic. The "historical" source and the
>>>>>> "live" source are merged and proceed through the main flink job as one
>>>>>> stream.
>>>>>>
>>>>>> >How do you handle the switchover between the live stream and the
>>>>>> >historical stream? Do you somehow block the live stream source and
>>>>>> >detect when the historical data source is no longer emitting new
>>>>>> >elements?
>>>>>> When the main job sends a request to the historical job, the main job
>>>>>> starts storing any events that come in for that key. As the historical
>>>>>> events come in they are processed immediately. The historical flink job
>>>>>> flags the last event it sends. When the main flink job sees the flagged
>>>>>> event it knows it is caught up to where it was when it sent the request.
>>>>>> You can then process the events that the main job stored, and when that
>>>>>> is done you are caught up to the live stream, and can stop storing events
>>>>>> for that key and just process them as normal.
>>>>>>
>>>>>> Keep in mind that this is the dangerous part that I was talking
>>>>>> about, where memory in the main job would continue to build until the
>>>>>> "historical" events are all processed.
>>>>>>
>>>>>> >In my case I would want the Flink state to always contain the latest
>>>>>> >state of every item (except when the job first starts and there's a
>>>>>> >period of time where it's rebuilding its state from the Kafka log).
>>>>>> You could absolutely do it by reading from the beginning of a kafka
>>>>>> topic. The reason we do it with GCS is it is really cheap storage, and
>>>>>> we are not planning on storing forever on the kafka topic.
>>>>>>
>>>>>> >Since I would have everything needed to rebuild the state persisted
>>>>>> >in a Kafka topic, I don't think I would need a second Flink job for
>>>>>> >this?
>>>>>> The reason for the second flink job in our case is that we didn't
>>>>>> really want to block the flink task slot while a single key gets caught
>>>>>> up. We have a much larger key domain than we have number of task slots,
>>>>>> so there would be multiple keys on a single task slot. If you go with the
>>>>>> single job approach (which might be the right approach for you guys) any
>>>>>> other keys on that task slot will be blocked while the one key is getting
>>>>>> its state built up.
>>>>>>
>>>>>> Hope that helps,
>>>>>>
>>>>>> On Fri, Jul 29, 2016 at 5:27 AM, Josh <jof...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Jason,
>>>>>>>
>>>>>>> Thanks for the reply - I didn't quite understand all of it though!
>>>>>>>
>>>>>>> > it sends a request to the historical flink job for the old data
>>>>>>> How do you send a request from one Flink job to another?
>>>>>>>
>>>>>>> > It continues storing the live events until all the events from the
>>>>>>> > historical job have been processed, then it processes the stored
>>>>>>> > events, and finally starts processing the live stream again.
>>>>>>> How do you handle the switchover between the live stream and the
>>>>>>> historical stream? Do you somehow block the live stream source and
>>>>>>> detect when the historical data source is no longer emitting new
>>>>>>> elements?
>>>>>>>
>>>>>>> > So in your case it looks like what you could do is send a request
>>>>>>> > to the "historical" job whenever you get an item that you don't yet
>>>>>>> > have the current state of.
>>>>>>> In my case I would want the Flink state to always contain the latest
>>>>>>> state of every item (except when the job first starts and there's a
>>>>>>> period of time where it's rebuilding its state from the Kafka log).
>>>>>>> Since I would have everything needed to rebuild the state persisted in
>>>>>>> a Kafka topic, I don't think I would need a second Flink job for this?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Josh
>>>>>>>
>>>>>>> On Thu, Jul 28, 2016 at 6:57 PM, Jason Brelloch <jb.bc....@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hey Josh,
>>>>>>>>
>>>>>>>> The way we replay historical data is we have a second Flink job
>>>>>>>> that listens to the same live stream, and stores every single event in
>>>>>>>> Google Cloud Storage.
>>>>>>>>
>>>>>>>> When the main Flink job that is processing the live stream gets a
>>>>>>>> request for a specific data set that it has not been processing yet, it
>>>>>>>> sends a request to the historical flink job for the old data. The live
>>>>>>>> job then starts storing relevant events from the live stream in state.
>>>>>>>> It continues storing the live events until all the events from the
>>>>>>>> historical job have been processed, then it processes the stored
>>>>>>>> events, and finally starts processing the live stream again.
>>>>>>>>
>>>>>>>> As long as it's properly keyed (we key on the specific data set)
>>>>>>>> then it doesn't block anything, keeps everything ordered, and
>>>>>>>> eventually catches up. It also allows us to completely blow away state
>>>>>>>> and rebuild it from scratch.
>>>>>>>>
>>>>>>>> So in your case it looks like what you could do is send a request to
>>>>>>>> the "historical" job whenever you get an item that you don't yet have
>>>>>>>> the current state of.
>>>>>>>>
>>>>>>>> The potential problems you may have are that it may not be possible
>>>>>>>> to store every single historical event, and that you need to make sure
>>>>>>>> there is enough memory to handle the ever increasing state size while
>>>>>>>> the historical events are being replayed (and make sure to clear the
>>>>>>>> state when it is done).
>>>>>>>>
>>>>>>>> It's a little complicated, and pretty expensive, but it works. Let
>>>>>>>> me know if something doesn't make sense.
>>>>>>>>
>>>>>>>> On Thu, Jul 28, 2016 at 1:14 PM, Josh <jof...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> I was wondering what approaches people usually take with
>>>>>>>>> reprocessing data with Flink - specifically the case where you want to
>>>>>>>>> upgrade a Flink job, and make it reprocess historical data before
>>>>>>>>> continuing to process a live stream.
>>>>>>>>>
>>>>>>>>> I'm wondering if we can do something similar to the 'simple
>>>>>>>>> rewind' or 'parallel rewind' which Samza uses to solve this problem,
>>>>>>>>> discussed here:
>>>>>>>>> https://samza.apache.org/learn/documentation/0.10/jobs/reprocessing.html
>>>>>>>>>
>>>>>>>>> Having used Flink over the past couple of months, the main issue
>>>>>>>>> I've had involves Flink's internal state - from my experience it
>>>>>>>>> seems it is easy to break the state when upgrading a job, or when
>>>>>>>>> changing the parallelism of operators, plus there's no easy way to
>>>>>>>>> view/access an internal key-value state from outside Flink.
>>>>>>>>>
>>>>>>>>> For an example of what I mean, consider a Flink job which consumes
>>>>>>>>> a stream of 'updates' to items, and maintains a key-value store of
>>>>>>>>> items within Flink's internal state (e.g. in RocksDB). The job also
>>>>>>>>> writes the updated items to a Kafka topic:
>>>>>>>>>
>>>>>>>>> http://oi64.tinypic.com/34q5opf.jpg
>>>>>>>>>
>>>>>>>>> My worry with this is that the state in RocksDB could be lost or
>>>>>>>>> become incompatible with an updated version of the job. If this
>>>>>>>>> happens, we need to be able to rebuild Flink's internal key-value
>>>>>>>>> store in RocksDB. So I'd like to be able to do something like this
>>>>>>>>> (which I believe is the Samza solution):
>>>>>>>>>
>>>>>>>>> http://oi67.tinypic.com/219ri95.jpg
>>>>>>>>>
>>>>>>>>> Has anyone done something like this already with Flink? If so are
>>>>>>>>> there any examples of how to do this replay & switchover (rebuild
>>>>>>>>> state by consuming from a historical log, then switch over to
>>>>>>>>> processing the live stream)?
>>>>>>>>>
>>>>>>>>> Thanks for any insights,
>>>>>>>>> Josh
>>>>>>>>
>>>>>>>> --
>>>>>>>> *Jason Brelloch* | Product Developer
>>>>>>>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
>>>>>>>> <http://www.bettercloud.com/>
>>>>>>>> Subscribe to the BetterCloud Monitor
>>>>>>>> <https://www.bettercloud.com/monitor?utm_source=bettercloud_email&utm_medium=email_signature&utm_campaign=monitor_launch>
>>>>>>>> - Get IT delivered to your inbox