Hi Kostas,

Here is my code. All I am trying to compute is (x[t] – x[t-1]), where x[t] is the current value of the incoming sample and x[t-1] is the previous value of the incoming sample. I store the current value in a state store (‘prev_tuple’) so that I can use it for the computation in the next cycle. As you may observe, I am not using keyBy. I am simply printing out the resulting tuple.
It appears from the error message that I have to set the key serializer (and possibly the value serializer) for the state store. I am not sure how to do that…

Thanks for your interest in helping,

Regards,
Buvana

    public class stateful {

        private static String INPUT_KAFKA_TOPIC = null;
        private static int TIME_WINDOW = 0;

        public static void main(String[] args) throws Exception {

            if (args.length < 2) {
                throw new IllegalArgumentException("The application needs two arguments. The first is the name of the kafka topic from which it has to \n"
                        + "fetch the data. The second argument is the size of the window, in seconds, to which the aggregation function must be applied. \n");
            }

            INPUT_KAFKA_TOPIC = args[0];
            TIME_WINDOW = Integer.parseInt(args[1]);

            Properties properties = null;
            properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("zookeeper.connect", "localhost:2181");
            properties.setProperty("group.id", "test");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //env.setStateBackend(new FsStateBackend("file://home/buvana/flink/checkpoints"));

            DataStreamSource<String> stream = env
                    .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));

            // maps the data into Flink tuples
            DataStream<Tuple2<String,Double>> streamTuples = stream.flatMap(new Rec2Tuple2());

            // write the result to the console or in a Kafka topic
            streamTuples.print();

            env.execute("plus one");
        }

        public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String,Double> > {

            private transient ValueState<Tuple2<String, Double>> prev_tuple;

            @Override
            public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
                try {
                    Double value = Double.parseDouble(incString);
                    System.out.println("value = " + value);
                    Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
                    System.out.println(prev_stored_tp);

                    Double value2 = value - prev_stored_tp.f1;
                    prev_stored_tp.f1 = value;
                    prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
                    prev_tuple.update(prev_stored_tp);

                    Tuple2<String, Double> tp = new Tuple2<String, Double>();
                    tp.setField(INPUT_KAFKA_TOPIC, 0);
                    tp.setField(value2, 1);
                    out.collect(tp);
                } catch (NumberFormatException e) {
                    System.out.println("Could not convert to Float" + incString);
                    System.err.println("Could not convert to Float" + incString);
                }
            }

            @Override
            public void open(Configuration config) {
                ValueStateDescriptor<Tuple2<String, Double>> descriptor =
                        new ValueStateDescriptor<>(
                                "previous input value", // the state name
                                TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
                                Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
                prev_tuple = getRuntimeContext().getState(descriptor);
            }
        }
    }

From: Kostas Kloudas [mailto:k.klou...@data-artisans.com]
Sent: Thursday, August 11, 2016 5:45 AM
To: user@flink.apache.org
Subject: Re: flink - Working with State example

Hello Buvana,

Can you share a bit more detail on your operator and how you are using it? For example, are you using keyBy before using your custom operator?
Thanks a lot,
Kostas

On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US) <buvana.rama...@nokia-bell-labs.com> wrote:

Hello,

I am utilizing the code snippet in:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
and particularly ‘open’ function in my code:

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average", // the state name
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
        sum = getRuntimeContext().getState(descriptor);
    }

When I run, I get the following error:

    Caused by: java.lang.RuntimeException: Error while getting state
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
        at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
        at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
        at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
        ... 8 more

Where do I define the key & value serializer for state?

Thanks,
Buvana
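
Note on the error above: the message says the operation "cannot use partitioned state", meaning the state requested through getRuntimeContext().getState(...) is scoped to a key. The sketch below is plain Java, not Flink code. It only models that idea, with one remembered previous sample per key, so the x[t] - x[t-1] computation from the post has something to partition on. The class name KeyedDifference, the method next, and the keys "topicA" and "topicB" are hypothetical names chosen for illustration. The 0.0 starting value mirrors the default in the descriptor from the post.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of per-key "previous value" state. This is an
// illustration only and does not use any Flink API.
class KeyedDifference {

    // One remembered sample per key, loosely analogous to one ValueState per key.
    private final Map<String, Double> previousByKey = new HashMap<>();

    // Returns x[t] - x[t-1] for the given key, then stores x[t] as the new
    // previous value. The first sample of a key is compared against 0.0,
    // mirroring the default value in the descriptor from the original post.
    double next(String key, double value) {
        double previous = previousByKey.getOrDefault(key, 0.0);
        previousByKey.put(key, value);
        return value - previous;
    }

    public static void main(String[] args) {
        KeyedDifference diff = new KeyedDifference();
        System.out.println(diff.next("topicA", 5.0)); // first sample for topicA
        System.out.println(diff.next("topicA", 7.5)); // difference to the previous topicA sample
        System.out.println(diff.next("topicB", 1.0)); // topicB has its own, separate state
    }
}
```

In a Flink job, the usual way to get the same scoping is probably to apply keyBy to the stream before the RichFlatMapFunction, so that the runtime knows the key type and can configure the key serializer itself. For a stream with a single logical source, keying by a constant value is one possible workaround, with the caveat that all records then go to one parallel instance.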