Hi David,

I have read the documentation for `Context.applyToKeyedState()`, but I still have some questions about using it to implement keyed state migration. `Context.applyToKeyedState()` can only be called in `processBroadcastElement()`, so it has no key information. It looks like I can use a `KeyedStateFunction` to get, update or clear my keyed states. Am I right? If I want to migrate to a different type, e.g. change a `string` state to an `int` state, how do I achieve that with this functionality? It seems that I can't use the `key` parameter of `KeyedStateFunction` to access another state created with a different state descriptor. Please correct me if I misunderstood. Thank you.
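A toy, in-memory model may help reason about the `string`-to-`int` case. It assumes, and this should be checked against the Flink version in use, that while `Context.applyToKeyedState()` walks the keys, the keyed state backend makes each visited key the current key before calling the `KeyedStateFunction`. Under that assumption, a second keyed state registered with its own descriptor would be read and written for the same key inside the callback, so the `key` parameter would not be needed to reach it. `ToyBackend`, `ToyValueState`, `applyToAllKeys` and the state names `count-v1`/`count-v2` are hypothetical and only mimic the idea; they are not Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Entry point first. All classes here are a hypothetical toy model, not Flink API.
final class StringToIntMigration {

    // Converts every entry of the old String state into the new Integer state.
    static <K> void migrate(ToyBackend<K> backend) {
        ToyValueState<K, Integer> newState = backend.getState("count-v2");
        backend.<String>applyToAllKeys("count-v1", (key, oldState) -> {
            // Both handles resolve against the backend's current key,
            // which applyToAllKeys has just set to `key`.
            newState.update(Integer.parseInt(oldState.value()));
            oldState.clear();
        });
    }

    public static void main(String[] args) {
        ToyBackend<String> backend = new ToyBackend<>();
        ToyValueState<String, String> oldState = backend.getState("count-v1");
        backend.setCurrentKey("user-1");
        oldState.update("7");
        backend.setCurrentKey("user-2");
        oldState.update("42");

        migrate(backend);

        ToyValueState<String, Integer> newState = backend.getState("count-v2");
        backend.setCurrentKey("user-2");
        System.out.println(newState.value() + 1); // 43
        System.out.println(oldState.value());     // null
    }
}

// Hypothetical in-memory stand-in for a keyed state backend.
final class ToyBackend<K> {
    private final Map<String, Map<K, Object>> tables = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) { currentKey = key; }

    K currentKey() { return currentKey; }

    Map<K, Object> table(String name) {
        return tables.computeIfAbsent(name, n -> new LinkedHashMap<>());
    }

    <V> ToyValueState<K, V> getState(String name) {
        table(name); // register the state if it does not exist yet
        return new ToyValueState<>(this, name);
    }

    // Calls `function` once per key that has a value in `name`, making that key
    // the current key first. Keys are copied so the callback may clear entries.
    <V> void applyToAllKeys(String name, BiConsumer<K, ToyValueState<K, V>> function) {
        ToyValueState<K, V> state = getState(name);
        for (K key : new ArrayList<>(table(name).keySet())) {
            setCurrentKey(key);
            function.accept(key, state);
        }
    }
}

// Hypothetical value-state handle: every access is scoped to the current key.
final class ToyValueState<K, V> {
    private final ToyBackend<K> backend;
    private final String name;

    ToyValueState(ToyBackend<K> backend, String name) {
        this.backend = backend;
        this.name = name;
    }

    @SuppressWarnings("unchecked")
    V value() { return (V) backend.table(name).get(backend.currentKey()); }

    void update(V value) { backend.table(name).put(backend.currentKey(), value); }

    void clear() { backend.table(name).remove(backend.currentKey()); }
}
```

Copying the key set before iterating is a choice made for this sketch so the callback can clear old entries; whether a real state backend tolerates clearing entries while `applyToKeyedState()` is iterating should be verified for the backend in use.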
Best Regards,
Tony Wei

2018-06-09 9:45 GMT+08:00 TechnoMage <mla...@technomage.com>:

> Thank you all. This discussion is very helpful. It sounds like I can
> wait for 1.6, though, given our development status.
>
> Michael
>
> On Jun 8, 2018, at 1:08 PM, David Anderson <da...@data-artisans.com> wrote:
>
> Hi all,
>
> I think I see a way to eagerly do full state migration without writing
> your own Operator, but it's kind of hacky and may have flaws I'm not aware
> of.
>
> In Flink 1.5 we now have the possibility to connect BroadcastStreams to
> KeyedStreams and apply a KeyedBroadcastProcessFunction. This is relevant
> because in the processBroadcastElement() method you can supply a
> KeyedStateFunction to the Context.applyToKeyedState() method, and this
> KeyedStateFunction will be applied to every item of keyed state associated
> with the state descriptor you specify. I've been doing some experiments
> with this, and it's quite powerful in cases where it's useful to operate
> on all of your application's state.
>
> I believe this was intended for cases where an update to an item of
> broadcast state has implications for associated keyed state, but I see
> nothing that prevents you from essentially ignoring the broadcast stream
> and using this mechanism to implement keyed state migration.
>
> David
>
> On Fri, Jun 8, 2018 at 9:27 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>
>> Hi,
>>
>> Yes, it should be feasible. As I said before, with Flink 1.6 there will be
>> a better way to migrate state, but for now you either need to lazily
>> convert the state, or iterate over the keys and do the job manually.
>>
>> Piotrek
>>
>> On 7 Jun 2018, at 15:52, Tony Wei <tony19920...@gmail.com> wrote:
>>
>> Hi Piotrek,
>>
>> So my question is: is it feasible to migrate state from
>> `ProcessFunction` to my own operator and then use `getKeyedStateBackend()`
>> to migrate the states?
>> If yes, is there anything I need to be careful with?
>> If no, why, and could it be available in the future? Thank you.
>>
>> Best Regards,
>> Tony Wei
>>
>> 2018-06-07 21:43 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>>
>>> Hi,
>>>
>>> Oh, I see now. Yes, indeed getKeyedStateBackend() is not exposed to the
>>> function and you cannot migrate your state that way.
>>>
>>> As far as I know, yes: at the moment, in order to convert everything at
>>> once (without getKeys you can still implement lazy conversion) you would
>>> have to write your own operator.
>>>
>>> Piotrek
>>>
>>> On 7 Jun 2018, at 15:26, Tony Wei <tony19920...@gmail.com> wrote:
>>>
>>> Hi Piotrek,
>>>
>>> I used `ProcessFunction` to implement it, but it seems that I can't call
>>> `getKeyedStateBackend()` like `WindowOperator` does.
>>> I found that `getKeyedStateBackend()` is a method of
>>> `AbstractStreamOperator`, and the `ProcessFunction` API doesn't extend it.
>>> Does that mean I can't look up all keys and migrate all of the previous
>>> states to the new states in `ProcessFunction#open()`?
>>> As I said, do I need to port my `ProcessFunction` to a `KeyedProcessOperator`
>>> to migrate state in the manner shown in `WindowOperator`?
>>>
>>> Best Regards,
>>> Tony Wei
>>>
>>> 2018-06-07 20:28 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>>>
>>>> What function are you implementing and how are you using it?
>>>>
>>>> Usually it’s enough if your function implements RichFunction (or rather
>>>> extends AbstractRichFunction); then you can use RichFunction#open
>>>> in a similar manner to the code that I posted in my previous message.
>>>>
>>>> Flink in many places performs instanceof checks like this one in
>>>> org.apache.flink.api.common.functions.util.FunctionUtils#openFunction:
>>>>
>>>> public static void openFunction(Function function, Configuration parameters) throws Exception {
>>>>     if (function instanceof RichFunction) {
>>>>         RichFunction richFunction = (RichFunction) function;
>>>>         richFunction.open(parameters);
>>>>     }
>>>> }
>>>>
>>>> Piotrek
>>>>
>>>> On 7 Jun 2018, at 11:07, Tony Wei <tony19920...@gmail.com> wrote:
>>>>
>>>> Hi Piotrek,
>>>>
>>>> It seems that this was implemented with the `Operator` API, which is a
>>>> lower-level API than the `Function` API.
>>>> Since at the `Function` API level we can only migrate state when an
>>>> event triggers it, it is more convenient to migrate state this way, by
>>>> iterating over all keys in the `open()` method.
>>>> If I implemented my stateful operator with the `ProcessFunction` API, is
>>>> it possible to port it to `KeyedProcessOperator` and do the state
>>>> migration that you mentioned?
>>>> And are there any concerns or difficulties that could lead to a failed
>>>> state restore or other problems? Thank you!
>>>>
>>>> Best Regards,
>>>> Tony Wei
>>>>
>>>> 2018-06-07 16:10 GMT+08:00 Piotr Nowojski <pi...@data-artisans.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> A general solution for state/schema migration is under development and
>>>>> it might be released with Flink 1.6.0.
>>>>>
>>>>> Before that, you need to manually handle the state migration in your
>>>>> operator’s open method. Let’s assume that your OperatorV1 has a state
>>>>> field “stateV1”. Your OperatorV2 defines a field “stateV2”, which is
>>>>> incompatible with the previous version. What you can do is add logic
>>>>> to the open method that checks:
>>>>> 1. If “stateV2” is non-empty, do nothing.
>>>>> 2. If there is no “stateV2”, iterate over all of the keys and manually
>>>>>    migrate “stateV1” to “stateV2”.
>>>>>
>>>>> In your OperatorV3 you could drop support for “stateV1”.
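Piotr's two-step check can be modeled in plain Java as a minimal sketch. The maps are hypothetical stand-ins for the keyed states “stateV1” and “stateV2”, `OpenTimeMigration` and `migrateOnOpen` are illustrative names rather than Flink API, and the sketch ignores parallelism, key groups, and how a real operator would enumerate its keys.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy model of the check in OperatorV2's open() method. The maps stand in for
// the keyed states "stateV1" and "stateV2"; all names are hypothetical.
final class OpenTimeMigration {

    // Returns true if a migration ran, false if stateV2 already held data.
    static <K, V1, V2> boolean migrateOnOpen(
            Map<K, V1> stateV1, Map<K, V2> stateV2, Function<V1, V2> convert) {
        // Step 1: "stateV2" is non-empty (e.g. restored from a savepoint that
        // OperatorV2 itself wrote), so there is nothing to do.
        if (!stateV2.isEmpty()) {
            return false;
        }
        // Step 2: iterate over all keys of "stateV1" and convert each value.
        for (Map.Entry<K, V1> entry : stateV1.entrySet()) {
            stateV2.put(entry.getKey(), convert.apply(entry.getValue()));
        }
        // "stateV1" is left untouched; dropping it is deferred to OperatorV3.
        return true;
    }

    public static void main(String[] args) {
        Map<String, String> stateV1 = new HashMap<>();
        stateV1.put("user-1", "3");
        stateV1.put("user-2", "7");
        Map<String, Integer> stateV2 = new HashMap<>();

        System.out.println(migrateOnOpen(stateV1, stateV2, Integer::parseInt)); // true
        System.out.println(stateV2.get("user-2") * 2);                          // 14
        System.out.println(migrateOnOpen(stateV1, stateV2, Integer::parseInt)); // false
    }
}
```

Because of step 1, opening the operator again (for example after restoring from a savepoint taken by OperatorV2) does not run the conversion a second time once “stateV2” holds data.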
>>>>>
>>>>> I once implemented something like that here:
>>>>>
>>>>> https://github.com/pnowojski/flink/blob/bfc8858fc4b9125b8fc7acd03cb3f95c000926b2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L258
>>>>>
>>>>> Hope that helps!
>>>>>
>>>>> Piotrek
>>>>>
>>>>> On 6 Jun 2018, at 17:04, TechnoMage <mla...@technomage.com> wrote:
>>>>>
>>>>> We are still pretty new to Flink and I have a conceptual / DevOps
>>>>> question.
>>>>>
>>>>> When a job is modified and we want to deploy the new version, what is
>>>>> the preferred method? Our jobs have a lot of keyed state.
>>>>>
>>>>> If we use snapshots, we have old state that may no longer apply to the
>>>>> new pipeline.
>>>>> If we start a new job, we can reprocess historical data from Kafka, but
>>>>> that can be very resource-heavy for a while.
>>>>>
>>>>> Is there an option I am missing? Are there facilities to selectively
>>>>> “patch” or “purge” the keyed state?
>>>>>
>>>>> Michael
>
> --
> David Anderson | Training Coordinator
> <https://data-artisans.com/>
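The lazy alternative Piotr mentions, converting a key's state the next time an element for that key arrives instead of iterating over all keys up front, might look roughly like this toy model. `LazyMigration`, `oldCounts` and `newCounts` are hypothetical: the two maps stand in for a `ValueState<String>` restored from the old job and the `ValueState<Integer>` used by the new one, and the current key is passed explicitly instead of being set by Flink.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of lazy migration: each key is converted the first time an element
// for it arrives after the upgrade. All names here are hypothetical.
final class LazyMigration {
    final Map<String, String> oldCounts = new HashMap<>();  // written by the old job version
    final Map<String, Integer> newCounts = new HashMap<>(); // used by the new job version

    // Handles one element for `key` and returns the updated count.
    int processElement(String key) {
        Integer count = newCounts.get(key);
        if (count == null) {
            // First access since the upgrade: convert the old value, if any,
            // and remove it so it is never converted twice.
            String legacy = oldCounts.remove(key);
            count = (legacy == null) ? 0 : Integer.parseInt(legacy);
        }
        count = count + 1;
        newCounts.put(key, count);
        return count;
    }

    public static void main(String[] args) {
        LazyMigration fn = new LazyMigration();
        fn.oldCounts.put("user-1", "41");                 // restored from the old savepoint
        System.out.println(fn.processElement("user-1")); // 42: migrated, then incremented
        System.out.println(fn.processElement("user-2")); // 1: key never seen before
        System.out.println(fn.oldCounts.isEmpty());      // true: legacy entry was removed
    }
}
```

The trade-off is that no pass over all keys is needed, but keys that never receive another element stay in the old format, so the conversion code (and the old state) has to remain in the job for as long as such keys may exist.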