Can you tell us where the KafkaSink comes into play? At what point do the
duplicates come up?

On Thu, Sep 3, 2015 at 2:09 PM, Rico Bergmann <i...@ricobergmann.de> wrote:

> No. I mean the KafkaSink.
>
> A bit more insight to my program: I read from a Kafka topic with
> flinkKafkaConsumer082, then hashpartition the data, then I do a
> deduplication (does not eliminate all duplicates though). Then some
> computation, afterwards again deduplication (group by message in a window
> of last 2 seconds).
>
> Of course the last deduplication is not perfect.
>
> Cheers. Rico.
>
>
>
> Am 03.09.2015 um 13:29 schrieb Stephan Ewen <se...@apache.org>:
>
> Do you mean the KafkaSource?
>
> Which KafkaSource are you using? The 0.9.1 FlinkKafkaConsumer082 or the
> KafkaSource?
>
> On Thu, Sep 3, 2015 at 1:10 PM, Rico Bergmann <i...@ricobergmann.de>
> wrote:
>
>> Hi!
>>
>> Testing it with the current 0.10 snapshot is not easily possible atm
>>
>> But I deactivated checkpointing in my program and still get duplicates in
>> my output. So it seems not only to come from the checkpointing feature, or?
>>
>> May be the KafkaSink is responsible for this? (Just my guess)
>>
>> Cheers Rico.
>>
>>
>>
>> Am 01.09.2015 um 15:37 schrieb Aljoscha Krettek <aljos...@apache.org>:
>>
>> Hi Rico,
>> unfortunately the 0.9 branch still seems to have problems with exactly
>> once processing and checkpointed operators. We reworked how the checkpoints
>> are handled for the 0.10 release so it should work well there.
>>
>> Could you maybe try running on the 0.10-SNAPSHOT release and see if the
>> problems persist there?
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann <
>> i...@ricobergmann.de> wrote:
>>
>>> Hi!
>>>
>>> I still have an issue... I was now using 0.9.1 and the new
>>> KafkaConnector. But I still get duplicates in my flink prog. Here's the
>>> relevant part:
>>>
>>>          final FlinkKafkaConsumer082<String> kafkaSrc = new
>>> FlinkKafkaConsumer082<String>(
>>>              kafkaTopicIn, new SimpleStringSchema(), properties);
>>>
>>>          DataStream<String> start = env.addSource(kafkaSrc)
>>>              .setParallelism(numReadPartitions); //numReadPartitions = 2
>>>
>>>          DataStream<JSONObject> jsonized = start
>>>              .flatMap(new ExtractAndFilterJSON());
>>>
>>>          DataStream<Session> sessions = jsonized
>>>              .partitionByHash(new KeySelector<JSONObject, String>() {
>>>              /**
>>>               * partition by session id
>>>               */
>>>              @Override
>>>              public String getKey(JSONObject value)
>>>                  throws Exception {
>>>                  try {
>>>                  return /*session id*/;
>>>                  } catch (Exception e) {
>>>                  LOG.error("no session could be retrieved", e);
>>>                  }
>>>                  return "";
>>>              }
>>>              }).flatMap(new StatefulSearchSessionizer());
>>>
>>> In the StatefulSearchSessionizer I receive duplicates sporadically. I'm
>>> sure that the kafka topic I'm reading from does not contain any
>>> duplicates. So it must be in the flink program ...
>>>
>>> Any ideas?
>>>
>>> Cheers, Rico.
>>>
>>>
>

Reply via email to