[GitHub] [kafka] chia7712 commented on pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2020-11-19 Thread GitBox


chia7712 commented on pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#issuecomment-730231950


   (I'm still reading up on this story, so pardon me if this is a stupid question.)
   
   If using timestamp=0 returns the correct offset, why not handle 
`ListOffsetRequest.EARLIEST_TIMESTAMP` the same way? For example, in Log.scala 
(https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L1643), 
if the timestamp is equal to `ListOffsetRequest.EARLIEST_TIMESTAMP`, we could pass 0 
to find the offset. Would that work?
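To make the question concrete, here is a toy Java sketch (not the actual `Log.scala` logic) contrasting the two lookups. It assumes a hypothetical in-memory log whose first surviving record sits above the log start offset (for example because the earliest records were compacted away) and in which every record carries a non-negative timestamp; `ToyLog`, `Rec` and `lookup` are illustrative names only. The `-2L` constant is the value of `ListOffsetRequest.EARLIEST_TIMESTAMP`.

```java
import java.util.List;
import java.util.OptionalLong;

public class EarliestOffsetSketch {
    // Same value as ListOffsetRequest.EARLIEST_TIMESTAMP.
    static final long EARLIEST_TIMESTAMP = -2L;

    // Hypothetical record holder: an offset and its (non-negative) timestamp.
    static final class Rec {
        final long offset;
        final long timestamp;
        Rec(long offset, long timestamp) { this.offset = offset; this.timestamp = timestamp; }
    }

    // Hypothetical log whose first surviving record may sit above logStartOffset.
    static final class ToyLog {
        final long logStartOffset;
        final List<Rec> records; // sorted by offset

        ToyLog(long logStartOffset, List<Rec> records) {
            this.logStartOffset = logStartOffset;
            this.records = records;
        }

        // Behaviour being questioned: report the log start offset unchanged.
        long earliestByLogStartOffset() {
            return logStartOffset;
        }

        // First offset whose timestamp is >= targetTimestamp, if any.
        OptionalLong offsetForTimestamp(long targetTimestamp) {
            for (Rec r : records) {
                if (r.timestamp >= targetTimestamp) {
                    return OptionalLong.of(r.offset);
                }
            }
            return OptionalLong.empty();
        }

        // The suggestion: serve EARLIEST_TIMESTAMP as a lookup for timestamp 0.
        OptionalLong lookup(long timestamp) {
            long target = (timestamp == EARLIEST_TIMESTAMP) ? 0L : timestamp;
            return offsetForTimestamp(target);
        }
    }

    public static void main(String[] args) {
        // Offsets 0-4 are gone (e.g. compacted); the first surviving record is offset 5.
        ToyLog log = new ToyLog(0L, List.of(new Rec(5L, 1_000L), new Rec(6L, 2_000L)));
        System.out.println(log.earliestByLogStartOffset());             // 0
        System.out.println(log.lookup(EARLIEST_TIMESTAMP).getAsLong()); // 5
    }
}
```

Under these assumptions, mapping `EARLIEST_TIMESTAMP` to a timestamp-0 search returns the first record actually present (offset 5) rather than the log start offset (0). Records without a timestamp would need separate handling, since in this model they never match a `>= 0` search.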
   
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10656) NetworkClient.java: print out the feature flags received at DEBUG level, as well as the other version information

2020-11-19 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235305#comment-17235305
 ] 

Tom Bentley commented on KAFKA-10656:
-

[~cmccabe] I opened a PR for this; could you please take a look?

> NetworkClient.java: print out the feature flags received at DEBUG level, as 
> well as the other version information
> -
>
> Key: KAFKA-10656
> URL: https://issues.apache.org/jira/browse/KAFKA-10656
> Project: Kafka
>  Issue Type: Bug
>Reporter: Colin McCabe
>Assignee: Tom Bentley
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on a change in pull request #9619: MINOR: Reduce sends created by `SendBuilder`

2020-11-19 Thread GitBox


dajac commented on a change in pull request #9619:
URL: https://github.com/apache/kafka/pull/9619#discussion_r526710893



##
File path: 
clients/src/main/java/org/apache/kafka/common/protocol/SendBuilder.java
##
@@ -122,6 +123,14 @@ public void writeVarlong(long i) {
 ByteUtils.writeVarlong(i, buffer);
 }
 
+private void flushPendingSend() {
+if (!buffers.isEmpty()) {
+ByteBuffer[] byteBufferArray = buffers.toArray(new ByteBuffer[0]);
+sends.add(new ByteBufferSend(destinationId, byteBufferArray));

Review comment:
   I just noticed that `ByteBufferSend` re-iterates over the byte buffers 
to compute the total size. We could compute the size while we accumulate them. 
I suppose that the number of buffers is usually small, so it should not make a 
big difference.
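A minimal sketch of that suggestion, assuming a hypothetical `PendingBuffers` helper rather than the real `SendBuilder`/`ByteBufferSend` code: the running size is updated as each buffer is added, so nothing has to re-iterate over the buffers when the send is built.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical accumulator: keeps a running size instead of re-summing later.
public class PendingBuffers {
    private final List<ByteBuffer> buffers = new ArrayList<>();
    private long sizeInBytes = 0L;

    public void add(ByteBuffer buffer) {
        buffers.add(buffer);
        sizeInBytes += buffer.remaining(); // accounted for once, at insertion time
    }

    public boolean isEmpty() {
        return buffers.isEmpty();
    }

    public long sizeInBytes() {
        return sizeInBytes;
    }

    // Hands the buffers off (e.g. to build a send) and resets the running size.
    public ByteBuffer[] drain() {
        ByteBuffer[] out = buffers.toArray(new ByteBuffer[0]);
        buffers.clear();
        sizeInBytes = 0L;
        return out;
    }

    public static void main(String[] args) {
        PendingBuffers pending = new PendingBuffers();
        pending.add(ByteBuffer.allocate(4));
        pending.add(ByteBuffer.wrap(new byte[]{1, 2, 3}));
        System.out.println(pending.sizeInBytes());  // 7
        System.out.println(pending.drain().length); // 2
    }
}
```

This relies on buffers not being read or repositioned after they are added; otherwise the cached size would drift from the sum of their `remaining()` values.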









[GitHub] [kafka] dajac commented on a change in pull request #7498: KAFKA-9023: Log request destination when the Producer gets disconnected

2020-11-19 Thread GitBox


dajac commented on a change in pull request #7498:
URL: https://github.com/apache/kafka/pull/7498#discussion_r526718951



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##
@@ -659,6 +655,12 @@ else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
 this.accumulator.unmutePartition(batch.topicPartition);
 }
 
+private String formatErrMsg(ProduceResponse.PartitionResponse response) {

Review comment:
   Indeed, we could. I am not sure that it brings much more information, 
though, so I am fine with keeping it as it is.









[GitHub] [kafka] dajac commented on pull request #7498: KAFKA-9023: Log request destination when the Producer gets disconnected

2020-11-19 Thread GitBox


dajac commented on pull request #7498:
URL: https://github.com/apache/kafka/pull/7498#issuecomment-730250619


   I have triggered another run of the CI. I will merge it afterwards.







[GitHub] [kafka] dajac commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-19 Thread GitBox


dajac commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r526729269



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1324,7 +1404,60 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): 
Unit = {
 // if there is a connection waiting on the rate throttle delay, we will 
let it wait the original delay even if
 // the rate limit increases, because it is just one connection per 
listener and the code is simpler that way
-updateConnectionRateQuota(maxConnectionRate)
+updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs 
for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, 
otherwise
+   * all metric configs will be checked and updated if required.
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default 
if None
+   */
+  def updateIpConnectionRateQuota(ip: Option[InetAddress], maxConnectionRate: 
Option[Int]): Unit = synchronized {
+def isIpConnectionRateMetric(metricName: MetricName) = {
+  metricName.name == ConnectionRateMetricName &&
+  metricName.group == MetricsGroup &&
+  metricName.tags.containsKey(IpMetricTag)
+}
+
+def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+  quotaLimit != metric.config.quota.bound
+}
+ip match {
+  case Some(address) =>
+counts.synchronized {
+  maxConnectionRate match {
+case Some(rate) =>
+  info(s"Updating max connection rate override for $address to 
$rate")
+  connectionRatePerIp.put(address, rate)
+case None =>
+  info(s"Removing max connection rate override for $address")
+  connectionRatePerIp.remove(address)
+  }
+}
+updateConnectionRateQuota(connectionRateForIp(address), 
IpQuotaEntity(address))
+  case None =>
+counts.synchronized {
+  defaultConnectionRatePerIp = 
maxConnectionRate.getOrElse(DynamicConfig.Ip.DefaultConnectionCreationRate)
+}
+info(s"Updated default max IP connection rate to 
$defaultConnectionRatePerIp")
+metrics.metrics.forEach { (metricName, metric) =>
+  if (isIpConnectionRateMetric(metricName)) {
+val quota = 
connectionRateForIp(InetAddress.getByName(metricName.tags.get(IpMetricTag)))

Review comment:
   Makes sense, thanks.
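The override/default resolution in the `updateIpConnectionRateQuota` hunk above can be sketched in Java as follows. This is a simplified, hypothetical model (`IpRateQuotas`, `update` and `rateFor` are illustrative names, not Kafka's `ConnectionQuotas` API) that leaves out the metric reconfiguration: a per-IP override wins, otherwise the default applies, and clearing the default falls back to a placeholder for `DynamicConfig.Ip.DefaultConnectionCreationRate`, whose real value is assumed here.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of per-IP connection rate quotas.
public class IpRateQuotas {
    // Placeholder for DynamicConfig.Ip.DefaultConnectionCreationRate (value assumed).
    static final int FALLBACK_DEFAULT_RATE = Integer.MAX_VALUE;

    private final Map<InetAddress, Integer> ratePerIp = new HashMap<>();
    private int defaultRatePerIp = FALLBACK_DEFAULT_RATE;

    // ip empty => update the default; rate empty => remove override / reset default.
    public synchronized void update(Optional<InetAddress> ip, Optional<Integer> rate) {
        if (ip.isPresent()) {
            if (rate.isPresent()) {
                ratePerIp.put(ip.get(), rate.get());
            } else {
                ratePerIp.remove(ip.get());
            }
        } else {
            defaultRatePerIp = rate.orElse(FALLBACK_DEFAULT_RATE);
        }
    }

    // Effective quota: the per-IP override if present, otherwise the default.
    public synchronized int rateFor(InetAddress ip) {
        return ratePerIp.getOrDefault(ip, defaultRatePerIp);
    }

    public static void main(String[] args) throws UnknownHostException {
        IpRateQuotas quotas = new IpRateQuotas();
        InetAddress a = InetAddress.getByName("192.168.0.1");
        quotas.update(Optional.empty(), Optional.of(10)); // default = 10
        quotas.update(Optional.of(a), Optional.of(3));    // override for a
        System.out.println(quotas.rateFor(a));            // 3
        quotas.update(Optional.of(a), Optional.empty());  // remove override
        System.out.println(quotas.rateFor(a));            // 10
    }
}
```

Keeping the default as a single field means a default change affects every IP without an override at lookup time; the real code additionally has to push the new bound into each existing per-IP metric, which this sketch omits.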









[jira] [Commented] (KAFKA-10114) Kafka producer stuck after broker crash

2020-11-19 Thread Tim Fox (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235343#comment-17235343
 ] 

Tim Fox commented on KAFKA-10114:
-

> We are still seeing this issue with version 2.6.0. Our app calls flush and it 
>hangs forever when brokers are down.

The current KafkaProducer.flush() method will indeed wait forever for flush() 
to complete. Flush clearly cannot complete if brokers are down. This seems like 
a reasonable default to me: we want to be sure that buffered messages aren't 
lost, yet we don't know how long it will take for brokers to be restarted, so 
it's very hard to choose a default timeout. Should it be 1 minute? 1 hour? 1 
day?

However, without changing the API, perhaps we could allow for a flush timeout 
to be specified via a producer property? That way we could keep the default as 
"forever" but allow you to override it to a lower value.

[~ijuma] [~hachikuji] thoughts?
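As a stop-gap on the application side (`KafkaProducer.flush()` itself takes no timeout argument), a caller can bound its own wait. A minimal sketch, assuming it is acceptable to run the blocking call on a separate thread: the hypothetical `runWithTimeout` helper accepts any blocking action, such as `producer::flush`, and throws `java.util.concurrent.TimeoutException` if the action has not finished by the deadline.

```java
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: bounds how long the caller waits for a blocking action.
public class BoundedFlush {
    public static void runWithTimeout(Runnable blockingAction, Duration timeout)
            throws TimeoutException, InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "bounded-flush");
            t.setDaemon(true); // do not keep the JVM alive if the action never returns
            return t;
        });
        try {
            Future<?> future = executor.submit(blockingAction);
            // Throws TimeoutException if the action is still running at the deadline.
            future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } finally {
            // Interrupts the worker; whether the action stops depends on the action.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for producer::flush that does not complete in time.
        Runnable hangingFlush = () -> {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            runWithTimeout(hangingFlush, Duration.ofMillis(100));
        } catch (TimeoutException e) {
            System.out.println("flush did not complete within 100 ms");
        }
    }
}
```

This only stops the caller from waiting: it does not discard buffered records, and a flush that ignores interruption may keep running on the daemon thread.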

 

 

 

> Kafka producer stuck after broker crash
> ---
>
> Key: KAFKA-10114
> URL: https://issues.apache.org/jira/browse/KAFKA-10114
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.3.1, 2.4.1
>Reporter: Itamar Benjamin
>Priority: Critical
>
> Today two of our Kafka brokers crashed (cluster of 3 brokers), and producers 
> were not able to send new messages. After the brokers started again, all producers 
> resumed sending data except for a single one.
> At the beginning, the producer rejected all new messages with TimeoutException:
>  
> {code:java}
>  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> incoming-mutable-RuntimeIIL-1:12 ms has passed since batch creation
> {code}
>  
> Then, after some time, the exception changed to:
>  
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory 
> within the configured max blocking time 6 ms.
> {code}
>  
>  
> jstack shows that kafka-producer-network-thread is waiting to get a producer id:
>  
> {code:java}
> "kafka-producer-network-thread | producer-1" #767 daemon prio=5 os_prio=0 cpu=63594017.16ms elapsed=1511219.38s tid=0x7fffd8353000 nid=0x4fa4 sleeping [0x7ff55c177000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>         at java.lang.Thread.sleep(java.base@11.0.1/Native Method)
>         at org.apache.kafka.common.utils.Utils.sleep(Utils.java:296)
>         at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:41)
>         at org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:565)
>         at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:306)
>         at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
>         at java.lang.Thread.run(java.base@11.0.1/Thread.java:834)
>
>    Locked ownable synchronizers:
>         - None
> {code}
>  
> Digging into maybeWaitForProducerId(): it waits until some broker is ready 
> (the awaitNodeReady function), which in turn calls leastLoadedNode() on 
> NetworkClient. That method iterates over all brokers and checks whether a request 
> can be sent to each of them using canSendRequest().
> This is the code for canSendRequest():
>  
> {code:java}
> return connectionStates.isReady(node, now) && selector.isChannelReady(node) 
> && inFlightRequests.canSendMore(node)
> {code}
>  
>  
> Using some debugging tools, I saw that this expression always evaluates to false 
> because the last part (canSendMore) is false. 
>  
> This is the code for canSendMore:
> {code:java}
> public boolean canSendMore(String node) {
>     Deque queue = requests.get(node);
>     return queue == null || queue.isEmpty() ||
>            (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
> }
> {code}
>  
>  
> I verified that
> {code:java}
> queue.peekFirst().send.completed()
> {code}
> is true, and that leads to the livelock: since the request queues are full for 
> all nodes, a new request to check broker availability and reconnect to it 
> cannot be submitted.
>  
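The livelock described in the report can be reproduced with a small, self-contained model. The classes below are hypothetical simplifications, not Kafka's `InFlightRequests` or `NetworkClient`: `canSendMore` repeats the condition quoted in the issue (newest request at the head of the deque), and `firstSendableNode` stands in for node selection (the real `leastLoadedNode` is more involved). Once every node's queue holds `maxInFlightRequestsPerConnection` sent-but-unanswered requests, no node qualifies.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the in-flight check described in the issue.
public class ToyInFlightRequests {
    static final class ToyRequest {
        final boolean sendCompleted; // bytes fully written, response not yet received
        ToyRequest(boolean sendCompleted) { this.sendCompleted = sendCompleted; }
    }

    private final int maxInFlightRequestsPerConnection;
    private final Map<String, Deque<ToyRequest>> requests = new HashMap<>();

    ToyInFlightRequests(int maxInFlightRequestsPerConnection) {
        this.maxInFlightRequestsPerConnection = maxInFlightRequestsPerConnection;
    }

    // Newest request goes to the head of the node's queue.
    void add(String node, ToyRequest request) {
        requests.computeIfAbsent(node, n -> new ArrayDeque<>()).addFirst(request);
    }

    // Same condition as the canSendMore snippet quoted in the issue.
    boolean canSendMore(String node) {
        Deque<ToyRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty()
                || (queue.peekFirst().sendCompleted
                    && queue.size() < maxInFlightRequestsPerConnection);
    }

    // Simplified node selection: first node that can take another request, else null.
    String firstSendableNode(List<String> nodes) {
        for (String node : nodes) {
            if (canSendMore(node)) {
                return node;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> nodes = List.of("0", "1", "2");
        ToyInFlightRequests inFlight = new ToyInFlightRequests(5);
        // Every node's queue is full of sent-but-unanswered requests.
        for (String node : nodes) {
            for (int i = 0; i < 5; i++) {
                inFlight.add(node, new ToyRequest(true));
            }
        }
        System.out.println(inFlight.firstSendableNode(nodes)); // null
    }
}
```

In this model nothing ever drains the queues, so the "no sendable node" state is permanent; in the real client that would require responses (or disconnects) that never arrive, which is the situation the reporter describes.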





[GitHub] [kafka] dajac commented on pull request #9547: KAFKA-9630; Replace OffsetsForLeaderEpoch request/response with automated protocol

2020-11-19 Thread GitBox


dajac commented on pull request #9547:
URL: https://github.com/apache/kafka/pull/9547#issuecomment-730285178


   The failed test seems unrelated to this PR. Merging to trunk. 







[GitHub] [kafka] dajac merged pull request #9547: KAFKA-9630; Replace OffsetsForLeaderEpoch request/response with automated protocol

2020-11-19 Thread GitBox


dajac merged pull request #9547:
URL: https://github.com/apache/kafka/pull/9547


   







[jira] [Resolved] (KAFKA-9630) Replace OffsetsForLeaderEpoch request/response with automated protocol

2020-11-19 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-9630.

Fix Version/s: 2.8.0
   Resolution: Fixed

> Replace OffsetsForLeaderEpoch request/response with automated protocol
> --
>
> Key: KAFKA-9630
> URL: https://issues.apache.org/jira/browse/KAFKA-9630
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.8.0
>
>






[GitHub] [kafka] dajac commented on pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

2020-11-19 Thread GitBox


dajac commented on pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#issuecomment-730287841


   @gardnervickers https://github.com/apache/kafka/pull/9401 and 
https://github.com/apache/kafka/pull/9547 have been merged. You can bring them 
back in this PR if you like.







[GitHub] [kafka] dajac commented on pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-19 Thread GitBox


dajac commented on pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#issuecomment-730292123


   @anatasiavela Both PRs have been merged, so we can proceed with this one.
   
   There is something we must consider that I was not aware of: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java#L235.
 When the ProduceRequest is processed in the KafkaApis layer, its internal data 
is set to null to free up memory. That means the data will no longer be available 
when we log the request, so we need to take this into account.
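One way to handle this, sketched under assumptions: build whatever the request log needs before the handler releases the record data. `ToyProduceRequest`, `clearRecords` and `summaryForLog` are hypothetical names used only to illustrate the ordering; they are not the real `ProduceRequest` API.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical request whose payload is dropped after processing to free memory.
public class ToyProduceRequest {
    private Map<String, byte[]> recordsByPartition; // null once cleared

    ToyProduceRequest(Map<String, byte[]> recordsByPartition) {
        this.recordsByPartition = new TreeMap<>(recordsByPartition); // sorted for stable output
    }

    // Models releasing the data once the handler is done with it.
    void clearRecords() {
        recordsByPartition = null;
    }

    // Builds a log line from the data; only meaningful before clearRecords().
    String summaryForLog() {
        if (recordsByPartition == null) {
            return "records cleared";
        }
        StringBuilder sb = new StringBuilder();
        recordsByPartition.forEach((tp, bytes) ->
                sb.append(tp).append('=').append(bytes.length).append("B "));
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        ToyProduceRequest request = new ToyProduceRequest(
                Map.of("topic-0", new byte[10], "topic-1", new byte[3]));
        String logLine = request.summaryForLog();    // capture first
        request.clearRecords();                      // then free the memory
        System.out.println(logLine);                 // topic-0=10B topic-1=3B
        System.out.println(request.summaryForLog()); // records cleared
    }
}
```

The design choice is purely one of ordering: the summary is computed while the data still exists, at the cost of doing that work even when the log line turns out not to be needed.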







[jira] [Comment Edited] (KAFKA-10114) Kafka producer stuck after broker crash

2020-11-19 Thread Tim Fox (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235343#comment-17235343
 ] 

Tim Fox edited comment on KAFKA-10114 at 11/19/20, 11:09 AM:
-

> We are still seeing this issue with version 2.6.0. Our app calls flush and it 
>hangs forever when brokers are down.

[Revised my answer as it was previously based on a misunderstanding of the 
current code]

Currently KafkaProducer.flush() will hang forever if there are queued batches 
and brokers are lost and not restarted. Queued batches won't be timed out as 
there are no "ready" nodes and the timeout logic currently occurs after a ready 
node has been obtained.

Expectation is for flush() to complete with a TimeoutException if it does not 
complete successfully before the timeout as specified in delivery.timeout.ms
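For reference, `delivery.timeout.ms` is an ordinary producer property. A sketch of setting it alongside the related timeouts follows; the values are illustrative and `localhost:9092` is an assumed address. The `checkDeliveryTimeout` helper is hypothetical and only restates the documented rule that `delivery.timeout.ms` should be at least `linger.ms + request.timeout.ms`; it is not a Kafka client API, and whether `flush()` honours this bound is exactly what the comment above questions.

```java
import java.util.Properties;

public class DeliveryTimeoutConfig {
    // Builds producer properties with illustrative values (not recommendations).
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed address
        props.put("linger.ms", "5");
        props.put("request.timeout.ms", "30000");
        // Upper bound on the time to report success or failure after send() returns.
        props.put("delivery.timeout.ms", "120000");
        // Separate bound on how long send() may block (e.g. waiting for buffer memory).
        props.put("max.block.ms", "60000");
        return props;
    }

    // Hypothetical check: delivery.timeout.ms >= linger.ms + request.timeout.ms.
    static boolean checkDeliveryTimeout(Properties props) {
        long linger = Long.parseLong(props.getProperty("linger.ms"));
        long request = Long.parseLong(props.getProperty("request.timeout.ms"));
        long delivery = Long.parseLong(props.getProperty("delivery.timeout.ms"));
        return delivery >= linger + request;
    }

    public static void main(String[] args) {
        System.out.println(checkDeliveryTimeout(producerProps())); // true
    }
}
```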

 

[~ijuma] [~hachikuji] thoughts?

 

 

 



[jira] [Comment Edited] (KAFKA-10114) Kafka producer stuck after broker crash

2020-11-19 Thread Tim Fox (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235343#comment-17235343
 ] 

Tim Fox edited comment on KAFKA-10114 at 11/19/20, 11:14 AM:
-

> We are still seeing this issue with version 2.6.0. Our app calls flush and it 
>hangs forever when brokers are down.

[Revised my answer as it was previously based on a misunderstanding of the 
current code]

Currently KafkaProducer.flush() will hang forever if there are pending batches 
and brokers have been lost and not restarted. Queued batches won't be timed out 
as there are no "ready" nodes and the timeout logic currently occurs after a 
ready node has been obtained.

Expectation is for flush() to throw a TimeoutException if it does not complete 
successfully before delivery.timeout.ms

[~ijuma] [~hachikuji] thoughts?

 

 

 




[GitHub] [kafka] bristy opened a new pull request #9621: KAFKA-9892: Producer state snapshot needs to be forced to disk

2020-11-19 Thread GitBox


bristy opened a new pull request #9621:
URL: https://github.com/apache/kafka/pull/9621


   FileChannel.close() does not guarantee that modified buffers are persisted to 
the storage device. We are changing it to use force() semantics so that the file 
content and metadata are written to disk (FileChannel.force(true) flushes both 
the content and the metadata).
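A minimal sketch of the write-then-force pattern using only `java.nio` (the `SnapshotWriter`/`writeSnapshot` names and the temp-file path are illustrative, not Kafka's producer state snapshot code): `force(true)` asks the OS to write both the file's content and its metadata to the storage device before the channel is closed.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class SnapshotWriter {
    // Writes bytes to a file and forces them (plus metadata) to disk before closing.
    static void writeSnapshot(Path file, byte[] contents) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            ByteBuffer buffer = ByteBuffer.wrap(contents);
            while (buffer.hasRemaining()) {
                channel.write(buffer); // a write may be partial; loop until done
            }
            channel.force(true); // true: also flush file metadata
        } // close() on its own would not guarantee durability
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("producer-state", ".snapshot");
        writeSnapshot(file, new byte[]{1, 2, 3});
        System.out.println(Files.size(file)); // 3
        Files.delete(file);
    }
}
```

On some file systems, durably creating a brand-new file may additionally require syncing the parent directory; that step is outside this sketch.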
   
   Summary of testing strategy: I ran the unit tests after making the changes.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Comment Edited] (KAFKA-10114) Kafka producer stuck after broker crash

2020-11-19 Thread Tim Fox (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235343#comment-17235343
 ] 

Tim Fox edited comment on KAFKA-10114 at 11/19/20, 11:53 AM:
-

> We are still seeing this issue with version 2.6.0. Our app calls flush and it 
>hangs forever when brokers are down.

[Revised my answer as it was previously based on a misunderstanding of the 
current code]

Currently KafkaProducer.flush() will hang forever if there are pending batches 
and brokers have been lost and not restarted. Queued batches won't be timed out 
as there are no "ready" nodes and the timeout logic currently occurs after a 
ready node has been obtained.

Expectation is for flush() to throw a TimeoutException if it does not complete 
successfully before delivery.timeout.ms

From inspecting the code, I am still unsure why record batches aren't being 
expired properly. [~kwadhwa], if you could provide a thread dump when the 
hanging occurs, that will help us diagnose the issue. 
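For collecting that information, `jstack <pid>` works, as does the standard `java.lang.management` API from inside the application. A minimal sketch, assuming the producer I/O thread keeps the `kafka-producer-network-thread` name prefix seen in the reporter's jstack output (`dumpThreads` is an illustrative helper, not a Kafka API):

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

public class ProducerThreadDump {
    // Returns full stack dumps of live threads whose names start with the given prefix.
    static String dumpThreads(String namePrefix) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        StringBuilder out = new StringBuilder();
        // true, true: include locked monitors and ownable synchronizers.
        for (ThreadInfo info : mx.dumpAllThreads(true, true)) {
            if (info != null && info.getThreadName().startsWith(namePrefix)) {
                out.append('"').append(info.getThreadName()).append("\" ")
                   .append(info.getThreadState()).append('\n');
                // Iterate frames directly; ThreadInfo.toString() truncates deep stacks.
                for (StackTraceElement frame : info.getStackTrace()) {
                    out.append("    at ").append(frame).append('\n');
                }
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(dumpThreads("kafka-producer-network-thread"));
    }
}
```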

 



[GitHub] [kafka] dengziming opened a new pull request #9622: KAFKA-10547; add topicId in MetadataResp

2020-11-19 Thread GitBox


dengziming opened a new pull request #9622:
URL: https://github.com/apache/kafka/pull/9622


   
   1. Bump the version of MetadataReq and MetadataResp, and add topicId to MetadataResp
   2. Alter describeTopic in AdminClientTopicService and ZookeeperTopicService
   3. TopicMetadata is cached in MetadataCache, so we need to add topicId to MetadataCache
   4. MetadataCache is updated by UpdateMetadataRequest, so bump the version of UpdateMetadataReq and UpdateMetadataResp, and add topicId to UpdateMetadataReq.
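The 22-character `TopicId` values in the test output of this PR look like a 128-bit UUID written as URL-safe Base64 without padding. A small sketch, assuming that encoding (the `TopicIdFormat` helpers are illustrative and use only `java.util`, not Kafka's own id class):

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

public class TopicIdFormat {
    // Encodes a UUID's 16 bytes as URL-safe Base64 without padding (22 chars).
    static String encode(UUID uuid) {
        ByteBuffer buf = ByteBuffer.allocate(16);
        buf.putLong(uuid.getMostSignificantBits());
        buf.putLong(uuid.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(buf.array());
    }

    // Decodes the 22-character form back into a UUID.
    static UUID decode(String encoded) {
        ByteBuffer buf = ByteBuffer.wrap(Base64.getUrlDecoder().decode(encoded));
        if (buf.remaining() != 16) {
            throw new IllegalArgumentException("expected 16 bytes, got " + buf.remaining());
        }
        return new UUID(buf.getLong(), buf.getLong()); // msb first, then lsb
    }

    public static void main(String[] args) {
        String fromOutput = "wRPl6VAlQeyE77bDxEESzg"; // TopicId from the test output
        System.out.println(encode(decode(fromOutput)).equals(fromOutput)); // true
    }
}
```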
   
   
   Tested locally; here are some results:
   
   New server + new client:
   
   kafka-topics.sh --describe --zookeeper localhost:2181 --topic old-version-topic
   Topic: old-version-topic  TopicId: wRPl6VAlQeyE77bDxEESzg  PartitionCount: 2  ReplicationFactor: 1  Configs:
       Topic: old-version-topic  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
       Topic: old-version-topic  Partition: 1  Leader: 0  Replicas: 0  Isr: 0
   
   kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic old-version-topic
   Topic: old-version-topic  TopicId: wRPl6VAlQeyE77bDxEESzg  PartitionCount: 2  ReplicationFactor: 1  Configs: segment.bytes=1073741824
       Topic: old-version-topic  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
       Topic: old-version-topic  Partition: 1  Leader: 0  Replicas: 0  Isr: 0
   
   Old server + new client:
   
   kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic old-version-topic
   Topic: old-version-topic  TopicId:   PartitionCount: 2  ReplicationFactor: 1  Configs: segment.bytes=1073741824
       Topic: old-version-topic  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
       Topic: old-version-topic  Partition: 1  Leader: 0  Replicas: 0  Isr: 0
   
   New server + old client:
   
   kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic old-version-topic
   Topic: old-version-topic  PartitionCount: 2  ReplicationFactor: 1  Configs: segment.bytes=1073741824
       Topic: old-version-topic  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
       Topic: old-version-topic  Partition: 1  Leader: 0  Replicas: 0  Isr: 0
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #9622: KAFKA-10547; add topicId in MetadataResp

2020-11-19 Thread GitBox


dengziming commented on pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#issuecomment-730332596


   @rajinisivaram @jolshan Hi, PTAL.







[GitHub] [kafka] tombentley opened a new pull request #9623: KAFKA-10692: Add delegation.token.secret.key, deprecate ...master.key

2020-11-19 Thread GitBox


tombentley opened a new pull request #9623:
URL: https://github.com/apache/kafka/pull/9623


   Add delegation.token.secret.key broker config and deprecate 
delegation.token.master.key as described in 
[KIP-681](https://cwiki.apache.org/confluence/display/KAFKA/KIP-681%3A+Rename+master+key+in+delegation+token+feature)
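KIP-681 renames the config while keeping the old name usable. A minimal sketch of that kind of alias resolution with plain `java.util.Properties` follows; the precedence shown (new key first, deprecated key as a fallback, with a warning) is an assumption for illustration and is not a quote of the broker's `KafkaConfig` code.

```java
import java.util.Properties;

// Hypothetical resolution of a renamed config key with a deprecated alias.
class DelegationTokenKeyConfig {
    static final String SECRET_KEY = "delegation.token.secret.key";
    static final String DEPRECATED_MASTER_KEY = "delegation.token.master.key";

    // Prefers the new name; falls back to the deprecated one; null if neither is set.
    static String resolveSecretKey(Properties props) {
        String value = props.getProperty(SECRET_KEY);
        if (value == null) {
            value = props.getProperty(DEPRECATED_MASTER_KEY);
            if (value != null) {
                System.err.println(DEPRECATED_MASTER_KEY + " is deprecated; use " + SECRET_KEY);
            }
        }
        return value;
    }
}
```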







[GitHub] [kafka] tombentley commented on pull request #9623: KAFKA-10692: Add delegation.token.secret.key, deprecate ...master.key

2020-11-19 Thread GitBox


tombentley commented on pull request #9623:
URL: https://github.com/apache/kafka/pull/9623#issuecomment-730336828


   @omkreddy, @mimaison please could one of you review?







[jira] [Comment Edited] (KAFKA-10114) Kafka producer stuck after broker crash

2020-11-19 Thread Tim Fox (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235343#comment-17235343
 ] 

Tim Fox edited comment on KAFKA-10114 at 11/19/20, 1:17 PM:


> We are still seeing this issue with version 2.6.0. Our app calls flush and it 
>hangs forever when brokers are down.

[Revised my answer as it was previously based on a misunderstanding of the 
current code]

Currently, KafkaProducer.flush() will hang forever if there are pending batches 
and brokers have been lost and not restarted. Queued batches won't be timed out, 
because there are no "ready" nodes and the timeout logic currently runs only after a 
ready node has been obtained.

The expectation is for flush() to throw a TimeoutException if it does not complete 
successfully before delivery.timeout.ms elapses.

From inspecting the code, I am still unsure why record batches aren't being 
expired properly. [~kwadhwa], if you could provide a thread dump when the 
hanging occurs and enable trace logging, that will help us diagnose the 
issue. 
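The expected behaviour can be illustrated with a hypothetical sketch (not the actual `Sender` or `RecordAccumulator` code; `Batch` and `ExpiryFirstSender` are illustrative names): expiry of queued batches against `delivery.timeout.ms` is evaluated independently of whether any node is ready, so pending batches time out even when every broker is down.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical queued batch; only its creation time matters here.
class Batch {
    final String topicPartition;
    final long createdMs;

    Batch(String topicPartition, long createdMs) {
        this.topicPartition = topicPartition;
        this.createdMs = createdMs;
    }
}

class ExpiryFirstSender {
    private final long deliveryTimeoutMs;

    ExpiryFirstSender(long deliveryTimeoutMs) {
        this.deliveryTimeoutMs = deliveryTimeoutMs;
    }

    // Removes and returns every batch that is at least deliveryTimeoutMs old.
    // Takes no "ready nodes" argument on purpose: expiry must not depend on broker availability.
    List<Batch> expire(List<Batch> queued, long nowMs) {
        List<Batch> expired = new ArrayList<>();
        Iterator<Batch> it = queued.iterator();
        while (it.hasNext()) {
            Batch batch = it.next();
            if (nowMs - batch.createdMs >= deliveryTimeoutMs) {
                expired.add(batch);
                it.remove();
            }
        }
        return expired;
    }
}
```

Failing the returned batches with a TimeoutException is what would let a blocked flush() make progress in this model.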

 


was (Author: purplefox):
> We are still seeing this issue with version 2.6.0. Our app calls flush and it 
>hangs forever when brokers are down.

[Revised my answer as it was previously based on a misunderstanding of the 
current code]

Currently, KafkaProducer.flush() will hang forever if there are pending batches 
and brokers have been lost and not restarted. Queued batches won't be timed out, 
because there are no "ready" nodes and the timeout logic currently runs only after a 
ready node has been obtained.

The expectation is for flush() to throw a TimeoutException if it does not complete 
successfully before delivery.timeout.ms elapses.

From inspecting the code, I am still unsure why record batches aren't being 
expired properly. [~kwadhwa], if you could provide a thread dump when the 
hanging occurs, that will help us diagnose the issue. 

 

> Kafka producer stuck after broker crash
> ---
>
> Key: KAFKA-10114
> URL: https://issues.apache.org/jira/browse/KAFKA-10114
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.3.1, 2.4.1
>Reporter: Itamar Benjamin
>Priority: Critical
>
> Today two of our Kafka brokers crashed (cluster of 3 brokers), and producers 
> were not able to send new messages. After the brokers started again, all producers 
> resumed sending data except for a single one.
> At the beginning, the producer rejected all new messages with a TimeoutException:
>  
> {code:java}
>  org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
> incoming-mutable-RuntimeIIL-1:12 ms has passed since batch creation
> {code}
>  
> Then, after some time, the exception changed to:
>  
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory 
> within the configured max blocking time 6 ms.
> {code}
>  
>  
> jstack shows kafka-producer-network-thread is waiting to get producer id:
>  
> {code:java}
> "kafka-producer-network-thread | producer-1" #767 daemon prio=5 os_prio=0 
> cpu=63594017.16ms elapsed=1511219.38s tid=0x7fffd8353000 nid=0x4fa4 
> sleeping [0x7ff55c177000]
>java.lang.Thread.State: TIMED_WAITING (sleeping)
> at java.lang.Thread.sleep(java.base@11.0.1/Native Method)
> at org.apache.kafka.common.utils.Utils.sleep(Utils.java:296)
> at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:41)
> at 
> org.apache.kafka.clients.producer.internals.Sender.maybeWaitForProducerId(Sender.java:565)
> at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:306)
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
> at java.lang.Thread.run(java.base@11.0.1/Thread.java:834)   Locked 
> ownable synchronizers:
> - None
> {code}
>  
> Digging into maybeWaitForProducerId(): it waits until some broker is ready 
> (the awaitNodeReady function), which in turn calls leastLoadedNode() on 
> NetworkClient. That method iterates over all brokers and checks whether a request 
> can be sent to each one using canSendRequest().
> This is the code for canSendRequest():
>  
> {code:java}
> return connectionStates.isReady(node, now) && selector.isChannelReady(node) 
> && inFlightRequests.canSendMore(node)
> {code}
>  
>  
> Using some debugging tools, I saw that this expression always evaluates to 
> false, since the last part (canSendMore) is false. 
>  
> This is the code for canSendMore:
> {code:java}
> public boolean canSendMore(String node) {
>     Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
>     return queue == null || queue.isEmpty() ||
>            (queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
> }
> {code}
>  
>  
> I verified that
> {code:java}
> queue.peekFirst().send.completed()
> {code}
> is true, an

[GitHub] [kafka] ijuma commented on a change in pull request #9435: KAFKA-10606: Disable auto topic creation for fetch-all-topic-metadata request

2020-11-19 Thread GitBox


ijuma commented on a change in pull request #9435:
URL: https://github.com/apache/kafka/pull/9435#discussion_r526911200



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1333,7 +1343,17 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 }
 
-val completeTopicMetadata = topicMetadata ++ 
unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
+val completeTopicMetadata = (if (metadataRequest.isAllTopics) {

Review comment:
   This is a good point. We should pass a boolean to `getTopicMetadata` 
indicating that it's an "allTopics" request and have that method handle 
everything.
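The suggested shape can be sketched in Java (the Kafka code is Scala; `topicsToAutoCreate` is a hypothetical, simplified stand-in that only decides which topics may be auto-created, not the real `getTopicMetadata`): passing an `isAllTopics` flag into one method lets it apply the rule from KAFKA-10606, never auto-creating topics for a fetch-all-topics request, instead of post-processing the result at the call site.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper: which requested topics would be auto-created.
class TopicMetadataSketch {
    static List<String> topicsToAutoCreate(List<String> requested, Set<String> existing,
                                           boolean allowAutoTopicCreation, boolean isAllTopics) {
        List<String> toCreate = new ArrayList<>();
        // A fetch-all-topics request never triggers auto creation.
        if (isAllTopics || !allowAutoTopicCreation) {
            return toCreate;
        }
        for (String topic : requested) {
            if (!existing.contains(topic)) {
                toCreate.add(topic);
            }
        }
        return toCreate;
    }
}
```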









[GitHub] [kafka] mumrah commented on pull request #9598: KAFKA-10701 : First line of detailed stats from consumer-perf-test.sh incorrect

2020-11-19 Thread GitBox


mumrah commented on pull request #9598:
URL: https://github.com/apache/kafka/pull/9598#issuecomment-730402458


   @quanuw there are some statements that expect `joinStart` to be a timestamp 
in order to calculate durations. When it's zero, those durations become huge, 
which makes the subsequent calculations incorrect. 
   
   You should be able to run the consumer perf test with the `KAFKA_DEBUG=1` 
environment variable set and attach a debugger from IntelliJ or Eclipse. This will 
give more insight into what's going on.
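The effect of an unset start timestamp is plain arithmetic. A minimal sketch, assuming a hypothetical `joinStartMs` value that stays at 0 until the first group join completes (illustrative names, not the actual `ConsumerPerformance` code): subtracting 0 from the current wall-clock time yields roughly the milliseconds since the Unix epoch, so any rate derived from that "duration" is wildly off, while a guard on the unset value avoids it.

```java
// Hypothetical illustration of why a zero start timestamp corrupts elapsed-time math.
class JoinTiming {
    static final long UNSET = 0L;

    // Naive: treats 0 as a real timestamp, so the result is about "ms since 1970".
    static long naiveElapsedMs(long joinStartMs, long nowMs) {
        return nowMs - joinStartMs;
    }

    // Guarded: reports no join time until a real start timestamp exists.
    static long guardedElapsedMs(long joinStartMs, long nowMs) {
        return joinStartMs == UNSET ? 0L : nowMs - joinStartMs;
    }
}
```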







[GitHub] [kafka] mumrah merged pull request #9598: KAFKA-10701 : First line of detailed stats from consumer-perf-test.sh incorrect

2020-11-19 Thread GitBox


mumrah merged pull request #9598:
URL: https://github.com/apache/kafka/pull/9598


   







[jira] [Commented] (KAFKA-9892) Producer state snapshot needs to be forced to disk

2020-11-19 Thread Brajesh Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235516#comment-17235516
 ] 

 Brajesh Kumar commented on KAFKA-9892:
---

[~ijuma] Can you please help me with the build on my CR?

> Producer state snapshot needs to be forced to disk
> --
>
> Key: KAFKA-9892
> URL: https://issues.apache.org/jira/browse/KAFKA-9892
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.6.0
>Reporter: Jun Rao
>Assignee:  Brajesh Kumar
>Priority: Major
>
> Currently, ProducerStateManager.writeSnapshot() only calls 
> fileChannel.close() and does not explicitly call fileChannel.force(). It seems 
> force() is not guaranteed to be called on close(). 
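A minimal sketch of the fix the issue describes, using plain `java.nio` (the method name `writeSnapshot` and the byte contents are illustrative, not `ProducerStateManager`'s actual code): write all bytes, call `FileChannel.force(true)` to push file content and metadata to the storage device, and only then close the channel, since `close()` is not documented to do this.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class SnapshotWriter {
    // Writes the buffer fully, forces it to disk, then closes the channel.
    static void writeSnapshot(Path file, ByteBuffer contents) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING)) {
            // write() may write fewer bytes than remain, so loop until done.
            while (contents.hasRemaining()) {
                channel.write(contents);
            }
            // Flush content and metadata before try-with-resources calls close().
            channel.force(true);
        }
    }
}
```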



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

2020-11-19 Thread GitBox


cadonna commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r526880412



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1070,7 +1070,9 @@ private Thread shutdownHelper(final boolean error) {
 adminClient.close();
 
 streamsMetrics.removeAllClientLevelMetrics();
+streamsMetrics.removeAllClientLevelSensors();

Review comment:
   Could you please add a public method to `StreamsMetricsImpl` named 
`removeAllClientLevelSensorsAndMetrics()` that calls 
`removeAllClientLevelMetrics()` and `removeAllClientLevelSensors()` and make 
the latter two methods `private`?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/internals/metrics/ClientMetrics.java
##
@@ -60,6 +65,7 @@ private ClientMetrics() {}
 "The description of the topology executed in the Kafka Streams client";
 private static final String STATE_DESCRIPTION = "The state of the Kafka 
Streams client";
 private static final String ALIVE_STREAM_THREADS_DESCRIPTION = "The 
current number of alive stream threads that are running or participating in 
rebalance";
+private static final String FAILED_STREAM_THREADS_DESCRIPTION = "The 
number of failed stream threads so far for a given Kafka Streams client";

Review comment:
   ```suggestion
   private static final String FAILED_STREAM_THREADS_DESCRIPTION = "The 
number of failed stream threads since the start of the Kafka Streams client";
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##
@@ -93,6 +93,7 @@ public int hashCode() {
 
 private final Version version;
 private final Deque clientLevelMetrics = new LinkedList<>();
+private final Map> clientLevelSensors = new 
HashMap<>();

Review comment:
   Here you should only need a queue, as for `clientLevelMetrics`. We need a 
map for the other levels because there can be multiple objects for each level, 
e.g., there might be multiple stream threads, and each one manages its sensors 
under a key in the map. However, there is only one client on the client level.
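The suggestion can be sketched with a hypothetical, self-contained stand-in (it does not use Kafka's `Metrics` or `StreamsMetricsImpl` types; a `Set` plays the role of the metrics registry, and the metric names are illustrative): because there is exactly one client per instance, client-level sensor names fit in a single `Deque` like `clientLevelMetrics`, and one public method removes both sensors and metrics while the two helpers stay private, as requested in the comment on `KafkaStreams`.

```java
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;

// Hypothetical stand-in for client-level bookkeeping.
class ClientLevelRegistry {
    private final Set<String> registered = new HashSet<>();
    private final Deque<String> clientLevelMetrics = new LinkedList<>();
    // One client per instance, so a queue suffices; no per-thread map is needed.
    private final Deque<String> clientLevelSensors = new LinkedList<>();

    void addClientLevelMetric(String name) {
        registered.add(name);
        clientLevelMetrics.push(name);
    }

    void addClientLevelSensor(String name) {
        registered.add(name);
        clientLevelSensors.push(name);
    }

    // Single public entry point; the helpers below are private.
    public void removeAllClientLevelSensorsAndMetrics() {
        removeAllClientLevelSensors();
        removeAllClientLevelMetrics();
    }

    private void removeAllClientLevelSensors() {
        while (!clientLevelSensors.isEmpty()) {
            registered.remove(clientLevelSensors.pop());
        }
    }

    private void removeAllClientLevelMetrics() {
        while (!clientLevelMetrics.isEmpty()) {
            registered.remove(clientLevelMetrics.pop());
        }
    }

    int registeredCount() {
        return registered.size();
    }
}
```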

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##
@@ -253,6 +268,16 @@ public final void removeAllClientLevelMetrics() {
 }
 }
 
+public final void removeAllClientLevelSensors() {

Review comment:
   Unit tests for this method are missing. Please also consider my comment 
in class `KafkaStreams` for these unit tests. 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -221,6 +221,9 @@ State setState(final State newState) {
 throw new StreamsException(logPrefix + "Unexpected state 
transition from " + oldState + " to " + newState);
 } else {
 log.info("State transition from {} to {}", oldState, newState);
+if (newState == State.DEAD) {
+failedStreamThreadSensor.record();
+}

Review comment:
   Not every dead stream thread is a failed stream thread. You should 
record this metric where the uncaught exception handler is called, because there 
we know that a stream thread died unexpectedly.
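The distinction can be shown with only `java.lang.Thread` (the `failedThreads` counter stands in for the `failed-stream-threads` sensor and is illustrative): a thread that finishes normally also ends up dead, but only a thread that terminates because of an uncaught exception reaches the handler, so counting there counts failures only.

```java
import java.util.concurrent.atomic.AtomicInteger;

class FailedThreadCounter {
    // Stand-in for the failed-stream-threads sensor.
    final AtomicInteger failedThreads = new AtomicInteger();

    Thread newThread(Runnable body) {
        Thread t = new Thread(body);
        // Runs on the dying thread, and only when it ends with an uncaught exception.
        t.setUncaughtExceptionHandler((thread, error) -> failedThreads.incrementAndGet());
        return t;
    }
}
```

Starting one thread that returns normally and one that throws leaves the counter at 1 once both have been joined.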

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##
@@ -377,7 +378,7 @@ private void shouldAddMetricsOnAllLevels(final String 
builtInMetricsVersion) thr
 builtInMetricsVersion
 );
 checkCacheMetrics(builtInMetricsVersion);
-
+verifyFailedStreamThreadsSensor(0.0);

Review comment:
   I would put the test of whether the metric is recorded correctly in 
`StreamThreadTest`. An example for such a test is 
`shouldLogAndRecordSkippedRecordsForInvalidTimestamps()`. I do not think an 
integration test is needed. The test regarding the existence of the metric, 
i.e., `checkMetricByName(listMetricThread, FAILED_STREAM_THREADS, 1);` should 
stay here.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/internals/metrics/ClientMetricsTest.java
##
@@ -99,6 +121,27 @@ public void shouldAddAliveStreamThreadsMetric() {
 );
 }
 
+@Test
+public void shouldGetFailedStreamThreadsSensor() {
+final String name = "failed-stream-threads";
+final String description = "The number of failed stream threads so far 
for a given Kafka Streams client";
+expect(streamsMetrics.clientLevelSensor(name, 
RecordingLevel.INFO)).andReturn(expectedSensor);
+expect(streamsMetrics.clientLevelTagMap()).andReturn(tagMap);
+StreamsMetricsImpl.addSumMetricToSensor(
+expectedSensor,
+CLIENT_LEVEL_GROUP,
+tagMap,
+name,
+false,
+description
+  

[GitHub] [kafka] mimaison merged pull request #9545: [mm2] Allow Checkpoints for consumers using static partition assignments

2020-11-19 Thread GitBox


mimaison merged pull request #9545:
URL: https://github.com/apache/kafka/pull/9545


   







[GitHub] [kafka] dajac commented on pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-19 Thread GitBox


dajac commented on pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#issuecomment-730447091


   Builds are green. Merging to trunk.







[GitHub] [kafka] ryannedolan commented on pull request #9545: [mm2] Allow Checkpoints for consumers using static partition assignments

2020-11-19 Thread GitBox


ryannedolan commented on pull request #9545:
URL: https://github.com/apache/kafka/pull/9545#issuecomment-730448017


   Sorry, late to the convo, but lgtm, thanks!







[GitHub] [kafka] mimaison merged pull request #9623: KAFKA-10692: Add delegation.token.secret.key, deprecate ...master.key

2020-11-19 Thread GitBox


mimaison merged pull request #9623:
URL: https://github.com/apache/kafka/pull/9623


   







[jira] [Resolved] (KAFKA-10692) Rename broker master key config for KIP 681

2020-11-19 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10692?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-10692.

Fix Version/s: 2.8.0
   Resolution: Fixed

> Rename broker master key config for KIP 681
> ---
>
> Key: KAFKA-10692
> URL: https://issues.apache.org/jira/browse/KAFKA-10692
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Major
> Fix For: 2.8.0
>
>






[GitHub] [kafka] dajac merged pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-19 Thread GitBox


dajac merged pull request #9386:
URL: https://github.com/apache/kafka/pull/9386


   







[jira] [Resolved] (KAFKA-10024) Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-19 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-10024.
-
Fix Version/s: 2.8.0
   Resolution: Fixed

> Add dynamic configuration and enforce quota for per-IP connection rate limits 
> (KIP-612, part 2)
> ---
>
> Key: KAFKA-10024
> URL: https://issues.apache.org/jira/browse/KAFKA-10024
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Anna Povzner
>Assignee: David Mao
>Priority: Major
>  Labels: features
> Fix For: 2.8.0
>
>
> This JIRA is for the second part of KIP-612 – Add per-IP connection creation 
> rate limits.
> As described here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]





[jira] [Created] (KAFKA-10746) Consumer poll timeout Expiration should be logged as WARNING not INFO.

2020-11-19 Thread Benedikt Linse (Jira)
Benedikt Linse created KAFKA-10746:
--

 Summary: Consumer poll timeout Expiration should be logged as 
WARNING not INFO. 
 Key: KAFKA-10746
 URL: https://issues.apache.org/jira/browse/KAFKA-10746
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 2.5.1, 2.6.0, 2.5.0
Reporter: Benedikt Linse


When a consumer does not poll regularly, and the `max.poll.interval.ms` 
threshold is reached, the consumer leaves the consumer group, and the reason is 
logged as an INFO message:

[https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1356]

[https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1016]

Most Kafka users ignore INFO messages or have the log level set to WARN. Still 
many users run into this issue, since their applications take too long to 
process the polled records, and then the consumer fails to commit the offsets, 
which leads to duplicate message processing. Not seeing the error message in 
the first place means that users lose a lot of time debugging and searching 
for the reason for duplicate message processing.

Therefore it seems like the log level of this message should be increased to 
WARN. 
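For readers hitting the duplicate-processing symptom described above, two consumer settings usually govern it: `max.poll.interval.ms` (the maximum delay between `poll()` calls before the consumer leaves the group) and `max.poll.records` (how many records a single `poll()` returns). A minimal sketch using plain `java.util.Properties`; the values are illustrative, not recommendations.

```java
import java.util.Properties;

class PollTuning {
    // Builds consumer properties with illustrative poll-related settings.
    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Upper bound on the time between poll() calls before the consumer leaves the group.
        props.setProperty("max.poll.interval.ms", "600000");
        // Fewer records per poll() means less processing work between polls.
        props.setProperty("max.poll.records", "100");
        return props;
    }
}
```

These properties would then be passed, together with deserializer settings, to the `KafkaConsumer` constructor.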





[jira] [Created] (KAFKA-10747) Implement ClientQuota APIs for altering and describing IP entity quotas

2020-11-19 Thread David Mao (Jira)
David Mao created KAFKA-10747:
-

 Summary: Implement ClientQuota APIs for altering and describing IP 
entity quotas 
 Key: KAFKA-10747
 URL: https://issues.apache.org/jira/browse/KAFKA-10747
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.8.0
Reporter: David Mao
Assignee: David Mao








[jira] [Created] (KAFKA-10748) Add IP connection rate throttling metric

2020-11-19 Thread David Mao (Jira)
David Mao created KAFKA-10748:
-

 Summary: Add IP connection rate throttling metric
 Key: KAFKA-10748
 URL: https://issues.apache.org/jira/browse/KAFKA-10748
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 2.8.0
Reporter: David Mao
Assignee: David Mao








[GitHub] [kafka] dajac commented on pull request #7498: KAFKA-9023: Log request destination when the Producer gets disconnected

2020-11-19 Thread GitBox


dajac commented on pull request #7498:
URL: https://github.com/apache/kafka/pull/7498#issuecomment-730458347


   Failed tests are not related. Merging to trunk.
   * Build / JDK 11 / 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   * Build / JDK 11 / 
org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization
 = none]
   







[jira] [Updated] (KAFKA-10746) Consumer poll timeout Expiration should be logged as WARNING not INFO.

2020-11-19 Thread Benedikt Linse (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10746?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Benedikt Linse updated KAFKA-10746:
---
Description: 
When a consumer does not poll regularly, and the `max.poll.interval.ms` 
threshold is reached, the consumer leaves the consumer group, and the reason is 
logged as an INFO message:

[https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1356]

[https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1016]

Most Kafka users ignore INFO messages or have the log level set to WARN. Still 
many users run into this issue, since their applications take too long to 
process the polled records, and then the consumer fails to commit the offsets, 
which leads to duplicate message processing. Not seeing the error message in 
the first place means that users lose a lot of time debugging and searching for 
the reason for duplicate message processing.

Therefore it seems like the log level of this message should be increased to 
WARN. 

  was:
When a consumer does not poll regularly, and the `max.poll.interval.ms` 
threshold is reached, the consumer leaves the consumer group, and the reason is 
logged as an INFO message:

[https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1356]

[https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1016]

Most Kafka users ignore INFO messages or have the log level set to WARN. Still 
many users run into this issue, since their applications take too long to 
process the polled records, and then the consumer fails to commit the offsets, 
which leads to duplicate message processing. Not seeing the error message in 
the first place means that users loose a lot of time debugging and searching 
for the reason for duplicate message processing.

Therefore it seems like the log level of this message should be increased to 
WARN. 


> Consumer poll timeout Expiration should be logged as WARNING not INFO. 
> ---
>
> Key: KAFKA-10746
> URL: https://issues.apache.org/jira/browse/KAFKA-10746
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.5.0, 2.6.0, 2.5.1
>Reporter: Benedikt Linse
>Priority: Minor
>
> When a consumer does not poll regularly, and the `max.poll.interval.ms` 
> threshold is reached, the consumer leaves the consumer group, and the reason 
> is logged as an INFO message:
> [https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1356]
> [https://github.com/a0x8o/kafka/blob/e032a4ad9bac2392b4406d0a3b245d6011edd15b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1016]
> Most Kafka users ignore INFO messages or have the log level set to WARN. 
> Still many users run into this issue, since their applications take too long 
> to process the polled records, and then the consumer fails to commit the 
> offsets, which leads to duplicate message processing. Not seeing the error 
> message in the first place means that users lose a lot of time debugging and 
> searching for the reason for duplicate message processing.
> Therefore it seems like the log level of this message should be increased to 
> WARN. 





[GitHub] [kafka] dajac merged pull request #7498: KAFKA-9023: Log request destination when the Producer gets disconnected

2020-11-19 Thread GitBox


dajac merged pull request #7498:
URL: https://github.com/apache/kafka/pull/7498


   







[jira] [Resolved] (KAFKA-9023) Producer NETWORK_EXCEPTION response should log more information

2020-11-19 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-9023.

Fix Version/s: 2.8.0
   Resolution: Fixed

> Producer NETWORK_EXCEPTION response should log more information
> ---
>
> Key: KAFKA-9023
> URL: https://issues.apache.org/jira/browse/KAFKA-9023
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
> Fix For: 2.8.0
>
>
> When diagnosing network issues, it is useful to have a clear picture of which 
> client disconnected from which broker at what time.
> Currently, when the producer receives a NETWORK_EXCEPTION in its responses, 
> it logs the following:
> {code:java}
> [Producer clientId=] Received invalid metadata error in produce 
> request on partition  due to 
> org.apache.kafka.common.errors.NetworkException: The server disconnected 
> before a response was received.. Going to request metadata update now {code}
> It would be good if we logged additional information regarding the 
> broker/host whose connection was disconnected





[GitHub] [kafka] mimaison commented on pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-11-19 Thread GitBox


mimaison commented on pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#issuecomment-730467364


   > If we have to control the complexity, I would prefer to drop 
testWithBrokerRestart and keep MirrorConnectorsIntegrationSSLTest, as it makes 
sense to run simple validation in SSL setup.
   
   I think that would be a good idea. We can try to introduce this test in a 
follow-up PR.







[jira] [Updated] (KAFKA-10747) Implement ClientQuota APIs for altering and describing IP entity quotas

2020-11-19 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-10747:
--
Component/s: core
 config

> Implement ClientQuota APIs for altering and describing IP entity quotas 
> 
>
> Key: KAFKA-10747
> URL: https://issues.apache.org/jira/browse/KAFKA-10747
> Project: Kafka
>  Issue Type: Improvement
>  Components: config, core
>Affects Versions: 2.8.0
>Reporter: David Mao
>Assignee: David Mao
>Priority: Major
>






[jira] [Updated] (KAFKA-10748) Add IP connection rate throttling metric

2020-11-19 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-10748:
--
Component/s: network
 core

> Add IP connection rate throttling metric
> 
>
> Key: KAFKA-10748
> URL: https://issues.apache.org/jira/browse/KAFKA-10748
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, network
>Affects Versions: 2.8.0
>Reporter: David Mao
>Assignee: David Mao
>Priority: Minor
>






[jira] [Created] (KAFKA-10749) Add throttling of IPs by connection rate

2020-11-19 Thread David Mao (Jira)
David Mao created KAFKA-10749:
-

 Summary: Add throttling of IPs by connection rate
 Key: KAFKA-10749
 URL: https://issues.apache.org/jira/browse/KAFKA-10749
 Project: Kafka
  Issue Type: New Feature
  Components: core, network
Reporter: David Mao
Assignee: David Mao
 Fix For: 2.8.0


This tracks the completion of IP connection rate throttling as detailed in

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]
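As a rough illustration of what limiting connection creation rate per IP means, here is a per-IP counter over fixed one-second windows. This is only a sketch: the class name, the window length and the limit are hypothetical, and it is not how the broker implements KIP-612.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical per-IP connection rate limiter using fixed one-second windows.
 * Illustrative only; not the broker's KIP-612 implementation.
 */
class IpConnectionRateLimiter {
    private static final long WINDOW_MS = 1000L;

    private final int maxConnectionsPerSecond;
    // Per-IP state: [0] = start of the current window, [1] = connections accepted in it.
    private final Map<String, long[]> windows = new HashMap<>();

    IpConnectionRateLimiter(int maxConnectionsPerSecond) {
        this.maxConnectionsPerSecond = maxConnectionsPerSecond;
    }

    /** Returns true if a new connection from {@code ip} at {@code nowMs} is within the limit. */
    synchronized boolean tryAccept(String ip, long nowMs) {
        long[] w = windows.computeIfAbsent(ip, k -> new long[] {nowMs, 0L});
        if (nowMs - w[0] >= WINDOW_MS) {
            // The previous window has expired: start a fresh one for this IP.
            w[0] = nowMs;
            w[1] = 0L;
        }
        if (w[1] < maxConnectionsPerSecond) {
            w[1]++;
            return true;
        }
        return false; // over the limit; the caller decides how to reject or throttle
    }
}
```

A fixed window is the simplest choice, but it can admit up to twice the limit in a short burst straddling a window boundary, which is one reason rate limiters often use sliding windows or sampled rates instead.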

 





[jira] [Updated] (KAFKA-10747) Implement ClientQuota APIs for altering and describing IP entity quotas

2020-11-19 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10747?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-10747:
--
Parent: KAFKA-10749
Issue Type: Sub-task  (was: Improvement)

> Implement ClientQuota APIs for altering and describing IP entity quotas 
> 
>
> Key: KAFKA-10747
> URL: https://issues.apache.org/jira/browse/KAFKA-10747
> Project: Kafka
>  Issue Type: Sub-task
>  Components: config, core
>Affects Versions: 2.8.0
>Reporter: David Mao
>Assignee: David Mao
>Priority: Major
>






[jira] [Updated] (KAFKA-10748) Add IP connection rate throttling metric

2020-11-19 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-10748:
--
Parent: KAFKA-10749
Issue Type: Sub-task  (was: Improvement)

> Add IP connection rate throttling metric
> 
>
> Key: KAFKA-10748
> URL: https://issues.apache.org/jira/browse/KAFKA-10748
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, network
>Affects Versions: 2.8.0
>Reporter: David Mao
>Assignee: David Mao
>Priority: Minor
>






[jira] [Updated] (KAFKA-10024) Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-19 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-10024:
--
Parent: KAFKA-10749
Issue Type: Sub-task  (was: Improvement)

> Add dynamic configuration and enforce quota for per-IP connection rate limits 
> (KIP-612, part 2)
> ---
>
> Key: KAFKA-10024
> URL: https://issues.apache.org/jira/browse/KAFKA-10024
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Reporter: Anna Povzner
>Assignee: David Mao
>Priority: Major
>  Labels: features
> Fix For: 2.8.0
>
>
> This JIRA is for the second part of KIP-612 – Add per-IP connection creation 
> rate limits.
> As described here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-612%3A+Ability+to+Limit+Connection+Creation+Rate+on+Brokers]





[GitHub] [kafka] mimaison commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-11-19 Thread GitBox


mimaison commented on a change in pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#discussion_r527005533



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationSSLTest.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
+import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.MirrorSourceConnector;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestUtils;
+import kafka.server.KafkaConfig$;
+
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Tests MM2 replication with SSL enabled at backup kafka cluster
+ */
+@Category(IntegrationTest.class)
+public class MirrorConnectorsIntegrationSSLTest extends MirrorConnectorsIntegrationBaseTest {
+
+private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationSSLTest.class);
+
+private static final List CONNECTOR_LIST = 
+Arrays.asList(MirrorSourceConnector.class, MirrorCheckpointConnector.class, MirrorHeartbeatConnector.class);
+
+@Before
+public void setup() throws InterruptedException {
+try {
+Map sslConfig = TestSslUtils.createSslConfig(false, true, Mode.SERVER, TestUtils.tempFile(), "testCert");
+backupBrokerProps.put(KafkaConfig$.MODULE$.ListenersProp(), "SSL://localhost:0");
+backupBrokerProps.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), "SSL");
+backupBrokerProps.putAll(sslConfig);
+} catch (final Exception e) {
+throw new RuntimeException(e);
+}
+startClusters();
+}
+
+@After
+public void close() {

Review comment:
   Yes it will
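Stripped of the test harness, the SSL-specific part of `setup()` in this diff is a handful of broker-property overrides. The stand-alone sketch below assumes the plain config keys `listeners` and `inter.broker.listener.name` (the names behind `ListenersProp()` and `InterBrokerListenerNameProp()`); the helper class is hypothetical, and the keystore/truststore entries come from a caller-supplied map instead of `TestSslUtils.createSslConfig`.

```java
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper mirroring the SSL overrides applied to the backup cluster's brokers. */
final class SslBrokerProps {
    private SslBrokerProps() { }

    static Properties withSslListener(Properties base, Map<String, ?> sslConfig) {
        Properties props = new Properties();
        props.putAll(base);
        // Bind a single SSL listener on an ephemeral port (0 lets the OS choose).
        props.put("listeners", "SSL://localhost:0");
        // Brokers must also talk to each other over that SSL listener.
        props.put("inter.broker.listener.name", "SSL");
        // Keystore/truststore settings, e.g. as produced by a test SSL utility.
        props.putAll(sslConfig);
        return props;
    }
}
```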









[GitHub] [kafka] tombentley commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()

2020-11-19 Thread GitBox


tombentley commented on pull request #9433:
URL: https://github.com/apache/kafka/pull/9433#issuecomment-730476432


   @abbccdda I think we're just waiting for your review here. Thanks.







[GitHub] [kafka] chia7712 commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()

2020-11-19 Thread GitBox


chia7712 commented on pull request #9433:
URL: https://github.com/apache/kafka/pull/9433#issuecomment-730478145


   @tombentley Could you trigger QA again? 







[GitHub] [kafka] tombentley commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()

2020-11-19 Thread GitBox


tombentley commented on pull request #9433:
URL: https://github.com/apache/kafka/pull/9433#issuecomment-730479015


   @chia7712 I thought only committers can do that.







[GitHub] [kafka] chia7712 commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()

2020-11-19 Thread GitBox


chia7712 commented on pull request #9433:
URL: https://github.com/apache/kafka/pull/9433#issuecomment-730480556


   IIRC, the retest command does not work at the moment. Could you rebase the code to 
trigger QA?







[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2020-11-19 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r527020272



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1749,6 +1752,9 @@ class Log(@volatile private var _dir: File,
   checkIfMemoryMappedBufferClosed()
   // remove the segments for lookups
   removeAndDeleteSegments(deletable, asyncDelete = true, reason)
+  if (reason == LogCompaction) {
+    maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset, SegmentCompaction)
+  }
   maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset, SegmentDeletion)

Review comment:
   Sorry if this is a little confusing. `reason` is the reason for the deletion, while the 
other parameter, `SegmentCompaction`, is the reason for changing the log start offset. 
It is currently used to handle the issue of updating past the high 
watermark. I agree that it is not the cleanest way and we should figure out 
what we want to do with compacting past the high watermark.
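The bookkeeping under discussion can be modelled in a few lines: once compaction leaves the leading segments empty, they are dropped and the log start offset moves up to the base offset of the first remaining segment, never backwards. In this toy model a `TreeMap` keyed by base offset stands in for `segments`; the class and method names are hypothetical, and the high-watermark question raised above is deliberately ignored.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy model of a log: segment base offset -> number of records still in that segment. */
class ToyLog {
    final NavigableMap<Long, Integer> segments = new TreeMap<>();
    long logStartOffset = 0L;

    /** Drops leading empty segments (keeping at least one) and advances the log start offset. */
    void removeEmptySegmentsAfterCompaction() {
        if (segments.isEmpty()) {
            return;
        }
        // Never delete the last segment: the active segment must remain.
        while (segments.size() > 1 && segments.firstEntry().getValue() == 0) {
            segments.pollFirstEntry();
        }
        maybeIncrementLogStartOffset(segments.firstKey());
    }

    /** The log start offset only ever moves forward. */
    void maybeIncrementLogStartOffset(long newLogStartOffset) {
        if (newLogStartOffset > logStartOffset) {
            logStartOffset = newLogStartOffset;
        }
    }
}
```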









[GitHub] [kafka] jolshan commented on pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2020-11-19 Thread GitBox


jolshan commented on pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#issuecomment-730487839


   @chia7712 The code path you linked is the code path beginningOffsets uses. 
Are you suggesting removing `ListOffsetRequest.EARLIEST_TIMESTAMP` and 
replacing it with 0?







[GitHub] [kafka] chia7712 commented on pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2020-11-19 Thread GitBox


chia7712 commented on pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#issuecomment-730491021


   > Are you suggesting removing ListOffsetRequest.EARLIEST_TIMESTAMP and 
replacing it with 0? 
   
   Yep, it seems like a simple solution that does not need many changes. However, I have 
not fully understood this issue yet, so this is just an idea.
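To make the two options concrete: a lookup for `EARLIEST_TIMESTAMP` reports the log start offset, while a lookup by timestamp 0 returns the first offset whose record timestamp is at least 0, i.e. the first record still present. The toy below (hypothetical classes, in-memory data) shows how the two diverge on a compacted log whose first surviving record is at offset 5 while the log start offset is still 0.

```java
import java.util.List;
import java.util.OptionalLong;

/** Toy comparison of an "earliest" lookup and a timestamp-based lookup. */
final class OffsetLookup {
    /** A record that survived compaction: its offset and its timestamp. */
    static final class Rec {
        final long offset;
        final long timestamp;

        Rec(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    private OffsetLookup() { }

    /** "Earliest" semantics: simply report the log start offset. */
    static long earliest(long logStartOffset) {
        return logStartOffset;
    }

    /** Timestamp semantics: first offset whose record timestamp is >= the target. */
    static OptionalLong byTimestamp(List<Rec> records, long targetTimestamp) {
        for (Rec r : records) { // records are assumed to be in offset order
            if (r.timestamp >= targetTimestamp) {
                return OptionalLong.of(r.offset);
            }
        }
        return OptionalLong.empty();
    }
}
```

With offsets 0 to 4 compacted away, `earliest(0)` still returns 0, while `byTimestamp(records, 0)` returns 5.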







[GitHub] [kafka] wcarlson5 commented on pull request #9615: KAFKA-10500: Add thread option

2020-11-19 Thread GitBox


wcarlson5 commented on pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#issuecomment-730500854


   @cadonna @mjsax @vvcephei Part 3 of KIP-663 (add threads) is ready for review.







[GitHub] [kafka] jolshan commented on pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2020-11-19 Thread GitBox


jolshan commented on pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#issuecomment-730503145


   @chia7712 I did think about this solution initially. I'm wondering if we do 
want to update the segment offsets and logStartOffsets correctly though. If 
that isn't as important then maybe we can go with the simpler solution. This 
solution removes empty segments and keeps the baseOffsets updated. If these 
baseOffsets are used in other places, then maybe this solution is better.







[jira] [Assigned] (KAFKA-10549) Add topic ID support to DeleteTopics,ListOffsets, OffsetForLeaders, StopReplica

2020-11-19 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reassigned KAFKA-10549:
--

Assignee: Justine Olshan

> Add topic ID support to DeleteTopics,ListOffsets, OffsetForLeaders, 
> StopReplica
> ---
>
> Key: KAFKA-10549
> URL: https://issues.apache.org/jira/browse/KAFKA-10549
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> ListOffsets, OffsetForLeaders, and StopReplica protocols will replace topic 
> name with topic ID and will be used to prevent reads from deleted topics.
> DeleteTopics will be changed to support topic IDs and to delete topics sooner.
> This may be split into two or more issues if necessary.
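The safety property described here comes down to one check: a request that names a topic by ID must fail once that ID no longer belongs to a live topic, even if a new topic with the same name has since been created. A minimal sketch with a hypothetical registry, using `java.util.UUID` as a stand-in for Kafka's own topic ID type and a generic exception instead of a protocol error code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

/** Hypothetical broker-side registry resolving topic IDs to topic names. */
class TopicIdRegistry {
    private final Map<UUID, String> namesById = new HashMap<>();

    void create(String name, UUID id) {
        namesById.put(id, name);
    }

    void delete(UUID id) {
        namesById.remove(id);
    }

    /** Returns the topic name for a live ID, or throws if the ID is unknown (e.g. deleted). */
    String resolve(UUID id) {
        String name = namesById.get(id);
        if (name == null) {
            throw new IllegalStateException("Unknown topic ID " + id);
        }
        return name;
    }
}
```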





[GitHub] [kafka] hachikuji commented on a change in pull request #9601: KAFKA-10729: Bump remaining RPC's to use tagged fields.

2020-11-19 Thread GitBox


hachikuji commented on a change in pull request #9601:
URL: https://github.com/apache/kafka/pull/9601#discussion_r527045689



##
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##
@@ -108,7 +108,9 @@ object ApiVersion {
 // Bup Fetch protocol for Raft protocol (KIP-595)
 KAFKA_2_7_IV1,
 // Introduced AlterIsr (KIP-497)
-KAFKA_2_7_IV2
+KAFKA_2_7_IV2,
+// Flexible versioning on ListOffsets

Review comment:
   For what it's worth, `WriteTxnMarkers` and `OffsetsForLeaderEpoch` are 
also inter-broker APIs.
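Since these are inter-broker requests, a broker should only send the new flexible versions once every broker in the cluster understands them, which is what gating on the inter-broker protocol version (`inter.broker.protocol.version`) provides. A sketch of that gating, with a hypothetical two-value enum and illustrative version numbers rather than the real `ApiVersion` entries:

```java
/** Hypothetical subset of inter-broker protocol versions, in release order. */
enum Ibp {
    KAFKA_2_7_IV2,
    KAFKA_2_8_IV0; // hypothetical first version with flexible (tagged-field) ListOffsets

    boolean atLeast(Ibp other) {
        return compareTo(other) >= 0;
    }
}

final class RequestVersionSelector {
    private RequestVersionSelector() { }

    /** Picks the ListOffsets version a broker may send to its peers (numbers are illustrative). */
    static short listOffsetsVersion(Ibp ibp) {
        // Only use the flexible version once the whole cluster is configured for it.
        return ibp.atLeast(Ibp.KAFKA_2_8_IV0) ? (short) 6 : (short) 5;
    }
}
```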









[GitHub] [kafka] abbccdda opened a new pull request #9624: KAFKA-10655: wrap and catch exception for appendAsLeader failure

2020-11-19 Thread GitBox


abbccdda opened a new pull request #9624:
URL: https://github.com/apache/kafka/pull/9624


   When leader append fails, we should trigger the resign process and do a 
graceful shutdown afterwards.
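The intended control flow, sketched with hypothetical hooks (`appendAsLeader`, `resign` and `shutdown` stand in for the Raft client's real operations, whose signatures are not shown in this PR description):

```java
/** Hypothetical hooks standing in for the Raft client's log and lifecycle operations. */
interface LeaderHooks {
    void appendAsLeader(byte[] records);

    void resign(int epoch);

    void shutdown();
}

final class LeaderAppender {
    private LeaderAppender() { }

    /** Appends as leader; on failure, resigns leadership and then shuts down gracefully. */
    static boolean appendOrResign(LeaderHooks hooks, byte[] records, int epoch) {
        try {
            hooks.appendAsLeader(records);
            return true;
        } catch (RuntimeException e) {
            // Give up leadership first so the other voters can elect a new leader,
            // then shut this node down.
            hooks.resign(epoch);
            hooks.shutdown();
            return false;
        }
    }
}
```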
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] tombentley commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()

2020-11-19 Thread GitBox


tombentley commented on pull request #9433:
URL: https://github.com/apache/kafka/pull/9433#issuecomment-730508481


   @chia7712 done







[jira] [Assigned] (KAFKA-10548) Implement deletion logic for LeaderAndIsrRequests

2020-11-19 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reassigned KAFKA-10548:
--

Assignee: Justine Olshan

> Implement deletion logic for LeaderAndIsrRequests
> -
>
> Key: KAFKA-10548
> URL: https://issues.apache.org/jira/browse/KAFKA-10548
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> This will allow for specialized deletion logic when receiving 
> LeaderAndIsrRequests
> Will also create and utilize delete.stale.topic.delay.ms configuration option





[GitHub] [kafka] chia7712 commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()

2020-11-19 Thread GitBox


chia7712 commented on pull request #9433:
URL: https://github.com/apache/kafka/pull/9433#issuecomment-730516709


   @tombentley thanks! I will merge this PR tomorrow if no objection :)







[jira] [Assigned] (KAFKA-10580) Add topic ID support to Fetch request

2020-11-19 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10580?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reassigned KAFKA-10580:
--

Assignee: Justine Olshan

> Add topic ID support to Fetch request
> -
>
> Key: KAFKA-10580
> URL: https://issues.apache.org/jira/browse/KAFKA-10580
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> Prevent fetching a stale topic with topic IDs





[jira] [Created] (KAFKA-10750) test failure scenarios of MirrorMaker 2

2020-11-19 Thread Ning Zhang (Jira)
Ning Zhang created KAFKA-10750:
--

 Summary: test failure scenarios of MirrorMaker 2
 Key: KAFKA-10750
 URL: https://issues.apache.org/jira/browse/KAFKA-10750
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Affects Versions: 2.7.0
Reporter: Ning Zhang
Assignee: Ning Zhang


As a follow-up to https://issues.apache.org/jira/browse/KAFKA-10304, it may be 
necessary to test failure scenarios, e.g. stopping and then restarting a Kafka broker.

To make PR [https://github.com/apache/kafka/pull/9224] smaller, we removed 
the testing code for failure scenarios, and we plan to add it back in this 
ticket.





[GitHub] [kafka] vamossagar12 commented on a change in pull request #9539: KAFKA-10634: Adding LeaderId to Voters list in LeaderChangeMessage

2020-11-19 Thread GitBox


vamossagar12 commented on a change in pull request #9539:
URL: https://github.com/apache/kafka/pull/9539#discussion_r527073514



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##
@@ -709,6 +709,9 @@ static void verifyLeaderChangeMessage(
 assertEquals(leaderId, leaderChangeMessage.leaderId());
 assertEquals(voters.stream().map(voterId -> new Voter().setVoterId(voterId)).collect(Collectors.toList()),
 leaderChangeMessage.voters());
+assertEquals(voters.stream().map(voterId -> new Voter().setVoterId(voterId)).collect(Collectors.toSet()),

Review comment:
   @hachikuji, I have added a test case where I initialize a quorum of 3 
and get votes from a majority. I needed to tweak a couple of methods because some of 
them had the expected value hardcoded to 1. Please review.
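The arithmetic behind "votes from a majority" in a quorum of 3: a candidate needs `n / 2 + 1` votes (integer division), so 2 of 3 voters. A small hypothetical helper, not part of `RaftClientTestContext`, that makes this explicit:

```java
import java.util.Set;

final class QuorumMath {
    private QuorumMath() { }

    /** Minimum number of votes needed to win an election among {@code numVoters} voters. */
    static int majority(int numVoters) {
        return numVoters / 2 + 1;
    }

    /** True if the granted votes (including the candidate's own) form a majority of the voters. */
    static boolean hasMajority(Set<Integer> voters, Set<Integer> granted) {
        // Ignore votes from nodes that are not voters.
        long valid = granted.stream().filter(voters::contains).count();
        return valid >= majority(voters.size());
    }
}
```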









[GitHub] [kafka] hachikuji commented on a change in pull request #9619: MINOR: Reduce sends created by `SendBuilder`

2020-11-19 Thread GitBox


hachikuji commented on a change in pull request #9619:
URL: https://github.com/apache/kafka/pull/9619#discussion_r527073144



##
File path: 
clients/src/main/java/org/apache/kafka/common/protocol/SendBuilder.java
##
@@ -122,6 +123,14 @@ public void writeVarlong(long i) {
 ByteUtils.writeVarlong(i, buffer);
 }
 
+private void flushPendingSend() {
+if (!buffers.isEmpty()) {
+ByteBuffer[] byteBufferArray = buffers.toArray(new ByteBuffer[0]);
+sends.add(new ByteBufferSend(destinationId, byteBufferArray));

Review comment:
   I was somewhat inclined to leave this as is, but finally I decided to do 
it since it might help (even if only a little) with fetch overhead.
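The pattern in this hunk is accumulate-then-flush: the buffers written so far are collected and then wrapped into one `ByteBufferSend`, so several small buffers cost a single send. A stand-alone sketch, with a hypothetical `PendingSends` class and a plain `ByteBuffer[]` in place of `ByteBufferSend`; clearing the pending list after a flush is an assumption, since the hunk is cut off before that point.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical accumulator: buffers are grouped, and each flush produces one "send". */
class PendingSends {
    private final List<ByteBuffer> buffers = new ArrayList<>();
    // Each element plays the role of one ByteBufferSend wrapping several buffers.
    final List<ByteBuffer[]> sends = new ArrayList<>();

    void addBuffer(ByteBuffer buffer) {
        buffers.add(buffer);
    }

    /** Wraps all pending buffers into a single send, if there are any. */
    void flushPendingSend() {
        if (!buffers.isEmpty()) {
            sends.add(buffers.toArray(new ByteBuffer[0]));
            buffers.clear();
        }
    }
}
```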









[GitHub] [kafka] mimaison commented on a change in pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-11-19 Thread GitBox


mimaison commented on a change in pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#discussion_r527014720



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
##
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.mirror.MirrorMakerConfig;
+import org.apache.kafka.connect.mirror.SourceAndTarget;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import static org.apache.kafka.connect.mirror.TestUtils.expectedRecords;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import org.apache.kafka.test.IntegrationTest;
+import kafka.server.KafkaConfig$;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static org.junit.Assert.assertFalse;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Common Test functions for MM2 integration tests
+ */
+@Category(IntegrationTest.class)
+public class MirrorConnectorsIntegrationBaseTest {
+private static final Logger log = LoggerFactory.getLogger(MirrorConnectorsIntegrationBaseTest.class);
+
+protected static final int NUM_RECORDS_PER_PARTITION = 10;
+public static final int NUM_PARTITIONS = 10;
+protected static final int NUM_RECORDS_PRODUCED = NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION;
+protected static final int RECORD_TRANSFER_DURATION_MS = 30_000;
+protected static final int CHECKPOINT_DURATION_MS = 20_000;
+protected static final int RECORD_CONSUME_DURATION_MS = 20_000;
+protected static final int OFFSET_SYNC_DURATION_MS = 30_000;
+protected static final int NUM_WORKERS = 3;
+protected static final int CONSUMER_POLL_TIMEOUT_MS = 500;
+protected static final int BROKER_RESTART_TIMEOUT_MS = 10_000;
+protected static final String PRIMARY_CLUSTER_ALIAS = "primary";
+protected static final String BACKUP_CLUSTER_ALIAS = "backup";
+
+protected Map mm2Props;
+protected MirrorMakerConfig mm2Config; 
+protected EmbeddedConnectCluster primary;
+protected EmbeddedConnectCluster backup;
+
+private final AtomicBoolean exited = new AtomicBoolean(false);
+private Properties primaryBrokerProps = new Properties();
+protected Properties backupBrokerProps = new Properties();
+private Map primaryWorkerProps = new HashMap<>();
+private Map backupWorkerProps = new HashMap<>();
+private Properties sslProps = new Properties();
+
+private void loadSslPropsFromBrokerConfig() {   

Review comment:
   Why do we have SSL-specific methods here? Could we move all the SSL bits 
into the SSL class?
   
   We have fields for the configurations. So we could set them a

[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-19 Thread GitBox


cadonna commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527055689



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread could not be added
+ */
+public Optional addStreamThread() {
+synchronized (stateLock) {
+if (state == State.RUNNING || state == State.REBALANCING) {
+final int threadIdx = threads.size() + 1;
+final long cacheSizePerThread = getCacheSizePerThread(threadIdx);

Review comment:
   This should be:
   ```suggestion
   final long cacheSizePerThread = getCacheSizePerThread(threads.size() + 1);
   ```
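The suggestion matters because the per-thread cache size should depend only on how many threads there will be after the addition, not on the index chosen for the new thread. Assuming the `cache.max.bytes.buffering` budget is split evenly across stream threads (a simplification; the real `getCacheSizePerThread` may account for more, e.g. a global stream thread), the arithmetic is:

```java
/** Hypothetical helper: even split of the total cache budget across stream threads. */
final class CacheSizing {
    private CacheSizing() { }

    /** Integer division may leave a few bytes of the budget unused. */
    static long cacheSizePerThread(long totalCacheBytes, int numStreamThreads) {
        if (numStreamThreads <= 0) {
            return totalCacheBytes;
        }
        return totalCacheBytes / numStreamThreads;
    }
}
```

For example, with three running threads and a 10 MiB budget, adding a fourth gives `cacheSizePerThread(10485760, 4)`, i.e. 2,621,440 bytes per thread, whatever index the new thread gets.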

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread could not be added
+ */
+public Optional addStreamThread() {
+synchronized (stateLock) {
+if (state == State.RUNNING || state == State.REBALANCING) {
+final int threadIdx = threads.size() + 1;

Review comment:
   Assume the thread list is [t2, t3, t4]: `threadIdx` would be 4, 
which is already in use. You should keep track of the `threadIdx`s currently in use and 
check them to decide on the next `threadIdx`.
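One way to follow this advice is to collect the indices already in use and pick the smallest positive index that is free. A sketch with a hypothetical helper that works on plain integer indices (extracting them from thread names is left out):

```java
import java.util.Set;

/** Hypothetical helper for choosing the index of a new stream thread. */
final class ThreadIndices {
    private ThreadIndices() { }

    /** Returns the smallest index >= 1 that is not already used by a running stream thread. */
    static int nextThreadIndex(Set<Integer> usedIndices) {
        int idx = 1;
        while (usedIndices.contains(idx)) {
            idx++;
        }
        return idx;
    }
}
```

For the running threads [t2, t3, t4] this returns 1 instead of the colliding 4 that `threads.size() + 1` produces.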
   
   

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread could not be added
+ */
+public Optional addStreamThread() {
+synchronized (stateLock) {
+if (state == State.RUNNING || state == State.REBALANCING)

[GitHub] [kafka] hachikuji commented on a change in pull request #9617: MINOR: Factor out common response parsing logic

2020-11-19 Thread GitBox


hachikuji commented on a change in pull request #9617:
URL: https://github.com/apache/kafka/pull/9617#discussion_r527095050



##
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##
@@ -974,21 +969,6 @@ private void handleInitiateApiVersionRequests(long now) {
 }
 }
 
-/**
- * Validate that the response corresponds to the request we expect or else explode
- */
-private static void correlate(RequestHeader requestHeader, ResponseHeader responseHeader) {

Review comment:
   Why do you want to keep it if it is not used?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9617: MINOR: Factor out common response parsing logic

2020-11-19 Thread GitBox


hachikuji commented on a change in pull request #9617:
URL: https://github.com/apache/kafka/pull/9617#discussion_r527095539



##
File path: clients/src/main/java/org/apache/kafka/common/requests/CorrelationIdMismatchException.java
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+/**
+ * Raised if the correlationId in a response header does not match
+ * the expected value from the request header.
+ */
+public class CorrelationIdMismatchException extends IllegalStateException {
+private final int requestCorrelationId;
+private final int responseCorrelationId;
+
+public CorrelationIdMismatchException(
+String message,
+int requestCorrelationId,
+int responseCorrelationId
+) {
+super(message);
+this.requestCorrelationId = requestCorrelationId;
+this.responseCorrelationId = responseCorrelationId;
+}
+
+public int requestCorrelationId() {
+return requestCorrelationId;

Review comment:
   Yeah, I added it for completeness. It is a little odd if the mismatch exception only tells you what the response correlationId was.
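   To illustrate why carrying both ids is useful, here is a self-contained sketch that mirrors the shape of the exception in the diff with a simplified stand-in class (`CorrelationIdMismatch`) and a hypothetical `correlate(int, int)` check; it is not the code in `NetworkClient` or `CorrelationIdMismatchException`.

```java
public class CorrelationCheckExample {

    // Simplified stand-in for the exception in the diff: it keeps both ids
    // so a caller can report what was expected and what actually arrived.
    static class CorrelationIdMismatch extends IllegalStateException {
        private final int requestCorrelationId;
        private final int responseCorrelationId;

        CorrelationIdMismatch(final String message, final int requestCorrelationId, final int responseCorrelationId) {
            super(message);
            this.requestCorrelationId = requestCorrelationId;
            this.responseCorrelationId = responseCorrelationId;
        }

        int requestCorrelationId() { return requestCorrelationId; }

        int responseCorrelationId() { return responseCorrelationId; }
    }

    // Hypothetical check: throw if the response does not answer the request we sent.
    static void correlate(final int requestCorrelationId, final int responseCorrelationId) {
        if (requestCorrelationId != responseCorrelationId) {
            throw new CorrelationIdMismatch(
                "Correlation id for response (" + responseCorrelationId
                    + ") does not match request (" + requestCorrelationId + ")",
                requestCorrelationId, responseCorrelationId);
        }
    }

    public static void main(final String[] args) {
        correlate(7, 7); // matching ids: no exception
        try {
            correlate(7, 8);
        } catch (final CorrelationIdMismatch e) {
            System.out.println(e.requestCorrelationId() + " vs " + e.responseCorrelationId()); // 7 vs 8
        }
    }
}
```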









[GitHub] [kafka] junrao commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…

2020-11-19 Thread GitBox


junrao commented on a change in pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#discussion_r527096607



##
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##
@@ -366,8 +379,9 @@ public void testMetadataFetch() throws InterruptedException {
 // Return empty cluster 4 times and cluster from then on
 when(metadata.fetch()).thenReturn(emptyCluster, emptyCluster, emptyCluster, emptyCluster, onePartitionCluster);
 
-KafkaProducer producer = new KafkaProducer(configs, new StringSerializer(),
-new StringSerializer(), metadata, new MockClient(Time.SYSTEM, metadata), null, Time.SYSTEM) {
+KafkaProducer producer = new KafkaProducer(

Review comment:
   Should we use the private static constructor in this class? Ditto below.

##
File path: clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
##
@@ -1290,6 +1308,23 @@ public void serializerShouldSeeGeneratedClientId() {
 producer.close();
 }
 
+@Test
+public void testUnusedConfigs() {
+Map props = new HashMap<>();
+props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:");
+props.put(SslConfigs.SSL_PROTOCOL_CONFIG, "TLS");
+ProducerConfig config = new ProducerConfig(ProducerConfig.appendSerializerToConfig(props,
+new StringSerializer(), new StringSerializer()));
+
+assertTrue(new ProducerConfig(config.originals(), false).unused().contains(SslConfigs.SSL_PROTOCOL_CONFIG));
+assertTrue(config.unused().contains(SslConfigs.SSL_PROTOCOL_CONFIG));
+
+try (KafkaProducer producer = new KafkaProducer<>(config, null, null,

Review comment:
   producer is unused.

##
File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
##
@@ -105,7 +105,9 @@ public AbstractConfig(ConfigDef definition, Map originals,  Map
 throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
 
 this.originals = resolveConfigVariables(configProviderProps, (Map) originals);
-this.values = definition.parse(this.originals);
+// pass a copy to definition.parse. Otherwise, the definition.parse adds all keys of definitions to "used" group
+// since definition.parse needs to call "RecordingMap#get" when checking all definitions.
+this.values = definition.parse(new HashMap<>(this.originals));

Review comment:
   Hmm, why is this necessary since we reset used to empty in the next line?
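   A simplified model of the behavior the diff comment describes, assuming a map that records every key passed to `get` (the `RecordingMap` and `parse` below are hypothetical stand-ins, not Kafka's `AbstractConfig.RecordingMap` or `ConfigDef#parse`). It shows that parsing the original map marks every defined key as used, while parsing a `HashMap` copy records nothing; in this model, clearing `used` afterwards reaches the same end state, which is what the question above is getting at.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class RecordingMapExample {

    // Hypothetical map that remembers which keys were read through get().
    static class RecordingMap<V> extends HashMap<String, V> {
        final Set<String> used = new HashSet<>();

        RecordingMap(final Map<String, V> m) {
            super(m); // the HashMap copy constructor does not call get()
        }

        @Override
        public V get(final Object key) {
            if (key instanceof String) {
                used.add((String) key);
            }
            return super.get(key);
        }
    }

    // Hypothetical stand-in for a config parser: it looks up every defined key.
    static Map<String, Object> parse(final List<String> definedKeys, final Map<String, Object> props) {
        final Map<String, Object> values = new HashMap<>();
        for (final String key : definedKeys) {
            values.put(key, props.get(key));
        }
        return values;
    }

    public static void main(final String[] args) {
        final RecordingMap<Object> originals =
            new RecordingMap<>(Map.<String, Object>of("ssl.protocol", "TLS"));
        final List<String> definedKeys = List.of("bootstrap.servers", "ssl.protocol");

        parse(definedKeys, originals);                // every defined key is read via get()
        System.out.println(originals.used.size());    // 2

        originals.used.clear();                       // resetting afterwards gives the same end state
        parse(definedKeys, new HashMap<>(originals)); // parsing a copy records nothing
        System.out.println(originals.used.size());    // 0
    }
}
```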

##
File path: clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
##
@@ -159,24 +159,25 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
 }
 
 // Visibility for testing
-protected static Map channelBuilderConfigs(final AbstractConfig config, final ListenerName listenerName) {
-Map parsedConfigs;
+@SuppressWarnings("unchecked")
+static Map channelBuilderConfigs(final AbstractConfig config, final ListenerName listenerName) {
+Map parsedConfigs;
 if (listenerName == null)
-parsedConfigs = config.values();
+parsedConfigs = (Map) config.values();
 else
 parsedConfigs = config.valuesWithPrefixOverride(listenerName.configPrefix());
 
-// include any custom configs from original configs
-Map configs = new HashMap<>(parsedConfigs);
 config.originals().entrySet().stream()
 .filter(e -> !parsedConfigs.containsKey(e.getKey())) // exclude already parsed configs
 // exclude already parsed listener prefix configs
 .filter(e -> !(listenerName != null && e.getKey().startsWith(listenerName.configPrefix()) &&
 parsedConfigs.containsKey(e.getKey().substring(listenerName.configPrefix().length()
 // exclude keys like `{mechanism}.some.prop` if "listener.name." prefix is present and key `some.prop` exists in parsed configs.
 .filter(e -> !(listenerName != null && parsedConfigs.containsKey(e.getKey().substring(e.getKey().indexOf('.') + 1
-.forEach(e -> configs.put(e.getKey(), e.getValue()));
-return configs;
+.forEach(e -> parsedConfigs.put(e.getKey(), e.getValue()));
+// The callers may add new elements to return map so we should not wrap it to a immutable map. Otherwise,
+// the callers have to create a new map to carry more elements and then following Get ops are not recorded.

Review comment:
   This comment is still not very clear to me. Are you saying that if the caller needs to add more elements, it needs to create a new RecordingMap for the additional elements to be recorded?

##
File path: clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
##
@@ -79,14 +79,29 @@ 

[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-19 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527099742



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread could not be added
+ */
+public Optional<String> addStreamThread() {
+synchronized (stateLock) {
+if (state == State.RUNNING || state == State.REBALANCING) {
+final int threadIdx = threads.size() + 1;

Review comment:
   Looks like I didn't understand `threadIdx`. That makes sense now.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread could not be added
+ */
+public Optional<String> addStreamThread() {
+synchronized (stateLock) {
+if (state == State.RUNNING || state == State.REBALANCING) {
+final int threadIdx = threads.size() + 1;
+final long cacheSizePerThread = getCacheSizePerThread(threadIdx);
+resizeThreadCache(threadIdx);
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+threadIdx,
+KafkaStreams.this::closeToError,
+streamsUncaughtExceptionHandler
+);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+streamThread.setStateListener(streamStateListener);
+return Optional.of(streamThread.getName());
+} else {
+return Optional.empty();
+}
+}
+}

Review comment:
   right before the positive return









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-19 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527099779



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,57 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread could not be added
+ */
+public Optional<String> addStreamThread() {
+synchronized (stateLock) {
+if (state == State.RUNNING || state == State.REBALANCING) {
+final int threadIdx = threads.size() + 1;
+final long cacheSizePerThread = getCacheSizePerThread(threadIdx);
+resizeThreadCache(threadIdx);
+final StreamThread streamThread = StreamThread.create(
+internalTopologyBuilder,
+config,
+clientSupplier,
+adminClient,
+processId,
+clientId,
+streamsMetrics,
+time,
+streamsMetadataState,
+cacheSizePerThread,
+stateDirectory,
+delegatingStateRestoreListener,
+threadIdx,
+KafkaStreams.this::closeToError,
+streamsUncaughtExceptionHandler
+);
+threads.add(streamThread);
+threadState.put(streamThread.getId(), streamThread.state());
+storeProviders.add(new StreamThreadStateStoreProvider(streamThread));
+streamThread.setStateListener(streamStateListener);
+return Optional.of(streamThread.getName());
+} else {
+return Optional.empty();
+}
+}
+}

Review comment:
   right before the positive return :)









[GitHub] [kafka] hachikuji commented on a change in pull request #9617: MINOR: Factor out common response parsing logic

2020-11-19 Thread GitBox


hachikuji commented on a change in pull request #9617:
URL: https://github.com/apache/kafka/pull/9617#discussion_r527101086



##
File path: clients/src/main/java/org/apache/kafka/common/requests/CorrelationIdMismatchException.java
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+/**
+ * Raised if the correlationId in a response header does not match
+ * the expected value from the request header.
+ */
+public class CorrelationIdMismatchException extends IllegalStateException {

Review comment:
   Yeah, I was debating the location. I decided to put it in `common/requests` since it is not a public package and the error is specific to the request protocol.









[GitHub] [kafka] junrao commented on a change in pull request #9621: KAFKA-9892: Producer state snapshot needs to be forced to disk

2020-11-19 Thread GitBox


junrao commented on a change in pull request #9621:
URL: https://github.com/apache/kafka/pull/9621#discussion_r527105252



##
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##
@@ -437,7 +437,7 @@ object ProducerStateManager {
 
 val fileChannel = FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE)
 try fileChannel.write(buffer)
-finally fileChannel.close()
+finally fileChannel.force(true)

Review comment:
   I think we need to do force and then close.
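   A minimal Java sketch of the ordering suggested here (the snapshot code in the diff is Scala): write the buffer, call `FileChannel.force(true)` to flush content and metadata to the storage device, and only then close the channel. The `writeDurably` name is hypothetical. Note that the diff as written replaces `close()` with `force(true)` in the `finally` block, so the channel would never be closed.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ForceThenCloseExample {

    // Hypothetical helper: write, force to disk, then close.
    static void writeDurably(final Path file, final ByteBuffer buffer) throws IOException {
        // try-with-resources closes the channel last, even if write or force throws
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
            // force(true) flushes both file content and metadata before the channel is closed
            channel.force(true);
        }
    }

    public static void main(final String[] args) throws IOException {
        final Path file = Files.createTempFile("snapshot", ".bin");
        writeDurably(file, ByteBuffer.wrap("state".getBytes(StandardCharsets.UTF_8)));
        System.out.println(Files.size(file)); // 5
        Files.delete(file);
    }
}
```

   In Scala the same shape would be `try { fileChannel.write(buffer); fileChannel.force(true) } finally fileChannel.close()`.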









[GitHub] [kafka] lct45 commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

2020-11-19 Thread GitBox


lct45 commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r527117685



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -221,6 +221,9 @@ State setState(final State newState) {
 throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
 } else {
 log.info("State transition from {} to {}", oldState, newState);
+if (newState == State.DEAD) {
+failedStreamThreadSensor.record();
+}

Review comment:
   Would that just be in `run()` of the GlobalStreamThread then?









[GitHub] [kafka] junrao merged pull request #9596: KAFKA-10723: Fix LogManager shutdown error handling

2020-11-19 Thread GitBox


junrao merged pull request #9596:
URL: https://github.com/apache/kafka/pull/9596


   







[GitHub] [kafka] cadonna commented on a change in pull request #9614: KAFKA-10500: Add failed-stream-threads metric for adding + removing stream threads

2020-11-19 Thread GitBox


cadonna commented on a change in pull request #9614:
URL: https://github.com/apache/kafka/pull/9614#discussion_r527122772



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -221,6 +221,9 @@ State setState(final State newState) {
 throw new StreamsException(logPrefix + "Unexpected state transition from " + oldState + " to " + newState);
 } else {
 log.info("State transition from {} to {}", oldState, newState);
+if (newState == State.DEAD) {
+failedStreamThreadSensor.record();
+}

Review comment:
   No, that would be in `StreamThread#runLoop()`.









[jira] [Resolved] (KAFKA-10723) LogManager leaks internal thread pool activity during shutdown

2020-11-19 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-10723.
-
Fix Version/s: 2.8.0
   Resolution: Fixed

merged to trunk

> LogManager leaks internal thread pool activity during shutdown
> --
>
> Key: KAFKA-10723
> URL: https://issues.apache.org/jira/browse/KAFKA-10723
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kowshik Prakasam
>Assignee: Kowshik Prakasam
>Priority: Major
> Fix For: 2.8.0
>
>
> *TL;DR:*
> The asynchronous shutdown in {{LogManager}} has the shortcoming that if any of
> the internal futures fail during shutdown, we do not always ensure that all
> futures are completed before {{LogManager.shutdown}} returns. As a result, even
> though the "shut down completed" message from KafkaServer is seen in the error
> logs, some futures continue to run inside LogManager, attempting to close the
> logs. This is misleading, and it could break the general rule of avoiding
> post-shutdown activity in the broker.
> *Description:*
> When LogManager is shutting down, exceptions in log closure are handled 
> [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501].
>  However, this 
> [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502]
>  in the finally clause shuts down the thread pools *asynchronously*. The
> code {{threadPools.foreach(_.shutdown())}} initiates an orderly shutdown (for
> each thread pool) in which previously submitted tasks are executed, but no
> new tasks are accepted (see the javadoc
> [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()]).
>  As a result, if there is an exception during log closure, some of the thread
> pools that are closing logs could be leaked and continue to run in the
> background after control returns to the caller (i.e. {{KafkaServer}}).
> Consequently, even after the "shut down completed" message appears in the
> error logs (originating from the {{KafkaServer}} shutdown sequence), log closures
> continue to happen in the background, which is misleading.
>   
> *Proposed options for fixes:*
> It seems useful to maintain the contract with {{KafkaServer}} that, after
> {{LogManager.shutdown}} is called once, all tasks that close the logs are
> guaranteed to have completed before the call returns. There are probably a
> couple of different ways to fix this:
>  # Replace {{threadPools.foreach(_.shutdown())}} with
> {{threadPools.foreach(_.awaitTermination())}}. This ensures that we wait
> for all threads to shut down before the {{LogManager.shutdown}} call returns.
>  # Skip creating the checkpoint and clean shutdown file only for the affected
> directory if any of its futures throws an error. We continue to wait for all
> futures to complete for all directories. This can require some changes to
> loop|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L481-L496],
>  so that we wait for all futures to complete regardless of whether one of 
> them threw an error.
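A minimal Java sketch of option 1 using {{java.util.concurrent.ExecutorService}} directly (the {{shutdownAndWait}} helper is hypothetical and is not {{LogManager}} code). In the Java API, {{shutdown()}} only stops new submissions without waiting, and {{awaitTermination(long, TimeUnit)}} takes a timeout, so the sketch calls {{shutdown()}} on every pool first and then waits on each one.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ShutdownAndWaitExample {

    // shutdown() stops accepting new tasks; awaitTermination() blocks until
    // already-submitted tasks finish or the timeout elapses.
    static boolean shutdownAndWait(final List<ExecutorService> pools, final long timeout, final TimeUnit unit)
            throws InterruptedException {
        for (final ExecutorService pool : pools) {
            pool.shutdown();
        }
        boolean allTerminated = true;
        for (final ExecutorService pool : pools) {
            // &= still evaluates awaitTermination for every pool
            allTerminated &= pool.awaitTermination(timeout, unit);
        }
        return allTerminated;
    }

    public static void main(final String[] args) throws InterruptedException {
        final AtomicInteger closedLogs = new AtomicInteger();
        final ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 4; i++) {
            pool.submit(() -> {
                closedLogs.incrementAndGet(); // stand-in for closing one log
            });
        }
        final boolean done = shutdownAndWait(List.of(pool), 10, TimeUnit.SECONDS);
        System.out.println(done + " " + closedLogs.get()); // true 4
    }
}
```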



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10751) Generate log to help estimate messages lost during ULE

2020-11-19 Thread Lucas Wang (Jira)
Lucas Wang created KAFKA-10751:
--

 Summary: Generate log to help estimate messages lost during ULE
 Key: KAFKA-10751
 URL: https://issues.apache.org/jira/browse/KAFKA-10751
 Project: Kafka
  Issue Type: Improvement
Reporter: Lucas Wang
Assignee: Lucas Wang


During an unclean leader election (ULE), there can be data loss due to truncation at the resigned leader.

Suppose there are 3 brokers that have replicas for a given partition:
Broker A (leader) with largest offset 9 (log end offset 10)
Broker B (follower) with largest offset 4 (log end offset 5)
Broker C (follower) with largest offset 1 (log end offset 2)

Only the leader A is in the ISR; B and C are lagging behind.
Now an unclean leader election transfers leadership to C. Broker A would need to truncate 8 messages (offsets 2 to 9), and Broker B 3 messages (offsets 2 to 4).

Case 1: if these messages were produced with acks=0 or acks=1, clients would experience 8 lost messages.
Case 2: if the client uses acks=all, the partition's minISR setting is 2, and broker B dropped out of the ISR after receiving the message with offset 4, then only the messages with offset <= 4 have been acked to the client. The truncation effectively causes the client to lose 3 acked messages (offsets 2 to 4).

Knowing the exact amount of data loss requires knowing the client's acks setting when the messages were produced, and also whether the messages were sufficiently replicated according to the minISR setting.


If getting the exact data loss is too involved, there should at least be logs to help ESTIMATE the amount of data loss.
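The arithmetic in both cases can be written down directly. A minimal sketch, assuming the log end offsets (LEO) of the replicas are known and, for the acks=all case, that the caller supplies the offset below which all messages were acknowledged; both helpers are hypothetical illustrations, not Kafka code.

```java
public class UncleanElectionLossExample {

    // Messages truncated on a replica whose LEO is ahead of the new leader's LEO.
    static long truncatedMessages(final long replicaLogEndOffset, final long newLeaderLogEndOffset) {
        return Math.max(0, replicaLogEndOffset - newLeaderLogEndOffset);
    }

    // Acknowledged messages lost for acks=all producers, assuming every offset below
    // ackedUpToExclusive was acknowledged (a hypothetical input supplied by the caller).
    static long lostAckedMessages(final long ackedUpToExclusive, final long newLeaderLogEndOffset) {
        return Math.max(0, ackedUpToExclusive - newLeaderLogEndOffset);
    }

    public static void main(final String[] args) {
        final long leoA = 10, leoB = 5, leoC = 2; // C becomes the new leader
        System.out.println(truncatedMessages(leoA, leoC)); // 8 (case 1: acks=0 or acks=1)
        System.out.println(truncatedMessages(leoB, leoC)); // 3
        System.out.println(lostAckedMessages(leoB, leoC)); // 3 (case 2: acks=all, minISR=2)
    }
}
```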






[jira] [Commented] (KAFKA-10751) Generate log to help estimate messages lost during ULE

2020-11-19 Thread Lucas Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235702#comment-17235702
 ] 

Lucas Wang commented on KAFKA-10751:


PR submitted: https://github.com/apache/kafka/pull/9533

> Generate log to help estimate messages lost during ULE
> --
>
> Key: KAFKA-10751
> URL: https://issues.apache.org/jira/browse/KAFKA-10751
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Wang
>Assignee: Lucas Wang
>Priority: Major
>
> During Unclean Leader Election, there could be data loss due to truncation at 
> the resigned leader.
> Suppose there are 3 brokers that have replicas for a given partition:
> Broker A (leader) with largest offset 9 (log end offset 10)
> Broker B (follower) with largest offset 4 (log end offset 5)
> Broker C (follower) with largest offset 1 (log end offset 2)
> Only the leader A is in the ISR with B and C lagging behind.
> Now an unclean leader election causes the leadership to be transferred to C. 
> Broker A would need to truncate 8 messages, and Broker B 3 messages.
> Case 1: if these messages have been produced with acks=0 or 1, then clients 
> would experience 8 lost messages.
> Case 2: if the client is using acks=all and the partition's minISR setting is 
> 2, and further let's assume broker B dropped out of the ISR after receiving 
> the message with offset 4, then only the messages with offset<=4 have been 
> acked to the client. The truncation effectively causes the client to lose 3 
> messages.
> Knowing the exact amount of data loss involves knowing the client's acks 
> setting when the messages are produced, and also whether the messages have 
> been sufficiently replicated according to the MinISR setting.
> If getting the exact data loss is too involved, at least there should be logs 
> to help ESTIMATE the amount of data loss.





[GitHub] [kafka] cadonna commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-19 Thread GitBox


cadonna commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527145996



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -588,6 +592,29 @@ public void testCloseIsIdempotent() {
 closeCount, MockMetricsReporter.CLOSE_COUNT.get());
 }
 
+@Test
+public void testAddThread() {
+props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
+final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+streams.start();
+final int oldSize = streams.threads.size();
+try {
+TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running");
+} catch (final InterruptedException e) {
+e.printStackTrace();
+}

Review comment:
   You should not use `try-catch` here; just add `throws InterruptedException` to the method signature.
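   A self-contained sketch of the suggested shape, assuming a polling helper similar in spirit to `TestUtils.waitForCondition` (the `waitForCondition` and `shouldReachRunning` methods here are hypothetical stand-ins, not Kafka's test utilities): the test method declares `throws InterruptedException`, so an interrupt surfaces as a test failure instead of being printed and swallowed.

```java
import java.util.function.BooleanSupplier;

public class WaitForConditionExample {

    // Hypothetical polling helper: declares InterruptedException instead of catching it.
    static void waitForCondition(final BooleanSupplier condition, final long maxWaitMs, final String message)
            throws InterruptedException {
        final long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("Condition not met within " + maxWaitMs + " ms: " + message);
            }
            Thread.sleep(10);
        }
    }

    // The test method simply propagates InterruptedException; no try-catch needed.
    static void shouldReachRunning() throws InterruptedException {
        final long start = System.currentTimeMillis();
        waitForCondition(() -> System.currentTimeMillis() - start >= 50, 15_000L, "wait until running");
    }

    public static void main(final String[] args) throws InterruptedException {
        shouldReachRunning();
        System.out.println("ok");
    }
}
```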

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread could not be added
+ */
+public Optional<String> addStreamThread() {
+synchronized (stateLock) {

Review comment:
   Why do we need to synchronize the whole method on `stateLock`?

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread could not be added
+ */
+public Optional<String> addStreamThread() {
+synchronized (stateLock) {
+if (state == State.RUNNING || state == State.REBALANCING) {

Review comment:
   Could we also use `isRunningOrRebalancing()` here?

##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -588,6 +592,29 @@ public void testCloseIsIdempotent() {
 closeCount, MockMetricsReporter.CLOSE_COUNT.get());
 }
 
+@Test
+public void testAddThread() {

Review comment:
   I would prefer `shouldAddThread()` as the name, although the other test methods follow a different pattern.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread

[GitHub] [kafka] hachikuji merged pull request #9619: MINOR: Reduce sends created by `SendBuilder`

2020-11-19 Thread GitBox


hachikuji merged pull request #9619:
URL: https://github.com/apache/kafka/pull/9619


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-8872) Improvements to controller "deleting" state / topic Identifiers

2020-11-19 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan reassigned KAFKA-8872:
-

Assignee: Justine Olshan

> Improvements to controller "deleting" state /  topic Identifiers
> 
>
> Key: KAFKA-8872
> URL: https://issues.apache.org/jira/browse/KAFKA-8872
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Lucas Bradstreet
>Assignee: Justine Olshan
>Priority: Major
>
> Kafka currently uniquely identifies a topic by its name. This is generally 
> sufficient, but there are flaws in this scheme if a topic is deleted and 
> recreated with the same name. As a result, Kafka attempts to prevent these 
> classes of issues by ensuring a topic is deleted from all replicas before 
> completing a deletion. This solution is not perfect, as it is possible for 
> partitions to be reassigned from brokers while they are down, and there are 
> no guarantees that this state will ever be cleaned up and will not cause 
> issues in the future.
> As the controller must wait for all replicas to delete their local 
> partitions, deletes can also become blocked, preventing topics from being 
> created with the same name until the deletion is complete on all replicas. 
> This can mean that downtime for a single broker can effectively cause a 
> complete outage for everyone producing/consuming to that topic name, as the 
> topic cannot be recreated without manual intervention.
> Unique topic IDs could help address this issue by associating a unique ID 
> with each topic, ensuring a newly created topic with a previously used name 
> cannot be confused with a previous topic with that name.
>  
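The idea of disambiguating a recreated topic by ID can be sketched in a few lines. This is a hypothetical in-memory model (`TopicIdSketch` and its methods are illustrative names, not Kafka classes), assuming the controller assigns a random `java.util.UUID` every time a topic name is created:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class TopicIdSketch {

    // Hypothetical controller-side registry: topic name -> ID of the current incarnation.
    private final Map<String, UUID> idsByName = new HashMap<>();

    UUID createTopic(String name) {
        if (idsByName.containsKey(name)) {
            throw new IllegalStateException("topic already exists: " + name);
        }
        UUID id = UUID.randomUUID(); // a fresh ID every time the name is (re)created
        idsByName.put(name, id);
        return id;
    }

    void deleteTopic(String name) {
        idsByName.remove(name);
    }

    // A replica holding data tagged with an older ID can tell that it belongs
    // to a previous incarnation of the topic, even though the name matches.
    boolean isCurrent(String name, UUID replicaTopicId) {
        return replicaTopicId.equals(idsByName.get(name));
    }

    public static void main(String[] args) {
        TopicIdSketch registry = new TopicIdSketch();
        UUID first = registry.createTopic("orders");
        registry.deleteTopic("orders");
        UUID second = registry.createTopic("orders");
        System.out.println("stale replica is current? " + registry.isCurrent("orders", first));
        System.out.println("new replica is current? " + registry.isCurrent("orders", second));
    }
}
```

Because each creation draws a fresh ID, a replica that was offline during the deletion can compare its stored ID with the current one instead of relying on the name alone.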



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lmr3796 commented on a change in pull request #9435: KAFKA-10606: Disable auto topic creation for fetch-all-topic-metadata request

2020-11-19 Thread GitBox


lmr3796 commented on a change in pull request #9435:
URL: https://github.com/apache/kafka/pull/9435#discussion_r527192802



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1333,7 +1343,17 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 }
 
-val completeTopicMetadata = topicMetadata ++ unauthorizedForCreateTopicMetadata ++ unauthorizedForDescribeTopicMetadata
+val completeTopicMetadata = (if (metadataRequest.isAllTopics) {

Review comment:
   Hi @chia7712 @ijuma,
   
   This is Joseph, @Lincong's colleague working on this patch with him. I think this is a good point, and I just updated the PR accordingly.









[GitHub] [kafka] hachikuji merged pull request #9608: MINOR: Enable testLogCleanerStats

2020-11-19 Thread GitBox


hachikuji merged pull request #9608:
URL: https://github.com/apache/kafka/pull/9608


   







[jira] [Commented] (KAFKA-9458) Kafka crashed in windows environment

2020-11-19 Thread Daniel Dube (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235769#comment-17235769
 ] 

Daniel Dube commented on KAFKA-9458:


Hi everyone. "Windows is not an officially supported platform": is this specified anywhere? I've lost almost 3 months preparing a Kafka solution that cannot be used because it can't delete a file? I could not find any Kafka documentation saying that Windows is not supported. Why do you publish it with Windows scripts? I can't understand it.

Thanks in advance and best regards

> Kafka crashed in windows environment
> 
>
> Key: KAFKA-9458
> URL: https://issues.apache.org/jira/browse/KAFKA-9458
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 2.4.0
> Environment: Windows Server 2019
>Reporter: hirik
>Priority: Critical
>  Labels: windows
> Attachments: Windows_crash_fix.patch, logs.zip
>
>
> Hi,
> while I was trying to validate the Kafka retention policy, the Kafka server crashed
> with the exception trace below.
> [2020-01-21 17:10:40,475] INFO [Log partition=test1-3, 
> dir=C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka] 
> Rolled new log segment at offset 1 in 52 ms. (kafka.log.Log)
> [2020-01-21 17:10:40,484] ERROR Error while deleting segments for test1-3 in 
> dir C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
> (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
>  at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
>  at java.base/java.nio.file.Files.move(Files.java:1425)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:795)
>  at kafka.log.AbstractIndex.renameTo(AbstractIndex.scala:209)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:497)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1(Log.scala:2206)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1$adapted(Log.scala:2206)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.Log.deleteSegmentFiles(Log.scala:2206)
>  at kafka.log.Log.removeAndDeleteSegments(Log.scala:2191)
>  at kafka.log.Log.$anonfun$deleteSegments$2(Log.scala:1700)
>  at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.scala:17)
>  at kafka.log.Log.maybeHandleIOException(Log.scala:2316)
>  at kafka.log.Log.deleteSegments(Log.scala:1691)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1686)
>  at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1763)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1753)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:982)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:979)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:979)
>  at kafka.log.LogManager.$anonfun$startup$2(LogManager.scala:403)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:116)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:65)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>  at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:830)
>  Suppressed: java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at java.base/sun.nio.fs.WindowsFi
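The trace shows `Utils.atomicMoveWithFallback` failing with a second `FileSystemException` attached as `Suppressed`. A minimal sketch of that pattern, written for illustration rather than copied from Kafka (the real `Utils` code may differ): try an atomic rename, fall back to a plain replacing move, and if both fail, attach the first failure to the second. On Windows, renaming a file that another handle still holds open (for example, an index file the broker still has mapped) would be expected to fail in both attempts, which would produce this two-exception shape. The file names and the `AtomicMoveSketch` class are illustrative.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class AtomicMoveSketch {

    // Try an atomic rename first; if that fails, retry as a plain replacing move.
    // If the fallback also fails, the first failure is attached as a suppressed
    // exception, so the caller sees both errors in one stack trace.
    static void atomicMoveWithFallback(Path source, Path target) throws IOException {
        try {
            Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException outer) {
            try {
                Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
            } catch (IOException inner) {
                inner.addSuppressed(outer);
                throw inner;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("move-sketch");
        Path src = Files.writeString(dir.resolve("00000000000000000000.timeindex"), "index");
        Path dst = dir.resolve("00000000000000000000.timeindex.deleted");
        atomicMoveWithFallback(src, dst);
        System.out.println("moved: " + Files.exists(dst) + ", source gone: " + Files.notExists(src));
    }
}
```

The fallback only helps when the atomic variant is unsupported; it cannot work around a file that is still locked by an open handle.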

[jira] [Comment Edited] (KAFKA-9458) Kafka crashed in windows environment

2020-11-19 Thread Daniel Dube (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235769#comment-17235769
 ] 

Daniel Dube edited comment on KAFKA-9458 at 11/19/20, 10:10 PM:


Hi everyone. "Windows is not an officially supported platform": is this specified anywhere? I've been working for almost 3 months preparing a Kafka solution that cannot be used because it can't delete a file? I still could not find any Kafka documentation saying that Windows is not supported. Why is it published with Windows scripts? I can't understand it. Sorry, but I'm very frustrated today.

Thanks for your work and best regards


was (Author: danieldube):
Hi everyone. "Windows is not an officially supported platform". Is this 
specified anywhere? I've lost almost 3 months preparing a Kafka solution and 
cannot be used because can't delete a file? I could not find any Kafka 
documentation saying that is not Windows supported. Why do you publish it with 
windows scripts? I can't understand it.

Thanks in advance and best regards


[GitHub] [kafka] Hamza-Slama opened a new pull request #9625: MINOR: remove semicolon

2020-11-19 Thread GitBox


Hamza-Slama opened a new pull request #9625:
URL: https://github.com/apache/kafka/pull/9625


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-19 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527277135



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread could not be added
+ */
+public Optional<String> addStreamThread() {

Review comment:
   You are right, I'll add one.









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9615: KAFKA-10500: Add thread option

2020-11-19 Thread GitBox


wcarlson5 commented on a change in pull request #9615:
URL: https://github.com/apache/kafka/pull/9615#discussion_r527277048



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -894,11 +904,72 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
 queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
 
 stateDirCleaner = setupStateDirCleaner();
-oldHandler = false;
 maybeWarnAboutCodeInRocksDBConfigSetter(log, config);
 rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config);
 }
 
+/**
+ * Adds and starts a stream thread in addition to the stream threads that are already running in this
+ * Kafka Streams client.
+ *
+ * Since the number of stream threads increases, the sizes of the caches in the new stream thread
+ * and the existing stream threads are adapted so that the sum of the cache sizes over all stream
+ * threads does not exceed the total cache size specified in configuration
+ * {@code cache.max.bytes.buffering}.
+ *
+ * Stream threads can only be added if this Kafka Streams client is in state RUNNING or REBALANCING.
+ *
+ * @return name of the added stream thread or empty if a new stream thread could not be added
+ */
+public Optional<String> addStreamThread() {
+synchronized (stateLock) {

Review comment:
   Well, we don't want the client's state to change while we are adding a thread.
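Holding `stateLock` across the whole method guards against a classic check-then-act race: without the lock, the client could leave RUNNING/REBALANCING between the state check and the new thread being registered. A simplified, hypothetical model follows; `AddThreadSketch`, its reduced `State` enum and the `StreamThread-N` naming are illustrative, not the real `KafkaStreams` internals. It also uses an `isRunningOrRebalancing()` helper, as suggested in the review.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class AddThreadSketch {

    // Simplified, hypothetical subset of client states.
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING }

    private final Object stateLock = new Object();
    private State state = State.CREATED;
    private final List<String> threadNames = new ArrayList<>();

    // Must be called while holding stateLock.
    private boolean isRunningOrRebalancing() {
        return state == State.RUNNING || state == State.REBALANCING;
    }

    void setState(State newState) {
        synchronized (stateLock) {
            state = newState;
        }
    }

    // The state check and the registration of the new thread happen under the
    // same lock that setState() uses, so no transition can slip in between them.
    Optional<String> addStreamThread() {
        synchronized (stateLock) {
            if (!isRunningOrRebalancing()) {
                return Optional.empty();
            }
            String name = "StreamThread-" + (threadNames.size() + 1);
            threadNames.add(name); // stands in for creating and starting a real thread
            return Optional.of(name);
        }
    }

    public static void main(String[] args) {
        AddThreadSketch client = new AddThreadSketch();
        System.out.println("before start: " + client.addStreamThread());
        client.setState(State.RUNNING);
        System.out.println("while running: " + client.addStreamThread());
    }
}
```

Because `setState` takes the same lock, a state transition happens either entirely before the check or entirely after the new thread has been registered.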









[GitHub] [kafka] jolshan opened a new pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

2020-11-19 Thread GitBox


jolshan opened a new pull request #9626:
URL: https://github.com/apache/kafka/pull/9626


   This change takes the topic IDs created in https://github.com/apache/kafka/pull/9473 and propagates them to brokers using the LeaderAndIsr request. It also removes the topic name from the LeaderAndIsr response, reorganizes the response to be sorted by topic, and includes the topic ID.
   
   In addition, the topic ID is persisted to each replica in Log as well as in a file on disk. This file is read on startup and, if the topic ID exists, it is reloaded.
   
   This PR bumps the IBP and is expected to be merged at the same time as https://github.com/apache/kafka/pull/9622 so as not to bump the protocol twice.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
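The persistence step described in this PR (write the topic ID to a file in the partition directory, then reload it on startup if the file exists) might look roughly like this. The class name, the file name `topic-id.metadata` and the `key: value` line format are assumptions for illustration; the PR's actual on-disk format is not shown here.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

public class TopicIdFileSketch {

    // Hypothetical file name and key; the real format may differ.
    static final String FILE_NAME = "topic-id.metadata";
    static final String KEY = "topic_id: ";

    // Persist the topic ID next to the partition's log segments.
    static void write(Path partitionDir, UUID topicId) throws IOException {
        Files.write(partitionDir.resolve(FILE_NAME),
                List.of("version: 0", KEY + topicId),
                StandardCharsets.UTF_8);
    }

    // On startup: reload the ID if the file exists, otherwise report that none was persisted.
    static Optional<UUID> read(Path partitionDir) throws IOException {
        Path file = partitionDir.resolve(FILE_NAME);
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
            if (line.startsWith(KEY)) {
                return Optional.of(UUID.fromString(line.substring(KEY.length()).trim()));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws IOException {
        Path partitionDir = Files.createTempDirectory("orders-0");
        System.out.println("before write: " + read(partitionDir));
        UUID id = UUID.randomUUID();
        write(partitionDir, id);
        System.out.println("reloaded matches: " + read(partitionDir).equals(Optional.of(id)));
    }
}
```

A production version would likely write to a temporary file and rename it, so that a crash mid-write cannot leave a truncated file behind.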
   







[jira] [Commented] (KAFKA-10547) Add topic IDs to MetadataResponse

2020-11-19 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235796#comment-17235796
 ] 

Justine Olshan commented on KAFKA-10547:


Hi [~dengziming], I'm sorry I didn't see this until now. Commenting on your PR.

> Add topic IDs to MetadataResponse
> -
>
> Key: KAFKA-10547
> URL: https://issues.apache.org/jira/browse/KAFKA-10547
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: dengziming
>Priority: Major
>
> Prevent reads from deleted topics
> Will be able to use TopicDescription to identify the topic ID





[GitHub] [kafka] jolshan commented on pull request #9622: KAFKA-10547; add topicId in MetadataResp

2020-11-19 Thread GitBox


jolshan commented on pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#issuecomment-730710397


   Hi @dengziming! Thanks for the PR! I was hoping to add LeaderAndIsrRequests before UpdateMetadata/Metadata, as ordered in the [JIRA ticket.](https://issues.apache.org/jira/browse/KAFKA-10545) There are just a few features for persisting the topic IDs that I wanted to include. I'm thinking we could review this PR and my PR: https://github.com/apache/kafka/pull/9626 at the same time, and merge mine first and yours immediately after.







[GitHub] [kafka] jolshan edited a comment on pull request #9622: KAFKA-10547; add topicId in MetadataResp

2020-11-19 Thread GitBox


jolshan edited a comment on pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#issuecomment-730710397


   Hi @dengziming! Thanks for the PR! I was hoping to add LeaderAndIsrRequests before UpdateMetadata/Metadata, following the ordering of the [JIRA tickets.](https://issues.apache.org/jira/browse/KAFKA-8872) There are just a few features for persisting the topic IDs that I wanted to include. I'm thinking we could review this PR and my PR: https://github.com/apache/kafka/pull/9626 at the same time, and merge mine first and yours immediately after.







[GitHub] [kafka] jolshan edited a comment on pull request #9622: KAFKA-10547; add topicId in MetadataResp

2020-11-19 Thread GitBox


jolshan edited a comment on pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#issuecomment-730710397


   Hi @dengziming! Thanks for the PR! I was hoping to add LeaderAndIsrRequests 
before UpdateMetadata/Metadata, following the ordering of the  [JIRA 
tickets.](https://issues.apache.org/jira/browse/KAFKA-8872) There are just a 
few features for persisting the topic IDs I wanted to include. I'm thinking we 
could review this PR and my PR: https://github.com/apache/kafka/pull/9626 at 
the same time and merge mine first and yours immediately after. 







[jira] [Updated] (KAFKA-10745) Please let me know how I check the time which Source connector receive the data from source table.

2020-11-19 Thread NAYUSIK (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

NAYUSIK updated KAFKA-10745:

Issue Type: Bug  (was: Improvement)

> Please let me know how I check the time which Source connector receive the 
> data from source table.
> --
>
> Key: KAFKA-10745
> URL: https://issues.apache.org/jira/browse/KAFKA-10745
> Project: Kafka
>  Issue Type: Bug
>Reporter: NAYUSIK
>Priority: Major
>
> Please let me know how I can check the time at which the source connector receives the
> data from the source table.
> I want to check the time at each stage.
> We are currently using the JDBC Connector.
> The times we can see are when the data is created in the source table,
> when the data is written to Kafka, and when the data is written to the
> target table.
> But I also want to know the time at which the source connector receives the data from
> the source table.
> Please tell me which settings I need to configure on the source connector.
> Thank you for your support.





[jira] [Updated] (KAFKA-10745) Please let me know how I check the time which Source connector receive the data from source table.

2020-11-19 Thread NAYUSIK (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

NAYUSIK updated KAFKA-10745:

Issue Type: Improvement  (was: Bug)

> Please let me know how I check the time which Source connector receive the 
> data from source table.
> --
>
> Key: KAFKA-10745
> URL: https://issues.apache.org/jira/browse/KAFKA-10745
> Project: Kafka
>  Issue Type: Improvement
>Reporter: NAYUSIK
>Priority: Major
>





  1   2   >