Hi Kostas,

I am trying to use FsStateBackend as the state backend, and I configure it 
as follows in the code:

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStateBackend(new FsStateBackend("file:///home/buvana/flink/checkpoints"));
        env.enableCheckpointing(10000);

Everything else is the same as the code I shared with you previously.

When I execute, I see that a directory is created under 
/home/buvana/flink/checkpoints, but there is nothing under that directory.
I was expecting to find some file / sub dir there.

Please explain.

Thanks,
Buvana

-----Original Message-----
From: Kostas Kloudas [mailto:k.klou...@data-artisans.com] 
Sent: Friday, August 12, 2016 1:37 AM
To: user@flink.apache.org
Subject: Re: flink - Working with State example

No problem!

Regards,
Kostas

> On Aug 12, 2016, at 3:00 AM, Ramanan, Buvana (Nokia - US) 
> <buvana.rama...@nokia-bell-labs.com> wrote:
> 
> Kostas,
> Good catch! That makes it work! Thank you so much for the help.
> Regards,
> Buvana
> 
> -----Original Message-----
> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com] 
> Sent: Thursday, August 11, 2016 11:22 AM
> To: user@flink.apache.org
> Subject: Re: flink - Working with State example
> 
> Hi Buvana, 
> 
> At first glance, your snapshotState() should return a Double.
> 
> Kostas
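
A minimal sketch of the fix suggested above, written so it compiles without Flink on the classpath. `CheckpointedLike` is a hypothetical stand-in that only mirrors the shape of `org.apache.flink.streaming.api.checkpoint.Checkpointed` as it appears in the compiler output quoted below; `DiffFunction` and `difference()` are illustrative names, not part of the original job. The point is that with `Checkpointed<Double>`, `snapshotState()` has to be declared to return `Double` (or a subtype), not the wider `Serializable`.

```java
import java.io.Serializable;

// Hypothetical stand-in mirroring the shape of Flink's Checkpointed<T> interface.
interface CheckpointedLike<T extends Serializable> {
    T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
    void restoreState(T state) throws Exception;
}

// Illustrative function that tracks x[t] - x[t-1] and exposes its state for snapshots.
class DiffFunction implements CheckpointedLike<Double> {
    private Double prevValue = 0.0;

    // Returns the difference to the previous sample and remembers the current one.
    double difference(double value) {
        double diff = value - prevValue;
        prevValue = value;
        return diff;
    }

    // Return type is Double, matching the type argument of CheckpointedLike<Double>.
    // Declaring it as Serializable would not compile, because Serializable is not
    // a subtype of Double.
    @Override
    public Double snapshotState(long checkpointId, long checkpointTimestamp) {
        return prevValue;
    }

    @Override
    public void restoreState(Double state) {
        prevValue = state;
    }
}
```

In the real job the same change would apply to `MapStateful`: keep `implements Checkpointed<Double>` and change only the declared return type of `snapshotState()`.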
> 
>> On Aug 11, 2016, at 4:57 PM, Ramanan, Buvana (Nokia - US) 
>> <buvana.rama...@nokia-bell-labs.com> wrote:
>> 
>> Thank you, Kostas & Ufuk. I run into the following compilation error when I 
>> use the Checkpointed interface. The code and the error message are pasted below.
>> 
>> Is the Serializable definition supposed to be from java.io.Serializable or 
>> somewhere else?
>> 
>> Thanks again,
>> Buvana
>> 
>> ======================================================================
>> Code:
>> 
>> import org.apache.flink.api.common.functions.FlatMapFunction;
>> import org.apache.flink.api.common.functions.MapFunction;
>> import org.apache.flink.streaming.api.checkpoint.Checkpointed;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.common.typeinfo.TypeHint;
>> import org.apache.flink.api.common.functions.RichFlatMapFunction;
>> 
>> import java.io.Serializable;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.datastream.DataStreamSource;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>> import org.apache.flink.util.Collector;
>> 
>> import java.util.Properties;
>> 
>> /**
>> * Created by buvana on 8/9/16.
>> */
>> public class stateful {
>>   private static String INPUT_KAFKA_TOPIC = null;
>> ---
>> --- skipping the main as it’s the same as before except for class name 
>> change -------------
>> ---
>>      public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
>>           implements Checkpointed<Double> {
>> 
>>       private Double prev_tuple = null;
>> 
>>       @Override
>>       public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
>>           try {
>>               Double value = Double.parseDouble(incString);
>>               System.out.println("value = " + value);
>>               System.out.println(prev_tuple);
>> 
>>               Double value2 = value - prev_tuple;
>>               prev_tuple = value;
>> 
>>               Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>               tp.setField(INPUT_KAFKA_TOPIC, 0);
>>               tp.setField(value2, 1);
>>               out.collect(tp);
>>           } catch (NumberFormatException e) {
>>               System.out.println("Could not convert to Float" + incString);
>>               System.err.println("Could not convert to Float" + incString);
>>           }
>>       }
>>       @Override
>>       public void open(Configuration config) {
>>           if (prev_tuple == null) {
>>               // only recreate if null
>>               // restoreState will be called before open()
>>               // so this will already set the sum to the restored value
>>               prev_tuple = new Double("0.0");
>>           }
>>       }
>> 
>>       @Override
>>       public Serializable snapshotState(
>>               long checkpointId,
>>               long checkpointTimestamp) throws Exception {
>>           return prev_tuple;
>>       }
>> 
>> 
>>       @Override
>>       public void restoreState(Double state) {
>>           prev_tuple = state;
>>       }
>>   }
>> }
>> ======================================================================
>> ERROR message while building:
>> 
>> $ mvn clean package
>> [INFO] Scanning for projects...
>> [INFO]
>> [INFO] ------------------------------------------------------------------------
>> [INFO] Building Flink Quickstart Job 0.1
>> [INFO] ------------------------------------------------------------------------
>> [WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been relocated to commons-io:commons-io:jar:1.3.2
>> [INFO]
>> [INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
>> [INFO] Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
>> [INFO]
>> [INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ wiki-edits ---
>> [INFO] Using 'UTF-8' encoding to copy filtered resources.
>> [INFO] Copying 1 resource
>> [INFO]
>> [INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ wiki-edits ---
>> [INFO] Changes detected - recompiling the module!
>> [INFO] Compiling 7 source files to /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
>> [INFO] -------------------------------------------------------------
>> [ERROR] COMPILATION ERROR : 
>> [INFO] -------------------------------------------------------------
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>> return type java.io.Serializable is not compatible with java.lang.Double
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
>> [INFO] 3 errors
>> [INFO] -------------------------------------------------------------
>> [INFO] ------------------------------------------------------------------------
>> [INFO] BUILD FAILURE
>> [INFO] ------------------------------------------------------------------------
>> [INFO] Total time: 2.171s
>> [INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016
>> [INFO] Final Memory: 26M/660M
>> [INFO] ------------------------------------------------------------------------
>> [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project wiki-edits: Compilation failure: Compilation failure:
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
>> [ERROR] return type java.io.Serializable is not compatible with java.lang.Double
>> [ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
>> [ERROR] -> [Help 1]
>> [ERROR]
>> [ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
>> [ERROR] Re-run Maven using the -X switch to enable full debug logging.
>> [ERROR]
>> [ERROR] For more information about the errors and possible solutions, please read the following articles:
>> [ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
>> ======================================================================
>> 
>> -----Original Message-----
>> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
>> Sent: Thursday, August 11, 2016 10:34 AM
>> To: user@flink.apache.org
>> Subject: Re: flink - Working with State example
>> 
>> Exactly as Ufuk suggested, if you are not grouping your stream by key, you 
>> should use the Checkpointed interface.
>> 
>> The reason I asked before if you are using the keyBy() is because this is 
>> the one that implicitly sets the keySerializer and scopes your (keyed) state 
>> to a specific key.
>> 
>> If there is no keying, then keyed state cannot be used and the Checkpointed 
>> interface should be used instead. 
>> 
>> Let us know if you need anything else.
>> 
>> Kostas
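
As a rough plain-Java analogy of the distinction described above (this is not Flink code and not how Flink implements state): keyed state behaves like one value per key, which is why a key must be known, while the non-keyed case behaves like a single instance field per function instance. The class names `KeyedDiff` and `UnkeyedDiff` are hypothetical and exist only for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Analogy for keyed state: the previous value is looked up per key,
// so every call needs a key to scope the state.
class KeyedDiff {
    private final Map<String, Double> prevByKey = new HashMap<>();

    double difference(String key, double value) {
        double prev = prevByKey.getOrDefault(key, 0.0);
        prevByKey.put(key, value);
        return value - prev;
    }
}

// Analogy for non-keyed state: a single field shared by all records
// seen by this instance, as with an instance field that is checkpointed.
class UnkeyedDiff {
    private double prev = 0.0;

    double difference(double value) {
        double diff = value - prev;
        prev = value;
        return diff;
    }
}
```

With `KeyedDiff`, samples for key "a" and key "b" never affect each other; with `UnkeyedDiff`, every sample is compared against whatever sample came immediately before it.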
>> 
>>> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi <u...@apache.org> wrote:
>>> 
>>> This only works for keyed streams, you have to use keyBy().
>>> 
>>> You can use the Checkpointed interface instead 
>>> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
>>> 
>>> On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US) 
>>> <buvana.rama...@nokia-bell-labs.com> wrote:
>>>> Hi Kostas,
>>>> 
>>>> Here is my code. All I am trying to compute is (x[t] – x[t-1]), 
>>>> where x[t] is the current value of the incoming sample and x[t-1] is 
>>>> the previous value of the incoming sample. I store the current value 
>>>> in the state store (‘prev_tuple’) so that I can use it for computation 
>>>> in the next cycle. As you may observe, I am not using keyBy. I am simply 
>>>> printing out the resultant tuple.
>>>> 
>>>> It appears from the error message that I have to set the key 
>>>> serializer (and possibly value serializer) for the state store. I am 
>>>> not sure how to do that…
>>>> 
>>>> Thanks for your interest in helping,
>>>> 
>>>> Regards,
>>>> Buvana
>>>> 
>>>> public class stateful {
>>>>  private static String INPUT_KAFKA_TOPIC = null;
>>>>  private static int TIME_WINDOW = 0;
>>>> 
>>>>  public static void main(String[] args) throws Exception {
>>>>      if (args.length < 2) {
>>>>          throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
>>>>                  + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
>>>>      }
>>>> 
>>>>      INPUT_KAFKA_TOPIC = args[0];
>>>>      TIME_WINDOW = Integer.parseInt(args[1]);
>>>> 
>>>>      Properties properties = null;
>>>> 
>>>>      properties = new Properties();
>>>>      properties.setProperty("bootstrap.servers", "localhost:9092");
>>>>      properties.setProperty("zookeeper.connect", "localhost:2181");
>>>>      properties.setProperty("group.id", "test");
>>>> 
>>>>      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>      //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));
>>>> 
>>>>      DataStreamSource<String> stream = env
>>>>              .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));
>>>> 
>>>>      // maps the data into Flink tuples
>>>>      DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());
>>>> 
>>>>      // write the result to the console or in a Kafka topic
>>>>      streamTuples.print();
>>>> 
>>>>      env.execute("plus one");
>>>>  }
>>>> 
>>>>  public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double> > {
>>>>      private transient ValueState<Tuple2<String, Double>> prev_tuple;
>>>> 
>>>>      @Override
>>>>      public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
>>>>          try {
>>>>              Double value = Double.parseDouble(incString);
>>>>              System.out.println("value = " + value);
>>>>              Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
>>>>              System.out.println(prev_stored_tp);
>>>> 
>>>>              Double value2 = value - prev_stored_tp.f1;
>>>>              prev_stored_tp.f1 = value;
>>>>              prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>>>>              prev_tuple.update(prev_stored_tp);
>>>> 
>>>>              Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>>>              tp.setField(INPUT_KAFKA_TOPIC, 0);
>>>>              tp.setField(value2, 1);
>>>>              out.collect(tp);
>>>> 
>>>>          } catch (NumberFormatException e) {
>>>>              System.out.println("Could not convert to Float" + incString);
>>>>              System.err.println("Could not convert to Float" + incString);
>>>>          }
>>>>      }
>>>> 
>>>>      @Override
>>>>      public void open(Configuration config) {
>>>>          ValueStateDescriptor<Tuple2<String, Double>> descriptor =
>>>>                  new ValueStateDescriptor<>(
>>>>                          "previous input value", // the state name
>>>>                          TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
>>>>                          Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
>>>>          prev_tuple = getRuntimeContext().getState(descriptor);
>>>>      }
>>>>  }
>>>> }
>>>> 
>>>> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
>>>> Sent: Thursday, August 11, 2016 5:45 AM
>>>> To: user@flink.apache.org
>>>> Subject: Re: flink - Working with State example
>>>> 
>>>> Hello Buvana,
>>>> 
>>>> Can you share a few more details on your operator and how you are using it?
>>>> For example, are you using keyBy before using your custom operator?
>>>> 
>>>> Thanks a lot,
>>>> Kostas
>>>> 
>>>> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US) 
>>>> <buvana.rama...@nokia-bell-labs.com> wrote:
>>>> 
>>>> Hello,
>>>> 
>>>> I am utilizing the code snippet in:
>>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
>>>> and particularly the ‘open’ function in my code:
>>>> 
>>>> @Override
>>>>  public void open(Configuration config) {
>>>>      ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>>>>              new ValueStateDescriptor<>(
>>>>                      "average", // the state name
>>>>                      TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
>>>>                      Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>>>>      sum = getRuntimeContext().getState(descriptor);
>>>>  }
>>>> 
>>>> When I run, I get the following error:
>>>> 
>>>> Caused by: java.lang.RuntimeException: Error while getting state
>>>>             at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>>>>             at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>>>>             at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>>>             at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>>>             at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>>>>             at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>>>>             at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>>>>             at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>             at java.lang.Thread.run(Thread.java:745)
>>>> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>>>>             at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>>>>             at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>>>>             at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>>>>             ... 8 more
>>>> 
>>>> Where do I define the key & value serializer for state?
>>>> 
>>>> Thanks,
>>>> Buvana
>>>> 
>> 
> 
