No. I mean the KafkaSink. A bit more insight into my program: I read from a Kafka topic with FlinkKafkaConsumer082, then hash-partition the data, then do a deduplication (which does not eliminate all duplicates). After that comes some computation, followed by another deduplication (group by message in a window over the last 2 seconds), roughly along the lines of the sketch below.
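For illustration, a windowed deduplication of that kind could look like the following minimal sketch. It is written against the newer Flink DataStream API (keyBy / timeWindow / reduce) rather than the 0.9.x API discussed in this thread, and the class name, the stand-in source, and keying on the whole message are assumptions made only for the example:

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class WindowDedupSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Stand-in source; in the real job this would be the Kafka consumer.
            DataStream<String> messages = env.fromElements("a", "a", "b", "a", "c");

            DataStream<String> deduplicated = messages
                // group identical messages together (the whole message is the key)
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value;
                    }
                })
                // collect each key's occurrences over a 2-second window
                .timeWindow(Time.seconds(2))
                // keep one representative per message and window, dropping the rest
                .reduce((first, duplicate) -> first);

            deduplicated.print();
            env.execute("window dedup sketch");
        }
    }

Within one window this collapses exact duplicates to a single element; duplicates that land in different windows still get through, which is why the deduplication is not perfect.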
Of course the last deduplication is not perfect.

Cheers. Rico.

> On 03.09.2015 at 13:29, Stephan Ewen <se...@apache.org> wrote:
>
> Do you mean the KafkaSource?
>
> Which KafkaSource are you using? The 0.9.1 FlinkKafkaConsumer082 or the
> KafkaSource?
>
>> On Thu, Sep 3, 2015 at 1:10 PM, Rico Bergmann <i...@ricobergmann.de> wrote:
>> Hi!
>>
>> Testing it with the current 0.10 snapshot is not easily possible at the moment.
>>
>> But I deactivated checkpointing in my program and still get duplicates in my
>> output. So it does not seem to come only from the checkpointing feature, does it?
>>
>> Maybe the KafkaSink is responsible for this? (Just my guess.)
>>
>> Cheers, Rico.
>>
>>
>>> On 01.09.2015 at 15:37, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> Hi Rico,
>>> unfortunately the 0.9 branch still seems to have problems with exactly-once
>>> processing and checkpointed operators. We reworked how the checkpoints are
>>> handled for the 0.10 release, so it should work well there.
>>>
>>> Could you maybe try running on the 0.10-SNAPSHOT release and see if the
>>> problems persist there?
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>>> On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann
>>>> <i...@ricobergmann.de> wrote:
>>>> Hi!
>>>>
>>>> I still have an issue... I am now using 0.9.1 and the new
>>>> KafkaConnector, but I still get duplicates in my Flink program. Here is the
>>>> relevant part:
>>>>
>>>> final FlinkKafkaConsumer082<String> kafkaSrc = new FlinkKafkaConsumer082<String>(
>>>>     kafkaTopicIn, new SimpleStringSchema(), properties);
>>>>
>>>> DataStream<String> start = env.addSource(kafkaSrc)
>>>>     .setParallelism(numReadPartitions); // numReadPartitions = 2
>>>>
>>>> DataStream<JSONObject> jsonized = start
>>>>     .flatMap(new ExtractAndFilterJSON());
>>>>
>>>> DataStream<Session> sessions = jsonized
>>>>     .partitionByHash(new KeySelector<JSONObject, String>() {
>>>>         /**
>>>>          * Partition by session id.
>>>>          */
>>>>         @Override
>>>>         public String getKey(JSONObject value) throws Exception {
>>>>             try {
>>>>                 return /*session id*/;
>>>>             } catch (Exception e) {
>>>>                 LOG.error("no session could be retrieved", e);
>>>>             }
>>>>             return "";
>>>>         }
>>>>     }).flatMap(new StatefulSearchSessionizer());
>>>>
>>>> In the StatefulSearchSessionizer I receive duplicates sporadically. I am
>>>> sure that the Kafka topic I am reading from does not contain any
>>>> duplicates. So it must happen in the Flink program ...
>>>>
>>>> Any ideas?
>>>>
>>>> Cheers, Rico.