---------- Forwarded message ----------
From: "pravin kumar" <pk007...@gmail.com>
Date: 27-Mar-2018 6:11 PM
Subject: Consumer slowness issue
To: <users@kafka.apache.org>
Cc:

I have two topics with 5 partitions each: wikifeedInputT10 (input) and wikifeedOutputT15 (output).

My KafkaProducer produces 100000 records, and wikifeedInputT10 has received all of them:
[admin@nms-181 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic wikifeedInputT10 --time -1

wikifeedInputT10:2:20000
wikifeedInputT10:4:20000
wikifeedInputT10:1:20000
wikifeedInputT10:3:20000
wikifeedInputT10:0:20000

But after processing, when I check my output topic wikifeedOutputT15, I see only these end offsets:
[admin@nms-181 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic wikifeedOutputT15 --time -1
wikifeedOutputT15:2:1
wikifeedOutputT15:4:1
wikifeedOutputT15:1:3
wikifeedOutputT15:3:3
wikifeedOutputT15:0:3
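
For reference, the same check can also be done from Java: a minimal sketch (my own snippet, not part of the application) that prints the earliest and latest offset of every partition of wikifeedOutputT15 using the consumer's beginningOffsets/endOffsets:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
KafkaConsumer<byte[], byte[]> offsetChecker =
        new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
List<TopicPartition> parts = new ArrayList<>();
for (int p = 0; p < 5; p++) {
    parts.add(new TopicPartition("wikifeedOutputT15", p));
}
Map<TopicPartition, Long> earliest = offsetChecker.beginningOffsets(parts);
Map<TopicPartition, Long> latest = offsetChecker.endOffsets(parts);
// earliest == 0 with latest matching the numbers above would mean no records were deleted
parts.forEach(tp -> System.out.println(tp + " earliest=" + earliest.get(tp) + " latest=" + latest.get(tp)));
offsetChecker.close();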

My consumer prints the following output to the console:

[2018-03-27 17:55:32,359] INFO Kafka version : 1.0.1 (org.apache.kafka.common.utils.AppInfoParser)
[2018-03-27 17:55:32,359] INFO Kafka commitId : c0518aa65f25317e (org.apache.kafka.common.utils.AppInfoParser)
[2018-03-27 17:55:32,600] INFO [Consumer clientId=C2, groupId=ConsumerWikiFeedLambda4] Discovered group coordinator nms-181.nmsworks.co.in:9092 (id: 2147483647 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-27 17:55:32,602] INFO [Consumer clientId=C2, groupId=ConsumerWikiFeedLambda4] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-03-27 17:55:32,602] INFO [Consumer clientId=C2, groupId=ConsumerWikiFeedLambda4] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-27 17:55:32,610] INFO [Consumer clientId=C2, groupId=ConsumerWikiFeedLambda4] Successfully joined group with generation 5 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-27 17:55:32,611] INFO [Consumer clientId=C2, groupId=ConsumerWikiFeedLambda4] Setting newly assigned partitions [wikifeedOutputT15-2, wikifeedOutputT15-1, wikifeedOutputT15-0, wikifeedOutputT15-4, wikifeedOutputT15-3] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
Topic :::::::wikifeedOutputT15 Partition:::::0 Key::::joe  =  Value:::::: 18263
Topic :::::::wikifeedOutputT15 Partition:::::0 Key::::phil  =  Value:::::: 18230
Topic :::::::wikifeedOutputT15 Partition:::::0 Key::::tania  =  Value:::::: 18344
Topic :::::::wikifeedOutputT15 Partition:::::1 Key::::pravin  =  Value:::::: 18140
Topic :::::::wikifeedOutputT15 Partition:::::1 Key::::kumar  =  Value:::::: 18248
Topic :::::::wikifeedOutputT15 Partition:::::1 Key::::joseph  =  Value:::::: 18116
Topic :::::::wikifeedOutputT15 Partition:::::2 Key::::lauren  =  Value:::::: 18150
Topic :::::::wikifeedOutputT15 Partition:::::3 Key::::bob  =  Value:::::: 18131
Topic :::::::wikifeedOutputT15 Partition:::::3 Key::::erica  =  Value:::::: 18084
Topic :::::::wikifeedOutputT15 Partition:::::3 Key::::damian  =  Value:::::: 18126
Topic :::::::wikifeedOutputT15 Partition:::::4 Key::::sam  =  Value:::::: 18168


The consumer stops here and I am not getting any more messages.
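
To tell whether the consumer is genuinely stuck or simply idle because nothing new arrives on the topic, the poll loop in the attached consumer code could be swapped for a variant like this minimal sketch (the 100-poll idle threshold is arbitrary, and it assumes partition 0 is assigned to this consumer):

int emptyPolls = 0;
while (true) {
    ConsumerRecords<String, Long> records = consumer.poll(100);
    if (records.isEmpty()) {
        emptyPolls++;
        if (emptyPolls % 100 == 0) { // roughly every 10 seconds of silence
            TopicPartition tp = new TopicPartition(WikifeedLambdaexample.WIKIFEED_OUTPUT, 0);
            System.out.println("no records after " + emptyPolls + " empty polls; position on " + tp
                    + " = " + consumer.position(tp));
        }
    } else {
        emptyPolls = 0;
        records.forEach(r -> System.out.println(r.key() + " = " + r.value()));
    }
}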

I have attached my code below.
package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class WikifeedLambdaexample {

    final static String WIKIFEED_INPUT="wikifeedInputT10";
    final static String WIKIFEED_OUTPUT="wikifeedOutputT15";
    final static String WIKIFEED_LAMBDA="WikiFeedLambdaT10";
    //final static String SUM_LAMBDA="sumlambda10";
    final static String BOOTSTRAP_SERVER="localhost:9092";
    final static String COUNT_STORE="countstoreT10";
    final static String STAT_DIR="/home/admin/Document/kafka_2.11.1.0.1/kafka-streams";
    //final static String SUM_OUTPUT_EVEN_TOPIC = "sumoutputeventopicT10";
   // final static String EVEN_TABLE = "sumDemo10";

    public static void main(String[] args) {

        ProducerInput();
        KafkaStreams wikifeedStreams = getWikifeed();
        wikifeedStreams.cleanUp(); // deletes the application's local state directory before starting
        wikifeedStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(wikifeedStreams::close));

     /*   KafkaStreams sumStreams= getEvenNumSum();
        sumStreams.cleanUp();
        sumStreams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(sumStreams::close));*/
    }

    public static void ProducerInput() {
        String[] users = {"pravin", "kumar", "erica", "bob", "joe", "damian", "tania", "phil", "sam",
                "lauren", "joseph"};

        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, WikifeedSerde.getInstance().serializer().getClass());

        KafkaProducer<String, Wikifeed> producer = new KafkaProducer<>(properties);
        Random random = new Random();
        // 100000 records, each keyed null with a random user name inside the value
        IntStream.range(0, 100000)
                .mapToObj(value -> new Wikifeed(users[random.nextInt(users.length)], true, "content"))
                .forEach(record -> producer.send(new ProducerRecord<>(WIKIFEED_INPUT, null, record)));
        producer.flush();
        producer.close(); // was missing: releases the producer's I/O thread and sockets
    }


    public static KafkaStreams getWikifeed() {

        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, WIKIFEED_LAMBDA);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, WikifeedSerde.class);
        properties.put(StreamsConfig.STATE_DIR_CONFIG, STAT_DIR);
        //properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 500);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Wikifeed> inputStream = builder.stream(WIKIFEED_INPUT);
        // count records per user name; the counts are materialized in COUNT_STORE
        KTable<String, Long> kTable = inputStream
                .filter((key, value) -> value.isNew())
                .map((key, value) -> KeyValue.pair(value.getName(), value))
                .groupByKey()
                .count(Materialized.as(COUNT_STORE));
        // each update to the count table is forwarded to the output topic as a changelog record
        kTable.toStream().to(WIKIFEED_OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));

        return new KafkaStreams(builder.build(), properties);
    }

    /*public static KafkaStreams getEvenNumSum(){

        Properties props=new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, SUM_LAMBDA);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER);
        props.put(StreamsConfig.STATE_DIR_CONFIG,STAT_DIR);
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Long().getClass().getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

        StreamsBuilder builder= new StreamsBuilder();
        KStream<String,Long> suminput=builder.stream(WIKIFEED_OUTPUT);
        getKTableForEvenNums(suminput).toStream().to(SUM_OUTPUT_EVEN_TOPIC);

        KafkaStreams kafkaStreams=new KafkaStreams(builder.build(),props);

        return kafkaStreams;
    }*/

   /* private static KTable getKTableForEvenNums(KStream<String,Long> sumeveninput){
        KTable<String, Long> evenKTable=sumeveninput
                .filter((key,value)-> value%2 ==0)
                .groupByKey()
                .reduce((v1, v2)-> v1 + v2,Materialized.as(EVEN_TABLE));
        return evenKTable;
    }*/
}
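
As a side check on the counts, independent of the output topic, the materialized COUNT_STORE can be queried in-process once the streams instance is RUNNING. A minimal sketch using the interactive-queries API (printCountStore is a hypothetical helper of mine, to be called with the running instance from main):

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

// hypothetical helper: dump the local contents of the count store
static void printCountStore(KafkaStreams streams) {
    ReadOnlyKeyValueStore<String, Long> store =
            streams.store(WikifeedLambdaexample.COUNT_STORE, QueryableStoreTypes.<String, Long>keyValueStore());
    try (KeyValueIterator<String, Long> it = store.all()) {
        while (it.hasNext()) {
            KeyValue<String, Long> next = it.next();
            System.out.println(next.key + " -> " + next.value);
        }
    }
}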
package kafka.examples.wikifeed;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class wikifeedDriverExample {

    final static String CONSUMER_WIKIFEED_LAMBDA="ConsumerWikiFeedLambda4";

    public static void main(String[] args) {
        ConsumerOutput();
    }

    public static void ConsumerOutput() {

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, WikifeedLambdaexample.BOOTSTRAP_SERVER);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_WIKIFEED_LAMBDA);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "C1");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "C2");
        //properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "C3");

        KafkaConsumer<String, Long> consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new LongDeserializer());
        consumer.subscribe(Collections.singleton(WikifeedLambdaexample.WIKIFEED_OUTPUT));

        // poll forever; each record is one count update for a user key
        while (true) {
            consumer.poll(100).forEach((ConsumerRecord<String, Long> record) ->
                    System.out.println("Topic :::::::" + record.topic() + " Partition:::::" + record.partition()
                            + " Key::::" + record.key() + "  =  Value:::::: " + record.value()));
        }
    }
}
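
One note on the loop above: the consumer is never closed. A minimal sketch of the standard wakeup pattern for a clean shutdown (my addition, not in the original code):

// wakeup() is the only consumer method that is safe to call from another thread;
// it makes a blocked poll() throw WakeupException so the loop can exit cleanly
Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
try {
    while (true) {
        consumer.poll(100).forEach(record -> System.out.println(record.key() + " = " + record.value()));
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected on shutdown
} finally {
    consumer.close();
}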
package kafka.examples.wikifeed;

import java.io.Serializable;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class Wikifeed implements Serializable {

    // explicit version id so java.io serialization stays compatible across recompiles
    private static final long serialVersionUID = 1L;

    private String name;
    private boolean isNew;
    private String content;

    public Wikifeed(String name, boolean isNew, String content) {
        this.name = name;
        this.isNew = isNew;
        this.content = content;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public boolean isNew() {
        return isNew;
    }

    public void setNew(boolean aNew) {
        isNew = aNew;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }
}
package kafka.examples.wikifeed;


import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.io.Serializable;
import java.util.Map;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class WikifeedSerde<T extends Serializable> implements Serde<T> {

    private final Serializer<T> serializer;
    private final Deserializer<T> deserializer;

    public static <T extends Serializable> WikifeedSerde<T> getInstance() {
        return new WikifeedSerde<>();
    }

    public WikifeedSerde() {
        serializer = new CustomSerializer<>();
        deserializer = new CustomDeserializer<>();
    }
    @Override
    public void configure(Map<String, ?> map, boolean b) {
        serializer.configure(map, b);
        deserializer.configure(map, b);
    }

    @Override
    public void close() {
        serializer.close();
        deserializer.close();
    }

    @Override
    public Serializer<T> serializer() {
        return serializer;
    }

    @Override
    public Deserializer<T> deserializer() {
        return deserializer;
    }
}
package kafka.examples.wikifeed;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Serializer;

import java.io.Serializable;
import java.util.Map;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class CustomSerializer<T> implements Serializer<T> {

    @Override
    public void configure(Map<String, ?> map, boolean b) {
        // nothing to configure
    }

    @Override
    public byte[] serialize(String topic, T data) {
        // plain java.io serialization via commons-lang; the value type must implement Serializable
        return SerializationUtils.serialize((Serializable) data);
    }

    @Override
    public void close() {

    }
}
package kafka.examples.wikifeed;

import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
 * Created by PravinKumar on 29/7/17.
 */
public class CustomDeserializer<T> implements Deserializer<T> {

    @Override
    public void configure(Map<String, ?> map, boolean b) {
        // nothing to configure
    }

    @SuppressWarnings("unchecked")
    @Override
    public T deserialize(String topic, byte[] bytes) {
        // Kafka hands the deserializer null for null values; guard before deserializing
        if (bytes == null) {
            return null;
        }
        return (T) SerializationUtils.deserialize(bytes);
    }

    @Override
    public void close() {

    }
}
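
A quick round-trip sanity check for the serde classes above (a minimal sketch, runnable from any main method in this package):

WikifeedSerde<Wikifeed> serde = new WikifeedSerde<>();
byte[] bytes = serde.serializer().serialize("any-topic", new Wikifeed("pravin", true, "content"));
Wikifeed copy = serde.deserializer().deserialize("any-topic", bytes);
System.out.println(copy.getName() + " / " + copy.isNew()); // pravin / true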
