[jira] [Commented] (KAFKA-6360) RocksDB segments not removed when store is closed causes re-initialization to fail
[ https://issues.apache.org/jira/browse/KAFKA-6360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290670#comment-16290670 ] ASF GitHub Bot commented on KAFKA-6360: --- GitHub user dguy opened a pull request: https://github.com/apache/kafka/pull/4324 KAFKA-6360: Clear RocksDB Segments when store is closed Now that we support re-initializing state stores, we need to clear the segments when the store is closed so that they can be re-opened. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) You can merge this pull request into a Git repository by running: $ git pull https://github.com/dguy/kafka kafka-6360 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/4324.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4324 commit 4c84af7522d847f82b14e5b3b4c589b0223a5bd8 Author: Damian Guy Date: 2017-12-14T10:13:44Z clear segments on close > RocksDB segments not removed when store is closed causes re-initialization to > fail > -- > > Key: KAFKA-6360 > URL: https://issues.apache.org/jira/browse/KAFKA-6360 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Damian Guy >Assignee: Damian Guy >Priority: Blocker > Fix For: 1.1.0 > > > When a store is re-initialized it is first closed, before it is opened again. > When this happens the segments in the {{Segments}} class are closed, but they > are not removed from the list of segments. So when the store is > re-initialized the old closed segments are used. 
This results in: > {code} > [2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] > task [1_3] Failed to flush state store > KSTREAM-AGGREGATE-STATE-STORE-24: > (org.apache.kafka.streams.processor.internals.ProcessorStateManager) > org.apache.kafka.streams.errors.InvalidStateStoreException: Store > KSTREAM-AGGREGATE-STATE-STORE-24.151308000 is currently closed > at > org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241) > at > org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289) > at > org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102) > at > org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122) > at > org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78) > at > org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33) > at > org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179) > at > org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38) > at > org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88) > at > org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142) > at > org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100) > at > org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127) > at > org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
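The bookkeeping bug and the essence of the fix in PR #4324 can be sketched with a toy model. The class and method names below are illustrative stand-ins, not Kafka's actual `Segments` internals:

```java
import java.util.concurrent.ConcurrentSkipListMap;

// Toy model of the segment bookkeeping described in this ticket. Class and
// method names are illustrative only, not Kafka's actual internals.
class SegmentRegistry {
    static class Segment {
        final long id;
        boolean open = true;
        Segment(long id) { this.id = id; }
        void close() { open = false; }
    }

    private final ConcurrentSkipListMap<Long, Segment> segments = new ConcurrentSkipListMap<>();

    Segment getOrCreate(long id) {
        // Bug: if a stale, closed segment is still registered here after a
        // store close, it gets reused, and every access then fails with
        // "Store ... is currently closed".
        return segments.computeIfAbsent(id, Segment::new);
    }

    void close() {
        for (Segment s : segments.values()) {
            s.close();
        }
        // The essence of the fix: also forget the closed segments so that
        // re-initialization creates fresh, open ones.
        segments.clear();
    }
}
```

Without the `segments.clear()` call, a re-initialized store finds the old closed segments in the registry, which matches the `InvalidStateStoreException` in the stack trace above.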
[jira] [Commented] (KAFKA-5123) Refactor ZkUtils readData* methods
[ https://issues.apache.org/jira/browse/KAFKA-5123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290704#comment-16290704 ] ASF GitHub Bot commented on KAFKA-5123: --- Github user baluchicken closed the pull request at: https://github.com/apache/kafka/pull/3554 > Refactor ZkUtils readData* methods > --- > > Key: KAFKA-5123 > URL: https://issues.apache.org/jira/browse/KAFKA-5123 > Project: Kafka > Issue Type: Bug >Reporter: Balint Molnar >Assignee: Balint Molnar >Priority: Minor > Fix For: 1.1.0 > > > Usually only the data value is required but every readData method in the > ZkUtils returns a Tuple with the data and the stat. > https://github.com/apache/kafka/pull/2888 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-5123) Refactor ZkUtils readData* methods
[ https://issues.apache.org/jira/browse/KAFKA-5123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Balint Molnar resolved KAFKA-5123. -- Resolution: Won't Fix -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6361) Fast leader fail over can lead to log divergence between leader and follower
[ https://issues.apache.org/jira/browse/KAFKA-6361?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-6361: --- Labels: reliability (was: ) > Fast leader fail over can lead to log divergence between leader and follower > > > Key: KAFKA-6361 > URL: https://issues.apache.org/jira/browse/KAFKA-6361 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson > Labels: reliability > > We have observed an edge case in the replication failover logic which can > cause a replica to permanently fall out of sync with the leader or, in the > worst case, actually have localized divergence between logs. This occurs in > spite of the improved truncation logic from KIP-101. > Suppose we have brokers A and B. Initially A is the leader in epoch 1. It > appends two batches: one in the range (0, 10) and the other in the range (11, > 20). The first one successfully replicates to B, but the second one does not. > In other words, the logs on the brokers look like this: > {code} > Broker A: > 0: offsets [0, 10], leader epoch: 1 > 1: offsets [11, 20], leader epoch: 1 > Broker B: > 0: offsets [0, 10], leader epoch: 1 > {code} > Broker A then has a zk session expiration and broker B is elected with epoch > 2. It appends a new batch with offsets (11, n) to its local log. So we now > have this: > {code} > Broker A: > 0: offsets [0, 10], leader epoch: 1 > 1: offsets [11, 20], leader epoch: 1 > Broker B: > 0: offsets [0, 10], leader epoch: 1 > 1: offsets: [11, n], leader epoch: 2 > {code} > Normally we expect broker A to truncate to offset 11 on becoming the > follower, but before it is able to do so, broker B has its own zk session > expiration and broker A again becomes leader, now with epoch 3. It then > appends a new entry in the range (21, 30). 
The updated logs look like this: > {code} > Broker A: > 0: offsets [0, 10], leader epoch: 1 > 1: offsets [11, 20], leader epoch: 1 > 2: offsets: [21, 30], leader epoch: 3 > Broker B: > 0: offsets [0, 10], leader epoch: 1 > 1: offsets: [11, n], leader epoch: 2 > {code} > Now what happens next depends on the last offset of the batch appended in > epoch 2. On becoming follower, broker B will send an OffsetForLeaderEpoch > request to broker A with epoch 2. Broker A will respond that epoch 2 ends at > offset 21. There are three cases: > 1) n < 20: In this case, broker B will not do any truncation. It will begin > fetching from offset n, which will ultimately cause an out of order offset > error because broker A will return the full batch beginning from offset 11 > which broker B will be unable to append. > 2) n == 20: Again broker B does not truncate. It will fetch from offset 21 > and everything will appear fine though the logs have actually diverged. > 3) n > 20: Broker B will attempt to truncate to offset 21. Since this is in > the middle of the batch, it will truncate all the way to offset 10. It can > begin fetching from offset 11 and everything is fine. > The case we have actually seen is the first one. The second one would likely > go unnoticed in practice and everything is fine in the third case. To > workaround the issue, we deleted the active segment on the replica which > allowed it to re-replicate consistently from the leader. > I'm not sure the best solution for this scenario. Maybe if the leader isn't > aware of an epoch, it should always respond with {{UNDEFINED_EPOCH_OFFSET}} > instead of using the offset of the next highest epoch. That would cause the > follower to truncate using its high watermark. Or perhaps instead of doing > so, it could send another OffsetForLeaderEpoch request at the next previous > cached epoch and then truncate using that. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
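The three cases above can be modeled as a small decision function. This is illustrative only, not Kafka's replica-fetcher code; broker A reports that epoch 2 ends at offset 21, and broker B's last batch in epoch 2 is [11, n]:

```java
// Illustrative-only model of the follower's three possible outcomes described
// above; this is not Kafka's actual truncation logic.
class TruncationCases {
    static String followerAction(long n, long leaderEpochEndOffset) {
        long lastLeaderOffsetInEpoch = leaderEpochEndOffset - 1; // 20 here
        if (n < lastLeaderOffsetInEpoch) {
            // Case 1: no truncation; fetching resumes inside a batch the
            // follower cannot append, causing an out-of-order offset error.
            return "OUT_OF_ORDER_ERROR";
        } else if (n == lastLeaderOffsetInEpoch) {
            // Case 2: no truncation and no error, but the logs have silently
            // diverged.
            return "SILENT_DIVERGENCE";
        } else {
            // Case 3: truncation to offset 21 lands mid-batch, so it falls
            // back to the previous batch boundary (offset 10); refetching
            // from offset 11 then heals the log.
            return "TRUNCATE_AND_HEAL";
        }
    }
}
```

Only the third case self-heals; the first two are the problematic outcomes the ticket describes.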
[jira] [Updated] (KAFKA-6199) Single broker with fast growing heap usage
[ https://issues.apache.org/jira/browse/KAFKA-6199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robin Tweedie updated KAFKA-6199: - Attachment: jstack-2017-12-08.scrubbed.out Apologies for the delay in uploading this, I took a snapshot before restarting the broker on the 8th. I've scrubbed out a few IP addresses and hostnames as {{__SCRUBBED__}}. > Single broker with fast growing heap usage > -- > > Key: KAFKA-6199 > URL: https://issues.apache.org/jira/browse/KAFKA-6199 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.10.2.1 > Environment: Amazon Linux >Reporter: Robin Tweedie > Attachments: Screen Shot 2017-11-10 at 1.55.33 PM.png, Screen Shot > 2017-11-10 at 11.59.06 AM.png, dominator_tree.png, histo_live.txt, > histo_live_20171206.txt, histo_live_80.txt, jstack-2017-12-08.scrubbed.out, > merge_shortest_paths.png, path2gc.png > > > We have a single broker in our cluster of 25 with fast growing heap usage > which necessitates us restarting it every 12 hours. If we don't restart the > broker, it becomes very slow from long GC pauses and eventually has > {{OutOfMemory}} errors. > See {{Screen Shot 2017-11-10 at 11.59.06 AM.png}} for a graph of heap usage > percentage on the broker. A "normal" broker in the same cluster stays below > 50% (averaged) over the same time period. > We have taken heap dumps when the broker's heap usage is getting dangerously > high, and there are a lot of retained {{NetworkSend}} objects referencing > byte buffers. > We also noticed that the single affected broker logs a lot more of this kind > of warning than any other broker: > {noformat} > WARN Attempting to send response via channel for which there is no open > connection, connection id 13 (kafka.network.Processor) > {noformat} > See {{Screen Shot 2017-11-10 at 1.55.33 PM.png}} for counts of that WARN log > message visualized across all the brokers (to show it happens a bit on other > brokers, but not nearly as much as it does on the "bad" broker). 
> I can't make the heap dumps public, but would appreciate advice on how to pin > down the problem better. We're currently trying to narrow it down to a > particular client, but without much success so far. > Let me know what else I could investigate or share to track down the source > of this leak. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6362) auto commit not work since coordinatorUnknown() is always true.
Renkai Ge created KAFKA-6362: Summary: auto commit not work since coordinatorUnknown() is always true. Key: KAFKA-6362 URL: https://issues.apache.org/jira/browse/KAFKA-6362 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 0.10.2.1 Reporter: Renkai Ge {code} [2017-12-14 20:09:23.501] [Kafka 0.10 Fetcher for Source: source_bj-docker-large (14/40)] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [11.192.77.42:3002, 11.192.73.43:3002, 11.192.73.66:3002] check.crcs = true client.id = connections.max.idle.ms = 54 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = tcprtdetail_flink heartbeat.interval.ms = 3000 interceptor.classes = null key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 30 max.poll.records = 500 metadata.max.age.ms = 30 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 3 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.ms = 50 request.timeout.ms = 305000 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 6 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 1 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null 
ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: source_bj-docker-large (14/40)] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1 [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: source_bj-docker-large (14/40)] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799 {code} My Kafka Java client cannot auto-commit. After adding some debug logging, I found that the coordinatorUnknown() check in [ConsumerCoordinator.java#L604|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L604] always returns true, and nextAutoCommitDeadline just increases indefinitely. Should there be a lookupCoordinator() call after line 604, as in [ConsumerCoordinator.java#L508|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L508]? After I added lookupCoordinator() next to line 604, the consumer auto-commits offsets properly. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
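The reporter's proposed change can be sketched as follows. The interface and method names loosely mirror ConsumerCoordinator, but the bodies are simplified stand-ins, not the real client code:

```java
// Simplified sketch of the auto-commit loop described in this ticket; not
// the actual ConsumerCoordinator implementation.
class AutoCommitSketch {
    interface Coordinator {
        boolean coordinatorUnknown();
        void lookupCoordinator();   // would trigger a FindCoordinator request
        void commitOffsetsAsync();
        long retryBackoffMs();
    }

    // Returns the next auto-commit deadline. Without the lookupCoordinator()
    // call below, an unknown coordinator just pushes the deadline back
    // forever and nothing ever re-discovers the coordinator -- which matches
    // the behavior the reporter observed.
    static long maybeAutoCommit(Coordinator c, long now, long deadline, long intervalMs) {
        if (now < deadline) return deadline;
        if (c.coordinatorUnknown()) {
            c.lookupCoordinator(); // the reporter's proposed addition
            return now + c.retryBackoffMs();
        }
        c.commitOffsetsAsync();
        return now + intervalMs;
    }
}
```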
[jira] [Updated] (KAFKA-6362) auto commit not work since coordinatorUnknown() is always true.
[ https://issues.apache.org/jira/browse/KAFKA-6362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Renkai Ge updated KAFKA-6362: - Description: {code} [2017-12-14 20:09:23.501] [Kafka 0.10 Fetcher for Source: source_bj-docker-large (14/40)] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: auto.commit.interval.ms = 5000 auto.offset.reset = latest bootstrap.servers = [11.192.77.42:3002, 11.192.73.43:3002, 11.192.73.66:3002] check.crcs = true client.id = connections.max.idle.ms = 54 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = tcprtdetail_flink heartbeat.interval.ms = 3000 interceptor.classes = null key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 30 max.poll.records = 500 metadata.max.age.ms = 30 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 3 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.ms = 50 request.timeout.ms = 305000 retry.backoff.ms = 100 sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 6 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 1 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = null ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null 
ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: source_bj-docker-large (14/40)] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1 [2017-12-14 20:09:23.502] [Kafka 0.10 Fetcher for Source: source_bj-docker-large (14/40)] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799 {code} My Kafka Java client cannot auto-commit. After adding some debug logging, I found that the coordinatorUnknown() check in [ConsumerCoordinator.java#L604|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L604] always returns true, and nextAutoCommitDeadline just increases indefinitely. Should there be a lookupCoordinator() call after line 604, as in [ConsumerCoordinator.java#L508|https://github.com/apache/kafka/blob/0.10.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L508]? After I added lookupCoordinator() next to line 604, the consumer auto-commits offsets properly.
[jira] [Commented] (KAFKA-6260) AbstractCoordinator not clearly handles NULL Exception
[ https://issues.apache.org/jira/browse/KAFKA-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290783#comment-16290783 ] Rob Gevers commented on KAFKA-6260: --- We ran into this issue as well. In our situation the consumer was fine until something slowed down processing and caused us to pause consumption. Our config is very different, though. We are using the default request.timeout.ms, which is 10 seconds, but our fetch.max.wait.ms is set to 1 second. That seems like a pretty significant margin, so I'm not sure that the two configs being too close together is the problem for us. Once our consumers were blocked, the issue was persistent and created a lot of problems, since the NPE appears to have dropped that batch of messages while subsequent attempts still succeeded, creating out-of-order processing. I still have some more investigation to do, but I think it will prevent us from using the 1.0.0 release as is. > AbstractCoordinator not clearly handles NULL Exception > -- > > Key: KAFKA-6260 > URL: https://issues.apache.org/jira/browse/KAFKA-6260 > Project: Kafka > Issue Type: Bug >Affects Versions: 1.0.0 > Environment: RedHat Linux >Reporter: Seweryn Habdank-Wojewodzki >Assignee: Jason Gustafson > Fix For: 1.1.0, 1.0.1 > > > The error reporting is not clear. But it seems that the Kafka heartbeat shuts > down the application due to a NULL exception caused by "fake" disconnections. > One more comment. We are processing messages in the stream, but sometimes we > have to block processing for minutes, as consumers are not handling too much > load. Is it possible that when the stream is waiting, the heartbeat is > blocked as well? > Can you check that? 
> {code} > 2017-11-23 23:54:47 DEBUG AbstractCoordinator:177 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Received successful Heartbeat response > 2017-11-23 23:54:50 DEBUG AbstractCoordinator:183 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Sending Heartbeat request to coordinator > cljp01.eb.lan.at:9093 (id: 2147483646 rack: null) > 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Sending HEARTBEAT > {group_id=kafka-endpoint,generation_id=3834,member_id=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer-94f18be5-e49a-4817-9e5a-fe82a64e0b08} > with correlation id 24 to node 2147483646 > 2017-11-23 23:54:50 TRACE NetworkClient:135 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Completed receive from node 2147483646 for HEARTBEAT > with correlation id 24, received {throttle_time_ms=0,error_code=0} > 2017-11-23 23:54:50 DEBUG AbstractCoordinator:177 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Received successful Heartbeat response > 2017-11-23 23:54:52 DEBUG NetworkClient:183 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Disconnecting from node 1 due to request timeout. 
> 2017-11-23 23:54:52 TRACE NetworkClient:135 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Cancelled request > {replica_id=-1,max_wait_time=6000,min_bytes=1,max_bytes=52428800,isolation_level=0,topics=[{topic=clj_internal_topic,partitions=[{partition=6,fetch_offset=211558395,log_start_offset=-1,max_bytes=1048576},{partition=8,fetch_offset=210178209,log_start_offset=-1,max_bytes=1048576},{partition=0,fetch_offset=209353523,log_start_offset=-1,max_bytes=1048576},{partition=2,fetch_offset=209291462,log_start_offset=-1,max_bytes=1048576},{partition=4,fetch_offset=210728595,log_start_offset=-1,max_bytes=1048576}]}]} > with correlation id 21 due to node 1 being disconnected > 2017-11-23 23:54:52 DEBUG ConsumerNetworkClient:195 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Cancelled FETCH request RequestHeader(apiKey=FETCH, > apiVersion=6, > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > correlationId=21) with correlation id 21 due to node 1 being disconnected > 2017-11-23 23:54:52 DEBUG Fetcher:195 - [Consumer > clientId=kafka-endpoint-be51569b-8795-4709-8ec8-28c9cd099a31-StreamThread-1-consumer, > groupId=kafka-endpoint] Fetch request > {clj_internal_topic-6=(offset=211558395, logStartOffset=-1, > maxBytes=10
[jira] [Resolved] (KAFKA-6350) File descriptors leak with persistent KeyValueStore
[ https://issues.apache.org/jira/browse/KAFKA-6350?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alin Gheorghe resolved KAFKA-6350. -- Resolution: Not A Bug > File descriptors leak with persistent KeyValueStore > --- > > Key: KAFKA-6350 > URL: https://issues.apache.org/jira/browse/KAFKA-6350 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.10.2.1, 1.0.0 >Reporter: Alin Gheorghe > > When using the low-level processor API with persistent KV stores, we observed > a continuous increase in the number of SSTs on disk. The file descriptors > remain open until reaching the configured OS limit (100k in our case), when > Kafka Streams crashes with a "Too many open files" exception. In our case this > happens regularly after about 17 hours of uptime. The commit interval is set to > 5 seconds and we never trigger commits from our code. > Our topology consists of 1 source topic, 7 processors, 2 KV stores and 2 sink > topics. Retention policy is set to 2 days and the topics have 25 partitions. > Using the punctuation mechanism in Kafka Streams 1.0.0 we perform a cleanup > every 30 seconds which checks for keys that have not been updated for at > least 20 minutes. The KV stores hold temporary user sessions which last for 5 > minutes and have about 50 updates (user actions). > 2017-12-11 10:57:03 > {code:none} > ~ # lsof 1 | grep rocksdb.*.sst | wc -l > 54 > {code} > 2017-12-11 11:45:31 > {code:none} > ~ # lsof 1 | grep rocksdb.*.sst | wc -l > 6742 > {code} > We use the following state store APIs: *all*, *get*, *delete*, *put*. > When switching to in-memory state stores this obviously doesn't happen. > We have also tried to override the RocksDB parameter *max_open_files*, which > defaults to -1, but the configured value seems to be ignored and > RocksDB surpasses that threshold. > Sometimes the application crashes with a different error which may or may not > be related. 
We will file a different Jira issue if it seems unrelated: > {code:none} > RocksDBExceptionJni::ThrowNew/StatusJni - Error: unexpected exception! > 2017-12-12 11:37:25,758 > [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1] WARN > org.apache.kafka.streams.KafkaStreams - stream-client > [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444]All stream threads have > died. The instance will be in error state and should be closed. > 2017-12-12 11:37:25,758 > [processing-08dd0a1b-a423-4a41-b7e0-995c9d985444-StreamThread-1] ERROR > com.X.Y.Z.ApiStreaming$ - [ApiStreaming] Thread 12 died with exception task > [0_257] Failed to flush state store eventQueueStore. Shutting down the entire > Kafka Streams process > org.apache.kafka.streams.errors.ProcessorStateException: task [0_257] Failed > to flush state store eventQueueStore > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248) > at > org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196) > at > org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:324) > at > org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:304) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299) > at > org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289) > at > org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451) > at > org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380) > at > org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) > Caused by: java.lang.IllegalArgumentException: Illegal value provided for > SubCode. > at org.rocksdb.Status$SubCode.getSubCode(Status.java:109) > at org.rocksdb.Status.(Status.java:30) > at org.rocksdb.RocksDB.flush(Native Method) > at org.rocksdb.RocksDB.flush(RocksDB.java:1743) > at > org.apache.kafka.streams.state.internals.RocksDBStore.flushInternal(RocksDBStore.java:435) > at > org.apache.kafka.streams.
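The thread does not spell out the root cause, but given the "Not A Bug" resolution and the follow-up comments, a frequent culprit for exactly this symptom is a KeyValueIterator from all() that is never closed: each open iterator pins RocksDB resources (including SST file handles). Below is a minimal sketch of the safe try-with-resources pattern, using illustrative stand-in types rather than the real Kafka Streams API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Illustrative stand-ins for Kafka Streams' KeyValueStore/KeyValueIterator.
// The real KeyValueIterator is Closeable, and forgetting close() is a classic
// way to pin RocksDB SST files and leak file descriptors.
class IteratorLeakSketch {
    interface CloseableIterator<K, V> extends Iterator<Map.Entry<K, V>>, AutoCloseable {
        @Override void close();
    }

    static int openIterators = 0; // stands in for open native handles

    static CloseableIterator<String, Long> all(Map<String, Long> store) {
        openIterators++;
        Iterator<Map.Entry<String, Long>> inner = new HashMap<>(store).entrySet().iterator();
        return new CloseableIterator<String, Long>() {
            public boolean hasNext() { return inner.hasNext(); }
            public Map.Entry<String, Long> next() { return inner.next(); }
            public void close() { openIterators--; }
        };
    }

    // try-with-resources guarantees the iterator (and the native resources it
    // represents) is released even if the loop body throws.
    static int expireIdle(Map<String, Long> store, long now, long idleMs) {
        ArrayList<String> stale = new ArrayList<>();
        try (CloseableIterator<String, Long> it = all(store)) {
            while (it.hasNext()) {
                Map.Entry<String, Long> e = it.next();
                if (now - e.getValue() >= idleMs) stale.add(e.getKey());
            }
        }
        stale.forEach(store::remove);
        return stale.size();
    }
}
```

In a punctuation-based cleanup like the one described above (a sweep every 30 seconds), an unclosed iterator per sweep accumulates at roughly the rate the ticket reports.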
[jira] [Commented] (KAFKA-6350) File descriptors leak with persistent KeyValueStore
[ https://issues.apache.org/jira/browse/KAFKA-6350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290818#comment-16290818 ] Alin Gheorghe commented on KAFKA-6350: -- Hi [~damianguy]! Thank you for your prompt response! Indeed, that was the problem. We ran a longevity test and it seems to not leak file descriptors anymore. Thank you very much!
[jira] [Commented] (KAFKA-6350) File descriptors leak with persistent KeyValueStore
[ https://issues.apache.org/jira/browse/KAFKA-6350?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16290823#comment-16290823 ] Alin Gheorghe commented on KAFKA-6350: -- P.S.: We haven’t been able to reproduce the flush-related exception in the issue description either.
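Since the thread above concerns *max_open_files* appearing to be ignored: in Kafka Streams the supported hook for passing RocksDB options is the `rocksdb.config.setter` config. The sketch below is only a config fragment, not a verified fix for this report; it assumes the Kafka Streams 1.0 `RocksDBConfigSetter` interface and RocksJava's `Options.setMaxOpenFiles`, and the class name and the value 500 are made up for illustration:

```java
import java.util.Map;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

// Sketch: cap the number of files each RocksDB instance keeps open.
// Registered via StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG below.
public class BoundedOpenFilesConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        options.setMaxOpenFiles(500); // -1 (the default) means unlimited
    }
}

// In the application's Streams configuration:
// props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
//           BoundedOpenFilesConfigSetter.class);
```

One caveat worth checking when the cap seems ignored: the limit applies per RocksDB instance, and segmented (windowed) stores open one RocksDB instance per segment, so the process-wide descriptor count can still exceed a per-instance cap.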
[jira] [Commented] (KAFKA-6252) A metric named 'XX' already exists, can't register another one.
[ https://issues.apache.org/jira/browse/KAFKA-6252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291119#comment-16291119 ] Randall Hauch commented on KAFKA-6252: -- No, I don't think there are any workarounds, other than restarting the worker. Of course the best workaround would be for the connectors to correct their behavior, but I understand that the framework should be more tolerant of such implementations where it can. > A metric named 'XX' already exists, can't register another one. > --- > > Key: KAFKA-6252 > URL: https://issues.apache.org/jira/browse/KAFKA-6252 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 > Environment: Linux >Reporter: Alexis Sellier >Priority: Critical > > When a connector crashes (or is not implemented correctly by not > stopping/interrupting {{poll()}}), It cannot be restarted and an exception > like this is thrown > {code:java} > java.lang.IllegalArgumentException: A metric named 'MetricName > [name=offset-commit-max-time-ms, group=connector-task-metrics, > description=The maximum time in milliseconds taken by this task to commit > offsets., tags={connector=hdfs-sink-connector-recover, task=0}]' already > exists, can't register another one. 
> at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:532) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:256) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:241) > at > org.apache.kafka.connect.runtime.WorkerTask$TaskMetricsGroup.<init>(WorkerTask.java:328) > at > org.apache.kafka.connect.runtime.WorkerTask.<init>(WorkerTask.java:69) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.<init>(WorkerSinkTask.java:98) > at > org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:449) > at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:404) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:852) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:108) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:866) > at > org.apache.kafka.connect.runtime.distributed.DistributedHerder$13.call(DistributedHerder.java:862) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > I guess it's because the function taskMetricsGroup.close is not called in all > cases -- This message was sent by Atlassian JIRA (v6.4.14#64029)
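The failure mode discussed above (a metric registered by a task is never deregistered, so a restarted task re-registers the same name and Metrics.registerMetric throws) can be shown with a self-contained toy model. `MetricsRegistry` below is a hypothetical stand-in for illustration, not Kafka's `org.apache.kafka.common.metrics.Metrics` API:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of KAFKA-6252: a registry that rejects duplicate names,
// the way Metrics.registerMetric does.
class MetricsRegistry {
    private final Map<String, Object> metrics = new HashMap<>();

    void register(String name, Object metric) {
        if (metrics.containsKey(name)) {
            throw new IllegalArgumentException(
                "A metric named '" + name + "' already exists, can't register another one.");
        }
        metrics.put(name, metric);
    }

    // The remedy the issue points at: deregister on task close so a
    // restarted task can re-register the same metric names.
    void close(String name) {
        metrics.remove(name);
    }

    boolean contains(String name) {
        return metrics.containsKey(name);
    }
}
```

If `close` is skipped on a crashed task (the suspected bug), the second `register` call for the same name is exactly the IllegalArgumentException in the report.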
[jira] [Resolved] (KAFKA-6360) RocksDB segments not removed when store is closed causes re-initialization to fail
[ https://issues.apache.org/jira/browse/KAFKA-6360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-6360. -- Resolution: Fixed Issue resolved by pull request 4324 [https://github.com/apache/kafka/pull/4324] > RocksDB segments not removed when store is closed causes re-initialization to > fail > -- > > Key: KAFKA-6360 > URL: https://issues.apache.org/jira/browse/KAFKA-6360 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Damian Guy >Assignee: Damian Guy >Priority: Blocker > Fix For: 1.1.0 > > > When a store is re-initialized it is first closed, before it is opened again. > When this happens the segments in the {{Segments}} class are closed, but they > are not removed from the list of segments. So when the store is > re-initialized the old closed segments are used. This results in: > {code} > [2017-12-13 09:29:32,037] ERROR [streams-saak-test-client-StreamThread-3] > task [1_3] Failed to flush state store > KSTREAM-AGGREGATE-STATE-STORE-24: > (org.apache.kafka.streams.processor.internals.ProcessorStateManager) > org.apache.kafka.streams.errors.InvalidStateStoreException: Store > KSTREAM-AGGREGATE-STATE-STORE-24.151308000 is currently closed > at > org.apache.kafka.streams.state.internals.RocksDBStore.validateStoreOpen(RocksDBStore.java:241) > at > org.apache.kafka.streams.state.internals.RocksDBStore.put(RocksDBStore.java:289) > at > org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:102) > at > org.apache.kafka.streams.state.internals.RocksDBSessionStore.put(RocksDBSessionStore.java:122) > at > org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:78) > at > org.apache.kafka.streams.state.internals.ChangeLoggingSessionBytesStore.put(ChangeLoggingSessionBytesStore.java:33) > at > 
org.apache.kafka.streams.state.internals.CachingSessionStore.putAndMaybeForward(CachingSessionStore.java:179) > at > org.apache.kafka.streams.state.internals.CachingSessionStore.access$000(CachingSessionStore.java:38) > at > org.apache.kafka.streams.state.internals.CachingSessionStore$1.apply(CachingSessionStore.java:88) > at > org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:142) > at > org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:100) > at > org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127) > at > org.apache.kafka.streams.state.internals.CachingSessionStore.flush(CachingSessionStore.java:196) > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6360) RocksDB segments not removed when store is closed causes re-initialization to fail
[ https://issues.apache.org/jira/browse/KAFKA-6360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291263#comment-16291263 ] ASF GitHub Bot commented on KAFKA-6360: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/4324 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
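The merged fix boils down to one invariant: closing the store must both close each segment and drop it from the segment bookkeeping, so a re-initialized store starts from an empty list instead of reusing closed segments. A self-contained toy model (hypothetical classes, not the real `org.apache.kafka.streams.state.internals.Segments`):

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a segment of a segmented store.
class ToySegment {
    boolean open = true;

    void close() {
        open = false;
    }
}

class ToySegments {
    final List<ToySegment> segments = new ArrayList<>();

    ToySegment openSegment() {
        ToySegment s = new ToySegment();
        segments.add(s);
        return s;
    }

    // The bug: closing segments without clearing the list leaves closed
    // segments behind, so a re-initialized store reuses them and fails
    // with "Store ... is currently closed".
    // The fix: close each segment AND drop it from the bookkeeping.
    void close() {
        for (ToySegment s : segments) {
            s.close();
        }
        segments.clear();
    }
}
```

With the `segments.clear()` call in place, re-initialization sees no stale segments and opens fresh ones.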
[jira] [Created] (KAFKA-6363) Use MockAdminClient for any unit tests that depend on AdminClient
Guozhang Wang created KAFKA-6363: Summary: Use MockAdminClient for any unit tests that depend on AdminClient Key: KAFKA-6363 URL: https://issues.apache.org/jira/browse/KAFKA-6363 Project: Kafka Issue Type: Bug Reporter: Guozhang Wang Today we have a few unit tests other than KafkaAdminClientTest that rely on MockKafkaAdminClientEnv. About this class and MockKafkaAdminClientEnv, my thoughts: 1. MockKafkaAdminClientEnv actually uses a MockClient for the inner KafkaClient; it should only be used for the unit test of KafkaAdminClient itself. 2. For any other unit tests on classes that depend on AdminClient, we should be using the MockAdminClient that mocks the whole AdminClient. So I suggest 1) in TopicAdminTest use MockAdminClient instead; 2) in KafkaAdminClientTest use MockClient and add a new static constructor that takes a KafkaClient; 3) remove MockKafkaAdminClientEnv. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6363) Use MockAdminClient for any unit tests that depend on AdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-6363: - Labels: newbie (was: ) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6363) Use MockAdminClient for any unit tests that depend on AdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291310#comment-16291310 ] Guozhang Wang commented on KAFKA-6363: -- From the previous discussion: {code} cmccabe 3 days ago Contributor Reposting some of the stuff I said offline here: It's great that we now have a MockAdminClient. This is something we discussed having before, but never got around to doing. As @guozhangwang said, MockKafkaAdminClientEnv is really just useful for testing KafkaAdminClient. I don't completely understand the proposal for removing MockKafkaAdminClientEnv -- it still seems useful for KafkaAdminClientTest -- but I'm probably missing something here. Is the idea to make it a static class in KafkaAdminClientTest? That might be good. guozhangwang a minute ago Contributor @cmccabe We could consider using a MockClient and added a new static constructor that takes a KafkaClient for KafkaAdminClientTest; this is basically what MockKafkaAdminClientEnv does. {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6364) Add Second Check for End Offset During Restore
Bill Bejeck created KAFKA-6364: -- Summary: Add Second Check for End Offset During Restore Key: KAFKA-6364 URL: https://issues.apache.org/jira/browse/KAFKA-6364 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 1.0.0 Reporter: Bill Bejeck Assignee: Bill Bejeck Fix For: 1.0.1 We need to re-check the end offset when restoring a changelog topic to guard against the race condition where an additional record is appended to the log immediately after the restore starts. We also need to add a check for KTable source topics and for whether an offset limit is set. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
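The race above can be sketched without Kafka: if the restore loop reads the changelog end offset only once, a record appended while restoration is in flight is silently skipped; re-reading the end offset after the first pass closes that window. A toy model (hypothetical classes, not the actual Streams restore code), with a test hook standing in for a concurrently appending producer:

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy model of the KAFKA-6364 race (hypothetical, for illustration only).
class ChangelogRestorer {
    private final AtomicLong logEndOffset;  // stands in for the broker's log-end offset
    Runnable concurrentAppend = () -> {};   // hook simulating a racing producer

    ChangelogRestorer(AtomicLong logEndOffset) {
        this.logEndOffset = logEndOffset;
    }

    // Restore up to the end offset seen at the start, then check the end
    // offset a second time in case the log grew while we were restoring.
    long restore(long startOffset) {
        long restoredTo = restoreRange(startOffset, logEndOffset.get());
        long end = logEndOffset.get();      // the second check the issue proposes
        if (restoredTo < end) {
            restoredTo = restoreRange(restoredTo, end);
        }
        return restoredTo;
    }

    private long restoreRange(long from, long to) {
        concurrentAppend.run();             // a record may land mid-restore
        // ... apply changelog records [from, to) to the state store ...
        return to;
    }
}
```

Without the second `logEndOffset.get()`, the record appended during the first pass would never be restored.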
[jira] [Commented] (KAFKA-5696) SourceConnector does not commit offset on rebalance
[ https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291456#comment-16291456 ] Victor Chicu commented on KAFKA-5696: - Any news? I have the same issue in case of rebalance if task is killed before offsets are committed Kafka Connect does not commit on stop partitions and offsets from source record to the OffsetStorage. In this case the connector take once again the same records from external storage and hence duplicate the records in the topic. :( > SourceConnector does not commit offset on rebalance > --- > > Key: KAFKA-5696 > URL: https://issues.apache.org/jira/browse/KAFKA-5696 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Oleg Kuznetsov >Priority: Critical > Labels: newbie > > I'm running a SourceConnector that reads files from storage and puts data in > Kafka. I want, in case of reconfiguration, offsets to be flushed. > Say, a file is completely processed, but source records are not yet committed > and in case of reconfiguration their offsets might be missing in the store. > Is it possible to force committing offsets on reconfiguration? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-5696) SourceConnector does not commit offset on rebalance
[ https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291456#comment-16291456 ] Victor Chicu edited comment on KAFKA-5696 at 12/14/17 7:42 PM: --- Any news? I have the same issue in case of rebalance if task is killed before offsets are committed Kafka Connect does not commit on stop partitions and offsets from source record to the OffsetStorage. In this case the any of kind of source connector takes once again the same data from external storage and hence duplicate the records in the topic. :( was (Author: loopnotzero): Any news? I have the same issue in case of rebalance if task is killed before offsets are committed Kafka Connect does not commit on stop partitions and offsets from source record to the OffsetStorage. In this case the connector take once again the same records from external storage and hence duplicate the records in the topic. :( -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-5696) SourceConnector does not commit offset on rebalance
[ https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291456#comment-16291456 ] Victor Chicu edited comment on KAFKA-5696 at 12/14/17 7:42 PM: --- Any news? I have the same issue in case of rebalance if task is killed before offsets are committed Kafka Connect does not commit on stop partitions and offsets from source record to the OffsetStorage. In this case the any kind of source connectors takes once again the same data from external storage and hence duplicate the records in the topic. :( was (Author: loopnotzero): Any news? I have the same issue in case of rebalance if task is killed before offsets are committed Kafka Connect does not commit on stop partitions and offsets from source record to the OffsetStorage. In this case the any of kind of source connector takes once again the same data from external storage and hence duplicate the records in the topic. :( -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Issue Comment Deleted] (KAFKA-5696) SourceConnector does not commit offset on rebalance
[ https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victor Chicu updated KAFKA-5696: Comment: was deleted (was: Any news? I have the same issue in case of rebalance if task is killed before offsets are committed Kafka Connect does not commit on stop partitions and offsets from source record to the OffsetStorage. In this case the any kind of source connectors takes once again the same data from external storage and hence duplicate the records in the topic. :() -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5696) SourceConnector does not commit offset on rebalance
[ https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291466#comment-16291466 ] Victor Chicu commented on KAFKA-5696: - AI faced the same issue in the following casny -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5722) Refactor ConfigCommand to use the AdminClient
[ https://issues.apache.org/jira/browse/KAFKA-5722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291488#comment-16291488 ] Viktor Somogyi commented on KAFKA-5722: --- [~rsivaram] just wanted to update you, might publish it only next week, takes a bit more work than anticipated :). > Refactor ConfigCommand to use the AdminClient > - > > Key: KAFKA-5722 > URL: https://issues.apache.org/jira/browse/KAFKA-5722 > Project: Kafka > Issue Type: Improvement >Reporter: Viktor Somogyi >Assignee: Viktor Somogyi > Labels: kip, needs-kip > Fix For: 1.1.0 > > > The ConfigCommand currently uses a direct connection to zookeeper. The > zookeeper dependency should be deprecated and an AdminClient API created to > be used instead. > This change requires a KIP. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6365) How to add a client to list of available clients?
Lev Gorodinski created KAFKA-6365: - Summary: How to add a client to list of available clients? Key: KAFKA-6365 URL: https://issues.apache.org/jira/browse/KAFKA-6365 Project: Kafka Issue Type: Wish Reporter: Lev Gorodinski Priority: Trivial I'd like to add a client to: https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.NET The client is: https://github.com/jet/kafunk (.NET, written in F#, supports 0.8, 0.9, and 0.10). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-5696) SourceConnector does not commit offset on rebalance
[ https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291466#comment-16291466 ] Victor Chicu edited comment on KAFKA-5696 at 12/14/17 8:07 PM: --- Any news? I faced the same issue with JDBC connector and I think this problem can be reproduced not only with JDBC. To reproduce the problem you can start the connector with SELECT TOP 1 what will produce 1 record to the topic and right after trough REST API restart the connector task before offsets are going to be committed by hitting the offset flush interval. In this case before rebalancing on the task stop methods Kafka Connect does not commit offsets to the Kafka Node and even to the offsets storage. When task going up the JDBC connector trying again to query data from the database and it's gave again the same record that cause duplication data in the topic. This is because OffsetStorageReader does not contains last stored offset cause on rebalacing it was lost.. was (Author: loopnotzero): AI faced the same issue in the following casny -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6365) How to add a client to list of available clients?
[ https://issues.apache.org/jira/browse/KAFKA-6365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291524#comment-16291524 ] Jason Gustafson commented on KAFKA-6365: [~eulerfx] Do you have a username for https://cwiki.apache.org? If so, I can give you edit access and you can add your client. > How to add a client to list of available clients? > - > > Key: KAFKA-6365 > URL: https://issues.apache.org/jira/browse/KAFKA-6365 > Project: Kafka > Issue Type: Wish >Reporter: Lev Gorodinski >Priority: Trivial > > I'd like to add a client to: > https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.NET > The client is: https://github.com/jet/kafunk > .NET written in F# supports 0.8 0.9 0.10 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Comment Edited] (KAFKA-5696) SourceConnector does not commit offset on rebalance
[ https://issues.apache.org/jira/browse/KAFKA-5696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291466#comment-16291466 ] Victor Chicu edited comment on KAFKA-5696 at 12/14/17 8:11 PM: --- Any news? I faced the same issue with the JDBC connector, and I think the problem can be reproduced with other connectors as well. To reproduce it, start the connector with a query such as SELECT TOP 1, which produces one record to the topic, then use the REST API to restart the connector task before the offsets are committed at the offset flush interval. Before rebalancing, in the task stop methods, Kafka Connect does not commit the offsets to Kafka or even to the offset storage. When the task comes back up, the JDBC connector queries the database again and returns the same record, which duplicates data in the topic. This happens because OffsetStorageReader does not contain the last stored offset: it was lost during rebalancing. was (Author: loopnotzero): Any news? I faced the same issue with JDBC connector and I think this problem can be reproduced not only with JDBC. To reproduce the problem you can start the connector with SELECT TOP 1 what will produce 1 record to the topic and right after trough REST API restart the connector task before offsets are going to be committed by hitting the offset flush interval. In this case before rebalancing on the task stop methods Kafka Connect does not commit offsets to the Kafka Node and even to the offsets storage. When task going up the JDBC connector trying again to query data from the database and it's gave again the same record that cause duplication data in the topic. This is because OffsetStorageReader does not contains last stored offset cause on rebalacing it was lost.. 
> SourceConnector does not commit offset on rebalance > --- > > Key: KAFKA-5696 > URL: https://issues.apache.org/jira/browse/KAFKA-5696 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Oleg Kuznetsov >Priority: Critical > Labels: newbie > > I'm running SourceConnector, that reads files from storage and put data in > kafka. I want, in case of reconfiguration, offsets to be flushed. > Say, a file is completely processed, but source records are not yet committed > and in case of reconfiguration their offsets might be missing in store. > Is it possible to force committing offsets on reconfiguration? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
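The duplicate-record scenario described in this thread can be reproduced conceptually without Kafka at all: any source whose position is persisted only at a flush interval will re-emit records if the task is restarted between emitting and flushing. A minimal Java sketch of that failure mode (all names are illustrative, not the Connect API):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of a source task whose offset is flushed only periodically.
// If the task is restarted before flush(), the same record is emitted again.
public class OffsetFlushSketch {
    static long committedOffset = 0;   // the "offset storage"
    static long inMemoryOffset = 0;    // task-local position, lost on restart

    static List<Long> poll() {
        List<Long> out = new ArrayList<>();
        out.add(inMemoryOffset);       // emit the record at the current position
        inMemoryOffset++;              // advance only in memory
        return out;
    }

    static void flush() { committedOffset = inMemoryOffset; }

    static void restart() { inMemoryOffset = committedOffset; } // resume from storage

    public static void main(String[] args) {
        List<Long> produced = new ArrayList<>(poll()); // emits record 0
        restart();                 // rebalance before flush: the advance is lost
        produced.addAll(poll());   // emits record 0 again -> duplicate in topic
        System.out.println(produced); // [0, 0]
    }
}
```

The fix being requested in this issue amounts to invoking something like flush() on the task's stop path, so that a restart resumes from offset 1 instead of replaying offset 0.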
[jira] [Commented] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread
[ https://issues.apache.org/jira/browse/KAFKA-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291582#comment-16291582 ] Joerg Heinicke commented on KAFKA-6366: --- Ted Yu already suggested a fix on the mailing list: http://mail-archives.apache.org/mod_mbox/kafka-users/201712.mbox/%3CCALte62w6%3DpJObC%2Bi36BkoqbOLTKsQ%3DNrDDv6dM8abfwB5PspLA%40mail.gmail.com%3E > StackOverflowError in kafka-coordinator-heartbeat-thread > > > Key: KAFKA-6366 > URL: https://issues.apache.org/jira/browse/KAFKA-6366 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 1.0.0 >Reporter: Joerg Heinicke > > With Kafka 1.0 our consumer groups fall into a permanent cycle of rebalancing > once a StackOverflowError in the heartbeat thread occurred due to > connectivity issues of the consumers to the coordinating broker: > Immediately before the exception there are hundreds, if not thousands of log > entries of following type: > 2017-12-12 16:23:12.361 [kafka-coordinator-heartbeat-thread | > my-consumer-group] INFO - [Consumer clientId=consumer-4, > groupId=my-consumer-group] Marking the coordinator : (id: > 2147483645 rack: null) dead > The exceptions always happen somewhere in the DateFormat code, even > though at different lines. 
> 2017-12-12 16:23:12.363 [kafka-coordinator-heartbeat-thread | > my-consumer-group] ERROR - Uncaught exception in thread > 'kafka-coordinator-heartbeat-thread | my-consumer-group': > java.lang.StackOverflowError > at > java.text.DateFormatSymbols.getProviderInstance(DateFormatSymbols.java:362) > at > java.text.DateFormatSymbols.getInstance(DateFormatSymbols.java:340) > at java.util.Calendar.getDisplayName(Calendar.java:2110) > at java.text.SimpleDateFormat.subFormat(SimpleDateFormat.java:1125) > at java.text.SimpleDateFormat.format(SimpleDateFormat.java:966) > at java.text.SimpleDateFormat.format(SimpleDateFormat.java:936) > at java.text.DateFormat.format(DateFormat.java:345) > at > org.apache.log4j.helpers.PatternParser$DatePatternConverter.convert(PatternParser.java:443) > at > org.apache.log4j.helpers.PatternConverter.format(PatternConverter.java:65) > at org.apache.log4j.PatternLayout.format(PatternLayout.java:506) > at > org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310) > at org.apache.log4j.WriterAppender.append(WriterAppender.java:162) > at > org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251) > at > org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) > at org.apache.log4j.Category.callAppenders(Category.java:206) > at org.apache.log4j.Category.forcedLog(Category.java:391) > at org.apache.log4j.Category.log(Category.java:856) > at > org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:324) > at > org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) > at > 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) > ... > the following 9 lines are repeated around hundred times. > ... > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) > at > org.apache.kafka.clients.consumer.internal
[jira] [Created] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread
Joerg Heinicke created KAFKA-6366: - Summary: StackOverflowError in kafka-coordinator-heartbeat-thread Key: KAFKA-6366 URL: https://issues.apache.org/jira/browse/KAFKA-6366 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 1.0.0 Reporter: Joerg Heinicke With Kafka 1.0 our consumer groups fall into a permanent cycle of rebalancing once a StackOverflowError in the heartbeat thread occurred due to connectivity issues of the consumers to the coordinating broker: Immediately before the exception there are hundreds, if not thousands of log entries of following type: 2017-12-12 16:23:12.361 [kafka-coordinator-heartbeat-thread | my-consumer-group] INFO - [Consumer clientId=consumer-4, groupId=my-consumer-group] Marking the coordinator : (id: 2147483645 rack: null) dead The exceptions always happen somewhere in the DateFormat code, even though at different lines. 2017-12-12 16:23:12.363 [kafka-coordinator-heartbeat-thread | my-consumer-group] ERROR - Uncaught exception in thread 'kafka-coordinator-heartbeat-thread | my-consumer-group': java.lang.StackOverflowError at java.text.DateFormatSymbols.getProviderInstance(DateFormatSymbols.java:362) at java.text.DateFormatSymbols.getInstance(DateFormatSymbols.java:340) at java.util.Calendar.getDisplayName(Calendar.java:2110) at java.text.SimpleDateFormat.subFormat(SimpleDateFormat.java:1125) at java.text.SimpleDateFormat.format(SimpleDateFormat.java:966) at java.text.SimpleDateFormat.format(SimpleDateFormat.java:936) at java.text.DateFormat.format(DateFormat.java:345) at org.apache.log4j.helpers.PatternParser$DatePatternConverter.convert(PatternParser.java:443) at org.apache.log4j.helpers.PatternConverter.format(PatternConverter.java:65) at org.apache.log4j.PatternLayout.format(PatternLayout.java:506) at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310) at org.apache.log4j.WriterAppender.append(WriterAppender.java:162) at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251) at 
org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) at org.apache.log4j.Category.callAppenders(Category.java:206) at org.apache.log4j.Category.forcedLog(Category.java:391) at org.apache.log4j.Category.log(Category.java:856) at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:324) at org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) ... the following 9 lines are repeated around hundred times. ... 
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
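The repeated nine-line cycle in the trace above is a mutual recursion: coordinatorDead triggers a disconnect, the disconnect fails the unsent requests, and each failure callback marks the coordinator dead again, adding one stack frame per pending request. A sketch of the pattern, and of the usual remedy of draining callbacks from a queue instead of invoking them recursively (illustrative Java, not Kafka's actual classes):

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Recursive failure callbacks grow the stack with the number of pending
// requests; draining a work queue in a loop keeps the stack depth constant.
public class CallbackRecursionSketch {
    static int depth = 0;
    static int maxDepth = 0;

    // Recursive style: each failure handler directly invokes the next one.
    static void failRecursively(int pending) {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        if (pending > 0) failRecursively(pending - 1); // stack grows with pending
        depth--;
    }

    // Queue-draining style: handlers enqueue follow-up work instead of recursing.
    static int failViaQueue(int pending) {
        Deque<Integer> queue = new ArrayDeque<>();
        queue.add(pending);
        int handled = 0;
        while (!queue.isEmpty()) {
            int p = queue.poll();
            handled++;
            if (p > 0) queue.add(p - 1); // constant stack depth, same work done
        }
        return handled;
    }

    public static void main(String[] args) {
        failRecursively(1000);
        System.out.println("recursive max depth: " + maxDepth);       // 1001
        System.out.println("queued handled: " + failViaQueue(1000));  // 1001
    }
}
```

With enough pending requests (here, enough "Marking the coordinator dead" log entries), the recursive variant is exactly the shape that overflows the stack.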
[jira] [Updated] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread
[ https://issues.apache.org/jira/browse/KAFKA-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated KAFKA-6366: -- Attachment: 6366.v1.txt First attempt for fixing the stack overflow > StackOverflowError in kafka-coordinator-heartbeat-thread > > > Key: KAFKA-6366 > URL: https://issues.apache.org/jira/browse/KAFKA-6366 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 1.0.0 >Reporter: Joerg Heinicke > Attachments: 6366.v1.txt > > > With Kafka 1.0 our consumer groups fall into a permanent cycle of rebalancing > once a StackOverflowError in the heartbeat thread occurred due to > connectivity issues of the consumers to the coordinating broker: > Immediately before the exception there are hundreds, if not thousands of log > entries of following type: > 2017-12-12 16:23:12.361 [kafka-coordinator-heartbeat-thread | > my-consumer-group] INFO - [Consumer clientId=consumer-4, > groupId=my-consumer-group] Marking the coordinator : (id: > 2147483645 rack: null) dead > The exceptions always happen somewhere in the DateFormat code, even > though at different lines. 
> 2017-12-12 16:23:12.363 [kafka-coordinator-heartbeat-thread | > my-consumer-group] ERROR - Uncaught exception in thread > 'kafka-coordinator-heartbeat-thread | my-consumer-group': > java.lang.StackOverflowError > at > java.text.DateFormatSymbols.getProviderInstance(DateFormatSymbols.java:362) > at > java.text.DateFormatSymbols.getInstance(DateFormatSymbols.java:340) > at java.util.Calendar.getDisplayName(Calendar.java:2110) > at java.text.SimpleDateFormat.subFormat(SimpleDateFormat.java:1125) > at java.text.SimpleDateFormat.format(SimpleDateFormat.java:966) > at java.text.SimpleDateFormat.format(SimpleDateFormat.java:936) > at java.text.DateFormat.format(DateFormat.java:345) > at > org.apache.log4j.helpers.PatternParser$DatePatternConverter.convert(PatternParser.java:443) > at > org.apache.log4j.helpers.PatternConverter.format(PatternConverter.java:65) > at org.apache.log4j.PatternLayout.format(PatternLayout.java:506) > at > org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310) > at org.apache.log4j.WriterAppender.append(WriterAppender.java:162) > at > org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251) > at > org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) > at org.apache.log4j.Category.callAppenders(Category.java:206) > at org.apache.log4j.Category.forcedLog(Category.java:391) > at org.apache.log4j.Category.log(Category.java:856) > at > org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:324) > at > org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) > at > 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) > ... > the following 9 lines are repeated around hundred times. > ... > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:653) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
[jira] [Commented] (KAFKA-6365) How to add a client to list of available clients?
[ https://issues.apache.org/jira/browse/KAFKA-6365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291595#comment-16291595 ] Lev Gorodinski commented on KAFKA-6365: --- [~hachikuji] I do, it is *eulerfx*. Thanks! > How to add a client to list of available clients? > - > > Key: KAFKA-6365 > URL: https://issues.apache.org/jira/browse/KAFKA-6365 > Project: Kafka > Issue Type: Wish >Reporter: Lev Gorodinski >Priority: Trivial > > I'd like to add a client to: > https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.NET > The client is: https://github.com/jet/kafunk > .NET written in F# supports 0.8 0.9 0.10 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5722) Refactor ConfigCommand to use the AdminClient
[ https://issues.apache.org/jira/browse/KAFKA-5722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291597#comment-16291597 ] Rajini Sivaram commented on KAFKA-5722: --- [~viktorsomogyi] Thank you for working on this! > Refactor ConfigCommand to use the AdminClient > - > > Key: KAFKA-5722 > URL: https://issues.apache.org/jira/browse/KAFKA-5722 > Project: Kafka > Issue Type: Improvement >Reporter: Viktor Somogyi >Assignee: Viktor Somogyi > Labels: kip, needs-kip > Fix For: 1.1.0 > > > The ConfigCommand currently uses a direct connection to zookeeper. The > zookeeper dependency should be deprecated and an AdminClient API created to > be used instead. > This change requires a KIP. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6365) How to add a client to list of available clients?
[ https://issues.apache.org/jira/browse/KAFKA-6365?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291615#comment-16291615 ] Jason Gustafson commented on KAFKA-6365: [~eulerfx] Cool, I've given you permission to edit the page. I'll go ahead and resolve this issue. Thanks for contributing! > How to add a client to list of available clients? > - > > Key: KAFKA-6365 > URL: https://issues.apache.org/jira/browse/KAFKA-6365 > Project: Kafka > Issue Type: Wish >Reporter: Lev Gorodinski >Priority: Trivial > > I'd like to add a client to: > https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.NET > The client is: https://github.com/jet/kafunk > .NET written in F# supports 0.8 0.9 0.10 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (KAFKA-6365) How to add a client to list of available clients?
[ https://issues.apache.org/jira/browse/KAFKA-6365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6365. Resolution: Resolved > How to add a client to list of available clients? > - > > Key: KAFKA-6365 > URL: https://issues.apache.org/jira/browse/KAFKA-6365 > Project: Kafka > Issue Type: Wish >Reporter: Lev Gorodinski >Priority: Trivial > > I'd like to add a client to: > https://cwiki.apache.org/confluence/display/KAFKA/Clients#Clients-.NET > The client is: https://github.com/jet/kafunk > .NET written in F# supports 0.8 0.9 0.10 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6366) StackOverflowError in kafka-coordinator-heartbeat-thread
[ https://issues.apache.org/jira/browse/KAFKA-6366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291699#comment-16291699 ] Jason Gustafson commented on KAFKA-6366: [~joerg.heinicke] Thanks for reporting the issue. I'm puzzling a bit over the trace. It looks like {{ConsumerNetworkClient.disconnect()}} should be reentrant (I tested locally to be sure). Maybe we're hitting a situation where the foreground thread is in a loop trying to add a request which the heartbeat thread is stuck cancelling in an infinite recursion? Is there another explanation? > StackOverflowError in kafka-coordinator-heartbeat-thread > > > Key: KAFKA-6366 > URL: https://issues.apache.org/jira/browse/KAFKA-6366 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 1.0.0 >Reporter: Joerg Heinicke > Attachments: 6366.v1.txt > > > With Kafka 1.0 our consumer groups fall into a permanent cycle of rebalancing > once a StackOverflowError in the heartbeat thread occurred due to > connectivity issues of the consumers to the coordinating broker: > Immediately before the exception there are hundreds, if not thousands of log > entries of following type: > 2017-12-12 16:23:12.361 [kafka-coordinator-heartbeat-thread | > my-consumer-group] INFO - [Consumer clientId=consumer-4, > groupId=my-consumer-group] Marking the coordinator : (id: > 2147483645 rack: null) dead > The exceptions always happen somewhere in the DateFormat code, even > though at different lines. 
> 2017-12-12 16:23:12.363 [kafka-coordinator-heartbeat-thread | > my-consumer-group] ERROR - Uncaught exception in thread > 'kafka-coordinator-heartbeat-thread | my-consumer-group': > java.lang.StackOverflowError > at > java.text.DateFormatSymbols.getProviderInstance(DateFormatSymbols.java:362) > at > java.text.DateFormatSymbols.getInstance(DateFormatSymbols.java:340) > at java.util.Calendar.getDisplayName(Calendar.java:2110) > at java.text.SimpleDateFormat.subFormat(SimpleDateFormat.java:1125) > at java.text.SimpleDateFormat.format(SimpleDateFormat.java:966) > at java.text.SimpleDateFormat.format(SimpleDateFormat.java:936) > at java.text.DateFormat.format(DateFormat.java:345) > at > org.apache.log4j.helpers.PatternParser$DatePatternConverter.convert(PatternParser.java:443) > at > org.apache.log4j.helpers.PatternConverter.format(PatternConverter.java:65) > at org.apache.log4j.PatternLayout.format(PatternLayout.java:506) > at > org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:310) > at org.apache.log4j.WriterAppender.append(WriterAppender.java:162) > at > org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251) > at > org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) > at org.apache.log4j.Category.callAppenders(Category.java:206) > at org.apache.log4j.Category.forcedLog(Category.java:391) > at org.apache.log4j.Category.log(Category.java:856) > at > org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:324) > at > org.apache.kafka.common.utils.LogContext$KafkaLogger.info(LogContext.java:341) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(AbstractCoordinator.java:649) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onFailure(AbstractCoordinator.java:797) > at > org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209) > at > 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177) > at > org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) > ... > the following 9 lines are repeated around hundred times. > ... > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:496) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.failUnsentRequests(ConsumerNetworkClient.java:416) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disconnect(ConsumerNetworkClient.java:388) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.coordinatorDead(Abs
[jira] [Created] (KAFKA-6367) Fix StateRestoreListener To Use Correct Ending Offset
Bill Bejeck created KAFKA-6367: -- Summary: Fix StateRestoreListener To Use Correct Ending Offset Key: KAFKA-6367 URL: https://issues.apache.org/jira/browse/KAFKA-6367 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0.0, 0.11.0.0 Reporter: Bill Bejeck Fix For: 1.0.1 {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} long but the {{nextPosition}} is not correct, it should be the offset of the latest restored offset, but nextPosition is the offset of the first not restored offset. We can't just automatically use {{nextPosition}} - 1 as this could be a commit marker. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6367) Fix StateRestoreListener To Use Correct Ending Offset
[ https://issues.apache.org/jira/browse/KAFKA-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-6367: --- Description: {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} long but the {{nextPosition}} is not correct, it should be the offset of the latest restored offset, but nextPosition is the offset of the first not restored offset. We can't automatically use {{nextPosition}} - 1 as this could be a commit marker. was: {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} long but the {{nextPosition}} is not correct, it should be the offset of the latest restored offset, but nextPosition is the offset of the first not restored offset. We can't just automatically use {{nextPosition}} - 1 as this could be a commit marker. > Fix StateRestoreListener To Use Correct Ending Offset > - > > Key: KAFKA-6367 > URL: https://issues.apache.org/jira/browse/KAFKA-6367 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 1.0.0 >Reporter: Bill Bejeck > Fix For: 1.0.1 > > > {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} > long but the {{nextPosition}} is not correct, it should be the offset of the > latest restored offset, but nextPosition is the offset of the first not > restored offset. > We can't automatically use {{nextPosition}} - 1 as this could be a commit > marker. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6367) Fix StateRestoreListener To Use Correct Ending Offset
[ https://issues.apache.org/jira/browse/KAFKA-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-6367: --- Description: {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} long but the {{nextPosition}} is not correct, it should be the offset of the latest restored offset, but {{nextPosition}} is the offset of the first not restored offset. We can't automatically use {{nextPosition}} - 1 as this could be a commit marker. was: {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} long but the {{nextPosition}} is not correct, it should be the offset of the latest restored offset, but nextPosition is the offset of the first not restored offset. We can't automatically use {{nextPosition}} - 1 as this could be a commit marker. > Fix StateRestoreListener To Use Correct Ending Offset > - > > Key: KAFKA-6367 > URL: https://issues.apache.org/jira/browse/KAFKA-6367 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 1.0.0 >Reporter: Bill Bejeck > Fix For: 1.0.1 > > > {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} > long but the {{nextPosition}} is not correct, it should be the offset of the > latest restored offset, but {{nextPosition}} is the offset of the first not > restored offset. > We can't automatically use {{nextPosition}} - 1 as this could be a commit > marker. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (KAFKA-6367) Fix StateRestoreListener To Use Correct Ending Offset
[ https://issues.apache.org/jira/browse/KAFKA-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck reassigned KAFKA-6367: -- Assignee: Bill Bejeck > Fix StateRestoreListener To Use Correct Ending Offset > - > > Key: KAFKA-6367 > URL: https://issues.apache.org/jira/browse/KAFKA-6367 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 1.0.0 >Reporter: Bill Bejeck >Assignee: Bill Bejeck > Fix For: 1.0.1 > > > {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} > long but the {{nextPosition}} is not correct, it should be the offset of the > latest restored offset, but {{nextPosition}} is the offset of the first not > restored offset. > We can't automatically use {{nextPosition}} - 1 as this could be a commit > marker. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
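The constraint described in this thread, that nextPosition - 1 may land on a commit marker rather than the last restored record, suggests deriving the ending offset from the batch that was actually restored. A sketch of that idea (illustrative Java, not the Streams restoration API):

```java
// restoreBatchCompleted is handed the *next* fetch position, but position - 1
// may point at a transactional commit marker rather than a restored record.
// A safer ending offset is the offset of the last record actually restored.
public class RestoreOffsetSketch {
    // offsets of records actually restored in this batch (markers filtered out)
    static long endingOffset(long[] restoredBatchOffsets, long nextPosition) {
        if (restoredBatchOffsets.length == 0) {
            // Hypothetical fallback for an empty batch; subject to the
            // commit-marker caveat discussed in the issue.
            return nextPosition - 1;
        }
        return restoredBatchOffsets[restoredBatchOffsets.length - 1];
    }

    public static void main(String[] args) {
        // Records at offsets 10..14 were restored; offset 15 is a commit
        // marker, so nextPosition is 16 but the last restored record is 14.
        long[] batch = {10, 11, 12, 13, 14};
        System.out.println(endingOffset(batch, 16)); // 14
    }
}
```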
[jira] [Commented] (KAFKA-6311) Expose Kafka cluster ID in Connect REST API
[ https://issues.apache.org/jira/browse/KAFKA-6311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291735#comment-16291735 ] Randall Hauch commented on KAFKA-6311: -- The KIP is available here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-238%3A+Expose+Kafka+cluster+ID+in+Connect+REST+API > Expose Kafka cluster ID in Connect REST API > --- > > Key: KAFKA-6311 > URL: https://issues.apache.org/jira/browse/KAFKA-6311 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Xavier Léauté >Assignee: Ewen Cheslack-Postava > Labels: needs-kip > > Connect currently does not expose any information about the Kafka cluster it > is connected to. > In an environment with multiple Kafka clusters it would be useful to know > which cluster Connect is talking to. Exposing this information enables > programmatic discovery of Kafka cluster metadata for the purpose of > configuring connectors. -- This message was sent by Atlassian JIRA (v6.4.14#64029)

[jira] [Resolved] (KAFKA-6102) Consolidate MockTime implementations between connect and clients
[ https://issues.apache.org/jira/browse/KAFKA-6102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6102. Resolution: Fixed Fix Version/s: 1.0.1 Issue resolved by pull request 4105 [https://github.com/apache/kafka/pull/4105] > Consolidate MockTime implementations between connect and clients > > > Key: KAFKA-6102 > URL: https://issues.apache.org/jira/browse/KAFKA-6102 > Project: Kafka > Issue Type: Improvement >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe >Priority: Minor > Fix For: 1.0.1 > > > Consolidate MockTime implementations between connect and clients -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6102) Consolidate MockTime implementations between connect and clients
[ https://issues.apache.org/jira/browse/KAFKA-6102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291749#comment-16291749 ] ASF GitHub Bot commented on KAFKA-6102: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/4105 > Consolidate MockTime implementations between connect and clients > > > Key: KAFKA-6102 > URL: https://issues.apache.org/jira/browse/KAFKA-6102 > Project: Kafka > Issue Type: Improvement >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe >Priority: Minor > Fix For: 1.0.1 > > > Consolidate MockTime implementations between connect and clients -- This message was sent by Atlassian JIRA (v6.4.14#64029)
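The kind of mock clock being consolidated here is small enough to sketch: time only moves when the test advances it, which makes time-dependent code deterministic. Field and method names below are illustrative, not Kafka's exact MockTime API:

```java
import java.util.concurrent.TimeUnit;

// A minimal mock clock: wall time is a plain counter the test controls.
public class MockTimeSketch {
    private long nanos;

    public MockTimeSketch(long startMs) {
        this.nanos = TimeUnit.MILLISECONDS.toNanos(startMs);
    }

    public long milliseconds() { return TimeUnit.NANOSECONDS.toMillis(nanos); }

    public long nanoseconds() { return nanos; }

    // In the mock, "sleeping" advances the clock instead of blocking the thread.
    public void sleep(long ms) { nanos += TimeUnit.MILLISECONDS.toNanos(ms); }

    public static void main(String[] args) {
        MockTimeSketch time = new MockTimeSketch(0);
        time.sleep(1_500);
        System.out.println(time.milliseconds()); // 1500
    }
}
```

Keeping one such implementation shared between the connect and clients test code avoids the drift between near-identical copies that this issue addresses.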
[jira] [Commented] (KAFKA-3821) Allow Kafka Connect source tasks to produce offset without writing to topics
[ https://issues.apache.org/jira/browse/KAFKA-3821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291750#comment-16291750 ] Randall Hauch commented on KAFKA-3821: -- [~gunnar.morling], no, there is no KIP for this yet. I agree, the {{OffsetRecord}} approach seems to be the cleanest API. We will have to think about how the transformations will be affected, since many of the implementations (not just those we provided in AK) are likely not expecting a null topic. The {{SourceRecord}} documentation is not explicit about what fields are required. > Allow Kafka Connect source tasks to produce offset without writing to topics > > > Key: KAFKA-3821 > URL: https://issues.apache.org/jira/browse/KAFKA-3821 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.9.0.1 >Reporter: Randall Hauch > Labels: needs-kip > Fix For: 1.1.0 > > > Provide a way for a {{SourceTask}} implementation to record a new offset for > a given partition without necessarily writing a source record to a topic. > Consider a connector task that uses the same offset when producing an unknown > number of {{SourceRecord}} objects (e.g., it is taking a snapshot of a > database). Once the task completes those records, the connector wants to > update the offsets (e.g., the snapshot is complete) but has no more records > to be written to a topic. With this change, the task could simply supply an > updated offset. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
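One shape the idea discussed above could take is to treat a record carrying no topic as an offset-only update: the framework commits its offset but never produces it to Kafka. The sketch below is entirely hypothetical, not the Connect API; Rec and dispatch are invented names used only to illustrate the OffsetRecord-style approach:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: a null topic means "advance the offset, write nothing".
public class OffsetOnlySketch {
    static class Rec {
        final String topic;   // null marks an offset-only update (hypothetical)
        final long offset;
        Rec(String topic, long offset) { this.topic = topic; this.offset = offset; }
    }

    static final List<String> produced = new ArrayList<>();
    static long committedOffset = -1;

    // Framework side: commit every offset, but only produce records with a topic.
    static void dispatch(Rec r) {
        committedOffset = r.offset;
        if (r.topic != null) produced.add(r.topic);
    }

    public static void main(String[] args) {
        dispatch(new Rec("snapshots", 41)); // normal record: produced and committed
        dispatch(new Rec(null, 42));        // snapshot complete: offset only
        System.out.println(produced.size() + " record(s) produced, offset " + committedOffset);
    }
}
```

This is also where the transformation concern raised in the comment bites: any transformation chain in this model would need to either skip offset-only records or tolerate a null topic.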
[jira] [Created] (KAFKA-6368) AdminClient should contact multiple nodes before timing out a call
Colin P. McCabe created KAFKA-6368: -- Summary: AdminClient should contact multiple nodes before timing out a call Key: KAFKA-6368 URL: https://issues.apache.org/jira/browse/KAFKA-6368 Project: Kafka Issue Type: Bug Components: admin Affects Versions: 0.10.1.0 Reporter: Colin P. McCabe Assignee: Colin P. McCabe The AdminClient should contact multiple nodes before timing out a call. Right now, we could use up our entire call timeout just waiting for one very slow node to respond. We probably need to decouple the call timeout from the NetworkClient request timeout. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (KAFKA-6368) AdminClient should contact multiple nodes before timing out a call
[ https://issues.apache.org/jira/browse/KAFKA-6368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin P. McCabe updated KAFKA-6368: --- Affects Version/s: (was: 0.10.1.0) 0.11.0.0 > AdminClient should contact multiple nodes before timing out a call > -- > > Key: KAFKA-6368 > URL: https://issues.apache.org/jira/browse/KAFKA-6368 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 0.11.0.0 >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe > > The AdminClient should contact multiple nodes before timing out a call. > Right now, we could use up our entire call timeout just waiting for one very > slow node to respond. We probably need to decouple the call timeout from the > NetworkClient request timeout. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6308) Connect: Struct equals/hashCode method should use Arrays#deep* methods
[ https://issues.apache.org/jira/browse/KAFKA-6308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291794#comment-16291794 ] ASF GitHub Bot commented on KAFKA-6308: --- Github user asfgit closed the pull request at: https://github.com/apache/kafka/pull/4293 > Connect: Struct equals/hashCode method should use Arrays#deep* methods > -- > > Key: KAFKA-6308 > URL: https://issues.apache.org/jira/browse/KAFKA-6308 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Tobias Gies > Labels: easyfix, newbie > Fix For: 1.0.1 > > > At the moment, {{org.apache.kafka.connect.data.Struct#equals}} checks two > things, after ensuring the incoming {{Object o}} is indeed of the correct > type: > * Whether the schemas of {{this}} and {{o}} are equal, via {{Objects#equals}} > * Whether the values of {{this}} and {{o}} are equal, via {{Arrays#equals}}. > The latter check is problematic. {{Arrays#equals}} is meant for > one-dimensional arrays of any kind, and thus simply checks the {{equals}} > methods of all corresponding elements of its parameters {{a1}} and {{a2}}. > However, elements of the {{Struct#values}} array may themselves be arrays in > a specific case, namely if a field has a {{BYTES}} Schema Type and the user's > input for this field is of type {{byte[]}}. > Given that, I would suggest to use {{Arrays#deepEquals}} to compare the > {{values}} arrays of two {{Struct}} instances. With similar reasoning, I > would also suggest to use {{Arrays#deepHashCode}} in the Struct's > {{hashCode}} method. > This would allow to properly compare and hash structs that get byte arrays > passed in as field values instead of the recommended ByteBuffers. An > alternative might be to automatically wrap byte arrays passed into any > {{put}} method in a ByteBuffer. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
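The behaviour described in the report can be reproduced with plain JDK calls. The following standalone sketch is illustrative and is not taken from the Kafka patch:

```java
import java.util.Arrays;

public class DeepEqualsDemo {
    public static void main(String[] args) {
        // Two value arrays whose elements are themselves byte[] -- the case a
        // BYTES field hits when the caller passes byte[] instead of ByteBuffer.
        Object[] values1 = { "id-1", new byte[] { 1, 2, 3 } };
        Object[] values2 = { "id-1", new byte[] { 1, 2, 3 } };

        // Arrays.equals compares elements with Object#equals, and Java arrays
        // use identity equality, so the nested byte[]s never match.
        System.out.println(Arrays.equals(values1, values2));      // false
        // Arrays.deepEquals recurses into nested arrays and compares contents.
        System.out.println(Arrays.deepEquals(values1, values2));  // true
        // The same reasoning applies to hashing.
        System.out.println(Arrays.deepHashCode(values1) == Arrays.deepHashCode(values2)); // true
    }
}
```

This is exactly why two otherwise-identical Structs holding `byte[]` field values compare unequal under the old implementation.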
[jira] [Resolved] (KAFKA-6308) Connect: Struct equals/hashCode method should use Arrays#deep* methods
[ https://issues.apache.org/jira/browse/KAFKA-6308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6308. Resolution: Fixed Fix Version/s: 1.0.1 Issue resolved by pull request 4293 [https://github.com/apache/kafka/pull/4293] > Connect: Struct equals/hashCode method should use Arrays#deep* methods > -- > > Key: KAFKA-6308 > URL: https://issues.apache.org/jira/browse/KAFKA-6308 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Tobias Gies > Labels: easyfix, newbie > Fix For: 1.0.1 > > > At the moment, {{org.apache.kafka.connect.data.Struct#equals}} checks two > things, after ensuring the incoming {{Object o}} is indeed of the correct > type: > * Whether the schemas of {{this}} and {{o}} are equal, via {{Objects#equals}} > * Whether the values of {{this}} and {{o}} are equal, via {{Arrays#equals}}. > The latter check is problematic. {{Arrays#equals}} is meant for > one-dimensional arrays of any kind, and thus simply checks the {{equals}} > methods of all corresponding elements of its parameters {{a1}} and {{a2}}. > However, elements of the {{Struct#values}} array may themselves be arrays in > a specific case, namely if a field has a {{BYTES}} Schema Type and the user's > input for this field is of type {{byte[]}}. > Given that, I would suggest to use {{Arrays#deepEquals}} to compare the > {{values}} arrays of two {{Struct}} instances. With similar reasoning, I > would also suggest to use {{Arrays#deepHashCode}} in the Struct's > {{hashCode}} method. > This would allow to properly compare and hash structs that get byte arrays > passed in as field values instead of the recommended ByteBuffers. An > alternative might be to automatically wrap byte arrays passed into any > {{put}} method in a ByteBuffer. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6361) Fast leader fail over can lead to log divergence between leader and follower
[ https://issues.apache.org/jira/browse/KAFKA-6361?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291812#comment-16291812 ] Jun Rao commented on KAFKA-6361: [~hachikuji], thanks for the info. The problem in the description can indeed happen if the first leader change is due to preferred leader election, in which case, the ISR won't change. To address this issue, we could send another OffsetForLeaderEpoch with the previous leader epoch as you suggested. This may require multiple rounds of OffsetForLeaderEpoch requests. Another way is to change OffsetForLeaderEpoch request to send a sequence of (leader epoch, start offset) for the epoch between the follower's HW and LEO. On the leader side, we find the longest consecutive sequence of leader epoch whose start offset matches the leader's. We then return the end offset of the last matching leader epoch. The above approach doesn't fully fix the issue for a compacted topic. When all messages for a leader epoch are removed, we may lose the leader epoch. Thus, the leader epochs between the follower and the leader may not perfectly match. One way to address this issue is to preserve the offset of the first message in a leader epoch during log cleaning. This probably can be done separately since it causes problems rarely. > Fast leader fail over can lead to log divergence between leader and follower > > > Key: KAFKA-6361 > URL: https://issues.apache.org/jira/browse/KAFKA-6361 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson > Labels: reliability > > We have observed an edge case in the replication failover logic which can > cause a replica to permanently fall out of sync with the leader or, in the > worst case, actually have localized divergence between logs. This occurs in > spite of the improved truncation logic from KIP-101. > Suppose we have brokers A and B. Initially A is the leader in epoch 1. 
It > appends two batches: one in the range (0, 10) and the other in the range (11, > 20). The first one successfully replicates to B, but the second one does not. > In other words, the logs on the brokers look like this: > {code} > Broker A: > 0: offsets [0, 10], leader epoch: 1 > 1: offsets [11, 20], leader epoch: 1 > Broker B: > 0: offsets [0, 10], leader epoch: 1 > {code} > Broker A then has a zk session expiration and broker B is elected with epoch > 2. It appends a new batch with offsets (11, n) to its local log. So we now > have this: > {code} > Broker A: > 0: offsets [0, 10], leader epoch: 1 > 1: offsets [11, 20], leader epoch: 1 > Broker B: > 0: offsets [0, 10], leader epoch: 1 > 1: offsets: [11, n], leader epoch: 2 > {code} > Normally we expect broker A to truncate to offset 11 on becoming the > follower, but before it is able to do so, broker B has its own zk session > expiration and broker A again becomes leader, now with epoch 3. It then > appends a new entry in the range (21, 30). The updated logs look like this: > {code} > Broker A: > 0: offsets [0, 10], leader epoch: 1 > 1: offsets [11, 20], leader epoch: 1 > 2: offsets: [21, 30], leader epoch: 3 > Broker B: > 0: offsets [0, 10], leader epoch: 1 > 1: offsets: [11, n], leader epoch: 2 > {code} > Now what happens next depends on the last offset of the batch appended in > epoch 2. On becoming follower, broker B will send an OffsetForLeaderEpoch > request to broker A with epoch 2. Broker A will respond that epoch 2 ends at > offset 21. There are three cases: > 1) n < 20: In this case, broker B will not do any truncation. It will begin > fetching from offset n, which will ultimately cause an out of order offset > error because broker A will return the full batch beginning from offset 11 > which broker B will be unable to append. > 2) n == 20: Again broker B does not truncate. It will fetch from offset 21 > and everything will appear fine though the logs have actually diverged. 
> 3) n > 20: Broker B will attempt to truncate to offset 21. Since this is in > the middle of the batch, it will truncate all the way to offset 10. It can > begin fetching from offset 11 and everything is fine. > The case we have actually seen is the first one. The second one would likely > go unnoticed in practice and everything is fine in the third case. To > workaround the issue, we deleted the active segment on the replica which > allowed it to re-replicate consistently from the leader. > I'm not sure the best solution for this scenario. Maybe if the leader isn't > aware of an epoch, it should always respond with {{UNDEFINED_EPOCH_OFFSET}} > instead of using the offset of the next highest epoch. That would cause the > follower to truncate using its high watermark. Or perhaps instead of doing > so, it could
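The three cases above can be modeled with a small toy function. This is a sketch of the follower-side decision as described in the report, not Kafka's actual replica-fetcher code; the method name and return strings are illustrative:

```java
public class EpochTruncationDemo {
    /**
     * Toy model: the follower's last local batch is [11, n] in epoch 2, and
     * the leader reports that epoch 2 ends at offset leaderEpochEndOffset (21
     * in the scenario above). Returns the action the follower takes.
     */
    static String followerAction(long n, long leaderEpochEndOffset) {
        long logEndOffset = n + 1; // the follower's LEO is one past its last offset
        if (leaderEpochEndOffset >= logEndOffset) {
            // Cases 1 and 2: no truncation; fetching resumes at the LEO.
            return "fetch from " + logEndOffset;
        }
        // Case 3: truncate to the reported end offset, which may land
        // mid-batch and force truncating the whole batch.
        return "truncate to " + leaderEpochEndOffset;
    }

    public static void main(String[] args) {
        System.out.println(followerAction(15, 21)); // n < 20: out-of-order offsets later
        System.out.println(followerAction(20, 21)); // n == 20: silent divergence
        System.out.println(followerAction(25, 21)); // n > 20: truncation, then fine
    }
}
```

The model makes the failure visible: in the first two cases the follower never truncates its epoch-2 batch, even though it diverges from the leader's epoch-1 batch over the same offsets.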
[jira] [Created] (KAFKA-6369) General wildcard support for ACL's in kafka
Antony Stubbs created KAFKA-6369: Summary: General wildcard support for ACL's in kafka Key: KAFKA-6369 URL: https://issues.apache.org/jira/browse/KAFKA-6369 Project: Kafka Issue Type: New Feature Reporter: Antony Stubbs Especially for streams apps where all intermediate topics are prefixed with the application id. For example, add read and write access to mystreamsapp.* so any new topics created by the app don't need to have specific permissions applied to them. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
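The requested semantics can be sketched with plain JDK regex utilities. This is purely illustrative of what "general wildcard support" would mean; it is not Kafka's ACL implementation (which at the time supported only the literal resource name "*"):

```java
import java.util.regex.Pattern;

public class WildcardAclDemo {
    /**
     * Treat '*' in an ACL resource name as a wildcard by quoting everything
     * else literally. Method name and approach are hypothetical.
     */
    static boolean aclMatches(String aclPattern, String topic) {
        // Pattern.quote wraps the pattern in \Q...\E; each '*' breaks out of
        // the literal region and matches any sequence of characters.
        String regex = Pattern.quote(aclPattern).replace("*", "\\E.*\\Q");
        return topic.matches(regex);
    }

    public static void main(String[] args) {
        // Streams-internal topics are prefixed with the application id:
        System.out.println(aclMatches("mystreamsapp.*", "mystreamsapp.store-changelog")); // true
        System.out.println(aclMatches("mystreamsapp.*", "other-topic"));                  // false
    }
}
```

With matching like this, new repartition and changelog topics created by the streams app would be covered without per-topic ACL updates.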
[jira] [Commented] (KAFKA-6363) Use MockAdminClient for any unit tests that depend on AdminClient
[ https://issues.apache.org/jira/browse/KAFKA-6363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291888#comment-16291888 ] Colin P. McCabe commented on KAFKA-6363: Thanks for working on refactoring this. I think maybe it's just the name that is confusing? {{MockAdminClientEnv}} suggests that it is an environment for {{MockAdminClient}}, but of course it is not. Perhaps we could just rename it to {{AdminClientUnitTestEnv}}? That would make it clear, I think. > Use MockAdminClient for any unit tests that depend on AdminClient > - > > Key: KAFKA-6363 > URL: https://issues.apache.org/jira/browse/KAFKA-6363 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang > Labels: newbie > > Today we have a few unit tests other than KafkaAdminClientTest that rely on > MockKafkaAdminClientEnv. > About this class and MockKafkaAdminClientEnv, my thoughts: > 1. MockKafkaAdminClientEnv is actually using a MockClient for the inner > KafkaClient; it should be only used for the unit test of KafkaAdminClient > itself. > 2. For any other unit tests on classes that depend on AdminClient, we should > be using the MockAdminClient that mocks the whole AdminClient. > So I suggest 1) in TopicAdminTest use MockAdminClient instead; 2) in > KafkaAdminClientTest use MockClient and add a new static constructor that > takes a KafkaClient; 3) remove the MockKafkaAdminClientEnv. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-6362) auto commit not work since coordinatorUnknown() is always true.
[ https://issues.apache.org/jira/browse/KAFKA-6362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291952#comment-16291952 ] ASF GitHub Bot commented on KAFKA-6362: --- GitHub user huxihx opened a pull request: https://github.com/apache/kafka/pull/4326 KAFKA-6362: maybeAutoCommitOffsetsAsync should try to discover coordinator Currently, `maybeAutoCommitOffsetsAsync` may not retry to find out coordinator even after the coordinator goes back to service. As a result, all asynchronous offset commits will fail. This patch refines `maybeAutoCommitOffsetsAsync` to have it periodically retry the coordinator discovering. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) You can merge this pull request into a Git repository by running: $ git pull https://github.com/huxihx/kafka KAFKA-6362 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/4326.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4326 commit a2e2da2b2215da2053ffc9e6dace4a36e8777e12 Author: huxihx Date: 2017-12-15T02:39:31Z KAFKA-6362: ConsumerCoordinator.maybeAutoCommitOffsetsAsync should try to discover coordinator. Currently, `maybeAutoCommitOffsetsAsync` may not retry to find out coordinator even after the coordinator goes back to service. As a result, all asynchronous offset commits will fail. 
This patch refines `maybeAutoCommitOffsetsAsync` to have it periodically retry the coordinator discovering. > auto commit not work since coordinatorUnknown() is always true. > --- > > Key: KAFKA-6362 > URL: https://issues.apache.org/jira/browse/KAFKA-6362 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.10.2.1 >Reporter: Renkai Ge > > {code} > [2017-12-14 20:09:23.501] [Kafka 0.10 Fetcher for Source: > source_bj-docker-large (14/40)] INFO > org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: > auto.commit.interval.ms = 5000 > auto.offset.reset = latest > bootstrap.servers = [11.192.77.42:3002, 11.192.73.43:3002, > 11.192.73.66:3002] > check.crcs = true > client.id = > connections.max.idle.ms = 54 > enable.auto.commit = true > exclude.internal.topics = true > fetch.max.bytes = 52428800 > fetch.max.wait.ms = 500 > fetch.min.bytes = 1 > group.id = tcprtdetail_flink > heartbeat.interval.ms = 3000 > interceptor.classes = null > key.deserializer = class > org.apache.kafka.common.serialization.ByteArrayDeserializer > max.partition.fetch.bytes = 1048576 > max.poll.interval.ms = 30 > max.poll.records = 500 > metadata.max.age.ms = 30 > metric.reporters = [] > metrics.num.samples = 2 > metrics.recording.level = INFO > metrics.sample.window.ms = 3 > partition.assignment.strategy = [class > org.apache.kafka.clients.consumer.RangeAssignor] > receive.buffer.bytes = 65536 > reconnect.backoff.ms = 50 > request.timeout.ms = 305000 > retry.backoff.ms = 100 > sasl.jaas.config = null > sasl.kerberos.kinit.cmd = /usr/bin/kinit > sasl.kerberos.min.time.before.relogin = 6 > sasl.kerberos.service.name = null > sasl.kerberos.ticket.renew.jitter = 0.05 > sasl.kerberos.ticket.renew.window.factor = 0.8 > sasl.mechanism = GSSAPI > security.protocol = PLAINTEXT > send.buffer.bytes = 131072 > session.timeout.ms = 1 > ssl.cipher.suites = null > ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] > ssl.endpoint.identification.algorithm = null > 
ssl.key.password = null > ssl.keymanager.algorithm = SunX509 > ssl.keystore.location = null > ssl.keystore.password = null > ssl.keystore.type = JKS > ssl.protocol = TLS > ssl.provider = null > ssl.secure.random.implementation = null > ssl.trustmanager.algorithm = PKIX > ssl.truststore.location = null > ssl.truststore.password = null >
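The idea behind the patch can be sketched as a toy model. The field and method names below (`coordinatorKnown`, `lookupCoordinator`, `doCommitOffsetsAsync`) are illustrative stand-ins, not the actual ConsumerCoordinator internals:

```java
public class AutoCommitRetrySketch {
    private boolean coordinatorKnown = false;
    private long nextAutoCommitDeadline = 0;
    private final long intervalMs = 5_000;
    int commits = 0;
    int lookups = 0;

    // Stand-ins: in the real client these are network operations.
    private void lookupCoordinator() { lookups++; coordinatorKnown = true; }
    private void doCommitOffsetsAsync() { commits++; }

    /** Buggy behaviour: skip the commit and never rediscover the coordinator. */
    void maybeAutoCommitOld(long now) {
        if (now < nextAutoCommitDeadline) return;
        if (!coordinatorKnown) return;           // commit silently dropped forever
        doCommitOffsetsAsync();
        nextAutoCommitDeadline = now + intervalMs;
    }

    /** Patched behaviour: trigger discovery so a later pass can commit. */
    void maybeAutoCommitNew(long now) {
        if (now < nextAutoCommitDeadline) return;
        if (!coordinatorKnown) {
            lookupCoordinator();                 // retry discovery instead of giving up
            return;
        }
        doCommitOffsetsAsync();
        nextAutoCommitDeadline = now + intervalMs;
    }

    public static void main(String[] args) {
        AutoCommitRetrySketch c = new AutoCommitRetrySketch();
        c.maybeAutoCommitNew(0);  // coordinator unknown: triggers a lookup, no commit
        c.maybeAutoCommitNew(1);  // coordinator now known: commit succeeds
        System.out.println(c.lookups + " lookup(s), " + c.commits + " commit(s)");
    }
}
```

In the "old" path the second call would also be a no-op, which is the stuck state the reporter observed once the coordinator bounced.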
[jira] [Commented] (KAFKA-5285) optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291962#comment-16291962 ] Peter Davis commented on KAFKA-5285: In debugging a recent performance blocker in an app of mine, I'm suspecting that when calling `ReadOnlySessionStore.fetch(from,to)`, which uses from (Timestamp) = Long.MAX_VALUE, it calls `upperRange` with a `maxSuffix` filled with mostly 0xFF's. The resulting upperRange is therefore also mostly 0xFF and the resulting RocksDB iterator effectively iterates over (binaryKeyFrom...infinity). With a large number of keys, this is much worse than a mere performance issue (though the result appears "correct" since SessionKeySchema.hasNextCondition filters out the bogus results). It iterates over thousands of unnecessary records and is slow as molasses. It looks like the issue dates to KIP-155. In [`SessionKeySchema#upperRange`](https://github.com/apache/kafka/commit/e28752357705568219315375c666f8e500db9c12#diff-52e7d2701ecab21b32621d9b13b7f33bR57), why is `putLong(to)` (timestamp) repeated twice and it does not use `put(key)` to build the `maxRange`? When using a timestamp less than `Long.MAX_VALUE`, the issue is avoided because `OrderedBytes.upperRange` copies more of the real key. But `ReadOnlySessionStore.fetch` does not let one specify a different timestamp. > optimize upper / lower byte range for key range scan on windowed stores > --- > > Key: KAFKA-5285 > URL: https://issues.apache.org/jira/browse/KAFKA-5285 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Xavier Léauté >Assignee: Xavier Léauté > Labels: performance > > The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}} > {{upperRange}} and {{lowerRange}} does not make any assumptions with respect > to the other key bound (e.g. the upper byte bound does not depend on the lower > key bound). 
> It should be possible to optimize the byte range somewhat further using the > information provided by the lower bound. > More specifically, by incorporating that information, we should be able to > eliminate the corresponding {{upperRangeFixedSize}} and > {{lowerRangeFixedSize}}, since the result should be the same if we implement > that optimization. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
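A small standalone demonstration of why a mostly-0xFF upper bound degrades the scan: under unsigned lexicographic ordering (RocksDB's default bytewise comparator), such a bound sorts after essentially every real key. This sketch is illustrative and does not reproduce the actual `OrderedBytes.upperRange` logic:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class UpperRangeDemo {
    // Unsigned lexicographic comparison, like RocksDB's default comparator.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
            if (cmp != 0) return cmp;
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) {
        // With to = Long.MAX_VALUE the 8-byte big-endian timestamp suffix is
        // 0x7F followed by seven 0xFF bytes -- "mostly 0xFF", as described above.
        byte[] suffix = ByteBuffer.allocate(8).putLong(Long.MAX_VALUE).array();

        // An upper bound dominated by high byte values still sorts above an
        // unrelated key, so a range scan bounded by it keeps iterating far
        // past the records the caller actually wanted.
        byte[] unrelatedKey = "zzz-some-other-key".getBytes(StandardCharsets.UTF_8);
        System.out.println(compareUnsigned(unrelatedKey, suffix) < 0); // true: still "in range"
    }
}
```

The filtering in `hasNextCondition` hides this from the API result, which is why it shows up as a performance problem rather than a correctness one.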
[jira] [Comment Edited] (KAFKA-5285) optimize upper / lower byte range for key range scan on windowed stores
[ https://issues.apache.org/jira/browse/KAFKA-5285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16291962#comment-16291962 ] Peter Davis edited comment on KAFKA-5285 at 12/15/17 3:08 AM: -- In debugging a recent performance blocker in an app of mine, I'm suspecting that when calling {{ReadOnlySessionStore.fetch(from,to)}}, which uses from (Timestamp) = Long.MAX_VALUE, it calls {{upperRange}} with a maxSuffix filled with mostly 0xFF's. The resulting upperRange is therefore also mostly 0xFF and the resulting RocksDB iterator effectively iterates over (binaryKeyFrom...infinity)! With a large number of keys, this is much worse than a mere performance issue (though the result appears "correct" since SessionKeySchema.hasNextCondition filters out the bogus results). It iterates over thousands of unnecessary records and is slow as molasses. It looks like the issue dates to KIP-155. In [{{SessionKeySchema#upperRange}}|https://github.com/apache/kafka/commit/e28752357705568219315375c666f8e500db9c12#diff-52e7d2701ecab21b32621d9b13b7f33bR57], why is {{putLong(to)}} (timestamp) repeated twice and it does not use the actual {{key}} to build the {{maxRange}}? When using a timestamp less than {{Long.MAX_VALUE}}, the issue is avoided because the first timestamp put into {{maxRange}} begins mostly with 0 bytes, so {{OrderedBytes.upperRange}} copies more of the real key in its mask loop. But {{ReadOnlySessionStore.fetch}} does not let one specify a different timestamp. > optimize upper / lower byte range for key range scan on windowed stores > --- > > Key: KAFKA-5285 > URL: https://issues.apache.org/jira/browse/KAFKA-5285 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Xavier Léauté >Assignee: Xavier Léauté > Labels: performance > > The current implementation of {{WindowKeySchema}} / {{SessionKeySchema}} > {{upperRange}} and {{lowerRange}} does not make any assumptions with respect > to the other key bound (e.g. the upper byte bound does not depend on the lower > key bound). > It should be possible to optimize the byte range somewhat further using the > information provided by the lower bound. > More specifically, by incorporating that information, we should be able to > eliminate the corresponding {{upperRangeFixedSize}} and > {{lowerRangeFixedSize}}, since the result should be the same if we implement > that optimization. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6370) MirrorMakerIntegrationTest#testCommaSeparatedRegex may fail due to NullPointerException
Ted Yu created KAFKA-6370: - Summary: MirrorMakerIntegrationTest#testCommaSeparatedRegex may fail due to NullPointerException Key: KAFKA-6370 URL: https://issues.apache.org/jira/browse/KAFKA-6370 Project: Kafka Issue Type: Bug Reporter: Ted Yu Priority: Minor >From >https://builds.apache.org/job/kafka-trunk-jdk8/2277/testReport/junit/kafka.tools/MirrorMakerIntegrationTest/testCommaSeparatedRegex/ > : {code} java.lang.NullPointerException at scala.collection.immutable.StringLike.$anonfun$format$1(StringLike.scala:351) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38) at scala.collection.TraversableLike.map(TraversableLike.scala:234) at scala.collection.TraversableLike.map$(TraversableLike.scala:227) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at scala.collection.immutable.StringLike.format(StringLike.scala:351) at scala.collection.immutable.StringLike.format$(StringLike.scala:350) at scala.collection.immutable.StringOps.format(StringOps.scala:29) at kafka.metrics.KafkaMetricsGroup$.$anonfun$toScope$3(KafkaMetricsGroup.scala:170) at scala.collection.immutable.List.map(List.scala:283) at kafka.metrics.KafkaMetricsGroup$.kafka$metrics$KafkaMetricsGroup$$toScope(KafkaMetricsGroup.scala:170) at kafka.metrics.KafkaMetricsGroup.explicitMetricName(KafkaMetricsGroup.scala:67) at kafka.metrics.KafkaMetricsGroup.explicitMetricName$(KafkaMetricsGroup.scala:51) at kafka.network.RequestMetrics.explicitMetricName(RequestChannel.scala:352) at kafka.metrics.KafkaMetricsGroup.metricName(KafkaMetricsGroup.scala:47) at kafka.metrics.KafkaMetricsGroup.metricName$(KafkaMetricsGroup.scala:42) at kafka.network.RequestMetrics.metricName(RequestChannel.scala:352) at 
kafka.metrics.KafkaMetricsGroup.newHistogram(KafkaMetricsGroup.scala:81) at kafka.metrics.KafkaMetricsGroup.newHistogram$(KafkaMetricsGroup.scala:80) at kafka.network.RequestMetrics.newHistogram(RequestChannel.scala:352) at kafka.network.RequestMetrics.<init>(RequestChannel.scala:364) at kafka.network.RequestChannel$Metrics.$anonfun$new$2(RequestChannel.scala:57) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at kafka.network.RequestChannel$Metrics.<init>(RequestChannel.scala:56) at kafka.network.RequestChannel.<init>(RequestChannel.scala:243) at kafka.network.SocketServer.<init>(SocketServer.scala:71) at kafka.server.KafkaServer.startup(KafkaServer.scala:238) at kafka.utils.TestUtils$.createServer(TestUtils.scala:135) at kafka.integration.KafkaServerTestHarness.$anonfun$setUp$1(KafkaServerTestHarness.scala:93) {code} Here is the code from KafkaMetricsGroup.scala : {code} .map { case (key, value) => "%s.%s".format(key, value.replaceAll("\\.", "_"))} {code} It seems (some) value was null. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
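The failure mode quoted above is easy to reproduce outside Kafka: calling `replaceAll` on a null tag value throws before the format string is ever applied. The Java sketch below mirrors the Scala snippet; the guard shown is one possible illustration, not the actual Kafka fix:

```java
public class FormatNpeDemo {
    static String toScopePart(String key, String value) {
        // Mirrors the quoted Scala: value.replaceAll NPEs when value is null.
        return String.format("%s.%s", key, value.replaceAll("\\.", "_"));
    }

    // Hypothetical guard: substitute a placeholder for null tag values
    // instead of dereferencing them.
    static String toScopePartSafe(String key, String value) {
        String v = (value == null) ? "unknown" : value.replaceAll("\\.", "_");
        return String.format("%s.%s", key, v);
    }

    public static void main(String[] args) {
        System.out.println(toScopePartSafe("listener", "PLAINTEXT://0.0.0.0"));
        try {
            toScopePart("listener", null);
        } catch (NullPointerException e) {
            System.out.println("NPE, as in the stack trace above");
        }
    }
}
```

So the flakiness comes down to some metric tag value being null at server startup, not to the format string itself.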
[jira] [Commented] (KAFKA-6367) Fix StateRestoreListener To Use Correct Ending Offset
[ https://issues.apache.org/jira/browse/KAFKA-6367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16292054#comment-16292054 ] Matthias J. Sax commented on KAFKA-6367: {quote} We can't automatically use {{nextPosition - 1}} as this could be a commit marker. {quote} It's a valid point, but I am not sure if returning the offset of a commit marker would be an issue or not. The provided offset is called {{batchEndOffset}} -- this does not imply "offset of last restored record"; or does it? If we don't return commit marker offsets, we get "gaps" -- thus, if someone tracks the offsets in the callback and assumes "completeness" this would be violated. On the other hand, if there are aborted messages, we might have gaps anyway... Not sure what others think. To me it seems we don't get much advantage if we don't "return" commit markers but get a lot of additional complexity. Thus, I tend to think that using {{nextPosition - 1}} might be ok. > Fix StateRestoreListener To Use Correct Ending Offset > - > > Key: KAFKA-6367 > URL: https://issues.apache.org/jira/browse/KAFKA-6367 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.0, 1.0.0 >Reporter: Bill Bejeck >Assignee: Bill Bejeck > Fix For: 1.0.1 > > > {{StateRestoreListener#restoreBatchCompleted}} takes the {{nextPosition}} > long, but {{nextPosition}} is not correct: it should be the offset of the > latest restored record, whereas {{nextPosition}} is the offset of the first > not-yet-restored record. > We can't automatically use {{nextPosition}} - 1 as this could be a commit > marker. -- This message was sent by Atlassian JIRA (v6.4.14#64029)