What function are you implementing and how are you using it?
Usually it’s enough if your function implements RichFunction (or rather extends
AbstractRichFunction); then you can use RichFunction#open in a similar manner
as in the code that I posted in my previous message. Flink performs instanceof
checks like this in many places, for example in
org.apache.flink.api.common.functions.util.FunctionUtils#openFunction:
    public static void openFunction(Function function, Configuration parameters)
            throws Exception {
        if (function instanceof RichFunction) {
            RichFunction richFunction = (RichFunction) function;
            richFunction.open(parameters);
        }
    }
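The dispatch above can be illustrated without a Flink dependency. The sketch below is a minimal, self-contained model, not Flink code: SimpleFunction, SimpleRichFunction, SimpleAbstractRichFunction and MigratingFunction are hypothetical stand-ins for Function, RichFunction, AbstractRichFunction and a user function, and a plain Map replaces Flink's Configuration. It shows why extending the abstract rich base class is enough for open() to be called, while a plain function is silently skipped.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for Flink's Function / RichFunction /
// AbstractRichFunction hierarchy; these are NOT the real Flink interfaces.
public class OpenDispatchSketch {

    interface SimpleFunction {}

    interface SimpleRichFunction extends SimpleFunction {
        void open(Map<String, String> parameters) throws Exception;
    }

    // Base class in the spirit of AbstractRichFunction: a no-op open()
    // that subclasses override when they need one-time setup logic.
    abstract static class SimpleAbstractRichFunction implements SimpleRichFunction {
        @Override
        public void open(Map<String, String> parameters) throws Exception {}
    }

    // Mirrors the instanceof check quoted above: only "rich" functions get open().
    static void openFunction(SimpleFunction function, Map<String, String> parameters)
            throws Exception {
        if (function instanceof SimpleRichFunction) {
            ((SimpleRichFunction) function).open(parameters);
        }
    }

    // Hypothetical user function that would do its setup (e.g. migration) in open().
    static class MigratingFunction extends SimpleAbstractRichFunction {
        boolean opened = false;

        @Override
        public void open(Map<String, String> parameters) {
            opened = true; // real code would perform its setup here
        }
    }

    public static void main(String[] args) throws Exception {
        MigratingFunction rich = new MigratingFunction();
        SimpleFunction plain = new SimpleFunction() {};

        openFunction(rich, new HashMap<>());
        openFunction(plain, new HashMap<>()); // skipped: not a rich function

        System.out.println("rich opened: " + rich.opened);
    }
}
```

Running main prints "rich opened: true"; the anonymous plain function passes through openFunction untouched.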
Piotrek
> On 7 Jun 2018, at 11:07, Tony Wei <[email protected]> wrote:
>
> Hi Piotrek,
>
> It seems that this was implemented with the `Operator` API, which is a
> lower-level API than the `Function` API.
> Since at the `Function` API level we can only migrate state when an event
> triggers it, it is more convenient to migrate state this way, by iterating
> over all keys in the `open()` method.
> If I implemented my stateful operator with the `ProcessFunction` API, is it
> possible to port it to `KeyedProcessOperator` and do the state migration that
> you mentioned?
> And are there any concerns or difficulties that could lead to state
> restoration failing, or to other problems? Thank you!
>
> Best Regards,
> Tony Wei
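For comparison, the event-triggered migration Tony describes at the `Function` level can be modeled as lazy, per-key conversion. This is a minimal sketch under stated assumptions, not Flink code: keyed state is simulated with plain HashMaps (stateV1 holding Integer counts, stateV2 holding Long counts), and LazyMigrationSketch and processElement are hypothetical names. It makes the drawback visible: a key that receives no events keeps its old-format state indefinitely.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: keyed state is a Map from key to value, and
// processElement() only touches the state of the current element's key,
// similar to how a keyed ProcessFunction only sees the current key's state.
public class LazyMigrationSketch {

    // Old format (V1): a running count stored as an Integer.
    final Map<String, Integer> stateV1 = new HashMap<>();
    // New, incompatible format (V2): the count stored as a Long.
    final Map<String, Long> stateV2 = new HashMap<>();

    void processElement(String key) {
        Long current = stateV2.get(key);
        if (current == null) {
            // Lazy migration: convert this key's V1 value (if any) on first access.
            Integer old = stateV1.remove(key);
            current = (old == null) ? 0L : old.longValue();
        }
        stateV2.put(key, current + 1);
    }

    public static void main(String[] args) {
        LazyMigrationSketch op = new LazyMigrationSketch();
        op.stateV1.put("a", 5); // pretend this was restored from the old job version
        op.stateV1.put("b", 7);

        op.processElement("a"); // "a" is migrated, then incremented to 6
        // "b" has received no events, so it is still stored in the V1 format.
        System.out.println("a=" + op.stateV2.get("a") + " b(v1)=" + op.stateV1.get("b"));
    }
}
```

Running main prints "a=6 b(v1)=7", which is why migrating every key up front in open() can be more convenient.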
>
> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski <[email protected]
> <mailto:[email protected]>>:
> Hi,
>
> A general solution for state/schema migration is under development, and it
> might be released with Flink 1.6.0.
>
> Before that, you need to handle the state migration manually in your
> operator’s open method. Let’s assume that your OperatorV1 has a state field
> “stateV1”. Your OperatorV2 defines a field “stateV2”, which is incompatible
> with the previous version. What you can do is add logic to the open method
> that checks:
> 1. If “stateV2” is non-empty, do nothing.
> 2. If there is no “stateV2”, iterate over all of the keys and manually
> migrate “stateV1” to “stateV2”.
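The two checks above can be sketched as follows. This is a minimal model, not Flink code: per-key state is simulated with HashMaps, and OpenMigrationSketch is a hypothetical name. In a real operator, iterating over all keys requires access to the keyed state backend (not shown here), and each parallel instance only sees the keys assigned to it. Clearing “stateV1” after migrating is an assumption of this sketch, not part of the advice above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of eager, open()-time state migration from V1 to V2.
public class OpenMigrationSketch {

    // Old format (V1): Integer values per key.
    final Map<String, Integer> stateV1 = new HashMap<>();
    // New, incompatible format (V2): Long values per key.
    final Map<String, Long> stateV2 = new HashMap<>();

    // Called once when the operator starts, before any element is processed.
    void open() {
        // Check 1: if stateV2 is non-empty, migration already happened; do nothing.
        if (!stateV2.isEmpty()) {
            return;
        }
        // Check 2: otherwise iterate over all keys and migrate stateV1 to stateV2.
        for (Map.Entry<String, Integer> entry : stateV1.entrySet()) {
            stateV2.put(entry.getKey(), entry.getValue().longValue());
        }
        // Assumption of this sketch: drop the old state once it has been migrated.
        stateV1.clear();
    }

    public static void main(String[] args) {
        OpenMigrationSketch op = new OpenMigrationSketch();
        op.stateV1.put("a", 5); // pretend this was restored from OperatorV1
        op.stateV1.put("b", 7);

        op.open(); // migrates every key, whether or not it ever receives events
        op.open(); // a later restart is a no-op because stateV2 is non-empty

        System.out.println("v2=" + op.stateV2.size() + " v1=" + op.stateV1.size());
    }
}
```

Unlike lazy per-key migration, every key is converted up front, so once all jobs run OperatorV2 the V1 handling can be removed.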
>
> In your OperatorV3 you could drop support for “stateV1”.
>
> I once implemented something like that here:
>
> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>
> Hope that helps!
>
> Piotrek
>
>
>> On 6 Jun 2018, at 17:04, TechnoMage <[email protected]
>> <mailto:[email protected]>> wrote:
>>
>> We are still pretty new to Flink and I have a conceptual / DevOps question.
>>
>> When a job is modified and we want to deploy the new version, what is the
>> preferred method? Our jobs have a lot of keyed state.
>>
>> If we use snapshots, we have old state that may no longer apply to the new
>> pipeline.
>> If we start a new job, we can reprocess historical data from Kafka, but that
>> can be very resource-heavy for a while.
>>
>> Is there an option I am missing? Are there facilities to selectively
>> “patch” or “purge” the keyed state?
>>
>> Michael
>
>