I have run the wikifeed example. I have three topics:

wikifeedInputtopicDemo2 - 10 partitions
wikifeedOutputtopicDemo2 - 10 partitions
sumoutputeventopicDemo2 - 5 partitions

I produced 100,000 records, but the input topic (wikifeedInputtopicDemo2) ends up with more than 100,000 records: the per-partition end offsets below add up to 133,999.
Can someone explain how this happens?

[admin@nms-181 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list localhost:9092 --topic wikifeedInputtopicDemo2 --time -1
wikifeedInputtopicDemo2:8:13400
wikifeedInputtopicDemo2:2:13401
wikifeedInputtopicDemo2:5:13400
wikifeedInputtopicDemo2:4:13400
wikifeedInputtopicDemo2:7:13399
wikifeedInputtopicDemo2:1:13399
wikifeedInputtopicDemo2:9:13400
wikifeedInputtopicDemo2:3:13400
wikifeedInputtopicDemo2:6:13400
wikifeedInputtopicDemo2:0:13400
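
The same total can also be cross-checked programmatically by summing the end offsets with a throw-away consumer. This is only a verification sketch (the class name EndOffsetSum is made up; just the topic name and broker address come from my setup):

import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class EndOffsetSum {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (KafkaConsumer<byte[], byte[]> consumer =
                     new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            // One TopicPartition per partition of the input topic.
            List<TopicPartition> partitions = consumer.partitionsFor("wikifeedInputtopicDemo2").stream()
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toList());
            // endOffsets() returns the next offset to be written for each partition; with no
            // deleted segments the sum equals the total number of records in the topic.
            long total = consumer.endOffsets(partitions).values().stream()
                    .mapToLong(Long::longValue)
                    .sum();
            System.out.println("Total records in wikifeedInputtopicDemo2: " + total);
        }
    }
}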

Here is my processor topology code:

//------------------------

public static KafkaStreams getWikifeed(){

        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, WIKIFEED_LAMBDA);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, WikifeedSerde.class);
        properties.put(StreamsConfig.STATE_DIR_CONFIG, STAT_DIR);
        //properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Wikifeed> inputStream = builder.stream(WIKIFEED_INPUT);
        KTable<String, Long> kTable = inputStream
                .filter((key, value) -> value.isNew())
                .map((key, value) -> KeyValue.pair(value.getName(), value))
                .groupByKey()
                .count(Materialized.as(COUNT_STORE));
        kTable.toStream().to(WIKIFEED_OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));
        KafkaStreams streams = new KafkaStreams(builder.build(), properties);

        return streams;
    }

--------->
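
For completeness, the returned KafkaStreams instance is started roughly like this (just a sketch of the call site, which is not shown above; I'm assuming getWikifeed() lives on WikifeedLambdaexample, like the topic constants do):

        KafkaStreams streams = WikifeedLambdaexample.getWikifeed();
        streams.start();                                                   // start processing WIKIFEED_INPUT
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));  // close cleanly on shutdown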


My driver code is below (also sent as an attachment).
package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Collections;
import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class wikifeedDriverExample {

    final static String BOOTSTRAP_SERVERS="localhost:9092";
    final static String CONSUMER_WIKIFEED_LAMBDA="ConsumerWikiFeedLambda1";

    public static void main(String[] args) {
        ProducerInput();
        ConsumerOutput();
    }

    public static void ProducerInput(){
        String[] users={"pravin","kumar","erica", "bob", "joe", "damian", "tania", "phil", "sam",
                "lauren", "joseph"};

        Properties properties=new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,WikifeedSerde.getInstance().serializer().getClass());

        KafkaProducer<String,Wikifeed> producer=new KafkaProducer<String, Wikifeed>(properties);
        Random random=new Random();
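        // Note: range(0, random.nextInt(100000)) sends a *random* number of records,
        // anywhere from 0 to 99,999 per run, and every record is produced with a null key.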
        IntStream.range(0,random.nextInt(100000))
                .mapToObj(value -> new Wikifeed(users[random.nextInt(users.length)],true,"content"))
                .forEach(record -> producer.send(new ProducerRecord<String, Wikifeed>(WikifeedLambdaexample.WIKIFEED_INPUT,null,record)));
        producer.flush();
    }

    public static void ConsumerOutput() {

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_WIKIFEED_LAMBDA);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "C1");
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "C2");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "C3");

        KafkaConsumer<String, Long> consumer = new KafkaConsumer<String, Long>(properties, new StringDeserializer(), new LongDeserializer());
        consumer.subscribe(Collections.singleton(WikifeedLambdaexample.WIKIFEED_OUTPUT));

        while (true) {
            consumer.poll(100).forEach((ConsumerRecord<String, Long> consumerRecord) ->
                    System.out.println("Topic :::::::" + consumerRecord.topic()
                            + " Partition:::::" + consumerRecord.partition()
                            + " Key::::" + consumerRecord.key()
                            + "  =  Value:::::: " + consumerRecord.value()));
        }
    }
}
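
For reference, the Wikifeed value class used above is a plain POJO along these lines (reconstructed from the calls made in the topology and the driver, so the exact field names are my assumption):

public class Wikifeed {

    private String name;     // becomes the record key in the topology's map() step
    private boolean isNew;   // the topology's filter() keeps only new feeds
    private String content;

    public Wikifeed(String name, boolean isNew, String content) {
        this.name = name;
        this.isNew = isNew;
        this.content = content;
    }

    public String getName() { return name; }
    public boolean isNew() { return isNew; }
    public String getContent() { return content; }
}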
