---------- Forwarded message ----------
From: "pravin kumar" <pk007...@gmail.com>
Date: 27-Mar-2018 6:11 PM
Subject: Consumer slowness issue
To: <users@kafka.apache.org>
Cc:
i have two topics with 5 partitions each
wikifeedInputT10
KafkaProducer produces 100000 elements and wikifeedInputT10 have
received these elements.
[admin@nms-181 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list localhost:9092 --topic wikifeedInputT10 --time -1
wikifeedInputT10:2:20000
wikifeedInputT10:4:20000
wikifeedInputT10:1:20000
wikifeedInputT10:3:20000
wikifeedInputT10:0:20000
but after processing reading from my outputTopic: wikifeedOutputT15
i have received
[admin@nms-181 bin]$ sh kafka-run-class.sh kafka.tools.GetOffsetShell
--broker-list localhost:9092 --topic wikifeedOutputT15 --time -1
wikifeedOutputT15:2:1
wikifeedOutputT15:4:1
wikifeedOutputT15:1:3
wikifeedOutputT15:3:3
wikifeedOutputT15:0:3
I have received the output in my console as
[2018-03-27 17:55:32,359] INFO Kafka version : 1.0.1
(org.apache.kafka.common.utils.AppInfoParser)
[2018-03-27 17:55:32,359] INFO Kafka commitId : c0518aa65f25317e
(org.apache.kafka.common.utils.AppInfoParser)
[2018-03-27 17:55:32,600] INFO [Consumer clientId=C2,
groupId=ConsumerWikiFeedLambda4] Discovered group coordinator
nms-181.nmsworks.co.in:9092 (id: 2147483647 rack: null)
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-27 17:55:32,602] INFO [Consumer clientId=C2,
groupId=ConsumerWikiFeedLambda4] Revoking previously assigned
partitions [] (org.apache.kafka.clients.consumer.internals.
ConsumerCoordinator)
[2018-03-27 17:55:32,602] INFO [Consumer clientId=C2,
groupId=ConsumerWikiFeedLambda4] (Re-)joining group
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-03-27 17:55:32,610] INFO [Consumer clientId=C2,
groupId=ConsumerWikiFeedLambda4] Successfully joined group with
generation 5 (org.apache.kafka.clients.consumer.internals.
AbstractCoordinator)
[2018-03-27 17:55:32,611] INFO [Consumer clientId=C2,
groupId=ConsumerWikiFeedLambda4] Setting newly assigned partitions
[wikifeedOutputT15-2, wikifeedOutputT15-1, wikifeedOutputT15-0,
wikifeedOutputT15-4, wikifeedOutputT15-3]
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
Topic :::::::wikifeedOutputT15 Partition:::::0 Key::::joe = Value::::::
18263
Topic :::::::wikifeedOutputT15 Partition:::::0 Key::::phil = Value::::::
18230
Topic :::::::wikifeedOutputT15 Partition:::::0 Key::::tania =
Value:::::: 18344
Topic :::::::wikifeedOutputT15 Partition:::::1 Key::::pravin =
Value:::::: 18140
Topic :::::::wikifeedOutputT15 Partition:::::1 Key::::kumar =
Value:::::: 18248
Topic :::::::wikifeedOutputT15 Partition:::::1 Key::::joseph =
Value:::::: 18116
Topic :::::::wikifeedOutputT15 Partition:::::2 Key::::lauren =
Value:::::: 18150
Topic :::::::wikifeedOutputT15 Partition:::::3 Key::::bob = Value::::::
18131
Topic :::::::wikifeedOutputT15 Partition:::::3 Key::::erica =
Value:::::: 18084
Topic :::::::wikifeedOutputT15 Partition:::::3 Key::::damian =
Value:::::: 18126
Topic :::::::wikifeedOutputT15 Partition:::::4 Key::::sam = Value::::::
18168
it stops here and im not getting any msgs
i have attached my code below
package kafka.examples.wikifeed;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Properties;
import java.util.Random;
import java.util.stream.IntStream;
/**
* Created by PravinKumar on 29/7/17.
*/
public class WikifeedLambdaexample {
final static String WIKIFEED_INPUT="wikifeedInputT10";
final static String WIKIFEED_OUTPUT="wikifeedOutputT15";
final static String WIKIFEED_LAMBDA="WikiFeedLambdaT10";
//final static String SUM_LAMBDA="sumlambda10";
final static String BOOTSTRAP_SERVER="localhost:9092";
final static String COUNT_STORE="countstoreT10";
final static String STAT_DIR="/home/admin/Document/kafka_2.11.1.0.1/kafka-streams";
//final static String SUM_OUTPUT_EVEN_TOPIC = "sumoutputeventopicT10";
// final static String EVEN_TABLE = "sumDemo10";
public static void main(String[] args) {
ProducerInput();
KafkaStreams WikifeedKStreams= getWikifeed();
WikifeedKStreams.cleanUp();
WikifeedKStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(WikifeedKStreams::close));
/* KafkaStreams sumStreams= getEvenNumSum();
sumStreams.cleanUp();
sumStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(sumStreams::close));*/
}
public static void ProducerInput(){
String[] users={"pravin","kumar","erica", "bob", "joe", "damian", "tania", "phil", "sam",
"lauren", "joseph"};
Properties properties=new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,WikifeedSerde.getInstance().serializer().getClass());
KafkaProducer<String,Wikifeed> producer=new KafkaProducer<String, Wikifeed>(properties);
Random random=new Random();
IntStream.range(0,100000)
.mapToObj(value -> new Wikifeed(users[random.nextInt(users.length)],true,"content"))
.forEach(record -> producer.send(new ProducerRecord<String, Wikifeed>(WikifeedLambdaexample.WIKIFEED_INPUT,null,record)));
producer.flush();
}
public static KafkaStreams getWikifeed(){
Properties properties=new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG,WIKIFEED_LAMBDA);
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER);
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,WikifeedSerde.class);
properties.put(StreamsConfig.STATE_DIR_CONFIG,STAT_DIR);
//properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
StreamsBuilder builder= new StreamsBuilder();
KStream<String,Wikifeed> inputStream=builder.stream(WIKIFEED_INPUT);
KTable<String,Long> kTable=inputStream
.filter((key, value) -> value.isNew())
.map(((key, value) -> KeyValue.pair(value.getName(),value)))
.groupByKey()
.count(Materialized.as(COUNT_STORE));
kTable.toStream().to(WIKIFEED_OUTPUT, Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams= new KafkaStreams(builder.build(),properties);
return streams;
}
/*public static KafkaStreams getEvenNumSum(){
Properties props=new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, SUM_LAMBDA);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVER);
props.put(StreamsConfig.STATE_DIR_CONFIG,STAT_DIR);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,500);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.Long().getClass().getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
StreamsBuilder builder= new StreamsBuilder();
KStream<String,Long> suminput=builder.stream(WIKIFEED_OUTPUT);
getKTableForEvenNums(suminput).toStream().to(SUM_OUTPUT_EVEN_TOPIC);
KafkaStreams kafkaStreams=new KafkaStreams(builder.build(),props);
return kafkaStreams;
}*/
/* private static KTable getKTableForEvenNums(KStream<String,Long> sumeveninput){
KTable<String, Long> evenKTable=sumeveninput
.filter((key,value)-> value%2 ==0)
.groupByKey()
.reduce((v1, v2)-> v1 + v2,Materialized.as(EVEN_TABLE));
return evenKTable;
}*/
}
package kafka.examples.wikifeed;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
/**
* Created by PravinKumar on 29/7/17.
*/
public class wikifeedDriverExample {
final static String CONSUMER_WIKIFEED_LAMBDA="ConsumerWikiFeedLambda4";
public static void main(String[] args) {
ConsumerOutput();
}
public static void ConsumerOutput() {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, WikifeedLambdaexample.BOOTSTRAP_SERVER);
properties.put(ConsumerConfig.GROUP_ID_CONFIG,CONSUMER_WIKIFEED_LAMBDA);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C1");
properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C2");
//properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"C3");
KafkaConsumer<String, Long> consumer = new KafkaConsumer<String, Long>(properties, new StringDeserializer(), new LongDeserializer());
consumer.subscribe(Collections.singleton(WikifeedLambdaexample.WIKIFEED_OUTPUT));
while (true) {
consumer.poll(100)
.forEach((ConsumerRecord<String, Long> consumerRecord) -> System.out.println("Topic :::::::" +consumerRecord.topic() + " " + "Partition:::::" + consumerRecord.partition()+ " " + "Key::::" +consumerRecord.key()+ " " + " = " + " Value:::::: " +consumerRecord.value()));
}
}
}
package kafka.examples.wikifeed;
import java.io.Serializable;
/**
* Created by PravinKumar on 29/7/17.
*/
public class Wikifeed implements Serializable {
private String name;
private boolean isNew;
private String content;
public Wikifeed(String name, boolean isNew, String content) {
this.name = name;
this.isNew = isNew;
this.content = content;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public boolean isNew() {
return isNew;
}
public void setNew(boolean aNew) {
isNew = aNew;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
package kafka.examples.wikifeed;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import java.io.Serializable;
import java.util.Map;
/**
* Created by PravinKumar on 29/7/17.
*/
public class WikifeedSerde<T extends Serializable> implements Serde<T>{
Serializer<T> serializer;
Deserializer<T> deserializer;
public static WikifeedSerde getInstance(){
return new WikifeedSerde();
}
public WikifeedSerde(){
serializer=new CustomSerializer();
deserializer=new CustomDeserializer();
}
@Override
public void configure(Map<String, ?> map, boolean b) {
serializer.configure(map, b);
deserializer.configure(map, b);
}
@Override
public void close() {
serializer.close();
deserializer.close();
}
@Override
public Serializer<T> serializer() {
return serializer;
}
@Override
public Deserializer<T> deserializer() {
return deserializer;
}
}
package kafka.examples.wikifeed;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Serializer;
import java.io.Serializable;
import java.util.Map;
/**
* Created by PravinKumar on 29/7/17.
*/
public class CustomSerializer<T> implements Serializer<T> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public byte[] serialize(String s, T t) {
return SerializationUtils.serialize((Serializable) t);
}
@Override
public void close() {
}
}
package kafka.examples.wikifeed;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;
/**
* Created by PravinKumar on 29/7/17.
*/
public class CustomDeserializer<T> implements Deserializer<T> {
@Override
public void configure(Map map, boolean b) {
}
@Override
public T deserialize(String s, byte[] bytes) {
return (T) SerializationUtils.deserialize(bytes);
}
@Override
public void close() {
}
}