Hi,

I think it would probably be a good idea to make both behaviours tunable from
the command line. Otherwise we might accidentally restore a job that should
fail to restore, as it does now.
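
To make the two behaviours concrete, here is a minimal sketch in plain Scala, not Flink code. The names `SavepointRestore`, `restore` and `ignoreUnmatchedState` are made up for illustration and do not correspond to an existing Flink API or command-line flag; a savepoint is modelled simply as a map from operator UID to opaque state.

```scala
object SavepointRestore {
  // Hypothetical model: a savepoint maps operator UIDs to opaque state.
  type Savepoint = Map[String, String]

  /** Assigns saved state to the operators of the new job.
    * Operators without saved state start empty (None).
    * State with no matching operator is dropped if `ignoreUnmatchedState`
    * is set, otherwise the restore fails.
    */
  def restore(
      savepoint: Savepoint,
      newJobUids: Set[String],
      ignoreUnmatchedState: Boolean
  ): Map[String, Option[String]] = {
    val unmatched = savepoint.keySet.diff(newJobUids)
    if (unmatched.nonEmpty && !ignoreUnmatchedState) {
      throw new IllegalStateException(
        s"Cannot map old state for ${unmatched.toSeq.sorted.mkString(", ")} to the new program")
    }
    // New operators (no entry in the savepoint) resume with empty state.
    newJobUids.map(uid => uid -> savepoint.get(uid)).toMap
  }
}
```

With the switch off, a removed operator makes the restore fail as it does today; with it on, the leftover state is dropped and newly added operators start empty.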

Gyula

Stephan Ewen <se...@apache.org> wrote (on Tue, Aug 2, 2016, 17:17):

> +1 to ignore unmatched state.
>
> Also +1 to allow programs that resume partially (add some new state that
> starts empty)
>
> Both are quite important for program evolution.
>
> On Tue, Aug 2, 2016 at 2:58 PM, Ufuk Celebi <u...@apache.org> wrote:
>
>> No, unfortunately this is the same for 1.1. The idea was to be explicit
>> about what works and what does not. I see that this is actually a pain for
>> this use case (which is very nice and reasonable ;)). I think we can either
>> always ignore state that does not match the new job or, if that is too
>> aggressive, add a flag to ignore unmatched state.
>>
>>
>> On Mon, Aug 1, 2016 at 6:39 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> +Ufuk, looping him in directly
>>>
>>> Hmm, I think this is changed for the 1.1 release. Ufuk, could you please
>>> comment?
>>>
>>>
>>> On Mon, 1 Aug 2016 at 08:07 Josh <jof...@gmail.com> wrote:
>>>
>>>> Cool, thanks - I've tried out the approach where we replay data from
>>>> the Kafka compacted log, then take a savepoint and switch to the live
>>>> stream.
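
For readers following along, the replay step can be pictured with a small sketch in plain Scala (no Kafka or Flink involved). It assumes the compacted log is a sequence of keyed updates where the last write per key wins and an empty value marks a delete; `CompactedLogReplay`, `Update` and `rebuild` are illustrative names only.

```scala
object CompactedLogReplay {
  // One changelog record: a key and its new value, or None for a delete.
  final case class Update(key: String, value: Option[String])

  /** Rebuilds key-value state by applying updates in log order;
    * the last write per key wins and deletes remove the key.
    */
  def rebuild(log: Seq[Update]): Map[String, String] =
    log.foldLeft(Map.empty[String, String]) { (state, update) =>
      update.value match {
        case Some(v) => state.updated(update.key, v)
        case None    => state - update.key
      }
    }
}
```

Once the whole log has been folded into state like this, the job is in the position where a savepoint can be taken and the source switched to the live stream.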
>>>>
>>>> It works but I did have to add in a dummy operator for every operator
>>>> that was removed. Without doing this, I got an exception:
>>>> java.lang.IllegalStateException: Failed to rollback to savepoint
>>>> Checkpoint 1 @ 1470059433553 for cb321c233dfd28f73c565030481657cd. Cannot
>>>> map old state for task 02ea922553bc7522bdea373f52a702d6 to the new program.
>>>> This indicates that the program has been changed in a non-compatible way
>>>> after the savepoint.
>>>>
>>>> I had a Kafka source and a flat mapper chained together when replaying,
>>>> so to make it work I had to add two dummy operators and assign the same UID
>>>> I used when replaying, like this:
>>>>
>>>>   stream
>>>>     .map(x => x).uid("kafka-replay").name("dummy-1")
>>>>     .startNewChain()
>>>>     .map(x => x).name("dummy-2")
>>>>
>>>> I guess it would be nice if Flink could recover from removed
>>>> tasks/operators without needing to add dummy operators like this.
>>>>
>>>> Josh
>>>>
>>>> On Fri, Jul 29, 2016 at 5:46 PM, Aljoscha Krettek <aljos...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I have to try this to verify but I think the approach works if you
>>>>> give the two sources different UIDs. The reason is that Flink will ignore
>>>>> state for which it doesn't have an operator to assign it to. Therefore,
>>>>> the state of the "historical Kafka source" should be silently discarded.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Fri, 29 Jul 2016 at 18:12 Josh <jof...@gmail.com> wrote:
>>>>>
>>>>>> @Aljoscha - The N-input operator way sounds very nice, for now I
>>>>>> think I'll try and get something quick running the hacky way, then if we
>>>>>> decide to make this a permanent solution maybe I can work on the proper
>>>>>> solution. I was wondering about your suggestion for "warming up" the
>>>>>> state and then taking a savepoint and switching sources - since the Kafka
>>>>>> sources are stateful and are part of Flink's internal state, wouldn't this
>>>>>> break when trying to restore the job with a different source? Would I need
>>>>>> to assign the replay source a UID, and when switching from replay to live,
>>>>>> remove the replay source and replace it with a dummy operator with the
>>>>>> same UID?
>>>>>>
>>>>>> @Jason - I see what you mean now, with the historical and live Flink
>>>>>> jobs. That's an interesting approach - I guess it's solving a slightly
>>>>>> different problem to my 'rebuilding Flink state upon starting job' - as
>>>>>> you're rebuilding state as part of the main job when it comes across
>>>>>> events that require historical data. Actually I think we'll need to do
>>>>>> something very similar in the future but right now I can probably get away
>>>>>> with something simpler!
>>>>>>
>>>>>> Thanks for the replies!
>>>>>>
>>>>>> Josh
>>>>>>
>>>>>> On Fri, Jul 29, 2016 at 2:35 PM, Jason Brelloch <jb.bc....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Aljoscha's approach is probably better, but to answer your
>>>>>>> questions...
>>>>>>>
>>>>>>> >How do you send a request from one Flink job to another?
>>>>>>> All of our different Flink jobs communicate over Kafka. So the main
>>>>>>> Flink job would be listening to both a "live" Kafka source and a
>>>>>>> "historical" Kafka source. The historical Flink job would listen to a
>>>>>>> "request" Kafka source. When the main job gets an event that it does
>>>>>>> not have state for, it writes to the "request" topic. The historical
>>>>>>> job would read the request, grab the relevant old events from GCS, and
>>>>>>> write them to the "historical" Kafka topic. The "historical" source and
>>>>>>> the "live" source are merged and proceed through the main Flink job as
>>>>>>> one stream.
>>>>>>>
>>>>>>> >How do you handle the switchover between the live stream and the
>>>>>>> historical stream? Do you somehow block the live stream source and
>>>>>>> detect when the historical data source is no longer emitting new
>>>>>>> elements?
>>>>>>> When the main job sends a request to the historical job, the main
>>>>>>> job starts storing any events that come in for that key. As the
>>>>>>> historical events come in they are processed immediately. The
>>>>>>> historical Flink job flags the last event it sends. When the main Flink
>>>>>>> job sees the flagged event it knows it is caught up to where it was when
>>>>>>> it sent the request. You can then process the events that the main job
>>>>>>> stored, and when that is done you are caught up to the live stream, and
>>>>>>> can stop storing events for that key and just process them as normal.
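
The switchover just described could be sketched roughly as follows, in plain Scala rather than as a Flink operator. This is only an illustration under stated assumptions: `CatchUp`, `Handler`, `startCatchUp` and the `last` flag are hypothetical names, buffering is in memory, and the merged stream is assumed to deliver events for one key in order.

```scala
object CatchUp {
  sealed trait Event { def key: String }
  final case class Live(key: String, payload: String) extends Event
  // `last` marks the final historical event sent for a request.
  final case class Historical(key: String, payload: String, last: Boolean) extends Event

  /** Per-key handler: while a key is catching up, live events are buffered;
    * historical events are processed immediately; the flagged historical
    * event drains the buffer and returns the key to normal processing.
    */
  final class Handler(process: String => Unit) {
    private val buffers =
      scala.collection.mutable.Map.empty[String, Vector[String]]

    // Called when the main job requests old events for a key.
    def startCatchUp(key: String): Unit =
      if (!buffers.contains(key)) buffers.update(key, Vector.empty)

    def onEvent(event: Event): Unit = event match {
      case Live(key, payload) =>
        buffers.get(key) match {
          case Some(buffered) => buffers.update(key, buffered :+ payload) // still catching up
          case None           => process(payload)                          // normal processing
        }
      case Historical(key, payload, last) =>
        process(payload)
        // On the flagged event, replay the buffered live events and stop buffering.
        if (last) buffers.remove(key).foreach(_.foreach(process))
    }
  }
}
```

The buffer in this sketch is exactly the part that grows while the historical events are still arriving, so a real job would need to bound or persist it.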
>>>>>>>
>>>>>>> Keep in mind that this is the dangerous part that I was talking
>>>>>>> about, where memory in the main job would continue to build until the
>>>>>>> "historical" events are all processed.
>>>>>>>
>>>>>>> >In my case I would want the Flink state to always contain the
>>>>>>> latest state of every item (except when the job first starts and
>>>>>>> there's a period of time where it's rebuilding its state from the Kafka
>>>>>>> log).
>>>>>>> You could absolutely do it by reading from the beginning of a Kafka
>>>>>>> topic. The reason we do it with GCS is that it is really cheap storage,
>>>>>>> and we are not planning on storing events forever on the Kafka topic.
>>>>>>>
>>>>>>> >Since I would have everything needed to rebuild the state
>>>>>>> persisted in a Kafka topic, I don't think I would need a second Flink
>>>>>>> job for this?
>>>>>>> The reason for the second Flink job in our case is that we didn't
>>>>>>> really want to block the Flink task slot while a single key gets caught
>>>>>>> up. We have a much larger key domain than we have task slots, so there
>>>>>>> would be multiple keys on a single task slot. If you go with the
>>>>>>> single-job approach (which might be the right approach for you guys), any
>>>>>>> other keys on that task slot will be blocked until the one key has
>>>>>>> its state built up.
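
As a rough illustration of why several keys end up sharing a slot, the sketch below routes each key to `hash(key) mod parallelism`. That routing rule is a simplifying assumption for the example and is not Flink's actual partitioning function; `SlotAssignment`, `slotFor` and `keysPerSlot` are made-up names.

```scala
object SlotAssignment {
  /** Simplified assumption: a key lands on slot `hash(key) mod parallelism`.
    * floorMod keeps the result non-negative for negative hash codes.
    */
  def slotFor(key: String, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  /** Groups keys by the slot they land on. */
  def keysPerSlot(keys: Seq[String], parallelism: Int): Map[Int, Seq[String]] =
    keys.groupBy(slotFor(_, parallelism))
}
```

With more keys than slots, at least one slot necessarily serves several keys, so blocking that slot while one key catches up would also hold back the others.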
>>>>>>>
>>>>>>> Hope that helps,
>>>>>>>
>>>>>>> On Fri, Jul 29, 2016 at 5:27 AM, Josh <jof...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Jason,
>>>>>>>>
>>>>>>>> Thanks for the reply - I didn't quite understand all of it though!
>>>>>>>>
>>>>>>>> > it sends a request to the historical flink job for the old data
>>>>>>>> How do you send a request from one Flink job to another?
>>>>>>>>
>>>>>>>> > It continues storing the live events until all the events from
>>>>>>>> the historical job have been processed, then it processes the stored
>>>>>>>> events, and finally starts processing the live stream again.
>>>>>>>> How do you handle the switchover between the live stream and the
>>>>>>>> historical stream? Do you somehow block the live stream source and
>>>>>>>> detect when the historical data source is no longer emitting new
>>>>>>>> elements?
>>>>>>>>
>>>>>>>> > So in your case it looks like what you could do is send a request
>>>>>>>> to the "historical" job whenever you get an item that you don't yet
>>>>>>>> have the current state of.
>>>>>>>> In my case I would want the Flink state to always contain the
>>>>>>>> latest state of every item (except when the job first starts and
>>>>>>>> there's a period of time where it's rebuilding its state from the Kafka
>>>>>>>> log). Since I would have everything needed to rebuild the state persisted
>>>>>>>> in a Kafka topic, I don't think I would need a second Flink job for this?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Josh
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 28, 2016 at 6:57 PM, Jason Brelloch <jb.bc....@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hey Josh,
>>>>>>>>>
>>>>>>>>> The way we replay historical data is we have a second Flink job
>>>>>>>>> that listens to the same live stream, and stores every single event in
>>>>>>>>> Google Cloud Storage.
>>>>>>>>>
>>>>>>>>> When the main Flink job that is processing the live stream gets a
>>>>>>>>> request for a specific data set that it has not been processing yet,
>>>>>>>>> it sends a request to the historical Flink job for the old data. The
>>>>>>>>> live job then starts storing relevant events from the live stream in
>>>>>>>>> state. It continues storing the live events until all the events from
>>>>>>>>> the historical job have been processed, then it processes the stored
>>>>>>>>> events, and finally starts processing the live stream again.
>>>>>>>>>
>>>>>>>>> As long as it's properly keyed (we key on the specific data set)
>>>>>>>>> then it doesn't block anything, keeps everything ordered, and
>>>>>>>>> eventually catches up. It also allows us to completely blow away state
>>>>>>>>> and rebuild it from scratch.
>>>>>>>>>
>>>>>>>>> So in your case it looks like what you could do is send a request
>>>>>>>>> to the "historical" job whenever you get an item that you don't yet
>>>>>>>>> have the current state of.
>>>>>>>>>
>>>>>>>>> The potential problems you may have are that it may not be
>>>>>>>>> possible to store every single historical event, and that you need to
>>>>>>>>> make sure there is enough memory to handle the ever-increasing state
>>>>>>>>> size while the historical events are being replayed (and make sure to
>>>>>>>>> clear the state when it is done).
>>>>>>>>>
>>>>>>>>> It's a little complicated, and pretty expensive, but it works.
>>>>>>>>> Let me know if something doesn't make sense.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jul 28, 2016 at 1:14 PM, Josh <jof...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> I was wondering what approaches people usually take with
>>>>>>>>>> reprocessing data with Flink - specifically the case where you want
>>>>>>>>>> to upgrade a Flink job, and make it reprocess historical data before
>>>>>>>>>> continuing to process a live stream.
>>>>>>>>>>
>>>>>>>>>> I'm wondering if we can do something similar to the 'simple
>>>>>>>>>> rewind' or 'parallel rewind' which Samza uses to solve this problem,
>>>>>>>>>> discussed here:
>>>>>>>>>> https://samza.apache.org/learn/documentation/0.10/jobs/reprocessing.html
>>>>>>>>>>
>>>>>>>>>> Having used Flink over the past couple of months, the main issue
>>>>>>>>>> I've had involves Flink's internal state - from my experience it
>>>>>>>>>> seems it is easy to break the state when upgrading a job, or when
>>>>>>>>>> changing the parallelism of operators, plus there's no easy way to
>>>>>>>>>> view/access an internal key-value state from outside Flink.
>>>>>>>>>>
>>>>>>>>>> For an example of what I mean, consider a Flink job which
>>>>>>>>>> consumes a stream of 'updates' to items, and maintains a key-value
>>>>>>>>>> store of items within Flink's internal state (e.g. in RocksDB). The job
>>>>>>>>>> also writes the updated items to a Kafka topic:
>>>>>>>>>>
>>>>>>>>>> http://oi64.tinypic.com/34q5opf.jpg
>>>>>>>>>>
>>>>>>>>>> My worry with this is that the state in RocksDB could be lost or
>>>>>>>>>> become incompatible with an updated version of the job. If this
>>>>>>>>>> happens, we need to be able to rebuild Flink's internal key-value store
>>>>>>>>>> in RocksDB. So I'd like to be able to do something like this (which I
>>>>>>>>>> believe is the Samza solution):
>>>>>>>>>>
>>>>>>>>>> http://oi67.tinypic.com/219ri95.jpg
>>>>>>>>>>
>>>>>>>>>> Has anyone done something like this already with Flink? If so are
>>>>>>>>>> there any examples of how to do this replay & switchover (rebuild
>>>>>>>>>> state by consuming from a historical log, then switch over to
>>>>>>>>>> processing the live stream)?
>>>>>>>>>>
>>>>>>>>>> Thanks for any insights,
>>>>>>>>>> Josh
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> *Jason Brelloch* | Product Developer
>>>>>>>>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
>>>>>>>>> <http://www.bettercloud.com/>
>>>>>>>>> Subscribe to the BetterCloud Monitor
>>>>>>>>> <https://www.bettercloud.com/monitor?utm_source=bettercloud_email&utm_medium=email_signature&utm_campaign=monitor_launch>
>>>>>>>>>  -
>>>>>>>>> Get IT delivered to your inbox
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>
>
