Hi Marc,

I assume you have set a UID for your CoProcessFunction as described in [1]? Also, can you tell us which Flink version you are working with and which serializer you are using?
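For reference, setting the UID could look roughly like the sketch below. It assumes the Flink 1.3 Scala DataStream API; the UID string "merge-a-b", the class names `PassThrough` and `MergeJob`, and the sample elements are made up for illustration and are not taken from your job.

```scala
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class A(id: String, name: String)
case class B1(id: String, price: Double)

// Hypothetical stand-in for the real function: forwards A and ignores B1.
class PassThrough extends CoProcessFunction[A, B1, A] {
  override def processElement1(value: A,
                               ctx: CoProcessFunction[A, B1, A]#Context,
                               out: Collector[A]): Unit = out.collect(value)

  override def processElement2(value: B1,
                               ctx: CoProcessFunction[A, B1, A]#Context,
                               out: Collector[A]): Unit = ()
}

object MergeJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Sample inputs; the real job reads from Kafka.
    val as: DataStream[A] = env.fromElements(A("1", "widget"))
    val bs: DataStream[B1] = env.fromElements(B1("1", 9.99))

    as.connect(bs)
      .keyBy(_.id, _.id)          // key both inputs by id
      .process(new PassThrough)
      .uid("merge-a-b")           // stable ID used to match savepoint state to this operator
      .print()

    env.execute("merge job")
  }
}
```

The UID has to stay the same across application versions, otherwise the state in the savepoint cannot be mapped back to the operator.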
If you have the UID set, your strategy seems to be the same as the one proposed in [2]:

"Although it is not possible to change the data type of operator state, a workaround to overcome this limitation can be to define a second state with a different data type and to implement logic to migrate the state from the original state into the new state."

I'm no expert on this, but it looks like it should work (although I'm curious where the "aBuffer" in the error message comes from).

I'm forwarding this to Gordon in CC because he probably knows better, as he was involved in state migration before (afaik).

Nico

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/upgrading.html#application-state-compatibility
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.3/ops/upgrading.html#stateful-operators-and-user-functions

On Wednesday, 20 September 2017 14:16:27 CEST mrooding wrote:
> Hi
>
> We've got a situation where we're merging several Kafka streams and for
> certain streams, we want to retain up to 6 days of history. We're trying to
> figure out how we can migrate savepoint data between application updates
> when the data type for a certain state buffer changes.
>
> Let's assume that we have 2 streams with the following data types:
>
> case class A(id: String, name: String)
> case class B1(id: String, price: Double)
>
> We have a CoProcessFunction which combines the 2 streams and maintains 2
> different buffer states:
>
> MapState[String, A] and ValueState[B1]
>
> In our scenario, we're trying to anticipate the data type of B1 changing in
> the future. Let's assume that in the foreseeable future, B1 will change to:
>
> case class B2(id: String, price: Double, date: String)
>
> When we create a snapshot using B1 and then upgrade the application to B2,
> the obvious attempt would be to try and retrieve the stored ValueState and
> the new ValueState:
>
> val oldState = getRuntimeContext.getState(new
> ValueStateDescriptor[B1]("1Buffer", createTypeInformation[B1]))
> val newState = getRuntimeContext.getState(new
> ValueStateDescriptor[B2]("2Buffer", createTypeInformation[B2]))
>
> However, as soon as you do, the following error occurs:
>
> Unable to restore keyed state [aBuffer]. For memory-backed keyed state, the
> previous serializer of the keyed state must be present; the serializer could
> have been removed from the classpath, or its implementation have changed
> and could not be loaded. This is a temporary restriction that will be fixed
> in future versions.
>
> Our assumption is that the process operator has a specified ID which
> Flink uses to save and restore savepoints. The CoProcessFunction types
> changed from CoProcessFunction[A, B1, A] to CoProcessFunction[A, B2, A] and
> therefore the savepoint data does not apply to the operator anymore. Is this
> assumption correct?
>
> We've been going through the documentation and source code of Flink and it
> seems like there's no way to achieve this kind of migration. If this is the
> case, we'd be interested in contributing to Flink to get this added a.s.a.p.
> and would love to get some feedback on how to approach this.
>
> Thanks in advance
>
> Marc
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
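The workaround quoted from [2] above could be sketched roughly as follows. This is an untested sketch assuming the Flink 1.3 Scala API, not a confirmed fix for the restore error: the state names "1Buffer" and "2Buffer" are taken from the original message, while `MergeFunction`, `Migration.toB2`, and the empty default for `date` are hypothetical. The MapState[String, A] buffer is left out for brevity, and B1 has to stay on the classpath unchanged so that its serializer can still be loaded.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.CoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class A(id: String, name: String)
case class B1(id: String, price: Double)               // old type, kept unchanged
case class B2(id: String, price: Double, date: String) // new type

object Migration {
  // Hypothetical conversion; the empty `date` default is an assumption.
  def toB2(old: B1): B2 = B2(old.id, old.price, date = "")
}

class MergeFunction extends CoProcessFunction[A, B2, A] {
  @transient private var oldState: ValueState[B1] = _
  @transient private var newState: ValueState[B2] = _

  override def open(parameters: Configuration): Unit = {
    // Old state keeps its original name and type so the savepoint can be restored.
    oldState = getRuntimeContext.getState(
      new ValueStateDescriptor[B1]("1Buffer", createTypeInformation[B1]))
    // New state is registered under a different name with the new type.
    newState = getRuntimeContext.getState(
      new ValueStateDescriptor[B2]("2Buffer", createTypeInformation[B2]))
  }

  // Moves the current key's old value into the new state, then clears the old one.
  private def migrateIfNeeded(): Unit = {
    val old = oldState.value()
    if (old != null) {
      if (newState.value() == null) newState.update(Migration.toB2(old))
      oldState.clear()
    }
  }

  override def processElement1(value: A,
                               ctx: CoProcessFunction[A, B2, A]#Context,
                               out: Collector[A]): Unit = {
    migrateIfNeeded()
    out.collect(value)
  }

  override def processElement2(value: B2,
                               ctx: CoProcessFunction[A, B2, A]#Context,
                               out: Collector[A]): Unit = {
    migrateIfNeeded()
    newState.update(value)
  }
}
```

Because keyed state can only be read for the key of the element currently being processed, the migration here happens lazily, one key at a time. Keys that never receive another element keep their B1 value in the old state.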