Hi!

Testing it with the current 0.10 snapshot is not easily possible at the moment.

But I deactivated checkpointing in my program and still get duplicates in my 
output, so it doesn't seem to come only from the checkpointing feature, does it?

Maybe the KafkaSink is responsible for this? (Just my guess.)
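To narrow down where the duplicates enter the pipeline, one option is to track already-seen record identifiers inside an operator and log repeats. Below is a minimal, Flink-independent sketch of that check; the `DuplicateTracker` class and the idea of a per-record id are my own illustration (inside an actual Flink flatMap you would keep the set as operator state rather than a plain field):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical diagnostic helper: remembers every record id seen so far
// and reports when an id shows up a second time (i.e. a duplicate).
public class DuplicateTracker {
    private final Set<String> seen = new HashSet<>();

    /** Returns true if this id was already observed, i.e. a duplicate. */
    public boolean isDuplicate(String recordId) {
        // Set.add() returns false when the element was already present.
        return !seen.add(recordId);
    }

    public static void main(String[] args) {
        DuplicateTracker tracker = new DuplicateTracker();
        System.out.println(tracker.isDuplicate("a")); // false: first time
        System.out.println(tracker.isDuplicate("b")); // false: first time
        System.out.println(tracker.isDuplicate("a")); // true: "a" replayed
    }
}
```

Logging the duplicate ids together with the source partition they came from could show whether the repeats originate in the Kafka source, the sink, or a stateful operator.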

Cheers, Rico.



> Am 01.09.2015 um 15:37 schrieb Aljoscha Krettek <aljos...@apache.org>:
> 
> Hi Rico,
> unfortunately the 0.9 branch still seems to have problems with exactly once 
> processing and checkpointed operators. We reworked how the checkpoints are 
> handled for the 0.10 release so it should work well there. 
> 
> Could you maybe try running on the 0.10-SNAPSHOT release and see if the 
> problems persist there?
> 
> Cheers,
> Aljoscha
> 
>> On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann <i...@ricobergmann.de> 
>> wrote:
>> Hi!
>> 
>> I still have an issue... I am now using 0.9.1 and the new
>> KafkaConnector, but I still get duplicates in my Flink program. Here's the
>> relevant part:
>> 
>>     final FlinkKafkaConsumer082<String> kafkaSrc =
>>         new FlinkKafkaConsumer082<String>(
>>             kafkaTopicIn, new SimpleStringSchema(), properties);
>> 
>>     DataStream<String> start = env.addSource(kafkaSrc)
>>         .setParallelism(numReadPartitions); // numReadPartitions = 2
>> 
>>     DataStream<JSONObject> jsonized = start
>>         .flatMap(new ExtractAndFilterJSON());
>> 
>>     DataStream<Session> sessions = jsonized
>>         .partitionByHash(new KeySelector<JSONObject, String>() {
>>             /**
>>              * Partition by session id.
>>              */
>>             @Override
>>             public String getKey(JSONObject value) throws Exception {
>>                 try {
>>                     return /*session id*/;
>>                 } catch (Exception e) {
>>                     LOG.error("no session could be retrieved", e);
>>                 }
>>                 return "";
>>             }
>>         }).flatMap(new StatefulSearchSessionizer());
>> 
>> In the StatefulSearchSessionizer I sporadically receive duplicates. I'm
>> sure that the Kafka topic I'm reading from does not contain any
>> duplicates, so they must originate in the Flink program ...
>> 
>> Any ideas?
>> 
>> Cheers, Rico.
