I have run the wikifeed example. I have three topics:

- wikifeedInputtopicDemo2 (10 partitions)
- wikifeedOutputtopicDemo2 (10 partitions)
- sumoutputeventopicDemo2 (5 partitions)
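For reference, the partition counts can be confirmed from code with Kafka's AdminClient. This is only a minimal verification sketch, assuming the broker runs at localhost:9092 (as in the GetOffsetShell call below); the class name is illustrative:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

public class DescribePartitions {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Fetch metadata for the three topics and print their partition counts.
            Map<String, TopicDescription> topics = admin.describeTopics(Arrays.asList(
                    "wikifeedInputtopicDemo2",
                    "wikifeedOutputtopicDemo2",
                    "sumoutputeventopicDemo2")).all().get();
            topics.forEach((name, description) ->
                    System.out.println(name + ": " + description.partitions().size() + " partitions"));
        }
    }
}
```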
I have produced 100,000 records, but the input topic (wikifeedInputtopicDemo2) receives more than 100,000 records. Can someone explain how this happens? The per-partition end offsets below sum to 133,999:

```
[admin@nms-181 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic wikifeedInputtopicDemo2 --time -1
wikifeedInputtopicDemo2:8:13400
wikifeedInputtopicDemo2:2:13401
wikifeedInputtopicDemo2:5:13400
wikifeedInputtopicDemo2:4:13400
wikifeedInputtopicDemo2:7:13399
wikifeedInputtopicDemo2:1:13399
wikifeedInputtopicDemo2:9:13400
wikifeedInputtopicDemo2:3:13400
wikifeedInputtopicDemo2:6:13400
wikifeedInputtopicDemo2:0:13400
```

Here is my processor topology code:

```java
public static KafkaStreams getWikifeed() {
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, WIKIFEED_LAMBDA);
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, WikifeedSerde.class);
    properties.put(StreamsConfig.STATE_DIR_CONFIG, STAT_DIR);
    //properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500);
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, Wikifeed> inputStream = builder.stream(WIKIFEED_INPUT);
    KTable<String, Long> kTable = inputStream
            .filter((key, value) -> value.isNew())
            .map((key, value) -> KeyValue.pair(value.getName(), value))
            .groupByKey()
            .count(Materialized.as(COUNT_STORE));
    kTable.toStream().to(WIKIFEED_OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));

    KafkaStreams streams = new KafkaStreams(builder.build(), properties);
    return streams;
}
```
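For reference, the per-partition totals that GetOffsetShell prints can also be summed from code with the plain consumer API (partitionsFor plus endOffsets). This is only a minimal verification sketch, not part of the app; the class name is illustrative and it assumes the same broker and topic:

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

public class SumEndOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (KafkaConsumer<byte[], byte[]> consumer =
                     new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            // Turn the topic's partition metadata into TopicPartition handles...
            List<TopicPartition> partitions = consumer.partitionsFor("wikifeedInputtopicDemo2").stream()
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toList());
            // ...and sum the log-end offsets, which is what GetOffsetShell --time -1 reports.
            long total = consumer.endOffsets(partitions).values().stream()
                    .mapToLong(Long::longValue)
                    .sum();
            System.out.println("Total records in wikifeedInputtopicDemo2: " + total);
        }
    }
}
```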
My driver code is below:

```java
package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class wikifeedDriverExample {

    final static String BOOTSTRAP_SERVERS = "localhost:9092";
    final static String CONSUMER_WIKIFEED_LAMBDA = "ConsumerWikiFeedLambda1";

    public static void main(String[] args) {
        ProducerInput();
        ConsumerOutput();
    }

    public static void ProducerInput() {
        String[] users = {"pravin", "kumar", "erica", "bob", "joe", "damian",
                          "tania", "phil", "sam", "lauren", "joseph"};

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                WikifeedSerde.getInstance().serializer().getClass());

        KafkaProducer<String, Wikifeed> producer = new KafkaProducer<>(properties);
        Random random = new Random();
        IntStream.range(0, random.nextInt(100000))
                .mapToObj(value -> new Wikifeed(users[random.nextInt(users.length)], true, "content"))
                .forEach(record -> producer.send(
                        new ProducerRecord<>(WikifeedLambdaexample.WIKIFEED_INPUT, null, record)));
        producer.flush();
    }

    public static void ConsumerOutput() {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_WIKIFEED_LAMBDA);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "C1");
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "C2");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "C3");

        KafkaConsumer<String, Long> consumer =
                new KafkaConsumer<>(properties, new StringDeserializer(), new LongDeserializer());
        consumer.subscribe(Collections.singleton(WikifeedLambdaexample.WIKIFEED_OUTPUT));

        while (true) {
            consumer.poll(100)
                    .forEach((ConsumerRecord<String, Long> consumerRecord) ->
                            System.out.println("Topic :::::::" + consumerRecord.topic() + " "
                                    + "Partition:::::" + consumerRecord.partition() + " "
                                    + "Key::::" + consumerRecord.key() + " "
                                    + " = Value:::::: " + consumerRecord.value()));
        }
    }
}
```
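Note that IntStream.range(0, random.nextInt(100000)) sends a random number of records (anywhere from 0 to 99,999) per run, and each run appends to the same topic. Here is a minimal sketch of the same producing step with a send callback that counts acknowledged writes, so the attempted and acknowledged counts can be compared per run; the method name and counter are illustrative:

```java
import java.util.concurrent.atomic.AtomicLong;

public static void ProducerInputCounted() {
    String[] users = {"pravin", "kumar", "erica", "bob", "joe", "damian",
                      "tania", "phil", "sam", "lauren", "joseph"};

    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            WikifeedSerde.getInstance().serializer().getClass());

    AtomicLong acked = new AtomicLong();
    try (KafkaProducer<String, Wikifeed> producer = new KafkaProducer<>(properties)) {
        Random random = new Random();
        int attempted = random.nextInt(100000); // a random count, not exactly 100,000
        IntStream.range(0, attempted)
                .mapToObj(i -> new Wikifeed(users[random.nextInt(users.length)], true, "content"))
                .forEach(record -> producer.send(
                        new ProducerRecord<>(WikifeedLambdaexample.WIKIFEED_INPUT, null, record),
                        (metadata, exception) -> {
                            // Count only sends the broker acknowledged without error.
                            if (exception == null) acked.incrementAndGet();
                        }));
        producer.flush();
        System.out.println("Attempted: " + attempted + ", acknowledged: " + acked.get());
    }
}
```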