Thank you, Kostas & Ufuk. I run into the following compilation error when I use the Checkpointed interface. I am pasting the code and the error message below.

Is the Serializable definition supposed to be from java.io.Serializable or somewhere else?

Thanks again,
Buvana

================================================================================================================
Code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.functions.RichFlatMapFunction;

import java.io.Serializable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * Created by buvana on 8/9/16.
 */
public class stateful {
    private static String INPUT_KAFKA_TOPIC = null;

    --- --- skipping the main as it’s the same as before except for class name change ------------- ---

    public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
            implements Checkpointed<Double> {

        private Double prev_tuple = null;

        @Override
        public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
            try {
                Double value = Double.parseDouble(incString);
                System.out.println("value = " + value);
                System.out.println(prev_tuple);

                Double value2 = value - prev_tuple;
                prev_tuple = value;

                Tuple2<String, Double> tp = new Tuple2<String, Double>();
                tp.setField(INPUT_KAFKA_TOPIC, 0);
                tp.setField(value2, 1);
                out.collect(tp);
            } catch (NumberFormatException e) {
                System.out.println("Could not convert to Float" + incString);
                System.err.println("Could not convert to Float" + incString);
            }
        }

        @Override
        public void open(Configuration config) {
            if (prev_tuple == null) {
                // only recreate if null
                // restoreState will be called before open()
                // so this will already set the sum to the restored value
                prev_tuple = new Double("0.0");
            }
        }

        @Override
        public Serializable snapshotState(
                long checkpointId,
                long checkpointTimestamp) throws Exception {
            return prev_tuple;
        }

        @Override
        public void restoreState(Double state) {
            prev_tuple = state;
        }
    }
}
===============================================================================================================
ERROR message while building:

$ mvn clean package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Flink Quickstart Job 0.1
[INFO] ------------------------------------------------------------------------
[WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been relocated to commons-io:commons-io:jar:1.3.2
[INFO]
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
[INFO] Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
[INFO]
[INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ wiki-edits ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ wiki-edits ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 7 source files to /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
[INFO] -------------------------------------------------------------
[ERROR] COMPILATION ERROR :
[INFO] -------------------------------------------------------------
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
  return type java.io.Serializable is not compatible with java.lang.Double
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
[INFO] 3 errors
[INFO] -------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.171s
[INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016
[INFO] Final Memory: 26M/660M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project wiki-edits: Compilation failure: Compilation failure:
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
[ERROR] return type java.io.Serializable is not compatible with java.lang.Double
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
================================================================================================================

-----Original Message-----
From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
Sent: Thursday, August 11, 2016 10:34 AM
To: user@flink.apache.org
Subject: Re: flink - Working with State example

Exactly as Ufuk suggested, if you are not grouping your stream by key, you should use the Checkpointed interface.

I asked earlier whether you are using keyBy() because keyBy() is what implicitly sets the keySerializer and scopes your (keyed) state to a specific key.

If there is no keying, then keyed state cannot be used and the Checkpointed interface should be used instead.

Let us know if you need anything else.

Kostas

> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi <u...@apache.org> wrote:
>
> This only works for keyed streams, you have to use keyBy().
>
> You can use the Checkpointed interface instead
> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
>
> On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US)
> <buvana.rama...@nokia-bell-labs.com> wrote:
>> Hi Kostas,
>>
>> Here is my code. All I am trying to compute is (x[t] - x[t-1]), where
>> x[t] is the current value of the incoming sample and x[t-1] is the
>> previous value of the incoming sample. I store the current value in
>> the state store (‘prev_tuple’) so that I can use it for computation in
>> the next cycle. As you may observe, I am not using keyBy. I am simply
>> printing out the resultant tuple.
>>
>> It appears from the error message that I have to set the key
>> serializer (and possibly the value serializer) for the state store.
>> I am not sure how to do that…
>>
>> Thanks for your interest in helping,
>>
>> Regards,
>> Buvana
>>
>> public class stateful {
>>     private static String INPUT_KAFKA_TOPIC = null;
>>     private static int TIME_WINDOW = 0;
>>
>>     public static void main(String[] args) throws Exception {
>>
>>         if (args.length < 2) {
>>             throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
>>                     + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
>>         }
>>
>>         INPUT_KAFKA_TOPIC = args[0];
>>         TIME_WINDOW = Integer.parseInt(args[1]);
>>
>>         Properties properties = null;
>>
>>         properties = new Properties();
>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>         properties.setProperty("group.id", "test");
>>
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));
>>
>>         DataStreamSource<String> stream = env
>>                 .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));
>>
>>         // maps the data into Flink tuples
>>         DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());
>>
>>         // write the result to the console or in a Kafka topic
>>         streamTuples.print();
>>
>>         env.execute("plus one");
>>     }
>>
>>     public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double> > {
>>         private transient ValueState<Tuple2<String, Double>> prev_tuple;
>>
>>         @Override
>>         public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
>>             try {
>>                 Double value = Double.parseDouble(incString);
>>                 System.out.println("value = " + value);
>>                 Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
>>                 System.out.println(prev_stored_tp);
>>
>>                 Double value2 = value - prev_stored_tp.f1;
>>                 prev_stored_tp.f1 = value;
>>                 prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>>                 prev_tuple.update(prev_stored_tp);
>>
>>                 Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>                 tp.setField(INPUT_KAFKA_TOPIC, 0);
>>                 tp.setField(value2, 1);
>>                 out.collect(tp);
>>
>>             } catch (NumberFormatException e) {
>>                 System.out.println("Could not convert to Float" + incString);
>>                 System.err.println("Could not convert to Float" + incString);
>>             }
>>         }
>>
>>         @Override
>>         public void open(Configuration config) {
>>             ValueStateDescriptor<Tuple2<String, Double>> descriptor =
>>                     new ValueStateDescriptor<>(
>>                             "previous input value", // the state name
>>                             TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
>>                             Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
>>             prev_tuple = getRuntimeContext().getState(descriptor);
>>         }
>>     }
>> }
>>
>> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
>> Sent: Thursday, August 11, 2016 5:45 AM
>> To: user@flink.apache.org
>> Subject: Re: flink - Working with State example
>>
>> Hello Buvana,
>>
>> Can you share a few more details on your operator and how you are using it?
>>
>> For example, are you using keyBy before using your custom operator?
>>
>> Thanks a lot,
>> Kostas
>>
>> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US)
>> <buvana.rama...@nokia-bell-labs.com> wrote:
>>
>> Hello,
>>
>> I am utilizing the code snippet in:
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
>> and particularly the ‘open’ function in my code:
>>
>> @Override
>> public void open(Configuration config) {
>>     ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>>             new ValueStateDescriptor<>(
>>                     "average", // the state name
>>                     TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
>>                     Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>>     sum = getRuntimeContext().getState(descriptor);
>> }
>>
>> When I run, I get the following error:
>>
>> Caused by: java.lang.RuntimeException: Error while getting state
>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>>     at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>     at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>>     at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>>     ... 8 more
>>
>> Where do I define the key & value serializer for state?
>>
>> Thanks,
>> Buvana
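
A note on the compile error at the top of this thread: the compiler message "return type java.io.Serializable is not compatible with java.lang.Double" suggests that snapshotState() has to return the type argument given to Checkpointed (Double here), with java.io.Serializable acting only as the bound on that type. The sketch below illustrates that rule without Flink on the classpath. The nested Checkpointed interface is a hypothetical stand-in whose shape is assumed from the error message, and CheckpointedSketch, PrevValueDiff and diff() are made-up names for illustration, not Flink API.

```java
import java.io.Serializable;

class CheckpointedSketch {

    // Hypothetical local stand-in for Flink's Checkpointed interface, so that
    // this sketch compiles on its own. Its shape (T bounded by Serializable,
    // snapshotState returning T) is an assumption inferred from the compiler
    // message quoted in this thread.
    interface Checkpointed<T extends Serializable> {
        T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
        void restoreState(T state) throws Exception;
    }

    // Computes x[t] - x[t-1] and exposes the previous value as checkpointed state.
    static class PrevValueDiff implements Checkpointed<Double> {
        private Double prev = null;

        // Mirrors open(): initialise only if restoreState() has not set a value.
        void open() {
            if (prev == null) {
                prev = 0.0;
            }
        }

        // Returns the difference to the previous sample and remembers the new one.
        double diff(double value) {
            double result = value - prev;
            prev = value;
            return result;
        }

        // The return type is Double (the T in Checkpointed<Double>), not Serializable.
        @Override
        public Double snapshotState(long checkpointId, long checkpointTimestamp) {
            return prev;
        }

        @Override
        public void restoreState(Double state) {
            prev = state;
        }
    }

    public static void main(String[] args) throws Exception {
        PrevValueDiff first = new PrevValueDiff();
        first.open();
        first.diff(3.0);
        first.diff(5.5);
        Double snapshot = first.snapshotState(1L, 0L);

        // Simulate recovery: restoreState() runs before open().
        PrevValueDiff recovered = new PrevValueDiff();
        recovered.restoreState(snapshot);
        recovered.open();
        System.out.println(recovered.diff(6.0)); // 6.0 - 5.5
    }
}
```

If that assumption holds, changing the declared return type of snapshotState() in MapStateful from Serializable to Double should clear all three compiler errors, since they all point at the same method.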