[jira] [Commented] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active (KIP-211)

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443661#comment-16443661
 ] 

ASF GitHub Bot commented on KAFKA-4682:
---

vahidhashemian opened a new pull request #4896: KAFKA-4682: Revise expiration 
semantics of consumer group offsets (KIP-211)
URL: https://github.com/apache/kafka/pull/4896
 
 
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Committed offsets should not be deleted if a consumer is still active 
> (KIP-211)
> ---
>
> Key: KAFKA-4682
> URL: https://issues.apache.org/jira/browse/KAFKA-4682
> Project: Kafka
>  Issue Type: Bug
>Reporter: James Cheng
>Assignee: Vahid Hashemian
>Priority: Major
>  Labels: kip
>
> Kafka will delete committed offsets that are older than 
> offsets.retention.minutes
> If there is an active consumer on a low traffic partition, it is possible 
> that Kafka will delete the committed offset for that consumer. Once the 
> offset is deleted, a restart or a rebalance of that consumer will cause the 
> consumer to not find any committed offset and start consuming from 
> earliest/latest (depending on auto.offset.reset). I'm not sure, but a broker 
> failover might also cause you to start reading from auto.offset.reset (due to 
> broker restart, or coordinator failover).
> I think that Kafka should only delete offsets for inactive consumers. The 
> timer should only start after a consumer group goes inactive. For example, if 
> a consumer group goes inactive, then after 1 week, delete the offsets for 
> that consumer group. This is a solution that [~junrao] mentioned in 
> https://issues.apache.org/jira/browse/KAFKA-3806?focusedCommentId=15323521&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15323521
> The current workarounds are to:
> # Commit an offset on every partition you own on a regular basis, making sure 
> that it is more frequent than offsets.retention.minutes (a broker-side 
> setting that a consumer might not be aware of)
> or
> # Turn the value of offsets.retention.minutes up really really high. You have 
> to make sure it is higher than any valid low-traffic rate that you want to 
> support. For example, if you want to support a topic where someone produces 
> once a month, you would have to set offsets.retention.minutes to 1 month. 
> or
> # Turn on enable.auto.commit (this is essentially #1, but easier to 
> implement).
> None of these are ideal. 
> #1 can be spammy. It requires that your consumers know something about how the 
> brokers are configured. Sometimes it is out of your control. Mirrormaker, for 
> example, only commits offsets on partitions where it receives data. And it is 
> duplicated logic that you need to put into all of your consumers.
> #2 has disk-space impact on the broker (in __consumer_offsets) as well as 
> memory-size on the broker (to answer OffsetFetch).
> #3 I think has the potential for message loss (the consumer might commit on 
> messages that are not yet fully processed)
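
As a concrete illustration of workaround #1 above, here is a minimal, hedged 
sketch (the bootstrap servers, group id, topic, and one-minute interval are all 
illustrative) that re-commits the consumer's current positions on a timer, so 
the committed offsets never age past offsets.retention.minutes even on idle 
partitions:
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KeepOffsetsFresh {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        props.put("group.id", "my-group");                // illustrative
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("low-traffic-topic"));
            long lastCommitMs = System.currentTimeMillis();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // process(record); // application-specific
                }
                // Re-commit current positions once a minute, even when no
                // records arrived, so the commit timestamps stay fresh.
                if (System.currentTimeMillis() - lastCommitMs > 60_000L) {
                    consumer.commitSync();
                    lastCommitMs = System.currentTimeMillis();
                }
            }
        }
    }
}
{code}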



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6804) Event values should not be included in log messages

2018-04-19 Thread Jussi Lyytinen (JIRA)
Jussi Lyytinen created KAFKA-6804:
-

 Summary: Event values should not be included in log messages
 Key: KAFKA-6804
 URL: https://issues.apache.org/jira/browse/KAFKA-6804
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 1.0.0
Reporter: Jussi Lyytinen


In certain error situations, event values are included in log messages:
{code:java}
2018-04-19 08:00:28 
[my-component-37670563-39be-4fcb-92e1-02f115aca43c-StreamThread-2] ERROR 
o.a.k.s.p.i.AssignedTasks - stream-thread 
[my-component-37670563-39be-4fcb-92e1-02f115aca43c-StreamThread-2] Failed to 
commit stream task 1_1 due to the following error:
org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort sending 
since an error caught with a previous record (key [my-key] value [my-value] ...
{code}
In some environments, this is highly undesired behavior since the values can 
contain sensitive information. Error logs are usually collected to separate 
systems not meant for storing such information (e.g. patient or financial data).
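
Until record values are redacted upstream, one hedged, illustrative mitigation 
(assuming a standard log4j.properties setup; the logger name is the expansion 
of the abbreviated "o.a.k.s.p.i.AssignedTasks" shown above) is to raise the 
level of the specific logger that prints the record contents:
{code}
# Illustrative only: this hides the offending ERROR line, but also suppresses
# every other non-FATAL message from this logger.
log4j.logger.org.apache.kafka.streams.processor.internals.AssignedTasks=FATAL
{code}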



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6804) Event values should not be included in log messages

2018-04-19 Thread Jussi Lyytinen (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jussi Lyytinen updated KAFKA-6804:
--
Affects Version/s: (was: 1.0.0)
   1.0.1

> Event values should not be included in log messages
> ---
>
> Key: KAFKA-6804
> URL: https://issues.apache.org/jira/browse/KAFKA-6804
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.1
>Reporter: Jussi Lyytinen
>Priority: Major
>
> In certain error situations, event values are included in log messages:
> {code:java}
> 2018-04-19 08:00:28 
> [my-component-37670563-39be-4fcb-92e1-02f115aca43c-StreamThread-2] ERROR 
> o.a.k.s.p.i.AssignedTasks - stream-thread 
> [my-component-37670563-39be-4fcb-92e1-02f115aca43c-StreamThread-2] Failed to 
> commit stream task 1_1 due to the following error:
> org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort sending 
> since an error caught with a previous record (key [my-key] value [my-value] 
> ...
> {code}
> In some environments, this is highly undesired behavior since the values can 
> contain sensitive information. Error logs are usually collected to separate 
> systems not meant for storing such information (e.g. patient or financial 
> data).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6782) GlobalKTable GlobalStateStore never finishes restoring when consuming aborted messages

2018-04-19 Thread Lingxiao WANG (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lingxiao WANG updated KAFKA-6782:
-
Summary: GlobalKTable GlobalStateStore never finishes restoring when 
consuming aborted messages  (was: GlobalStateStore never finishes restoring 
when consuming transactional messages)

> GlobalKTable GlobalStateStore never finishes restoring when consuming aborted 
> messages
> --
>
> Key: KAFKA-6782
> URL: https://issues.apache.org/jira/browse/KAFKA-6782
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Lingxiao WANG
>Priority: Major
>
> Same problem as https://issues.apache.org/jira/browse/KAFKA-6190, but the 
> proposition there:
> {code:java}
> while (offset < highWatermark) {
>     final ConsumerRecords records = consumer.poll(100);
>     for (ConsumerRecord record : records) {
>         if (record.key() != null) {
>             stateRestoreCallback.restore(record.key(), record.value());
>         }
>         offset = consumer.position(topicPartition);
>     }
> }
> {code}
> doesn't work for me. In my situation, several transaction markers can appear 
> in sequence in one partition. In that case, the consumer is blocked and can't 
> poll any records, and the statement 'offset = consumer.position(topicPartition)' 
> never gets a chance to execute.
>  So I propose to move 'offset = consumer.position(topicPartition)' outside of 
> the loop, to guarantee that even if no records are polled, the offset is still 
> updated:
> {code:java}
> while (offset < highWatermark) {
>     final ConsumerRecords records = consumer.poll(100);
>     for (ConsumerRecord record : records) {
>         if (record.key() != null) {
>             stateRestoreCallback.restore(record.key(), record.value());
>         }
>     }
>     offset = consumer.position(topicPartition);
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6782) GlobalKTable GlobalStateStore never finishes restoring when consuming aborted messages

2018-04-19 Thread Lingxiao WANG (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lingxiao WANG updated KAFKA-6782:
-
Description: 
Same problem as https://issues.apache.org/jira/browse/KAFKA-6190, but the 
solution proposed there (below) works for successfully committed transactional 
messages; when there are aborted messages, it ends up in an infinite loop. Here 
is that proposition:
{code:java}
while (offset < highWatermark) {
    final ConsumerRecords records = consumer.poll(100);
    for (ConsumerRecord record : records) {
        if (record.key() != null) {
            stateRestoreCallback.restore(record.key(), record.value());
        }
        offset = consumer.position(topicPartition);
    }
}
{code}
Concretely, when the consumer consumes a set of aborted messages, it polls 0 
records, and the statement 'offset = consumer.position(topicPartition)' never 
gets a chance to execute.

 So I propose to move 'offset = consumer.position(topicPartition)' outside of 
the loop, to guarantee that even if no records are polled, the offset is still 
updated:
{code:java}
while (offset < highWatermark) {
    final ConsumerRecords records = consumer.poll(100);
    for (ConsumerRecord record : records) {
        if (record.key() != null) {
            stateRestoreCallback.restore(record.key(), record.value());
        }
    }
    offset = consumer.position(topicPartition);
}
{code}
 

  was:
Same problem as https://issues.apache.org/jira/browse/KAFKA-6190, but the 
proposition there:
{code:java}
while (offset < highWatermark) {
    final ConsumerRecords records = consumer.poll(100);
    for (ConsumerRecord record : records) {
        if (record.key() != null) {
            stateRestoreCallback.restore(record.key(), record.value());
        }
        offset = consumer.position(topicPartition);
    }
}
{code}
doesn't work for me. In my situation, several transaction markers can appear in 
sequence in one partition. In that case, the consumer is blocked and can't poll 
any records, and the statement 'offset = consumer.position(topicPartition)' 
never gets a chance to execute.

 So I propose to move 'offset = consumer.position(topicPartition)' outside of 
the loop, to guarantee that even if no records are polled, the offset is still 
updated:
{code:java}
while (offset < highWatermark) {
    final ConsumerRecords records = consumer.poll(100);
    for (ConsumerRecord record : records) {
        if (record.key() != null) {
            stateRestoreCallback.restore(record.key(), record.value());
        }
    }
    offset = consumer.position(topicPartition);
}
{code}
 


> GlobalKTable GlobalStateStore never finishes restoring when consuming aborted 
> messages
> --
>
> Key: KAFKA-6782
> URL: https://issues.apache.org/jira/browse/KAFKA-6782
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Lingxiao WANG
>Priority: Major
>
> Same problem as https://issues.apache.org/jira/browse/KAFKA-6190, but the 
> solution proposed there (below) works for successfully committed transactional 
> messages; when there are aborted messages, it ends up in an infinite loop. 
> Here is that proposition:
> {code:java}
> while (offset < highWatermark) {
>     final ConsumerRecords records = consumer.poll(100);
>     for (ConsumerRecord record : records) {
>         if (record.key() != null) {
>             stateRestoreCallback.restore(record.key(), record.value());
>         }
>         offset = consumer.position(topicPartition);
>     }
> }
> {code}
> Concretely, when the consumer consumes a set of aborted messages, it polls 0 
> records, and the statement 'offset = consumer.position(topicPartition)' never 
> gets a chance to execute.
>  So I propose to move 'offset = consumer.position(topicPartition)' outside of 
> the loop, to guarantee that even if no records are polled, the offset is still 
> updated:
> {code:java}
> while (offset < highWatermark) {
>     final ConsumerRecords records = consumer.poll(100);
>     for (ConsumerRecord record : records) {
>         if (record.key() != null) {
>             stateRestoreCallback.restore(record.key(), record.value());
>         }
>     }
>     offset = consumer.position(topicPartition);
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6799) Consumer livelock during consumer group rebalance

2018-04-19 Thread Attila Sasvari (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443921#comment-16443921
 ] 

Attila Sasvari commented on KAFKA-6799:
---

[~phdezann] thanks for reporting this issue and creating the docker environment 
for reproducing the issue.

- I had to set the environment variable M2_REPOSITORY before running the shell 
script: {{M2_REPOSITORY=/root/.m2/ ./spin.sh}}
Then I saw the issue you described in the description.
- I looked a bit around in the helloworld-kafka-1 docker container, and noticed 
that the replication factor for the internal topic was set to 1:
{code}
root@b6e9218f1761:/install/kafka# bin/kafka-topics.sh --describe -zookeeper 
172.170.0.80:2181
Topic:__consumer_offsetsPartitionCount:50   ReplicationFactor:1 
Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
Topic: __consumer_offsets   Partition: 0Leader: 103 
Replicas: 103   Isr: 103
Topic: __consumer_offsets   Partition: 1Leader: 101 
Replicas: 101   Isr: 101
Topic: __consumer_offsets   Partition: 2Leader: -1  
Replicas: 102   Isr: 102
{code}
In this situation, the consumer cannot contact the partition leader for 
__consumer_offsets partition 2, as that broker was killed by the test. So it 
won't be able to commit offsets for that specific partition.
- I changed replication factor to 3 for {{__consumer_offsets}} and then I did 
not see this issue.
- Can you add the following to {{docker/entrypoint.sh}} and re-test?
{code}
cat >>config/server.properties <<EOF
offsets.topic.replication.factor=3
EOF
{code}

> Consumer livelock during consumer group rebalance
> -
>
> Key: KAFKA-6799
> URL: https://issues.apache.org/jira/browse/KAFKA-6799
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 1.0.0, 0.11.0.2, 1.1.0
>Reporter: Pierre-Henri Dezanneau
>Priority: Critical
>
> We have the following environment:
> * 1 kafka cluster with 3 brokers
> * 1 topic with 3 partitions
> * 1 producer
> * 1 consumer group with 3 consumers
> From this setup, we remove one broker from the cluster, the hard way, by 
> simply killing it. Quite often, we see that the consumer group is not 
> rebalanced correctly. By that I mean that all 3 consumers stop consuming and 
> get stuck in a loop, forever.
> The thread dump shows that the consumer threads aren't blocked but run 
> forever in {{AbstractCoordinator.ensureCoordinatorReady}}, holding a lock due 
> to the {{synchronized}} keyword on the calling method. Heartbeat threads are 
> blocked, waiting for the consumer threads to release the lock. This situation 
> prevents all consumers from consuming any more records.
> We build a simple project which seems to reliably demonstrate this:
> {code:sh}
> $ mkdir -p /tmp/sandbox && cd /tmp/sandbox
> $ git clone https://github.com/phdezann/helloworld-kafka-livelock
> $ cd helloworld-kafka-livelock && ./spin.sh
> ...
> livelock detected
> {code}
> {code:sh|title=Consumer thread|borderStyle=solid}
> "kafka-consumer-1@10733" daemon prio=5 tid=0x31 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>blocks kafka-coordinator-heartbeat-thread | helloWorldGroup@10728
> at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1)
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x2a15> (a sun.nio.ch.EPollSelectorImpl)
> - locked <0x2a16> (a java.util.Collections$UnmodifiableSet)
> - locked <0x2a17> (a sun.nio.ch.Util$3)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.apache.kafka.common.network.Selector.select(Selector.java:684)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:408)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:261)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:156)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228)
> - locked <0x2a0c> (a 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensur

[jira] [Commented] (KAFKA-6335) SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails intermittently

2018-04-19 Thread Sonia Garudi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443967#comment-16443967
 ] 

Sonia Garudi commented on KAFKA-6335:
-

[~omkreddy] I got this as a test failure. It's an intermittent failure. I am 
running tests on the trunk branch of Kafka (1.2.0-SNAPSHOT). 

> SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails 
> intermittently
> --
>
> Key: KAFKA-6335
> URL: https://issues.apache.org/jira/browse/KAFKA-6335
> Project: Kafka
>  Issue Type: Test
>Reporter: Ted Yu
>Assignee: Manikumar
>Priority: Major
> Fix For: 1.2.0
>
>
> From 
> https://builds.apache.org/job/kafka-pr-jdk9-scala2.12/3045/testReport/junit/kafka.security.auth/SimpleAclAuthorizerTest/testHighConcurrencyModificationOfResourceAcls/
>  :
> {code}
> java.lang.AssertionError: expected acls Set(User:36 has Allow permission for 
> operations: Read from hosts: *, User:7 has Allow permission for operations: 
> Read from hosts: *, User:21 has Allow permission for operations: Read from 
> hosts: *, User:39 has Allow permission for operations: Read from hosts: *, 
> User:43 has Allow permission for operations: Read from hosts: *, User:3 has 
> Allow permission for operations: Read from hosts: *, User:35 has Allow 
> permission for operations: Read from hosts: *, User:15 has Allow permission 
> for operations: Read from hosts: *, User:16 has Allow permission for 
> operations: Read from hosts: *, User:22 has Allow permission for operations: 
> Read from hosts: *, User:26 has Allow permission for operations: Read from 
> hosts: *, User:11 has Allow permission for operations: Read from hosts: *, 
> User:38 has Allow permission for operations: Read from hosts: *, User:8 has 
> Allow permission for operations: Read from hosts: *, User:28 has Allow 
> permission for operations: Read from hosts: *, User:32 has Allow permission 
> for operations: Read from hosts: *, User:25 has Allow permission for 
> operations: Read from hosts: *, User:41 has Allow permission for operations: 
> Read from hosts: *, User:44 has Allow permission for operations: Read from 
> hosts: *, User:48 has Allow permission for operations: Read from hosts: *, 
> User:2 has Allow permission for operations: Read from hosts: *, User:9 has 
> Allow permission for operations: Read from hosts: *, User:14 has Allow 
> permission for operations: Read from hosts: *, User:46 has Allow permission 
> for operations: Read from hosts: *, User:13 has Allow permission for 
> operations: Read from hosts: *, User:5 has Allow permission for operations: 
> Read from hosts: *, User:29 has Allow permission for operations: Read from 
> hosts: *, User:45 has Allow permission for operations: Read from hosts: *, 
> User:6 has Allow permission for operations: Read from hosts: *, User:37 has 
> Allow permission for operations: Read from hosts: *, User:23 has Allow 
> permission for operations: Read from hosts: *, User:19 has Allow permission 
> for operations: Read from hosts: *, User:24 has Allow permission for 
> operations: Read from hosts: *, User:17 has Allow permission for operations: 
> Read from hosts: *, User:34 has Allow permission for operations: Read from 
> hosts: *, User:12 has Allow permission for operations: Read from hosts: *, 
> User:42 has Allow permission for operations: Read from hosts: *, User:4 has 
> Allow permission for operations: Read from hosts: *, User:47 has Allow 
> permission for operations: Read from hosts: *, User:18 has Allow permission 
> for operations: Read from hosts: *, User:31 has Allow permission for 
> operations: Read from hosts: *, User:49 has Allow permission for operations: 
> Read from hosts: *, User:33 has Allow permission for operations: Read from 
> hosts: *, User:1 has Allow permission for operations: Read from hosts: *, 
> User:27 has Allow permission for operations: Read from hosts: *) but got 
> Set(User:36 has Allow permission for operations: Read from hosts: *, User:7 
> has Allow permission for operations: Read from hosts: *, User:21 has Allow 
> permission for operations: Read from hosts: *, User:39 has Allow permission 
> for operations: Read from hosts: *, User:43 has Allow permission for 
> operations: Read from hosts: *, User:3 has Allow permission for operations: 
> Read from hosts: *, User:35 has Allow permission for operations: Read from 
> hosts: *, User:15 has Allow permission for operations: Read from hosts: *, 
> User:16 has Allow permission for operations: Read from hosts: *, User:22 has 
> Allow permission for operations: Read from hosts: *, User:26 has Allow 
> permission for operations: Read from hosts: *, User:11 has Allow permission 
> for operations: Read from hosts: *, User:38 has Allow permission for 
> 

[jira] [Created] (KAFKA-6805) Allow dynamic broker configs to be configured in ZooKeeper before starting broker

2018-04-19 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-6805:
-

 Summary: Allow dynamic broker configs to be configured in 
ZooKeeper before starting broker
 Key: KAFKA-6805
 URL: https://issues.apache.org/jira/browse/KAFKA-6805
 Project: Kafka
  Issue Type: Task
  Components: tools
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.0.0


At the moment, dynamic broker configs like SSL keystore and password can be 
configured using ConfigCommand only after a broker is started (using the new 
AdminClient). To start a broker, these configs have to be defined in 
server.properties. We want to restrict updates using ZooKeeper once the broker 
starts up, but we should allow updates using ZK prior to starting brokers. This 
is particularly useful for password configs which are stored encrypted in ZK, 
making it difficult to set manually before starting brokers.

ConfigCommand is being updated to talk to AdminClient under KIP-248, but we 
will need to maintain the tool using ZK to enable credentials to be created in 
ZK before starting brokers. So the functionality to set broker configs can sit 
alongside that.
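
As an illustration, a hedged sketch of what such a pre-start update could look 
like with the ZooKeeper-based ConfigCommand (the entity name and config key are 
illustrative; the exact flags for this feature are defined by the PR):
{code:sh}
# Store a broker config in ZooKeeper before the broker is started.
bin/kafka-configs.sh --zookeeper localhost:2181 \
  --entity-type brokers --entity-name 0 \
  --alter --add-config 'listener.name.external.ssl.keystore.password=broker-secret'
{code}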

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6730) Simplify state store recovery

2018-04-19 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444026#comment-16444026
 ] 

Matthias J. Sax commented on KAFKA-6730:


[~Yohan123] Thanks, please go ahead and assign the ticket to yourself if you 
are interested.

Not sure what the best way to refactor is atm. Maybe we could remove the 
method. We can discuss on the PR.

> Simplify state store recovery
> -
>
> Key: KAFKA-6730
> URL: https://issues.apache.org/jira/browse/KAFKA-6730
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
> Fix For: 1.2.0
>
>
> In the current code base, we restore state stores in the main thread (in 
> contrast to older code that restored state stores in the rebalance callback). 
> This has multiple advantages and allows us to further simplify the restore 
> code.
> In the original code base, during a long restore phase, it was possible that 
> an instance missed a rebalance and dropped out of the consumer group. To 
> detect this case, we apply a check during the restore phase that the 
> end-offset of the changelog topic does not change. A changed offset implies a 
> missed rebalance, as another thread has started to write into the changelog 
> topic (ie, the current thread does not own the task/store/changelog-topic 
> anymore).
> With the new code, which restores in the main loop, it's ensured that `poll()` 
> is called regularly, and thus a rebalance will be detected automatically. 
> This makes the check for a changing changelog end-offset unnecessary.
> We can simplify the restore logic to just consuming until `poll()` does not 
> return any data. At that point, we fetch the end-offset to see if we are 
> fully restored. If yes, we resume processing; if not, we continue the restore.
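
For illustration, a minimal, hedged sketch of the simplified restore loop 
described above (the restore-consumer wiring and `restore()` are illustrative 
stand-ins, not the actual Streams internals):
{code:java}
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RestoreLoopSketch {
    static void restoreUntilCaughtUp(KafkaConsumer<byte[], byte[]> restoreConsumer,
                                     TopicPartition partition) {
        long endOffset = restoreConsumer
                .endOffsets(Collections.singleton(partition)).get(partition);
        while (restoreConsumer.position(partition) < endOffset) {
            ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(100);
            if (records.isEmpty()) {
                // poll() returned no data: re-fetch the end offset to check
                // whether we are fully restored; if so, the loop exits and
                // processing resumes, otherwise we continue the restore.
                endOffset = restoreConsumer
                        .endOffsets(Collections.singleton(partition)).get(partition);
            } else {
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    restore(record.key(), record.value());
                }
            }
        }
    }

    static void restore(byte[] key, byte[] value) {
        // stand-in for applying a record to the state store
    }
}
{code}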



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6394) Prevent misconfiguration of advertised listeners

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444062#comment-16444062
 ] 

ASF GitHub Bot commented on KAFKA-6394:
---

omkreddy opened a new pull request #4897: KAFKA-6394: Add a check to prevent 
misconfiguration of advertised listeners
URL: https://github.com/apache/kafka/pull/4897
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Prevent misconfiguration of advertised listeners
> 
>
> Key: KAFKA-6394
> URL: https://issues.apache.org/jira/browse/KAFKA-6394
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Major
>
> We don't really have any protection from misconfiguration of the advertised 
> listeners. Sometimes users will copy the config from one host to another 
> during an upgrade. They may remember to update the broker id, but forget 
> about the advertised listeners. It can be surprisingly difficult to detect 
> this unless you know to look for it (e.g. you might just see a lot of 
> NotLeaderForPartition errors as the fetchers connect to the wrong broker). It 
> may not be totally foolproof, but for the common misconfiguration case it's 
> probably enough to check whether any existing broker has already registered 
> the same advertised listener.
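
To make the failure mode concrete, a hedged illustration (host names are made 
up) of the copied-config scenario such a check would catch:
{code}
# Broker 1, original server.properties:
broker.id=1
advertised.listeners=PLAINTEXT://broker1.example.com:9092

# Broker 2, copied from broker 1 during an upgrade: broker.id was updated,
# but advertised.listeners was forgotten and still points at broker 1.
broker.id=2
advertised.listeners=PLAINTEXT://broker1.example.com:9092
{code}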



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6806) Unable to validate sink connectors without "topics" component which is not required

2018-04-19 Thread JIRA
Ivan Majnarić created KAFKA-6806:


 Summary: Unable to validate sink connectors without "topics" 
component which is not required
 Key: KAFKA-6806
 URL: https://issues.apache.org/jira/browse/KAFKA-6806
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 1.1.0
 Environment: CP4.1., Centos7
Reporter: Ivan Majnarić


The bug happens when you try to create a new connector through, for example, 
kafka-connect-ui.

Previously, both source and sink connectors could be validated through REST 
without a "topics" entry alongside "connector.class", like this:
{code:java}
PUT http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate
{
    "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector"
}{code}
In the new version of CP4.1 you can still validate *source connectors* but not 
*sink connectors*. If you want to validate sink connectors, you need to add the 
"topics" config to the request, like:
{code:java}
PUT http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate
{
    "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector",
    "topics": "test-topic"
}{code}
So there is a small mismatch in the ways connectors are validated, which I 
think happened accidentally.
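
For reference, the same sink validation expressed with curl (a hedged sketch; 
the connect-url host is illustrative):
{code:sh}
curl -X PUT -H "Content-Type: application/json" \
  -d '{"connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector", "topics": "test-topic"}' \
  http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate
{code}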



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6799) Consumer livelock during consumer group rebalance

2018-04-19 Thread Attila Sasvari (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443921#comment-16443921
 ] 

Attila Sasvari edited comment on KAFKA-6799 at 4/19/18 3:00 PM:


[~phdezann] thanks for reporting this issue and creating the docker environment 
for reproducing it.

- I had to set the environment variable M2_REPOSITORY before running the shell 
script: {{M2_REPOSITORY=/root/.m2/ ./spin.sh}}
Then I saw the issue you described in the description.
- I looked a bit around in the helloworld-kafka-1 docker container, and noticed 
that the replication factor for the internal topic was set to 1:
{code}
root@b6e9218f1761:/install/kafka# bin/kafka-topics.sh --describe -zookeeper 
172.170.0.80:2181
Topic:__consumer_offsetsPartitionCount:50   ReplicationFactor:1 
Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
Topic: __consumer_offsets   Partition: 0Leader: 103 
Replicas: 103   Isr: 103
Topic: __consumer_offsets   Partition: 1Leader: 101 
Replicas: 101   Isr: 101
Topic: __consumer_offsets   Partition: 2Leader: -1  
Replicas: 102   Isr: 102
{code}
In this situation, the consumer cannot contact the partition leader for 
__consumer_offsets partition 2, as that broker was killed by the test. So it 
won't be able to commit offsets for that specific partition.
- I changed replication factor to 3 for {{__consumer_offsets}} and then I did 
not see this issue.
- Can you add the following to {{docker/entrypoint.sh}} and re-test?
{code}
cat >>config/server.properties <<EOF
offsets.topic.replication.factor=3
EOF
{code}

> Consumer livelock during consumer group rebalance
> -
>
> Key: KAFKA-6799
> URL: https://issues.apache.org/jira/browse/KAFKA-6799
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 1.0.0, 0.11.0.2, 1.1.0
>Reporter: Pierre-Henri Dezanneau
>Priority: Critical
>
> We have the following environment:
> * 1 kafka cluster with 3 brokers
> * 1 topic with 3 partitions
> * 1 producer
> * 1 consumer group with 3 consumers
> From this setup, we remove one broker from the cluster, the hard way, by 
> simply killing it. Quite often, we see that the consumer group is not 
> rebalanced correctly. By that I mean that all 3 consumers stop consuming and 
> get stuck in a loop, forever.
> The thread dump shows that the consumer threads aren't blocked but run 
> forever in {{AbstractCoordinator.ensureCoordinatorReady}}, holding a lock due 
> to the {{synchronized}} keyword on the calling method. Heartbeat threads are 
> blocked, waiting for the consumer threads to release the lock. This situation 
> prevents all consumers from consuming any more records.
> We build a simple project which seems to reliably demonstrate this:
> {code:sh}
> $ mkdir -p /tmp/sandbox && cd /tmp/sandbox
> $ git clone https://github.com/phdezann/helloworld-kafka-livelock
> $ cd helloworld-kafka-livelock && ./spin.sh
> ...
> livelock detected
> {code}
> {code:sh|title=Consumer thread|borderStyle=solid}
> "kafka-consumer-1@10733" daemon prio=5 tid=0x31 nid=NA runnable
>   java.lang.Thread.State: RUNNABLE
>blocks kafka-coordinator-heartbeat-thread | helloWorldGroup@10728
> at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1)
> at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> - locked <0x2a15> (a sun.nio.ch.EPollSelectorImpl)
> - locked <0x2a16> (a java.util.Collections$UnmodifiableSet)
> - locked <0x2a17> (a sun.nio.ch.Util$3)
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> at org.apache.kafka.common.network.Selector.select(Selector.java:684)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:408)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:261)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:156)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228)
> - locked <0x2a0c> (a 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
> at 
> 

[jira] [Commented] (KAFKA-6805) Allow dynamic broker configs to be configured in ZooKeeper before starting broker

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444274#comment-16444274
 ] 

ASF GitHub Bot commented on KAFKA-6805:
---

rajinisivaram opened a new pull request #4898: KAFKA-6805: Enable broker 
configs to be stored in ZK before broker start
URL: https://github.com/apache/kafka/pull/4898
 
 
   Support configuration of dynamic broker configs in ZooKeeper before starting 
brokers using ConfigCommand. This will allow password configs to be encrypted 
and stored in ZooKeeper, without requiring clear passwords in server.properties 
to bootstrap the broker first.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Allow dynamic broker configs to be configured in ZooKeeper before starting 
> broker
> -
>
> Key: KAFKA-6805
> URL: https://issues.apache.org/jira/browse/KAFKA-6805
> Project: Kafka
>  Issue Type: Task
>  Components: tools
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.0.0
>
>
> At the moment, dynamic broker configs like SSL keystore and password can be 
> configured using ConfigCommand only after a broker is started (using the new 
> AdminClient). To start a broker, these configs have to be defined in 
> server.properties. We want to restrict updates using ZooKeeper once the 
> broker starts up, but we should allow updates using ZK prior to starting brokers. 
> This is particularly useful for password configs which are stored encrypted 
> in ZK, making it difficult to set manually before starting brokers.
> ConfigCommand is being updated to talk to AdminClient under KIP-248, but we 
> will need to maintain the tool using ZK to enable credentials to be created 
> in ZK before starting brokers. So the functionality to set broker configs can 
> sit alongside that.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6807) Inconsistent method name

2018-04-19 Thread KuiLIU (JIRA)
KuiLIU created KAFKA-6807:
-

 Summary: Inconsistent method name
 Key: KAFKA-6807
 URL: https://issues.apache.org/jira/browse/KAFKA-6807
 Project: Kafka
  Issue Type: Improvement
Reporter: KuiLIU


The following method is named "readTo", but it returns whichever of the two 
values is lesser, so the name "readTo" is inconsistent with the method body. 
Renaming the method to "lesser" would be better.

{code:java}
private Long readTo(final long endOffset) {
    return endOffset < offsetLimit ? endOffset : offsetLimit;
}
{code}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6807) Inconsistent method name

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444322#comment-16444322
 ] 

ASF GitHub Bot commented on KAFKA-6807:
---

BruceKuiLiu opened a new pull request #4899: KAFKA-6807: Inconsistent method 
name.
URL: https://github.com/apache/kafka/pull/4899
 
 
   Change the method name "readTo" to "lesser".
   The method is named "readTo", but it returns whichever of the two values is 
lesser, so the name "readTo" is inconsistent with the method body. Renaming the 
method to "lesser" would be better.
   
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Inconsistent method name
> 
>
> Key: KAFKA-6807
> URL: https://issues.apache.org/jira/browse/KAFKA-6807
> Project: Kafka
>  Issue Type: Improvement
>Reporter: KuiLIU
>Priority: Major
>
> The following method is named "readTo", but it returns whichever of the two 
> values is lesser, so the name "readTo" is inconsistent with the method body. 
> Renaming the method to "lesser" would be better.
> {code:java}
> private Long readTo(final long endOffset) {
>     return endOffset < offsetLimit ? endOffset : offsetLimit;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6782) GlobalKTable GlobalStateStore never finishes restoring when consuming aborted messages

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444331#comment-16444331
 ] 

ASF GitHub Bot commented on KAFKA-6782:
---

Gitomain opened a new pull request #4900: KAFKA-6782: solved the bug of 
restoration of aborted messages for GlobalStateStore and KGlobalTable
URL: https://github.com/apache/kafka/pull/4900
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> GlobalKTable GlobalStateStore never finishes restoring when consuming aborted 
> messages
> --
>
> Key: KAFKA-6782
> URL: https://issues.apache.org/jira/browse/KAFKA-6782
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Lingxiao WANG
>Priority: Major
>
> Same problem as https://issues.apache.org/jira/browse/KAFKA-6190, but the 
> solution proposed there (below) works for successfully committed transactional 
> messages; when there are aborted messages, it ends up in an infinite loop. 
> Here is that proposition:
> {code:java}
> while (offset < highWatermark) {
>     final ConsumerRecords records = consumer.poll(100);
>     for (ConsumerRecord record : records) {
>         if (record.key() != null) {
>             stateRestoreCallback.restore(record.key(), record.value());
>         }
>         offset = consumer.position(topicPartition);
>     }
> }
> {code}
> Concretely, when the consumer consumes a set of aborted messages, it polls 0 
> records, and the statement 'offset = consumer.position(topicPartition)' never 
> gets a chance to execute.
>  So I propose to move 'offset = consumer.position(topicPartition)' outside of 
> the loop, to guarantee that even if no records are polled, the offset is still 
> updated:
> {code:java}
> while (offset < highWatermark) {
>     final ConsumerRecords records = consumer.poll(100);
>     for (ConsumerRecord record : records) {
>         if (record.key() != null) {
>             stateRestoreCallback.restore(record.key(), record.value());
>         }
>     }
>     offset = consumer.position(topicPartition);
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6782) GlobalKTable GlobalStateStore never finishes restoring when consuming aborted messages

2018-04-19 Thread Lingxiao WANG (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lingxiao WANG updated KAFKA-6782:
-
External issue URL: https://github.com/apache/kafka/pull/4900

> GlobalKTable GlobalStateStore never finishes restoring when consuming aborted 
> messages
> --
>
> Key: KAFKA-6782
> URL: https://issues.apache.org/jira/browse/KAFKA-6782
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Lingxiao WANG
>Priority: Major
>
> Same problem as https://issues.apache.org/jira/browse/KAFKA-6190, but the 
> solution proposed there (below) works for successfully committed transactional 
> messages; when there are aborted messages, it ends up in an infinite loop. 
> Here is that proposition:
> {code:java}
> while (offset < highWatermark) {
>     final ConsumerRecords records = consumer.poll(100);
>     for (ConsumerRecord record : records) {
>         if (record.key() != null) {
>             stateRestoreCallback.restore(record.key(), record.value());
>         }
>         offset = consumer.position(topicPartition);
>     }
> }
> {code}
> Concretely, when the consumer consumes a set of aborted messages, it polls 0 
> records, and the statement 'offset = consumer.position(topicPartition)' never 
> gets a chance to execute.
>  So I propose to move 'offset = consumer.position(topicPartition)' outside of 
> the loop, to guarantee that even if no records are polled, the offset is still 
> updated:
> {code:java}
> while (offset < highWatermark) {
>     final ConsumerRecords records = consumer.poll(100);
>     for (ConsumerRecord record : records) {
>         if (record.key() != null) {
>             stateRestoreCallback.restore(record.key(), record.value());
>         }
>     }
>     offset = consumer.position(topicPartition);
> }
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6730) Simplify state store recovery

2018-04-19 Thread Richard Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Richard Yu reassigned KAFKA-6730:
-

Assignee: Richard Yu

> Simplify state store recovery
> -
>
> Key: KAFKA-6730
> URL: https://issues.apache.org/jira/browse/KAFKA-6730
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Fix For: 1.2.0
>
>
> In the current code base, we restore state stores in the main thread (in 
> contrast to older code that did restore state stored in the rebalance call 
> back). This has multiple advantages and allows us the further simplify 
> restore code.
> In the original code base, during a long restore phase, it was possible that 
> a instance misses a rebalance and drops out of the consumer group. To detect 
> this case, we apply a check during the restore phase, that the end-offset of 
> the changelog topic does not change. A changed offset implies a missed 
> rebalance as another thread started to write into the changelog topic (ie, 
> the current thread does not own the task/store/changelog-topic anymore).
> With the new code, that restores in the main-loop, it's ensured that `poll()` 
> is called regularly and thus, a rebalance will be detected automatically. 
> This make the check about an changing changelog-end-offset unnecessary.
> We can simplify the restore logic, to just consuming until `poll()` does not 
> return any data. For this case, we fetch the end-offset to see if we did 
> fully restore. If yes, we resume processing, if not, we continue the restore.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6808) Creating source Kafka connectors re-using a name of a deleted connector causes the connector to never push messages to kafka

2018-04-19 Thread Igor Candido (JIRA)
Igor Candido created KAFKA-6808:
---

 Summary: Creating source Kafka connectors re-using a name of a 
deleted connector causes the connector to never push messages to kafka
 Key: KAFKA-6808
 URL: https://issues.apache.org/jira/browse/KAFKA-6808
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 1.1.0, 1.0.0
Reporter: Igor Candido


I tried to deploy a source Kafka connector to a Kafka Connect instance that 
previously had a connector with a similar definition deployed to it, since 
deleted. The newly deployed connector was not erroring, but it also wasn't 
pushing messages to Kafka.

 

The connector created was this salesforce 
[https://github.com/jcustenborder/kafka-connect-salesforce]

 

And the definition of the connector is:

{code:java}
{
  "connector.class": "com.github.jcustenborder.kafka.connect.salesforce.SalesforceSourceConnector",
  "salesforce.username": "XXX",
  "tasks.max": "1",
  "salesforce.consumer.key": "XXX",
  "salesforce.push.topic.name": "Leads",
  "salesforce.instance": "https://eu8.salesforce.com",
  "salesforce.password": "XXX",
  "salesforce.password.token": "XXX",
  "salesforce.version": "36",
  "name": "salesforce-lead-source",
  "kafka.topic": "salesforce-lead",
  "salesforce.consumer.secret": "XXX",
  "salesforce.object": "Lead"
}
{code}

 

I tried to restart kafka connect instance and that didn't fix the problem.

 

As soon as I changed the name of the connector it started working without any 
configuration change or environment change.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6730) Simplify state store recovery

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1659#comment-1659
 ] 

ASF GitHub Bot commented on KAFKA-6730:
---

ConcurrencyPractitioner opened a new pull request #4901: [KAFKA-6730] Simplify 
state store recovery
URL: https://github.com/apache/kafka/pull/4901
 
 
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Simplify state store recovery
> -
>
> Key: KAFKA-6730
> URL: https://issues.apache.org/jira/browse/KAFKA-6730
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Fix For: 1.2.0
>
>
> In the current code base, we restore state stores in the main thread (in 
> contrast to older code that did restore state stored in the rebalance call 
> back). This has multiple advantages and allows us the further simplify 
> restore code.
> In the original code base, during a long restore phase, it was possible that 
> a instance misses a rebalance and drops out of the consumer group. To detect 
> this case, we apply a check during the restore phase, that the end-offset of 
> the changelog topic does not change. A changed offset implies a missed 
> rebalance as another thread started to write into the changelog topic (ie, 
> the current thread does not own the task/store/changelog-topic anymore).
> With the new code, that restores in the main-loop, it's ensured that `poll()` 
> is called regularly and thus, a rebalance will be detected automatically. 
> This make the check about an changing changelog-end-offset unnecessary.
> We can simplify the restore logic, to just consuming until `poll()` does not 
> return any data. For this case, we fetch the end-offset to see if we did 
> fully restore. If yes, we resume processing, if not, we continue the restore.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5413) Log cleaner fails due to large offset in segment file

2018-04-19 Thread Badai Aqrandista (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444926#comment-16444926
 ] 

Badai Aqrandista commented on KAFKA-5413:
-

Hi [~davidpsv17]

Did you upgrade from 0.10.x to 0.11.x just before you hit this issue? If yes, 
it may be related to KAFKA-6264. 

Thanks

Badai

> Log cleaner fails due to large offset in segment file
> -
>
> Key: KAFKA-5413
> URL: https://issues.apache.org/jira/browse/KAFKA-5413
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0, 0.10.2.1
> Environment: Ubuntu 14.04 LTS, Oracle Java 8u92, kafka_2.11-0.10.2.0
>Reporter: Nicholas Ngorok
>Assignee: Kelvin Rutt
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.2.2, 0.11.0.0
>
> Attachments: .index.cleaned, 
> .log, .log.cleaned, 
> .timeindex.cleaned, 002147422683.log, 
> kafka-5413.patch
>
>
> The log cleaner thread in our brokers is failing with the trace below
> {noformat}
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: 
> Cleaning segment 0 in log __consumer_offsets-12 (largest timestamp Thu Jun 08 
> 15:48:59 PDT 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: 
> Cleaning segment 2147343575 in log __consumer_offsets-12 (largest timestamp 
> Thu Jun 08 15:49:06 PDT 2017) into 0, retaining deletes. 
> (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,834] ERROR {kafka-log-cleaner-thread-0} 
> [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: largest offset in 
> message set can not be safely converted to relative offset.
> at scala.Predef$.require(Predef.scala:224)
> at kafka.log.LogSegment.append(LogSegment.scala:109)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:478)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.clean(LogCleaner.scala:362)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2017-06-08 15:49:54,835] INFO {kafka-log-cleaner-thread-0} 
> [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
> {noformat}
> This seems to point at the specific line [here| 
> https://github.com/apache/kafka/blob/0.11.0/core/src/main/scala/kafka/log/LogSegment.scala#L92]
>  in the kafka src where the difference is actually larger than MAXINT as both 
> baseOffset and offset are of type long. It was introduced in this [pr| 
> https://github.com/apache/kafka/pull/2210/files/56d1f8196b77a47b176b7bbd1e4220a3be827631]
> These were the outputs of dumping the first two log segments
> {noformat}
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration 
> --files /kafka-logs/__consumer_offsets-12/000
> 0.log
> Dumping /kafka-logs/__consumer_offsets-12/.log
> Starting offset: 0
> offset: 1810054758 position: 0 NoTimestampType: -1 isvalid: true payloadsize: 
> -1 magic: 0 compresscodec: NONE crc: 3127861909 keysize: 34
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration 
> --files /kafka-logs/__consumer_offsets-12/000
> 0002147343575.log
> Dumping /kafka-logs/__consumer_offsets-12/002147343575.log
> Starting offset: 2147343575
> offset: 2147539884 position: 0 NoTimestampType: -1 isvalid: true paylo
> adsize: -1 magic: 0 compresscodec: NONE crc: 2282192097 keysize: 34
> {noformat}
> My guess is that since 2147539884 is larger than MAXINT, we are hitting this 
> exception. Was there a specific reason this check was added in 0.10.2?
> E.g. if the first offset has key = "key 0" and is followed by MAXINT + 1 
> records with "key 1", wouldn't we run into this situation whenever the log 
> cleaner runs?
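
For illustration, the invariant that fails here, as a hedged sketch (the 
numbers come from the dumps above; the variable names are illustrative of the 
check in LogSegment.append, not the actual code):
{code:java}
public class RelativeOffsetCheck {
    public static void main(String[] args) {
        // A segment stores each record's offset relative to the segment's
        // base offset as a 4-byte int, so (offset - baseOffset) must fit
        // in an int.
        long baseOffset = 0L;             // cleaned segment is based at offset 0
        long largestOffset = 2147539884L; // offset from the second dump above

        // 2147539884 - 0 > Integer.MAX_VALUE (2147483647), so the
        // require() in LogSegment.append fails during cleaning.
        System.out.println(largestOffset - baseOffset <= Integer.MAX_VALUE); // false
    }
}
{code}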



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5413) Log cleaner fails due to large offset in segment file

2018-04-19 Thread Badai Aqrandista (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444926#comment-16444926
 ] 

Badai Aqrandista edited comment on KAFKA-5413 at 4/19/18 10:43 PM:
---

Hi [~davidpsv17]

Did you upgrade to 0.11.x just before you hit this issue? If yes, it may be 
related to KAFKA-6264. 

Thanks

Badai


was (Author: badai):
Hi [~davidpsv17]

Did you upgrade from 0.10.x to 0.11.x just before you hit this issue? If yes, 
it may be related to KAFKA-6264. 

Thanks

Badai

> Log cleaner fails due to large offset in segment file
> -
>
> Key: KAFKA-5413
> URL: https://issues.apache.org/jira/browse/KAFKA-5413
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.0, 0.10.2.1
> Environment: Ubuntu 14.04 LTS, Oracle Java 8u92, kafka_2.11-0.10.2.0
>Reporter: Nicholas Ngorok
>Assignee: Kelvin Rutt
>Priority: Critical
>  Labels: reliability
> Fix For: 0.10.2.2, 0.11.0.0
>
> Attachments: .index.cleaned, 
> .log, .log.cleaned, 
> .timeindex.cleaned, 002147422683.log, 
> kafka-5413.patch
>
>
> The log cleaner thread in our brokers is failing with the trace below
> {noformat}
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: 
> Cleaning segment 0 in log __consumer_offsets-12 (largest timestamp Thu Jun 08 
> 15:48:59 PDT 2017) into 0, retaining deletes. (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: 
> Cleaning segment 2147343575 in log __consumer_offsets-12 (largest timestamp 
> Thu Jun 08 15:49:06 PDT 2017) into 0, retaining deletes. 
> (kafka.log.LogCleaner)
> [2017-06-08 15:49:54,834] ERROR {kafka-log-cleaner-thread-0} 
> [kafka-log-cleaner-thread-0], Error due to  (kafka.log.LogCleaner)
> java.lang.IllegalArgumentException: requirement failed: largest offset in 
> message set can not be safely converted to relative offset.
> at scala.Predef$.require(Predef.scala:224)
> at kafka.log.LogSegment.append(LogSegment.scala:109)
> at kafka.log.Cleaner.cleanInto(LogCleaner.scala:478)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405)
> at 
> kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363)
> at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at kafka.log.Cleaner.clean(LogCleaner.scala:362)
> at 
> kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241)
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2017-06-08 15:49:54,835] INFO {kafka-log-cleaner-thread-0} 
> [kafka-log-cleaner-thread-0], Stopped  (kafka.log.LogCleaner)
> {noformat}
> This seems to point at the specific line [here| 
> https://github.com/apache/kafka/blob/0.11.0/core/src/main/scala/kafka/log/LogSegment.scala#L92]
>  in the kafka src where the difference is actually larger than MAXINT as both 
> baseOffset and offset are of type long. It was introduced in this [pr| 
> https://github.com/apache/kafka/pull/2210/files/56d1f8196b77a47b176b7bbd1e4220a3be827631]
> These were the outputs of dumping the first two log segments
> {noformat}
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration 
> --files /kafka-logs/__consumer_offsets-12/000
> 0.log
> Dumping /kafka-logs/__consumer_offsets-12/.log
> Starting offset: 0
> offset: 1810054758 position: 0 NoTimestampType: -1 isvalid: true payloadsize: 
> -1 magic: 0 compresscodec: NONE crc: 3127861909 keysize: 34
> :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration 
> --files /kafka-logs/__consumer_offsets-12/000
> 0002147343575.log
> Dumping /kafka-logs/__consumer_offsets-12/002147343575.log
> Starting offset: 2147343575
> offset: 2147539884 position: 0 NoTimestampType: -1 isvalid: true paylo
> adsize: -1 magic: 0 compresscodec: NONE crc: 2282192097 keysize: 34
> {noformat}
> My guess is that since 2147539884 is larger than MAXINT, we are hitting this 
> exception. Was there a specific reason this check was added in 0.10.2?
> E.g., if the first offset has key "key 0" and is followed by more than MAXINT 
> messages with key "key 1", wouldn't we run into this situation whenever the 
> log cleaner runs?
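
For readers unfamiliar with the segment format, the failing invariant is easy to 
reproduce. The sketch below is illustrative only (class and method names are 
ours, not the actual kafka.log.LogSegment code): offsets inside a segment are 
stored as 4-byte deltas from the segment's baseOffset, so the delta must fit in 
a signed 32-bit int.

{code:java}
// Illustrative sketch only -- not the actual kafka.log.LogSegment source.
// Offsets within a segment are stored as 4-byte deltas from the segment's
// baseOffset, so (offset - baseOffset) must fit in a signed 32-bit int.
public class RelativeOffsetCheck {
    static boolean canConvertToRelativeOffset(long offset, long baseOffset) {
        long delta = offset - baseOffset;
        return delta >= 0 && delta <= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        // The trace above shows segment 2147343575 being cleaned "into 0",
        // i.e. baseOffset = 0, while the segment contains offset 2147539884:
        System.out.println(canConvertToRelativeOffset(2147539884L, 0L)); // false
    }
}
{code}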



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Created] (KAFKA-6809) connections-created metric does not behave as expected

2018-04-19 Thread Anna Povzner (JIRA)
Anna Povzner created KAFKA-6809:
---

 Summary: connections-created metric does not behave as expected
 Key: KAFKA-6809
 URL: https://issues.apache.org/jira/browse/KAFKA-6809
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.0.1
Reporter: Anna Povzner


"connections-created" sensor is described as "new connections established". It 
currently records only connections that the broker creates, but does not count 
connections received. Seems like we should also count connections received – 
either include them into this metric (and also clarify the description) or add 
a new metric (separately counting two types of connections). I am not sure how 
useful is to separate them, so I think we should do the first approach.
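
As a rough illustration of the first approach, the sketch below (class, group, 
and method names are hypothetical, not the actual Selector code) records on the 
same sensor regardless of which side initiated the connection:

{code:java}
// Hypothetical sketch of the first approach -- not the actual Selector code.
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;

public class ConnectionMetricsSketch {
    private final Metrics metrics = new Metrics();
    private final Sensor connectionsCreated;

    public ConnectionMetricsSketch() {
        connectionsCreated = metrics.sensor("connections-created");
        connectionsCreated.add(metrics.metricName("connection-creation-rate",
                "sketch-metrics", "new connections established per second"),
                new Rate());
    }

    // Record once per connection, regardless of direction:
    void onConnectionInitiated() { connectionsCreated.record(); } // we dialed out
    void onConnectionAccepted()  { connectionsCreated.record(); } // peer dialed in
}
{code}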



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6809) connections-created metric does not behave as expected

2018-04-19 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6809:
---
Affects Version/s: 1.1.0

> connections-created metric does not behave as expected
> --
>
> Key: KAFKA-6809
> URL: https://issues.apache.org/jira/browse/KAFKA-6809
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Anna Povzner
>Priority: Major
> Fix For: 1.2.0, 1.1.1
>
>
> "connections-created" sensor is described as "new connections established". 
> It currently records only connections that the broker/client creates, but 
> does not count connections received. Seems like we should also count 
> connections received – either include them into this metric (and also clarify 
> the description) or add a new metric (separately counting two types of 
> connections). I am not sure how useful is to separate them, so I think we 
> should do the first approach.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6809) connections-created metric does not behave as expected

2018-04-19 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6809:
---
Fix Version/s: 1.1.1
   1.2.0

> connections-created metric does not behave as expected
> --
>
> Key: KAFKA-6809
> URL: https://issues.apache.org/jira/browse/KAFKA-6809
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Anna Povzner
>Priority: Major
> Fix For: 1.2.0, 1.1.1
>
>
> "connections-created" sensor is described as "new connections established". 
> It currently records only connections that the broker/client creates, but 
> does not count connections received. Seems like we should also count 
> connections received – either include them into this metric (and also clarify 
> the description) or add a new metric (separately counting two types of 
> connections). I am not sure how useful is to separate them, so I think we 
> should do the first approach.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6809) connections-created metric does not behave as expected

2018-04-19 Thread Ismael Juma (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6809:
---
Description: "connections-created" sensor is described as "new connections 
established". It currently records only connections that the broker/client 
creates, but does not count connections received. Seems like we should also 
count connections received – either include them into this metric (and also 
clarify the description) or add a new metric (separately counting two types of 
connections). I am not sure how useful is to separate them, so I think we 
should do the first approach.  (was: "connections-created" sensor is described 
as "new connections established". It currently records only connections that 
the broker creates, but does not count connections received. Seems like we 
should also count connections received – either include them into this metric 
(and also clarify the description) or add a new metric (separately counting two 
types of connections). I am not sure how useful is to separate them, so I think 
we should do the first approach.)

> connections-created metric does not behave as expected
> --
>
> Key: KAFKA-6809
> URL: https://issues.apache.org/jira/browse/KAFKA-6809
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0, 1.0.1
>Reporter: Anna Povzner
>Priority: Major
> Fix For: 1.2.0, 1.1.1
>
>
> "connections-created" sensor is described as "new connections established". 
> It currently records only connections that the broker/client creates, but 
> does not count connections received. Seems like we should also count 
> connections received – either include them into this metric (and also clarify 
> the description) or add a new metric (separately counting two types of 
> connections). I am not sure how useful is to separate them, so I think we 
> should do the first approach.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6592) NullPointerException thrown when executing ConsoleConsumer with deserializer set to `WindowedDeserializer`

2018-04-19 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16445120#comment-16445120
 ] 

huxihx commented on KAFKA-6592:
---

[~guozhang] Can this Jira issue be closed safely?

> NullPointerException thrown when executing ConsoleConsumer with deserializer 
> set to `WindowedDeserializer`
> -
>
> Key: KAFKA-6592
> URL: https://issues.apache.org/jira/browse/KAFKA-6592
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 1.0.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Minor
>
> When reading a streams app's output topic with the WindowedDeserializer 
> deserializer using kafka-console-consumer.sh, a NullPointerException was 
> thrown because the inner deserializer was never initialized: there is no way 
> in ConsoleConsumer to set this class.
> Complete stack trace is shown below:
> {code:java}
> [2018-02-26 14:56:04,736] ERROR Unknown error when running consumer:  
> (kafka.tools.ConsoleConsumer$)
> java.lang.NullPointerException
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:89)
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:35)
> at 
> kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:544)
> at scala.Option.map(Option.scala:146)
> at kafka.tools.DefaultMessageFormatter.write$1(ConsoleConsumer.scala:545)
> at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:560)
> at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:147)
> at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
> at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
> at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {code}
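
To make the failure mode concrete, here is a minimal, self-contained sketch 
(the wrapper class is a simplified stand-in, not the actual WindowedDeserializer 
source): the wrapper delegates to an inner deserializer that ConsoleConsumer 
never supplies, so the delegate is still null when deserialize() is called.

{code:java}
// Simplified stand-in for the windowed wrapper -- not the actual
// org.apache.kafka.streams.kstream.internals.WindowedDeserializer source.
import org.apache.kafka.common.serialization.Deserializer;

public class WindowedNpeSketch {
    static class WindowedWrapper<T> {
        private final Deserializer<T> inner;

        WindowedWrapper(Deserializer<T> inner) { this.inner = inner; }

        T deserialize(String topic, byte[] data) {
            return inner.deserialize(topic, data); // NPE when inner == null
        }
    }

    public static void main(String[] args) {
        // ConsoleConsumer has no mechanism to pass an inner deserializer,
        // so the wrapper is effectively constructed with null:
        WindowedWrapper<String> broken = new WindowedWrapper<>(null);
        broken.deserialize("output-topic", new byte[0]); // throws NullPointerException
    }
}
{code}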



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6791) Add a CoordinatorNodeProvider in KafkaAdminClient

2018-04-19 Thread huxihx (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-6791:
-

Assignee: huxihx

> Add a CoordinatorNodeProvider in KafkaAdminClient
> -
>
> Key: KAFKA-6791
> URL: https://issues.apache.org/jira/browse/KAFKA-6791
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Guozhang Wang
>Assignee: huxihx
>Priority: Major
>
> As we add more and more coordinator-related requests to the admin client, we 
> can consider adding a CoordinatorNodeProvider to consolidate the common logic 
> pattern of finding the coordinator first and then sending the corresponding 
> request.
> Note that 1) with this provider interface it is almost impossible to batch 
> multiple groupIds per coordinator; a little more refactoring would be needed 
> to make that work, and 2) for some requests, such as listing consumers, group 
> ids are not known beforehand, so we cannot use this provider for them either.
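
For illustration, here is a minimal sketch of the proposal. KafkaAdminClient's 
real NodeProvider is a private internal interface; the names and the injected 
findCoordinator lookup below are assumptions, not the actual admin client code.

{code:java}
// Hypothetical sketch -- KafkaAdminClient's real NodeProvider is private,
// and the findCoordinator lookup here is an assumed stand-in.
import java.util.function.Function;
import org.apache.kafka.common.Node;

public class CoordinatorNodeProviderSketch {
    interface NodeProvider {
        Node provide();
    }

    static class CoordinatorNodeProvider implements NodeProvider {
        private final String groupId;
        private final Function<String, Node> findCoordinator;

        CoordinatorNodeProvider(String groupId, Function<String, Node> findCoordinator) {
            this.groupId = groupId;
            this.findCoordinator = findCoordinator;
        }

        @Override
        public Node provide() {
            // One FindCoordinator lookup per call; as the issue notes, batching
            // several groupIds per coordinator would require more refactoring.
            return findCoordinator.apply(groupId);
        }
    }
}
{code}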



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6592) NullPointerException thrown when executing ConsoleConsumer with deserializer set to `WindowedDeserializer`

2018-04-19 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-6592.
--
Resolution: Fixed

Thanks for the reminder!

> NullPointerException thrown when executing ConsoleConsumer with deserializer 
> set to `WindowedDeserializer`
> -
>
> Key: KAFKA-6592
> URL: https://issues.apache.org/jira/browse/KAFKA-6592
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, tools
>Affects Versions: 1.0.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Minor
>
> When reading a streams app's output topic with the WindowedDeserializer 
> deserializer using kafka-console-consumer.sh, a NullPointerException was 
> thrown because the inner deserializer was never initialized: there is no way 
> in ConsoleConsumer to set this class.
> Complete stack trace is shown below:
> {code:java}
> [2018-02-26 14:56:04,736] ERROR Unknown error when running consumer:  
> (kafka.tools.ConsoleConsumer$)
> java.lang.NullPointerException
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:89)
> at 
> org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:35)
> at 
> kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:544)
> at scala.Option.map(Option.scala:146)
> at kafka.tools.DefaultMessageFormatter.write$1(ConsoleConsumer.scala:545)
> at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:560)
> at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:147)
> at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84)
> at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54)
> at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6791) Add a CoordinatorNodeProvider in KafkaAdminClient

2018-04-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16445248#comment-16445248
 ] 

ASF GitHub Bot commented on KAFKA-6791:
---

huxihx opened a new pull request #4902: KAFKA-6791: Add CoordinatorNodeProvider 
in KafkaAdminClient
URL: https://github.com/apache/kafka/pull/4902
 
 
   KAFKA-6791: Add CoordinatorNodeProvider in KafkaAdminClient
   https://issues.apache.org/jira/browse/KAFKA-6791
   
   Add a CoordinatorNodeProvider interface to support batched retrieval of group 
coordinators.
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a CoordinatorNodeProvider in KafkaAdminClient
> -
>
> Key: KAFKA-6791
> URL: https://issues.apache.org/jira/browse/KAFKA-6791
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Guozhang Wang
>Assignee: huxihx
>Priority: Major
>
> As we add more and more coordinator-related requests to the admin client, we 
> can consider adding a CoordinatorNodeProvider to consolidate the common logic 
> pattern of finding the coordinator first and then sending the corresponding 
> request.
> Note that 1) with this provider interface it is almost impossible to batch 
> multiple groupIds per coordinator; a little more refactoring would be needed 
> to make that work, and 2) for some requests, such as listing consumers, group 
> ids are not known beforehand, so we cannot use this provider for them either.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)