I am trying to create a very simple (new) consumer in Java using the 
trunk:

                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092");
                props.put("group.id", "group1");
                props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");

                KafkaConsumer<String, String> consumer = new 
KafkaConsumer<String, String>(props);
                consumer.subscribe(Collections.singletonList("t1"));
 
                try {
                        while (true) {
                                ConsumerRecords<String, String> records = 
consumer.poll(100);
                                for (ConsumerRecord<String, String> record 
: records)
                                {
                                        System.out.println(String.format(
"topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
                                                        record.topic(), 
record.partition(), record.offset(), record.key(), record.value()));
                                }
                        }
                } catch (Exception e) {
                        e.printStackTrace();
                } finally {
                        consumer.close();
                }

This code raises the following exception when polling:

org.apache.kafka.common.protocol.types.SchemaException: Error reading 
field 'topic_metadata': Error reading array of size 160817, only 30 bytes 
available
        at org.apache.kafka.common.protocol.types.Schema.read(
Schema.java:73)
        at org.apache.kafka.clients.NetworkClient.parseResponse(
NetworkClient.java:380)
        at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(
NetworkClient.java:449)
        at org.apache.kafka.clients.NetworkClient.poll(
NetworkClient.java:269)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(
ConsumerNetworkClient.java:360)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
ConsumerNetworkClient.java:224)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
ConsumerNetworkClient.java:192)
        at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(
ConsumerNetworkClient.java:163)
        at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(
AbstractCoordinator.java:179)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(
KafkaConsumer.java:973)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(
KafkaConsumer.java:937)
        at NewConsumerLoop.main(NewConsumerLoop.java:97)


Can anyone spot what the issue is? Thanks.
 
Regards,
--Vahid Hashemian
 

Reply via email to