Hi Kostas,
Here is my code. All I am trying to compute is x[t] - x[t-1], where x[t] is
the current value of the incoming sample and x[t-1] is the previous value of
the incoming sample. I store the current value in a state store (‘prev_tuple’) so
that I can use it for the computation in the next cycle. As you may observe, I am not
using keyBy. I am simply printing out the resulting tuple.
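As a reference point for what this job should print, here is a minimal plain-Java model of the computation, with no Flink involved. It assumes, as the `open` method further down does, that the stored previous value defaults to 0.0, so the first emitted difference is just x[0] itself. `PrevDiff` and `process` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the x[t] - x[t-1] computation; not Flink API.
class PrevDiff {
    // Plays the role of the 'prev_tuple' state; starts at the default value 0.0.
    private double prev = 0.0;

    // Returns x[t] - x[t-1] and remembers x[t] for the next call.
    double process(double value) {
        double diff = value - prev;
        prev = value;
        return diff;
    }

    public static void main(String[] args) {
        PrevDiff model = new PrevDiff();
        List<Double> out = new ArrayList<>();
        for (double x : new double[] {1.0, 3.0, 6.0}) {
            out.add(model.process(x));
        }
        System.out.println(out); // [1.0, 2.0, 3.0]
    }
}
```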
It appears from the error message that I have to set the key serializer (and
possibly the value serializer) for the state store, but I am not sure how to do that…
Thanks for your interest in helping,
Regards,
Buvana
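package wikiedits;

import java.util.Properties;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class stateful {

    private static String INPUT_KAFKA_TOPIC = null;
    private static int TIME_WINDOW = 0;

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            throw new IllegalArgumentException("The application needs two arguments. "
                    + "The first is the name of the Kafka topic from which it has to fetch the data. "
                    + "The second is the size of the window, in seconds, to which the "
                    + "aggregation function must be applied.");
        }
        INPUT_KAFKA_TOPIC = args[0];
        TIME_WINDOW = Integer.parseInt(args[1]);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //env.setStateBackend(new FsStateBackend("file:///home/buvana/flink/checkpoints"));

        DataStreamSource<String> stream = env
                .addSource(new FlinkKafkaConsumer09<>(INPUT_KAFKA_TOPIC, new SimpleStringSchema(), properties));

        // maps the data into Flink tuples
        DataStream<Tuple2<String, Double>> streamTuples = stream.flatMap(new Rec2Tuple2());

        // write the result to the console or in a Kafka topic
        streamTuples.print();

        env.execute("plus one");
    }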
    public static class Rec2Tuple2 extends RichFlatMapFunction<String, Tuple2<String, Double>> {

        private transient ValueState<Tuple2<String, Double>> prev_tuple;

        @Override
        public void flatMap(String incString, Collector<Tuple2<String, Double>> out) throws Exception {
            try {
                Double value = Double.parseDouble(incString);
                System.out.println("value = " + value);

                // read the previous sample (the descriptor's default value on the first call)
                Tuple2<String, Double> prev_stored_tp = prev_tuple.value();
                System.out.println(prev_stored_tp);
                Double value2 = value - prev_stored_tp.f1;

                // remember the current sample for the next call
                prev_stored_tp.f1 = value;
                prev_stored_tp.f0 = INPUT_KAFKA_TOPIC;
                prev_tuple.update(prev_stored_tp);

                // emit (topic, x[t] - x[t-1])
                Tuple2<String, Double> tp = new Tuple2<String, Double>();
                tp.setField(INPUT_KAFKA_TOPIC, 0);
                tp.setField(value2, 1);
                out.collect(tp);
            } catch (NumberFormatException e) {
                System.out.println("Could not convert to Double: " + incString);
                System.err.println("Could not convert to Double: " + incString);
            }
        }

        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Tuple2<String, Double>> descriptor =
                    new ValueStateDescriptor<>(
                            "previous input value", // the state name
                            TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {}), // type information
                            Tuple2.of("test topic", 0.0)); // default value of the state, if nothing was set
            prev_tuple = getRuntimeContext().getState(descriptor);
        }
    }
}
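The error comes from `getRuntimeContext().getState(...)`, which returns keyed (partitioned) state and therefore only works on a keyed stream. In Flink the usual remedy is to key the stream before the stateful function, roughly `stream.keyBy(selector).flatMap(new Rec2Tuple2())`, where `selector` is a hypothetical `KeySelector<String, String>`; a constant key routes every record to one state slot. The plain-Java sketch below is not Flink code: it only models what keyed state does, keeping one remembered value per key. It also takes the topic name through the constructor instead of reading the static `INPUT_KAFKA_TOPIC`, because a static field assigned in `main` is only set in the client JVM and would typically still be null when the job runs on a cluster. `KeyedPrevDiff` and its members are hypothetical.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical plain-Java model of keyed state for the x[t] - x[t-1] job; not Flink API.
class KeyedPrevDiff {
    private final String topic;                          // passed in instead of a static field
    private final Function<String, String> keySelector;  // stands in for keyBy's KeySelector
    private final Map<String, Double> prevByKey = new HashMap<>(); // one "ValueState" slot per key

    KeyedPrevDiff(String topic, Function<String, String> keySelector) {
        this.topic = topic;
        this.keySelector = keySelector;
    }

    // Returns (topic, x[t] - x[t-1]) for the record's key and stores x[t] under that key.
    Map.Entry<String, Double> process(String record) {
        double value = Double.parseDouble(record);
        String key = keySelector.apply(record);
        double prev = prevByKey.getOrDefault(key, 0.0); // 0.0 mirrors the descriptor's default
        prevByKey.put(key, value);
        return new AbstractMap.SimpleEntry<>(topic, value - prev);
    }

    public static void main(String[] args) {
        // A constant key sends every record to the same slot, like one non-keyed "previous value".
        KeyedPrevDiff f = new KeyedPrevDiff("my-topic", record -> "all");
        for (String r : new String[] {"1.0", "3.0", "6.0"}) {
            System.out.println(f.process(r)); // my-topic=1.0, then my-topic=2.0, then my-topic=3.0
        }
    }
}
```

With a constant key, every record also lands on a single parallel instance, so this keeps the job's single-sequence semantics at the cost of parallelism.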
From: Kostas Kloudas [mailto:[email protected]]
Sent: Thursday, August 11, 2016 5:45 AM
To: [email protected]
Subject: Re: flink - Working with State example
Hello Buvana,
Can you share a few more details on your operator and how you are using it?
For example, are you using keyBy before your custom operator?
Thanks a lot,
Kostas
On Aug 10, 2016, at 10:03 PM, Ramanan, Buvana (Nokia - US) <[email protected]> wrote:
Hello,
I am using the code snippet from:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html
and in particular its ‘open’ function, in my code:
@Override
public void open(Configuration config) {
    ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
            new ValueStateDescriptor<>(
                    "average", // the state name
                    TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                    Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
    sum = getRuntimeContext().getState(descriptor);
}
When I run it, I get the following error:
Caused by: java.lang.RuntimeException: Error while getting state
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:120)
    at wikiedits.stateful$Rec2Tuple2.open(stateful.java:103)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:38)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:91)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.open(StreamFlatMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:314)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:214)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: State key serializer has not been configured in the config. This operation cannot use partitioned state.
    at org.apache.flink.runtime.state.AbstractStateBackend.getPartitionedState(AbstractStateBackend.java:199)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.getPartitionedState(AbstractStreamOperator.java:260)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:118)
    ... 8 more
Where do I define the key and value serializers for the state?
Thanks,
Buvana
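On the question of where the key and value serializers are defined: as far as I can tell, neither is set by hand. The value serializer is created from the `TypeInformation` passed to the `ValueStateDescriptor`, and Flink configures the key serializer itself when the function runs on a `KeyedStream`, i.e. after `keyBy`. Without `keyBy` the operator has no key serializer, which is what the "State key serializer has not been configured" exception reports. The toy model below only mimics that check; `StateBackendModel`, `value` and `update` are hypothetical names, not Flink classes, and plain strings stand in for serialized keys.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy model of the check behind "State key serializer has not been configured"; not Flink code.
class StateBackendModel<K> {
    // Null stands for "no keyBy upstream", so no key serializer was configured.
    private final Function<K, String> keySerializer;
    private final Map<String, Double> values = new HashMap<>();

    StateBackendModel(Function<K, String> keySerializer) {
        this.keySerializer = keySerializer;
    }

    // Serializes the current key, or fails with a message modeled on the stack trace.
    private String serializedKey(K currentKey) {
        if (keySerializer == null) {
            throw new IllegalStateException(
                    "State key serializer has not been configured in the config.");
        }
        return keySerializer.apply(currentKey);
    }

    // Returns the value stored for the key, or the default value if nothing was set.
    double value(K currentKey, double defaultValue) {
        return values.getOrDefault(serializedKey(currentKey), defaultValue);
    }

    // Stores a new value under the serialized key.
    void update(K currentKey, double newValue) {
        values.put(serializedKey(currentKey), newValue);
    }

    public static void main(String[] args) {
        StateBackendModel<String> keyed = new StateBackendModel<>(k -> "key:" + k);
        keyed.update("all", 4.2);
        System.out.println(keyed.value("all", 0.0)); // 4.2

        StateBackendModel<String> nonKeyed = new StateBackendModel<String>(null);
        try {
            nonKeyed.value("all", 0.0);
        } catch (IllegalStateException e) {
            System.out.println("non-keyed: " + e.getMessage());
        }
    }
}
```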