Do you mean the KafkaSource?

Which KafkaSource are you using? The 0.9.1 FlinkKafkaConsumer082 or the
KafkaSource?

On Thu, Sep 3, 2015 at 1:10 PM, Rico Bergmann <i...@ricobergmann.de> wrote:

> Hi!
>
> Testing it with the current 0.10 snapshot is not easily possible atm
>
> But I deactivated checkpointing in my program and still get duplicates in
> my output. So it seems not only to come from the checkpointing feature, or?
>
> May be the KafkaSink is responsible for this? (Just my guess)
>
> Cheers Rico.
>
>
>
> Am 01.09.2015 um 15:37 schrieb Aljoscha Krettek <aljos...@apache.org>:
>
> Hi Rico,
> unfortunately the 0.9 branch still seems to have problems with exactly
> once processing and checkpointed operators. We reworked how the checkpoints
> are handled for the 0.10 release so it should work well there.
>
> Could you maybe try running on the 0.10-SNAPSHOT release and see if the
> problems persist there?
>
> Cheers,
> Aljoscha
>
> On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann <i...@ricobergmann.de>
> wrote:
>
>> Hi!
>>
>> I still have an issue... I was now using 0.9.1 and the new
>> KafkaConnector. But I still get duplicates in my flink prog. Here's the
>> relevant part:
>>
>>          final FlinkKafkaConsumer082<String> kafkaSrc = new
>> FlinkKafkaConsumer082<String>(
>>              kafkaTopicIn, new SimpleStringSchema(), properties);
>>
>>          DataStream<String> start = env.addSource(kafkaSrc)
>>              .setParallelism(numReadPartitions); //numReadPartitions = 2
>>
>>          DataStream<JSONObject> jsonized = start
>>              .flatMap(new ExtractAndFilterJSON());
>>
>>          DataStream<Session> sessions = jsonized
>>              .partitionByHash(new KeySelector<JSONObject, String>() {
>>              /**
>>               * partition by session id
>>               */
>>              @Override
>>              public String getKey(JSONObject value)
>>                  throws Exception {
>>                  try {
>>                  return /*session id*/;
>>                  } catch (Exception e) {
>>                  LOG.error("no session could be retrieved", e);
>>                  }
>>                  return "";
>>              }
>>              }).flatMap(new StatefulSearchSessionizer());
>>
>> In the StatefulSearchSessionizer I receive duplicates sporadically. I'm
>> sure that the kafka topic I'm reading from does not contain any
>> duplicates. So it must be in the flink program ...
>>
>> Any ideas?
>>
>> Cheers, Rico.
>>
>>

Reply via email to