Onur Karaman created KAFKA-6014:
-----------------------------------

             Summary: new consumer mirror maker halts after committing offsets to a deleted topic
                 Key: KAFKA-6014
                 URL: https://issues.apache.org/jira/browse/KAFKA-6014
             Project: Kafka
          Issue Type: Bug
            Reporter: Onur Karaman


The new consumer throws an unexpected KafkaException when trying to commit
offsets to a topic that has been deleted. MirrorMaker.commitOffsets doesn't
catch the KafkaException, so the exception kills the process. We didn't see
this with the old consumer because the old consumer silently drops failed
offset commits.

I ran a quick experiment locally to prove the behavior. The experiment:
1. start up a single broker
2. create a single-partition topic t
3. create a new consumer that consumes topic t
4. make the consumer commit every few seconds
5. delete topic t
6. expected result: the consumer throws a KafkaException that kills the process.

Here's my script:
{code}
package org.apache.kafka.clients.consumer;

import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.List;
import java.util.Properties;

public class OffsetCommitTopicDeletionTest {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9090");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "g");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<>(props);
        TopicPartition partition = new TopicPartition("t", 0);
        List<TopicPartition> partitions = Collections.singletonList(partition);
        kafkaConsumer.assign(partitions);
        while (true) {
            kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(0, "")));
            Thread.sleep(1000);
        }
    }
}
{code}

Here are the other commands:
{code}
> rm -rf /tmp/zookeeper/ /tmp/kafka-logs* logs*
> ./gradlew clean jar
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> export LOG_DIR=logs0 && ./bin/kafka-server-start.sh config/server0.properties
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic t --partitions 1 --replication-factor 1
> ./bin/kafka-run-class.sh org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic t
{code}

Here is the output:
{code}
[2017-10-04 20:00:14,451] ERROR [Consumer clientId=consumer-1, groupId=g] Offset commit failed on partition t-0 at offset 0: This server does not host this topic-partition. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
Exception in thread "main" org.apache.kafka.common.KafkaException: Partition t-0 may not exist or user may not have Describe access to topic
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:789)
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:734)
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
  at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
  at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
  at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
  at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:190)
  at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600)
  at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1231)
  at org.apache.kafka.clients.consumer.OffsetCommitTopicDeletionTest.main(OffsetCommitTopicDeletionTest.java:22)
{code}

A couple of ways we could fix this:
1. Make OffsetCommitResponseHandler throw a more specific exception and make
MirrorMaker.commitOffsets catch it. MirrorMaker.commitOffsets currently catches
only WakeupException and CommitFailedException.
2. Make OffsetCommitResponseHandler log the error and move on. This is probably
the simpler option. Just delete these lines:
{code}
-                        future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
-                        return;
{code}
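
For comparison, here is a rough sketch of what the caller-side half of option 1
might look like. It is an illustration under assumptions, not the actual
MirrorMaker code: KafkaException and WakeupException below are local stand-ins
for the real Kafka classes (so the sketch runs without a broker or the
kafka-clients jar), and Committer and commitOrSkip are hypothetical names.
Committer stands for a call such as kafkaConsumer.commitSync(offsets). Because
no more specific exception type exists yet, the sketch catches the generic
KafkaException.

```java
import java.util.ArrayList;
import java.util.List;

public class TolerantCommitSketch {
    // Stand-in for org.apache.kafka.common.KafkaException (assumption: lets the sketch compile alone).
    public static class KafkaException extends RuntimeException {
        public KafkaException(String message) { super(message); }
    }

    // Stand-in for org.apache.kafka.common.errors.WakeupException, which extends KafkaException.
    public static class WakeupException extends KafkaException {
        public WakeupException() { super("wakeup"); }
    }

    // Hypothetical wrapper around a call such as kafkaConsumer.commitSync(offsets).
    public interface Committer {
        void commit();
    }

    // Returns true if the commit succeeded, false if it failed and was skipped.
    public static boolean commitOrSkip(Committer committer, List<String> warnings) {
        try {
            committer.commit();
            return true;
        } catch (WakeupException e) {
            // Wakeup signals shutdown, so it must reach the caller.
            throw e;
        } catch (KafkaException e) {
            // For example, the topic was deleted: record the failure and keep the process alive.
            warnings.add("Offset commit failed, skipping: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> warnings = new ArrayList<>();
        boolean ok = commitOrSkip(() -> {
            throw new KafkaException("Partition t-0 may not exist or user may not have Describe access to topic");
        }, warnings);
        System.out.println("committed=" + ok + ", warnings=" + warnings);
    }
}
```

The WakeupException clause comes first because it is a subclass of
KafkaException. Swallowing it in the generic clause would hide a shutdown
request.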



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
