Hello,

I'm attempting to upgrade an application from the 0.8 to the 0.10 broker /
client libs and to integrate Kafka Streams.  I am currently using the
following producer / consumer configs:

Producer:

        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props = ProducerConfig.addSerializerToConfig(props,
                Serdes.String().serializer(), Serdes.String().serializer());
        return props;
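
For reference, the producer itself is created and used roughly like this
(the topic, key, and value below are just placeholders):

        // Build the producer from the props above and send a test record
        Producer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("my-topic", "some-key", "some-value"));
        producer.flush();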


Consumer (Kafka Streams Processor):

        Properties streamsSettings = new Properties();
        streamsSettings.put("bootstrap.servers", brokerList);
        streamsSettings.put("application.id", consumerGroupId);
        streamsSettings.put("key.serde", Serdes.StringSerde.class.getName());
        streamsSettings.put("value.serde", Serdes.StringSerde.class.getName());
        StreamsConfig config = new StreamsConfig(streamsSettings);
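
The topology is wired up roughly as follows ("my-topic" and MyProcessor
stand in for the real topic name and our Processor implementation):

        // Source node feeding our custom Processor, built from the config above
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source", "my-topic");
        builder.addProcessor("process", MyProcessor::new, "source");

        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();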

I'm running a 0.10 broker.  However, when I publish a message, I see the
following error on the consumer side:

org.apache.kafka.common.KafkaException: Error deserializing key/value for partition <topic name>-0 at offset 0
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:665)
at org.apache.kafka.clients.consumer.internals.Fetcher.handleFetchResponse(Fetcher.java:593)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$000(Fetcher.java:71)
at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:142)
at org.apache.kafka.clients.consumer.internals.Fetcher$1.onSuccess(Fetcher.java:139)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:998)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: java.lang.IllegalArgumentException: null
at java.nio.Buffer.limit(Buffer.java:275)
at org.apache.kafka.common.record.Record.sliceDelimited(Record.java:392)
at org.apache.kafka.common.record.Record.key(Record.java:376)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:650)
... 15 common frames omitted


What could be causing this error?  I see the same error if I attempt to use
the Kafka bin tools to consume from the topic.

Thanks,
Ryan
