Hi Buvana,

At first glance, your snapshotState() should return a Double.
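In Flink 1.1 the interface is declared as Checkpointed<T extends Serializable>, and its snapshotState(long checkpointId, long checkpointTimestamp) returns T. With "implements Checkpointed<Double>", the method therefore has to return Double. Java lets an implementing method narrow a return type but not widen it, so declaring Serializable is rejected. All three compiler errors quoted below report that one problem. The sketch below illustrates the rule in plain Java, so it compiles without Flink on the classpath. CheckpointedLike is a hypothetical stand-in that copies the two method signatures, and PrevValueDiff is a hypothetical class that holds the same state logic as MapStateful.

```java
import java.io.Serializable;

// Hypothetical stand-in copying the two method signatures of Flink 1.1's
// org.apache.flink.streaming.api.checkpoint.Checkpointed<T extends Serializable>.
// In the real job, implement Checkpointed<Double> directly instead.
interface CheckpointedLike<T extends Serializable> {
    T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;

    void restoreState(T state) throws Exception;
}

// Hypothetical class with the same state handling as MapStateful, minus Flink.
class PrevValueDiff implements CheckpointedLike<Double> {
    private Double prevValue = null;

    // As in MapStateful.open(): only initialize if restoreState() set nothing.
    void open() {
        if (prevValue == null) {
            prevValue = 0.0;
        }
    }

    // Returns x[t] - x[t-1] and remembers x[t] for the next call.
    double diff(double value) {
        double result = value - prevValue;
        prevValue = value;
        return result;
    }

    // Return type is T (Double). Declaring Serializable here would widen the
    // return type, which Java rejects for an implementing method.
    @Override
    public Double snapshotState(long checkpointId, long checkpointTimestamp) {
        return prevValue;
    }

    @Override
    public void restoreState(Double state) {
        prevValue = state;
    }
}
```

In the actual job, the corresponding fix to MapStateful is only the signature change to public Double snapshotState(long checkpointId, long checkpointTimestamp) throws Exception. After that change, the java.io.Serializable import is no longer needed in that class. On the side question: the Serializable in the interface's type bound is java.io.Serializable. The method, however, should name the type parameter (Double), not the bound.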
Kostas

> On Aug 11, 2016, at 4:57 PM, Ramanan, Buvana (Nokia - US) <buvana.rama...@nokia-bell-labs.com> wrote:
>
> Thank you Kostas & Ufuk. I get the following compilation error when I use the Checkpointed interface. The code and the error message are pasted below.
>
> Is the Serializable definition supposed to be from java.io.Serializable or somewhere else?
>
> Thanks again,
> Buvana
>
> ================================================================================================================
> Code:
>
> import org.apache.flink.api.common.functions.FlatMapFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeinfo.TypeHint;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>
> import java.io.Serializable;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> import org.apache.flink.util.Collector;
>
> import java.util.Properties;
>
> /**
>  * Created by buvana on 8/9/16.
>  */
> public class stateful {
>     private static String INPUT_KAFKA_TOPIC = null;
> ---
> --- skipping the main as it’s the same as before except for class name change
> -------------
> ---
>     public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
>             implements Checkpointed<Double> {
>
>         private Double prev_tuple = null;
>
>         @Override
>         public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
>             try {
>                 Double value = Double.parseDouble(incString);
>                 System.out.println("value = " + value);
>                 System.out.println(prev_tuple);
>
>                 Double value2 = value - prev_tuple;
>                 prev_tuple = value;
>
>                 Tuple2<String, Double> tp = new Tuple2<String, Double>();
>                 tp.setField(INPUT_KAFKA_TOPIC, 0);
>                 tp.setField(value2, 1);
>                 out.collect(tp);
>             } catch (NumberFormatException e) {
>                 System.out.println("Could not convert to Float" + incString);
>                 System.err.println("Could not convert to Float" + incString);
>             }
>         }
>
>         @Override
>         public void open(Configuration config) {
>             if (prev_tuple == null) {
>                 // only recreate if null
>                 // restoreState will be called before open()
>                 // so this will already set the sum to the restored value
>                 prev_tuple = new Double("0.0");
>             }
>         }
>
>         @Override
>         public Serializable snapshotState(
>                 long checkpointId,
>                 long checkpointTimestamp) throws Exception {
>             return prev_tuple;
>         }
>
>         @Override
>         public void restoreState(Double state) {
>             prev_tuple = state;
>         }
>     }
> }
> ===============================================================================================================
> ERROR message while building:
>
> $ mvn clean package
> [INFO] Scanning for projects...
> [INFO]
> [INFO] ------------------------------------------------------------------------
> [INFO] Building Flink Quickstart Job 0.1
> [INFO] ------------------------------------------------------------------------
> [WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been relocated to commons-io:commons-io:jar:1.3.2
> [INFO]
> [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
> [INFO] Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
> [INFO]
> [INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ wiki-edits ---
> [INFO] Using 'UTF-8' encoding to copy filtered resources.
> [INFO] Copying 1 resource
> [INFO]
> [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ wiki-edits ---
> [INFO] Changes detected - recompiling the module!
> [INFO] Compiling 7 source files to /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
> [INFO] -------------------------------------------------------------
> [ERROR] COMPILATION ERROR :
> [INFO] -------------------------------------------------------------
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>   return type java.io.Serializable is not compatible with java.lang.Double
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
> [INFO] 3 errors
> [INFO] -------------------------------------------------------------
> [INFO] ------------------------------------------------------------------------
> [INFO] BUILD FAILURE
> [INFO] ------------------------------------------------------------------------
> [INFO] Total time: 2.171s
> [INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016
> [INFO] Final Memory: 26M/660M
> [INFO] ------------------------------------------------------------------------
> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project wiki-edits: Compilation failure: Compilation failure:
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> [ERROR] return type java.io.Serializable is not compatible with java.lang.Double
> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
> [ERROR] -> [Help 1]
> [ERROR]
> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> [ERROR]
> [ERROR] For more information about the errors and possible solutions, please read the following articles:
> [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> ================================================================================================================
>
> -----Original Message-----
> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
> Sent: Thursday, August 11, 2016 10:34 AM
> To: user@flink.apache.org
> Subject: Re: flink - Working with State example
>
> Exactly as Ufuk suggested, if you are not grouping your stream by key, you should use the Checkpointed interface.
>
> The reason I asked earlier whether you are using keyBy() is that keyBy() is the call that implicitly sets the keySerializer and scopes your (keyed) state to a specific key.
>
> If there is no keying, then keyed state cannot be used, and the Checkpointed interface should be used instead.
>
> Let us know if you need anything else.
>
> Kostas
>
>> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi <u...@apache.org> wrote:
>>
>> This only works for keyed streams; you have to use keyBy().
>>
>> You can use the Checkpointed interface instead
>> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
>>
>> On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US)
>> <buvana.rama...@nokia-bell-labs.com> wrote:
>>> Hi Kostas,
>>>
>>> Here is my code. All I am trying to compute is (x[t] – x[t-1]), where
>>> x[t] is the current value of the incoming sample and x[t-1] is the
>>> previous value of the incoming sample. I store the current value in the
>>> state store (‘prev_tuple’) so that I can use it for the computation in the next cycle.
>>> As you may observe, I am not using keyBy. I am simply printing out the
>>> resulting tuple.
>>>
>>> It appears from the error message that I have to set the key
>>> serializer (and possibly the value serializer) for the state store.
>>> I am not sure how to do that…
>>>
>>> Thanks for your interest in helping,
>>>
>>> Regards,
>>>
>>> Buvana
>>>
>>> public class stateful {
>>>
>>>     private static String INPUT_KAFKA_TOPIC = null;
>>>
>>>     private static int TIME_WINDOW = 0;
>>>
>>>     public static void main(String[] args) throws Exception {
>>>
>>>         if (args.length < 2) {
>>>             throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
>>>                     + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
>>>         }
>>>
>>>         INPUT_KAFKA_TOPIC = args[0];
>>>
>>>         TIME_WINDOW = Integer.parseInt(args[1]);
>>>
>>>         Properties properties = null;
>>>
>>>         properties = new Properties();
>>>
>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>>         properties.setProperty("group.id", "test");
>>>
>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>         //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));
>>>
>>>         DataStreamSource<String> stream = env
>>>                 .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));
>>>
>>>         // maps the data into Flink tuples
>>>         DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());
>>>
>>>         // write the result to the console or in a Kafka topic
>>>         streamTuples.print();
>>>
>>>         env.execute("plus one");
>>>     }
>>>
>>>     public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double> > {
>>>
>>>         private transient ValueState<Tuple2<String, Double>> prev_tuple;
>>>
>>>         @Override
>>>         public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
>>>             try {
>>>                 Double value = Double.parseDouble(incString);
>>>                 System.out.println("value = " + value);
>>>                 Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
>>>                 System.out.println(prev_stored_tp);
>>>
>>>                 Double value2 = value - prev_stored_tp.f1;
>>>                 prev_stored_tp.f1 = value;
>>>                 prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>>>                 prev_tuple.update(prev_stored_tp);
>>>
>>>                 Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>>                 tp.setField(INPUT_KAFKA_TOPIC, 0);
>>>                 tp.setField(value2, 1);
>>>                 out.collect(tp);
>>>
>>>             } catch (NumberFormatException e) {
>>>                 System.out.println("Could not convert to Float" + incString);
>>>                 System.err.println("Could not convert to Float" + incString);
>>>             }
>>>         }
>>>
>>>         @Override
>>>         public void open(Configuration config) {
>>>             ValueStateDescriptor<Tuple2<String, Double>> descriptor =
>>>                     new ValueStateDescriptor<>(
>>>                             "previous input value", // the state name
>>>                             TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
>>>                             Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
>>>             prev_tuple = getRuntimeContext().getState(descriptor);
>>>         }
>>>     }
>>> }
>>>
>>> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
>>> Sent: Thursday, August 11, 2016 5:45 AM
>>> To: user@flink.apache.org
>>> Subject: Re: flink - Working with State example
>>>
>>> Hello Buvana,
>>>
>>> Can you share a bit more detail on your operator and how you are using it?
>>>
>>> For example, are you using keyBy before using your custom operator?
>>>
>>> Thanks a lot,
>>>
>>> Kostas
>>>
>>> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US)
>>> <buvana.rama...@nokia-bell-labs.com> wrote:
>>>
>>> Hello,
>>>
>>> I am using the code snippet in
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html,
>>> in particular the ‘open’ function, in my code:
>>>
>>> @Override
>>> public void open(Configuration config) {
>>>     ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>>>             new ValueStateDescriptor<>(
>>>                     "average", // the state name
>>>                     TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
>>>                     Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>>>     sum = getRuntimeContext().getState(descriptor);
>>> }
>>>
>>> When I run it, I get the following error:
>>>
>>> Caused by: java.lang.RuntimeException: Error while getting state
>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>>>     at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>>     at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>>>     at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>>>     ... 8 more
>>>
>>> Where do I define the key & value serializer for state?
>>>
>>> Thanks,
>>>
>>> Buvana
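The original "State key serializer has not been configured" exception follows from the explanation Ufuk and Kostas give in this thread. ValueState obtained through getRuntimeContext().getState() is keyed state, and keyBy() is what supplies the key and its serializer. The sketch below is a rough mental model only, not Flink code; KeyedPrevValueModel is a hypothetical name. In this model, keyed ValueState acts like one slot per key, and a key that has not been updated yet returns the descriptor's default value.

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual model only, not Flink API: keyed ValueState acts like one value
// slot per key. keyBy() provides the key (and its serializer) that selects the
// slot; with no key there is no slot, hence "State key serializer has not been
// configured in the config".
class KeyedPrevValueModel {
    private final Map<String, Double> slots = new HashMap<>();
    private final double defaultValue;

    // Plays the role of the default value passed to ValueStateDescriptor.
    KeyedPrevValueModel(double defaultValue) {
        this.defaultValue = defaultValue;
    }

    // Like value() then update() on the state scoped to the current key:
    // returns x[t] - x[t-1] for this key only.
    double diff(String key, double value) {
        double prev = slots.getOrDefault(key, defaultValue);
        slots.put(key, value);
        return value - prev;
    }
}
```

In the job itself, this model corresponds to inserting a keyBy() before the flatMap, for example with a KeySelector<String, String>. Keying every record by the same constant puts the whole stream under a single key, so one parallel instance of Rec2Tuple2 sees every sample. That matches the x[t] – x[t-1] computation but does not scale out. For an unkeyed stream, the Checkpointed interface discussed in this thread is the alternative.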