Hi Vahid,

What is the broker version?

Ismael

On Mon, May 9, 2016 at 11:09 PM, Vahid S Hashemian <vahidhashem...@us.ibm.com> wrote:

> I am trying to create a very simple (new) consumer in Java using the
> trunk:
>
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "group1");
>     props.put("key.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
>     props.put("value.deserializer",
>         "org.apache.kafka.common.serialization.StringDeserializer");
>
>     KafkaConsumer<String, String> consumer =
>         new KafkaConsumer<String, String>(props);
>     consumer.subscribe(Collections.singletonList("t1"));
>
>     try {
>         while (true) {
>             ConsumerRecords<String, String> records = consumer.poll(100);
>             for (ConsumerRecord<String, String> record : records)
>             {
>                 System.out.println(String.format(
>                     "topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n",
>                     record.topic(), record.partition(), record.offset(),
>                     record.key(), record.value()));
>             }
>         }
>     } catch (Exception e) {
>         e.printStackTrace();
>     } finally {
>         consumer.close();
>     }
>
> This code raises the following exception when polling:
>
> org.apache.kafka.common.protocol.types.SchemaException: Error reading
> field 'topic_metadata': Error reading array of size 160817, only 30 bytes
> available
>     at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:73)
>     at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
>     at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:269)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:973)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>     at NewConsumerLoop.main(NewConsumerLoop.java:97)
>
> Can anyone spot what the issue is? Thanks.
>
> Regards,
> --Vahid Hashemian