Hi,

Oh, I see now. Yes indeed getKeyedStateBackened() is not exposed to the 
function and you can not migrate your state that way.

As far as I know yes, at the moment in order to convert everything at once 
(without getKeyes you still can implement lazy conversion) you would have to 
write your own operator.

Piotrek

> On 7 Jun 2018, at 15:26, Tony Wei <tony19920...@gmail.com> wrote:
> 
> Hi Piotrek,
> 
> I used `ProcessFunction` to implement it, but it seems that I can't call 
> `getKeyedStateBackend()` like `WindowOperator` did.
> I found that `getKeyedStateBackend()` is the method in 
> `AbstractStreamOperator` and `ProcessFunction` API didn't extend it.
> Dose that mean I can't look up all keys and migrate the entire previous 
> states to the new states in `ProcessFunction#open()`?
> As I said, do I need to port `ProcessFunction` to `KeyedProcessOperator` to 
> migration state like the manner showed in `WindowOperator`? 
> 
> Best Regards,
> Tony Wei
> 
> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>>:
> What function are you implementing and how are you using it?
> 
> Usually it’s enough if your function implements RichFunction (or rather 
> extend from AbstractRichFunction) and then you could use RichFunction#open in 
> the similar manner as in the code that I posted in previous message. Flink in 
> many places performs instanceof chekcs like: 
> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction
> 
> public static void openFunction(Function function, Configuration parameters) 
> throws Exception{
>    if (function instanceof RichFunction) {
>       RichFunction richFunction = (RichFunction) function;
>       richFunction.open(parameters);
>    }
> }
> 
> Piotrek
> 
> 
>> On 7 Jun 2018, at 11:07, Tony Wei <tony19920...@gmail.com 
>> <mailto:tony19920...@gmail.com>> wrote:
>> 
>> Hi Piotrek,
>> 
>> It seems that this was implemented by `Operator` API, which is a more low 
>> level api compared to `Function` API.
>> Since in `Function` API level we can only migrate state by event triggered, 
>> it is more convenient in this way to migrate state by foreach all keys in 
>> `open()` method.
>> If I was implemented state operator by `ProcessFunction` API, is it possible 
>> to port it to `KeyedProcessOperator` and do the state migration that you 
>> mentioned?
>> And are there something concerned and difficulties that will leads to 
>> restored state failed or other problems? Thank you!
>> 
>> Best Regards,
>> Tony Wei
>> 
>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com 
>> <mailto:pi...@data-artisans.com>>:
>> Hi,
>> 
>> General solution for state/schema migration is under development and it 
>> might be released with Flink 1.6.0.
>> 
>> Before that, you need to manually handle the state migration in your 
>> operator’s open method. Lets assume that your OperatorV1 has a state field 
>> “stateV1”. Your OperatorV2 defines field “stateV2”, which is incompatible 
>> with previous version. What you can do, is to add a logic in open method, to 
>> check:
>> 1. If “stateV2” is non empty, do nothing
>> 2. If there is no “stateV2”, iterate over all of the keys and manually 
>> migrate “stateV1” to “stateV2”
>> 
>> In your OperatorV3 you could drop the support for “stateV1”.
>> 
>> I have once implemented something like that here:
>> 
>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>  
>> <https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258>
>> 
>> Hope that helps!
>> 
>> Piotrek
>> 
>> 
>>> On 6 Jun 2018, at 17:04, TechnoMage <mla...@technomage.com 
>>> <mailto:mla...@technomage.com>> wrote:
>>> 
>>> We are still pretty new to Flink and I have a conceptual / DevOps question.
>>> 
>>> When a job is modified and we want to deploy the new version, what is the 
>>> preferred method?  Our jobs have a lot of keyed state.
>>> 
>>> If we use snapshots we have old state that may no longer apply to the new 
>>> pipeline.
>>> If we start a new job we can reprocess historical data from Kafka, but that 
>>> can be very resource heavy for a while.
>>> 
>>> Is there an option I am missing?  Are there facilities to “patch” or 
>>> “purge” selectively the keyed state?
>>> 
>>> Michael
>> 
>> 
> 
> 

Reply via email to