casuallc opened a new issue, #38:
URL: https://github.com/apache/pulsar-adapters/issues/38

   **Reproduce**
   - create non-paritioned topic
   - send message to this topic
   
   **error**
   
![image](https://user-images.githubusercontent.com/9473606/183640375-517bcea0-fa50-48af-8881-9696fab6d558.png)
   
   **probable reason**
   PulsarKafkaConsumer -> poll
   ```
   public ConsumerRecords<K, V> poll(long timeoutMillis) {
           try {
               QueueItem item = receivedMessages.poll(timeoutMillis, 
TimeUnit.MILLISECONDS);
               if (item == null) {
                   return (ConsumerRecords<K, V>) ConsumerRecords.EMPTY;
               }
   
               Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new 
HashMap<>();
   
               int numberOfRecords = 0;
   
               while (item != null) {
                   TopicName topicName = 
TopicName.get(item.consumer.getTopic());
                   String topic = topicName.getPartitionedTopicName();
                   int partition = topicName.isPartitioned() ? 
topicName.getPartitionIndex() : 0;
                   Message<byte[]> msg = item.message;
                   MessageId msgId = msg.getMessageId();
                   if (msgId instanceof TopicMessageIdImpl) {
                       msgId = ((TopicMessageIdImpl) msgId).getInnerMessageId();
                   }
                   long offset = MessageIdUtils.getOffset(msgId);
   
                   TopicPartition tp = new TopicPartition(topic, partition);
                   if (lastReceivedOffset.get(tp) == null && 
!unpolledPartitions.contains(tp)) {
                        log.info("When polling offsets, invalid offsets were 
detected. Resetting topic partition {}", tp);
                        resetOffsets(tp);
                   }
   
                  // .. other code
   
               // If no interceptor is provided, interceptors list will an 
empty list, original ConsumerRecords will be return.
               return applyConsumerInterceptorsOnConsume(interceptors, new 
ConsumerRecords<>(records));
           } catch (InterruptedException e) {
               throw new RuntimeException(e);
           }
       }
   ```
   **int partition = topicName.isPartitioned() ? topicName.getPartitionIndex() 
: 0;**
   This code can not discriminate partitioned-topic or non-paritioned-topic.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@pulsar.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to