[jira] [Commented] (KAFKA-4682) Committed offsets should not be deleted if a consumer is still active (KIP-211)
[ https://issues.apache.org/jira/browse/KAFKA-4682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443661#comment-16443661 ] ASF GitHub Bot commented on KAFKA-4682: --- vahidhashemian opened a new pull request #4896: KAFKA-4682: Revise expiration semantics of consumer group offsets (KIP-211) URL: https://github.com/apache/kafka/pull/4896 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Committed offsets should not be deleted if a consumer is still active > (KIP-211) > --- > > Key: KAFKA-4682 > URL: https://issues.apache.org/jira/browse/KAFKA-4682 > Project: Kafka > Issue Type: Bug >Reporter: James Cheng >Assignee: Vahid Hashemian >Priority: Major > Labels: kip > > Kafka will delete committed offsets that are older than > offsets.retention.minutes > If there is an active consumer on a low traffic partition, it is possible > that Kafka will delete the committed offset for that consumer. Once the > offset is deleted, a restart or a rebalance of that consumer will cause the > consumer to not find any committed offset and start consuming from > earliest/latest (depending on auto.offset.reset). I'm not sure, but a broker > failover might also cause you to start reading from auto.offset.reset (due to > broker restart, or coordinator failover). > I think that Kafka should only delete offsets for inactive consumers. The > timer should only start after a consumer group goes inactive. For example, if > a consumer group goes inactive, then after 1 week, delete the offsets for > that consumer group. This is a solution that [~junrao] mentioned in > https://issues.apache.org/jira/browse/KAFKA-3806?focusedCommentId=15323521&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15323521 > The current workarounds are to: > # Commit an offset on every partition you own on a regular basis, making sure > that it is more frequent than offsets.retention.minutes (a broker-side > setting that a consumer might not be aware of) > or > # Turn the value of offsets.retention.minutes up really really high. You have > to make sure it is higher than any valid low-traffic rate that you want to > support. For example, if you want to support a topic where someone produces > once a month, you would have to set offsets.retention.minutes to 1 month. > or > # Turn on enable.auto.commit (this is essentially #1, but easier to > implement). > None of these are ideal. > #1 can be spammy. It requires that your consumers know something about how the > brokers are configured. Sometimes it is out of your control. MirrorMaker, for > example, only commits offsets on partitions where it receives data. And it is > duplication that you need to put into all of your consumers. > #2 has disk-space impact on the broker (in __consumer_offsets) as well as > memory-size on the broker (to answer OffsetFetch). > #3 I think has the potential for message loss (the consumer might commit on > messages that are not yet fully processed) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
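Workaround #1 above boils down to re-committing the current position of every assigned partition on a timer that fires well inside the offsets.retention.minutes window, even when a partition is idle. A minimal sketch of that pattern with the Java consumer follows; the class name, commit interval, and processing placeholder are illustrative and not part of the ticket.
{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class PeriodicOffsetKeepAlive {
    // Must be much shorter than the broker's offsets.retention.minutes.
    private static final long COMMIT_INTERVAL_MS = 60_000L;

    public static void run(KafkaConsumer<byte[], byte[]> consumer) {
        long lastCommit = System.currentTimeMillis();
        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // process the record as usual
            }
            if (System.currentTimeMillis() - lastCommit >= COMMIT_INTERVAL_MS) {
                // Re-commit the current position of every assigned partition,
                // including idle ones, so the broker's retention timer is reset.
                Map<TopicPartition, OffsetAndMetadata> positions = new HashMap<>();
                for (TopicPartition tp : consumer.assignment()) {
                    positions.put(tp, new OffsetAndMetadata(consumer.position(tp)));
                }
                consumer.commitSync(positions);
                lastCommit = System.currentTimeMillis();
            }
        }
    }
}
{code}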
[jira] [Created] (KAFKA-6804) Event values should not be included in log messages
Jussi Lyytinen created KAFKA-6804: - Summary: Event values should not be included in log messages Key: KAFKA-6804 URL: https://issues.apache.org/jira/browse/KAFKA-6804 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 1.0.0 Reporter: Jussi Lyytinen In certain error situations, event values are included in log messages: {code:java} 2018-04-19 08:00:28 [my-component-37670563-39be-4fcb-92e1-02f115aca43c-StreamThread-2] ERROR o.a.k.s.p.i.AssignedTasks - stream-thread [my-component-37670563-39be-4fcb-92e1-02f115aca43c-StreamThread-2] Failed to commit stream task 1_1 due to the following error: org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort sending since an error caught with a previous record (key [my-key] value [my-value] ... {code} In some environments, this is highly undesired behavior since the values can contain sensitive information. Error logs are usually collected to separate systems not meant for storing such information (e.g. patient or financial data). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6804) Event values should not be included in log messages
[ https://issues.apache.org/jira/browse/KAFKA-6804?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jussi Lyytinen updated KAFKA-6804: -- Affects Version/s: (was: 1.0.0) 1.0.1 > Event values should not be included in log messages > --- > > Key: KAFKA-6804 > URL: https://issues.apache.org/jira/browse/KAFKA-6804 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 1.0.1 >Reporter: Jussi Lyytinen >Priority: Major > > In certain error situations, event values are included in log messages: > {code:java} > 2018-04-19 08:00:28 > [my-component-37670563-39be-4fcb-92e1-02f115aca43c-StreamThread-2] ERROR > o.a.k.s.p.i.AssignedTasks - stream-thread > [my-component-37670563-39be-4fcb-92e1-02f115aca43c-StreamThread-2] Failed to > commit stream task 1_1 due to the following error: > org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort sending > since an error caught with a previous record (key [my-key] value [my-value] > ... > {code} > In some environments, this is highly undesired behavior since the values can > contain sensitive information. Error logs are usually collected to separate > systems not meant for storing such information (e.g. patient or financial > data). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
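A minimal sketch of the kind of change being requested: identify the failed record by task, topic, and partition, but keep the (possibly sensitive) key and value out of the error message. The class, method, and variable names here are hypothetical and are not the actual Streams code.
{code:java}
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedactedErrorLogging {
    private static final Logger log = LoggerFactory.getLogger(RedactedErrorLogging.class);

    // Hypothetical call site: report enough context to locate the failed record
    // without echoing its key or value payload into the error log.
    static void logCommitFailure(String taskId, ProducerRecord<byte[], byte[]> record, Exception cause) {
        log.error("Failed to commit stream task {} due to an error while sending a record to {}-{}",
                  taskId, record.topic(), record.partition(), cause);
    }
}
{code}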
[jira] [Updated] (KAFKA-6782) GlobalKTable GlobalStateStore never finishes restoring when consuming aborted messages
[ https://issues.apache.org/jira/browse/KAFKA-6782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lingxiao WANG updated KAFKA-6782: - Summary: GlobalKTable GlobalStateStore never finishes restoring when consuming aborted messages (was: GlobalStateStore never finishes restoring when consuming transactional messages) > GlobalKTable GlobalStateStore never finishes restoring when consuming aborted > messages > -- > > Key: KAFKA-6782 > URL: https://issues.apache.org/jira/browse/KAFKA-6782 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0, 1.0.1 >Reporter: Lingxiao WANG >Priority: Major > > Same problem with https://issues.apache.org/jira/browse/KAFKA-6190, but his > proposition : > {code:java} > while (offset < highWatermark) { > final ConsumerRecords records = consumer.poll(100); > for (ConsumerRecord record : records) { > if (record.key() != null) { >stateRestoreCallback.restore(record.key(), record.value()); > } > offset = consumer.position(topicPartition); > } > }{code} > doesn't work for me. In my situation, there is chance to have several > transaction markers appear in sequence in one partition. In this case, the > consumer is blocked and can't poll any records, and the code 'offset = > consumer.position(topicPartition)' doesn't have any opportunity to execute. > So I propose to move the code 'offset = consumer.position(topicPartition)' > outside of the cycle to guarantee that event if no records are polled, the > offset can always be updated. > {code:java} > while (offset < highWatermark) { > final ConsumerRecords records = consumer.poll(100); > for (ConsumerRecord record : records) { > if (record.key() != null) { >stateRestoreCallback.restore(record.key(), record.value()); > } > } > offset = consumer.position(topicPartition); > }{code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6782) GlobalKTable GlobalStateStore never finishes restoring when consuming aborted messages
[ https://issues.apache.org/jira/browse/KAFKA-6782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lingxiao WANG updated KAFKA-6782: - Description: Same problem with https://issues.apache.org/jira/browse/KAFKA-6190, but his solution which is below, works for the succeed transactional messages. But when there are aborted messages, it will be in infinite loop. Here is his proposition : {code:java} while (offset < highWatermark) { final ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { if (record.key() != null) { stateRestoreCallback.restore(record.key(), record.value()); } offset = consumer.position(topicPartition); } }{code} Concretely, when the consumer consume a set of aborted messages, it polls 0 records, and the code 'offset = consumer.position(topicPartition)' doesn't have any opportunity to execute. So I propose to move the code 'offset = consumer.position(topicPartition)' outside of the cycle to guarantee that event if no records are polled, the offset can always be updated. {code:java} while (offset < highWatermark) { final ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { if (record.key() != null) { stateRestoreCallback.restore(record.key(), record.value()); } } offset = consumer.position(topicPartition); }{code} was: Same problem with https://issues.apache.org/jira/browse/KAFKA-6190, but his proposition : {code:java} while (offset < highWatermark) { final ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { if (record.key() != null) { stateRestoreCallback.restore(record.key(), record.value()); } offset = consumer.position(topicPartition); } }{code} doesn't work for me. In my situation, there is chance to have several transaction markers appear in sequence in one partition. In this case, the consumer is blocked and can't poll any records, and the code 'offset = consumer.position(topicPartition)' doesn't have any opportunity to execute. So I propose to move the code 'offset = consumer.position(topicPartition)' outside of the cycle to guarantee that event if no records are polled, the offset can always be updated. {code:java} while (offset < highWatermark) { final ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { if (record.key() != null) { stateRestoreCallback.restore(record.key(), record.value()); } } offset = consumer.position(topicPartition); }{code} > GlobalKTable GlobalStateStore never finishes restoring when consuming aborted > messages > -- > > Key: KAFKA-6782 > URL: https://issues.apache.org/jira/browse/KAFKA-6782 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0, 1.0.1 >Reporter: Lingxiao WANG >Priority: Major > > Same problem with https://issues.apache.org/jira/browse/KAFKA-6190, but his > solution which is below, works for the succeed transactional messages. But > when there are aborted messages, it will be in infinite loop. Here is his > proposition : > {code:java} > while (offset < highWatermark) { > final ConsumerRecords records = consumer.poll(100); > for (ConsumerRecord record : records) { > if (record.key() != null) { >stateRestoreCallback.restore(record.key(), record.value()); > } > offset = consumer.position(topicPartition); > } > }{code} > Concretely, when the consumer consume a set of aborted messages, it polls 0 > records, and the code 'offset = consumer.position(topicPartition)' doesn't > have any opportunity to execute. 
> So I propose to move the code 'offset = consumer.position(topicPartition)' > outside of the cycle to guarantee that event if no records are polled, the > offset can always be updated. > {code:java} > while (offset < highWatermark) { > final ConsumerRecords records = consumer.poll(100); > for (ConsumerRecord record : records) { > if (record.key() != null) { >stateRestoreCallback.restore(record.key(), record.value()); > } > } > offset = consumer.position(topicPartition); > }{code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6799) Consumer livelock during consumer group rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443921#comment-16443921 ] Attila Sasvari commented on KAFKA-6799: --- [~phdezann] thanks for reporting this issue and creating the docker environment for reproducing the issue. - I had to set the environment variable M2_REPOSITORY before running the shell script: {{M2_REPOSITORY=/root/.m2/ ./spin.sh}} Then I saw the issue you described in the description. - I looked a bit around in the helloworld-kafka-1 docker container, and noticed that the replication factor for the internal topic was set to 1: {code} root@b6e9218f1761:/install/kafka# bin/kafka-topics.sh --describe -zookeeper 172.170.0.80:2181 Topic:__consumer_offsetsPartitionCount:50 ReplicationFactor:1 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer Topic: __consumer_offsets Partition: 0Leader: 103 Replicas: 103 Isr: 103 Topic: __consumer_offsets Partition: 1Leader: 101 Replicas: 101 Isr: 101 Topic: __consumer_offsets Partition: 2Leader: -1 Replicas: 102 Isr: 102 {code} In this situation, consumer cannot contact the partition leader for __consumer_offsets, Partition: 2 as it was killed by the test. So it won't be able to commit the offset, for that specific partition. - I changed replication factor to 3 for {{__consumer_offsets}} and then I did not see this issue. - Can you add the following to {{docker/entrypoint.sh}} and re-test? {code} cat >>config/server.properties < Consumer livelock during consumer group rebalance > - > > Key: KAFKA-6799 > URL: https://issues.apache.org/jira/browse/KAFKA-6799 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 1.0.0, 0.11.0.2, 1.1.0 >Reporter: Pierre-Henri Dezanneau >Priority: Critical > > We have the following environment: > * 1 kafka cluster with 3 brokers > * 1 topic with 3 partitions > * 1 producer > * 1 consumer group with 3 consumers > From this setup, we remove one broker from the cluster, the hard way, by > simply killing it. Quite often, we see that the consumer group is not > rebalanced correctly. By that I mean that all 3 consumers stop consuming and > get stuck in a loop, forever. > The thread dump shows that the consumer threads aren't blocked but run > forever in {{AbstractCoordinator.ensureCoordinatorReady}}, holding a lock due > to the {{synchonized}} keyword on the calling method. Heartbeat threads are > blocked, waiting for the consumer threads to release the lock. This situation > prevents all consumers from consuming any more record. > We build a simple project which seems to reliably demonstrate this: > {code:sh} > $ mkdir -p /tmp/sandbox && cd /tmp/sandbox > $ git clone https://github.com/phdezann/helloworld-kafka-livelock > $ cd helloworld-kafka-livelock && ./spin.sh > ... 
> livelock detected > {code} > {code:sh|title=Consumer thread|borderStyle=solid} > "kafka-consumer-1@10733" daemon prio=5 tid=0x31 nid=NA runnable > java.lang.Thread.State: RUNNABLE >blocks kafka-coordinator-heartbeat-thread | helloWorldGroup@10728 > at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1) > at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) > at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) > at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) > - locked <0x2a15> (a sun.nio.ch.EPollSelectorImpl) > - locked <0x2a16> (a java.util.Collections$UnmodifiableSet) > - locked <0x2a17> (a sun.nio.ch.Util$3) > at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > at org.apache.kafka.common.network.Selector.select(Selector.java:684) > at org.apache.kafka.common.network.Selector.poll(Selector.java:408) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:261) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:156) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228) > - locked <0x2a0c> (a > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensur
[jira] [Commented] (KAFKA-6335) SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails intermittently
[ https://issues.apache.org/jira/browse/KAFKA-6335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443967#comment-16443967 ] Sonia Garudi commented on KAFKA-6335: - [~omkreddy] I got this as a test failure.Its an intermittent failure. I am running tests for the trunk branch of Kafka(1.2.0-SNAPSHOT). > SimpleAclAuthorizerTest#testHighConcurrencyModificationOfResourceAcls fails > intermittently > -- > > Key: KAFKA-6335 > URL: https://issues.apache.org/jira/browse/KAFKA-6335 > Project: Kafka > Issue Type: Test >Reporter: Ted Yu >Assignee: Manikumar >Priority: Major > Fix For: 1.2.0 > > > From > https://builds.apache.org/job/kafka-pr-jdk9-scala2.12/3045/testReport/junit/kafka.security.auth/SimpleAclAuthorizerTest/testHighConcurrencyModificationOfResourceAcls/ > : > {code} > java.lang.AssertionError: expected acls Set(User:36 has Allow permission for > operations: Read from hosts: *, User:7 has Allow permission for operations: > Read from hosts: *, User:21 has Allow permission for operations: Read from > hosts: *, User:39 has Allow permission for operations: Read from hosts: *, > User:43 has Allow permission for operations: Read from hosts: *, User:3 has > Allow permission for operations: Read from hosts: *, User:35 has Allow > permission for operations: Read from hosts: *, User:15 has Allow permission > for operations: Read from hosts: *, User:16 has Allow permission for > operations: Read from hosts: *, User:22 has Allow permission for operations: > Read from hosts: *, User:26 has Allow permission for operations: Read from > hosts: *, User:11 has Allow permission for operations: Read from hosts: *, > User:38 has Allow permission for operations: Read from hosts: *, User:8 has > Allow permission for operations: Read from hosts: *, User:28 has Allow > permission for operations: Read from hosts: *, User:32 has Allow permission > for operations: Read from hosts: *, User:25 has Allow permission for > operations: Read from hosts: *, User:41 has Allow permission for operations: > Read from hosts: *, User:44 has Allow permission for operations: Read from > hosts: *, User:48 has Allow permission for operations: Read from hosts: *, > User:2 has Allow permission for operations: Read from hosts: *, User:9 has > Allow permission for operations: Read from hosts: *, User:14 has Allow > permission for operations: Read from hosts: *, User:46 has Allow permission > for operations: Read from hosts: *, User:13 has Allow permission for > operations: Read from hosts: *, User:5 has Allow permission for operations: > Read from hosts: *, User:29 has Allow permission for operations: Read from > hosts: *, User:45 has Allow permission for operations: Read from hosts: *, > User:6 has Allow permission for operations: Read from hosts: *, User:37 has > Allow permission for operations: Read from hosts: *, User:23 has Allow > permission for operations: Read from hosts: *, User:19 has Allow permission > for operations: Read from hosts: *, User:24 has Allow permission for > operations: Read from hosts: *, User:17 has Allow permission for operations: > Read from hosts: *, User:34 has Allow permission for operations: Read from > hosts: *, User:12 has Allow permission for operations: Read from hosts: *, > User:42 has Allow permission for operations: Read from hosts: *, User:4 has > Allow permission for operations: Read from hosts: *, User:47 has Allow > permission for operations: Read from hosts: *, User:18 has Allow permission > for operations: Read from hosts: *, User:31 has Allow 
permission for > operations: Read from hosts: *, User:49 has Allow permission for operations: > Read from hosts: *, User:33 has Allow permission for operations: Read from > hosts: *, User:1 has Allow permission for operations: Read from hosts: *, > User:27 has Allow permission for operations: Read from hosts: *) but got > Set(User:36 has Allow permission for operations: Read from hosts: *, User:7 > has Allow permission for operations: Read from hosts: *, User:21 has Allow > permission for operations: Read from hosts: *, User:39 has Allow permission > for operations: Read from hosts: *, User:43 has Allow permission for > operations: Read from hosts: *, User:3 has Allow permission for operations: > Read from hosts: *, User:35 has Allow permission for operations: Read from > hosts: *, User:15 has Allow permission for operations: Read from hosts: *, > User:16 has Allow permission for operations: Read from hosts: *, User:22 has > Allow permission for operations: Read from hosts: *, User:26 has Allow > permission for operations: Read from hosts: *, User:11 has Allow permission > for operations: Read from hosts: *, User:38 has Allow permission for >
[jira] [Created] (KAFKA-6805) Allow dynamic broker configs to be configured in ZooKeeper before starting broker
Rajini Sivaram created KAFKA-6805: - Summary: Allow dynamic broker configs to be configured in ZooKeeper before starting broker Key: KAFKA-6805 URL: https://issues.apache.org/jira/browse/KAFKA-6805 Project: Kafka Issue Type: Task Components: tools Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 2.0.0 At the moment, dynamic broker configs like SSL keystore and password can be configured using ConfigCommand only after a broker is started (using the new AdminClient). To start a broker, these configs have to be defined in server.properties. We want to restrict updates using ZooKeeper once broker starts up, but we should allow updates using ZK prior to starting brokers. This is particularly useful for password configs which are stored encrypted in ZK, making it difficult to set manually before starting brokers. ConfigCommand is being updated to talk to AdminClient under KIP-248, but we will need to maintain the tool using ZK to enable credentials to be created in ZK before starting brokers. So the functionality to set broker configs can sit alongside that. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6730) Simplify state store recovery
[ https://issues.apache.org/jira/browse/KAFKA-6730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444026#comment-16444026 ] Matthias J. Sax commented on KAFKA-6730: [~Yohan123] Thanks, please go ahead and assign the ticket to yourself if you are interested. Not sure, what the best way for refactoring is atm. Maybe we could remove the method. We can discuss on the PR. > Simplify state store recovery > - > > Key: KAFKA-6730 > URL: https://issues.apache.org/jira/browse/KAFKA-6730 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > Fix For: 1.2.0 > > > In the current code base, we restore state stores in the main thread (in > contrast to older code that did restore state stored in the rebalance call > back). This has multiple advantages and allows us the further simplify > restore code. > In the original code base, during a long restore phase, it was possible that > a instance misses a rebalance and drops out of the consumer group. To detect > this case, we apply a check during the restore phase, that the end-offset of > the changelog topic does not change. A changed offset implies a missed > rebalance as another thread started to write into the changelog topic (ie, > the current thread does not own the task/store/changelog-topic anymore). > With the new code, that restores in the main-loop, it's ensured that `poll()` > is called regularly and thus, a rebalance will be detected automatically. > This make the check about an changing changelog-end-offset unnecessary. > We can simplify the restore logic, to just consuming until `poll()` does not > return any data. For this case, we fetch the end-offset to see if we did > fully restore. If yes, we resume processing, if not, we continue the restore. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
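A rough sketch of the simplified restore loop described in the ticket: keep consuming until poll() returns no data, then compare the consumer position against the changelog's end offset to decide whether restoration has finished. This illustrates the idea only and is not the actual Streams implementation; the callback interface is a local stand-in.
{code:java}
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;

public class SimplifiedRestoreSketch {

    interface RestoreCallback {
        void restore(byte[] key, byte[] value);
    }

    // Consume the changelog until a poll returns nothing; at that point check the
    // end offset to decide whether the store is fully restored or we keep going.
    static void restore(Consumer<byte[], byte[]> consumer, TopicPartition changelog, RestoreCallback callback) {
        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
            for (ConsumerRecord<byte[], byte[]> record : records) {
                if (record.key() != null) {
                    callback.restore(record.key(), record.value());
                }
            }
            if (records.isEmpty()) {
                long end = consumer.endOffsets(Collections.singletonList(changelog)).get(changelog);
                if (consumer.position(changelog) >= end) {
                    return; // fully restored, resume normal processing
                }
                // not at the end yet, continue restoring
            }
        }
    }
}
{code}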
[jira] [Commented] (KAFKA-6394) Prevent misconfiguration of advertised listeners
[ https://issues.apache.org/jira/browse/KAFKA-6394?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444062#comment-16444062 ] ASF GitHub Bot commented on KAFKA-6394: --- omkreddy opened a new pull request #4897: KAFKA-6394: Add a check to prevent misconfiguration of advertised listeners URL: https://github.com/apache/kafka/pull/4897 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Prevent misconfiguration of advertised listeners > > > Key: KAFKA-6394 > URL: https://issues.apache.org/jira/browse/KAFKA-6394 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Priority: Major > > We don't really have any protection from misconfiguration of the advertised > listeners. Sometimes users will copy the config from one host to another > during an upgrade. They may remember to update the broker id, but forget > about the advertised listeners. It can be surprisingly difficult to detect > this unless you know to look for it (e.g. you might just see a lot of > NotLeaderForPartition errors as the fetchers connect to the wrong broker). It > may not be totally foolproof, but it's probably enough for the common > misconfiguration case to check existing brokers to see whether there are any > which have already registered the advertised listener. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
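The ticket only sketches the idea, so the following is a rough, hypothetical illustration (not the actual pull request) of what comparing a new broker's advertised listeners against endpoints already registered by other brokers could look like:
{code:java}
import java.util.Map;
import java.util.Set;

public class AdvertisedListenerCheck {

    // Hypothetical sanity check: refuse to start if another broker has already
    // registered one of our advertised endpoints, which usually indicates a
    // server.properties copied from that broker without updating the listeners.
    static void validate(int brokerId, Set<String> advertisedEndpoints,
                         Map<Integer, Set<String>> endpointsByRegisteredBroker) {
        for (Map.Entry<Integer, Set<String>> other : endpointsByRegisteredBroker.entrySet()) {
            if (other.getKey() == brokerId) {
                continue; // re-registering ourselves is fine
            }
            for (String endpoint : advertisedEndpoints) {
                if (other.getValue().contains(endpoint)) {
                    throw new IllegalArgumentException("Advertised endpoint " + endpoint
                            + " is already registered by broker " + other.getKey());
                }
            }
        }
    }
}
{code}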
[jira] [Created] (KAFKA-6806) Unable to validate sink connectors without "topics" component which is not required
Ivan Majnarić created KAFKA-6806: Summary: Unable to validate sink connectors without "topics" component which is not required Key: KAFKA-6806 URL: https://issues.apache.org/jira/browse/KAFKA-6806 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 1.1.0 Environment: CP 4.1, CentOS 7 Reporter: Ivan Majnarić The bug happens when you try to create a new connector through, for example, kafka-connect-ui. Previously, both source and sink connectors could be validated through REST using only "connector.class", without "topics", like this: {code:java} PUT http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate { "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector" }{code} In the new CP 4.1 version you can still validate *source connectors* this way, but not *sink connectors*. If you want to validate sink connectors you need to add the "topics" config to the request, like: {code:java} PUT http://connect-url:8083/connector-plugins/com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector/config/validate { "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector", "topics": "test-topic" }{code} So there is a small mismatch in the way connectors are validated, which I think happened accidentally. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-6799) Consumer livelock during consumer group rebalance
[ https://issues.apache.org/jira/browse/KAFKA-6799?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16443921#comment-16443921 ] Attila Sasvari edited comment on KAFKA-6799 at 4/19/18 3:00 PM: [~phdezann] thanks for reporting this issue and creating the docker environment for reproducing it. - I had to set the environment variable M2_REPOSITORY before running the shell script: {{M2_REPOSITORY=/root/.m2/ ./spin.sh}} Then I saw the issue you described in the description. - I looked a bit around in the helloworld-kafka-1 docker container, and noticed that the replication factor for the internal topic was set to 1: {code} root@b6e9218f1761:/install/kafka# bin/kafka-topics.sh --describe -zookeeper 172.170.0.80:2181 Topic:__consumer_offsetsPartitionCount:50 ReplicationFactor:1 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer Topic: __consumer_offsets Partition: 0Leader: 103 Replicas: 103 Isr: 103 Topic: __consumer_offsets Partition: 1Leader: 101 Replicas: 101 Isr: 101 Topic: __consumer_offsets Partition: 2Leader: -1 Replicas: 102 Isr: 102 {code} In this situation, consumer cannot contact the partition leader for __consumer_offsets, Partition: 2 as it was killed by the test. So it won't be able to commit the offset, for that specific partition. - I changed replication factor to 3 for {{__consumer_offsets}} and then I did not see this issue. - Can you add the following to {{docker/entrypoint.sh}} and re-test? {code} cat >>config/server.properties <>config/server.properties < Consumer livelock during consumer group rebalance > - > > Key: KAFKA-6799 > URL: https://issues.apache.org/jira/browse/KAFKA-6799 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 1.0.0, 0.11.0.2, 1.1.0 >Reporter: Pierre-Henri Dezanneau >Priority: Critical > > We have the following environment: > * 1 kafka cluster with 3 brokers > * 1 topic with 3 partitions > * 1 producer > * 1 consumer group with 3 consumers > From this setup, we remove one broker from the cluster, the hard way, by > simply killing it. Quite often, we see that the consumer group is not > rebalanced correctly. By that I mean that all 3 consumers stop consuming and > get stuck in a loop, forever. > The thread dump shows that the consumer threads aren't blocked but run > forever in {{AbstractCoordinator.ensureCoordinatorReady}}, holding a lock due > to the {{synchonized}} keyword on the calling method. Heartbeat threads are > blocked, waiting for the consumer threads to release the lock. This situation > prevents all consumers from consuming any more record. > We build a simple project which seems to reliably demonstrate this: > {code:sh} > $ mkdir -p /tmp/sandbox && cd /tmp/sandbox > $ git clone https://github.com/phdezann/helloworld-kafka-livelock > $ cd helloworld-kafka-livelock && ./spin.sh > ... 
> livelock detected > {code} > {code:sh|title=Consumer thread|borderStyle=solid} > "kafka-consumer-1@10733" daemon prio=5 tid=0x31 nid=NA runnable > java.lang.Thread.State: RUNNABLE >blocks kafka-coordinator-heartbeat-thread | helloWorldGroup@10728 > at sun.nio.ch.EPollArrayWrapper.epollWait(EPollArrayWrapper.java:-1) > at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) > at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) > at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) > - locked <0x2a15> (a sun.nio.ch.EPollSelectorImpl) > - locked <0x2a16> (a java.util.Collections$UnmodifiableSet) > - locked <0x2a17> (a sun.nio.ch.Util$3) > at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) > at org.apache.kafka.common.network.Selector.select(Selector.java:684) > at org.apache.kafka.common.network.Selector.poll(Selector.java:408) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:460) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:261) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:156) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:228) > - locked <0x2a0c> (a > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator) > at >
[jira] [Commented] (KAFKA-6805) Allow dynamic broker configs to be configured in ZooKeeper before starting broker
[ https://issues.apache.org/jira/browse/KAFKA-6805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444274#comment-16444274 ] ASF GitHub Bot commented on KAFKA-6805: --- rajinisivaram opened a new pull request #4898: KAFKA-6805: Enable broker configs to be stored in ZK before broker start URL: https://github.com/apache/kafka/pull/4898 Support configuration of dynamic broker configs in ZooKeeper before starting brokers using ConfigCommand. This will allow password configs to be encrypted and stored in ZooKeeper, without requiring clear passwords in server.properties to bootstrap the broker first. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Allow dynamic broker configs to be configured in ZooKeeper before starting > broker > - > > Key: KAFKA-6805 > URL: https://issues.apache.org/jira/browse/KAFKA-6805 > Project: Kafka > Issue Type: Task > Components: tools >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.0.0 > > > At the moment, dynamic broker configs like SSL keystore and password can be > configured using ConfigCommand only after a broker is started (using the new > AdminClient). To start a broker, these configs have to be defined in > server.properties. We want to restrict updates using ZooKeeper once broker > starts up, but we should allow updates using ZK prior to starting brokers. > This is particularly useful for password configs which are stored encrypted > in ZK, making it difficult to set manually before starting brokers. > ConfigCommand is being updated to talk to AdminClient under KIP-248, but we > will need to maintain the tool using ZK to enable credentials to be created > in ZK before starting brokers. So the functionality to set broker configs can > sit alongside that. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6807) Inconsistent method name
KuiLIU created KAFKA-6807: - Summary: Inconsistent method name Key: KAFKA-6807 URL: https://issues.apache.org/jira/browse/KAFKA-6807 Project: Kafka Issue Type: Improvement Reporter: KuiLIU The following method is named "readTo", but it returns the lesser of two values. Thus, the name "readTo" is inconsistent with the method body. Renaming the method to "lesser" would be better. {code:java} private Long readTo(final long endOffset) { return endOffset < offsetLimit ? endOffset : offsetLimit; } {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
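For clarity, the suggested rename as a minimal, self-contained sketch; the enclosing class and field are placeholders standing in for the original context:
{code:java}
public class OffsetLimitExample {
    private final long offsetLimit;

    OffsetLimitExample(final long offsetLimit) {
        this.offsetLimit = offsetLimit;
    }

    // Suggested rename: the method simply picks the smaller of the two offsets,
    // which "lesser" (or Math.min directly) describes better than "readTo".
    private long lesser(final long endOffset) {
        return Math.min(endOffset, offsetLimit);
    }
}
{code}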
[jira] [Commented] (KAFKA-6807) Inconsistent method name
[ https://issues.apache.org/jira/browse/KAFKA-6807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444322#comment-16444322 ] ASF GitHub Bot commented on KAFKA-6807: --- BruceKuiLiu opened a new pull request #4899: KAFKA-6807: Inconsistent method name. URL: https://github.com/apache/kafka/pull/4899 Change the method name "readTo" to "lesser". The method is named "readTo", but it returns the lesser of two values. Thus, the name "readTo" is inconsistent with the method body. Renaming the method to "lesser" would be better. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Inconsistent method name > > > Key: KAFKA-6807 > URL: https://issues.apache.org/jira/browse/KAFKA-6807 > Project: Kafka > Issue Type: Improvement >Reporter: KuiLIU >Priority: Major > > The following method is named "readTo", but it returns the lesser of two > values. Thus, the name "readTo" is inconsistent with the method body. > Renaming the method to "lesser" would be better. > {code:java} > private Long readTo(final long endOffset) { > return endOffset < offsetLimit ? endOffset : offsetLimit; > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6782) GlobalKTable GlobalStateStore never finishes restoring when consuming aborted messages
[ https://issues.apache.org/jira/browse/KAFKA-6782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444331#comment-16444331 ] ASF GitHub Bot commented on KAFKA-6782: --- Gitomain opened a new pull request #4900: KAFKA-6782: solved the bug of restoration of aborted messages for GlobalStateStore and KGlobalTable URL: https://github.com/apache/kafka/pull/4900 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > GlobalKTable GlobalStateStore never finishes restoring when consuming aborted > messages > -- > > Key: KAFKA-6782 > URL: https://issues.apache.org/jira/browse/KAFKA-6782 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0, 1.0.1 >Reporter: Lingxiao WANG >Priority: Major > > Same problem with https://issues.apache.org/jira/browse/KAFKA-6190, but his > solution which is below, works for the succeed transactional messages. But > when there are aborted messages, it will be in infinite loop. Here is his > proposition : > {code:java} > while (offset < highWatermark) { > final ConsumerRecords records = consumer.poll(100); > for (ConsumerRecord record : records) { > if (record.key() != null) { >stateRestoreCallback.restore(record.key(), record.value()); > } > offset = consumer.position(topicPartition); > } > }{code} > Concretely, when the consumer consume a set of aborted messages, it polls 0 > records, and the code 'offset = consumer.position(topicPartition)' doesn't > have any opportunity to execute. > So I propose to move the code 'offset = consumer.position(topicPartition)' > outside of the cycle to guarantee that event if no records are polled, the > offset can always be updated. > {code:java} > while (offset < highWatermark) { > final ConsumerRecords records = consumer.poll(100); > for (ConsumerRecord record : records) { > if (record.key() != null) { >stateRestoreCallback.restore(record.key(), record.value()); > } > } > offset = consumer.position(topicPartition); > }{code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6782) GlobalKTable GlobalStateStore never finishes restoring when consuming aborted messages
[ https://issues.apache.org/jira/browse/KAFKA-6782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lingxiao WANG updated KAFKA-6782: - External issue URL: https://github.com/apache/kafka/pull/4900 > GlobalKTable GlobalStateStore never finishes restoring when consuming aborted > messages > -- > > Key: KAFKA-6782 > URL: https://issues.apache.org/jira/browse/KAFKA-6782 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0, 1.0.1 >Reporter: Lingxiao WANG >Priority: Major > > Same problem with https://issues.apache.org/jira/browse/KAFKA-6190, but his > solution which is below, works for the succeed transactional messages. But > when there are aborted messages, it will be in infinite loop. Here is his > proposition : > {code:java} > while (offset < highWatermark) { > final ConsumerRecords records = consumer.poll(100); > for (ConsumerRecord record : records) { > if (record.key() != null) { >stateRestoreCallback.restore(record.key(), record.value()); > } > offset = consumer.position(topicPartition); > } > }{code} > Concretely, when the consumer consume a set of aborted messages, it polls 0 > records, and the code 'offset = consumer.position(topicPartition)' doesn't > have any opportunity to execute. > So I propose to move the code 'offset = consumer.position(topicPartition)' > outside of the cycle to guarantee that event if no records are polled, the > offset can always be updated. > {code:java} > while (offset < highWatermark) { > final ConsumerRecords records = consumer.poll(100); > for (ConsumerRecord record : records) { > if (record.key() != null) { >stateRestoreCallback.restore(record.key(), record.value()); > } > } > offset = consumer.position(topicPartition); > }{code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-6730) Simplify state store recovery
[ https://issues.apache.org/jira/browse/KAFKA-6730?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Richard Yu reassigned KAFKA-6730: - Assignee: Richard Yu > Simplify state store recovery > - > > Key: KAFKA-6730 > URL: https://issues.apache.org/jira/browse/KAFKA-6730 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Richard Yu >Priority: Major > Fix For: 1.2.0 > > > In the current code base, we restore state stores in the main thread (in > contrast to older code that did restore state stored in the rebalance call > back). This has multiple advantages and allows us the further simplify > restore code. > In the original code base, during a long restore phase, it was possible that > a instance misses a rebalance and drops out of the consumer group. To detect > this case, we apply a check during the restore phase, that the end-offset of > the changelog topic does not change. A changed offset implies a missed > rebalance as another thread started to write into the changelog topic (ie, > the current thread does not own the task/store/changelog-topic anymore). > With the new code, that restores in the main-loop, it's ensured that `poll()` > is called regularly and thus, a rebalance will be detected automatically. > This make the check about an changing changelog-end-offset unnecessary. > We can simplify the restore logic, to just consuming until `poll()` does not > return any data. For this case, we fetch the end-offset to see if we did > fully restore. If yes, we resume processing, if not, we continue the restore. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6808) Creating source Kafka connectors re-using a name of a deleted connector causes the connector to never push messages to kafka
Igor Candido created KAFKA-6808: --- Summary: Creating source Kafka connectors re-using a name of a deleted connector causes the connector to never push messages to kafka Key: KAFKA-6808 URL: https://issues.apache.org/jira/browse/KAFKA-6808 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 1.1.0, 1.0.0 Reporter: Igor Candido I tried to deploy a source Kafka connector to a Kafka Connect instance that previously had a connector with a similar definition deployed to it, which had since been deleted. The newly deployed connector was not erroring, but it also wasn't pushing messages to Kafka. The connector used was this Salesforce connector: [https://github.com/jcustenborder/kafka-connect-salesforce] And the definition of the connector is: { "connector.class": "com.github.jcustenborder.kafka.connect.salesforce.SalesforceSourceConnector", "salesforce.username": "XXX", "tasks.max": "1", "salesforce.consumer.key": "XXX", "salesforce.push.topic.name": "Leads", "salesforce.instance": "https://eu8.salesforce.com", "salesforce.password": "XXX", "salesforce.password.token": "XXX", "salesforce.version": "36", "name": "salesforce-lead-source", "kafka.topic": "salesforce-lead", "salesforce.consumer.secret": "XXX", "salesforce.object": "Lead" } I tried to restart the Kafka Connect instance and that didn't fix the problem. As soon as I changed the name of the connector it started working without any configuration change or environment change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6730) Simplify state store recovery
[ https://issues.apache.org/jira/browse/KAFKA-6730?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1659#comment-1659 ] ASF GitHub Bot commented on KAFKA-6730: --- ConcurrencyPractitioner opened a new pull request #4901: [KAFKA-6730] Simplify state store recovery URL: https://github.com/apache/kafka/pull/4901 This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Simplify state store recovery > - > > Key: KAFKA-6730 > URL: https://issues.apache.org/jira/browse/KAFKA-6730 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Richard Yu >Priority: Major > Fix For: 1.2.0 > > > In the current code base, we restore state stores in the main thread (in > contrast to older code that did restore state stored in the rebalance call > back). This has multiple advantages and allows us the further simplify > restore code. > In the original code base, during a long restore phase, it was possible that > a instance misses a rebalance and drops out of the consumer group. To detect > this case, we apply a check during the restore phase, that the end-offset of > the changelog topic does not change. A changed offset implies a missed > rebalance as another thread started to write into the changelog topic (ie, > the current thread does not own the task/store/changelog-topic anymore). > With the new code, that restores in the main-loop, it's ensured that `poll()` > is called regularly and thus, a rebalance will be detected automatically. > This make the check about an changing changelog-end-offset unnecessary. > We can simplify the restore logic, to just consuming until `poll()` does not > return any data. For this case, we fetch the end-offset to see if we did > fully restore. If yes, we resume processing, if not, we continue the restore. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-5413) Log cleaner fails due to large offset in segment file
[ https://issues.apache.org/jira/browse/KAFKA-5413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444926#comment-16444926 ] Badai Aqrandista commented on KAFKA-5413: - Hi [~davidpsv17] Did you just upgraded from 0.10.x to 0.11.x when you hit this issue? If yes, it maybe related to KAFKA-6264. Thanks Badai > Log cleaner fails due to large offset in segment file > - > > Key: KAFKA-5413 > URL: https://issues.apache.org/jira/browse/KAFKA-5413 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.2.0, 0.10.2.1 > Environment: Ubuntu 14.04 LTS, Oracle Java 8u92, kafka_2.11-0.10.2.0 >Reporter: Nicholas Ngorok >Assignee: Kelvin Rutt >Priority: Critical > Labels: reliability > Fix For: 0.10.2.2, 0.11.0.0 > > Attachments: .index.cleaned, > .log, .log.cleaned, > .timeindex.cleaned, 002147422683.log, > kafka-5413.patch > > > The log cleaner thread in our brokers is failing with the trace below > {noformat} > [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: > Cleaning segment 0 in log __consumer_offsets-12 (largest timestamp Thu Jun 08 > 15:48:59 PDT 2017) into 0, retaining deletes. (kafka.log.LogCleaner) > [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: > Cleaning segment 2147343575 in log __consumer_offsets-12 (largest timestamp > Thu Jun 08 15:49:06 PDT 2017) into 0, retaining deletes. > (kafka.log.LogCleaner) > [2017-06-08 15:49:54,834] ERROR {kafka-log-cleaner-thread-0} > [kafka-log-cleaner-thread-0], Error due to (kafka.log.LogCleaner) > java.lang.IllegalArgumentException: requirement failed: largest offset in > message set can not be safely converted to relative offset. > at scala.Predef$.require(Predef.scala:224) > at kafka.log.LogSegment.append(LogSegment.scala:109) > at kafka.log.Cleaner.cleanInto(LogCleaner.scala:478) > at > kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405) > at > kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401) > at scala.collection.immutable.List.foreach(List.scala:381) > at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401) > at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363) > at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362) > at scala.collection.immutable.List.foreach(List.scala:381) > at kafka.log.Cleaner.clean(LogCleaner.scala:362) > at > kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241) > at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) > [2017-06-08 15:49:54,835] INFO {kafka-log-cleaner-thread-0} > [kafka-log-cleaner-thread-0], Stopped (kafka.log.LogCleaner) > {noformat} > This seems to point at the specific line [here| > https://github.com/apache/kafka/blob/0.11.0/core/src/main/scala/kafka/log/LogSegment.scala#L92] > in the kafka src where the difference is actually larger than MAXINT as both > baseOffset and offset are of type long. 
It was introduced in this [pr| > https://github.com/apache/kafka/pull/2210/files/56d1f8196b77a47b176b7bbd1e4220a3be827631] > These were the outputs of dumping the first two log segments > {noformat} > :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration > --files /kafka-logs/__consumer_offsets-12/000 > 0.log > Dumping /kafka-logs/__consumer_offsets-12/.log > Starting offset: 0 > offset: 1810054758 position: 0 NoTimestampType: -1 isvalid: true payloadsize: > -1 magic: 0 compresscodec: NONE crc: 3127861909 keysize: 34 > :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration > --files /kafka-logs/__consumer_offsets-12/000 > 0002147343575.log > Dumping /kafka-logs/__consumer_offsets-12/002147343575.log > Starting offset: 2147343575 > offset: 2147539884 position: 0 NoTimestampType: -1 isvalid: true paylo > adsize: -1 magic: 0 compresscodec: NONE crc: 2282192097 keysize: 34 > {noformat} > My guess is that since 2147539884 is larger than MAXINT, we are hitting this > exception. Was there a specific reason, this check was added in 0.10.2? > E.g. if the first offset is a key = "key 0" and then we have MAXINT + 1 of > "key 1" following, wouldn't we run into this situation whenever the log > cleaner runs? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
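The failing requirement reflects the fact that offsets within a segment are stored relative to the segment's base offset as 32-bit values. A standalone illustration of that constraint, using the offsets from the dumps above (this is not the actual broker code, which is in Scala):
{code:java}
public class RelativeOffsetCheck {

    // Offsets inside a log segment are stored relative to the segment's base offset
    // as 4-byte values, so (offset - baseOffset) has to fit into a non-negative int.
    static boolean canConvertToRelativeOffset(long baseOffset, long offset) {
        long delta = offset - baseOffset;
        return delta >= 0 && delta <= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        // Cleaning the second segment into the first one (base offset 0) would place
        // offset 2147539884 more than Integer.MAX_VALUE above the base, hence the failure.
        System.out.println(canConvertToRelativeOffset(0L, 2147539884L));          // false
        System.out.println(canConvertToRelativeOffset(2147343575L, 2147539884L)); // true
    }
}
{code}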
[jira] [Comment Edited] (KAFKA-5413) Log cleaner fails due to large offset in segment file
[ https://issues.apache.org/jira/browse/KAFKA-5413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16444926#comment-16444926 ] Badai Aqrandista edited comment on KAFKA-5413 at 4/19/18 10:43 PM: --- Hi [~davidpsv17] Did you just upgraded to 0.11.x when you hit this issue? If yes, it maybe related to KAFKA-6264. Thanks Badai was (Author: badai): Hi [~davidpsv17] Did you just upgraded from 0.10.x to 0.11.x when you hit this issue? If yes, it maybe related to KAFKA-6264. Thanks Badai > Log cleaner fails due to large offset in segment file > - > > Key: KAFKA-5413 > URL: https://issues.apache.org/jira/browse/KAFKA-5413 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.10.2.0, 0.10.2.1 > Environment: Ubuntu 14.04 LTS, Oracle Java 8u92, kafka_2.11-0.10.2.0 >Reporter: Nicholas Ngorok >Assignee: Kelvin Rutt >Priority: Critical > Labels: reliability > Fix For: 0.10.2.2, 0.11.0.0 > > Attachments: .index.cleaned, > .log, .log.cleaned, > .timeindex.cleaned, 002147422683.log, > kafka-5413.patch > > > The log cleaner thread in our brokers is failing with the trace below > {noformat} > [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: > Cleaning segment 0 in log __consumer_offsets-12 (largest timestamp Thu Jun 08 > 15:48:59 PDT 2017) into 0, retaining deletes. (kafka.log.LogCleaner) > [2017-06-08 15:49:54,822] INFO {kafka-log-cleaner-thread-0} Cleaner 0: > Cleaning segment 2147343575 in log __consumer_offsets-12 (largest timestamp > Thu Jun 08 15:49:06 PDT 2017) into 0, retaining deletes. > (kafka.log.LogCleaner) > [2017-06-08 15:49:54,834] ERROR {kafka-log-cleaner-thread-0} > [kafka-log-cleaner-thread-0], Error due to (kafka.log.LogCleaner) > java.lang.IllegalArgumentException: requirement failed: largest offset in > message set can not be safely converted to relative offset. > at scala.Predef$.require(Predef.scala:224) > at kafka.log.LogSegment.append(LogSegment.scala:109) > at kafka.log.Cleaner.cleanInto(LogCleaner.scala:478) > at > kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:405) > at > kafka.log.Cleaner$$anonfun$cleanSegments$1.apply(LogCleaner.scala:401) > at scala.collection.immutable.List.foreach(List.scala:381) > at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:401) > at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:363) > at kafka.log.Cleaner$$anonfun$clean$4.apply(LogCleaner.scala:362) > at scala.collection.immutable.List.foreach(List.scala:381) > at kafka.log.Cleaner.clean(LogCleaner.scala:362) > at > kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:241) > at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:220) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63) > [2017-06-08 15:49:54,835] INFO {kafka-log-cleaner-thread-0} > [kafka-log-cleaner-thread-0], Stopped (kafka.log.LogCleaner) > {noformat} > This seems to point at the specific line [here| > https://github.com/apache/kafka/blob/0.11.0/core/src/main/scala/kafka/log/LogSegment.scala#L92] > in the kafka src where the difference is actually larger than MAXINT as both > baseOffset and offset are of type long. 
It was introduced in this [pr| > https://github.com/apache/kafka/pull/2210/files/56d1f8196b77a47b176b7bbd1e4220a3be827631] > These were the outputs of dumping the first two log segments > {noformat} > :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration > --files /kafka-logs/__consumer_offsets-12/000 > 0.log > Dumping /kafka-logs/__consumer_offsets-12/.log > Starting offset: 0 > offset: 1810054758 position: 0 NoTimestampType: -1 isvalid: true payloadsize: > -1 magic: 0 compresscodec: NONE crc: 3127861909 keysize: 34 > :~$ /usr/bin/kafka-run-class kafka.tools.DumpLogSegments --deep-iteration > --files /kafka-logs/__consumer_offsets-12/000 > 0002147343575.log > Dumping /kafka-logs/__consumer_offsets-12/002147343575.log > Starting offset: 2147343575 > offset: 2147539884 position: 0 NoTimestampType: -1 isvalid: true paylo > adsize: -1 magic: 0 compresscodec: NONE crc: 2282192097 keysize: 34 > {noformat} > My guess is that since 2147539884 is larger than MAXINT, we are hitting this > exception. Was there a specific reason, this check was added in 0.10.2? > E.g. if the first offset is a key = "key 0" and then we have MAXINT + 1 of > "key 1" following, wouldn't we run into this situation whenever the log > cleaner runs? -- This m
[jira] [Created] (KAFKA-6809) connections-created metric does not behave as expected
Anna Povzner created KAFKA-6809: --- Summary: connections-created metric does not behave as expected Key: KAFKA-6809 URL: https://issues.apache.org/jira/browse/KAFKA-6809 Project: Kafka Issue Type: Bug Affects Versions: 1.0.1 Reporter: Anna Povzner The "connections-created" sensor is described as "new connections established". It currently records only connections that the broker creates, but does not count connections received. Seems like we should also count connections received – either include them in this metric (and also clarify the description) or add a new metric (counting the two types of connections separately). I am not sure how useful it is to separate them, so I think we should do the first approach. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6809) connections-created metric does not behave as expected
[ https://issues.apache.org/jira/browse/KAFKA-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-6809: --- Affects Version/s: 1.1.0 > connections-created metric does not behave as expected > -- > > Key: KAFKA-6809 > URL: https://issues.apache.org/jira/browse/KAFKA-6809 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 1.0.1 >Reporter: Anna Povzner >Priority: Major > Fix For: 1.2.0, 1.1.1 > > > "connections-created" sensor is described as "new connections established". > It currently records only connections that the broker/client creates, but > does not count connections received. It seems like we should also count > connections received – either include them in this metric (and also clarify > the description) or add a new metric (separately counting two types of > connections). I am not sure how useful it is to separate them, so I think we > should do the first approach. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6809) connections-created metric does not behave as expected
[ https://issues.apache.org/jira/browse/KAFKA-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-6809: --- Fix Version/s: 1.1.1 1.2.0 > connections-created metric does not behave as expected > -- > > Key: KAFKA-6809 > URL: https://issues.apache.org/jira/browse/KAFKA-6809 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 1.0.1 >Reporter: Anna Povzner >Priority: Major > Fix For: 1.2.0, 1.1.1 > > > "connections-created" sensor is described as "new connections established". > It currently records only connections that the broker/client creates, but > does not count connections received. It seems like we should also count > connections received – either include them in this metric (and also clarify > the description) or add a new metric (separately counting two types of > connections). I am not sure how useful it is to separate them, so I think we > should do the first approach. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6809) connections-created metric does not behave as expected
[ https://issues.apache.org/jira/browse/KAFKA-6809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-6809: --- Description: "connections-created" sensor is described as "new connections established". It currently records only connections that the broker/client creates, but does not count connections received. It seems like we should also count connections received – either include them in this metric (and also clarify the description) or add a new metric (separately counting two types of connections). I am not sure how useful it is to separate them, so I think we should do the first approach. (was: "connections-created" sensor is described as "new connections established". It currently records only connections that the broker creates, but does not count connections received. It seems like we should also count connections received – either include them in this metric (and also clarify the description) or add a new metric (separately counting two types of connections). I am not sure how useful it is to separate them, so I think we should do the first approach.) > connections-created metric does not behave as expected > -- > > Key: KAFKA-6809 > URL: https://issues.apache.org/jira/browse/KAFKA-6809 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.1.0, 1.0.1 >Reporter: Anna Povzner >Priority: Major > Fix For: 1.2.0, 1.1.1 > > > "connections-created" sensor is described as "new connections established". > It currently records only connections that the broker/client creates, but > does not count connections received. It seems like we should also count > connections received – either include them in this metric (and also clarify > the description) or add a new metric (separately counting two types of > connections). I am not sure how useful it is to separate them, so I think we > should do the first approach. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6592) NullPointerException thrown when executing ConsoleCosumer with deserializer set to `WindowedDeserializer`
[ https://issues.apache.org/jira/browse/KAFKA-6592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16445120#comment-16445120 ] huxihx commented on KAFKA-6592: --- [~guozhang] Can this Jira issue be closed safely? > NullPointerException thrown when executing ConsoleCosumer with deserializer > set to `WindowedDeserializer` > - > > Key: KAFKA-6592 > URL: https://issues.apache.org/jira/browse/KAFKA-6592 > Project: Kafka > Issue Type: Bug > Components: streams, tools >Affects Versions: 1.0.0 >Reporter: huxihx >Assignee: huxihx >Priority: Minor > > When reading a streams app's output topic with the WindowedDeserializer deserializer > using kafka-console-consumer.sh, a NullPointerException was thrown because the > inner deserializer was not initialized, since there is no place > in ConsoleConsumer to set this class. > The complete stack trace is shown below: > {code:java} > [2018-02-26 14:56:04,736] ERROR Unknown error when running consumer: > (kafka.tools.ConsoleConsumer$) > java.lang.NullPointerException > at > org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:89) > at > org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:35) > at > kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:544) > at scala.Option.map(Option.scala:146) > at kafka.tools.DefaultMessageFormatter.write$1(ConsoleConsumer.scala:545) > at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:560) > at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:147) > at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84) > at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54) > at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
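To make the failure mode concrete, the sketch below (assuming the 1.0.x streams internals, where WindowedDeserializer is assumed to expose a constructor taking the inner Deserializer) contrasts constructing the deserializer with its delegate against loading it reflectively by class name, which is all ConsoleConsumer can do.
{code:scala}
import org.apache.kafka.common.serialization.{Deserializer, Serdes}
import org.apache.kafka.streams.kstream.Windowed
import org.apache.kafka.streams.kstream.internals.WindowedDeserializer

// With the inner deserializer supplied explicitly, deserialize() can delegate to it.
val withInner: Deserializer[Windowed[String]] =
  new WindowedDeserializer[String](Serdes.String().deserializer())

// ConsoleConsumer only instantiates the configured class by name via its no-arg
// constructor and offers no property to set the inner delegate, so the delegate stays
// null and the first deserialize() call throws the NullPointerException shown above.
val withoutInner = Class.forName(
  "org.apache.kafka.streams.kstream.internals.WindowedDeserializer")
  .newInstance().asInstanceOf[Deserializer[Windowed[String]]]
{code}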
[jira] [Assigned] (KAFKA-6791) Add a CoordinatorNodeProvider in KafkaAdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6791?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] huxihx reassigned KAFKA-6791: - Assignee: huxihx > Add a CoordinatorNodeProvider in KafkaAdminClient > - > > Key: KAFKA-6791 > URL: https://issues.apache.org/jira/browse/KAFKA-6791 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Guozhang Wang >Assignee: huxihx >Priority: Major > > As we add more and more coordinator-related requests to the admin client, we > can consider adding a CoordinatorNodeProvider to consolidate the common logic > pattern of finding the coordinator first, then sending the corresponding request. > Note that 1) with this provider interface it is nearly impossible to batch > multiple groupIds per coordinator; there has to be a little more refactoring > to make it work. 2) for some requests like list consumers, group ids are not > known beforehand and hence we cannot use this provider either. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
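Since the improvement is described only in prose, here is a purely hypothetical sketch of the shape such a provider could take; the trait name matches the ticket, but the method signature and the surrounding caller are illustrative assumptions, not KafkaAdminClient's actual internal NodeProvider API.
{code:scala}
import org.apache.kafka.common.Node

// Hypothetical abstraction: resolve the coordinator for a group id first, then let the
// caller send the group-specific request (DescribeGroups, DeleteGroups, ...) to that node.
trait CoordinatorNodeProvider {
  def provide(groupId: String): Node
}

// Illustrative caller; findCoordinator stands in for a FindCoordinator round trip.
class GroupRequestSender(findCoordinator: String => Node) extends CoordinatorNodeProvider {
  override def provide(groupId: String): Node = findCoordinator(groupId)

  def describeGroup(groupId: String): Unit = {
    val coordinator = provide(groupId)
    // One lookup per group id: this is why batching multiple group ids per coordinator
    // needs further refactoring, as the ticket notes.
    println(s"would send DescribeGroups for $groupId to ${coordinator.idString}")
  }
}
{code}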
[jira] [Resolved] (KAFKA-6592) NullPointerException thrown when executing ConsoleCosumer with deserializer set to `WindowedDeserializer`
[ https://issues.apache.org/jira/browse/KAFKA-6592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-6592. -- Resolution: Fixed Thanks for the reminder! > NullPointerException thrown when executing ConsoleCosumer with deserializer > set to `WindowedDeserializer` > - > > Key: KAFKA-6592 > URL: https://issues.apache.org/jira/browse/KAFKA-6592 > Project: Kafka > Issue Type: Bug > Components: streams, tools >Affects Versions: 1.0.0 >Reporter: huxihx >Assignee: huxihx >Priority: Minor > > When reading a streams app's output topic with the WindowedDeserializer deserializer > using kafka-console-consumer.sh, a NullPointerException was thrown because the > inner deserializer was not initialized, since there is no place > in ConsoleConsumer to set this class. > The complete stack trace is shown below: > {code:java} > [2018-02-26 14:56:04,736] ERROR Unknown error when running consumer: > (kafka.tools.ConsoleConsumer$) > java.lang.NullPointerException > at > org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:89) > at > org.apache.kafka.streams.kstream.internals.WindowedDeserializer.deserialize(WindowedDeserializer.java:35) > at > kafka.tools.DefaultMessageFormatter.$anonfun$writeTo$2(ConsoleConsumer.scala:544) > at scala.Option.map(Option.scala:146) > at kafka.tools.DefaultMessageFormatter.write$1(ConsoleConsumer.scala:545) > at kafka.tools.DefaultMessageFormatter.writeTo(ConsoleConsumer.scala:560) > at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:147) > at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:84) > at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:54) > at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6791) Add a CoordinatorNodeProvider in KafkaAdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16445248#comment-16445248 ] ASF GitHub Bot commented on KAFKA-6791: --- huxihx opened a new pull request #4902: KAFKA-6791: Add CoordinatorNodeProvider in KafkaAdminClient URL: https://github.com/apache/kafka/pull/4902 KAFKA-6791: Add CoordinatorNodeProvider in KafkaAdminClient https://issues.apache.org/jira/browse/KAFKA-6791 Add CoordinatorNodeProvider interface to support batch retrieval for group coordinators. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a CoordinatorNodeProvider in KafkaAdminClient > - > > Key: KAFKA-6791 > URL: https://issues.apache.org/jira/browse/KAFKA-6791 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Guozhang Wang >Assignee: huxihx >Priority: Major > > As we add more and more coordinator-related requests to the admin client, we > can consider adding a CoordinatorNodeProvider to consolidate the common logic > pattern of finding the coordinator first, then sending the corresponding request. > Note that 1) with this provider interface it is nearly impossible to batch > multiple groupIds per coordinator; there has to be a little more refactoring > to make it work. 2) for some requests like list consumers, group ids are not > known beforehand and hence we cannot use this provider either. -- This message was sent by Atlassian JIRA (v7.6.3#76005)