casuallc opened a new issue, #38: URL: https://github.com/apache/pulsar-adapters/issues/38
**Reproduce**

- create a non-partitioned topic
- send a message to this topic

**error**

[image: screenshot of the error]

**probable reason**

PulsarKafkaConsumer -> poll

```
public ConsumerRecords<K, V> poll(long timeoutMillis) {
    try {
        QueueItem item = receivedMessages.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (item == null) {
            return (ConsumerRecords<K, V>) ConsumerRecords.EMPTY;
        }

        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new HashMap<>();

        int numberOfRecords = 0;

        while (item != null) {
            TopicName topicName = TopicName.get(item.consumer.getTopic());
            String topic = topicName.getPartitionedTopicName();
            int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;
            Message<byte[]> msg = item.message;
            MessageId msgId = msg.getMessageId();
            if (msgId instanceof TopicMessageIdImpl) {
                msgId = ((TopicMessageIdImpl) msgId).getInnerMessageId();
            }
            long offset = MessageIdUtils.getOffset(msgId);

            TopicPartition tp = new TopicPartition(topic, partition);
            if (lastReceivedOffset.get(tp) == null && !unpolledPartitions.contains(tp)) {
                log.info("When polling offsets, invalid offsets were detected. Resetting topic partition {}", tp);
                resetOffsets(tp);
            }

            // .. other code

        // If no interceptor is provided, the interceptors list will be an empty list
        // and the original ConsumerRecords will be returned.
        return applyConsumerInterceptorsOnConsume(interceptors, new ConsumerRecords<>(records));
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
```

**int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() : 0;**

This line cannot distinguish a partitioned topic from a non-partitioned topic: a non-partitioned topic and partition 0 of a partitioned topic both resolve to partition `0`.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
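The ambiguity in the partition expression quoted above can be illustrated with a small standalone sketch. It does not use the Pulsar classes: `partitionIndex`, `baseName`, and `toKafkaPartition` are hypothetical stand-ins, and the sketch assumes that partitioned topic names carry a `-partition-N` suffix and that a topic without that suffix counts as non-partitioned. It only shows that the expression maps both kinds of topic to the same (topic, partition) pair; it is not a proposed fix.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

final class PartitionAmbiguityDemo {
    // Assumption: partitioned topics are named "<base>-partition-<N>".
    private static final String PARTITION_SUFFIX = "-partition-";

    // Hypothetical stand-in for a partition index lookup: -1 when the name has no valid suffix.
    static int partitionIndex(String topic) {
        int pos = topic.lastIndexOf(PARTITION_SUFFIX);
        if (pos < 0) {
            return -1;
        }
        try {
            int idx = Integer.parseInt(topic.substring(pos + PARTITION_SUFFIX.length()));
            return idx >= 0 ? idx : -1;
        } catch (NumberFormatException e) {
            return -1;
        }
    }

    // Hypothetical stand-in for the base topic name: strips the suffix if one is present.
    static String baseName(String topic) {
        if (partitionIndex(topic) < 0) {
            return topic;
        }
        return topic.substring(0, topic.lastIndexOf(PARTITION_SUFFIX));
    }

    // Mirrors the shape of the expression in poll(): isPartitioned ? partitionIndex : 0.
    static Map.Entry<String, Integer> toKafkaPartition(String topic) {
        int idx = partitionIndex(topic);
        int partition = idx >= 0 ? idx : 0;
        return new SimpleEntry<>(baseName(topic), partition);
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> nonPartitioned = toKafkaPartition("my-topic");
        Map.Entry<String, Integer> partitionZero = toKafkaPartition("my-topic-partition-0");

        // Both inputs collapse to the same (topic, partition) pair, so the caller
        // can no longer tell which kind of topic the message came from.
        System.out.println(nonPartitioned.equals(partitionZero)); // prints "true"
    }
}
```

In this sketch the non-partitioned topic `my-topic` and the first partition `my-topic-partition-0` both end up as partition `0` of `my-topic`, which is the collision the report points at.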
To unsubscribe, e-mail: dev-unsubscr...@pulsar.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org