[GitHub] [kafka] chia7712 commented on pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
chia7712 commented on pull request #9590: URL: https://github.com/apache/kafka/pull/9590#issuecomment-730231950 (I'm still reading up on this story, so pardon me if this is a stupid question.) If using timestamp=0 returns the correct offset, why not handle ```ListOffsetRequest.EARLIEST_TIMESTAMP``` the same way? For example, in https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L1643, if the timestamp is equal to ```ListOffsetRequest.EARLIEST_TIMESTAMP```, we could pass 0 to find the offset. Would that work? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
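To make the suggestion concrete, here is a minimal Java sketch over a toy in-memory log. The `Entry` type and the `firstOffsetAtOrAfter` and `lookup` helpers are hypothetical, not Kafka's `Log` API; only the sentinel value `-2L` for `ListOffsetRequest.EARLIEST_TIMESTAMP` is taken from Kafka. The lookup returns the first offset whose timestamp is at or after the target, which is the usual meaning of an offset-by-timestamp query.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical, simplified model of an offset-by-timestamp lookup; this is NOT
// Kafka's Log code. It only illustrates treating EARLIEST_TIMESTAMP as "search from 0".
final class EarliestOffsetSketch {
    // Sentinel used by ListOffsetRequest to ask for the earliest offset.
    static final long EARLIEST_TIMESTAMP = -2L;

    // One record in the toy log: its offset and its timestamp.
    static final class Entry {
        final long offset;
        final long timestamp;

        Entry(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Returns the first offset whose timestamp is >= target, scanning in offset order.
    static OptionalLong firstOffsetAtOrAfter(List<Entry> log, long target) {
        for (Entry e : log) {
            if (e.timestamp >= target) {
                return OptionalLong.of(e.offset);
            }
        }
        return OptionalLong.empty();
    }

    // The idea from the comment: map EARLIEST_TIMESTAMP to 0 and reuse the timestamp
    // search, so the answer is the first offset actually present in the log.
    static OptionalLong lookup(List<Entry> log, long timestamp) {
        long target = (timestamp == EARLIEST_TIMESTAMP) ? 0L : timestamp;
        return firstOffsetAtOrAfter(log, target);
    }
}
```

One caveat this toy model makes visible: a record without a timestamp (older message formats use -1) would never match a search for 0, so whether the mapping is safe depends on how the real log treats such records.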
[jira] [Commented] (KAFKA-10656) NetworkClient.java: print out the feature flags received at DEBUG level, as well as the other version information
[ https://issues.apache.org/jira/browse/KAFKA-10656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235305#comment-17235305 ] Tom Bentley commented on KAFKA-10656: - [~cmccabe] I opened a PR for this, please could you take a look? > NetworkClient.java: print out the feature flags received at DEBUG level, as > well as the other version information > - > > Key: KAFKA-10656 > URL: https://issues.apache.org/jira/browse/KAFKA-10656 > Project: Kafka > Issue Type: Bug >Reporter: Colin McCabe >Assignee: Tom Bentley >Priority: Major > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dajac commented on a change in pull request #9619: MINOR: Reduce sends created by `SendBuilder`
dajac commented on a change in pull request #9619:
URL: https://github.com/apache/kafka/pull/9619#discussion_r526710893

## File path: clients/src/main/java/org/apache/kafka/common/protocol/SendBuilder.java
## @@ -122,6 +123,14 @@ public void writeVarlong(long i) {
         ByteUtils.writeVarlong(i, buffer);
     }
 
+    private void flushPendingSend() {
+        if (!buffers.isEmpty()) {
+            ByteBuffer[] byteBufferArray = buffers.toArray(new ByteBuffer[0]);
+            sends.add(new ByteBufferSend(destinationId, byteBufferArray));

Review comment:
I just noticed that `ByteBufferSend` iterates over the byte buffers again to compute the total size. We could compute the size while we accumulate them. I suppose the number of buffers is usually small, so it should not make a big difference.
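The review suggestion (tracking the total size while buffers are accumulated instead of iterating over them again later) can be sketched as follows. `BufferAccumulator` is a hypothetical class, not part of `SendBuilder`; handing the precomputed size to `ByteBufferSend` would require a constructor that accepts it, which is an assumption here.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical accumulator: the total number of remaining bytes is tracked as buffers
// are added, so whoever consumes the batch does not need a second pass to size it.
final class BufferAccumulator {
    private final List<ByteBuffer> buffers = new ArrayList<>();
    private long pendingSize = 0;

    void add(ByteBuffer buffer) {
        buffers.add(buffer);
        pendingSize += buffer.remaining(); // size computed once, at insertion time
    }

    long pendingSize() {
        return pendingSize;
    }

    // Returns the accumulated buffers as an array and resets the accumulator.
    ByteBuffer[] drain() {
        ByteBuffer[] array = buffers.toArray(new ByteBuffer[0]);
        buffers.clear();
        pendingSize = 0;
        return array;
    }
}
```

Because `remaining()` is read when a buffer is added, the running total is only accurate if the buffers are not read or repositioned before they are sent.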
[GitHub] [kafka] dajac commented on a change in pull request #7498: KAFKA-9023: Log request destination when the Producer gets disconnected
dajac commented on a change in pull request #7498:
URL: https://github.com/apache/kafka/pull/7498#discussion_r526718951

## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
## @@ -659,6 +655,12 @@ else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
         this.accumulator.unmutePartition(batch.topicPartition);
     }
 
+    private String formatErrMsg(ProduceResponse.PartitionResponse response) {

Review comment:
Indeed, we could. I am not sure that it brings much more information, though, so I am fine with keeping it as it is.
[GitHub] [kafka] dajac commented on pull request #7498: KAFKA-9023: Log request destination when the Producer gets disconnected
dajac commented on pull request #7498: URL: https://github.com/apache/kafka/pull/7498#issuecomment-730250619 I have triggered another run of the CI. I will merge it afterwards.
[GitHub] [kafka] dajac commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
dajac commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r526729269

## File path: core/src/main/scala/kafka/network/SocketServer.scala
## @@ -1324,7 +1404,60 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): Unit = {
     // if there is a connection waiting on the rate throttle delay, we will let it wait the original delay even if
     // the rate limit increases, because it is just one connection per listener and the code is simpler that way
-    updateConnectionRateQuota(maxConnectionRate)
+    updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, otherwise
+   * all metric configs will be checked and updated if required.
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default if None
+   */
+  def updateIpConnectionRateQuota(ip: Option[InetAddress], maxConnectionRate: Option[Int]): Unit = synchronized {
+    def isIpConnectionRateMetric(metricName: MetricName) = {
+      metricName.name == ConnectionRateMetricName &&
+      metricName.group == MetricsGroup &&
+      metricName.tags.containsKey(IpMetricTag)
+    }
+
+    def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+      quotaLimit != metric.config.quota.bound
+    }
+    ip match {
+      case Some(address) =>
+        counts.synchronized {
+          maxConnectionRate match {
+            case Some(rate) =>
+              info(s"Updating max connection rate override for $address to $rate")
+              connectionRatePerIp.put(address, rate)
+            case None =>
+              info(s"Removing max connection rate override for $address")
+              connectionRatePerIp.remove(address)
+          }
+        }
+        updateConnectionRateQuota(connectionRateForIp(address), IpQuotaEntity(address))
+      case None =>
+        counts.synchronized {
+          defaultConnectionRatePerIp = maxConnectionRate.getOrElse(DynamicConfig.Ip.DefaultConnectionCreationRate)
+        }
+        info(s"Updated default max IP connection rate to $defaultConnectionRatePerIp")
+        metrics.metrics.forEach { (metricName, metric) =>
+          if (isIpConnectionRateMetric(metricName)) {
+            val quota = connectionRateForIp(InetAddress.getByName(metricName.tags.get(IpMetricTag)))

Review comment:
Makes sense, thanks.
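The override-with-default logic in `updateIpConnectionRateQuota` can be summarized in a small Java model. `IpRateQuotas`, `update`, and `rateFor` are hypothetical names; the model ignores metrics and quota propagation and only shows how an absent rate removes an IP override (falling back to the default) and how an absent IP updates the default itself. The constructor's fallback value stands in for `DynamicConfig.Ip.DefaultConnectionCreationRate`, whose actual value is not shown here.

```java
import java.net.InetAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of per-IP connection-rate overrides with a mutable default.
final class IpRateQuotas {
    private final Map<InetAddress, Integer> overrides = new HashMap<>();
    private final int fallbackDefault; // used when the default itself is reset
    private int defaultRate;

    IpRateQuotas(int fallbackDefault) {
        this.fallbackDefault = fallbackDefault;
        this.defaultRate = fallbackDefault;
    }

    // ip present: set or remove that IP's override; ip absent: set or reset the default.
    synchronized void update(Optional<InetAddress> ip, Optional<Integer> rate) {
        if (ip.isPresent()) {
            if (rate.isPresent()) {
                overrides.put(ip.get(), rate.get());
            } else {
                overrides.remove(ip.get()); // this IP now follows the default
            }
        } else {
            defaultRate = rate.orElse(fallbackDefault);
        }
    }

    // Effective rate: the IP's override if one exists, otherwise the current default.
    synchronized int rateFor(InetAddress ip) {
        return overrides.getOrDefault(ip, defaultRate);
    }
}
```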
[jira] [Commented] (KAFKA-10114) Kafka producer stuck after broker crash
[ https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235343#comment-17235343 ] Tim Fox commented on KAFKA-10114: - > We are still seeing this issue with version 2.6.0. Our app calls flush and it hangs forever when brokers are down. The current KafkaProducer.flush() method will indeed wait forever for flush() to complete. Flush clearly cannot complete if brokers are down. This seems like a reasonable default to me: we want to be sure that buffered messages aren't lost, yet we don't know how long it will take for the brokers to be restarted, so it's very hard to choose a default timeout. Should it be 1 minute? 1 hour? 1 day? However, without changing the API, perhaps we could allow a flush timeout to be specified via a producer property? That way we could keep the default as "forever" but allow you to override it with a lower value. [~ijuma] [~hachikuji] thoughts? > Kafka producer stuck after broker crash > --- > > Key: KAFKA-10114 > URL: https://issues.apache.org/jira/browse/KAFKA-10114 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.3.1, 2.4.1 >Reporter: Itamar Benjamin >Priority: Critical > > Today two of our kafka brokers crashed (cluster of 3 brokers), and producers > were not able to send new messages. After brokers started again all producers > resumed sending data except for a single one. > at the beginning producer rejected all new messages with TimeoutException: > > {code:java} > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > incoming-mutable-RuntimeIIL-1:12 ms has passed since batch creation > {code} > > then after sometime exception changed to > > {code:java} > org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory > within the configured max blocking time 6 ms.
> {code} > > > jstack shows kafka-producer-network-thread is waiting to get producer id: > > {code:java} > "kafka-producer-network-thread | producer-1" #767 daemon prio=5 os_prio=0 > cpu=63594017.16ms elapsed=1511219.38s tid=0x7fffd8353000 nid=0x4fa4 > sleeping [0x7ff55c177000] >java.lang.Thread.State: TIMED_WAITING (sleeping) > at java.lang.Thread.sleep(java.base@11.0.1/Native Method) > at org.apache.kafka.common.utils.Utils.sleep(Utils.java:296) > at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:41) > at > org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:565) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:306) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) > at java.lang.Thread.run(java.base@11.0.1/Thread.java:834) Locked > ownable synchronizers: > - None > {code} > > digging into maybeWaitForProducerId(), it waits until some broker is ready > (awaitNodeReady function) which in return calls leastLoadedNode() on > NetworkClient. This one iterates over all brokers and checks if a request can > be sent to it using canSendRequest(). > This is the code for canSendRequest(): > > {code:java} > return connectionStates.isReady(node, now) && selector.isChannelReady(node) > && inFlightRequests.canSendMore(node) > {code} > > > using some debugging tools i saw this expression always evaluates to false > since the last part (canSendMore) is false. 
> > This is the code for canSendMore: > {code:java} > public boolean canSendMore(String node) { > Deque queue = requests.get(node); return queue > == null || queue.isEmpty() || (queue.peekFirst().send.completed() && > queue.size() < this.maxInFlightRequestsPerConnection); } > {code} > > > i verified > {code:java} > queue.peekFirst().send.completed() > {code} > is true, and that leads to the live lock - since requests queues are full for > all nodes a new request to check broker availability and reconnect to it > cannot be submitted. >
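The livelock described in the report can be reproduced in a stripped-down model of the quoted `canSendMore()` check. `InFlightSketch` and `anySendableNode` are hypothetical; the model keeps only the in-flight condition (not connection state or channel readiness), and it assumes new requests are added at the head of each deque, consistent with the `peekFirst()` call in the quoted code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the in-flight check quoted in the issue; nothing here ever
// removes requests, which mimics brokers whose responses never arrive.
final class InFlightSketch {
    static final class Request {
        final boolean sendCompleted;

        Request(boolean sendCompleted) {
            this.sendCompleted = sendCompleted;
        }
    }

    private final Map<String, Deque<Request>> requests = new HashMap<>();
    private final int maxInFlightRequestsPerConnection;

    InFlightSketch(int maxInFlightRequestsPerConnection) {
        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
    }

    // Newest request goes to the head of the node's deque (assumption of this sketch).
    void add(String node, Request request) {
        requests.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(request);
    }

    // Same boolean structure as the quoted canSendMore().
    boolean canSendMore(String node) {
        Deque<Request> queue = requests.get(node);
        return queue == null || queue.isEmpty()
            || (queue.peekFirst().sendCompleted
                && queue.size() < maxInFlightRequestsPerConnection);
    }

    // First node that can take another request, or null if every queue is full.
    String anySendableNode(List<String> nodes) {
        for (String node : nodes) {
            if (canSendMore(node)) {
                return node;
            }
        }
        return null;
    }
}
```

In this model, once every known node's queue holds `maxInFlightRequestsPerConnection` requests whose responses never arrive, `anySendableNode` keeps returning `null`, so no request to any broker can be issued.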
[GitHub] [kafka] dajac commented on pull request #9547: KAFKA-9630; Replace OffsetsForLeaderEpoch request/response with automated protocol
dajac commented on pull request #9547: URL: https://github.com/apache/kafka/pull/9547#issuecomment-730285178 The failed test seems unrelated to this PR. Merging to trunk.
[GitHub] [kafka] dajac merged pull request #9547: KAFKA-9630; Replace OffsetsForLeaderEpoch request/response with automated protocol
dajac merged pull request #9547: URL: https://github.com/apache/kafka/pull/9547
[jira] [Resolved] (KAFKA-9630) Replace OffsetsForLeaderEpoch request/response with automated protocol
[ https://issues.apache.org/jira/browse/KAFKA-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-9630. Fix Version/s: 2.8.0 Resolution: Fixed > Replace OffsetsForLeaderEpoch request/response with automated protocol > -- > > Key: KAFKA-9630 > URL: https://issues.apache.org/jira/browse/KAFKA-9630 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: David Jacot >Priority: Major > Fix For: 2.8.0 > >
[GitHub] [kafka] dajac commented on pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.
dajac commented on pull request #9601: URL: https://github.com/apache/kafka/pull/9601#issuecomment-730287841 @gardnervickers https://github.com/apache/kafka/pull/9401 and https://github.com/apache/kafka/pull/9547 have been merged. You can bring them back into this PR if you like.
[GitHub] [kafka] dajac commented on pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
dajac commented on pull request #9526: URL: https://github.com/apache/kafka/pull/9526#issuecomment-730292123 @anatasiavela Both PRs have been merged, so we can proceed with this one. There is something we must consider that I was not aware of: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java#L235. When the ProduceRequest is processed in the KafkaApis layer, its internal data is set to null to free up memory. That means we won't have the data available to log the request. We need to take this into account.
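One possible way to keep request logging working, sketched here under stated assumptions, is to capture whatever the log needs before the payload is released. `ReleasableRequest` and `RequestLoggingSketch` are hypothetical stand-ins, not Kafka's `ProduceRequest` or `KafkaApis`. The trade-off is that the captured string keeps some memory alive, which is exactly what clearing the data was meant to avoid.

```java
// Hypothetical stand-in for a request whose payload is released after processing.
final class ReleasableRequest {
    private String data; // stands in for the request payload

    ReleasableRequest(String data) {
        this.data = data;
    }

    // Frees the payload; afterwards the original content can no longer be logged.
    void clearData() {
        data = null;
    }

    String toLogString() {
        return data == null ? "<data released>" : data;
    }
}

final class RequestLoggingSketch {
    // Captures the log representation BEFORE the payload is released.
    static String processAndLog(ReleasableRequest request) {
        String logged = request.toLogString();
        request.clearData(); // simulates freeing memory once the request is handled
        return logged;
    }
}
```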
[jira] [Comment Edited] (KAFKA-10114) Kafka producer stuck after broker crash
[ https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235343#comment-17235343 ] Tim Fox edited comment on KAFKA-10114 at 11/19/20, 11:09 AM: > We are still seeing this issue with version 2.6.0. Our app calls flush and it hangs forever when brokers are down. [Revised my answer as it was previously based on a misunderstanding of the current code] Currently, KafkaProducer.flush() will hang forever if there are queued batches and the brokers are lost and not restarted. Queued batches won't be timed out because there are no "ready" nodes, and the timeout logic currently runs only after a ready node has been obtained. The expectation is for flush() to complete with a TimeoutException if it does not complete successfully within the timeout specified by delivery.timeout.ms. [~ijuma] [~hachikuji] thoughts?
[jira] [Comment Edited] (KAFKA-10114) Kafka producer stuck after broker crash
[ https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235343#comment-17235343 ] Tim Fox edited comment on KAFKA-10114 at 11/19/20, 11:14 AM: > We are still seeing this issue with version 2.6.0. Our app calls flush and it hangs forever when brokers are down. [Revised my answer as it was previously based on a misunderstanding of the current code] Currently, KafkaProducer.flush() will hang forever if there are pending batches and the brokers have been lost and not restarted. Queued batches won't be timed out because there are no "ready" nodes, and the timeout logic currently runs only after a ready node has been obtained. The expectation is for flush() to throw a TimeoutException if it does not complete successfully within delivery.timeout.ms. [~ijuma] [~hachikuji] thoughts?
[GitHub] [kafka] bristy opened a new pull request #9621: KAFKA-9892: Producer state snapshot needs to be forced to disk
bristy opened a new pull request #9621: URL: https://github.com/apache/kafka/pull/9621 FileChannel.close() does not guarantee that modified buffers are written to the file system. We are changing this to use force() semantics so that the file's buffered content and metadata are written to the storage device (FileChannel.force(true) flushes both content and metadata). Summary of testing strategy: I ran the unit tests after making the changes. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
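A minimal sketch of the durable-write pattern the PR describes, using only `java.nio`. `SnapshotWriter.writeDurably` is a hypothetical helper, not the code changed in the PR. `FileChannel.force(true)` asks the OS to write both the file content and its metadata to the storage device before the channel is closed.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: write a snapshot and force it to disk before closing.
final class SnapshotWriter {
    static void writeDurably(Path file, ByteBuffer contents) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            while (contents.hasRemaining()) {
                channel.write(contents); // a single write() may not drain the buffer
            }
            channel.force(true); // flush content AND metadata; close() alone does not promise this
        }
    }
}
```

For a newly created file, POSIX systems may also require syncing the parent directory for the directory entry itself to be durable; this sketch does not do that.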
[jira] [Comment Edited] (KAFKA-10114) Kafka producer stuck after broker crash
[ https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235343#comment-17235343 ] Tim Fox edited comment on KAFKA-10114 at 11/19/20, 11:53 AM: > We are still seeing this issue with version 2.6.0. Our app calls flush and it hangs forever when brokers are down. [Revised my answer as it was previously based on a misunderstanding of the current code] Currently, KafkaProducer.flush() will hang forever if there are pending batches and the brokers have been lost and not restarted. Queued batches won't be timed out because there are no "ready" nodes, and the timeout logic currently runs only after a ready node has been obtained. The expectation is for flush() to throw a TimeoutException if it does not complete successfully within delivery.timeout.ms. From inspecting the code, I am still unsure why record batches aren't being expired properly. [~kwadhwa], if you could provide a thread dump when the hang occurs, that will help us diagnose the issue.
[GitHub] [kafka] dengziming opened a new pull request #9622: KAFKA-10547; add topicId in MetadataResp
dengziming opened a new pull request #9622:
URL: https://github.com/apache/kafka/pull/9622

1. Bump the version of MetadataReq and MetadataResp, and add topicId to MetadataResp.
2. Alter describeTopic in AdminClientTopicService and ZookeeperTopicService.
3. TopicMetadata is cached in MetadataCache, so we need to add topicId to MetadataCache.
4. MetadataCache is updated by UpdateMetadataRequest, so bump the version of UpdateMetadataReq and UpdateMetadataResp, and add topicId to UpdateMetadataReq.

Tested locally; here are some results:

New server + new client:

    kafka-topics.sh --describe --zookeeper localhost:2181 --topic old-version-topic
    Topic: old-version-topic  TopicId: wRPl6VAlQeyE77bDxEESzg  PartitionCount: 2  ReplicationFactor: 1  Configs:
        Topic: old-version-topic  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
        Topic: old-version-topic  Partition: 1  Leader: 0  Replicas: 0  Isr: 0

    kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic old-version-topic
    Topic: old-version-topic  TopicId: wRPl6VAlQeyE77bDxEESzg  PartitionCount: 2  ReplicationFactor: 1  Configs: segment.bytes=1073741824
        Topic: old-version-topic  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
        Topic: old-version-topic  Partition: 1  Leader: 0  Replicas: 0  Isr: 0

Old server + new client:

    kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic old-version-topic
    Topic: old-version-topic  TopicId:  PartitionCount: 2  ReplicationFactor: 1  Configs: segment.bytes=1073741824
        Topic: old-version-topic  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
        Topic: old-version-topic  Partition: 1  Leader: 0  Replicas: 0  Isr: 0

New server + old client:

    kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic old-version-topic
    Topic: old-version-topic  PartitionCount: 2  ReplicationFactor: 1  Configs: segment.bytes=1073741824
        Topic: old-version-topic  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
        Topic: old-version-topic  Partition: 1  Leader: 0  Replicas: 0  Isr: 0

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
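The `TopicId` values in the test output are 22-character strings. A 128-bit ID rendered as URL-safe Base64 without padding has exactly that length, as this sketch shows. `TopicIdFormat` is hypothetical, and the assumption that Kafka's `Uuid.toString()` uses this encoding is not confirmed by the PR text.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

// Hypothetical encoder/decoder: 16 bytes -> 22 URL-safe Base64 characters (no '=' padding).
final class TopicIdFormat {
    static String encode(UUID id) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(id.getMostSignificantBits());
        buf.putLong(id.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    static UUID decode(String encoded) {
        // The JDK decoder accepts input without padding.
        ByteBuffer buf = ByteBuffer.wrap(Base64.getUrlDecoder().decode(encoded));
        long msb = buf.getLong();
        long lsb = buf.getLong();
        return new UUID(msb, lsb);
    }
}
```

Round-tripping the ID from the output above through `decode` and `encode` returns the same 22-character string.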
[GitHub] [kafka] dengziming commented on pull request #9622: KAFKA-10547; add topicId in MetadataResp
dengziming commented on pull request #9622: URL: https://github.com/apache/kafka/pull/9622#issuecomment-730332596 @rajinisivaram @jolshan Hi, PTAL.
[GitHub] [kafka] tombentley opened a new pull request #9623: KAFKA-10692: Add delegation.token.secret.key, deprecate ...master.key
tombentley opened a new pull request #9623: URL: https://github.com/apache/kafka/pull/9623 Add delegation.token.secret.key broker config and deprecate delegation.token.master.key as described in [KIP-681](https://cwiki.apache.org/confluence/display/KAFKA/KIP-681%3A+Rename+master+key+in+delegation+token+feature)
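A minimal sketch of one way a renamed config with a deprecated alias could be resolved. The precedence shown (the new key wins, the deprecated key is a fallback that triggers a warning) is an assumption for illustration; check KIP-681 and the merged code for the actual rules. `DelegationTokenSecret.resolve` is hypothetical.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: prefers delegation.token.secret.key and falls back to the
// deprecated delegation.token.master.key. Not the broker's actual config code.
final class DelegationTokenSecret {
    static final String NEW_KEY = "delegation.token.secret.key";
    static final String DEPRECATED_KEY = "delegation.token.master.key";

    private DelegationTokenSecret() {}

    static Optional<String> resolve(Map<String, String> brokerProps) {
        String value = brokerProps.get(NEW_KEY);
        if (value != null && !value.isEmpty()) {
            return Optional.of(value);
        }
        String legacy = brokerProps.get(DEPRECATED_KEY);
        if (legacy != null && !legacy.isEmpty()) {
            // Assumed behaviour: accept the old name but tell the operator to migrate.
            System.err.println("WARN: " + DEPRECATED_KEY + " is deprecated; use " + NEW_KEY);
            return Optional.of(legacy);
        }
        return Optional.empty(); // neither key set
    }
}
```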
[GitHub] [kafka] tombentley commented on pull request #9623: KAFKA-10692: Add delegation.token.secret.key, deprecate ...master.key
tombentley commented on pull request #9623: URL: https://github.com/apache/kafka/pull/9623#issuecomment-730336828 @omkreddy, @mimaison please could one of you review?
[jira] [Comment Edited] (KAFKA-10114) Kafka producer stuck after broker crash
[ https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235343#comment-17235343 ] Tim Fox edited comment on KAFKA-10114 at 11/19/20, 1:17 PM: > We are still seeing this issue with version 2.6.0. Our app calls flush and it hangs forever when brokers are down. [Revised my answer as it was previously based on a misunderstanding of the current code] Currently KafkaProducer.flush() will hang forever if there are pending batches and brokers have been lost and not restarted. Queued batches won't be timed out, as there are no "ready" nodes and the timeout logic currently runs after a ready node has been obtained. The expectation is for flush() to throw a TimeoutException if it does not complete successfully before delivery.timeout.ms elapses. From inspecting the code, I am still unsure why record batches aren't being expired properly. [~kwadhwa] if you could provide a thread dump when the hanging occurs and enable trace logging, that will help us diagnose the issue. was (Author: purplefox): > We are still seeing this issue with version 2.6.0. Our app calls flush and it hangs forever when brokers are down. [Revised my answer as it was previously based on a misunderstanding of the current code] Currently KafkaProducer.flush() will hang forever if there are pending batches and brokers have been lost and not restarted. Queued batches won't be timed out, as there are no "ready" nodes and the timeout logic currently runs after a ready node has been obtained. The expectation is for flush() to throw a TimeoutException if it does not complete successfully before delivery.timeout.ms elapses. From inspecting the code, I am still unsure why record batches aren't being expired properly. [~kwadhwa] if you could provide a thread dump when the hanging occurs, that will help us diagnose the issue.
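Until `flush()` honours a timeout itself, a caller can at least bound how long it waits. A minimal sketch using only `java.util.concurrent`; `BoundedWait` is a hypothetical helper, not a Kafka API, and on timeout the underlying call may keep running in the background.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical workaround: bound how long the caller waits on a blocking call such as flush().
final class BoundedWait {
    private BoundedWait() {}

    static void runWithTimeout(Runnable blockingCall, long timeoutMs)
            throws TimeoutException, InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-wait");
            t.setDaemon(true); // do not keep the JVM alive if the call never returns
            return t;
        });
        try {
            Future<?> future = executor.submit(blockingCall);
            future.get(timeoutMs, TimeUnit.MILLISECONDS); // throws TimeoutException when the budget is spent
        } finally {
            // Interrupts the worker; whether the blocked call reacts to interruption is up to that call.
            executor.shutdownNow();
        }
    }
}
```

For example, `BoundedWait.runWithTimeout(producer::flush, 120_000)` (where `producer` is an existing `KafkaProducer` and 120 000 ms mirrors the default `delivery.timeout.ms`) throws `TimeoutException` instead of blocking the caller forever.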
> Kafka producer stuck after broker crash > --- > > Key: KAFKA-10114 > URL: https://issues.apache.org/jira/browse/KAFKA-10114 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.3.1, 2.4.1 >Reporter: Itamar Benjamin >Priority: Critical > > Today two of our Kafka brokers crashed (cluster of 3 brokers), and producers > were not able to send new messages. After the brokers started again, all producers > resumed sending data except for a single one. > At the beginning, the producer rejected all new messages with TimeoutException: > > {code:java} > org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for > incoming-mutable-RuntimeIIL-1:12 ms has passed since batch creation > {code} > > Then, after some time, the exception changed to > > {code:java} > org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory > within the configured max blocking time 6 ms. > {code} > > > jstack shows that kafka-producer-network-thread is waiting to get a producer id: > > {code:java} > "kafka-producer-network-thread | producer-1" #767 daemon prio=5 os_prio=0 > cpu=63594017.16ms elapsed=1511219.38s tid=0x7fffd8353000 nid=0x4fa4 > sleeping [0x7ff55c177000] >java.lang.Thread.State: TIMED_WAITING (sleeping) > at java.lang.Thread.sleep(java.base@11.0.1/Native Method) > at org.apache.kafka.common.utils.Utils.sleep(Utils.java:296) > at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:41) > at > org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:565) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:306) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) > at java.lang.Thread.run(java.base@11.0.1/Thread.java:834) Locked > ownable synchronizers: > - None > {code} > > Digging into maybeWaitForProducerId(), it waits until some broker is ready > (awaitNodeReady function), which in turn calls leastLoadedNode() on > NetworkClient.
This one iterates over all brokers and checks if a request can > be sent to it using canSendRequest(). > This is the code for canSendRequest(): > > {code:java} > return connectionStates.isReady(node, now) && selector.isChannelReady(node) > && inFlightRequests.canSendMore(node) > {code} > > > Using some debugging tools, I saw that this expression always evaluates to false > since the last part (canSendMore) is false. > > This is the code for canSendMore: > {code:java} > public boolean canSendMore(String node) { > Deque queue = requests.get(node); return queue > == null || queue.isEmpty() || (queue.peekFirst().send.completed() && > queue.size() < this.maxInFlightRequestsPerConnection); } > {code} > > > I verified > {code:java} > queue.peekFirst().send.completed() > {code} > is true, an
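A simplified model of the quoted `canSendMore()` check makes the reporter's observation concrete: if the head request's send has completed and the expression is still false, the only remaining cause is that the queue has reached `max.in.flight.requests.per.connection`, i.e. responses for that node are not being drained. `InFlightModel` and its `Request` type are hypothetical stand-ins; the model adds new requests at the front so that `peekFirst()` sees the most recent send.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified model of the in-flight check quoted above; not Kafka's InFlightRequests class.
final class InFlightModel {
    // Hypothetical stand-in for an in-flight request: only records whether its bytes were fully sent.
    static final class Request {
        final boolean sendCompleted;
        Request(boolean sendCompleted) { this.sendCompleted = sendCompleted; }
    }

    private final Map<String, Deque<Request>> requests = new HashMap<>();
    private final int maxInFlightRequestsPerConnection;

    InFlightModel(int maxInFlightRequestsPerConnection) {
        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
    }

    void add(String node, Request request) {
        // Newest request goes to the front, so peekFirst() inspects the most recent send.
        requests.computeIfAbsent(node, k -> new ArrayDeque<>()).addFirst(request);
    }

    // Same shape as the quoted condition: an empty queue, or a completed head send with room left.
    boolean canSendMore(String node) {
        Deque<Request> queue = requests.get(node);
        return queue == null || queue.isEmpty()
            || (queue.peekFirst().sendCompleted && queue.size() < maxInFlightRequestsPerConnection);
    }
}
```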
[GitHub] [kafka] ijuma commented on a change in pull request #9435: KAFKA-10606: Disable auto topic creation for fetch-all-topic-metadata request
ijuma commented on a change in pull request #9435: URL: https://github.com/apache/kafka/pull/9435#discussion_r526911200 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1333,7 +1343,17 @@ class KafkaApis(val requestChannel: RequestChannel, } } -val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata +val completeTopicMetadata = (if (metadataRequest.isAllTopics) { Review comment: This is a good point. We should pass a boolean to `getTopicMetadata` indicating that it's an "allTopics" request and have that method handle everything.
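A minimal sketch of the suggested shape, assuming a simplified model in which `getTopicMetadata` receives an `isAllTopics` flag and alone decides whether a missing topic may be auto-created (for example, a topic that disappeared after the all-topics list was built). The class, parameters and string error codes are hypothetical; the error names are borrowed from Kafka's protocol errors for readability, and the real method in `KafkaApis.scala` differs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: one method owns the auto-creation decision, driven by an isAllTopics flag.
final class TopicMetadataSketch {
    static final class TopicResult {
        final String topic;
        final String error; // "NONE", "LEADER_NOT_AVAILABLE" or "UNKNOWN_TOPIC_OR_PARTITION"
        TopicResult(String topic, String error) { this.topic = topic; this.error = error; }
    }

    private TopicMetadataSketch() {}

    static List<TopicResult> getTopicMetadata(List<String> topics, Set<String> existingTopics,
                                              boolean allowAutoTopicCreation, boolean isAllTopics,
                                              List<String> createdTopics) {
        List<TopicResult> results = new ArrayList<>();
        for (String topic : topics) {
            if (existingTopics.contains(topic)) {
                results.add(new TopicResult(topic, "NONE"));
            } else if (allowAutoTopicCreation && !isAllTopics) {
                // Only explicitly named topics may trigger auto-creation.
                createdTopics.add(topic);
                results.add(new TopicResult(topic, "LEADER_NOT_AVAILABLE"));
            } else {
                // An all-topics request never creates anything.
                results.add(new TopicResult(topic, "UNKNOWN_TOPIC_OR_PARTITION"));
            }
        }
        return results;
    }
}
```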
[GitHub] [kafka] mumrah commented on pull request #9598: KAFKA-10701 : First line of detailed stats from consumer-perf-test.sh incorrect
mumrah commented on pull request #9598: URL: https://github.com/apache/kafka/pull/9598#issuecomment-730402458 @quanuw there are some statements that expect `joinStart` to be a timestamp in order to calculate durations. When it's zero, those durations become huge, which makes the subsequent calculations incorrect. You should be able to run the consumer perf test with the `KAFKA_DEBUG=1` environment variable set and attach a debugger from IntelliJ or Eclipse. This will give you more insight into what's going on.
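To illustrate why an unset `joinStart` skews the numbers: with `joinStart == 0`, an elapsed-time expression such as `now - joinStart` evaluates to the full epoch time (roughly 1.6e12 ms in late 2020) rather than a join duration. A minimal sketch of a guard, with hypothetical names rather than the actual `ConsumerPerformance` fields:

```java
// Hypothetical sketch: only accumulate join time once a join start has actually been recorded.
final class JoinTimer {
    private long joinStartMs = 0L; // 0 means "no join in progress"
    private long totalJoinTimeMs = 0L;

    void markJoinStart(long nowMs) {
        joinStartMs = nowMs;
    }

    void markJoinEnd(long nowMs) {
        if (joinStartMs > 0) {
            // Without this guard, nowMs - 0 would add the whole epoch time.
            totalJoinTimeMs += nowMs - joinStartMs;
            joinStartMs = 0L;
        }
    }

    long joinTimeMs() {
        return totalJoinTimeMs;
    }
}
```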
[GitHub] [kafka] mumrah merged pull request #9598: KAFKA-10701 : First line of detailed stats from consumer-perf-test.sh incorrect
mumrah merged pull request #9598: URL: https://github.com/apache/kafka/pull/9598
[jira] [Commented] (KAFKA-9892) Producer state snapshot needs to be forced to disk
[ https://issues.apache.org/jira/browse/KAFKA-9892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235516#comment-17235516 ] Brajesh Kumar commented on KAFKA-9892: --- [~ijuma] Can you please help me with the build on my CR? > Producer state snapshot needs to be forced to disk > -- > > Key: KAFKA-9892 > URL: https://issues.apache.org/jira/browse/KAFKA-9892 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.6.0 >Reporter: Jun Rao >Assignee: Brajesh Kumar >Priority: Major > > Currently, ProducerStateManager.writeSnapshot() only calls > fileChannel.close(), but not explicitly fileChannel.force(). It seems force() > is not guaranteed to be called on close().
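The pattern the issue asks for, sketched with plain NIO under the assumption that the snapshot is written through a `FileChannel`: write all bytes, call `force(true)`, then close. `DurableSnapshotWriter` is a hypothetical helper, not `ProducerStateManager`'s code.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper: write a snapshot and force it to storage before closing the channel.
final class DurableSnapshotWriter {
    private DurableSnapshotWriter() {}

    static void writeSnapshot(Path file, byte[] contents) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buffer = ByteBuffer.wrap(contents);
            while (buffer.hasRemaining()) {
                channel.write(buffer); // write() may write fewer bytes than requested
            }
            // close() makes no durability promise; force(true) flushes content and file metadata.
            channel.force(true);
        }
    }
}
```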
[GitHub] [kafka] cadonna commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads
cadonna commented on a change in pull request #9614: URL: https://github.com/apache/kafka/pull/9614#discussion_r526880412 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1070,7 +1070,9 @@ private Thread shutdownHelper(final boolean error) { adminClient.close(); streamsMetrics.removeAllClientLevelMetrics(); +streamsMetrics.removeAllClientLevelSensors(); Review comment: Could you please add a public method to `StreamsMetricsImpl` named `removeAllClientLevelSensorsAndMetrics()` that calls `removeAllClientLevelMetrics()` and `removeAllClientLevelSensors()` and make the latter two methods `private`? ## File path: streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java ## @@ -60,6 +65,7 @@ private ClientMetrics() {} "The description of the topology executed in the Kafka Streams client"; private static final String STATE_DESCRIPTION = "The state of the Kafka Streams client"; private static final String ALIVE_STREAM_THREADS_DESCRIPTION = "The current number of alive stream threads that are running or participating in rebalance"; +private static final String FAILED_STREAM_THREADS_DESCRIPTION = "The number of failed stream threads so far for a given Kafka Streams client"; Review comment: ```suggestion private static final String FAILED_STREAM_THREADS_DESCRIPTION = "The number of failed stream threads since the start of the Kafka Streams client"; ``` ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java ## @@ -93,6 +93,7 @@ public int hashCode() { private final Version version; private final Deque clientLevelMetrics = new LinkedList<>(); +private final Map> clientLevelSensors = new HashMap<>(); Review comment: Here you should just need a queue as for `clientLevelMetrics`. 
We need a map for the other levels because there can be multiple objects for each level, e.g., there might be multiple stream threads and each one manages its sensors under a key in the map. However, there is only one client on the client level. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java ## @@ -253,6 +268,16 @@ public final void removeAllClientLevelMetrics() { } } +public final void removeAllClientLevelSensors() { Review comment: Unit tests for this method are missing. Please also consider my comment in class `KafkaStreams` for these unit tests. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -221,6 +221,9 @@ State setState(final State newState) { throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState); } else { log.info("State transition from {} to {}", oldState, newState); +if (newState == State.DEAD) { +failedStreamThreadSensor.record(); +} Review comment: Not every dead stream thread is a failed stream thread. You should record this metric where the uncaught exception handler is called, because that is where we know that a stream thread died unexpectedly. ## File path: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java ## @@ -377,7 +378,7 @@ private void shouldAddMetricsOnAllLevels(final String builtInMetricsVersion) thr builtInMetricsVersion ); checkCacheMetrics(builtInMetricsVersion); - +verifyFailedStreamThreadsSensor(0.0); Review comment: I would put the test of whether the metric is recorded correctly in `StreamThreadTest`. An example of such a test is `shouldLogAndRecordSkippedRecordsForInvalidTimestamps()`. I do not think an integration test is needed. The test regarding the existence of the metric, i.e., `checkMetricByName(listMetricThread, FAILED_STREAM_THREADS, 1);` should stay here.
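A minimal sketch of the two review suggestions taken together: a single queue per kind at the client level (there is only one client) and one public method that delegates to two private ones. `ClientLevelRegistry` is a hypothetical, simplified stand-in for `StreamsMetricsImpl`; it only tracks names and does not unregister anything from a metrics registry.

```java
import java.util.Deque;
import java.util.LinkedList;

// Hypothetical, simplified model of the suggested structure for client-level metrics and sensors.
final class ClientLevelRegistry {
    // One client per KafkaStreams instance, so a queue per kind is enough (no map keyed by owner).
    private final Deque<String> clientLevelMetrics = new LinkedList<>();
    private final Deque<String> clientLevelSensors = new LinkedList<>();

    void addClientLevelMetric(String name) { clientLevelMetrics.push(name); }

    void addClientLevelSensor(String name) { clientLevelSensors.push(name); }

    // Single public entry point, as suggested in the review.
    public void removeAllClientLevelSensorsAndMetrics() {
        removeAllClientLevelMetrics();
        removeAllClientLevelSensors();
    }

    private void removeAllClientLevelMetrics() {
        while (!clientLevelMetrics.isEmpty()) {
            clientLevelMetrics.pop(); // a real implementation would also remove the metric here
        }
    }

    private void removeAllClientLevelSensors() {
        while (!clientLevelSensors.isEmpty()) {
            clientLevelSensors.pop(); // a real implementation would also remove the sensor here
        }
    }

    int size() { return clientLevelMetrics.size() + clientLevelSensors.size(); }
}
```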
## File path: streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java ## @@ -99,6 +121,27 @@ public void shouldAddAliveStreamThreadsMetric() { ); } +@Test +public void shouldGetFailedStreamThreadsSensor() { +final String name = "failed-stream-threads"; +final String description = "The number of failed stream threads so far for a given Kafka Streams client"; +expect(streamsMetrics.clientLevelSensor(name, RecordingLevel.INFO)).andReturn(expectedSensor); +expect(streamsMetrics.clientLevelTagMap()).andReturn(tagMap); +StreamsMetricsImpl.addSumMetricToSensor( +expectedSensor, +CLIENT_LEVEL_GROUP, +tagMap, +name, +false, +description +
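The distinction drawn in the review (a thread that dies is not necessarily a thread that failed) can be sketched with a plain `Thread.UncaughtExceptionHandler`: only threads that terminate with an uncaught exception are counted. `FailedThreadCounter` is hypothetical and not how Kafka Streams wires its handler.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: count a thread as failed only when it dies from an uncaught exception,
// not whenever it simply terminates.
final class FailedThreadCounter {
    private final AtomicInteger failedThreads = new AtomicInteger();

    Thread newThread(Runnable body, String name) {
        Thread thread = new Thread(body, name);
        // Runs on the dying thread before it terminates, so join() observes the increment.
        thread.setUncaughtExceptionHandler((t, e) -> failedThreads.incrementAndGet());
        return thread;
    }

    int failedThreads() {
        return failedThreads.get();
    }
}
```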
[GitHub] [kafka] mimaison merged pull request #9545: [mm2] Allow Checkpoints for consumers using static partition assignments
mimaison merged pull request #9545: URL: https://github.com/apache/kafka/pull/9545
[GitHub] [kafka] dajac commented on pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
dajac commented on pull request #9386: URL: https://github.com/apache/kafka/pull/9386#issuecomment-730447091 Builds are green. Merging to trunk.
[GitHub] [kafka] ryannedolan commented on pull request #9545: [mm2] Allow Checkpoints for consumers using static partition assignments
ryannedolan commented on pull request #9545: URL: https://github.com/apache/kafka/pull/9545#issuecomment-730448017 Sorry, late to the convo, but lgtm, thanks!
[GitHub] [kafka] mimaison merged pull request #9623: KAFKA-10692: Add delegation.token.secret.key, deprecate ...master.key
mimaison merged pull request #9623: URL: https://github.com/apache/kafka/pull/9623
[jira] [Resolved] (KAFKA-10692) Rename broker master key config for KIP 681
[ https://issues.apache.org/jira/browse/KAFKA-10692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-10692. Fix Version/s: 2.8.0 Resolution: Fixed > Rename broker master key config for KIP 681 > --- > > Key: KAFKA-10692 > URL: https://issues.apache.org/jira/browse/KAFKA-10692 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Major > Fix For: 2.8.0 > >
[GitHub] [kafka] dajac merged pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
dajac merged pull request #9386: URL: https://github.com/apache/kafka/pull/9386
[jira] [Resolved] (KAFKA-10024) Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
[ https://issues.apache.org/jira/browse/KAFKA-10024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-10024. - Fix Version/s: 2.8.0 Resolution: Fixed > Add dynamic configuration and enforce quota for per-IP connection rate limits > (KIP-612, part 2) > --- > > Key: KAFKA-10024 > URL: https://issues.apache.org/jira/browse/KAFKA-10024 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Anna Povzner >Assignee: David Mao >Priority: Major > Labels: features > Fix For: 2.8.0 > > > This JIRA is for the second part of KIP-612 – Add per-IP connection creation > rate limits. > As described here: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]
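For intuition about per-IP connection creation rate limits, here is a minimal sliding-window sketch in which each IP may open at most N new connections per window. This is a swapped-in technique for illustration only; KIP-612 describes quota-based throttling inside the broker, and `PerIpConnectionRateLimiter` is hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sliding-window limiter: at most maxConnectionsPerWindow accepted connections per IP per window.
final class PerIpConnectionRateLimiter {
    private final int maxConnectionsPerWindow;
    private final long windowMs;
    private final Map<String, Deque<Long>> acceptTimes = new HashMap<>();

    PerIpConnectionRateLimiter(int maxConnectionsPerWindow, long windowMs) {
        this.maxConnectionsPerWindow = maxConnectionsPerWindow;
        this.windowMs = windowMs;
    }

    synchronized boolean tryAccept(String ip, long nowMs) {
        Deque<Long> times = acceptTimes.computeIfAbsent(ip, k -> new ArrayDeque<>());
        // Drop accept timestamps that have fallen out of the window.
        while (!times.isEmpty() && nowMs - times.peekFirst() >= windowMs) {
            times.pollFirst();
        }
        if (times.size() >= maxConnectionsPerWindow) {
            return false; // the caller would delay or reject this connection
        }
        times.addLast(nowMs);
        return true;
    }
}
```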
[jira] [Created] (KAFKA-10746) Consumer poll timeout Expiration should be logged as WARNING not INFO.
Benedikt Linse created KAFKA-10746: -- Summary: Consumer poll timeout Expiration should be logged as WARNING not INFO. Key: KAFKA-10746 URL: https://issues.apache.org/jira/browse/KAFKA-10746 Project: Kafka Issue Type: Improvement Components: clients Affects Versions: 2.5.1, 2.6.0, 2.5.0 Reporter: Benedikt Linse When a consumer does not poll regularly, and the `max.poll.interval.ms` threshold is reached, the consumer leaves the consumer group, and the reason is logged as an INFO message: [https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1356] [https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1016] Most Kafka users ignore INFO messages or have the log level set to WARN. Still many users run into this issue, since their applications take too long to process the polled records, and then the consumer fails to commit the offsets, which leads to duplicate message processing. Not seeing the error message in the first place means that users loose a lot of time debugging and searching for the reason for duplicate message processing. Therefore it seems like the log level of this message should be increased to WARN.
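A back-of-the-envelope check for the situation described: if a full batch of `max.poll.records` records can take longer to process than `max.poll.interval.ms`, the consumer leaves the group before its next poll. The sketch assumes a known worst-case per-record processing time; `PollBudget` is hypothetical. With the commonly cited defaults (`max.poll.records=500`, `max.poll.interval.ms=300000`), 1 s per record already exceeds the budget.

```java
// Hypothetical helper for sizing max.poll.records against max.poll.interval.ms.
final class PollBudget {
    private PollBudget() {}

    // True if a full batch at the worst-case per-record cost would overrun the poll interval.
    static boolean exceedsPollInterval(int maxPollRecords, long worstCaseMsPerRecord, long maxPollIntervalMs) {
        return (long) maxPollRecords * worstCaseMsPerRecord > maxPollIntervalMs;
    }

    // Largest max.poll.records that fits within safetyFactor * max.poll.interval.ms (at least 1).
    static int safeMaxPollRecords(long worstCaseMsPerRecord, long maxPollIntervalMs, double safetyFactor) {
        long budgetMs = (long) (maxPollIntervalMs * safetyFactor);
        return (int) Math.max(1L, budgetMs / worstCaseMsPerRecord);
    }
}
```

For example, at 1 000 ms per record and an 80% safety margin on a 300 000 ms interval, `safeMaxPollRecords` suggests 240 records per poll.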
[jira] [Created] (KAFKA-10747) Implement ClientQuota APIs for altering and describing IP entity quotas
David Mao created KAFKA-10747: - Summary: Implement ClientQuota APIs for altering and describing IP entity quotas Key: KAFKA-10747 URL: https://issues.apache.org/jira/browse/KAFKA-10747 Project: Kafka Issue Type: Improvement Affects Versions: 2.8.0 Reporter: David Mao Assignee: David Mao
[jira] [Created] (KAFKA-10748) Add IP connection rate throttling metric
David Mao created KAFKA-10748: - Summary: Add IP connection rate throttling metric Key: KAFKA-10748 URL: https://issues.apache.org/jira/browse/KAFKA-10748 Project: Kafka Issue Type: Improvement Affects Versions: 2.8.0 Reporter: David Mao Assignee: David Mao
[GitHub] [kafka] dajac commented on pull request #7498: KAFKA-9023: Log request destination when the Producer gets disconnected
dajac commented on pull request #7498: URL: https://github.com/apache/kafka/pull/7498#issuecomment-730458347 Failed tests are not related. Merging to trunk.
* Build / JDK 11 / org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
* Build / JDK 11 / org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization = none]
[jira] [Updated] (KAFKA-10746) Consumer poll timeout Expiration should be logged as WARNING not INFO.
[ https://issues.apache.org/jira/browse/KAFKA-10746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Benedikt Linse updated KAFKA-10746: --- Description: When a consumer does not poll regularly, and the `max.poll.interval.ms` threshold is reached, the consumer leaves the consumer group, and the reason is logged as an INFO message: [https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1356] [https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1016] Most Kafka users ignore INFO messages or have the log level set to WARN. Still many users run into this issue, since their applications take too long to process the polled records, and then the consumer fails to commit the offsets, which leads to duplicate message processing. Not seeing the error message in the first place means that users lose a lot of time debugging and searching for the reason for duplicate message processing. Therefore it seems like the log level of this message should be increased to WARN. was: When a consumer does not poll regularly, and the `max.poll.interval.ms` threshold is reached, the consumer leaves the consumer group, and the reason is logged as an INFO message: [https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1356] [https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1016] Most Kafka users ignore INFO messages or have the log level set to WARN. 
Still many users run into this issue, since their applications take too long to process the polled records, and then the consumer fails to commit the offsets, which leads to duplicate message processing. Not seeing the error message in the first place means that users loose a lot of time debugging and searching for the reason for duplicate message processing. Therefore it seems like the log level of this message should be increased to WARN. > Consumer poll timeout Expiration should be logged as WARNING not INFO. > --- > > Key: KAFKA-10746 > URL: https://issues.apache.org/jira/browse/KAFKA-10746 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.5.0, 2.6.0, 2.5.1 >Reporter: Benedikt Linse >Priority: Minor > > When a consumer does not poll regularly, and the `max.poll.interval.ms` > threshold is reached, the consumer leaves the consumer group, and the reason > is logged as an INFO message: > [https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1356] > [https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1016] > Most Kafka users ignore INFO messages or have the log level set to WARN. > Still many users run into this issue, since their applications take too long > to process the polled records, and then the consumer fails to commit the > offsets, which leads to duplicate message processing. Not seeing the error > message in the first place means that users lose a lot of time debugging and > searching for the reason for duplicate message processing. > Therefore it seems like the log level of this message should be increased to > WARN.
[GitHub] [kafka] dajac merged pull request #7498: KAFKA-9023: Log request destination when the Producer gets disconnected
dajac merged pull request #7498: URL: https://github.com/apache/kafka/pull/7498
[jira] [Resolved] (KAFKA-9023) Producer NETWORK_EXCEPTION response should log more information
[ https://issues.apache.org/jira/browse/KAFKA-9023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-9023. Fix Version/s: 2.8.0 Resolution: Fixed > Producer NETWORK_EXCEPTION response should log more information > --- > > Key: KAFKA-9023 > URL: https://issues.apache.org/jira/browse/KAFKA-9023 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Minor > Fix For: 2.8.0 > > > When diagnosing network issues, it is useful to have a clear picture of which > client disconnected from which broker at what time. > Currently, when the producer receives a NETWORK_EXCEPTION in its responses, > it logs the following: > {code:java} > [Producer clientId=] Received invalid metadata error in produce > request on partition due to > org.apache.kafka.common.errors.NetworkException: The server disconnected > before a response was received.. Going to request metadata update now {code} > It would be good if we logged additional information regarding the > broker/host whose connection was disconnected
[GitHub] [kafka] mimaison commented on pull request #9224: KAFKA-10304: refactor MM2 integration tests
mimaison commented on pull request #9224: URL: https://github.com/apache/kafka/pull/9224#issuecomment-730467364 > If we have to control the complexity, I would prefer to drop testWithBrokerRestart and keep MirrorConnectorsIntegrationSSLTest, as it makes sense to run simple validation in SSL setup.
I think that would be a good idea. We can try to introduce this test in a follow-up PR.
[jira] [Updated] (KAFKA-10747) Implement ClientQuota APIs for altering and describing IP entity quotas
[ https://issues.apache.org/jira/browse/KAFKA-10747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Mao updated KAFKA-10747: -- Component/s: core config > Implement ClientQuota APIs for altering and describing IP entity quotas > > > Key: KAFKA-10747 > URL: https://issues.apache.org/jira/browse/KAFKA-10747 > Project: Kafka > Issue Type: Improvement > Components: config, core >Affects Versions: 2.8.0 >Reporter: David Mao >Assignee: David Mao >Priority: Major >
[jira] [Updated] (KAFKA-10748) Add IP connection rate throttling metric
[ https://issues.apache.org/jira/browse/KAFKA-10748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Mao updated KAFKA-10748: -- Component/s: network core > Add IP connection rate throttling metric > > > Key: KAFKA-10748 > URL: https://issues.apache.org/jira/browse/KAFKA-10748 > Project: Kafka > Issue Type: Improvement > Components: core, network >Affects Versions: 2.8.0 >Reporter: David Mao >Assignee: David Mao >Priority: Minor >
[jira] [Created] (KAFKA-10749) Add throttling of IPs by connection rate
David Mao created KAFKA-10749: - Summary: Add throttling of IPs by connection rate Key: KAFKA-10749 URL: https://issues.apache.org/jira/browse/KAFKA-10749 Project: Kafka Issue Type: New Feature Components: core, network Reporter: David Mao Assignee: David Mao Fix For: 2.8.0 This tracks the completion of IP connection rate throttling as detailed in [https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]
[jira] [Updated] (KAFKA-10747) Implement ClientQuota APIs for altering and describing IP entity quotas
[ https://issues.apache.org/jira/browse/KAFKA-10747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Mao updated KAFKA-10747: -- Parent: KAFKA-10749 Issue Type: Sub-task (was: Improvement) > Implement ClientQuota APIs for altering and describing IP entity quotas > > > Key: KAFKA-10747 > URL: https://issues.apache.org/jira/browse/KAFKA-10747 > Project: Kafka > Issue Type: Sub-task > Components: config, core >Affects Versions: 2.8.0 >Reporter: David Mao >Assignee: David Mao >Priority: Major >
[jira] [Updated] (KAFKA-10748) Add IP connection rate throttling metric
[ https://issues.apache.org/jira/browse/KAFKA-10748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Mao updated KAFKA-10748: -- Parent: KAFKA-10749 Issue Type: Sub-task (was: Improvement) > Add IP connection rate throttling metric > > > Key: KAFKA-10748 > URL: https://issues.apache.org/jira/browse/KAFKA-10748 > Project: Kafka > Issue Type: Sub-task > Components: core, network >Affects Versions: 2.8.0 >Reporter: David Mao >Assignee: David Mao >Priority: Minor >
[jira] [Updated] (KAFKA-10024) Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)
[ https://issues.apache.org/jira/browse/KAFKA-10024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Mao updated KAFKA-10024: -- Parent: KAFKA-10749 Issue Type: Sub-task (was: Improvement) > Add dynamic configuration and enforce quota for per-IP connection rate limits > (KIP-612, part 2) > --- > > Key: KAFKA-10024 > URL: https://issues.apache.org/jira/browse/KAFKA-10024 > Project: Kafka > Issue Type: Sub-task > Components: core >Reporter: Anna Povzner >Assignee: David Mao >Priority: Major > Labels: features > Fix For: 2.8.0 > > > This JIRA is for the second part of KIP-612 – Add per-IP connection creation > rate limits. > As described here: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]
[GitHub] [kafka] mimaison commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests
mimaison commented on a change in pull request #9224: URL: https://github.com/apache/kafka/pull/9224#discussion_r527005533 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.network.Mode; +import org.apache.kafka.connect.mirror.MirrorCheckpointConnector; +import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.MirrorSourceConnector; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestSslUtils; +import org.apache.kafka.test.TestUtils; +import kafka.server.KafkaConfig$; + +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.After; +import org.junit.Test; + +/** + * Tests MM2 replication with SSL enabled at backup kafka cluster + */ +@Category(IntegrationTest.class) +public class MirrorConnectorsIntegrationSSLTest extends MirrorConnectorsIntegrationBaseTest { + +private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationSSLTest.class); + +private static final List CONNECTOR_LIST = +Arrays.asList(MirrorSourceConnector.class, MirrorCheckpointConnector.class, MirrorHeartbeatConnector.class); + +@Before +public void setup() throws InterruptedException { +try { +Map sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert"); +backupBrokerProps.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:0"); + backupBrokerProps.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), "SSL"); +backupBrokerProps.putAll(sslConfig); +} catch (final Exception e) { +throw new RuntimeException(e); +} +startClusters(); +} + +@After +public void close() { Review comment: Yes it will
[GitHub] [kafka] tombentley commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()
tombentley commented on pull request #9433: URL: https://github.com/apache/kafka/pull/9433#issuecomment-730476432 @abbccdda I think we're just waiting for your review here. Thanks.
[GitHub] [kafka] chia7712 commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()
chia7712 commented on pull request #9433: URL: https://github.com/apache/kafka/pull/9433#issuecomment-730478145 @tombentley Could you trigger QA again?
[GitHub] [kafka] tombentley commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()
tombentley commented on pull request #9433: URL: https://github.com/apache/kafka/pull/9433#issuecomment-730479015 @chia7712 I thought only committers can do that.
[GitHub] [kafka] chia7712 commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()
chia7712 commented on pull request #9433: URL: https://github.com/apache/kafka/pull/9433#issuecomment-730480556 IIRC, the retest command does not work currently. Could you rebase the code to trigger QA?
[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on a change in pull request #9590: URL: https://github.com/apache/kafka/pull/9590#discussion_r527020272 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -1749,6 +1752,9 @@ class Log(@volatile private var _dir: File, checkIfMemoryMappedBufferClosed() // remove the segments for lookups removeAndDeleteSegments(deletable, asyncDelete = true, reason) + if (reason == LogCompaction) { + maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset, SegmentCompaction) + } maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset, SegmentDeletion) Review comment: Sorry if this is a little confusing. `reason` is the reason for the deletion, while the other parameter, `SegmentCompaction`, is the reason for changing the log start offset. Currently it is used to handle the issue of updating the log start offset past the high watermark. I agree that it is not the cleanest way, and we should figure out what we want to do about compacting past the high watermark.
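The distinction between the deletion reason and the reason for moving the log start offset can be illustrated with a toy Java model. This is a sketch under stated assumptions, not Kafka's `Log.scala`. The names `LogStartOffsetModel` and `IncrementReason` are hypothetical. The rule that only a compaction-driven increment may move the log start offset past the high watermark is an assumption chosen to mirror the discussion above.

```java
import java.util.TreeMap;

// Toy model (hypothetical, not Kafka's Log.scala): segments keyed by base offset.
final class LogStartOffsetModel {
    enum IncrementReason { SEGMENT_DELETION, SEGMENT_COMPACTION }

    private final TreeMap<Long, Integer> segments = new TreeMap<>(); // baseOffset -> record count
    private final long highWatermark;
    private long logStartOffset = 0L;

    LogStartOffsetModel(long highWatermark) {
        this.highWatermark = highWatermark;
    }

    void addSegment(long baseOffset, int recordCount) {
        segments.put(baseOffset, recordCount);
    }

    long logStartOffset() {
        return logStartOffset;
    }

    // Only ever moves forward. Assumption: ordinary deletion may not pass the
    // high watermark, while compaction-driven increments are allowed to.
    boolean maybeIncrementLogStartOffset(long newOffset, IncrementReason reason) {
        if (newOffset <= logStartOffset) {
            return false;
        }
        if (newOffset > highWatermark && reason != IncrementReason.SEGMENT_COMPACTION) {
            throw new IllegalStateException("Cannot increment log start offset to " + newOffset
                + " past high watermark " + highWatermark);
        }
        logStartOffset = newOffset;
        return true;
    }

    // Drop segments that compaction left empty, then advance the log start
    // offset to the base offset of the first remaining segment.
    void deleteEmptySegmentsAfterCompaction() {
        segments.values().removeIf(count -> count == 0);
        if (!segments.isEmpty()) {
            maybeIncrementLogStartOffset(segments.firstKey(), IncrementReason.SEGMENT_COMPACTION);
        }
    }

    public static void main(String[] args) {
        LogStartOffsetModel log = new LogStartOffsetModel(150L);
        log.addSegment(0L, 0);   // emptied by compaction
        log.addSegment(100L, 5);
        log.deleteEmptySegmentsAfterCompaction();
        System.out.println(log.logStartOffset()); // 100
    }
}
```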
[GitHub] [kafka] jolshan commented on pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on pull request #9590: URL: https://github.com/apache/kafka/pull/9590#issuecomment-730487839 @chia7712 The code path you linked is the code path beginningOffsets uses. Are you suggesting removing `ListOffsetRequest.EARLIEST_TIMESTAMP` and replacing it with 0?
[GitHub] [kafka] chia7712 commented on pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
chia7712 commented on pull request #9590: URL: https://github.com/apache/kafka/pull/9590#issuecomment-730491021 > Are you suggesting removing ListOffsetRequest.EARLIEST_TIMESTAMP and replacing it with 0? Yep, it seems like a simple solution that doesn't require many changes. However, I have not fully understood this issue yet; it is just an idea.
[GitHub] [kafka] wcarlson5 commented on pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on pull request #9615: URL: https://github.com/apache/kafka/pull/9615#issuecomment-730500854 @cadonna @mjsax @vvcephei Part 3 of KIP-663 (add threads) is ready for review.
[GitHub] [kafka] jolshan commented on pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets
jolshan commented on pull request #9590: URL: https://github.com/apache/kafka/pull/9590#issuecomment-730503145 @chia7712 I did think about this solution initially. I'm wondering whether we want to update the segment offsets and logStartOffsets correctly, though. If that isn't as important, then maybe we can go with the simpler solution. This solution removes empty segments and keeps the baseOffsets updated. If these baseOffsets are used in other places, then maybe this solution is better.
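The two options discussed for KAFKA-7556 can be compared with a toy Java model of a compacted partition (hypothetical names, not the broker's code). It assumes an `EARLIEST_TIMESTAMP` lookup returns the log start offset directly. A timestamp-0 lookup, by contrast, returns the first surviving record whose timestamp is at least 0, so it skips offsets that compaction has already removed. The sentinel value -2 matches `ListOffsetRequest.EARLIEST_TIMESTAMP`.

```java
import java.util.Arrays;
import java.util.List;

// Toy model of a compacted partition (hypothetical; not the broker's Log code).
final class OffsetLookupModel {
    static final class Rec {
        final long offset;
        final long timestamp;

        Rec(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Same sentinel value as ListOffsetRequest.EARLIEST_TIMESTAMP.
    static final long EARLIEST_TIMESTAMP = -2L;

    private final long logStartOffset;
    private final List<Rec> records; // surviving records, in offset order

    OffsetLookupModel(long logStartOffset, List<Rec> records) {
        this.logStartOffset = logStartOffset;
        this.records = records;
    }

    // Returns the offset a ListOffsets-style lookup would return, or -1 if none matches.
    long lookup(long targetTimestamp) {
        if (targetTimestamp == EARLIEST_TIMESTAMP) {
            return logStartOffset; // may point at an offset compaction already removed
        }
        for (Rec r : records) {
            if (r.offset >= logStartOffset && r.timestamp >= targetTimestamp) {
                return r.offset;
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        // Compaction removed offsets 0-2, but the first segment still starts at 0.
        OffsetLookupModel log = new OffsetLookupModel(0L,
            Arrays.asList(new Rec(3L, 1_000L), new Rec(7L, 2_000L)));
        System.out.println(log.lookup(EARLIEST_TIMESTAMP)); // 0
        System.out.println(log.lookup(0L));                 // 3
    }
}
```

The model shows why the timestamp-0 approach is attractive: it needs no change to segment bookkeeping. It also shows the trade-off jolshan raises: the log start offset itself stays stale, so anything else that reads it still sees offset 0.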
[jira] [Assigned] (KAFKA-10549) Add topic ID support to DeleteTopics,ListOffsets, OffsetForLeaders, StopReplica
[ https://issues.apache.org/jira/browse/KAFKA-10549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan reassigned KAFKA-10549: -- Assignee: Justine Olshan > Add topic ID support to DeleteTopics,ListOffsets, OffsetForLeaders, > StopReplica > --- > > Key: KAFKA-10549 > URL: https://issues.apache.org/jira/browse/KAFKA-10549 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > ListOffsets, OffsetForLeaders, and StopReplica protocols will replace topic > name with topic ID and will be used to prevent reads from deleted topics > Delete topics will be changed to support topic ids and delete sooner. > This may be split into two or more issues if necessary.
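The reason topic IDs can "prevent reads from deleted topics" is that a topic deleted and recreated under the same name receives a fresh ID. A request that still carries the old ID can therefore be rejected even though the name exists. Here is a minimal Java sketch of that check. `TopicIdGuard` and its `Result` enum are hypothetical and are not Kafka's metadata or error classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of validating requests by topic ID instead of topic name.
final class TopicIdGuard {
    enum Result { OK, UNKNOWN_TOPIC_ID }

    // topic ID -> topic name, for topics that currently exist
    private final Map<UUID, String> liveTopics = new HashMap<>();

    UUID createTopic(String name) {
        UUID id = UUID.randomUUID(); // every (re)creation gets a fresh ID
        liveTopics.put(id, name);
        return id;
    }

    void deleteTopic(UUID id) {
        liveTopics.remove(id);
    }

    // A stale ID is rejected even if a topic with the same name exists again.
    Result validate(UUID requestedId) {
        return liveTopics.containsKey(requestedId) ? Result.OK : Result.UNKNOWN_TOPIC_ID;
    }

    public static void main(String[] args) {
        TopicIdGuard guard = new TopicIdGuard();
        UUID oldId = guard.createTopic("foo");
        guard.deleteTopic(oldId);
        UUID newId = guard.createTopic("foo"); // same name, new ID
        System.out.println(guard.validate(oldId)); // UNKNOWN_TOPIC_ID
        System.out.println(guard.validate(newId)); // OK
    }
}
```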
[GitHub] [kafka] hachikuji commented on a change in pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.
hachikuji commented on a change in pull request #9601: URL: https://github.com/apache/kafka/pull/9601#discussion_r527045689 ## File path: core/src/main/scala/kafka/api/ApiVersion.scala ## @@ -108,7 +108,9 @@ object ApiVersion { // Bup Fetch protocol for Raft protocol (KIP-595) KAFKA_2_7_IV1, // Introduced AlterIsr (KIP-497) -KAFKA_2_7_IV2 +KAFKA_2_7_IV2, +// Flexible versioning on ListOffsets Review comment: For what it's worth, `WriteTxnMarkers` and `OffsetsForLeaderEpoch` are also inter-broker APIs.
[GitHub] [kafka] abbccdda opened a new pull request #9624: KAFKA-10655: wrap and catch exception for appendAsLeader failure
abbccdda opened a new pull request #9624: URL: https://github.com/apache/kafka/pull/9624 When leader append fails, we should trigger the resign process and do a graceful shutdown afterwards. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
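The PR description, "wrap and catch exception for appendAsLeader failure" followed by resigning and a graceful shutdown, can be sketched in plain Java as below. Everything here is hypothetical: `LeaderAppendGuard`, its `Role` enum, and the choice to rethrow the failure wrapped in an `IllegalStateException` are illustrations, not the code in the PR.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch: guard a leader append so a failure triggers resignation
// and a graceful-shutdown request instead of leaving the node in a bad state.
final class LeaderAppendGuard {
    enum Role { LEADER, RESIGNED }

    private Role role = Role.LEADER;
    private boolean shutdownRequested = false;

    Role role() { return role; }

    boolean shutdownRequested() { return shutdownRequested; }

    // Runs the supplied append (returning the appended offset). On failure:
    // resign, request a graceful shutdown, and rethrow the error with context.
    long appendAsLeader(LongSupplier append) {
        if (role != Role.LEADER) {
            throw new IllegalStateException("Not the leader");
        }
        try {
            return append.getAsLong();
        } catch (RuntimeException e) {
            role = Role.RESIGNED;
            shutdownRequested = true;
            throw new IllegalStateException("Leader append failed; resigned", e);
        }
    }

    public static void main(String[] args) {
        LeaderAppendGuard guard = new LeaderAppendGuard();
        System.out.println(guard.appendAsLeader(() -> 42L)); // 42
        try {
            guard.appendAsLeader(() -> { throw new RuntimeException("disk full"); });
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage() + ", cause: " + e.getCause().getMessage());
        }
        System.out.println(guard.role() + " " + guard.shutdownRequested()); // RESIGNED true
    }
}
```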
[GitHub] [kafka] tombentley commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()
tombentley commented on pull request #9433: URL: https://github.com/apache/kafka/pull/9433#issuecomment-730508481 @chia7712 done
[jira] [Assigned] (KAFKA-10548) Implement deletion logic for LeaderAndIsrRequests
[ https://issues.apache.org/jira/browse/KAFKA-10548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan reassigned KAFKA-10548: -- Assignee: Justine Olshan > Implement deletion logic for LeaderAndIsrRequests > - > > Key: KAFKA-10548 > URL: https://issues.apache.org/jira/browse/KAFKA-10548 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > This will allow for specialized deletion logic when receiving > LeaderAndIsrRequests > Will also create and utilize delete.stale.topic.delay.ms configuration option
[GitHub] [kafka] chia7712 commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()
chia7712 commented on pull request #9433: URL: https://github.com/apache/kafka/pull/9433#issuecomment-730516709 @tombentley Thanks! I will merge this PR tomorrow if there are no objections :)
[jira] [Assigned] (KAFKA-10580) Add topic ID support to Fetch request
[ https://issues.apache.org/jira/browse/KAFKA-10580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan reassigned KAFKA-10580: -- Assignee: Justine Olshan > Add topic ID support to Fetch request > - > > Key: KAFKA-10580 > URL: https://issues.apache.org/jira/browse/KAFKA-10580 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Major > > Prevent fetching a stale topic with topic IDs
[jira] [Created] (KAFKA-10750) test failure scenarios of MirrorMaker 2
Ning Zhang created KAFKA-10750: -- Summary: test failure scenarios of MirrorMaker 2 Key: KAFKA-10750 URL: https://issues.apache.org/jira/browse/KAFKA-10750 Project: Kafka Issue Type: Improvement Components: mirrormaker Affects Versions: 2.7.0 Reporter: Ning Zhang Assignee: Ning Zhang As a follow-up to https://issues.apache.org/jira/browse/KAFKA-10304, it may be necessary to test failure scenarios, e.g. a Kafka broker being stopped and then restarted. To make PR [https://github.com/apache/kafka/pull/9224] smaller, we removed the testing code for failure scenarios and plan to add it back in this ticket.
[GitHub] [kafka] hachikuji commented on a change in pull request #9619: MINOR: Reduce sends created by `SendBuilder`
hachikuji commented on a change in pull request #9619: URL: https://github.com/apache/kafka/pull/9619#discussion_r527073144 ## File path: clients/src/main/java/org/apache/kafka/common/protocol/SendBuilder.java ## @@ -122,6 +123,14 @@ public void writeVarlong(long i) { ByteUtils.writeVarlong(i, buffer); } +private void flushPendingSend() { +if (!buffers.isEmpty()) { +ByteBuffer[] byteBufferArray = buffers.toArray(new ByteBuffer[0]); +sends.add(new ByteBufferSend(destinationId, byteBufferArray)); Review comment: I was somewhat inclined to leave this as is, but finally I decided to do it since it might help (even if only a little) with fetch overhead.
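The idea adopted in this thread is to compute the total size while buffers are accumulated, rather than re-iterating over them when the send is built. Here is a minimal Java sketch of that idea. `PendingBuffers` and its nested `Flushed` holder are hypothetical and are not the real `SendBuilder` or `ByteBufferSend` API.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical accumulator: keeps a running byte count as buffers are added,
// so flushing does not need a second pass to compute the total size.
final class PendingBuffers {
    static final class Flushed {
        final ByteBuffer[] buffers;
        final long sizeInBytes;

        Flushed(ByteBuffer[] buffers, long sizeInBytes) {
            this.buffers = buffers;
            this.sizeInBytes = sizeInBytes;
        }
    }

    private final List<ByteBuffer> buffers = new ArrayList<>();
    private long sizeInBytes = 0L;

    // Records the buffer and its remaining bytes in a single pass.
    void add(ByteBuffer buffer) {
        buffers.add(buffer);
        sizeInBytes += buffer.remaining();
    }

    boolean isEmpty() {
        return buffers.isEmpty();
    }

    // Hands back the pending buffers with their precomputed size, then resets.
    Flushed flush() {
        Flushed result = new Flushed(buffers.toArray(new ByteBuffer[0]), sizeInBytes);
        buffers.clear();
        sizeInBytes = 0L;
        return result;
    }

    public static void main(String[] args) {
        PendingBuffers pending = new PendingBuffers();
        pending.add(ByteBuffer.allocate(10));
        pending.add(ByteBuffer.wrap(new byte[5]));
        Flushed flushed = pending.flush();
        System.out.println(flushed.buffers.length + " buffers, " + flushed.sizeInBytes + " bytes"); // 2 buffers, 15 bytes
    }
}
```

The running total is only correct if the buffers' positions and limits are not changed between `add` and `flush`. That assumption seems reasonable for buffers that are already fully written.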
[GitHub] [kafka] vamossagar12 commented on a change in pull request #9539: KAFKA-10634: Adding LeaderId to Voters list in LeaderChangeMessage
vamossagar12 commented on a change in pull request #9539: URL: https://github.com/apache/kafka/pull/9539#discussion_r527073514 ## File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java ## @@ -709,6 +709,9 @@ static void verifyLeaderChangeMessage( assertEquals(leaderId, leaderChangeMessage.leaderId()); assertEquals(voters.stream().map(voterId -> new Voter().setVoterId(voterId)).collect(Collectors.toList()), leaderChangeMessage.voters()); +assertEquals(voters.stream().map(voterId -> new Voter().setVoterId(voterId)).collect(Collectors.toSet()), Review comment: @hachikuji, I have added a test case where I initialize a quorum of 3 and get votes from a majority. I needed to tweak a couple of methods, as some of them had the expected value hardcoded to 1. Please review.
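With a quorum of 3 voters, a candidate needs a strict majority, i.e. 2 of the 3 votes, counting its own. This is why tests that assumed a single-voter quorum could hardcode an expected value of 1. Here is a small Java sketch of the majority check. `MajorityCheck.hasMajority` is a hypothetical helper, not Kafka's `QuorumState` or `CandidateState` API.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: does the set of granted votes form a strict majority of voters?
final class MajorityCheck {
    static boolean hasMajority(Set<Integer> voters, Set<Integer> grantedVotes) {
        Set<Integer> counted = new HashSet<>(grantedVotes);
        counted.retainAll(voters); // ignore votes from nodes that are not voters
        return counted.size() > voters.size() / 2;
    }

    public static void main(String[] args) {
        Set<Integer> voters = Set.of(1, 2, 3);
        System.out.println(hasMajority(voters, Set.of(1)));    // false
        System.out.println(hasMajority(voters, Set.of(1, 2))); // true
        System.out.println(hasMajority(voters, Set.of(1, 4))); // false: 4 is not a voter
    }
}
```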
[GitHub] [kafka] mimaison commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests
mimaison commented on a change in pull request #9224: URL: https://github.com/apache/kafka/pull/9224#discussion_r527014720 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java ## @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.mirror.integration; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.Config; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.DescribeConfigsResult; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.connect.connector.Connector; +import org.apache.kafka.connect.mirror.MirrorMakerConfig; +import org.apache.kafka.connect.mirror.SourceAndTarget; +import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster; +import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster; +import static org.apache.kafka.connect.mirror.TestUtils.expectedRecords; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import static org.apache.kafka.test.TestUtils.waitForCondition; +import org.apache.kafka.test.IntegrationTest; +import kafka.server.KafkaConfig$; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import static org.junit.Assert.assertFalse; 
+import org.junit.experimental.categories.Category; + +/** + * Common Test functions for MM2 integration tests + */ +@Category(IntegrationTest.class) +public class MirrorConnectorsIntegrationBaseTest { +private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationBaseTest.class); + +protected static final int NUM_RECORDS_PER_PARTITION = 10; +public static final int NUM_PARTITIONS = 10; +protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION; +protected static final int RECORD_TRANSFER_DURATION_MS = 30_000; +protected static final int CHECKPOINT_DURATION_MS = 20_000; +protected static final int RECORD_CONSUME_DURATION_MS = 20_000; +protected static final int OFFSET_SYNC_DURATION_MS = 30_000; +protected static final int NUM_WORKERS = 3; +protected static final int CONSUMER_POLL_TIMEOUT_MS = 500; +protected static final int BROKER_RESTART_TIMEOUT_MS = 10_000; +protected static final String PRIMARY_CLUSTER_ALIAS = "primary"; +protected static final String BACKUP_CLUSTER_ALIAS = "backup"; + +protected Map mm2Props; +protected MirrorMakerConfig mm2Config; +protected EmbeddedConnectCluster primary; +protected EmbeddedConnectCluster backup; + +private final AtomicBoolean exited = new AtomicBoolean(false); +private Properties primaryBrokerProps = new Properties(); +protected Properties backupBrokerProps = new Properties(); +private Map primaryWorkerProps = new HashMap<>(); +private Map backupWorkerProps = new HashMap<>(); +private Properties sslProps = new Properties(); + +private void loadSslPropsFromBrokerConfig() { Review comment: Why do we have SSL specific methods here? Could we move all the SSL bits into the SSL class? We have fields for the configurations. So we could set them a
[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option
cadonna commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r527055689 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (stateLock) { +if (state == State.RUNNING || state == State.REBALANCING) { +final int threadIdx = threads.size() + 1; +final long cacheSizePerThread = getCacheSizePerThread(threadIdx); Review comment: This should be: ```suggestion final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1); ``` ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. + * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (stateLock) { +if (state == State.RUNNING || state == State.REBALANCING) { +final int threadIdx = threads.size() + 1; Review comment: Assume the thread list is [t2, t3, t4]: `threadIdx` would be 4, which is already in use. You should keep track of the `threadIdx`s currently in use and check them to decide on the next `threadIdx`.
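cadonna's point is that `threads.size() + 1` can collide with an index that is still in use once threads have been removed. One way to follow the suggestion is to pick the smallest positive index not currently in use, sketched below in plain Java. The helper name `nextThreadIndex` and the representation of used indices as a `Set<Integer>` are assumptions for illustration, not the code that was merged.

```java
import java.util.Set;

// Hypothetical helper for choosing the next stream thread index.
final class ThreadIndexAllocator {
    // Returns the smallest index >= 1 that is not in usedIndices.
    static int nextThreadIndex(Set<Integer> usedIndices) {
        int candidate = 1;
        while (usedIndices.contains(candidate)) {
            candidate++;
        }
        return candidate;
    }

    public static void main(String[] args) {
        System.out.println(nextThreadIndex(Set.of(2, 3, 4))); // 1, not the colliding 4
        System.out.println(nextThreadIndex(Set.of(1, 2, 3))); // 4
    }
}
```

Reusing the lowest free index keeps thread names compact. An alternative is `max(used) + 1`, which also avoids collisions but never reuses the names of removed threads.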
## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. + * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (stateLock) { +if (state == State.RUNNING || state == State.REBALANCING)
[GitHub] [kafka] hachikuji commented on a change in pull request #9617: MINOR: Factor out common response parsing logic
hachikuji commented on a change in pull request #9617: URL: https://github.com/apache/kafka/pull/9617#discussion_r527095050 ## File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ## @@ -974,21 +969,6 @@ private void handleInitiateApiVersionRequests(long now) { } } -/** - * Validate that the response corresponds to the request we expect or else explode - */ -private static void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) { Review comment: Why do you want to keep it if it is not used?
[GitHub] [kafka] hachikuji commented on a change in pull request #9617: MINOR: Factor out common response parsing logic
hachikuji commented on a change in pull request #9617: URL: https://github.com/apache/kafka/pull/9617#discussion_r527095539 ## File path: clients/src/main/java/org/apache/kafka/common/requests/CorrelationIdMismatchException.java ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +/** + * Raised if the correlationId in a response header does not match + * the expected value from the request header. + */ +public class CorrelationIdMismatchException extends IllegalStateException { +private final int requestCorrelationId; +private final int responseCorrelationId; + +public CorrelationIdMismatchException( +String message, +int requestCorrelationId, +int responseCorrelationId +) { +super(message); +this.requestCorrelationId = requestCorrelationId; +this.responseCorrelationId = responseCorrelationId; +} + +public int requestCorrelationId() { +return requestCorrelationId; Review comment: Yeah, I added it for completeness. It is a little odd if the mismatch exception only tells you what the response correlationId was.
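The exception in this diff carries both correlation IDs, which is what the "for completeness" reply is about. A standalone Java sketch of a correlation check that uses an exception of that shape follows. The exception mirrors the fields shown in the diff. The `CorrelationCheck.correlate` method and its message text are hypothetical and are not the removed `NetworkClient.correlate` code.

```java
// Standalone copy of the exception's shape from the diff, plus a hypothetical check.
class CorrelationIdMismatchException extends IllegalStateException {
    private final int requestCorrelationId;
    private final int responseCorrelationId;

    CorrelationIdMismatchException(String message, int requestCorrelationId, int responseCorrelationId) {
        super(message);
        this.requestCorrelationId = requestCorrelationId;
        this.responseCorrelationId = responseCorrelationId;
    }

    int requestCorrelationId() { return requestCorrelationId; }

    int responseCorrelationId() { return responseCorrelationId; }
}

final class CorrelationCheck {
    // Throws if the response does not answer the request we expected.
    static void correlate(int requestCorrelationId, int responseCorrelationId) {
        if (requestCorrelationId != responseCorrelationId) {
            throw new CorrelationIdMismatchException(
                "Correlation id for response (" + responseCorrelationId
                    + ") does not match request (" + requestCorrelationId + ")",
                requestCorrelationId, responseCorrelationId);
        }
    }

    public static void main(String[] args) {
        correlate(7, 7); // matching IDs: no exception
        try {
            correlate(7, 8);
        } catch (CorrelationIdMismatchException e) {
            System.out.println(e.requestCorrelationId() + " vs " + e.responseCorrelationId()); // 7 vs 8
        }
    }
}
```

Because the exception exposes both IDs, a caller can log or compare the expected and actual values without parsing the message.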
[GitHub] [kafka] junrao commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…
junrao commented on a change in pull request #8826: URL: https://github.com/apache/kafka/pull/8826#discussion_r527096607 ## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ## @@ -366,8 +379,9 @@ public void testMetadataFetch() throws InterruptedException { // Return empty cluster 4 times and cluster from then on when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, emptyCluster, onePartitionCluster); -KafkaProducer producer = new KafkaProducer(configs, new StringSerializer(), -new StringSerializer(), metadata, new MockClient(Time.SYSTEM, metadata), null, Time.SYSTEM) { +KafkaProducer producer = new KafkaProducer( Review comment: Should we use the private static constructor in this class? Ditto below. ## File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java ## @@ -1290,6 +1308,23 @@ public void serializerShouldSeeGeneratedClientId() { producer.close(); } +@Test +public void testUnusedConfigs() { +Map props = new HashMap<>(); +props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:"); +props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLS"); +ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(props, +new StringSerializer(), new StringSerializer())); + +assertTrue(new ProducerConfig(config.originals(), false).unused().contains(SslConfigs.SSL_PROTOCOL_CONFIG)); +assertTrue(config.unused().contains(SslConfigs.SSL_PROTOCOL_CONFIG)); + +try (KafkaProducer producer = new KafkaProducer<>(config, null, null, Review comment: producer is unused. 
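`testUnusedConfigs` relies on the config tracking which supplied keys were actually read, so that `unused()` can report the rest. A toy Java model of that bookkeeping is sketched below. Every `get` through the tracker marks the key as used, while reads through a plain copy of the map are not recorded. `UsageTrackingConfig` is hypothetical and much simpler than Kafka's `AbstractConfig` and its internal recording map.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of "used"/"unused" config tracking.
final class UsageTrackingConfig {
    private final Map<String, Object> originals;
    private final Set<String> used = new HashSet<>();

    UsageTrackingConfig(Map<String, ?> originals) {
        this.originals = new HashMap<>(originals);
    }

    // Reads through the tracker are recorded as used.
    Object get(String key) {
        used.add(key);
        return originals.get(key);
    }

    // Returns a plain copy; reads through the copy are NOT recorded.
    Map<String, Object> copyOfOriginals() {
        return new HashMap<>(originals);
    }

    // Supplied keys that were never read through get().
    Set<String> unused() {
        Set<String> keys = new HashSet<>(originals.keySet());
        keys.removeAll(used);
        return keys;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("ssl.protocol", "TLS");
        UsageTrackingConfig config = new UsageTrackingConfig(props);
        config.get("bootstrap.servers");
        config.copyOfOriginals().get("ssl.protocol"); // not recorded
        System.out.println(config.unused()); // [ssl.protocol]
    }
}
```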
## File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ## @@ -105,7 +105,9 @@ public AbstractConfig(ConfigDef definition, Map originals, Map throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string."); this.originals = resolveConfigVariables(configProviderProps, (Map) originals); -this.values = definition.parse(this.originals); +// pass a copy to definition.parse. Otherwise, the definition.parse adds all keys of definitions to "used" group +// since definition.parse needs to call "RecordingMap#get" when checking all definitions. +this.values = definition.parse(new HashMap<>(this.originals)); Review comment: Hmm, why is this necessary since we reset used to empty in the next line? ## File path: clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java ## @@ -159,24 +159,25 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol, } // Visibility for testing -protected static Map channelBuilderConfigs(final AbstractConfig config, final ListenerName listenerName) { -Map parsedConfigs; +@SuppressWarnings("unchecked") +static Map channelBuilderConfigs(final AbstractConfig config, final ListenerName listenerName) { +Map parsedConfigs; if (listenerName == null) -parsedConfigs = config.values(); +parsedConfigs = (Map) config.values(); else parsedConfigs = config.valuesWithPrefixOverride(listenerName.configPrefix()); -// include any custom configs from original configs -Map configs = new HashMap<>(parsedConfigs); config.originals().entrySet().stream() .filter(e -> !parsedConfigs.containsKey(e.getKey())) // exclude already parsed configs // exclude already parsed listener prefix configs .filter(e -> !(listenerName != null && e.getKey().startsWith(listenerName.configPrefix()) && parsedConfigs.containsKey(e.getKey().substring(listenerName.configPrefix().length() // exclude keys like `{mechanism}.some.prop` if "listener.name." 
prefix is present and key `some.prop` exists in parsed configs. .filter(e -> !(listenerName != null && parsedConfigs.containsKey(e.getKey().substring(e.getKey().indexOf('.') + 1 -.forEach(e -> configs.put(e.getKey(), e.getValue())); -return configs; +.forEach(e -> parsedConfigs.put(e.getKey(), e.getValue())); +// The callers may add new elements to return map so we should not wrap it to a immutable map. Otherwise, +// the callers have to create a new map to carry more elements and then following Get ops are not recorded. Review comment: This comment is still not very clear to me. Are you saying if the caller needs to add more elements, it needs to create a new RecordingMap for the additional elements to be recorded? ## File path: clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java ## @@ -79,14 +79,29 @@
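The rationale in the `AbstractConfig` diff comment can be modeled in a few lines. The sketch below assumes a simplified, hypothetical `RecordingMap` that records keys read through `get()` and a toy `parse` that reads every defined key. Neither is Kafka's actual implementation. It shows that parsing the recording map directly marks every defined key as used, while parsing a plain `HashMap` copy leaves the recorded set empty. Whether the copy is still needed given the reset on the following line is the reviewer's open question, and this sketch does not settle it.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in: a map that remembers which keys were read through get().
final class RecordingMap<V> extends HashMap<String, V> {
    final Set<String> used = new HashSet<>();

    RecordingMap(Map<String, ? extends V> m) {
        super(m);
    }

    @Override
    public V get(Object key) {
        if (key instanceof String) {
            used.add((String) key);
        }
        return super.get(key);
    }
}

final class ParseDemo {
    private ParseDemo() {}

    // Toy "parse": looks up every defined key, as a config definition would.
    static Map<String, Object> parse(List<String> definedKeys, Map<String, ?> props) {
        Map<String, Object> values = new HashMap<>();
        for (String key : definedKeys) {
            values.put(key, props.get(key));
        }
        return values;
    }
}
```

For example, after `ParseDemo.parse(List.of("ssl.protocol"), originals)` the set `originals.used` contains `ssl.protocol`; parsing `new HashMap<>(originals)` instead leaves it empty, because the copy's `get()` is not overridden.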
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r527099742 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. + * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (stateLock) { +if (state == State.RUNNING || state == State.REBALANCING) { +final int threadIdx = threads.size() + 1; Review comment: Looks like I didn't understand threadIdx. 
that makes sense now ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (stateLock) { +if (state == State.RUNNING || state == State.REBALANCING) { +final int threadIdx = threads.size() + 1; +final long cacheSizePerThread = getCacheSizePerThread(threadIdx); +resizeThreadCache(threadIdx); +final StreamThread streamThread = StreamThread.create( +internalTopologyBuilder, +config, +clientSupplier, +adminClient, +processId, +clientId, +streamsMetrics, +time, +streamsMetadataState, +cacheSizePerThread, +stateDirectory, +delegatingStateRestoreListener, +threadIdx, +KafkaStreams.this::closeToError, +streamsUncaughtExceptionHandler +); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +streamThread.setStateListener(streamStateListener); +return Optional.of(streamThread.getName()); +} else { +return Optional.empty(); +} +} +} Review comment: right before the positive return
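The Javadoc's cache rule (the per-thread caches must sum to no more than `cache.max.bytes.buffering`) comes down to simple arithmetic. A minimal sketch, assuming an even split with integer division. `CacheSplit.cacheSizePerThread` is hypothetical and is not the body of `getCacheSizePerThread` in the PR. In the diff, `threadIdx = threads.size() + 1` is the value passed to both `getCacheSizePerThread` and `resizeThreadCache`.

```java
final class CacheSplit {
    private CacheSplit() {}

    // Hypothetical helper: divide the total cache evenly across the threads.
    // Integer division rounds down, so the shares never sum to more than the total.
    static long cacheSizePerThread(long totalCacheBytes, int numThreads) {
        if (numThreads <= 0) {
            throw new IllegalArgumentException("numThreads must be positive");
        }
        return totalCacheBytes / numThreads;
    }
}
```

With a 10 MiB (10485760-byte) total, going from two threads to three shrinks each share from 5242880 to 3495253 bytes. Rounding down leaves one byte unassigned rather than exceeding the limit.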
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r527099779 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (stateLock) { +if (state == State.RUNNING || state == State.REBALANCING) { +final int threadIdx = threads.size() + 1; +final long cacheSizePerThread = getCacheSizePerThread(threadIdx); +resizeThreadCache(threadIdx); +final StreamThread streamThread = StreamThread.create( +internalTopologyBuilder, +config, +clientSupplier, +adminClient, +processId, +clientId, +streamsMetrics, +time, +streamsMetadataState, +cacheSizePerThread, +stateDirectory, +delegatingStateRestoreListener, +threadIdx, +KafkaStreams.this::closeToError, +streamsUncaughtExceptionHandler +); +threads.add(streamThread); +threadState.put(streamThread.getId(), streamThread.state()); +storeProviders.add(new StreamThreadStateStoreProvider(streamThread)); +streamThread.setStateListener(streamStateListener); +return Optional.of(streamThread.getName()); +} else { +return Optional.empty(); +} +} +} Review comment: right before the positive return :)
[GitHub] [kafka] hachikuji commented on a change in pull request #9617: MINOR: Factor out common response parsing logic
hachikuji commented on a change in pull request #9617: URL: https://github.com/apache/kafka/pull/9617#discussion_r527101086 ## File path: clients/src/main/java/org/apache/kafka/common/requests/CorrelationIdMismatchException.java ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +/** + * Raised if the correlationId in a response header does not match + * the expected value from the request header. + */ +public class CorrelationIdMismatchException extends IllegalStateException { Review comment: Yeah, I was debating the location. I decided to put it in `common/requests` since it is not a public package and the error is specific to the request protocol.
[GitHub] [kafka] junrao commented on a change in pull request #9621: KAFKA-9892: Producer state snapshot needs to be forced to disk
junrao commented on a change in pull request #9621: URL: https://github.com/apache/kafka/pull/9621#discussion_r527105252 ## File path: core/src/main/scala/kafka/log/ProducerStateManager.scala ## @@ -437,7 +437,7 @@ object ProducerStateManager { val fileChannel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE) try fileChannel.write(buffer) -finally fileChannel.close() +finally fileChannel.force(true) Review comment: I think we need to do force and then close.
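The review point is about ordering: `force(true)` has to run before the channel is closed, and the channel still has to be closed. Here is a minimal Java sketch (the diff itself is Scala) that uses try-with-resources so `close()` always follows `force()`, even if the write or the force throws. `DurableWrite.writeDurably` is a hypothetical helper. The write loop is a defensive habit rather than something the review asks for, since the `WritableByteChannel` contract permits some channels to write fewer bytes than requested.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class DurableWrite {
    private DurableWrite() {}

    // Writes the buffer, forces file content and metadata to the storage device,
    // and only then lets try-with-resources close the channel.
    static void writeDurably(Path file, ByteBuffer buffer) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
            channel.force(true);
        }
    }
}
```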
[GitHub] [kafka] lct45 commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads
lct45 commented on a change in pull request #9614: URL: https://github.com/apache/kafka/pull/9614#discussion_r527117685 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -221,6 +221,9 @@ State setState(final State newState) { throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState); } else { log.info("State transition from {} to {}", oldState, newState); +if (newState == State.DEAD) { +failedStreamThreadSensor.record(); +} Review comment: Would that just be in `run()` of the GlobalStreamThread then?
[GitHub] [kafka] junrao merged pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling
junrao merged pull request #9596: URL: https://github.com/apache/kafka/pull/9596
[GitHub] [kafka] cadonna commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads
cadonna commented on a change in pull request #9614: URL: https://github.com/apache/kafka/pull/9614#discussion_r527122772 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -221,6 +221,9 @@ State setState(final State newState) { throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState); } else { log.info("State transition from {} to {}", oldState, newState); +if (newState == State.DEAD) { +failedStreamThreadSensor.record(); +} Review comment: No, that would be in `StreamThread#runLoop()`.
[jira] [Resolved] (KAFKA-10723) LogManager leaks internal thread pool activity during shutdown
[ https://issues.apache.org/jira/browse/KAFKA-10723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-10723. - Fix Version/s: 2.8.0 Resolution: Fixed merged to trunk > LogManager leaks internal thread pool activity during shutdown > -- > > Key: KAFKA-10723 > URL: https://issues.apache.org/jira/browse/KAFKA-10723 > Project: Kafka > Issue Type: Bug >Reporter: Kowshik Prakasam >Assignee: Kowshik Prakasam >Priority: Major > Fix For: 2.8.0 > > > *TL;DR:* > The asynchronous shutdown in {{LogManager}} has the shortcoming that if > during shutdown any of the internal futures fail, then we do not always > ensure that all futures are completed before {{LogManager.shutdown}} returns. > As a result, even though the "shut down completed" message from KafkaServer is seen > in the error logs, some futures continue to run from inside LogManager > attempting to close the logs. This is misleading and it could possibly break > the general rule of avoiding post-shutdown activity in the Broker. > *Description:* > When LogManager is shutting down, exceptions in log closure are handled > [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501]. > However, this > [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502] > in the finally clause shuts down the thread pools *asynchronously*. The > code {{threadPools.foreach(_.shutdown())}} initiates an orderly shutdown (for > each thread pool) in which previously submitted tasks are executed, but no > new tasks will be accepted (see javadoc link > [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]). > As a result, if there is an exception during log closure, some of the thread > pools which are closing logs could be leaked and continue to run in the > background after control returns to the caller (i.e. {{KafkaServer}}).
> As a result, even after the "shut down completed" message is seen in the > error logs (originating from the {{KafkaServer}} shutdown sequence), log closures > continue to happen in the background, which is misleading. > > *Proposed options for fixes:* > It seems useful that we maintain the contract with {{KafkaServer}} that after > {{LogManager.shutdown}} is called once, all tasks that close the logs are > guaranteed to have completed before the call returns. There are probably a > couple of different ways to fix this: > # Replace {{threadPools.foreach(_.shutdown())}} with > {{threadPools.foreach(_.awaitTermination())}}. This ensures that we wait > for all threads to be shut down before returning from the {{LogManager.shutdown}} > call. > # Skip creating the checkpoint and clean shutdown file only for the affected > directory if any of its futures throw an error. We continue to wait for all > futures to complete for all directories. This can require some changes to > [this for > loop|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L481-L496], > so that we wait for all futures to complete regardless of whether one of > them threw an error. -- This message was sent by Atlassian Jira (v8.3.4#803005)
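Option 1 above relies on the difference between `ExecutorService.shutdown()`, which only stops accepting new tasks, and `awaitTermination`, which blocks until the already-submitted tasks finish. Here is a minimal Java sketch, assuming a plain list of pools. LogManager itself is Scala, and the JDK's `awaitTermination` requires a timeout and is meant to be called after `shutdown()`. `PoolShutdown.shutdownAndAwait` is a hypothetical helper.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class PoolShutdown {
    private PoolShutdown() {}

    // shutdown() stops new submissions but returns immediately;
    // awaitTermination() then blocks until submitted tasks finish or the timeout elapses.
    // Returns true only if every pool terminated within its timeout.
    static boolean shutdownAndAwait(List<ExecutorService> pools, long timeout, TimeUnit unit)
            throws InterruptedException {
        for (ExecutorService pool : pools) {
            pool.shutdown();
        }
        boolean allTerminated = true;
        for (ExecutorService pool : pools) {
            allTerminated &= pool.awaitTermination(timeout, unit);
        }
        return allTerminated;
    }
}
```

Because the timeout applies to each pool separately, the worst-case wait is the timeout multiplied by the number of pools. A caller that needs a single overall deadline would have to track the remaining time itself.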
[jira] [Created] (KAFKA-10751) Generate log to help estimate messages lost during ULE
Lucas Wang created KAFKA-10751: -- Summary: Generate log to help estimate messages lost during ULE Key: KAFKA-10751 URL: https://issues.apache.org/jira/browse/KAFKA-10751 Project: Kafka Issue Type: Improvement Reporter: Lucas Wang Assignee: Lucas Wang During Unclean Leader Election, there could be data loss due to truncation at the resigned leader. Suppose there are 3 brokers that have replicas for a given partition: Broker A (leader) with largest offset 9 (log end offset 10) Broker B (follower) with largest offset 4 (log end offset 5) Broker C (follower) with largest offset 1 (log end offset 2) Only the leader A is in the ISR with B and C lagging behind. Now an unclean leader election causes the leadership to be transferred to C. Broker A would need to truncate 8 messages, and Broker B 3 messages. Case 1: if these messages have been produced with acks=0 or 1, then clients would experience 8 lost messages. Case 2: if the client is using acks=all and the partition's minISR setting is 2, and further let's assume broker B dropped out of the ISR after receiving the message with offset 4, then only the messages with offset<=4 have been acked to the client. The truncation effectively causes the client to lose 3 messages. Knowing the exact amount of data loss involves knowing the client's acks setting when the messages are produced, and also whether the messages have been sufficiently replicated according to the MinISR setting. If getting the exact data loss is too involved, at least there should be logs to help ESTIMATE the amount of data loss.
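The loss figures in the example follow from log end offsets alone. Here is a minimal sketch of that arithmetic, assuming that each replica is truncated to the new leader's log end offset and that acknowledged offsets run contiguously up to the highest acked offset. `UncleanElectionLoss` and its methods are illustrative names and are not part of Kafka or the linked PR.

```java
final class UncleanElectionLoss {
    private UncleanElectionLoss() {}

    // Messages a replica must truncate to match the new leader's log end offset.
    static long truncatedMessages(long replicaLogEndOffset, long newLeaderLogEndOffset) {
        return Math.max(0L, replicaLogEndOffset - newLeaderLogEndOffset);
    }

    // Acknowledged messages lost: offsets up to highestAckedOffset (inclusive) were acked,
    // and everything at or beyond the new leader's log end offset is truncated.
    static long ackedMessagesLost(long highestAckedOffset, long newLeaderLogEndOffset) {
        return Math.max(0L, highestAckedOffset + 1 - newLeaderLogEndOffset);
    }
}
```

With C's log end offset of 2, `truncatedMessages(10, 2)` gives 8 for broker A and `truncatedMessages(5, 2)` gives 3 for broker B. In Case 2, `ackedMessagesLost(4, 2)` gives the 3 acknowledged messages that are lost.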
[jira] [Commented] (KAFKA-10751) Generate log to help estimate messages lost during ULE
[ https://issues.apache.org/jira/browse/KAFKA-10751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235702#comment-17235702 ] Lucas Wang commented on KAFKA-10751: PR submitted: https://github.com/apache/kafka/pull/9533 > Generate log to help estimate messages lost during ULE > -- > > Key: KAFKA-10751 > URL: https://issues.apache.org/jira/browse/KAFKA-10751 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Wang >Assignee: Lucas Wang >Priority: Major > > During Unclean Leader Election, there could be data loss due to truncation at > the resigned leader. > Suppose there are 3 brokers that have replicas for a given partition: > Broker A (leader) with largest offset 9 (log end offset 10) > Broker B (follower) with largest offset 4 (log end offset 5) > Broker C (follower) with largest offset 1 (log end offset 2) > Only the leader A is in the ISR with B and C lagging behind. > Now an unclean leader election causes the leadership to be transferred to C. > Broker A would need to truncate 8 messages, and Broker B 3 messages. > Case 1: if these messages have been produced with acks=0 or 1, then clients > would experience 8 lost messages. > Case 2: if the client is using acks=all and the partition's minISR setting is > 2, and further let's assume broker B dropped out of the ISR after receiving > the message with offset 4, then only the messages with offset<=4 have been > acked to the client. The truncation effectively causes the client to lose 3 > messages. > Knowing the exact amount of data loss involves knowing the client's acks > setting when the messages are produced, and also whether the messages have > been sufficiently replicated according to the MinISR setting. > If getting the exact data loss is too involved, at least there should be logs > to help ESTIMATE the amount of data loss.
[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option
cadonna commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r527145996 ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -588,6 +592,29 @@ public void testCloseIsIdempotent() { closeCount, MockMetricsReporter.CLOSE_COUNT.get()); } +@Test +public void testAddThread() { +props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); +final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); +streams.start(); +final int oldSize = streams.threads.size(); +try { +TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running"); +} catch (final InterruptedException e) { +e.printStackTrace(); +} Review comment: You should not use `try-catch` here but just add `throws InterruptedException` to the method signature. ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. 
+ * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (stateLock) { Review comment: Why do we need to synchronize the whole method on `stateLock`? ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. + * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional addStreamThread() { +synchronized (stateLock) { +if (state == State.RUNNING || state == State.REBALANCING) { Review comment: Could we also use `isRunningOrRebalancing()` here? ## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -588,6 +592,29 @@ public void testCloseIsIdempotent() { closeCount, MockMetricsReporter.CLOSE_COUNT.get()); } +@Test +public void testAddThread() { Review comment: I would prefer to use `shouldAddThread()` as name although the pattern is different for the other test methods. 
## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +/** + * Adds and starts a stream thread
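The first suggestion in this review is to let `InterruptedException` propagate instead of printing and swallowing it. Here is a minimal sketch of a polling helper in the spirit of `TestUtils.waitForCondition`. `WaitUtil.waitForCondition` is hypothetical and is not Kafka's test utility. It reports a timeout with an `AssertionError`. A test that calls it declares the checked exception, e.g. `public void shouldAddThread() throws InterruptedException`, so an interrupt fails the test instead of being silently ignored.

```java
import java.util.function.BooleanSupplier;

final class WaitUtil {
    private WaitUtil() {}

    // Hypothetical polling helper: re-checks the condition every 10 ms until it holds
    // or maxWaitMs elapses. InterruptedException from Thread.sleep is declared, not caught.
    static void waitForCondition(BooleanSupplier condition, long maxWaitMs, String message)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline) {
                throw new AssertionError("Condition not met within " + maxWaitMs + " ms: " + message);
            }
            Thread.sleep(10L);
        }
    }
}
```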
[GitHub] [kafka] hachikuji merged pull request #9619: MINOR: Reduce sends created by `SendBuilder`
hachikuji merged pull request #9619: URL: https://github.com/apache/kafka/pull/9619
[jira] [Assigned] (KAFKA-8872) Improvements to controller "deleting" state / topic Identifiers
[ https://issues.apache.org/jira/browse/KAFKA-8872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Justine Olshan reassigned KAFKA-8872: - Assignee: Justine Olshan > Improvements to controller "deleting" state / topic Identifiers > > > Key: KAFKA-8872 > URL: https://issues.apache.org/jira/browse/KAFKA-8872 > Project: Kafka > Issue Type: Improvement >Reporter: Lucas Bradstreet >Assignee: Justine Olshan >Priority: Major > > Kafka currently uniquely identifies a topic by its name. This is generally > sufficient, but there are flaws in this scheme if a topic is deleted and > recreated with the same name. As a result, Kafka attempts to prevent these > classes of issues by ensuring a topic is deleted from all replicas before > completing a deletion. This solution is not perfect, as it is possible for > partitions to be reassigned from brokers while they are down, and there are > no guarantees that this state will ever be cleaned up and will not cause > issues in the future. > As the controller must wait for all replicas to delete their local > partitions, deletes can also become blocked, preventing topics from being > created with the same name until the deletion is complete on all replicas. > This can mean that downtime for a single broker can effectively cause a > complete outage for everyone producing/consuming to that topic name, as the > topic cannot be recreated without manual intervention. > Unique topic IDs could help address this issue by associating a unique ID > with each topic, ensuring a newly created topic with a previously used name > cannot be confused with a previous topic with that name.
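To make the topic ID idea concrete, here is a minimal sketch that assumes a topic's identity is the pair of its name and a randomly generated `java.util.UUID`. `TopicIdentity` is hypothetical and says nothing about how Kafka eventually stores or propagates topic IDs.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical illustration: identify a topic by (name, id) rather than by name alone.
final class TopicIdentity {
    private final String name;
    private final UUID id;

    TopicIdentity(String name, UUID id) {
        this.name = Objects.requireNonNull(name);
        this.id = Objects.requireNonNull(id);
    }

    // Each creation draws a fresh random id, even when the name was used before.
    static TopicIdentity create(String name) {
        return new TopicIdentity(name, UUID.randomUUID());
    }

    String name() { return name; }
    UUID id() { return id; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TopicIdentity)) return false;
        TopicIdentity other = (TopicIdentity) o;
        return name.equals(other.name) && id.equals(other.id);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, id);
    }
}
```

Two topics created under the name `orders` then compare unequal. A replica still holding partitions of the deleted topic could therefore recognize them as stale instead of confusing them with the recreated topic.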
[GitHub] [kafka] lmr3796 commented on a change in pull request #9435: KAFKA-10606: Disable auto topic creation for fetch-all-topic-metadata request
lmr3796 commented on a change in pull request #9435: URL: https://github.com/apache/kafka/pull/9435#discussion_r527192802 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1333,7 +1343,17 @@ class KafkaApis(val requestChannel: RequestChannel, } } -val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata +val completeTopicMetadata = (if (metadataRequest.isAllTopics) { Review comment: Hi @chia7712 @ijuma, this is Joseph. I'm a colleague of @Lincong and am working on this patch with him. I think this is a good point and have just updated the PR accordingly.
[GitHub] [kafka] hachikuji merged pull request #9608: MINOR: Enable testLogCleanerStats
hachikuji merged pull request #9608: URL: https://github.com/apache/kafka/pull/9608
[jira] [Commented] (KAFKA-9458) Kafka crashed in windows environment
[ https://issues.apache.org/jira/browse/KAFKA-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235769#comment-17235769 ] Daniel Dube commented on KAFKA-9458: Hi everyone. "Windows is not an officially supported platform". Is this specified anywhere? I've lost almost 3 months preparing a Kafka solution, and it cannot be used because Kafka can't delete a file? I could not find any Kafka documentation saying that Windows is not supported. Why do you publish it with Windows scripts? I can't understand it. Thanks in advance and best regards > Kafka crashed in windows environment > > > Key: KAFKA-9458 > URL: https://issues.apache.org/jira/browse/KAFKA-9458 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 2.4.0 > Environment: Windows Server 2019 >Reporter: hirik >Priority: Critical > Labels: windows > Attachments: Windows_crash_fix.patch, logs.zip > > > Hi, > while I was trying to validate Kafka retention policy, Kafka Server crashed > with below exception trace. > [2020-01-21 17:10:40,475] INFO [Log partition=test1-3, > dir=C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka] > Rolled new log segment at offset 1 in 52 ms. (kafka.log.Log) > [2020-01-21 17:10:40,484] ERROR Error while deleting segments for test1-3 in > dir C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka > (kafka.server.LogDirFailureChannel) > java.nio.file.FileSystemException: > C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex > -> > C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted: > The process cannot access the file because it is being used by another > process.
> at > java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92) > at > java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) > at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395) > at > java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292) > at java.base/java.nio.file.Files.move(Files.java:1425) > at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:795) > at kafka.log.AbstractIndex.renameTo(AbstractIndex.scala:209) > at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:497) > at kafka.log.Log.$anonfun$deleteSegmentFiles$1(Log.scala:2206) > at kafka.log.Log.$anonfun$deleteSegmentFiles$1$adapted(Log.scala:2206) > at scala.collection.immutable.List.foreach(List.scala:305) > at kafka.log.Log.deleteSegmentFiles(Log.scala:2206) > at kafka.log.Log.removeAndDeleteSegments(Log.scala:2191) > at kafka.log.Log.$anonfun$deleteSegments$2(Log.scala:1700) > at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.scala:17) > at kafka.log.Log.maybeHandleIOException(Log.scala:2316) > at kafka.log.Log.deleteSegments(Log.scala:1691) > at kafka.log.Log.deleteOldSegments(Log.scala:1686) > at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1763) > at kafka.log.Log.deleteOldSegments(Log.scala:1753) > at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:982) > at kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:979) > at scala.collection.immutable.List.foreach(List.scala:305) > at kafka.log.LogManager.cleanupLogs(LogManager.scala:979) > at kafka.log.LogManager.$anonfun$startup$2(LogManager.scala:403) > at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:116) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:65) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) 
> at > java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:830) > Suppressed: java.nio.file.FileSystemException: > C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex > -> > C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted: > The process cannot access the file because it is being used by another > process. > at > java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92) > at > java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) > at java.base/sun.nio.fs.WindowsFi
[jira] [Comment Edited] (KAFKA-9458) Kafka crashed in windows environment
[ https://issues.apache.org/jira/browse/KAFKA-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235769#comment-17235769 ] Daniel Dube edited comment on KAFKA-9458 at 11/19/20, 10:10 PM: Hi everyone. "Windows is not an officially supported platform". Is this specified anywhere? I've been working for almost 3 months preparing a Kafka solution, and it cannot be used because it can't delete a file? I still could not find any Kafka documentation saying that Windows is not supported. Why is it published with Windows scripts? I can't understand it. Sorry, but I'm very frustrated today. Thanks for your work and best regards was (Author: danieldube): Hi everyone. "Windows is not an officially supported platform". Is this specified anywhere? I've lost almost 3 months preparing a Kafka solution, and it cannot be used because it can't delete a file? I could not find any Kafka documentation saying that Windows is not supported. Why do you publish it with Windows scripts? I can't understand it. Thanks in advance and best regards > Kafka crashed in windows environment > > > Key: KAFKA-9458 > URL: https://issues.apache.org/jira/browse/KAFKA-9458 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 2.4.0 > Environment: Windows Server 2019 >Reporter: hirik >Priority: Critical > Labels: windows > Attachments: Windows_crash_fix.patch, logs.zip > > > Hi, > while I was trying to validate the Kafka retention policy, the Kafka server crashed > with the exception trace below. > [2020-01-21 17:10:40,475] INFO [Log partition=test1-3, > dir=C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka] > Rolled new log segment at offset 1 in 52 ms.
(kafka.log.Log) > [2020-01-21 17:10:40,484] ERROR Error while deleting segments for test1-3 in > dir C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka > (kafka.server.LogDirFailureChannel) > java.nio.file.FileSystemException: > C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex > -> > C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted: > The process cannot access the file because it is being used by another > process. > at > java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92) > at > java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103) > at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395) > at > java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292) > at java.base/java.nio.file.Files.move(Files.java:1425) > at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:795) > at kafka.log.AbstractIndex.renameTo(AbstractIndex.scala:209) > at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:497) > at kafka.log.Log.$anonfun$deleteSegmentFiles$1(Log.scala:2206) > at kafka.log.Log.$anonfun$deleteSegmentFiles$1$adapted(Log.scala:2206) > at scala.collection.immutable.List.foreach(List.scala:305) > at kafka.log.Log.deleteSegmentFiles(Log.scala:2206) > at kafka.log.Log.removeAndDeleteSegments(Log.scala:2191) > at kafka.log.Log.$anonfun$deleteSegments$2(Log.scala:1700) > at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.scala:17) > at kafka.log.Log.maybeHandleIOException(Log.scala:2316) > at kafka.log.Log.deleteSegments(Log.scala:1691) > at kafka.log.Log.deleteOldSegments(Log.scala:1686) > at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1763) > at kafka.log.Log.deleteOldSegments(Log.scala:1753) > at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:982) > at 
kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:979) > at scala.collection.immutable.List.foreach(List.scala:305) > at kafka.log.LogManager.cleanupLogs(LogManager.scala:979) > at kafka.log.LogManager.$anonfun$startup$2(LogManager.scala:403) > at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:116) > at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:65) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305) > at > java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:830) > Suppressed: java.nio.file.FileSystemException: > C:\Users\Administrator\Download
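The trace above fails inside `Utils.atomicMoveWithFallback` while renaming `.timeindex` to `.timeindex.deleted`, and it carries a second `FileSystemException` under `Suppressed:`. A minimal sketch of that atomic-move-then-fallback pattern, assuming a simplified standalone helper (the class name `AtomicMove` is hypothetical; this is not Kafka's source), shows where the two exceptions can come from: on Windows, both the atomic rename and the fallback move can be refused while another handle, for example a memory-mapped index, still has the file open.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical standalone helper illustrating the atomic-move-with-fallback pattern;
// this is not Kafka's actual Utils source.
final class AtomicMove {
    private AtomicMove() {}

    static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            // First attempt: an atomic rename. It can fail if the file system does not
            // support it or, on Windows, if another handle still holds the file open.
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException outer) {
            try {
                // Fallback: a non-atomic move that replaces any existing target.
                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException inner) {
                // Both attempts failed: throw the fallback error and attach the first
                // failure as a suppressed exception so neither cause is lost.
                inner.addSuppressed(outer);
                throw inner;
            }
        }
    }
}
```

Because the fallback's exception is the one thrown, the first (atomic) failure only shows up in the `Suppressed:` section of the trace.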
[GitHub] [kafka] Hamza-Slama opened a new pull request #9625: MINOR: remove semicolon
Hamza-Slama opened a new pull request #9625: URL: https://github.com/apache/kafka/pull/9625 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r527277135 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. + * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional<String> addStreamThread() { Review comment: You are right, I'll add one.
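The Javadoc in the hunk above says the per-thread caches are resized so that their sum never exceeds `cache.max.bytes.buffering`. A minimal sketch of that arithmetic, assuming the budget is simply split evenly across stream threads (the class `CacheSizing` and method `cacheSizePerThread` are hypothetical, and Kafka Streams' real sizing logic may differ):

```java
// Hypothetical helper: evenly divides a total cache budget across stream threads.
final class CacheSizing {
    private CacheSizing() {}

    static long cacheSizePerThread(long totalCacheBytes, int numStreamThreads) {
        if (numStreamThreads <= 0) {
            throw new IllegalArgumentException("numStreamThreads must be positive");
        }
        // Integer division rounds down, so numStreamThreads * result <= totalCacheBytes.
        return totalCacheBytes / numStreamThreads;
    }
}
```

With the 10 MiB default (10,485,760 bytes), going from two threads to three shrinks each share from 5,242,880 to 3,495,253 bytes; the one leftover byte is simply unused, which keeps the sum within the configured limit.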
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option
wcarlson5 commented on a change in pull request #9615: URL: https://github.com/apache/kafka/pull/9615#discussion_r527277048 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder, queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider); stateDirCleaner = setupStateDirCleaner(); -oldHandler = false; maybeWarnAboutCodeInRocksDBConfigSetter(log, config); rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); } +/** + * Adds and starts a stream thread in addition to the stream threads that are already running in this + * Kafka Streams client. + * + * Since the number of stream threads increases, the sizes of the caches in the new stream thread + * and the existing stream threads are adapted so that the sum of the cache sizes over all stream + * threads does not exceed the total cache size specified in configuration + * {@code cache.max.bytes.buffering}. + * + * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING. + * + * @return name of the added stream thread or empty if a new stream thread could not be added + */ +public Optional<String> addStreamThread() { +synchronized (stateLock) { Review comment: Well, we don't want the client's state to change while a thread is being added.
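The reply above explains why `addStreamThread()` takes `stateLock`: the check that the client is RUNNING or REBALANCING and the addition of the thread must not be interleaved with a state change. A minimal sketch of that check-then-act under one lock, using a hypothetical `StreamThreadRegistry` class, a simplified state enum, and made-up thread names rather than the real `KafkaStreams` internals:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical registry illustrating a lock-guarded check-then-add; not KafkaStreams code.
final class StreamThreadRegistry {
    // Simplified subset of client states, for illustration only.
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING }

    private final Object stateLock = new Object();
    private final List<String> threadNames = new ArrayList<>();
    private State state = State.CREATED;

    void setState(State newState) {
        synchronized (stateLock) {
            state = newState;
        }
    }

    // Returns the new thread's name, or empty if the client is not RUNNING or REBALANCING.
    Optional<String> addStreamThread() {
        // Holding stateLock across the check and the add means setState() cannot
        // move the client to, e.g., PENDING_SHUTDOWN in between.
        synchronized (stateLock) {
            if (state != State.RUNNING && state != State.REBALANCING) {
                return Optional.empty();
            }
            String name = "StreamThread-" + (threadNames.size() + 1);
            threadNames.add(name);
            return Optional.of(name);
        }
    }

    int threadCount() {
        synchronized (stateLock) {
            return threadNames.size();
        }
    }
}
```

Without the lock, a shutdown could start right after the state check passes, leaving a freshly added thread running in a client that is shutting down.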
[GitHub] [kafka] jolshan opened a new pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers
jolshan opened a new pull request #9626: URL: https://github.com/apache/kafka/pull/9626 This change takes the topic IDs created in https://github.com/apache/kafka/pull/9473 and propagates them to brokers using the LeaderAndIsr request. It also removes the topic name from the LeaderAndIsr response, reorganizes the response to be sorted by topic, and includes the topic ID. In addition, the topic ID is persisted to each replica in Log as well as in a file on disk. This file is read on startup, and if the topic ID exists, it is reloaded. This PR bumps the IBP and is expected to be merged at the same time as https://github.com/apache/kafka/pull/9622 so as not to bump the protocol twice. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
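The PR description says the topic ID is also written to a file on disk and reloaded on startup if it exists. A minimal sketch of that round trip, assuming a hypothetical file name (`topic_id`), a plain-text format, and `java.util.UUID` in place of Kafka's own ID type; the PR's actual file layout may differ:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;
import java.util.UUID;

// Hypothetical per-partition topic ID file; name and format are assumptions.
final class TopicIdFile {
    static final String FILE_NAME = "topic_id";

    private TopicIdFile() {}

    // Persist the topic ID into the partition's log directory.
    static void write(Path partitionDir, UUID topicId) throws IOException {
        Path tmp = partitionDir.resolve(FILE_NAME + ".tmp");
        Files.write(tmp, topicId.toString().getBytes(StandardCharsets.UTF_8));
        // Replace the old file only after the new contents are fully written.
        Files.move(tmp, partitionDir.resolve(FILE_NAME), StandardCopyOption.REPLACE_EXISTING);
    }

    // On startup: reload the topic ID if the file exists, otherwise return empty.
    static Optional<UUID> read(Path partitionDir) throws IOException {
        Path file = partitionDir.resolve(FILE_NAME);
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
        return Optional.of(UUID.fromString(text));
    }
}
```

Writing to a temporary file and then moving it into place reduces the chance of reading a partially written ID; a production version would also need to flush the file to disk.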
[jira] [Commented] (KAFKA-10547) Add topic IDs to MetadataResponse
[ https://issues.apache.org/jira/browse/KAFKA-10547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235796#comment-17235796 ] Justine Olshan commented on KAFKA-10547: Hi [~dengziming], I'm sorry I didn't see this until now. Commenting on your PR. > Add topic IDs to MetadataResponse > - > > Key: KAFKA-10547 > URL: https://issues.apache.org/jira/browse/KAFKA-10547 > Project: Kafka > Issue Type: Sub-task >Reporter: Justine Olshan >Assignee: dengziming >Priority: Major > > Prevent reads from deleted topics > Will be able to use TopicDescription to identify the topic ID
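The ticket's goal of preventing reads from deleted topics rests on the fact that a recreated topic keeps its name but gets a new ID. A minimal sketch of how a client could use the IDs from metadata to detect that, with a hypothetical `TopicIdGuard` class that is not part of the Kafka client:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical client-side check; not part of the Kafka client API.
final class TopicIdGuard {
    // Topic name -> ID seen when the client first started reading the topic.
    private final Map<String, UUID> knownIds = new HashMap<>();

    // Returns true if reading may continue under the latest metadata. A different ID
    // for the same name means the topic was deleted and recreated.
    boolean onMetadata(String topic, UUID idFromMetadata) {
        UUID previous = knownIds.putIfAbsent(topic, idFromMetadata);
        return previous == null || previous.equals(idFromMetadata);
    }
}
```

A name-only check cannot tell the old and new topic apart, which is why the ID has to travel in the metadata response.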
[GitHub] [kafka] jolshan commented on pull request #9622: KAFKA-10547; add topicId in MetadataResp
jolshan commented on pull request #9622: URL: https://github.com/apache/kafka/pull/9622#issuecomment-730710397 Hi @dengziming! Thanks for the PR! I was hoping to add LeaderAndIsrRequests before UpdateMetadata/Metadata, as ordered in the [JIRA ticket.](https://issues.apache.org/jira/browse/KAFKA-10545) There are just a few features for persisting the topic IDs I wanted to include. I'm thinking we could review this PR and my PR: https://github.com/apache/kafka/pull/9626 at the same time and merge mine first and yours immediately after.
[GitHub] [kafka] jolshan edited a comment on pull request #9622: KAFKA-10547; add topicId in MetadataResp
jolshan edited a comment on pull request #9622: URL: https://github.com/apache/kafka/pull/9622#issuecomment-730710397 Hi @dengziming! Thanks for the PR! I was hoping to add LeaderAndIsrRequests before UpdateMetadata/Metadata, following the ordering of the [JIRA tickets.](https://issues.apache.org/jira/browse/KAFKA-8872) There are just a few features for persisting the topic IDs I wanted to include. I'm thinking we could review this PR and my PR: https://github.com/apache/kafka/pull/9626 at the same time and merge mine first and yours immediately after.
[jira] [Updated] (KAFKA-10745) Please let me know how I check the time which Source connector receive the data from source table.
[ https://issues.apache.org/jira/browse/KAFKA-10745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] NAYUSIK updated KAFKA-10745: Issue Type: Bug (was: Improvement) > Please let me know how I check the time which Source connector receive the > data from source table. > -- > > Key: KAFKA-10745 > URL: https://issues.apache.org/jira/browse/KAFKA-10745 > Project: Kafka > Issue Type: Bug >Reporter: NAYUSIK >Priority: Major > > Please let me know how I can check the time at which the source connector receives data from the source table. > I want to check the time at each stage. > We are currently using the JDBC Connector. > The times we can see are when the data is created in the source table, > when the data is written to Kafka, and when the data is written to the target table. > But I also want to know the time when the source connector receives the data from the > source table. > Please tell me which settings I need to configure on the source connector. > Thank you for your support.
[jira] [Updated] (KAFKA-10745) Please let me know how I check the time which Source connector receive the data from source table.
[ https://issues.apache.org/jira/browse/KAFKA-10745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] NAYUSIK updated KAFKA-10745: Issue Type: Improvement (was: Bug) > Please let me know how I check the time which Source connector receive the > data from source table. > -- > > Key: KAFKA-10745 > URL: https://issues.apache.org/jira/browse/KAFKA-10745 > Project: Kafka > Issue Type: Improvement >Reporter: NAYUSIK >Priority: Major > > Please let me know how I can check the time at which the source connector receives data from the source table. > I want to check the time at each stage. > We are currently using the JDBC Connector. > The times we can see are when the data is created in the source table, > when the data is written to Kafka, and when the data is written to the target table. > But I also want to know the time when the source connector receives the data from the > source table. > Please tell me which settings I need to configure on the source connector. > Thank you for your support.