You are very close. I got your example to work by switching from the MemoryStateBackend to the FsStateBackend, and by adding bEnv.execute(); at the end of main(). I'm not sure why either of those is necessary, but it doesn't work without both changes.

See https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf for my version.

*David Anderson* | Training Coordinator

On Thu, Mar 19, 2020 at 2:54 AM Dmitry Minaev <mina...@gmail.com> wrote:

> Hi everyone,
>
> I'm looking for a way to modify state inside an operator in Flink. I found
> the State Processor API
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/state_processor_api.html#modifying-savepoints>,
> which allows modifying savepoints and looks great. But I can't make it
> work.
>
> I can read existing state from a savepoint, but if I try to create (or
> modify) a savepoint, it doesn't write it for some reason.
>
> Questions:
> 1. Is the State Processor API the right way to achieve what I'm looking for?
>    Are there any other approaches?
> 2. Can I run this as a standalone Java program, or does it have to be part
>    of a Flink job?
> 3. I expect to have a new savepoint in the provided location after running
>    the code below. Is that the right expectation?
>
> ```
> public static void main(String[] args) throws Exception {
>     ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
>
>     BootstrapTransformation<Integer> transform =
>         OperatorTransformation.bootstrapWith(bEnv.fromElements(1, 2, 3))
>             .keyBy(String::valueOf)
>             .transform(new SimplestTransform());
>
>     Savepoint.create(new MemoryStateBackend(), 16)
>         .withOperator("my-operator-uid", transform)
>         .write("file:///tmp/savepoints/");
> }
>
> public class SimplestTransform extends KeyedStateBootstrapFunction<String, Integer> {
>     ValueState<Integer> state;
>
>     @Override
>     public void open(Configuration parameters) {
>         ValueStateDescriptor<Integer> descriptor =
>             new ValueStateDescriptor<>("total", Types.INT);
>         state = getRuntimeContext().getState(descriptor);
>     }
>
>     @Override
>     public void processElement(Integer value, Context ctx) throws Exception {
>         state.update(value);
>     }
> }
> ```
>
> It finishes successfully, but it doesn't write anything to the specified
> folder. I tried the folder path both with and without the "file://" prefix.
>
> I feel I'm missing something.
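
For reference, here is a minimal sketch of the two changes described at the top of this reply, applied to the quoted code. It assumes Flink 1.10 with flink-state-processor-api on the classpath. The class name FixedSavepointWriter, the optional path argument, and the file:///tmp/checkpoints directory passed to FsStateBackend are placeholders of mine, not taken from the gist. A likely reason execute() is needed is that the DataSet API builds its plan lazily, so write() only registers a sink and nothing runs until the job is executed. I have not confirmed why the backend switch matters.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

// Hypothetical wrapper class so the example compiles as a single file.
public class FixedSavepointWriter {

    public static void main(String[] args) throws Exception {
        // Placeholder: allow overriding the target path; defaults to the path from the question.
        String savepointPath = args.length > 0 ? args[0] : "file:///tmp/savepoints/";

        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();

        BootstrapTransformation<Integer> transform =
            OperatorTransformation.bootstrapWith(bEnv.fromElements(1, 2, 3))
                .keyBy(String::valueOf)
                .transform(new SimplestTransform());

        // Change 1: FsStateBackend instead of MemoryStateBackend.
        // The checkpoint directory here is an assumed placeholder.
        Savepoint.create(new FsStateBackend("file:///tmp/checkpoints"), 16)
            .withOperator("my-operator-uid", transform)
            .write(savepointPath);

        // Change 2: actually run the batch job; without this call nothing was written.
        bEnv.execute();
    }

    // Same bootstrap function as in the question, made a static nested class.
    public static class SimplestTransform
            extends KeyedStateBootstrapFunction<String, Integer> {

        private transient ValueState<Integer> state;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Integer> descriptor =
                new ValueStateDescriptor<>("total", Types.INT);
            state = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void processElement(Integer value, Context ctx) throws Exception {
            // Store the incoming value as this key's state.
            state.update(value);
        }
    }
}
```

After running it, the target directory should contain the new savepoint, which can then be used to start a job whose operator has the uid "my-operator-uid".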