Thank you, Kostas and Ufuk. I get the following compilation error when I use 
the Checkpointed interface. The code and the error message are pasted below.

Is the Serializable definition supposed to be from java.io.Serializable or 
somewhere else?

Thanks again,
Buvana

================================================================================================================
Code:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.functions.RichFlatMapFunction;

import java.io.Serializable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * Created by buvana on 8/9/16.
 */
public class stateful {
    private static String INPUT_KAFKA_TOPIC = null;
    // ... main() omitted: it is the same as before except for the class name change ...

    public static class MapStateful extends RichFlatMapFunction<String, Tuple2<String, Double>>
            implements Checkpointed<Double> {

        private Double prev_tuple = null;

        @Override
        public void flatMap(String incString, Collector<Tuple2<String, Double>> out) {
            try {
                Double value = Double.parseDouble(incString);
                System.out.println("value = " + value);
                System.out.println(prev_tuple);

                Double value2 = value - prev_tuple;
                prev_tuple = value;

                Tuple2<String, Double> tp = new Tuple2<String, Double>();
                tp.setField(INPUT_KAFKA_TOPIC, 0);
                tp.setField(value2, 1);
                out.collect(tp);
            } catch (NumberFormatException e) {
                System.out.println("Could not convert to Float" + incString);
                System.err.println("Could not convert to Float" + incString);
            }
        }
        @Override
        public void open(Configuration config) {
            if (prev_tuple == null) {
                // only initialize if null:
                // restoreState() is called before open(), so after a restore
                // prev_tuple already holds the restored value
                prev_tuple = new Double("0.0");
            }
        }

        @Override
        public Serializable snapshotState(
                long checkpointId,
                long checkpointTimestamp) throws Exception {
            return prev_tuple;
        }


        @Override
        public void restoreState(Double state) {
            prev_tuple = state;
        }
    }
}
===============================================================================================================
ERROR message while building:

$ mvn clean package
[INFO] Scanning for projects...
[INFO]                                                                         
[INFO] ------------------------------------------------------------------------
[INFO] Building Flink Quickstart Job 0.1
[INFO] ------------------------------------------------------------------------
[WARNING] The artifact org.apache.commons:commons-io:jar:1.3.2 has been relocated to commons-io:commons-io:jar:1.3.2
[INFO] 
[INFO] --- maven-clean-plugin:2.5:clean (default-clean) @ wiki-edits ---
[INFO] Deleting /home/buvana/flink/flink-1.1.0/wiki-edits/target
[INFO] 
[INFO] --- maven-resources-plugin:2.3:resources (default-resources) @ wiki-edits ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 1 resource
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ wiki-edits ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 7 source files to /home/buvana/flink/flink-1.1.0/wiki-edits/target/classes
[INFO] -------------------------------------------------------------
[ERROR] COMPILATION ERROR : 
[INFO] -------------------------------------------------------------
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
  return type java.io.Serializable is not compatible with java.lang.Double
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
[INFO] 3 errors 
[INFO] -------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 2.171s
[INFO] Finished at: Thu Aug 11 10:47:07 EDT 2016
[INFO] Final Memory: 26M/660M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project wiki-edits: Compilation failure: Compilation failure:
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[116,19] wikiedits.stateful.MapStateful is not abstract and does not override abstract method snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[151,29] snapshotState(long,long) in wikiedits.stateful.MapStateful cannot implement snapshotState(long,long) in org.apache.flink.streaming.api.checkpoint.Checkpointed
[ERROR] return type java.io.Serializable is not compatible with java.lang.Double
[ERROR] /home/buvana/flink/flink-1.1.0/wiki-edits/src/main/java/wikiedits/stateful.java:[150,9] method does not override or implement a method from a supertype
[ERROR] -> [Help 1]
[ERROR] 
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR] 
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
================================================================================================================
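
A note on the error above: the compiler message suggests that snapshotState() 
has to return the interface's type argument (Double here, because the class 
implements Checkpointed<Double>) rather than java.io.Serializable. Serializable 
from java.io appears to be only the bound on that type parameter. The sketch 
below is a minimal illustration of that rule and not the actual job. The local 
Checkpointed interface is a hypothetical stand-in that mirrors the shape of 
Flink's interface so the example compiles without Flink on the classpath, and 
CheckpointedSketch, diff() and prevValue are made-up names.

```java
import java.io.Serializable;

// Hypothetical local stand-in mirroring the shape of
// org.apache.flink.streaming.api.checkpoint.Checkpointed (assumption:
// T is bounded by java.io.Serializable, as the error message implies).
interface Checkpointed<T extends Serializable> {
    T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
    void restoreState(T state) throws Exception;
}

class CheckpointedSketch {

    static class MapStateful implements Checkpointed<Double> {
        private Double prevValue = 0.0;

        // Returns x[t] - x[t-1] and remembers x[t] for the next call.
        double diff(double value) {
            double result = value - prevValue;
            prevValue = value;
            return result;
        }

        // The return type is Double (the type argument), not Serializable.
        // Declaring Serializable here reproduces the "return type
        // java.io.Serializable is not compatible with java.lang.Double" error.
        @Override
        public Double snapshotState(long checkpointId, long checkpointTimestamp) {
            return prevValue;
        }

        @Override
        public void restoreState(Double state) {
            prevValue = state;
        }
    }

    public static void main(String[] args) {
        MapStateful first = new MapStateful();
        first.diff(3.0);
        Double snapshot = first.snapshotState(1L, 0L);

        // Simulate recovery: a fresh instance gets the snapshot back.
        MapStateful second = new MapStateful();
        second.restoreState(snapshot);
        System.out.println(second.diff(5.0)); // 5.0 - 3.0, prints 2.0
    }
}
```

In the code pasted above, the corresponding change would be to declare 
snapshotState() with return type Double and leave restoreState(Double) as is.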

-----Original Message-----
From: Kostas Kloudas [mailto:k.klou...@data-artisans.com] 
Sent: Thursday, August 11, 2016 10:34 AM
To: user@flink.apache.org
Subject: Re: flink - Working with State example

Exactly as Ufuk suggested: if you are not grouping your stream by key, you 
should use the Checkpointed interface.

I asked earlier whether you are using keyBy() because it is what implicitly 
sets the key serializer and scopes your (keyed) state to a specific key.

If there is no keying, then keyed state cannot be used and the Checkpointed 
interface should be used instead. 
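
As a rough mental model of what "scoped to a specific key" means, keyed state 
can be pictured as one state slot per key. The sketch below is plain Java and 
only a conceptual illustration; it is not how Flink implements keyed state, and 
KeyedStateSketch, KeyedDiff and the topic names are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

class KeyedStateSketch {

    // Conceptual model only: one "previous value" slot per key.
    static class KeyedDiff {
        private final Map<String, Double> prevByKey = new HashMap<>();

        // Returns x[t] - x[t-1] for the given key. A key that has not been
        // seen yet starts from a default previous value of 0.0.
        double diff(String key, double value) {
            double prev = prevByKey.getOrDefault(key, 0.0);
            prevByKey.put(key, value);
            return value - prev;
        }
    }

    public static void main(String[] args) {
        KeyedDiff state = new KeyedDiff();
        System.out.println(state.diff("topicA", 3.0));  // prints 3.0
        System.out.println(state.diff("topicB", 10.0)); // prints 10.0
        System.out.println(state.diff("topicA", 5.0));  // prints 2.0
    }
}
```

Without a key there is no slot to look up, which matches the "State key 
serializer has not been configured" error reported earlier in the thread.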

Let us know if you need anything else.

Kostas

> On Aug 11, 2016, at 4:10 PM, Ufuk Celebi <u...@apache.org> wrote:
> 
> This only works for keyed streams; you have to use keyBy().
> 
> You can use the Checkpointed interface instead 
> (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#checkpointing-instance-fields).
> 
> On Thu, Aug 11, 2016 at 3:35 PM, Ramanan, Buvana (Nokia - US) 
> <buvana.rama...@nokia-bell-labs.com> wrote:
>> Hi Kostas,
>> 
>> 
>> 
>> Here is my code. All I am trying to compute is (x[t] – x[t-1]), where 
>> x[t] is the current value of the incoming sample and x[t-1] is the 
>> previous value of the incoming sample. I store the current value in the 
>> state store (‘prev_tuple’) so that I can use it for computation in the 
>> next cycle. As you may observe, I am not using keyBy. I am simply 
>> printing out the resultant tuple.
>> 
>> 
>> 
>> It appears from the error message that I have to set the key 
>> serializer (and possibly value serializer) for the state store. I am 
>> not sure how to do that…
>> 
>> 
>> 
>> Thanks for your interest in helping,
>> 
>> 
>> 
>> 
>> 
>> Regards,
>> 
>> Buvana
>> 
>> 
>> 
>> public class stateful {
>>    private static String INPUT_KAFKA_TOPIC = null;
>>    private static int TIME_WINDOW = 0;
>> 
>>    public static void main(String[] args) throws Exception {
>> 
>>        if (args.length < 2) {
>>            throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
>>                    + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied.\n");
>>        }
>> 
>>        INPUT_KAFKA_TOPIC = args[0];
>>        TIME_WINDOW = Integer.parseInt(args[1]);
>> 
>>        Properties properties = null;
>> 
>>        properties = new Properties();
>>        properties.setProperty("bootstrap.servers", "localhost:9092");
>>        properties.setProperty("zookeeper.connect", "localhost:2181");
>>        properties.setProperty("group.id", "test");
>> 
>>        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>        //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));
>> 
>>        DataStreamSource<String> stream = env
>>                .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));
>> 
>>        // maps the data into Flink tuples
>>        DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());
>> 
>>        // write the result to the console or in a Kafka topic
>>        streamTuples.print();
>> 
>>        env.execute("plus one");
>> 
>>    }
>> 
>>    public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double> > {
>>        private transient ValueState<Tuple2<String, Double>> prev_tuple;
>> 
>>        @Override
>>        public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
>>            try {
>>                Double value = Double.parseDouble(incString);
>>                System.out.println("value = " + value);
>>                Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
>>                System.out.println(prev_stored_tp);
>> 
>>                Double value2 = value - prev_stored_tp.f1;
>>                prev_stored_tp.f1 = value;
>>                prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
>>                prev_tuple.update(prev_stored_tp);
>> 
>>                Tuple2<String, Double> tp = new Tuple2<String, Double>();
>>                tp.setField(INPUT_KAFKA_TOPIC, 0);
>>                tp.setField(value2, 1);
>>                out.collect(tp);
>> 
>>            } catch (NumberFormatException e) {
>>                System.out.println("Could not convert to Float" + incString);
>>                System.err.println("Could not convert to Float" + incString);
>>            }
>>        }
>> 
>>        @Override
>>        public void open(Configuration config) {
>>            ValueStateDescriptor<Tuple2<String, Double>> descriptor =
>>                    new ValueStateDescriptor<>(
>>                            "previous input value", // the state name
>>                            TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
>>                            Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
>>            prev_tuple = getRuntimeContext().getState(descriptor);
>>        }
>>    }
>> }
>> 
>> 
>> 
>> From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
>> Sent: Thursday, August 11, 2016 5:45 AM
>> To: user@flink.apache.org
>> Subject: Re: flink - Working with State example
>> 
>> 
>> 
>> Hello Buvana,
>> 
>> 
>> 
>> Can you share a bit more detail on your operator and how you are using it?
>> 
>> For example, are you using keyBy before using your custom operator?
>> 
>> 
>> 
>> Thanks a lot,
>> 
>> Kostas
>> 
>> 
>> 
>> On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US) 
>> <buvana.rama...@nokia-bell-labs.com> wrote:
>> 
>> 
>> 
>> Hello,
>> 
>> 
>> 
>> I am utilizing the code snippet in:
>> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
>> and particularly ‘open’ function in my code:
>> 
>>    @Override
>>    public void open(Configuration config) {
>>        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
>>                new ValueStateDescriptor<>(
>>                        "average", // the state name
>>                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
>>                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
>>        sum = getRuntimeContext().getState(descriptor);
>>    }
>> 
>> 
>> 
>> When I run, I get the following error:
>> 
>> Caused by: java.lang.RuntimeException: Error while getting state
>>               at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
>>               at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
>>               at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
>>               at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
>>               at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
>>               at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
>>               at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
>>               at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>               at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
>>               at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
>>               at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
>>               at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
>>               ... 8 more
>> 
>> 
>> 
>> Where do I define the key & value serializer for state?
>> 
>> 
>> 
>> Thanks,
>> 
>> Buvana
>> 
>> 
