I have run wikifeed example. i have three topics: wikifeedInputtopicDemo2-10 partitions wikifeedOutputtopicDemo2-10 partitions sumoutputeventopicDemo2-5 partitions
i have produced 100000 records.but in the inputTopic(wikifeedInputtopicDemo2) it receives more than 100000 records. can someone explain how this happens?? [admin@nms-181 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic wikifeedInputtopicDemo2 --time -1 wikifeedInputtopicDemo2:8:13400 wikifeedInputtopicDemo2:2:13401 wikifeedInputtopicDemo2:5:13400 wikifeedInputtopicDemo2:4:13400 wikifeedInputtopicDemo2:7:13399 wikifeedInputtopicDemo2:1:13399 wikifeedInputtopicDemo2:9:13400 wikifeedInputtopicDemo2:3:13400 wikifeedInputtopicDemo2:6:13400 wikifeedInputtopicDemo2:0:13400 here is my processorTopology code: //------------------------ public static KafkaStreams getWikifeed(){ Properties properties=new Properties(); properties.put(StreamsConfig.APPLICATION_ID_CONFIG,WIKIFEED_LAMBDA); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,WikifeedSerde.class); properties.put(StreamsConfig.STATE_DIR_CONFIG,STAT_DIR); //properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); StreamsBuilder builder= new StreamsBuilder(); KStream<String,Wikifeed> inputStream=builder.stream(WIKIFEED_INPUT); KTable<String,Long> kTable=inputStream .filter((key, value) -> value.isNew()) .map(((key, value) -> KeyValue.pair(value.getName(),value))) .groupByKey() .count(Materialized.as(COUNT_STORE)); kTable.toStream().to(WIKIFEED_OUTPUT, Produced.with(Serdes.String(), Serdes.Long())); KafkaStreams streams= new KafkaStreams(builder.build(),properties); return streams; } ---------> My driver code is in the attachment file.