Hi,

do you mean the directory is completely empty? Can you check in the JobManager dashboard whether it reports any successful checkpoints for the job?

One possible explanation is an optimization that the FsStateBackend performs: when the state is very small, it is not written to files but stored in the metadata of the checkpoint that is sent to the JobManager. This would explain why there are no files. You can set the size threshold for this optimization with an additional FsStateBackend constructor parameter; for example, new FsStateBackend(URI.create("file:///home/buvana/flink/checkpoints"), 0) disables the optimization.
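To make the effect of the threshold concrete, here is a toy model of the decision described above. It is not Flink code: the class ThresholdSketch, the Placement enum, the place() method, the <= comparison and the byte sizes are all made up for illustration, so please treat it as a sketch under those assumptions.

```java
/** Toy model of a size-threshold decision. This is NOT Flink's implementation. */
public class ThresholdSketch {

    /** Where a serialized piece of state ends up in this toy model. */
    public enum Placement { INLINE_IN_METADATA, SEPARATE_FILE }

    /**
     * Assumption for illustration: state no larger than the threshold is
     * carried inside the checkpoint metadata, anything larger goes to a file.
     * The exact comparison in the real backend may differ.
     */
    public static Placement place(int stateSizeBytes, int thresholdBytes) {
        return stateSizeBytes <= thresholdBytes
                ? Placement.INLINE_IN_METADATA
                : Placement.SEPARATE_FILE;
    }

    public static void main(String[] args) {
        int hypotheticalThreshold = 1024; // placeholder value, not a documented default

        // Small state with a non-zero threshold: no file would show up on disk.
        System.out.println(place(100, hypotheticalThreshold));

        // Large state: written to a file in the checkpoint directory.
        System.out.println(place(5000, hypotheticalThreshold));

        // Threshold 0: even small non-empty state is written to a file.
        System.out.println(place(100, 0));
    }
}
```

In this model, a job whose only state is a single Double falls into the first case, which would match a checkpoint directory that exists but contains no files.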
Cheers,
Aljoscha

On Fri, 12 Aug 2016 at 21:12 Ramanan, Buvana (Nokia - US) <buvana.rama...@nokia-bell-labs.com> wrote:

> Hi Kostas,
>
> I am trying to use FsStateBackend as the backend for storing state. And
> configure it as follows in the code:
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStateBackend(new FsStateBackend("file:///home/buvana/flink/checkpoints"));
> env.enableCheckpointing(10000);
>
> everything else is same as the code I shared with you previously.
>
> When I execute, I see that a directory is created under
> /home/buvana/flink/checkpoints, but there is nothing under that directory.
> I was expecting to find some file / sub dir there.
>
> Please explain.
>
> Thanks,
> Buvana
>
> -----Original Message-----
> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
> Sent: Friday, August 12, 2016 1:37 AM
> To: user@flink.apache.org
> Subject: Re: flink - Working with State example
>
> No problem!
>
> Regards,
> Kostas
>
> > On Aug 12, 2016, at 3:00 AM, Ramanan, Buvana (Nokia - US) <buvana.rama...@nokia-bell-labs.com> wrote:
> >
> > Kostas,
> > Good catch! That makes it working! Thank you so much for the help.
> > Regards,
> > Buvana
> >
> > -----Original Message-----
> > From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
> > Sent: Thursday, August 11, 2016 11:22 AM
> > To: user@flink.apache.org
> > Subject: Re: flink - Working with State example
> >
> > Hi Buvana,
> >
> > At a first glance, your snapshotState() should return a Double.
> >
> > Kostas
> >
> >> On Aug 11, 2016, at 4:57 PM, Ramanan, Buvana (Nokia - US) <buvana.rama...@nokia-bell-labs.com> wrote:
> >>
> >> Thank you Kostas & Ufuk. I get into the following compilation error when I use checkpointed interface. Pasting the code & message as follows:
> >>
> >> Is the Serializable definition supposed to be from java.io.Serializable or somewhere else?
> >>
> >> Thanks again,
> >> Buvana
> >>
> >> ================================================================================================================
> >> Code:
> >>
> >> import org.apache.flink.api.common.functions.FlatMapFunction;
> >> import org.apache.flink.api.common.functions.MapFunction;
> >> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
> >> import org.apache.flink.configuration.Configuration;
> >> import org.apache.flink.api.common.typeinfo.TypeInformation;
> >> import org.apache.flink.api.common.typeinfo.TypeHint;
> >> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> >>
> >> import java.io.Serializable;
> >> import org.apache.flink.api.java.tuple.Tuple2;
> >> import org.apache.flink.streaming.api.datastream.DataStream;
> >> import org.apache.flink.streaming.api.datastream.DataStreamSource;
> >> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> >> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> >> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
> >> import org.apache.flink.util.Collector;
> >>
> >> import java.util.Properties;
> >>
> >> /**
> >>  * Created by buvana on 8/9/16.
> >>  */
> >> public class stateful {
> >>     private static String INPUT_KAFKA_TOPIC = null;
> >>     ---
> >>     --- skipping the main as it’s the same as before except for class name change -------------
> >>     ---
> >>     public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
> >>             implements Checkpointed<Double> {
> >>
> >>         private Double prev_tuple = null;
> >>
> >>         @Override
> >>         public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
> >>             try {
> >>                 Double value = Double.parseDouble(incString);
> >>                 System.out.println("value = " + value);
> >>                 System.out.println(prev_tuple);
> >>
> >>                 Double value2 = value - prev_tuple;
> >>                 prev_tuple = value;
> >>
> >>                 Tuple2<String, Double> tp = new Tuple2<String, Double>();
> >>                 tp.setField(INPUT_KAFKA_TOPIC, 0);
> >>                 tp.setField(value2, 1);
> >>                 out.collect(tp);
> >>             } catch (NumberFormatException e) {
> >>                 System.out.println("Could not convert to Float" + incString);
> >>                 System.err.println("Could not convert to Float" + incString);
> >>             }
> >>         }
> >>         @Override
> >>         public void open(Configuration config) {
> >>             if (prev_tuple == null) {
> >>                 // only recreate if null
> >>                 // restoreState will be called before open()
> >>                 // so this will already set the sum to the restored value
> >>                 prev_tuple = new Double("0.0");
> >>             }
> >>         }
> >>
> >>         @Override
> >>         public Serializable snapshotState(
> >>                 long checkpointId,
> >>                 long checkpointTimestamp) throws Exception {
> >>             return prev_tuple;
> >>         }
> >>
> >>
> >>         @Override
> >>         public void restoreState(Double state) {
> >>             prev_tuple = state;
> >>         }
> >>     }
> >> }
> >> ================================================================================================================
> >> ERROR message while building:
> >>
> >> $ mvn clean package
> >> [INFO] Scanning for projects...
> >> [INFO]
> >> [INFO] ------------------------------------------------------------------------
> >> [INFO] Building Flink Quickstart Job 0.1
> >> [INFO] ------------------------------------------------------------------------
> >> [WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been relocated to commons-io:commons-io:jar:1.3.2
> >> [INFO]
> >> [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
> >> [INFO] Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
> >> [INFO]
> >> [INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ wiki-edits ---
> >> [INFO] Using 'UTF-8' encoding to copy filtered resources.
> >> [INFO] Copying 1 resource
> >> [INFO]
> >> [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ wiki-edits ---
> >> [INFO] Changes detected - recompiling the module!
> >> [INFO] Compiling 7 source files to /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
> >> [INFO] -------------------------------------------------------------
> >> [ERROR] COMPILATION ERROR :
> >> [INFO] -------------------------------------------------------------
> >> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> >> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> >>   return type java.io.Serializable is not compatible with java.lang.Double
> >> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
> >> [INFO] 3 errors
> >> [INFO] -------------------------------------------------------------
> >> [INFO] ------------------------------------------------------------------------
> >> [INFO] BUILD FAILURE
> >> [INFO] ------------------------------------------------------------------------
> >> [INFO] Total time: 2.171s
> >> [INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016
> >> [INFO] Final Memory: 26M/660M
> >> [INFO] ------------------------------------------------------------------------
> >> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project wiki-edits: Compilation failure: Compilation failure:
> >> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> >> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
> >> [ERROR] return type java.io.Serializable is not compatible with java.lang.Double
> >> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
> >> [ERROR] -> [Help 1]
> >> [ERROR]
> >> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
> >> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
> >> [ERROR]
> >> [ERROR] For more information about the errors and possible solutions, please read the following articles:
> >> [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
> >> ================================================================================================================
> >>
> >> -----Original Message-----
> >> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
> >> Sent: Thursday, August 11, 2016 10:34 AM
> >> To: user@flink.apache.org
> >> Subject: Re: flink - Working with State example
> >>
> >> Exactly as Ufuk suggested, if you are not grouping your stream by key, you should use the checkpointed interface.
> >>
> >> The reason I asked before if you are using the keyBy() is because this is the one that implicitly sets the keySerializer and scopes your (keyed) state to a specific key.
> >>
> >> If there is no keying, then keyed state cannot be used and the Checkpointed interface should be used instead.
> >>
> >> Let us know if you need anything else.
> >>
> >> Kostas
> >>
> >>> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi <u...@apache.org> wrote:
> >>>
> >>> This only works for keyed streams, you have to use keyBy().
> >>>
> >>> You can use the Checkpointed interface instead
> >>> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
> >>>
> >>> On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US)
> >>> <buvana.rama...@nokia-bell-labs.com> wrote:
> >>>> Hi Kostas,
> >>>>
> >>>> Here is my code. All I am trying to compute is (x[t] – x[t-1]),
> >>>> where x[t] is the current value of the incoming sample and x[t-1] is
> >>>> the previous value of the incoming sample. I store the current value
> >>>> in state store (‘prev_tuple’) so that I can use it for computation in next cycle.
> >>>> As you may observe, I am not using keyBy. I am simply printing out
> >>>> the resultant tuple.
> >>>>
> >>>> It appears from the error message that I have to set the key
> >>>> serializer (and possibly value serializer) for the state store. I am
> >>>> not sure how to do that…
> >>>>
> >>>> Thanks for your interest in helping,
> >>>>
> >>>> Regards,
> >>>> Buvana
> >>>>
> >>>> public class stateful {
> >>>>     private static String INPUT_KAFKA_TOPIC = null;
> >>>>     private static int TIME_WINDOW = 0;
> >>>>
> >>>>     public static void main(String[] args) throws Exception {
> >>>>
> >>>>         if (args.length < 2) {
> >>>>             throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
> >>>>                     + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
> >>>>         }
> >>>>
> >>>>         INPUT_KAFKA_TOPIC = args[0];
> >>>>         TIME_WINDOW = Integer.parseInt(args[1]);
> >>>>
> >>>>         Properties properties = null;
> >>>>
> >>>>         properties = new Properties();
> >>>>         properties.setProperty("bootstrap.servers", "localhost:9092");
> >>>>         properties.setProperty("zookeeper.connect", "localhost:2181");
> >>>>         properties.setProperty("group.id", "test");
> >>>>
> >>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >>>>         //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));
> >>>>
> >>>>         DataStreamSource<String> stream = env
> >>>>                 .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));
> >>>>
> >>>>         // maps the data into Flink tuples
> >>>>         DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());
> >>>>
> >>>>         // write the result to the console or in a Kafka topic
> >>>>         streamTuples.print();
> >>>>
> >>>>         env.execute("plus one");
> >>>>     }
> >>>>
> >>>>     public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double> > {
> >>>>         private transient ValueState<Tuple2<String, Double>> prev_tuple;
> >>>>
> >>>>         @Override
> >>>>         public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
> >>>>             try {
> >>>>                 Double value = Double.parseDouble(incString);
> >>>>                 System.out.println("value = " + value);
> >>>>                 Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
> >>>>                 System.out.println(prev_stored_tp);
> >>>>
> >>>>                 Double value2 = value - prev_stored_tp.f1;
> >>>>                 prev_stored_tp.f1 = value;
> >>>>                 prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
> >>>>                 prev_tuple.update(prev_stored_tp);
> >>>>
> >>>>                 Tuple2<String, Double> tp = new Tuple2<String, Double>();
> >>>>                 tp.setField(INPUT_KAFKA_TOPIC, 0);
> >>>>                 tp.setField(value2, 1);
> >>>>                 out.collect(tp);
> >>>>
> >>>>             } catch (NumberFormatException e) {
> >>>>                 System.out.println("Could not convert to Float" + incString);
> >>>>                 System.err.println("Could not convert to Float" + incString);
> >>>>             }
> >>>>         }
> >>>>
> >>>>         @Override
> >>>>         public void open(Configuration config) {
> >>>>             ValueStateDescriptor<Tuple2<String, Double>> descriptor =
> >>>>                     new ValueStateDescriptor<>(
> >>>>                             "previous input value", // the state name
> >>>>                             TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
> >>>>                             Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
> >>>>             prev_tuple = getRuntimeContext().getState(descriptor);
> >>>>         }
> >>>>     }
> >>>> }
> >>>>
> >>>> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
> >>>> Sent: Thursday, August 11, 2016 5:45 AM
> >>>> To: user@flink.apache.org
> >>>> Subject: Re: flink - Working with State example
> >>>>
> >>>> Hello Buvana,
> >>>>
> >>>> Can you share a bit more details on your operator and how you are using it?
> >>>> For example, are you using keyBy before using you custom operator?
> >>>>
> >>>> Thanks a lot,
> >>>> Kostas
> >>>>
> >>>> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US)
> >>>> <buvana.rama...@nokia-bell-labs.com> wrote:
> >>>>
> >>>> Hello,
> >>>>
> >>>> I am utilizing the code snippet in:
> >>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
> >>>> and particularly ‘open’ function in my code:
> >>>>
> >>>> @Override
> >>>> public void open(Configuration config) {
> >>>>     ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
> >>>>             new ValueStateDescriptor<>(
> >>>>                     "average", // the state name
> >>>>                     TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
> >>>>                     Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
> >>>>     sum = getRuntimeContext().getState(descriptor);
> >>>> }
> >>>>
> >>>> When I run, I get the following error:
> >>>>
> >>>> Caused by: java.lang.RuntimeException: Error while getting state
> >>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
> >>>>     at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
> >>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
> >>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
> >>>>     at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
> >>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
> >>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
> >>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> >>>>     at java.lang.Thread.run(Thread.java:745)
> >>>> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
> >>>>     at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
> >>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
> >>>>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
> >>>>     ... 8 more
> >>>>
> >>>> Where do I define the key & value serializer for state?
> >>>>
> >>>> Thanks,
> >>>> Buvana