Hi!
I still have an issue... I'm now using 0.9.1 and the new
KafkaConnector, but I still get duplicates in my Flink program. Here's
the relevant part:
final FlinkKafkaConsumer082<String> kafkaSrc =
    new FlinkKafkaConsumer082<String>(
        kafkaTopicIn, new SimpleStringSchema(), properties);

DataStream<String> start = env
    .addSource(kafkaSrc)
    .setParallelism(numReadPartitions); // numReadPartitions = 2

DataStream<JSONObject> jsonized = start
    .flatMap(new ExtractAndFilterJSON());

DataStream<Session> sessions = jsonized
    .partitionByHash(new KeySelector<JSONObject, String>() {
        /**
         * partition by session id
         */
        @Override
        public String getKey(JSONObject value) throws Exception {
            try {
                return /*session id*/;
            } catch (Exception e) {
                LOG.error("no session could be retrieved", e);
            }
            return "";
        }
    }).flatMap(new StatefulSearchSessionizer());
In the StatefulSearchSessionizer I sporadically receive duplicates. I'm
sure that the Kafka topic I'm reading from does not contain any
duplicates, so they must be introduced somewhere in the Flink program...
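To narrow down where they appear, I was thinking of inserting a debug step
that logs any record seen twice. A minimal sketch of what I have in mind,
assuming each JSON event carries some unique field ("eventId" here is just
a placeholder for whatever unique id my events actually have):

    DataStream<JSONObject> checked = jsonized
        .flatMap(new RichFlatMapFunction<JSONObject, JSONObject>() {
            // ids seen by this parallel instance; debug only,
            // this set grows without bound
            private transient Set<String> seen;

            @Override
            public void open(Configuration parameters) {
                seen = new HashSet<String>();
            }

            @Override
            public void flatMap(JSONObject value, Collector<JSONObject> out) {
                String id = value.optString("eventId"); // placeholder field
                if (!seen.add(id)) {
                    LOG.warn("duplicate event seen: {}", id);
                }
                out.collect(value);
            }
        });

Since each parallel instance only sees its own share of the stream, I'd run
this check with parallelism 1 (or put it directly after the source) to tell
whether the duplicates already exist before the sessionizer.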
Any ideas?
Cheers, Rico.