Hi David,

I have read the document for `Context.applyToKeyedState()`, but I still
have some questions for using it to implement keyed state migration.
`Context.applyToKeyedState()` can only be called in
`processBoradcaseElement()`, so it won't have any key information.
It looks like I can use `KeyedStateFunction` to get, update or clear my
keyed states. Am I right?
If I want to migrate to different type, e.g. change `string` type to `int`
type, how do I archive by using this functionality?
It seems that I can't use `key` parameter in `KeyedStateFunction` to access
the other state, generated by another state descriptor.
Please correct me if I misunderstood. Thank you.

Best Regards,
Tony Wei



2018-06-09 9:45 GMT+08:00 TechnoMage <mla...@technomage.com>:

> Thank you all.  This discussion is very helpful.  It sounds like I can
> wait for 1.6 though given our development status.
>
> Michael
>
>
> On Jun 8, 2018, at 1:08 PM, David Anderson <da...@data-artisans.com>
> wrote:
>
> Hi all,
>
> I think I see a way to eagerly do full state migration without writing
> your own Operator, but it's kind of hacky and may have flaws I'm not aware
> of.
>
> In Flink 1.5 we now have the possibility to connect BroadcastStreams to
> KeyedStreams and apply a KeyedBroadcastProcessFunction. This is relevant
> because in the processBroadcastElement() method you can supply a
> KeyedStateFunction to the Context.applyToKeyedState() method, and this 
> KeyedStateFunction
> will be applied every item of keyed state associated with the state
> descriptor you specify. I've been doing some experiments with this, and
> it's quite powerful in cases where it's useful to operate on all of your
> application's state.
>
> I believe this was intended for cases where an update to an item of
> broadcast state has implications for associated keyed state, but I see
> nothing that prevents you from essentially ignoring the broadcast stream
> and using this mechanism to implement keyed state migration.
>
> David
>
>
>
> On Fri, Jun 8, 2018 at 9:27 AM, Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
>> Hi,
>>
>> Yes it should be feasible. As I said before, with Flink 1.6 there will be
>> better way for migrating a state, but for now you either need to lazily
>> convert the state, or iterate over the keys and do the job manually.
>>
>> Piotrek
>>
>>
>> On 7 Jun 2018, at 15:52, Tony Wei <tony19920...@gmail.com> wrote:
>>
>> Hi Piotrek,
>>
>> So my question is: is that feasible to migrate state from
>> `ProcessFunction` to my own operator then use `getKeyedStateBackend()` to
>> migrate the states?
>> If yes, is there anything I need to be careful with? If no, why and can
>> it be available in the future? Thank you.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-06-07 21:43 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>>
>>> Hi,
>>>
>>> Oh, I see now. Yes indeed getKeyedStateBackened() is not exposed to the
>>> function and you can not migrate your state that way.
>>>
>>> As far as I know yes, at the moment in order to convert everything at
>>> once (without getKeyes you still can implement lazy conversion) you would
>>> have to write your own operator.
>>>
>>> Piotrek
>>>
>>>
>>> On 7 Jun 2018, at 15:26, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>> Hi Piotrek,
>>>
>>> I used `ProcessFunction` to implement it, but it seems that I can't call
>>> `getKeyedStateBackend()` like `WindowOperator` did.
>>> I found that `getKeyedStateBackend()` is the method in
>>> `AbstractStreamOperator` and `ProcessFunction` API didn't extend it.
>>> Dose that mean I can't look up all keys and migrate the entire previous
>>> states to the new states in `ProcessFunction#open()`?
>>> As I said, do I need to port `ProcessFunction` to `KeyedProcessOperator`
>>> to migration state like the manner showed in `WindowOperator`?
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>>>
>>>> What function are you implementing and how are you using it?
>>>>
>>>> Usually it’s enough if your function implements RichFunction (or rather
>>>> extend from AbstractRichFunction) and then you could use RichFunction#open
>>>> in the similar manner as in the code that I posted in previous message.
>>>> Flink in many places performs instanceof chekcs like:
>>>> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction
>>>>
>>>> public static void openFunction(Function fun
>>>> ction, Configuration parameters) throws Exception{
>>>>    if (function instanceof RichFunction) {
>>>>       RichFunction richFunction = (RichFunction) function;
>>>>       richFunction.open(parameters);
>>>>    }
>>>> }
>>>>
>>>> Piotrek
>>>>
>>>>
>>>> On 7 Jun 2018, at 11:07, Tony Wei <tony19920...@gmail.com> wrote:
>>>>
>>>> Hi Piotrek,
>>>>
>>>> It seems that this was implemented by `Operator` API, which is a more
>>>> low level api compared to `Function` API.
>>>> Since in `Function` API level we can only migrate state by event
>>>> triggered, it is more convenient in this way to migrate state by foreach
>>>> all keys in `open()` method.
>>>> If I was implemented state operator by `ProcessFunction` API, is it
>>>> possible to port it to `KeyedProcessOperator` and do the state migration
>>>> that you mentioned?
>>>> And are there something concerned and difficulties that will leads to
>>>> restored state failed or other problems? Thank you!
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> General solution for state/schema migration is under development and
>>>>> it might be released with Flink 1.6.0.
>>>>>
>>>>> Before that, you need to manually handle the state migration in your
>>>>> operator’s open method. Lets assume that your OperatorV1 has a state field
>>>>> “stateV1”. Your OperatorV2 defines field “stateV2”, which is incompatible
>>>>> with previous version. What you can do, is to add a logic in open method,
>>>>> to check:
>>>>> 1. If “stateV2” is non empty, do nothing
>>>>> 2. If there is no “stateV2”, iterate over all of the keys and manually
>>>>> migrate “stateV1” to “stateV2”
>>>>>
>>>>> In your OperatorV3 you could drop the support for “stateV1”.
>>>>>
>>>>> I have once implemented something like that here:
>>>>>
>>>>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7
>>>>> acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/
>>>>> apache/flink/streaming/runtime/operators/windowing/WindowOpe
>>>>> rator.java#L258
>>>>>
>>>>> Hope that helps!
>>>>>
>>>>> Piotrek
>>>>>
>>>>>
>>>>> On 6 Jun 2018, at 17:04, TechnoMage <mla...@technomage.com> wrote:
>>>>>
>>>>> We are still pretty new to Flink and I have a conceptual / DevOps
>>>>> question.
>>>>>
>>>>> When a job is modified and we want to deploy the new version, what is
>>>>> the preferred method?  Our jobs have a lot of keyed state.
>>>>>
>>>>> If we use snapshots we have old state that may no longer apply to the
>>>>> new pipeline.
>>>>> If we start a new job we can reprocess historical data from Kafka, but
>>>>> that can be very resource heavy for a while.
>>>>>
>>>>> Is there an option I am missing?  Are there facilities to “patch” or
>>>>> “purge” selectively the keyed state?
>>>>>
>>>>> Michael
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>
>
> --
> David Anderson | Training Coordinator
>
> <https://data-artisans.com/>
>
> Follow us @dataArtisans <https://twitter.com/dataArtisans>
> --
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
> Stream Processing | Event Driven | Real Time
>
>
>

Reply via email to