No. I mean the KafkaSink. 

A bit more insight into my program: I read from a Kafka topic with 
FlinkKafkaConsumer082, then hash-partition the data, then do a first 
deduplication (which does not eliminate all duplicates, though). Then comes 
some computation, followed by a second deduplication (group by message in a 
window of the last 2 seconds). 

Of course the last deduplication is not perfect.
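
To illustrate what a 2-second window deduplication amounts to, here is a rough standalone sketch. It is not the code from the job and it does not use the Flink windowing API: the class name WindowDeduplicator and the method accept are hypothetical, and timestamps are passed in explicitly so the logic can be followed without a running cluster. It assumes all copies of a message reach the same instance (for example, through hash partitioning on the message).

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Flink): suppresses a message if the same
 * message was already accepted within the last windowMillis milliseconds.
 */
class WindowDeduplicator {
    private final long windowMillis;
    // message -> timestamp at which it was first accepted in the current window
    private final Map<String, Long> firstSeen = new HashMap<>();

    WindowDeduplicator(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Returns true if the message should be emitted, false if it is a duplicate. */
    boolean accept(String message, long nowMillis) {
        // Forget every message whose window has already expired.
        long cutoff = nowMillis - windowMillis;
        firstSeen.values().removeIf(ts -> ts <= cutoff);

        if (firstSeen.containsKey(message)) {
            return false; // seen within the window: drop it
        }
        firstSeen.put(message, nowMillis);
        return true;
    }
}
```

The sketch also shows why such a deduplication cannot be perfect: a duplicate that arrives more than 2 seconds after the first copy is emitted again, because the first copy has already been evicted.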

Cheers. Rico. 



> On 03.09.2015 at 13:29, Stephan Ewen <se...@apache.org> wrote:
> 
> Do you mean the KafkaSource?
> 
> Which KafkaSource are you using? The 0.9.1 FlinkKafkaConsumer082 or the 
> KafkaSource?
> 
>> On Thu, Sep 3, 2015 at 1:10 PM, Rico Bergmann <i...@ricobergmann.de> wrote:
>> Hi!
>> 
>> Testing it with the current 0.10 snapshot is not easily possible at the moment.
>> 
>> But I deactivated checkpointing in my program and still get duplicates in my 
>> output. So the duplicates do not seem to come only from the checkpointing 
>> feature, right?
>> 
>> Maybe the KafkaSink is responsible for this? (Just my guess.)
>> 
>> Cheers Rico. 
>> 
>> 
>> 
>>> On 01.09.2015 at 15:37, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> 
>>> Hi Rico,
>>> unfortunately the 0.9 branch still seems to have problems with exactly-once 
>>> processing and checkpointed operators. We reworked how checkpoints are 
>>> handled for the 0.10 release, so it should work well there. 
>>> 
>>> Could you maybe try running on the 0.10-SNAPSHOT release and see if the 
>>> problems persist there?
>>> 
>>> Cheers,
>>> Aljoscha
>>> 
>>>> On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann 
>>>> <i...@ricobergmann.de> wrote:
>>>> Hi!
>>>> 
>>>> I still have an issue... I am now using 0.9.1 and the new
>>>> KafkaConnector, but I still get duplicates in my Flink program. Here's the
>>>> relevant part:
>>>> 
>>>>          final FlinkKafkaConsumer082<String> kafkaSrc =
>>>>              new FlinkKafkaConsumer082<String>(
>>>>                  kafkaTopicIn, new SimpleStringSchema(), properties);
>>>> 
>>>>          DataStream<String> start = env.addSource(kafkaSrc)
>>>>              .setParallelism(numReadPartitions); //numReadPartitions = 2
>>>> 
>>>>          DataStream<JSONObject> jsonized = start
>>>>              .flatMap(new ExtractAndFilterJSON());
>>>> 
>>>>          DataStream<Session> sessions = jsonized
>>>>              .partitionByHash(new KeySelector<JSONObject, String>() {
>>>>                  /**
>>>>                   * partition by session id
>>>>                   */
>>>>                  @Override
>>>>                  public String getKey(JSONObject value)
>>>>                      throws Exception {
>>>>                      try {
>>>>                          return /*session id*/;
>>>>                      } catch (Exception e) {
>>>>                          LOG.error("no session could be retrieved", e);
>>>>                      }
>>>>                      return "";
>>>>                  }
>>>>              }).flatMap(new StatefulSearchSessionizer());
>>>> 
>>>> In the StatefulSearchSessionizer I sporadically receive duplicates. I'm
>>>> sure that the Kafka topic I'm reading from does not contain any
>>>> duplicates. So the problem must be in the Flink program ...
>>>> 
>>>> Any ideas?
>>>> 
>>>> Cheers, Rico.
> 
