Hi!

I still have an issue... I am now using 0.9.1 and the new Kafka connector, but I still get duplicates in my Flink program. Here's the relevant part:

final FlinkKafkaConsumer082<String> kafkaSrc = new FlinkKafkaConsumer082<String>(
        kafkaTopicIn, new SimpleStringSchema(), properties);

DataStream<String> start = env.addSource(kafkaSrc)
        .setParallelism(numReadPartitions); // numReadPartitions = 2

DataStream<JSONObject> jsonized = start
        .flatMap(new ExtractAndFilterJSON());

DataStream<Session> sessions = jsonized
        .partitionByHash(new KeySelector<JSONObject, String>() {
            /**
             * partition by session id
             */
            @Override
            public String getKey(JSONObject value) throws Exception {
                try {
                    return /*session id*/;
                } catch (Exception e) {
                    LOG.error("no session could be retrieved", e);
                }
                return "";
            }
        }).flatMap(new StatefulSearchSessionizer());
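One thing worth ruling out is the partitioning step itself. In a simplified model (not Flink's actual partitioner, whose hash function may differ), hash partitioning picks exactly one downstream subtask per record from its key. It reshuffles records but never copies them, so the same session id always reaches the same sessionizer instance. The class and method names below are made up for illustration. Note also that the "" fallback in getKey sends every record without a session id to the same subtask.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified model of hash partitioning (NOT Flink's implementation):
 * each record goes to exactly one subtask, chosen as hash(key) mod parallelism.
 */
public class HashPartitionSketch {

    // Choose one of `parallelism` downstream subtasks for a key.
    // Math.floorMod keeps the index non-negative for negative hash codes.
    static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Group session ids by the subtask they would be routed to.
    static Map<Integer, List<String>> route(List<String> sessionIds, int parallelism) {
        Map<Integer, List<String>> bySubtask = new HashMap<>();
        for (String id : sessionIds) {
            bySubtask.computeIfAbsent(partitionFor(id, parallelism), k -> new ArrayList<>())
                     .add(id);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        // "" stands in for records whose session id could not be extracted.
        List<String> ids = Arrays.asList("s1", "s2", "s1", "", "s3", "");
        System.out.println(route(ids, 2));
    }
}
```

Every input id appears exactly once across all subtasks, so in this model a duplicate seen by the sessionizer would have to come from somewhere other than the partitioning itself.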

In the StatefulSearchSessionizer I sporadically receive duplicates. I'm sure that the Kafka topic I'm reading from does not contain any duplicates, so they must be introduced somewhere in the Flink program...
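To narrow down where duplicates first appear, one option is to run the same check at several points (right after the source, after ExtractAndFilterJSON, and inside StatefulSearchSessionizer), for example from inside a flatMap. The sketch below is a hypothetical helper, not part of Flink. It assumes each record carries some unique id (e.g. an event id inside the JSON), which the post does not specify. Each parallel subtask would hold its own instance, so it only catches repeats that arrive at the same subtask within one run. Its memory also grows without bound, so it suits short debugging runs only.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical debugging helper: remembers record ids it has seen and
 * reports repeats. What counts as a unique "record id" is an assumption.
 */
public class DuplicateDetector {
    private final Set<String> seen = new HashSet<>();
    private long duplicates = 0;

    // Returns true if this id was already seen by this instance.
    public boolean check(String recordId) {
        boolean isNew = seen.add(recordId); // add() returns false if already present
        if (!isNew) {
            duplicates++;
        }
        return !isNew;
    }

    public long duplicateCount() {
        return duplicates;
    }

    public static void main(String[] args) {
        DuplicateDetector detector = new DuplicateDetector();
        for (String id : new String[] {"e1", "e2", "e1", "e3"}) {
            if (detector.check(id)) {
                System.out.println("duplicate: " + id);
            }
        }
        System.out.println("total duplicates: " + detector.duplicateCount());
    }
}
```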

Any ideas?

Cheers, Rico.
