No, unfortunately this is the same for 1.1. The idea was to be explicit
about what works and what does not. I see that this is actually a pain for this
use case (which is very nice and reasonable ;)). I think we can either
always ignore state that does not match the new job or, if that is too
aggressive, add a flag to ignore unmatched state.
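The choice between always ignoring unmatched state and ignoring it only behind a flag can be illustrated with a small Scala model. This is a toy sketch, not Flink's restore code: savepoint state is reduced to a map from operator UID to an opaque value, and `RestoreModel`, `restore` and `ignoreUnmatched` are hypothetical names.

```scala
// Toy model (NOT Flink's real restore logic) of matching savepoint state to
// the operators of a new program by operator UID.
object RestoreModel {
  type OperatorId = String

  sealed trait RestoreResult
  // Restore succeeded; `dropped` lists savepoint entries that had no operator.
  final case class Restored(assigned: Map[OperatorId, String], dropped: Set[OperatorId])
      extends RestoreResult
  // Strict mode: some savepoint entries could not be mapped to the new program.
  final case class Failed(unmatched: Set[OperatorId]) extends RestoreResult

  // `ignoreUnmatched` is a hypothetical flag: when false, any unmatched state
  // aborts the restore; when true, unmatched state is silently discarded.
  def restore(
      savepoint: Map[OperatorId, String],
      newProgram: Set[OperatorId],
      ignoreUnmatched: Boolean
  ): RestoreResult = {
    val unmatched = savepoint.keySet -- newProgram
    if (unmatched.nonEmpty && !ignoreUnmatched) Failed(unmatched)
    else Restored(savepoint.filter { case (id, _) => newProgram.contains(id) }, unmatched)
  }
}
```

With `ignoreUnmatched = false` a removed replay source makes the whole restore fail, much like the exception quoted below; with `true` its state is simply dropped and the remaining state is restored.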

On Mon, Aug 1, 2016 at 6:39 PM, Aljoscha Krettek wrote:

> +Ufuk, looping him in directly
> Hmm, I think this has changed for the 1.1 release. Ufuk, could you please
> comment?
> On Mon, 1 Aug 2016 at 08:07 Josh wrote:
>> Cool, thanks - I've tried out the approach where we replay data from the
>> Kafka compacted log, then take a savepoint and switch to the live stream.
>> It works but I did have to add in a dummy operator for every operator
>> that was removed. Without doing this, I got an exception:
>> java.lang.IllegalStateException: Failed to rollback to savepoint
>> Checkpoint 1 @ 1470059433553 for cb321c233dfd28f73c565030481657cd. Cannot
>> map old state for task 02ea922553bc7522bdea373f52a702d6 to the new program.
>> This indicates that the program has been changed in a non-compatible way
>>  after the savepoint.
>> I had a Kafka source and a flat mapper chained together when replaying,
>> so to make it work I had to add two dummy operators and assign the same UID
>> I used when replaying, like this:
>> =>
>> x).uid("kafka-replay").name("dummy-1").startNewChain().map(x =>
>> x).name("dummy-2")
>> I guess it would be nice if Flink could recover from removed
>> tasks/operators without needing to add dummy operators like this.
>> Josh
>> On Fri, Jul 29, 2016 at 5:46 PM, Aljoscha Krettek wrote:
>>> Hi,
>>> I have to try this to verify but I think the approach works if you give
>>> the two sources different UIDs. The reason is that Flink will ignore state
>>> for which it doesn't have an operator to assign it to. Therefore, the state
>>> of the "historical Kafka source" should be silently discarded.
>>> Cheers,
>>> Aljoscha
>>> On Fri, 29 Jul 2016 at 18:12 Josh wrote:
>>>> @Aljoscha - The N-input operator way sounds very nice; for now I think
>>>> I'll try and get something quick running the hacky way, then if we decide
>>>> to make this a permanent solution maybe I can work on the proper solution.
>>>> I was wondering about your suggestion for "warming up" the state and then
>>>> taking a savepoint and switching sources - since the Kafka sources are
>>>> stateful and are part of Flink's internal state, wouldn't this break when
>>>> trying to restore the job with a different source? Would I need to assign
>>>> the replay source a UID, and when switching from replay to live, remove the
>>>> replay source and replace it with a dummy operator with the same UID?
>>>> @Jason - I see what you mean now, with the historical and live Flink
>>>> jobs. That's an interesting approach - I guess it's solving a slightly
>>>> different problem from my 'rebuilding Flink state upon starting the job', as
>>>> you're rebuilding state as part of the main job when it comes across events
>>>> that require historical data. Actually I think we'll need to do something
>>>> very similar in the future but right now I can probably get away with
>>>> something simpler!
>>>> Thanks for the replies!
>>>> Josh
>>>> On Fri, Jul 29, 2016 at 2:35 PM, Jason Brelloch wrote:
>>>>> Aljoscha's approach is probably better, but to answer your questions...
>>>>> >How do you send a request from one Flink job to another?
>>>>> All of our different Flink jobs communicate over Kafka.  So the main
>>>>> Flink job would be listening to both a "live" Kafka source, and a
>>>>> "historical" Kafka source.  The historical Flink job would listen to a
>>>>> "request" Kafka source.  When the main job gets an event that it does not
>>>>> have state for, it writes to the "request" topic.  The historical job would
>>>>> read the request, grab the relevant old events from GCS, and write them to
>>>>> the "historical" Kafka topic.  The "historical" source and the "live"
>>>>> source are merged and proceed through the main Flink job as one stream.
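A minimal Scala sketch of the historical job's side of this protocol, assuming the archived events can be read as an in-memory map from key to that key's events in order. GCS and the Kafka topics are replaced by plain values, and `HistoricalJob`, `HistoricalRecord` and `handleRequest` are hypothetical names. One choice goes beyond the description: when nothing is archived for a key, the sketch still sends a payload-less record with the flag set, so the main job is never left waiting for a flag that would not arrive.

```scala
// Toy model of the "historical" job: `archive` stands in for GCS and the
// returned Vector stands in for records written to the "historical" topic.
object HistoricalJob {
  // One record on the (hypothetical) "historical" topic. `last` is the flag on
  // the final record; `payload` is None only when nothing was archived.
  final case class HistoricalRecord(key: String, payload: Option[String], last: Boolean)

  // Handle one request read from the "request" topic for `key`.
  def handleRequest(key: String, archive: Map[String, Vector[String]]): Vector[HistoricalRecord] = {
    val events = archive.getOrElse(key, Vector.empty[String])
    if (events.isEmpty) Vector(HistoricalRecord(key, None, last = true))
    else
      events.zipWithIndex.map { case (p, i) =>
        HistoricalRecord(key, Some(p), last = (i == events.size - 1))
      }
  }
}
```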
>>>>> >How do you handle the switchover between the live stream and the
>>>>> historical stream? Do you somehow block the live stream source and detect
>>>>> when the historical data source is no longer emitting new elements?
>>>>> When the main job sends a request to the historical job, the main job
>>>>> starts storing any events that come in for that key.  As the historical
>>>>> events come in they are processed immediately.  The historical Flink job
>>>>> flags the last event it sends.  When the main Flink job sees the flagged
>>>>> event it knows it is caught up to where it was when it sent the request.
>>>>> You can then process the events that the main job stored, and when that is
>>>>> done you are caught up to the live stream, and can stop storing events for
>>>>> that key and just process them as normal.
>>>>> Keep in mind that this is the dangerous part that I was talking about,
>>>>> where memory in the main job would continue to build until the
>>>>> "historical" events are all processed.
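The main job's per-key switchover can be modeled as a small state machine. This Scala sketch assumes the merged stream delivers each key's historical events in order, that the archive holds only events older than the one that triggered the request, and that the flag arrives on the last historical event; `MainJob`, `LiveEvent`, `HistoricalEvent`, `CatchingUp` and `step` are hypothetical names, and immutable maps stand in for Flink keyed state and the "request" topic.

```scala
// Toy model of the main job's per-key switchover. Kafka topics and Flink keyed
// state are replaced by plain immutable Scala values.
object MainJob {
  sealed trait Input { def key: String }
  final case class LiveEvent(key: String, payload: String) extends Input
  // `last` is the flag the historical job sets on the final replayed event.
  final case class HistoricalEvent(key: String, payload: String, last: Boolean) extends Input

  // Per-key mode; a key absent from `modes` has no state yet.
  sealed trait Mode
  case object Live extends Mode
  final case class CatchingUp(buffered: Vector[String]) extends Mode

  final case class State(
      modes: Map[String, Mode] = Map.empty,
      processed: Map[String, Vector[String]] = Map.empty, // stand-in for the real per-key state
      requests: Vector[String] = Vector.empty             // stand-in for the "request" topic
  ) {
    def process(key: String, payloads: Seq[String]): State =
      copy(processed = processed.updated(key, processed.getOrElse(key, Vector.empty[String]) ++ payloads))
  }

  def step(s: State, in: Input): State = in match {
    case LiveEvent(k, p) =>
      s.modes.get(k) match {
        case None => // unknown key: ask for its history and start buffering
          s.copy(modes = s.modes.updated(k, CatchingUp(Vector(p))), requests = s.requests :+ k)
        case Some(CatchingUp(buf)) => // still replaying: keep buffering
          s.copy(modes = s.modes.updated(k, CatchingUp(buf :+ p)))
        case Some(Live) => // caught up: process normally
          s.process(k, Seq(p))
      }
    case HistoricalEvent(k, p, last) =>
      val replayed = s.process(k, Seq(p)) // historical events are processed immediately
      if (!last) replayed
      else {
        val buffered = s.modes.get(k) match {
          case Some(CatchingUp(buf)) => buf
          case _                     => Vector.empty[String]
        }
        // Flag seen: drain the buffer in arrival order, then drop it and go live.
        replayed.process(k, buffered).copy(modes = replayed.modes.updated(k, Live))
      }
  }

  def run(inputs: Seq[Input]): State = inputs.foldLeft(State())(step)
}
```

The buffer in `CatchingUp` grows with every live event that arrives during the replay, which is the memory risk described above; draining it when the flag arrives is also what releases it. Since each key has its own mode, a key that is still catching up does not hold back keys that are already live.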
>>>>> >In my case I would want the Flink state to always contain the latest
>>>>> state of every item (except when the job first starts and there's a period
>>>>> of time where it's rebuilding its state from the Kafka log).
>>>>> You could absolutely do it by reading from the beginning of a Kafka
>>>>> topic.  The reason we do it with GCS is that it is really cheap storage, and
>>>>> we are not planning on storing events on the Kafka topic forever.
>>>>> >Since I would have everything needed to rebuild the state persisted
>>>>> in a Kafka topic, I don't think I would need a second Flink job for this?
>>>>> The reason for the second Flink job in our case is that we didn't
>>>>> really want to block the Flink task slot while a single key gets caught
>>>>> up.  We have a much larger key domain than we have task slots, so
>>>>> there would be multiple keys on a single task slot.  If you go with the
>>>>> single job approach (which might be the right approach for you guys) any
>>>>> other keys on that task slot will be blocked while that one key is
>>>>> getting its state built up.
>>>>> Hope that helps,
>>>>> On Fri, Jul 29, 2016 at 5:27 AM, Josh wrote:
>>>>>> Hi Jason,
>>>>>> Thanks for the reply - I didn't quite understand all of it though!
>>>>>> > it sends a request to the historical flink job for the old data
>>>>>> How do you send a request from one Flink job to another?
>>>>>> > It continues storing the live events until all the events from the
>>>>>> historical job have been processed, then it processes the stored events,
>>>>>> and finally starts processing the live stream again.
>>>>>> How do you handle the switchover between the live stream and the
>>>>>> historical stream? Do you somehow block the live stream source and detect
>>>>>> when the historical data source is no longer emitting new elements?
>>>>>> > So in your case it looks like what you could do is send a request
>>>>>> to the "historical" job whenever you get an item that you don't yet have
>>>>>> the current state of.
>>>>>> In my case I would want the Flink state to always contain the latest
>>>>>> state of every item (except when the job first starts and there's a
>>>>>> period of time where it's rebuilding its state from the Kafka log). Since
>>>>>> I would have everything needed to rebuild the state persisted in a Kafka
>>>>>> topic, I don't think I would need a second Flink job for this?
>>>>>> Thanks,
>>>>>> Josh
>>>>>> On Thu, Jul 28, 2016 at 6:57 PM, Jason Brelloch wrote:
>>>>>>> Hey Josh,
>>>>>>> The way we replay historical data is we have a second Flink job that
>>>>>>> listens to the same live stream, and stores every single event in Google
>>>>>>> Cloud Storage.
>>>>>>> When the main Flink job that is processing the live stream gets a
>>>>>>> request for a specific data set that it has not been processing yet, it
>>>>>>> sends a request to the historical Flink job for the old data.  The live
>>>>>>> job then starts storing relevant events from the live stream in state.  It
>>>>>>> continues storing the live events until all the events from the
>>>>>>> historical job have been processed, then it processes the stored events,
>>>>>>> and finally starts processing the live stream again.
>>>>>>> As long as it's properly keyed (we key on the specific data set)
>>>>>>> then it doesn't block anything, keeps everything ordered, and eventually
>>>>>>> catches up.  It also allows us to completely blow away state and
>>>>>>> rebuild it from scratch.
>>>>>>> So in your case it looks like what you could do is send a request to
>>>>>>> the "historical" job whenever you get an item that you don't yet have the
>>>>>>> current state of.
>>>>>>> The potential problems you may have are that it may not be possible
>>>>>>> to store every single historical event, and that you need to make sure
>>>>>>> there is enough memory to handle the ever-increasing state size while
>>>>>>> the historical events are being replayed (and make sure to clear the
>>>>>>> state when it is done).
>>>>>>> It's a little complicated, and pretty expensive, but it works.  Let
>>>>>>> me know if something doesn't make sense.
>>>>>>> On Thu, Jul 28, 2016 at 1:14 PM, Josh wrote:
>>>>>>>> Hi all,
>>>>>>>> I was wondering what approaches people usually take with
>>>>>>>> reprocessing data with Flink - specifically the case where you want to
>>>>>>>> upgrade a Flink job, and make it reprocess historical data before
>>>>>>>> continuing to process a live stream.
>>>>>>>> I'm wondering if we can do something similar to the 'simple rewind'
>>>>>>>> or 'parallel rewind' which Samza uses to solve this problem, discussed
>>>>>>>> here:
>>>>>>>> Having used Flink over the past couple of months, the main issue
>>>>>>>> I've had involves Flink's internal state - from my experience it seems
>>>>>>>> it is easy to break the state when upgrading a job, or when changing the
>>>>>>>> parallelism of operators, plus there's no easy way to view/access an
>>>>>>>> internal key-value state from outside Flink.
>>>>>>>> For an example of what I mean, consider a Flink job which consumes
>>>>>>>> a stream of 'updates' to items, and maintains a key-value store of
>>>>>>>> items within Flink's internal state (e.g. in RocksDB). The job also
>>>>>>>> writes the updated items to a Kafka topic:
>>>>>>>> My worry with this is that the state in RocksDB could be lost or
>>>>>>>> become incompatible with an updated version of the job. If this
>>>>>>>> happens, we need to be able to rebuild Flink's internal key-value store
>>>>>>>> in RocksDB. So I'd like to be able to do something like this (which I
>>>>>>>> believe is the Samza solution):
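The rebuild can work if each record the job writes to the output topic is a full item rather than a delta, because then replaying a compacted copy of that topic (latest record per key) reproduces the store. A minimal Scala model of that property, assuming an item is a map of fields and an 'update' overwrites some of those fields; `ItemStore`, `Update`, `run` and `rebuildFromCompactedLog` are hypothetical names, an immutable map stands in for the RocksDB-backed state, and a vector stands in for the Kafka topic.

```scala
// Toy model of the item-store job and of rebuilding its state from the
// compacted output topic. No Flink or Kafka APIs are used.
object ItemStore {
  type ItemId = String
  type Item = Map[String, String] // assumption: an item is a bag of fields

  // Assumption: an 'update' sets some fields of one item.
  final case class Update(id: ItemId, fields: Map[String, String])

  // Apply one update to the store and return the full updated item,
  // i.e. the record the job would write to the output topic.
  def applyUpdate(store: Map[ItemId, Item], u: Update): (Map[ItemId, Item], (ItemId, Item)) = {
    val updated = store.getOrElse(u.id, Map.empty[String, String]) ++ u.fields
    (store.updated(u.id, updated), u.id -> updated)
  }

  // Process updates starting from `start`, collecting the output topic.
  def run(start: Map[ItemId, Item], updates: Seq[Update]): (Map[ItemId, Item], Vector[(ItemId, Item)]) =
    updates.foldLeft((start, Vector.empty[(ItemId, Item)])) { case ((store, out), u) =>
      val (next, record) = applyUpdate(store, u)
      (next, out :+ record)
    }

  // Replaying the log and keeping the latest record per key gives the same
  // result as reading a compacted copy of it.
  def rebuildFromCompactedLog(log: Seq[(ItemId, Item)]): Map[ItemId, Item] =
    log.foldLeft(Map.empty[ItemId, Item]) { case (acc, (id, item)) => acc.updated(id, item) }
}
```

Calling `run` again on the rebuilt store with new updates corresponds to the switch to the live stream; in Flink that switch still goes through a savepoint, which is where the operator UID problem discussed in the replies comes from.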
>>>>>>>> Has anyone done something like this already with Flink? If so, are
>>>>>>>> there any examples of how to do this replay & switchover (rebuild
>>>>>>>> state by consuming from a historical log, then switch over to
>>>>>>>> processing the live stream)?
>>>>>>>> Thanks for any insights,
>>>>>>>> Josh
>>>>>>> --
>>>>>>> *Jason Brelloch* | Product Developer
>>>>>>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
>>>>>>> <>
>>>>>>> Subscribe to the BetterCloud Monitor
>>>>>>> <>
>>>>>>>  -
>>>>>>> Get IT delivered to your inbox
