Hi Kostas,

I am trying to use FsStateBackend as the backend for storing state, and I configure it in the code as follows:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new FsStateBackend("file:///home/buvana/flink/checkpoints"));
env.enableCheckpointing(10000);

Everything else is the same as in the code I shared with you previously. When I execute the job, I see that a directory is created under /home/buvana/flink/checkpoints, but there is nothing inside that directory. I was expecting to find some files or subdirectories there. Could you explain why?

Thanks,
Buvana

-----Original Message-----
From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
Sent: Friday, August 12, 2016 1:37 AM
To: user@flink.apache.org
Subject: Re: flink - Working with State example

No problem!

Regards,
Kostas

> On Aug 12, 2016, at 3:00 AM, Ramanan, Buvana (Nokia - US) <buvana.rama...@nokia-bell-labs.com> wrote:
>
> Kostas,
> Good catch! That makes it work! Thank you so much for the help.
> Regards,
> Buvana
>
> -----Original Message-----
> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
> Sent: Thursday, August 11, 2016 11:22 AM
> To: user@flink.apache.org
> Subject: Re: flink - Working with State example
>
> Hi Buvana,
>
> At first glance, your snapshotState() should return a Double.
>
> Kostas
>
>> On Aug 11, 2016, at 4:57 PM, Ramanan, Buvana (Nokia - US) <buvana.rama...@nokia-bell-labs.com> wrote:
>>
>> Thank you, Kostas & Ufuk. I get the following compilation error when I use the Checkpointed interface. The code and the error message are pasted below.
>>
>> Is the Serializable type supposed to be java.io.Serializable, or something else?
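Kostas's reply (that snapshotState() should return a Double) follows from Java's rules for implementing a generic interface: a class that implements Checkpointed<Double> must declare snapshotState(long, long) with return type Double, and java.io.Serializable is a supertype of Double, so the override is rejected. The sketch below mirrors that shape with a hypothetical CheckpointedLike interface (our stand-in, not Flink's API) so it compiles without Flink on the classpath. In the real operator the fix amounts to declaring `public Double snapshotState(long checkpointId, long checkpointTimestamp)`. The bound in the stand-in is java.io.Serializable, which, as far as we know, is also what Flink 1.1's Checkpointed uses.

```java
import java.io.Serializable;

// Hypothetical stand-in with the same shape as Flink 1.1's Checkpointed<T>,
// so this compiles without Flink. T is bounded by java.io.Serializable.
interface CheckpointedLike<T extends Serializable> {
    T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;

    void restoreState(T state) throws Exception;
}

// Because this class implements CheckpointedLike<Double>, snapshotState must
// return Double. Declaring the return type as Serializable does not compile,
// because Serializable is a supertype of Double, not a subtype.
class PreviousValueHolder implements CheckpointedLike<Double> {
    private Double prevValue = 0.0;

    // Computes x[t] - x[t-1] and remembers x[t] for the next call.
    double diff(double value) {
        double result = value - prevValue;
        prevValue = value;
        return result;
    }

    @Override
    public Double snapshotState(long checkpointId, long checkpointTimestamp) {
        return prevValue;
    }

    @Override
    public void restoreState(Double state) {
        prevValue = state;
    }
}
```

Feeding 5.0 and then 7.5 yields differences 5.0 and 2.5, and a fresh instance restored from the snapshot continues from 7.5.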
>>
>> Thanks again,
>> Buvana
>>
>> ================================================================================
>> Code:
>>
>> import org.apache.flink.api.common.functions.FlatMapFunction;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.common.typeinfo.TypeHint;
>> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>>
>> import java.io.Serializable;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>> import org.apache.flink.util.Collector;
>>
>> import java.util.Properties;
>>
>> /**
>>  * Created by buvana on 8/9/16.
>>  */
>> public class stateful {
>>     private static String INPUT_KAFKA_TOPIC = null;
>>     ---
>>     --- skipping the main as it’s the same as before except for class name change -------------
>>     ---
>>     public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
>>             implements Checkpointed<Double> {
>>
>>         private Double prev_tuple = null;
>>
>>         @Override
>>         public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
>>             try {
>>                 Double value = Double.parseDouble(incString);
>>                 System.out.println("value = " + value);
>>                 System.out.println(prev_tuple);
>>
>>                 Double value2 = value - prev_tuple;
>>                 prev_tuple = value;
>>
>>                 Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>                 tp.setField(INPUT_KAFKA_TOPIC, 0);
>>                 tp.setField(value2, 1);
>>                 out.collect(tp);
>>             } catch (NumberFormatException e) {
>>                 System.out.println("Could not convert to Float" + incString);
>>                 System.err.println("Could not convert to Float" + incString);
>>             }
>>         }
>>
>>         @Override
>>         public void open(Configuration config) {
>>             if (prev_tuple == null) {
>>                 // only recreate if null
>>                 // restoreState will be called before open()
>>                 // so this will already set the sum to the restored value
>>                 prev_tuple = new Double("0.0");
>>             }
>>         }
>>
>>         @Override
>>         public Serializable snapshotState(
>>                 long checkpointId,
>>                 long checkpointTimestamp) throws Exception {
>>             return prev_tuple;
>>         }
>>
>>         @Override
>>         public void restoreState(Double state) {
>>             prev_tuple = state;
>>         }
>>     }
>> }
>> ================================================================================
>> ERROR message while building:
>>
>> $ mvn clean package
>> [INFO] Scanning for projects...
>> [INFO]
>> [INFO] ------------------------------------------------------------------------
>> [INFO] Building Flink Quickstart Job 0.1
>> [INFO] ------------------------------------------------------------------------
>> [WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been relocated to commons-io:commons-io:jar:1.3.2
>> [INFO]
>> [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
>> [INFO] Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
>> [INFO]
>> [INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ wiki-edits ---
>> [INFO] Using 'UTF-8' encoding to copy filtered resources.
>> [INFO] Copying 1 resource
>> [INFO]
>> [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ wiki-edits ---
>> [INFO] Changes detected - recompiling the module!
>> [INFO] Compiling 7 source files to /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
>> [INFO] -------------------------------------------------------------
>> [ERROR] COMPILATION ERROR :
>> [INFO] -------------------------------------------------------------
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>>   return type java.io.Serializable is not compatible with java.lang.Double
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
>> [INFO] 3 errors
>> [INFO]
>> [INFO] -------------------------------------------------------------
>> [INFO] ------------------------------------------------------------------------
>> [INFO] BUILD FAILURE
>> [INFO] ------------------------------------------------------------------------
>> [INFO] Total time: 2.171s
>> [INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016
>> [INFO] Final Memory: 26M/660M
>> [INFO] ------------------------------------------------------------------------
>> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project wiki-edits: Compilation failure: Compilation failure:
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>> [ERROR] return type java.io.Serializable is not compatible with java.lang.Double
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
>> [ERROR] -> [Help 1]
>> [ERROR]
>> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
>> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
>> [ERROR]
>> [ERROR] For more information about the errors and possible solutions, please read the following articles:
>> [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
>> ================================================================================
>>
>> -----Original Message-----
>> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
>> Sent: Thursday, August 11, 2016 10:34 AM
>> To: user@flink.apache.org
>> Subject: Re: flink - Working with State example
>>
>> Exactly as Ufuk suggested: if you are not grouping your stream by key, you should use the Checkpointed interface.
>>
>> The reason I asked earlier whether you are using keyBy() is that keyBy() is the operation that implicitly sets the key serializer and scopes your (keyed) state to a specific key.
>>
>> If there is no keying, then keyed state cannot be used, and the Checkpointed interface should be used instead.
>>
>> Let us know if you need anything else.
>>
>> Kostas
>>
>>> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi <u...@apache.org> wrote:
>>>
>>> This only works for keyed streams; you have to use keyBy().
>>>
>>> You can use the Checkpointed interface instead
>>> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
>>>
>>> On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US)
>>> <buvana.rama...@nokia-bell-labs.com> wrote:
>>>> Hi Kostas,
>>>>
>>>> Here is my code. All I am trying to compute is (x[t] - x[t-1]), where x[t] is the current value of the incoming sample and x[t-1] is the previous value. I store the current value in the state store ('prev_tuple') so that I can use it for the computation in the next cycle. As you may observe, I am not using keyBy. I am simply printing out the resulting tuple.
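The point Kostas and Ufuk make about keyed state can be illustrated with a toy model. The sketch below is a hypothetical, simplified stand-in (the class names are ours, not Flink's): keyed state behaves like a map from key to value, and every read or write is resolved against the key of the element currently being processed, which the runtime only knows after keyBy(). Without a current key the model fails, loosely mirroring the "State key serializer has not been configured" error further down. With a single constant key, the model collapses to one global previous value, which is what the x[t] - x[t-1] computation needs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of keyed ("partitioned") value state.
// This is NOT Flink's implementation; it only shows why such state needs a key:
// every read and write is scoped to the key of the element being processed.
class KeyedValueStateModel<K, V> {
    private final Map<K, V> valuesByKey = new HashMap<>();
    private final V defaultValue;
    private K currentKey; // in Flink this context comes from keyBy(); here the caller sets it

    KeyedValueStateModel(V defaultValue) {
        this.defaultValue = defaultValue;
    }

    void setCurrentKey(K key) {
        this.currentKey = key;
    }

    // Value for the current key, or the default if nothing was stored yet.
    V value() {
        requireKey();
        return valuesByKey.getOrDefault(currentKey, defaultValue);
    }

    // Stores a value for the current key only.
    void update(V newValue) {
        requireKey();
        valuesByKey.put(currentKey, newValue);
    }

    private void requireKey() {
        if (currentKey == null) {
            // Loosely mirrors the error in this thread: without a key
            // there is nothing to scope the state to.
            throw new IllegalStateException("no current key: keyed state requires a keyed stream");
        }
    }
}

// Hypothetical x[t] - x[t-1] operator built on the model above.
class DiffOperatorModel {
    static double process(KeyedValueStateModel<String, Double> state, String key, double value) {
        state.setCurrentKey(key);
        double previous = state.value();
        state.update(value);
        return value - previous;
    }
}
```

Interleaving a second key ("other") does not disturb the "topic" sequence: inputs 3.0, 5.0, 9.0 still yield 3.0, 2.0, 4.0, because each key keeps its own previous value.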
>>>>
>>>> It appears from the error message that I have to set the key serializer (and possibly the value serializer) for the state store. I am not sure how to do that.
>>>>
>>>> Thanks for your interest in helping,
>>>>
>>>> Regards,
>>>> Buvana
>>>>
>>>> public class stateful {
>>>>
>>>>     private static String INPUT_KAFKA_TOPIC = null;
>>>>     private static int TIME_WINDOW = 0;
>>>>
>>>>     public static void main(String[] args) throws Exception {
>>>>
>>>>         if (args.length < 2) {
>>>>             throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
>>>>                     + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
>>>>         }
>>>>
>>>>         INPUT_KAFKA_TOPIC = args[0];
>>>>         TIME_WINDOW = Integer.parseInt(args[1]);
>>>>
>>>>         Properties properties = null;
>>>>
>>>>         properties = new Properties();
>>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>         properties.setProperty("zookeeper.connect", "localhost:2181");
>>>>         properties.setProperty("group.id", "test");
>>>>
>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>         //env.setStateBackend(new FsStateBackend("file:///home/buvana/flink/checkpoints"));
>>>>
>>>>         DataStreamSource<String> stream = env
>>>>                 .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));
>>>>
>>>>         // maps the data into Flink tuples
>>>>         DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());
>>>>
>>>>         // write the result to the console or in a Kafka topic
>>>>         streamTuples.print();
>>>>
>>>>         env.execute("plus one");
>>>>     }
>>>>
>>>>     public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double>> {
>>>>         private transient ValueState<Tuple2<String, Double>> prev_tuple;
>>>>
>>>>         @Override
>>>>         public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
>>>>             try {
>>>>                 Double value = Double.parseDouble(incString);
>>>>                 System.out.println("value = " + value);
>>>>                 Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
>>>>                 System.out.println(prev_stored_tp);
>>>>
>>>>                 Double value2 = value - prev_stored_tp.f1;
>>>>                 prev_stored_tp.f1 = value;
>>>>                 prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>>>>                 prev_tuple.update(prev_stored_tp);
>>>>
>>>>                 Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>>>                 tp.setField(INPUT_KAFKA_TOPIC, 0);
>>>>                 tp.setField(value2, 1);
>>>>                 out.collect(tp);
>>>>
>>>>             } catch (NumberFormatException e) {
>>>>                 System.out.println("Could not convert to Float" + incString);
>>>>                 System.err.println("Could not convert to Float" + incString);
>>>>             }
>>>>         }
>>>>
>>>>         @Override
>>>>         public void open(Configuration config) {
>>>>             ValueStateDescriptor<Tuple2<String, Double>> descriptor =
>>>>                     new ValueStateDescriptor<>(
>>>>                             "previous input value", // the state name
>>>>                             TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
>>>>                             Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
>>>>             prev_tuple = getRuntimeContext().getState(descriptor);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
>>>> Sent: Thursday, August 11, 2016 5:45 AM
>>>> To: user@flink.apache.org
>>>> Subject: Re: flink - Working with State example
>>>>
>>>> Hello Buvana,
>>>>
>>>> Can you share a bit more detail about your operator and how you are using it?
>>>> For example, are you using keyBy before your custom operator?
>>>>
>>>> Thanks a lot,
>>>> Kostas
>>>>
>>>> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US) <buvana.rama...@nokia-bell-labs.com> wrote:
>>>>
>>>> Hello,
>>>>
>>>> I am using the code snippet in https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html, in particular the 'open' function, in my code:
>>>>
>>>>     @Override
>>>>     public void open(Configuration config) {
>>>>         ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>>>>                 new ValueStateDescriptor<>(
>>>>                         "average", // the state name
>>>>                         TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
>>>>                         Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>>>>         sum = getRuntimeContext().getState(descriptor);
>>>>     }
>>>>
>>>> When I run it, I get the following error:
>>>>
>>>> Caused by: java.lang.RuntimeException: Error while getting state
>>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>>>>     at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>>>     at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>>>>     at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>>>>     ... 8 more
>>>>
>>>> Where do I define the key & value serializer for state?
>>>>
>>>> Thanks,
>>>> Buvana
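On the FsStateBackend question at the top of the thread (a checkpoint directory that is created but stays empty): one likely explanation, which we have not verified against this job, is FsStateBackend's file-state size threshold. If we read the Flink 1.1 sources correctly, state smaller than fileStateSizeThreshold (1024 bytes by default) is stored together with the checkpoint metadata instead of being written as a file, so a job whose only operator state is a single Double may create its per-job directory and leave it empty. The model below is a hypothetical stand-in, not Flink code: it measures the Java-serialized size of a value and applies that assumed threshold. If the explanation holds, passing a smaller threshold through the FsStateBackend(URI, int) constructor should make files appear.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical model of the size check we believe Flink 1.1's FsStateBackend
// applies. It is NOT Flink code; it only illustrates the decision between
// "keep small state with the checkpoint metadata" and "write it to a file".
class FileStateThresholdModel {
    // Assumed default (FsStateBackend.DEFAULT_FILE_STATE_THRESHOLD in Flink 1.1).
    static final int DEFAULT_FILE_STATE_THRESHOLD = 1024;

    // Size in bytes of the value under plain Java serialization, used here as an
    // approximation of how large a Checkpointed<T extends Serializable> snapshot is.
    static int serializedSize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // True if, under this model, the state would get its own file in the
    // checkpoint directory instead of travelling with the metadata.
    static boolean writtenToFile(Serializable value, int threshold) {
        return serializedSize(value) > threshold;
    }
}
```

Under this model, a serialized Double is far below 1024 bytes and would stay with the metadata, while a 4 KB byte array would be written to a file.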