Can you tell us where the KafkaSink comes into play? At what point do the duplicates come up?
On Thu, Sep 3, 2015 at 2:09 PM, Rico Bergmann <i...@ricobergmann.de> wrote:

> No. I mean the KafkaSink.
>
> A bit more insight into my program: I read from a Kafka topic with
> FlinkKafkaConsumer082, then hash-partition the data, then I do a
> deduplication (which does not eliminate all duplicates, though). Then some
> computation, and afterwards again deduplication (group by message in a
> window of the last 2 seconds).
>
> Of course the last deduplication is not perfect.
>
> Cheers, Rico.
>
>
> On 03.09.2015 at 13:29, Stephan Ewen <se...@apache.org> wrote:
>
> Do you mean the KafkaSource?
>
> Which KafkaSource are you using? The 0.9.1 FlinkKafkaConsumer082 or the
> KafkaSource?
>
> On Thu, Sep 3, 2015 at 1:10 PM, Rico Bergmann <i...@ricobergmann.de>
> wrote:
>
>> Hi!
>>
>> Testing it with the current 0.10 snapshot is not easily possible at the
>> moment.
>>
>> But I deactivated checkpointing in my program and still get duplicates
>> in my output. So it does not seem to come only from the checkpointing
>> feature, does it?
>>
>> Maybe the KafkaSink is responsible for this? (Just my guess.)
>>
>> Cheers, Rico.
>>
>>
>> On 01.09.2015 at 15:37, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>> Hi Rico,
>> unfortunately the 0.9 branch still seems to have problems with
>> exactly-once processing and checkpointed operators. We reworked how the
>> checkpoints are handled for the 0.10 release, so it should work well there.
>>
>> Could you maybe try running on the 0.10-SNAPSHOT release and see if the
>> problems persist there?
>>
>> Cheers,
>> Aljoscha
>>
>> On Tue, 1 Sep 2015 at 14:38 Dipl.-Inf. Rico Bergmann <
>> i...@ricobergmann.de> wrote:
>>
>>> Hi!
>>>
>>> I still have an issue... I was now using 0.9.1 and the new
>>> KafkaConnector. But I still get duplicates in my Flink program. Here's
>>> the relevant part:
>>>
>>> final FlinkKafkaConsumer082<String> kafkaSrc = new FlinkKafkaConsumer082<String>(
>>>         kafkaTopicIn, new SimpleStringSchema(), properties);
>>>
>>> DataStream<String> start = env.addSource(kafkaSrc)
>>>         .setParallelism(numReadPartitions); // numReadPartitions = 2
>>>
>>> DataStream<JSONObject> jsonized = start
>>>         .flatMap(new ExtractAndFilterJSON());
>>>
>>> DataStream<Session> sessions = jsonized
>>>         .partitionByHash(new KeySelector<JSONObject, String>() {
>>>             /**
>>>              * partition by session id
>>>              */
>>>             @Override
>>>             public String getKey(JSONObject value) throws Exception {
>>>                 try {
>>>                     return /*session id*/;
>>>                 } catch (Exception e) {
>>>                     LOG.error("no session could be retrieved", e);
>>>                 }
>>>                 return "";
>>>             }
>>>         }).flatMap(new StatefulSearchSessionizer());
>>>
>>> In the StatefulSearchSessionizer I receive duplicates sporadically. I'm
>>> sure that the Kafka topic I'm reading from does not contain any
>>> duplicates. So it must be in the Flink program...
>>>
>>> Any ideas?
>>>
>>> Cheers, Rico.
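For context, here is a minimal, self-contained sketch of where a KafkaSink typically sits in such a pipeline: consumer source, a stateful dedup flatMap, and the sink as the terminal operator writing back to Kafka. This is not Rico's actual program. The class names (DedupPipelineSketch, SimpleDeduplicator), topic names, broker/ZooKeeper addresses, and group id are hypothetical placeholders, and the KafkaSink constructor arguments (brokerList, topic, schema) plus its package are assumed from the 0.9.x connector and may need adjusting; StatefulSearchSessionizer from the thread is replaced by a trivial dedup stand-in.

import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.connectors.kafka.api.KafkaSink;  // assumed 0.9.x package
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

public class DedupPipelineSketch {

    // Hypothetical dedup operator: remembers every record seen so far and only
    // forwards first occurrences. A real implementation would bound or expire
    // this set (e.g. per 2-second window, as described in the thread).
    public static class SimpleDeduplicator extends RichFlatMapFunction<String, String> {
        private transient Set<String> seen;

        @Override
        public void open(Configuration parameters) throws Exception {
            seen = new HashSet<String>();
        }

        @Override
        public void flatMap(String value, Collector<String> out) throws Exception {
            if (seen.add(value)) {      // add() returns false for duplicates
                out.collect(value);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", "localhost:2181"); // placeholder
        properties.setProperty("group.id", "dedup-sketch");            // placeholder

        // Source: same consumer type as in the thread (topic name is a placeholder).
        DataStream<String> input = env.addSource(
                new FlinkKafkaConsumer082<String>("inputTopic", new SimpleStringSchema(), properties));

        DataStream<String> deduped = input
                // partition by the full record so equal records meet on the same task
                .partitionByHash(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) {
                        return value;
                    }
                })
                .flatMap(new SimpleDeduplicator());

        // Sink: this is where the KafkaSink comes into play, as the terminal
        // operator writing results back to Kafka. Constructor arguments
        // (brokerList, topic, serialization schema) are assumed from 0.9.x.
        deduped.addSink(new KafkaSink<String>("localhost:9092", "outputTopic", new SimpleStringSchema()));

        env.execute("dedup pipeline sketch");
    }
}

With a layout like this, one way to narrow down where the duplicates come up is to tap the stream right after the source (before any partitioning or dedup) and compare it against the input topic: if duplicates already appear there, the consumer side is involved; if they only show up after the flatMap stages, the sink is unlikely to be the cause.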