[ https://issues.apache.org/jira/browse/KAFKA-4246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15545134#comment-15545134 ]

Alexandru Ionita commented on KAFKA-4246:
-----------------------------------------

There's nothing special about the consumer. Below is the code I use to
reproduce the issue.
What I was able to find out is that the problem is related to the state the
Kafka broker keeps for the consumer group. If I switch to a different consumer
group instead of this particular one, it works fine.

Once a consumer group has been used with topic subscription (subscribe()),
the same group id CANNOT be used anymore with manual partition assignment
(assign()).

{code}
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class Consumer implements Runnable
{
    public static final String KFK_SERVERS = "kafka1:9092,kafka2:9092,kafka3:9092";
    public static final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
    public static final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
    // Note: "retries" and "acks" are producer configs; the consumer ignores them.
    public static final String CONSUMER_COMMIT_RETRIES = "0";
    public static final String ACKS = "all";
    public static final String AUTO_COMMIT = "true";
    public static final String GROUP = "dolphin";

    static Properties kfkProps = new Properties();
    static {
        kfkProps.setProperty( "bootstrap.servers", KFK_SERVERS );
        kfkProps.setProperty( "key.deserializer", KEY_DESERIALIZER );
        kfkProps.setProperty( "value.deserializer", VALUE_DESERIALIZER );
        kfkProps.setProperty( "retries", CONSUMER_COMMIT_RETRIES );
        kfkProps.setProperty( "acks", ACKS );
        kfkProps.setProperty( "enable.auto.commit", AUTO_COMMIT );
        kfkProps.setProperty( "group.id", GROUP );
    }

    private KafkaConsumer<String, String> consumer;

    public Consumer()
    {
        consumer = new KafkaConsumer<>( kfkProps );
    }

    @Override
    public void run()
    {
        // Manual partition assignment; no group rebalancing should be involved.
        TopicPartition echoPart = new TopicPartition( "igadev_echo", 4 );
        TopicPartition deltaPart = new TopicPartition( "igadev_deltasync", 4 );
        consumer.assign( Arrays.asList( echoPart, deltaPart ) );

        while (true)
        {
            ConsumerRecords<String, String> records = consumer.poll( 1000 );
            for (ConsumerRecord<String, String> record : records) {
                System.out.println( String.format(
                        "Topic: %s, Partition: %d, Key: %s, Value: %s",
                        record.topic(),
                        record.partition(),
                        record.key(),
                        record.value() ) );
            }
        }
    }
}
{code}
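The workaround described above boils down to giving the manually-assigning consumer a group.id that was never used with subscribe(). A minimal sketch of the changed configuration (the group name "dolphin-assign" is illustrative, not from the original report):

```java
import java.util.Properties;

public class GroupWorkaround {
    public static void main(String[] args) {
        Properties kfkProps = new Properties();
        kfkProps.setProperty("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
        kfkProps.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        kfkProps.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        kfkProps.setProperty("enable.auto.commit", "true");
        // Use a group.id that has no coordinator state from a previous
        // subscribe(); "dolphin" was already used with topic subscription.
        kfkProps.setProperty("group.id", "dolphin-assign");
        System.out.println(kfkProps.getProperty("group.id"));
    }
}
```

With this group.id, consumer.assign() plus auto-commit worked for me, while reusing "dolphin" kept failing every commit.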

> Discretionary partition assignment on the consumer side not functional
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-4246
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4246
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.0.1
>            Reporter: Alexandru Ionita
>
> Manually assigning topics/partitions to a consumer does not work
> correctly. The consumer is able to fetch records from the given
> partitions, but the first commit fails with the following message:
> {code}
> 2016-10-03 13:44:50.673 DEBUG 11757 --- [pool-9-thread-1] 
> o.a.k.c.c.internals.ConsumerCoordinator  : Offset commit for group XXXXXX 
> failed: The coordinator is not aware of this member.
> 2016-10-03 13:44:50.673  WARN 11757 --- [pool-9-thread-1] 
> o.a.k.c.c.internals.ConsumerCoordinator  : Auto offset commit failed for 
> group XXXXXX: Commit cannot be completed since the group has already 
> rebalanced and assigned the partitions to another member. This means that the 
> time between subsequent calls to poll() was longer than the configured 
> session.timeout.ms, which typically implies that the poll loop is spending 
> too much time message processing. You can address this either by increasing 
> the session timeout or by reducing the maximum size of batches returned in 
> poll() with max.poll.records.
> {code}
> All this while the consumer will continue to poll records from the kafka 
> cluster, but every commit will fail with the same message.
> I tried setting {{session.timeout.ms}} to values like 50000, but I got
> the same outcome => no successful commits.
> If I only switch from {{consumer.assign( subscribedPartitions )}} to
> {{consumer.subscribe( topics )}}, everything works as expected, with no
> other changes to the client configuration.
> Am I missing something here?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
