@Aljoscha - The N-input operator way sounds very nice, for now I think I'll
try and get something quick running the hacky way, then if we decide to
make this a permanent solution maybe I can work on the proper solution. I
was wondering about your suggestion for "warming up" the state and then
taking a savepoint and switching sources - since the Kafka sources are
stateful and are part of Flink's internal state, wouldn't this break when
trying to restore the job with a different source? Would I need to assign
the replay source a UID, and when switching from replay to live, remove the
replay source and replace it with an dummy operator with the same UID?

@Jason - I see what you mean now, with the historical and live Flink jobs.
That's an interesting approach - I guess it's solving a slightly different
problem to my 'rebuilding Flink state upon starting job' - as you're
rebuilding state as part of the main job when it comes across events that
require historical data. Actually I think we'll need to do something very
similar in the future but right now I can probably get away with something
simpler!

Thanks for the replies!

Josh

On Fri, Jul 29, 2016 at 2:35 PM, Jason Brelloch <jb.bc....@gmail.com> wrote:

> Aljoscha's approach is probably better, but to answer your questions...
>
> >How do you send a request from one Flink job to another?
> All of our different flink jobs communicate over kafka.  So the main flink
> job would be listening to both a "live" kafka source, and a "historical"
> kafka source.  The historical flink job would listen to a "request" kafka
> source.  When the main job gets an event that it does not have state for it
> writes to the "request" topic.  The historical job would read the request,
> grab the relevant old events from GCS, and write them to the "historical"
> kafka topic.  The "historical" source and the "live" source are merged and
> proceed through the main flink job as one stream.
>
> >How do you handle the switchover between the live stream and the
> historical stream? Do you somehow block the live stream source and detect
> when the historical data source is no longer emitting new elements?
> When the main job sends a request to the historical job, the main job
> starts storing any events that are come in for that key.  As the historical
> events come in they are processed immediately.  The historical flink job
> flags the last event it sends.  When the main flink job sees the flagged
> event it knows it is caught up to where it was when it sent the request.
> You can then process the events that the main job stored, and when that is
> done you are caught up to the live stream, and can stop storing events for
> that key and just process them as normal.
>
> Keep in mind that this is the dangerous part that I was talking about,
> where memory in the main job would continue to build until the "historical"
> events are all processed.
>
> >In my case I would want the Flink state to always contain the latest
> state of every item (except when the job first starts and there's a period
> of time where it's rebuilding its state from the Kafka log).
> You could absolutely do it by reading from the beginning of a kafka
> topic.  The reason we do it with GCS is it is really cheap storage, and we
> are not planning on storing forever on the kafka topic.
>
> >Since I would have everything needed to rebuild the state persisted in a
> Kafka topic, I don't think I would need a second Flink job for this?
> The reason for the second flink job in our case is that we didn't really
> want to block the flink task slot while a single key gets caught up.  We
> have a much larger key domain then we have number of task slots, so there
> would be multiple keys on single task slot.  If you go with the single job
> approach (which might be the right approach for you guys) any other keys on
> that task slot will be blocked until the one key is getting it's state
> built up.
>
> Hope that helps,
>
> On Fri, Jul 29, 2016 at 5:27 AM, Josh <jof...@gmail.com> wrote:
>
>> Hi Jason,
>>
>> Thanks for the reply - I didn't quite understand all of it though!
>>
>> > it sends a request to the historical flink job for the old data
>> How do you send a request from one Flink job to another?
>>
>> > It continues storing the live events until all the events form the
>> historical job have been processed, then it processes the stored events,
>> and finally starts processing the live stream again.
>> How do you handle the switchover between the live stream and the
>> historical stream? Do you somehow block the live stream source and detect
>> when the historical data source is no longer emitting new elements?
>>
>> > So in you case it looks like what you could do is send a request to
>> the "historical" job whenever you get a item that you don't yet have the
>> current state of.
>> In my case I would want the Flink state to always contain the latest
>> state of every item (except when the job first starts and there's a period
>> of time where it's rebuilding its state from the Kafka log). Since I would
>> have everything needed to rebuild the state persisted in a Kafka topic, I
>> don't think I would need a second Flink job for this?
>>
>> Thanks,
>> Josh
>>
>>
>>
>>
>> On Thu, Jul 28, 2016 at 6:57 PM, Jason Brelloch <jb.bc....@gmail.com>
>> wrote:
>>
>>> Hey Josh,
>>>
>>> The way we replay historical data is we have a second Flink job that
>>> listens to the same live stream, and stores every single event in Google
>>> Cloud Storage.
>>>
>>> When the main Flink job that is processing the live stream gets a
>>> request for a specific data set that it has not been processing yet, it
>>> sends a request to the historical flink job for the old data.  The live job
>>> then starts storing relevant events from the live stream in state.  It
>>> continues storing the live events until all the events form the historical
>>> job have been processed, then it processes the stored events, and finally
>>> starts processing the live stream again.
>>>
>>> As long as it's properly keyed (we key on the specific data set) then it
>>> doesn't block anything, keeps everything ordered, and eventually catches
>>> up.  It also allows us to completely blow away state and rebuild it from
>>> scratch.
>>>
>>> So in you case it looks like what you could do is send a request to the
>>> "historical" job whenever you get a item that you don't yet have the
>>> current state of.
>>>
>>> The potential problems you may have are that it may not be possible to
>>> store every single historical event, and that you need to make sure there
>>> is enough memory to handle the ever increasing state size while the
>>> historical events are being replayed (and make sure to clear the state when
>>> it is done).
>>>
>>> It's a little complicated, and pretty expensive, but it works.  Let me
>>> know if something doesn't make sense.
>>>
>>>
>>> On Thu, Jul 28, 2016 at 1:14 PM, Josh <jof...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I was wondering what approaches people usually take with reprocessing
>>>> data with Flink - specifically the case where you want to upgrade a Flink
>>>> job, and make it reprocess historical data before continuing to process a
>>>> live stream.
>>>>
>>>> I'm wondering if we can do something similar to the 'simple rewind' or
>>>> 'parallel rewind' which Samza uses to solve this problem, discussed here:
>>>> https://samza.apache.org/learn/documentation/0.10/jobs/reprocessing.html
>>>>
>>>> Having used Flink over the past couple of months, the main issue I've
>>>> had involves Flink's internal state - from my experience it seems it is
>>>> easy to break the state when upgrading a job, or when changing the
>>>> parallelism of operators, plus there's no easy way to view/access an
>>>> internal key-value state from outside Flink.
>>>>
>>>> For an example of what I mean, consider a Flink job which consumes a
>>>> stream of 'updates' to items, and maintains a key-value store of items
>>>> within Flink's internal state (e.g. in RocksDB). The job also writes the
>>>> updated items to a Kafka topic:
>>>>
>>>> http://oi64.tinypic.com/34q5opf.jpg
>>>>
>>>> My worry with this is that the state in RocksDB could be lost or become
>>>> incompatible with an updated version of the job. If this happens, we need
>>>> to be able to rebuild Flink's internal key-value store in RocksDB. So I'd
>>>> like to be able to do something like this (which I believe is the Samza
>>>> solution):
>>>>
>>>> http://oi67.tinypic.com/219ri95.jpg
>>>>
>>>> Has anyone done something like this already with Flink? If so are there
>>>> any examples of how to do this replay & switchover (rebuild state by
>>>> consuming from a historical log, then switch over to processing the live
>>>> stream)?
>>>>
>>>> Thanks for any insights,
>>>> Josh
>>>>
>>>>
>>>
>>>
>>> --
>>> *Jason Brelloch* | Product Developer
>>> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
>>> <http://www.bettercloud.com/>
>>> Subscribe to the BetterCloud Monitor
>>> <https://www.bettercloud.com/monitor?utm_source=bettercloud_email&utm_medium=email_signature&utm_campaign=monitor_launch>
>>>  -
>>> Get IT delivered to your inbox
>>>
>>
>>
>
>
> --
> *Jason Brelloch* | Product Developer
> 3405 Piedmont Rd. NE, Suite 325, Atlanta, GA 30305
> <http://www.bettercloud.com/>
> Subscribe to the BetterCloud Monitor
> <https://www.bettercloud.com/monitor?utm_source=bettercloud_email&utm_medium=email_signature&utm_campaign=monitor_launch>
>  -
> Get IT delivered to your inbox
>

Reply via email to