[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-02 Thread GitBox


mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433660276



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -390,19 +387,17 @@ boolean tryToCompleteRestoration() {
 
 final List<Task> restoringTasks = new LinkedList<>();
 for (final Task task : tasks.values()) {
-if (task.state() == CREATED) {
-try {
-task.initializeIfNeeded();
-} catch (final LockException | TimeoutException e) {
-// it is possible that if there are multiple threads within the instance that one thread
-// trying to grab the task from the other, while the other has not released the lock since
-// it did not participate in the rebalance. In this case we can just retry in the next iteration
-log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
-allRunning = false;
-}
+try {
+task.initializeIfNeeded();
+} catch (final LockException | TimeoutException e) {
+// it is possible that if there are multiple threads within the instance that one thread
+// trying to grab the task from the other, while the other has not released the lock since
+// it did not participate in the rebalance. In this case we can just retry in the next iteration
+log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
+allRunning = false;
 }
 
-if (task.state() == RESTORING) {
+if (task.isActive()) {

Review comment:
   StandbyTasks are never in the `RESTORING` state (see the sketch below).
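
   For context, a minimal sketch (a hypothetical task class, not the actual Streams code) of the two lifecycles: standby tasks skip `RESTORING` entirely, so `isActive()` is the more direct check here.

```java
// Hypothetical sketch of the two task lifecycles discussed above.
// Active tasks:  CREATED -> RESTORING -> RUNNING
// Standby tasks: CREATED -> RUNNING (they never enter RESTORING)
enum State { CREATED, RESTORING, RUNNING }

final class SketchTask {
    private final boolean active;
    private State state = State.CREATED;

    SketchTask(final boolean active) { this.active = active; }

    void initializeIfNeeded() {
        if (state == State.CREATED)
            state = active ? State.RESTORING : State.RUNNING;
    }

    boolean isActive() { return active; }
    State state() { return state; }
}
```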





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-02 Thread GitBox


mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433660588



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -529,11 +524,7 @@ void handleLostAll() {
 for (final TaskId id : lockedTaskDirectories) {
 final Task task = tasks.get(id);
 if (task != null) {
-if (task.isActive() && task.state() == RUNNING) {
-taskOffsetSums.put(id, Task.LATEST_OFFSET);
-} else {
-taskOffsetSums.put(id, sumOfChangelogOffsets(id, task.changelogOffsets()));
-}
+taskOffsetSums.put(id, sumOfChangelogOffsets(id, task.changelogOffsets()));

Review comment:
   Make TM agnostic to task state -- putting some more logic into `sumOfChangelogOffsets` to make this work. Note that `task.changelogOffsets()` sets offsets to `LATEST_OFFSET` for `StreamTasks` that are `RUNNING`.









[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-02 Thread GitBox


mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433661972



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Long> changelogOffsets) {
 for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
 final long offset = changelogEntry.getValue();
 
-offsetSum += offset;
-if (offsetSum < 0) {
-log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
-return Long.MAX_VALUE;
+if (offset == Task.LATEST_OFFSET) {

Review comment:
   If an active task is `RUNNING`, the offsets are set to `LATEST_OFFSET` in `task.changelogOffsets()`, which is passed in as the parameter; see the sketch below.
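
   A rough paraphrase of the resulting logic (a sketch, not the exact Streams code; the sentinel value is an assumption here):

```java
import java.util.Map;

// Sketch: if any offset carries the LATEST_OFFSET sentinel, the task is an active
// RUNNING task and the sentinel is propagated; otherwise offsets are summed with
// overflow pinned to Long.MAX_VALUE, as in the original code.
final class ChangelogOffsetSum {
    static final long LATEST_OFFSET = -2L; // sentinel value assumed for this sketch

    static long sumOfChangelogOffsets(final Map<String, Long> changelogOffsets) {
        long offsetSum = 0L;
        for (final long offset : changelogOffsets.values()) {
            if (offset == LATEST_OFFSET)
                return LATEST_OFFSET; // active RUNNING task: report "caught up"
            offsetSum += offset;
            if (offsetSum < 0)
                return Long.MAX_VALUE; // pin on overflow
        }
        return offsetSum;
    }
}
```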









[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-02 Thread GitBox


mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433662342



##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##
@@ -2637,11 +2646,17 @@ private File getCheckpointFile(final TaskId task) {
 public void initializeIfNeeded() {
 if (state() == State.CREATED) {
 transitionTo(State.RESTORING);
+if (!active) {
+transitionTo(State.RUNNING);

Review comment:
   A "standby" must transit to `RUNNING` here (cf `StandbyTask`)









[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-02 Thread GitBox


mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433662426



##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
##
@@ -2637,11 +2646,17 @@ private File getCheckpointFile(final TaskId task) {
 public void initializeIfNeeded() {
 if (state() == State.CREATED) {
 transitionTo(State.RESTORING);
+if (!active) {
+transitionTo(State.RUNNING);
+}
 }
 }
 
 @Override
 public void completeRestoration() {
+if (state() == State.RUNNING) {

Review comment:
   Must be idempotent.
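
   A minimal sketch of what "idempotent" means here (hypothetical names mirroring the test stub above):

```java
// Sketch: completeRestoration() on an already-RUNNING task must be a no-op
// rather than an illegal state transition.
final class RestorableTaskSketch {
    enum State { CREATED, RESTORING, RUNNING }

    private State state = State.RESTORING;

    void completeRestoration() {
        if (state == State.RUNNING)
            return; // already completed; repeated calls are no-ops
        state = State.RUNNING;
    }
}
```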









[jira] [Commented] (KAFKA-7538) Improve locking model used to update ISRs and HW

2020-06-02 Thread Viktor Somogyi-Vass (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123415#comment-17123415
 ] 

Viktor Somogyi-Vass commented on KAFKA-7538:


[~rsivaram] have subcases 1 and 3 been fixed since resolving this Jira, or are they still pending?

> Improve locking model used to update ISRs and HW
> 
>
> Key: KAFKA-7538
> URL: https://issues.apache.org/jira/browse/KAFKA-7538
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.5.0
>
>
> We currently use a ReadWriteLock in Partition to update ISRs and high water 
> mark for the partition. This can result in severe lock contention if there 
> are multiple producers writing a large amount of data into a single partition.
> The current locking model is:
>  # read lock while appending to log on every Produce request on the request 
> handler thread
>  # write lock on leader change, updating ISRs etc. on request handler or 
> scheduler thread
>  # write lock on every replica fetch request to check if ISRs need to be 
> updated and to update HW and ISR on the request handler thread
> 2) is infrequent, but 1) and 3) may be frequent and can result in lock 
> contention. If there are lots of produce requests to a partition from 
> multiple processes, on the leader broker we may see:
>  # one slow log append locks up one request thread for that produce while 
> holding onto the read lock
>  # (replicationFactor-1) request threads can be blocked waiting for write 
> lock to process replica fetch request
>  # potentially several other request threads processing Produce may be queued 
> up to acquire read lock because of the waiting writers.
> In a thread dump with this issue, we noticed several request threads blocked 
> waiting for the write lock, possibly due to replication fetch retries.
>  
> Possible fixes:
>  # Process `Partition#maybeExpandIsr` on a single scheduler thread similar to 
> `Partition#maybeShrinkIsr` so that only a single thread is blocked on the 
> write lock. But this will delay updating ISRs and HW.
>  # Change locking in `Partition#maybeExpandIsr` so that only read lock is 
> acquired to check if ISR needs updating and write lock is acquired only to 
> update ISRs. Also use a different lock for updating HW (perhaps just the 
> Partition object lock) so that typical replica fetch requests complete 
> without acquiring Partition write lock on the request handler thread.
> I will submit a PR for 2) , but other suggestions to fix this are welcome.
>  
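
As an illustration of fix 2), a minimal sketch (with assumed names, not the actual Partition code): check under the read lock, and take the write lock only when the ISR actually needs updating, re-checking after the lock upgrade.

{code:java}
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of fix 2) under assumed names: the common produce/fetch path only ever
// takes the read lock; the write lock is taken only when an ISR update is needed.
final class IsrExpansionSketch {
    private final ReentrantReadWriteLock leaderIsrLock = new ReentrantReadWriteLock();
    private volatile boolean isrNeedsExpansion = false; // stand-in for the real check

    void maybeExpandIsr() {
        leaderIsrLock.readLock().lock();
        try {
            if (!isrNeedsExpansion)
                return; // common case: no write lock taken on the request thread
        } finally {
            leaderIsrLock.readLock().unlock();
        }

        leaderIsrLock.writeLock().lock();
        try {
            if (isrNeedsExpansion) { // re-check: state may have changed in between
                // ... update the ISR and high watermark state here ...
                isrNeedsExpansion = false;
            }
        } finally {
            leaderIsrLock.writeLock().unlock();
        }
    }
}
{code}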



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10082) fix failed kafka.api.PlaintextConsumerTest.testMultiConsumerStickyAssignment

2020-06-02 Thread Luke Chen (Jira)
Luke Chen created KAFKA-10082:
-

 Summary: fix failed 
kafka.api.PlaintextConsumerTest.testMultiConsumerStickyAssignment
 Key: KAFKA-10082
 URL: https://issues.apache.org/jira/browse/KAFKA-10082
 Project: Kafka
  Issue Type: Bug
Reporter: Luke Chen
Assignee: Luke Chen


[https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/153/log/?start=0]

[https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk8/runs/4596/log/?start=0]

[https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk11/runs/1523/log/?start=0]

 

kafka.api.PlaintextConsumerTest > testMultiConsumerStickyAssignment FAILED
 java.lang.AssertionError: Expected only two topic partitions that have 
switched to other consumers. expected:<9> but was:<14>
 at org.junit.Assert.fail(Assert.java:89)
 at org.junit.Assert.failNotEquals(Assert.java:835)
 at org.junit.Assert.assertEquals(Assert.java:647)
 at 
kafka.api.PlaintextConsumerTest.testMultiConsumerStickyAssignment(PlaintextConsumerTest.scala:929)





[GitHub] [kafka] showuon opened a new pull request #8777: KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment

2020-06-02 Thread GitBox


showuon opened a new pull request #8777:
URL: https://github.com/apache/kafka/pull/8777


   Fix the failed `testMultiConsumerStickyAssignment` by fixing a logic error in the `allSubscriptionsEqual` method. We create the `consumerToOwnedPartitions` map to keep the set of previously owned partitions encoded in each Subscription; it is the basis for the reassignment. In `allSubscriptionsEqual`, we get the member generation of each subscription and invalidate all previously owned partitions of members on an older generation. However, the logic before this fix also removed the member with the current highest generation from `consumerToOwnedPartitions`; see the sketch below. This PR fixes that logic error.
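
   A simplified sketch of the generation check described above (hypothetical names and types, not the actual assignor code):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch: members on an older generation have their claimed partitions invalidated,
// while members on the current highest generation keep theirs. The bug was that the
// highest-generation member was also being dropped.
final class GenerationFilterSketch {
    static Map<String, List<String>> consumerToOwnedPartitions(
            final Map<String, Integer> memberGeneration,
            final Map<String, List<String>> claimedPartitions) {
        final int maxGeneration = Collections.max(memberGeneration.values());
        final Map<String, List<String>> result = new HashMap<>();
        for (final Map.Entry<String, Integer> entry : memberGeneration.entrySet()) {
            final boolean stale = entry.getValue() < maxGeneration;
            result.put(entry.getKey(),
                    stale ? Collections.emptyList() : claimedPartitions.get(entry.getKey()));
        }
        return result;
    }
}
```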
   







[GitHub] [kafka] showuon commented on pull request #8777: KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment

2020-06-02 Thread GitBox


showuon commented on pull request #8777:
URL: https://github.com/apache/kafka/pull/8777#issuecomment-637344872


   @ableegoldman , could you check this PR to fix the failed tests? Thanks.







[GitHub] [kafka] kkonstantine commented on a change in pull request #8630: KAFKA-9969: Exclude ConnectorClientConfigRequest from class loading isolation

2020-06-02 Thread GitBox


kkonstantine commented on a change in pull request #8630:
URL: https://github.com/apache/kafka/pull/8630#discussion_r433640144



##
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
##
@@ -114,68 +90,264 @@ public void testConnectFrameworkClasses() {
 assertFalse(PluginUtils.shouldLoadInIsolation(
 "org.apache.kafka.clients.admin.KafkaAdminClient")
 );
-assertFalse(PluginUtils.shouldLoadInIsolation(
-"org.apache.kafka.connect.rest.ConnectRestExtension")
-);
 }
 
 @Test
-public void testAllowedConnectFrameworkClasses() {
-assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.transforms."));
-assertTrue(PluginUtils.shouldLoadInIsolation(
-"org.apache.kafka.connect.transforms.ExtractField")
-);
-assertTrue(PluginUtils.shouldLoadInIsolation(
-"org.apache.kafka.connect.transforms.ExtractField$Key")
-);
-assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.json."));
-assertTrue(PluginUtils.shouldLoadInIsolation(
-"org.apache.kafka.connect.json.JsonConverter")
-);
-assertTrue(PluginUtils.shouldLoadInIsolation(
-"org.apache.kafka.connect.json.JsonConverter$21")
-);
-assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.file."));
-assertTrue(PluginUtils.shouldLoadInIsolation(
-"org.apache.kafka.connect.file.FileStreamSourceTask")
-);
-assertTrue(PluginUtils.shouldLoadInIsolation(
-"org.apache.kafka.connect.file.FileStreamSinkConnector")
-);
+public void testConnectApiClasses() {
+String[] apiClasses = new String[] {
Review comment:
   nit: do you mind using `List<String>` and `Arrays.asList(...)`? I don't think an array declaration is better if the result is not going to be used as an array. Also, it won't work if you try to reinitialize a declared variable; see the sketch below.
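
   For illustration, the suggested form might look like this (a sketch with a shortened list):

```java
import java.util.Arrays;
import java.util.List;

// Sketch: a List declaration instead of an array; the variable can also be
// reassigned to a different list later in the test if needed.
public class ApiClassListSketch {
    public static void main(final String[] args) {
        List<String> apiClasses = Arrays.asList(
                "org.apache.kafka.connect.",
                "org.apache.kafka.connect.data.Schema"
        );
        apiClasses.forEach(System.out::println);
    }
}
```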

##
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java
##
@@ -114,68 +90,264 @@ public void testConnectFrameworkClasses() {
 assertFalse(PluginUtils.shouldLoadInIsolation(
 "org.apache.kafka.clients.admin.KafkaAdminClient")
 );
-assertFalse(PluginUtils.shouldLoadInIsolation(
-"org.apache.kafka.connect.rest.ConnectRestExtension")
-);
 }
 
 @Test
-public void testAllowedConnectFrameworkClasses() {
-assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.transforms."));
-assertTrue(PluginUtils.shouldLoadInIsolation(
-"org.apache.kafka.connect.transforms.ExtractField")
-);
-assertTrue(PluginUtils.shouldLoadInIsolation(
-"org.apache.kafka.connect.transforms.ExtractField$Key")
-);
-assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.json."));
-assertTrue(PluginUtils.shouldLoadInIsolation(
-"org.apache.kafka.connect.json.JsonConverter")
-);
-assertTrue(PluginUtils.shouldLoadInIsolation(
-"org.apache.kafka.connect.json.JsonConverter$21")
-);
-assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.file."));
-assertTrue(PluginUtils.shouldLoadInIsolation(
-"org.apache.kafka.connect.file.FileStreamSourceTask")
-);
-assertTrue(PluginUtils.shouldLoadInIsolation(
-"org.apache.kafka.connect.file.FileStreamSinkConnector")
-);
+public void testConnectApiClasses() {
+String[] apiClasses = new String[] {
+// Enumerate all packages and classes
+"org.apache.kafka.connect.",
+"org.apache.kafka.connect.components.",
+"org.apache.kafka.connect.components.Versioned",
+//"org.apache.kafka.connect.connector.policy.", isolated by default
+"org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy",
+"org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest",
+"org.apache.kafka.connect.connector.",
+"org.apache.kafka.connect.connector.Connector",
+"org.apache.kafka.connect.connector.ConnectorContext",
+"org.apache.kafka.connect.connector.ConnectRecord",
+"org.apache.kafka.connect.connector.Task",
+"org.apache.kafka.connect.data.",
+"org.apache.kafka.connect.data.ConnectSchema",
+"org.apache.kafka.connect.data.Date",
+"org.apache.kafka.connect.data.Decimal",
+"org.apache.kafka.connect.data.Field",
+"org.apache.kafka.connect.data.Schema",
+"org.apache.kafka.connect.data.SchemaAndValue",
+

[GitHub] [kafka] d8tltanc edited a comment on pull request #8421: KAFKA-9800: [KIP-580] Admin Client Exponential Backoff Implementation

2020-06-02 Thread GitBox


d8tltanc edited a comment on pull request #8421:
URL: https://github.com/apache/kafka/pull/8421#issuecomment-637300554


   @skaundinya15 @ijuma @abbccdda Thanks for all the feedback and comments. This patch was made when I was new to Kafka, and it seems somewhat naive to me now that I have gained more insight into Kafka. Let me talk about two of my major concerns and thoughts about implementing the universal client exponential backoff.
   
   **AdminClient logic redundancy**
   
   NetworkClient has request timeout handlers. Producer / Consumer use NetworkClient to handle timeouts, but AdminClient doesn't. The reason, to my understanding, is that AdminClient implements a per-request timeout.
   
   For example,
   
   1. It wraps the request builder into a new class `Call` (the construction lambda adds tons of lines to AdminClient.java, which should probably live in each AbstractRequest implementation class).
   2. It re-implements the request queues for the different request states, while the normal clients fully rely on the NetworkClient.
   
   After we add per-request timeout support to all clients, the AdminClient's per-request timeout demand won't be special anymore. Thus, the code supporting the per-request timeout in AdminClient will no longer be useful and might be removed.
   
   Are we considering refactoring the AdminClient further to remove all the redundant logic that belongs in the networking layer and the AbstractRequest implementation classes?
   
   **Flexible backoff modes**
   
   Let's analyze the request backoff demands of all the client types at this point. In my opinion, there are simply two:
   
   1. Requests that do not need exponential backoff. These need to be sent ASAP to avoid dataflow performance degradation, such as the `ProduceRequest` and its related/preceding metadata requests.
   
   2. Requests that do need exponential backoff. These are "second-class citizens" and can be throttled to avoid request storms on the broker side, such as metadata-related requests in AdminClient.
   
   Now the question comes: even when two requests are of the same request type, one may have to be sent ASAP while the other may wait, depending on the use case. We need to think more deeply about how to make this classification.
   
   But the implementation would be simple. We can utilize the existing AbstractRequest builder pattern and build the request flexibly for a given retry-backoff mode (see the sketch below). For example,
   
   1. AbstractRequest.Builder will interact with a new abstract class specifying the retry-backoff option, static or exponential.
   2. AbstractRequest will have some new interfaces controlling the backoff.
   
   Then, we can control whether the request should have a static backoff or an exponential backoff when we construct each implementation instance of AbstractRequest.Builder.
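   
   To make the idea concrete, here is a minimal sketch (all names hypothetical, not the actual Kafka API) of a retry-backoff policy that a builder could carry:

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch: a retry-backoff policy supporting both modes. A request
// builder could hold one of these, and the network layer would consult it.
enum BackoffMode { STATIC, EXPONENTIAL }

final class RetryBackoffPolicy {
    private final BackoffMode mode;
    private final long retryBackoffMs;    // base backoff
    private final long retryBackoffMaxMs; // cap for the exponential mode

    RetryBackoffPolicy(final BackoffMode mode, final long retryBackoffMs, final long retryBackoffMaxMs) {
        this.mode = mode;
        this.retryBackoffMs = retryBackoffMs;
        this.retryBackoffMaxMs = retryBackoffMaxMs;
    }

    /** Backoff before the given retry attempt (0-based), with jitter in exponential mode. */
    long backoffMs(final int attempts) {
        if (mode == BackoffMode.STATIC)
            return retryBackoffMs;
        final double exp = retryBackoffMs * Math.pow(2, attempts);
        final double jitter = 0.8 + ThreadLocalRandom.current().nextDouble() * 0.4; // +/-20%
        return (long) Math.min(exp * jitter, retryBackoffMaxMs);
    }
}
```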
   
   
   I'll include more details in the Jira ticket and rewrite this PR. Before we talk more about the code details and start the new implementation, please let me know what you think about the AdminClient refactor and the static/exponential retry-backoff classification rule.
   
   As @abbccdda suggests, let's redirect our further discussion to [Jira](https://issues.apache.org/jira/browse/KAFKA-9800). Thanks.
   
   







[jira] [Comment Edited] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2020-06-02 Thread Cheng Tan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123377#comment-17123377
 ] 

Cheng Tan edited comment on KAFKA-9800 at 6/2/20, 7:53 AM:
---

Recapping the discussion on GitHub: we want to implement a per-request backoff for all types of clients.

 

Let me talk about two of my major concerns and thoughts about implementing the 
universal client exponential backoff.

 

*AdminClient logic redundancy*

NetworkClient has request timeout handlers. Producer / Consumer use NetworkClient to handle timeouts, but AdminClient doesn't. The reason, to my understanding, is that AdminClient implements a per-request timeout.

For example,
 # It wraps the request builder into a new class {{Call}} (the construction lambda adds tons of lines to AdminClient.java, which should probably live in each AbstractRequest implementation class).
 # It re-implements the request queues for the different request states, while the normal clients fully rely on the NetworkClient.

After we add per-request timeout support to all clients, the AdminClient per-request timeout demand won't be special anymore. Thus, the code for supporting the per-request timeout in AdminClient will no longer be useful and might be removed.

Are we considering refactoring the AdminClient further to remove all the redundant logic that belongs in the networking layer and the AbstractRequest implementation classes?

*Flexible backoff modes*

Let's analyze the request backoff demands of all the client types at this point. In my opinion, there are simply two:
 # Requests that do not need exponential backoff. These need to be sent ASAP to avoid dataflow performance degradation, such as the {{ProduceRequest}} and its related/preceding metadata requests.
 # Requests that do need exponential backoff. These are "second-class citizens" and can be throttled to avoid request storms on the broker side, such as metadata-related requests in AdminClient.

Now the question comes: even when two requests are of the same request type, one may have to be sent ASAP while the other may wait, depending on the use case. We need to think more deeply about how to make this classification.

But the implementation would be simple. We can utilize the existing AbstractRequest builder pattern and build the request flexibly for a given retry-backoff mode. For example,
 # AbstractRequest.Builder will interact with a new abstract class specifying the retry-backoff option, static or exponential.
 # AbstractRequest will have some new interfaces controlling the backoff.

Then, we can control whether the request should have a static backoff or an exponential backoff when we construct each implementation instance of AbstractRequest.Builder.

I'll include more details in the Jira ticket and rewrite this PR. Before we talk more about the code details and start the new implementation, please let me know what you think about the AdminClient refactor and the static/exponential retry-backoff classification rule.


was (Author: d8tltanc):
Recap the discussion in Github. We want to implement a per-request backoff for 
all types of clients.

 

Let me talk about two of my major concerns and thoughts about implementing the 
universal client exponential backoff.

*AdminClient logic redundant*

The main request-flow difference between AdminClient and the normal clients (e.g. Producer and Consumer) is that AdminClient wants a per-request timeout while the normal clients are okay with a static default timeout. Thus, AdminClient re-implemented quite an amount of NetworkClient's functionality.

For example,
 # Wrapping the request builder into a new class {{Call}}, (the construction 
lambda adds tons of lines into the AdminClient.java, which should probably have 
been living in each AbstractRequest implementation classes files)
 # Re-writing the request queues for different request status, while normal 
clients are fully using the NetworkClient.

These logics will become redundant after we support exponential backoff in NetworkClient for all types of clients. Are we considering refactoring the AdminClient further to remove all the redundant logic that belongs in the networking layer and the AbstractRequest implementation classes?

*Flexible backoff modes*

Let's analyze the request backoff demands of all the types of clients at this 
point. In my opinion, there are simply two:
 # Requests that do not need exponential backoff. These need to be sent ASAP to avoid dataflow performance degradation, such as the {{ProduceRequest}} and its related/preceding metadata requests.

 # Requests that do need exponential backoff. These are “second-class citizens” and can be throttled to avoid request storms on the broker side, such as metadata-related requests in AdminClient.

Now the qu

[GitHub] [kafka] dajac commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…

2020-06-02 Thread GitBox


dajac commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r433696374



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1287,15 +1309,97 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
 maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to the listener-level limit or to the broker-wide
+   * limit, whichever is longer. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp.
+   *
+   * @param listenerName listener for which to calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+val listenerThrottleTimeMs = maxConnectionsPerListener
+  .get(listenerName)
+  .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+  .getOrElse(0)
+
+if (protectedListener(listenerName)) {
+  listenerThrottleTimeMs
+} else {
+  val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+  val throttleTimeMs = math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)

Review comment:
   nit: This variable is not really needed.

##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1306,18 +1410,26 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   val value = maxConnections(configs)
   if (value <= 0)
 throw new ConfigException("Invalid max.connections $listenerMax")
+
+  val rate = maxConnectionCreationRate(configs)
+  if (rate <= 0)
+throw new ConfigException(s"Invalid ${KafkaConfig.MaxConnectionCreationRateProp} $rate")
 }
 
 override def reconfigure(configs: util.Map[String, _]): Unit = {
   lock.synchronized {
 _maxConnections = maxConnections(configs)
+updateConnectionRateQuota(maxConnectionCreationRate(configs), Some(listener.value()))

Review comment:
   nit: `()` can be removed after `value`.

##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1256,11 +1272,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def waitForConnectionSlot(listenerName: ListenerName,
 acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
 counts.synchronized {
-  if (!connectionSlotAvailable(listenerName)) {
+  val startTimeMs = time.milliseconds()
+  val throttleTimeMs = math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startTimeMs), 0)
+
+  if (throttleTimeMs > 0 || !connectionSlotAvailable(listenerName)) {
 val startNs = time.nanoseconds
+val endThrottleTimeMs = startTimeMs + throttleTimeMs
+var remainingThrottleTimeMs = throttleTimeMs
 do {
-  counts.wait()
-} while (!connectionSlotAvailable(listenerName))
+  counts.wait(remainingThrottleTimeMs)

Review comment:
   A thread waiting here will be notified when a connection is closed (when `dec` is called). As connections in AK are long-lived, couldn't we end up in a case where a connection is throttled for a longer period than the computed `throttleTimeMs` if no connection is closed in between?
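
   For reference, a small sketch (hypothetical names, plain Java rather than the Scala above) of bounding such a monitor wait by the throttle deadline, so a wake-up without a closed connection still ends the throttle on time:

```java
// Sketch: Object.wait(timeout) returns after the timeout even without a notify,
// but the deadline must be re-checked on every iteration of the wait loop.
public class ThrottledWaitSketch {
    private final Object lock = new Object();

    void awaitThrottle(final long throttleTimeMs) throws InterruptedException {
        synchronized (lock) {
            final long endThrottleTimeMs = System.currentTimeMillis() + throttleTimeMs;
            long remainingMs = throttleTimeMs;
            while (remainingMs > 0) {   // stop once the deadline has passed
                lock.wait(remainingMs); // wakes on notify() or after remainingMs
                remainingMs = endThrottleTimeMs - System.currentTimeMillis();
            }
        }
    }
}
```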

##
File path: clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
##
@@ -161,6 +162,9 @@ private KafkaMbean removeAttribute(KafkaMetric metric, String mBeanName) {
 private String addAttribute(KafkaMetric metric) {
 try {
 MetricName metricName = metric.metricName();
+if (metricName.tags().containsKey(DO_NOT_REPORT_TAG)) {
+return null;
+}

Review comment:
   I am not convinced by this. The main issue is that other reporters will report the metric. If we really want to not report a metric, I think we need a solution that works for all reporters. Could you perhaps elaborate more on the need here?
   
   I can think of the following alternatives:
   * add a flag to the sensor to indicate if it must be reported or not.
   * don't rely on metrics to create/store the sensor but have a local reference.

##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1306,18 +1410,26 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   val value = maxConnections(configs)
   if (value <= 0)
 throw new ConfigException("Invalid max.connections $listenerMax")
+
+  val rate = maxConnectionCreationRate(configs)
+  if (rate <= 0)
+throw new Config

[GitHub] [kafka] chia7712 commented on a change in pull request #8755: KAFKA-10069 The user-defined "predicate" and "negate" are not removed…

2020-06-02 Thread GitBox


chia7712 commented on a change in pull request #8755:
URL: https://github.com/apache/kafka/pull/8755#discussion_r433724296



##
File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
##
@@ -434,5 +436,52 @@ public void configure(Map<String, ?> configs) {
 }
 }
 
+@Test
+public void testEnrichedConfigDef() {
+String alias = "hdt";
+String prefix = ConnectorConfig.TRANSFORMS_CONFIG + "." + alias + ".";
+Map<String, String> props = new HashMap<>();
+props.put(ConnectorConfig.TRANSFORMS_CONFIG, alias);
+props.put(prefix + "type", HasDuplicateConfigTransformation.class.getName());
+ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS, new ConfigDef(), props, false);
+assertEnrichedConfigDef(def, prefix, HasDuplicateConfigTransformation.MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN);
+assertEnrichedConfigDef(def, prefix, PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.STRING);
+assertEnrichedConfigDef(def, prefix, PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
+}
+
+private static void assertEnrichedConfigDef(ConfigDef def, String prefix, String keyName, ConfigDef.Type expectedType) {

Review comment:
   @kkonstantine Please take a look at this new name









[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas

2020-06-02 Thread Mateusz Jadczyk (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123542#comment-17123542
 ] 

Mateusz Jadczyk commented on KAFKA-9891:


[~bchen225242] yes, it should be materialized. The duplicate key will, however, be put only once for keyOne (during the very first processing), as this is thrown only for the poisonKey:
{code:java}
throw new IllegalStateException("Throw on " + poisonKey + " to trigger 
rebalance");
{code}
 

> Invalid state store content after task migration with exactly_once and 
> standby replicas
> ---
>
> Key: KAFKA-9891
> URL: https://issues.apache.org/jira/browse/KAFKA-9891
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.1, 2.4.1
>Reporter: Mateusz Jadczyk
>Assignee: Boyang Chen
>Priority: Blocker
> Attachments: failedtest, failedtest2, failedtest3, failedtest3_bug, 
> state_store_operations.txt, tasks_assignment.txt
>
>
> We have a simple command id deduplication mechanism (very similar to the one 
> from Kafka Streams examples) based on Kafka Streams State Stores. It stores 
> command ids from the past hour in _persistentWindowStore_. We encountered a 
> problem with the store if there's an exception thrown later in that topology.
>  We run 3 nodes using docker, each with multiple threads set for this 
> particular Streams Application.
> The business flow is as follows (performed within a single subtopology):
>  *  a valid command is sent with command id 
> (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active 
> task 1_2. First node in the topology analyses if this is a duplicate by 
> checking in the state store (_COMMAND_ID_STORE_), if not puts the command id 
> in the state store and processes the command properly.
>  * an invalid command is sent with the same key but new command id 
> (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the 
> duplicated command id is performed, it's not a duplicate, command id is put 
> into the state store. Next node in the topology throws an exception which 
> causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, 
> offsets are not committed. I double checked for the changelog topic - 
> relevant messages are not committed. Therefore, the changelog topic contains 
> only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and 
> not the one which caused a failure.
>  * in the meantime a standby task 1_2 running on NODE 3 replicated 
> _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local 
> _COMMAND_ID_STORE_
>  * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. 
> It checks if this command id is a duplicate - no, it isn't - tries to process 
> the faulty command and throws an exception. Again, transaction aborted, all 
> looks fine.
>  * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, 
> *it is a duplicate!* Even though the transaction has been aborted and the 
> changelog doesn't contain this command id: 
> _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._
>  
> After digging into the Streams logs and some discussion on ([Stack 
> Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan])
>  we concluded it has something to do with checkpoint files. Here are the 
> detailed logs relevant to checkpoint files.
>  
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
> Processor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
> standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/Processor/1_2/.checkpoint
> NODE_3 2020-

[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas

2020-06-02 Thread Mateusz Jadczyk (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123544#comment-17123544
 ] 

Mateusz Jadczyk commented on KAFKA-9891:


The reason we need at least one materialized key is that we then have something in the changelog topic, and some checkpoint files are used, which messes things up.

> Invalid state store content after task migration with exactly_once and 
> standby replicas
> ---
>
> Key: KAFKA-9891
> URL: https://issues.apache.org/jira/browse/KAFKA-9891
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.1, 2.4.1
>Reporter: Mateusz Jadczyk
>Assignee: Boyang Chen
>Priority: Blocker
> Attachments: failedtest, failedtest2, failedtest3, failedtest3_bug, 
> state_store_operations.txt, tasks_assignment.txt
>
>
> We have a simple command id deduplication mechanism (very similar to the one 
> from Kafka Streams examples) based on Kafka Streams State Stores. It stores 
> command ids from the past hour in _persistentWindowStore_. We encountered a 
> problem with the store if there's an exception thrown later in that topology.
>  We run 3 nodes using docker, each with multiple threads set for this 
> particular Streams Application.
> The business flow is as follows (performed within a single subtopology):
>  *  a valid command is sent with command id 
> (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active 
> task 1_2. First node in the topology analyses if this is a duplicate by 
> checking in the state store (_COMMAND_ID_STORE_), if not puts the command id 
> in the state store and processes the command properly.
>  * an invalid command is sent with the same key but new command id 
> (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the 
> duplicated command id is performed, it's not a duplicate, command id is put 
> into the state store. Next node in the topology throws an exception which 
> causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, 
> offsets are not committed. I double checked for the changelog topic - 
> relevant messages are not committed. Therefore, the changelog topic contains 
> only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and 
> not the one which caused a failure.
>  * in the meantime a standby task 1_2 running on NODE 3 replicated 
> _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local 
> _COMMAND_ID_STORE_
>  * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. 
> It checks if this command id is a duplicate - no, it isn't - tries to process 
> the faulty command and throws an exception. Again, transaction aborted, all 
> looks fine.
>  * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, 
> *it is a duplicate!* Even though the transaction has been aborted and the 
> changelog doesn't contain this command id: 
> _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._
>  
> After digging into the Streams logs and some discussion on ([Stack 
> Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan])
>  we concluded it has something to do with checkpoint files. Here are the 
> detailed logs relevant to checkpoint files.
>  
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
> Processor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
> standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/Processor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-

[jira] [Commented] (KAFKA-9148) Consider forking RocksDB for Streams

2020-06-02 Thread adam Retter (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123597#comment-17123597
 ] 

adam Retter commented on KAFKA-9148:


Thanks [~ableegoldman].

 

I just wanted to understand further the issues that make it difficult to upgrade RocksDB in Kafka. I was wondering, is it one or more of the following:

1. The RocksDB API has changed, and so changes need to be made to Kafka.

2. You directly expose the RocksDB API to users of Kafka, so users' code may also have to change.

3. Not enough resources to work on updating Kafka for a new version of RocksDB.

4. Other...

 

 

> Consider forking RocksDB for Streams 
> -
>
> Key: KAFKA-9148
> URL: https://issues.apache.org/jira/browse/KAFKA-9148
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> We recently upgraded our RocksDB dependency to 5.18 for its memory-management 
> abilities (namely the WriteBufferManager, see KAFKA-8215). Unfortunately, 
> someone from Flink recently discovered a ~8% [performance 
> regression|https://github.com/facebook/rocksdb/issues/5774] that exists in 
> all versions 5.18+ (up through the current newest version, 6.2.2). Flink was 
> able to react to this by downgrading to 5.17 and [picking the 
> WriteBufferManage|https://github.com/dataArtisans/frocksdb/pull/4]r to their 
> fork (fRocksDB).
> Due to this and other reasons enumerated below, we should consider also 
> forking our own RocksDB for Streams.
> Pros:
>  * We can avoid passing sudden breaking changes on to our users, such as removal 
> of methods with no deprecation period (see discussion on KAFKA-8897)
>  * We can pick whichever version has the best performance for our needs, and 
> pick over any new features, metrics, etc that we need to use rather than 
> being forced to upgrade (and breaking user code, introducing regression, etc)
>  * Support for some architectures does not exist in all RocksDB versions, 
> making Streams completely unusable for some users until we can upgrade the 
> rocksdb dependency to one that supports their specific case. It's worth 
> noting that we've only had [one 
> user|https://issues.apache.org/jira/browse/KAFKA-9225] hit this so far (that 
> we know of), and some workarounds have been discussed on the ticket.
>  * The Java API seems to be a very low priority to the rocksdb folks.
>  ** They leave out critical functionality, features, and configuration 
> options that have been in the c++ API for a very long time
>  ** Those that do make it over often have random gaps in the API such as 
> setters but no getters (see [rocksdb PR 
> #5186|https://github.com/facebook/rocksdb/pull/5186])
>  ** Others are poorly designed and require too many trips across the JNI, 
> making otherwise incredibly useful features prohibitively expensive.
>  *** [Custom 
> Comparator|https://github.com/facebook/rocksdb/issues/538#issuecomment-83145980]:
>  a custom comparator could significantly improve the performance of session 
> windows. This is trivial to do but given the high performance cost of 
> crossing the JNI, it is currently only practical to use a C++ comparator
>  *** [Prefix Seek|https://github.com/facebook/rocksdb/issues/6004]: not 
> currently used by Streams but a commonly requested feature, and may also 
> allow improved range queries
>  ** Even when an external contributor develops a solution for poorly 
> performing Java functionality and helpfully tries to contribute their patch 
> back to rocksdb, it gets ignored by the rocksdb people ([rocksdb PR 
> #2283|https://github.com/facebook/rocksdb/pull/2283])
> Cons:
>  * More work (not to be trivialized, the truth is we don't and can't know how 
> much extra work this will ultimately be)
> Given that we rarely upgrade the Rocks dependency, use only some fraction of 
> its features, and would need or want to make only minimal changes ourselves, 
> it seems like we could actually get away with very little extra work by 
> forking rocksdb. Note that as of this writing the frocksdb repo has only 
> needed to open 5 PRs on top of the actual rocksdb (two of them trivial). Of 
> course, the LOE to maintain this will only grow over time, so we should think 
> carefully about whether and when to start taking on this potential burden.
>  





[jira] [Commented] (KAFKA-10075) Kafka client stucks after Kafka-cluster unavailability

2020-06-02 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123656#comment-17123656
 ] 

Tom Bentley commented on KAFKA-10075:
-

Is the JVM in which you're running the client(s) caching DNS lookups? When the brokers get rescheduled on different pods (as can happen during an upgrade), their resolved IPs can change. There's a Java security property (nb, not a system property) which you can use to configure the DNS caching explicitly. See [https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/java-dg-jvm-ttl.html] for example.

> Kafka client stucks after Kafka-cluster unavailability
> --
>
> Key: KAFKA-10075
> URL: https://issues.apache.org/jira/browse/KAFKA-10075
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.0
> Environment: Kafka v2.3.1 deployed by https://strimzi.io/ to 
> Kubernetes cluster
> openjdk version "1.8.0_242"
> OpenJDK Runtime Environment (build 1.8.0_242-b08)
> OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)
>Reporter: Dmitry Mischenko
>Priority: Minor
>
> Several times we got an issue with kafka-client.
> What happened:
> We have Kafka v2.3.1 deployed by [https://strimzi.io/] to Kubernetes cluster 
> (Amazon EKS). 
>  # Kafka brokers were unavailable (due to cluster upgrade) and couldn't be 
> resolved by internal hostnames
> {code:java}
> 2020-05-28 17:19:50 WARN  NetworkClient:962 - [Producer 
> clientId=change_transformer-postgres_101.public.user_storage-9a89f512-43df-4179-a80f-db74f31ac724-StreamThread-1-producer]
>  Error connecting to node 
> data-kafka-dev-kafka-0.data-kafka-dev-kafka-brokers.data-kafka-dev.svc.cluster.local:9092
>  (id: -1 rack: null)2020-05-28 17:19:50 WARN  NetworkClient:962 - [Producer 
> clientId=change_transformer-postgres_101.public.user_storage-9a89f512-43df-4179-a80f-db74f31ac724-StreamThread-1-producer]
>  Error connecting to node 
> data-kafka-dev-kafka-0.data-kafka-dev-kafka-brokers.data-kafka-dev.svc.cluster.local:9092
>  (id: -1 rack: null)at 
> org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:289)at 
> org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)at
>  
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)at
>  org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:538)at 
> org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)at
>  org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)" 
> at java.base/java.net.InetAddress.getAllByName(Unknown Source)"at 
> org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)"
>  at java.base/java.net.InetAddress.getAllByName(Unknown Source)"" at 
> java.base/java.net.InetAddress$CachedAddresses.get(Unknown Source)"at 
> org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104)at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)at
>  java.base/java.net.InetAddress.getAllByName0(Unknown Source)at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444)at
>  
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)at
>  
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)at
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)2020-05-28
>  17:19:50 WARN  NetworkClient:962 - [Producer 
> clientId=change_transformer-postgres_101.public.user_storage-9a89f512-43df-4179-a80f-db74f31ac724-StreamThread-1-producer]
>  Error connecting to node 
> data-kafka-dev-kafka-1.data-kafka-dev-kafka-brokers.data-kafka-dev.svc.cluster.local:9092
>  (id: -2 rack: null)at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:955)"
>  at java.base/java.net.InetAddress$CachedAddresses.get(Unknown Source)"at 
> org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363){code}
> But after the moment when cluster was repaired, kafka-admin-client couldn't 
> restore connection and only every 120s was throwing timeout exceptions for a 
> long time.
>  
> {code:java}
> 2020-05-28 17:21:14 INFO StreamThread:219 - stream-thread 
> [consumer_group-101.public.user_storage-714cfbe7-f34a-466a-97e1-bb145f0e34b7-StreamThread-1]
>  State transition from CREATED to STARTING
>  2020-05-28 17:21:14 WARN ConsumerConfig:355 - The configuration 
> 'admin.retry.backoff.ms' was supplied but isn't a known config.
>  2020-05-28 17:21:14 INFO AppInfoParser:118 - Kafka commit

[GitHub] [kafka] astubbs commented on pull request #8771: MINOR: Add explanation for disabling forwarding from value transformers

2020-06-02 Thread GitBox


astubbs commented on pull request #8771:
URL: https://github.com/apache/kafka/pull/8771#issuecomment-637501018


   Retest this please.







[jira] [Created] (KAFKA-10083) fix failed testReassignmentWithRandomSubscriptionsAndChanges

2020-06-02 Thread Luke Chen (Jira)
Luke Chen created KAFKA-10083:
-

 Summary: fix failed 
testReassignmentWithRandomSubscriptionsAndChanges
 Key: KAFKA-10083
 URL: https://issues.apache.org/jira/browse/KAFKA-10083
 Project: Kafka
  Issue Type: Bug
Reporter: Luke Chen
Assignee: Luke Chen


[https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/815/]

This error can also be reproduced locally.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10083) fix failed testReassignmentWithRandomSubscriptionsAndChanges

2020-06-02 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-10083:
--
Description: 
[https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/815/]

This error can also be reproduced locally.

 
h3. Error Message

java.lang.AssertionError
h3. Stacktrace

java.lang.AssertionError at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.getTheActualPartitionToBeMoved(AbstractStickyAssignor.java:836)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.access$100(AbstractStickyAssignor.java:780)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:699)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:689)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.performReassignments(AbstractStickyAssignor.java:661)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.balance(AbstractStickyAssignor.java:597)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.generalAssign(AbstractStickyAssignor.java:352)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assign(AbstractStickyAssignor.java:85)
 at 
org.apache.kafka.clients.consumer.CooperativeStickyAssignor.assign(CooperativeStickyAssignor.java:64)
 at 
org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.verifyValidityAndBalance(CooperativeStickyAssignorTest.java:68)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges(AbstractStickyAssignorTest.java:654)

  was:
[https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/815/]

This error can also be reproduced locally.


> fix failed testReassignmentWithRandomSubscriptionsAndChanges
> 
>
> Key: KAFKA-10083
> URL: https://issues.apache.org/jira/browse/KAFKA-10083
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/815/]
> This error can also be reproduced locally.
>  
> h3. Error Message
> java.lang.AssertionError
> h3. Stacktrace
> java.lang.AssertionError at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.getTheActualPartitionToBeMoved(AbstractStickyAssignor.java:836)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.access$100(AbstractStickyAssignor.java:780)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:699)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:689)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.performReassignments(AbstractStickyAssignor.java:661)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.balance(AbstractStickyAssignor.java:597)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.generalAssign(AbstractStickyAssignor.java:352)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assign(AbstractStickyAssignor.java:85)
>  at 
> org.apache.kafka.clients.consumer.CooperativeStickyAssignor.assign(CooperativeStickyAssignor.java:64)
>  at 
> org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.verifyValidityAndBalance(CooperativeStickyAssignorTest.java:68)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges(AbstractStickyAssignorTest.java:654)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon opened a new pull request #8778: KAFKA-10083: fix failed testReassignmentWithRandomSubscriptionsAndChanges tests

2020-06-02 Thread GitBox


showuon opened a new pull request #8778:
URL: https://github.com/apache/kafka/pull/8778


   The test fails because we moved the initialization of the class member 
`partitionMovements` from the `assign` method to the point where the class 
instance is created. This causes no issue the first time an 
`AbstractStickyAssignor` instance is used, but if the instance is used again, 
`partitionMovements` still holds state from the previous assignment, which 
makes these tests fail. Fix it by moving the `partitionMovements` 
initialization back into the `assign` method, as sketched below.
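   
   For illustration, a minimal standalone sketch of the stale-state bug and 
the fix (class and member names mirror the PR description; the real 
assignment logic is elided and the types are simplified):
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   class StickyAssignorSketch {
       // BUG (before the fix): initialized once per instance, so a second
       // call to assign() on the same instance still sees the movements
       // recorded during the first call:
       // private Map<String, String> partitionMovements = new HashMap<>();
   
       private Map<String, String> partitionMovements; // stand-in for PartitionMovements
   
       Map<String, String> assign(Map<String, String> subscriptions) {
           // FIX: re-create the tracker on every assignment so state from a
           // previous rebalance cannot leak into this one.
           partitionMovements = new HashMap<>();
           // ... compute the assignment, recording movements as we go ...
           return new HashMap<>(subscriptions);
       }
   }
   ```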
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #8778: KAFKA-10083: fix failed testReassignmentWithRandomSubscriptionsAndChanges tests

2020-06-02 Thread GitBox


showuon commented on pull request #8778:
URL: https://github.com/apache/kafka/pull/8778#issuecomment-637512220


   @ableegoldman, could you review this PR to fix the failed tests? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10082) fix failed kafka.api.PlaintextConsumerTest.testMultiConsumerStickyAssignment

2020-06-02 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-10082:
--
Reviewer: Sophie Blee-Goldman

> fix failed kafka.api.PlaintextConsumerTest.testMultiConsumerStickyAssignment
> 
>
> Key: KAFKA-10082
> URL: https://issues.apache.org/jira/browse/KAFKA-10082
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> [https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/153/log/?start=0]
> [https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk8/runs/4596/log/?start=0]
> [https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk11/runs/1523/log/?start=0]
>  
> kafka.api.PlaintextConsumerTest > testMultiConsumerStickyAssignment FAILED
>  java.lang.AssertionError: Expected only two topic partitions that have 
> switched to other consumers. expected:<9> but was:<14>
>  at org.junit.Assert.fail(Assert.java:89)
>  at org.junit.Assert.failNotEquals(Assert.java:835)
>  at org.junit.Assert.assertEquals(Assert.java:647)
>  at 
> kafka.api.PlaintextConsumerTest.testMultiConsumerStickyAssignment(PlaintextConsumerTest.scala:929)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10083) fix failed testReassignmentWithRandomSubscriptionsAndChanges

2020-06-02 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-10083:
--
Description: 
[https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/815/]

 
h3. Error Message

java.lang.AssertionError
h3. Stacktrace

java.lang.AssertionError at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.getTheActualPartitionToBeMoved(AbstractStickyAssignor.java:836)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.access$100(AbstractStickyAssignor.java:780)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:699)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:689)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.performReassignments(AbstractStickyAssignor.java:661)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.balance(AbstractStickyAssignor.java:597)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.generalAssign(AbstractStickyAssignor.java:352)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assign(AbstractStickyAssignor.java:85)
 at 
org.apache.kafka.clients.consumer.CooperativeStickyAssignor.assign(CooperativeStickyAssignor.java:64)
 at 
org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.verifyValidityAndBalance(CooperativeStickyAssignorTest.java:68)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges(AbstractStickyAssignorTest.java:654)

  was:
[https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/815/]

It can also locally reproduce this error.

 
h3. Error Message

java.lang.AssertionError
h3. Stacktrace

java.lang.AssertionError at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.getTheActualPartitionToBeMoved(AbstractStickyAssignor.java:836)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.access$100(AbstractStickyAssignor.java:780)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:699)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:689)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.performReassignments(AbstractStickyAssignor.java:661)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.balance(AbstractStickyAssignor.java:597)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.generalAssign(AbstractStickyAssignor.java:352)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assign(AbstractStickyAssignor.java:85)
 at 
org.apache.kafka.clients.consumer.CooperativeStickyAssignor.assign(CooperativeStickyAssignor.java:64)
 at 
org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.verifyValidityAndBalance(CooperativeStickyAssignorTest.java:68)
 at 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges(AbstractStickyAssignorTest.java:654)


> fix failed testReassignmentWithRandomSubscriptionsAndChanges
> 
>
> Key: KAFKA-10083
> URL: https://issues.apache.org/jira/browse/KAFKA-10083
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/815/]
>  
> h3. Error Message
> java.lang.AssertionError
> h3. Stacktrace
> java.lang.AssertionError at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.getTheActualPartitionToBeMoved(AbstractStickyAssignor.java:836)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.access$100(AbstractStickyAssignor.java:780)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:699)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:689)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.performReassignments(AbstractStickyAssignor.java:661)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.balance(AbstractStickyAssignor.java:597)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.generalAssign(AbstractStickyAssignor.java:352)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assign(AbstractStickyAssignor.java:85)
>  at 
> org.apache.kafka.clients.consum

[GitHub] [kafka] nizhikov commented on pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)

2020-06-02 Thread GitBox


nizhikov commented on pull request #8695:
URL: https://github.com/apache/kafka/pull/8695#issuecomment-637515040


   @ijuma I found an explanation of the test behavior.
   
   Full information can be found in the 
[guide](https://docs.oracle.com/en/java/javase/11/security/java-secure-socket-extension-jsse-reference-guide.html#GUID-4D421910-C36D-40A2-8BA2-7D42CCBED3C6).
 Please navigate to the "Send ClientHello Message" section and take a look at 
the "client version" and "supported_versions (43)" fields.
   
   The root of the "strange" behavior is the structure of the SSL ClientHello 
message (quoted from the guide):
   > **Client version**: For TLS 1.3, this has a fixed value, TLSv1.2; TLS 1.3 
uses the extension supported_versions and not this field to negotiate protocol 
version
   > ...
   > **supported_versions**: Lists which versions of TLS the client supports. 
In particular, if the client 
   > requests TLS 1.3, then the client version field has the value TLSv1.2 and 
this extension 
   > contains the value TLSv1.3; if the client requests TLS 1.2, then the 
client version field has the
   > value TLSv1.2 and this extension either doesn’t exist or contains the 
value TLSv1.2 but not the value TLSv1.3.
   
   This means we can't connect with the following configuration:
   Client:
   ```
   ssl.protocol=TLSv1.2 #this will be used for ClientHello
   ssl.enabled.protocols=TLSv1.2,TLSv1.3 #TLS v1.3 will be ignored in 
ClientHello message.
   ```
   Server:
   ```
   ssl.protocol=TLSv1.3
   ssl.enabled.protocols=TLSv1.3 # Accept only TLSv1.3 
   ```
   
   You can see all details of the SSL connection process in the javax.net log. 
   It can be enabled like the following:
   ```
   public SslVersionsTransportLayerTest(List<String> serverProtocols, List<String> clientProtocols) {
       System.setProperty("javax.net.debug", "ssl:handshake"); // This turns on logging from the JDK SSL system classes.
       this.serverProtocols = serverProtocols;
       this.clientProtocols = clientProtocols;
   }
   ```
   
   So the correct check should be:
   
   ```
   private boolean isCompatible(List<String> serverProtocols, List<String> clientProtocols) {
       return serverProtocols.contains(clientProtocols.get(0)) ||
           (clientProtocols.get(0).equals("TLSv1.3") && clientProtocols.contains("TLSv1.2"));
   }
   ```
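   
   A quick illustration of how this check plays out for the example 
configurations above (a hypothetical snippet, assuming `java.util.Arrays` and 
`java.util.Collections` are imported and the `isCompatible` helper above is in 
scope):
   ```
   List<String> server = Collections.singletonList("TLSv1.3");
   
   // ClientHello advertises TLSv1.2, so a TLSv1.3-only server rejects it:
   isCompatible(server, Arrays.asList("TLSv1.2", "TLSv1.3")); // -> false
   // ClientHello requests TLSv1.3, with TLSv1.2 as a fallback:
   isCompatible(server, Arrays.asList("TLSv1.3", "TLSv1.2")); // -> true
   ```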
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] itantiger commented on pull request #8753: KAFKA-10043:Some parameters will be overwritten which was configured …

2020-06-02 Thread GitBox


itantiger commented on pull request #8753:
URL: https://github.com/apache/kafka/pull/8753#issuecomment-637519467


   @guozhangwang Can you take a look at this?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #8771: MINOR: Add explanation for disabling forwarding from value transformers

2020-06-02 Thread GitBox


bbejeck commented on pull request #8771:
URL: https://github.com/apache/kafka/pull/8771#issuecomment-637540041


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)

2020-06-02 Thread GitBox


ijuma commented on a change in pull request #8695:
URL: https://github.com/apache/kafka/pull/8695#discussion_r433892269



##
File path: 
clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java
##
@@ -117,24 +123,51 @@ public void testTlsDefaults() throws Exception {
 server.waitForMetric("response", 1);
 } else {
 NetworkTestUtils.waitForChannelClose(selector, node, 
ChannelState.State.AUTHENTICATION_FAILED);
+server.verifyAuthenticationMetrics(0, 1);
 }
 }
 
+/**
+ * <p>
+ * The explanation of this check is in the structure of the ClientHello SSL message.
+ * Please, take a look at the <a href="https://docs.oracle.com/en/java/javase/11/security/java-secure-socket-extension-jsse-reference-guide.html#GUID-4D421910-C36D-40A2-8BA2-7D42CCBED3C6">Guide</a>,
+ * "Send ClientHello Message" section.
+ * </p>
+ * > Client version: For TLS 1.3, this has a fixed value, TLSv1.2; TLS 1.3 uses the extension supported_versions and not this field to negotiate protocol version
+ * ...
+ * > supported_versions: Lists which versions of TLS the client supports. In particular, if the client
+ * > requests TLS 1.3, then the client version field has the value TLSv1.2 and this extension
+ * > contains the value TLSv1.3; if the client requests TLS 1.2, then the client version field has the
+ * > value TLSv1.2 and this extension either doesn’t exist or contains the value TLSv1.2 but not the value TLSv1.3.
+ *
+ * This means that a TLSv1.3 client can fall back to TLSv1.2, but a TLSv1.2 client can't upgrade to TLSv1.3.
+ *
+ * @param serverProtocols Server protocols.
+ * @param clientProtocols Client protocols.
+ * @return {@code true} if the client should be able to connect to the server.
+ */
+private boolean isCompatible(List<String> serverProtocols, List<String> clientProtocols) {
+    return serverProtocols.contains(clientProtocols.get(0)) ||
+        (clientProtocols.get(0).equals("TLSv1.3") && clientProtocols.contains("TLSv1.2"));
+}
+
 private static Map<String, Object> getTrustingConfig(CertStores certStores, CertStores peerCertStores, List<String> tlsProtocols) {
     Map<String, Object> configs = certStores.getTrustingConfig(peerCertStores);
     configs.putAll(sslConfig(tlsProtocols));
     return configs;
 }
 
-private static Map<String, Object> sslConfig(List<String> tlsServerProtocols) {
+private static Map<String, Object> sslConfig(List<String> tlsProtocols) {
     Map<String, Object> sslConfig = new HashMap<>();
-    sslConfig.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsServerProtocols.get(0));
-    sslConfig.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, tlsServerProtocols);
+    sslConfig.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsProtocols.get(0));
+    sslConfig.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, tlsProtocols);
     return sslConfig;
 }
 
 private Selector createSelector(Map<String, Object> sslClientConfigs) {

Review comment:
   Should this be called `createClientSelector`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] nizhikov commented on a change in pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)

2020-06-02 Thread GitBox


nizhikov commented on a change in pull request #8695:
URL: https://github.com/apache/kafka/pull/8695#discussion_r433899761



##
File path: 
clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java
##
@@ -117,24 +123,51 @@ public void testTlsDefaults() throws Exception {
 server.waitForMetric("response", 1);
 } else {
 NetworkTestUtils.waitForChannelClose(selector, node, 
ChannelState.State.AUTHENTICATION_FAILED);
+server.verifyAuthenticationMetrics(0, 1);
 }
 }
 
+/**
+ * <p>
+ * The explanation of this check is in the structure of the ClientHello SSL message.
+ * Please, take a look at the <a href="https://docs.oracle.com/en/java/javase/11/security/java-secure-socket-extension-jsse-reference-guide.html#GUID-4D421910-C36D-40A2-8BA2-7D42CCBED3C6">Guide</a>,
+ * "Send ClientHello Message" section.
+ * </p>
+ * > Client version: For TLS 1.3, this has a fixed value, TLSv1.2; TLS 1.3 uses the extension supported_versions and not this field to negotiate protocol version
+ * ...
+ * > supported_versions: Lists which versions of TLS the client supports. In particular, if the client
+ * > requests TLS 1.3, then the client version field has the value TLSv1.2 and this extension
+ * > contains the value TLSv1.3; if the client requests TLS 1.2, then the client version field has the
+ * > value TLSv1.2 and this extension either doesn’t exist or contains the value TLSv1.2 but not the value TLSv1.3.
+ *
+ * This means that a TLSv1.3 client can fall back to TLSv1.2, but a TLSv1.2 client can't upgrade to TLSv1.3.
+ *
+ * @param serverProtocols Server protocols.
+ * @param clientProtocols Client protocols.
+ * @return {@code true} if the client should be able to connect to the server.
+ */
+private boolean isCompatible(List<String> serverProtocols, List<String> clientProtocols) {
+    return serverProtocols.contains(clientProtocols.get(0)) ||
+        (clientProtocols.get(0).equals("TLSv1.3") && clientProtocols.contains("TLSv1.2"));
+}
+
 private static Map<String, Object> getTrustingConfig(CertStores certStores, CertStores peerCertStores, List<String> tlsProtocols) {
     Map<String, Object> configs = certStores.getTrustingConfig(peerCertStores);
     configs.putAll(sslConfig(tlsProtocols));
     return configs;
 }
 
-private static Map<String, Object> sslConfig(List<String> tlsServerProtocols) {
+private static Map<String, Object> sslConfig(List<String> tlsProtocols) {
     Map<String, Object> sslConfig = new HashMap<>();
-    sslConfig.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsServerProtocols.get(0));
-    sslConfig.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, tlsServerProtocols);
+    sslConfig.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsProtocols.get(0));
+    sslConfig.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, tlsProtocols);
     return sslConfig;
 }
 
 private Selector createSelector(Map<String, Object> sslClientConfigs) {

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)

2020-06-02 Thread GitBox


ijuma commented on a change in pull request #8695:
URL: https://github.com/apache/kafka/pull/8695#discussion_r433903099



##
File path: 
clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java
##
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.security.TestSecurityConfig;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.utils.Java;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Tests for the SSL transport layer.
+ * Checks different versions of the protocol usage on the server and client.
+ */
+@RunWith(value = Parameterized.class)
+public class SslVersionsTransportLayerTest {
+private static final int BUFFER_SIZE = 4 * 1024;
+private static final Time TIME = Time.SYSTEM;
+
+private final List<String> serverProtocols;
+private final List<String> clientProtocols;
+
+@Parameterized.Parameters(name = "tlsServerProtocol={0},tlsClientProtocol={1}")
+public static Collection<Object[]> data() {
+List<Object[]> values = new ArrayList<>();
+
+values.add(new Object[] {Collections.singletonList("TLSv1.2"), 
Collections.singletonList("TLSv1.2")});
+
+if (Java.IS_JAVA11_COMPATIBLE) {
+values.add(new Object[] {Collections.singletonList("TLSv1.2"), 
Collections.singletonList("TLSv1.3")});
+values.add(new Object[] {Collections.singletonList("TLSv1.3"), 
Collections.singletonList("TLSv1.2")});
+values.add(new Object[] {Collections.singletonList("TLSv1.3"), 
Collections.singletonList("TLSv1.3")});
+values.add(new Object[] {Collections.singletonList("TLSv1.2"), 
Arrays.asList("TLSv1.2", "TLSv1.3")});
+values.add(new Object[] {Collections.singletonList("TLSv1.2"), 
Arrays.asList("TLSv1.3", "TLSv1.2")});
+values.add(new Object[] {Collections.singletonList("TLSv1.3"), 
Arrays.asList("TLSv1.2", "TLSv1.3")});
+values.add(new Object[] {Collections.singletonList("TLSv1.3"), 
Arrays.asList("TLSv1.3", "TLSv1.2")});
+values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), 
Collections.singletonList("TLSv1.3")});
+values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), 
Collections.singletonList("TLSv1.2")});
+values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), 
Arrays.asList("TLSv1.2", "TLSv1.3")});
+values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), 
Arrays.asList("TLSv1.3", "TLSv1.2")});
+values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), 
Collections.singletonList("TLSv1.3")});
+values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), 
Collections.singletonList("TLSv1.2")});
+values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), 
Arrays.asList("TLSv1.2", "TLSv1.3")});
+values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), 
Arrays.asList("TLSv1.3", "TLSv1.2")});
+}
+return values;
+}
+
+/**
+ * Be aware that you can turn on debug mode for a javax.net.ssl library with the line {@code System.setProperty("javax.net.debug", "ssl:handshake");}
+ * @param serverProtocols Server protocols.
+ * @param clientProtocols Client protocols.
+ */
+public SslVersionsTransportLayerTest(List<String> serverProtocols, List<String> clientProtocols) {
+    this.serverProtocols = serverProtocols;
+    this.clientProtocols = clientProtocols;
+}
+
+/**
+ * Tests that the connection succeeds with the default TLS version.
+ */
+@Test
+public void testTlsDefaults() th

[GitHub] [kafka] ijuma commented on pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)

2020-06-02 Thread GitBox


ijuma commented on pull request #8695:
URL: https://github.com/apache/kafka/pull/8695#issuecomment-637569220


   Oh, one more thing, let's please add an entry to `upgrade.html`.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] nizhikov opened a new pull request #8779: [MINOR] Fixing spotbug fail - removing unused variable.

2020-06-02 Thread GitBox


nizhikov opened a new pull request #8779:
URL: https://github.com/apache/kafka/pull/8779


   Fixing a nit in c6633a157eec1712116d294eb3785a96cba4e331.
   That commit breaks the spotbugs check with "Dead store to isFreshAssignment in 
org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.generalAssign(Map,
 Map)".
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #8774: KAFKA-10081: Remove an unused local variable to pass spotbugsMain check

2020-06-02 Thread GitBox


chia7712 commented on pull request #8774:
URL: https://github.com/apache/kafka/pull/8774#issuecomment-637570083


   ```testReassignmentWithRandomSubscriptionsAndChanges``` is tracked by #8778
   ```testMultiConsumerStickyAssignment``` is tracked by #8777
   
   +1 to merge this hotfix :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] nizhikov commented on pull request #8779: [MINOR] Fixing spotbug fail - removing unused variable.

2020-06-02 Thread GitBox


nizhikov commented on pull request #8779:
URL: https://github.com/apache/kafka/pull/8779#issuecomment-637570728


   Hello @ableegoldman 
   It looks like your patch breaks the spotbugs check.
   I prepared a one-liner fix for it.
   
   Can you please take a look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)

2020-06-02 Thread GitBox


ijuma commented on pull request #8695:
URL: https://github.com/apache/kafka/pull/8695#issuecomment-637573626


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] nizhikov commented on pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)

2020-06-02 Thread GitBox


nizhikov commented on pull request #8695:
URL: https://github.com/apache/kafka/pull/8695#issuecomment-637574517


   I think the trunk is currently broken by 
c6633a157eec1712116d294eb3785a96cba4e331.
   I prepared a one-liner fix for it: #8779 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] nizhikov commented on a change in pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)

2020-06-02 Thread GitBox


nizhikov commented on a change in pull request #8695:
URL: https://github.com/apache/kafka/pull/8695#discussion_r433920350



##
File path: 
clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java
##
@@ -117,24 +123,51 @@ public void testTlsDefaults() throws Exception {
 server.waitForMetric("response", 1);
 } else {
 NetworkTestUtils.waitForChannelClose(selector, node, 
ChannelState.State.AUTHENTICATION_FAILED);
+server.verifyAuthenticationMetrics(0, 1);
 }
 }
 
+/**
+ * <p>
+ * The explanation of this check is in the structure of the ClientHello SSL message.
+ * Please, take a look at the <a href="https://docs.oracle.com/en/java/javase/11/security/java-secure-socket-extension-jsse-reference-guide.html#GUID-4D421910-C36D-40A2-8BA2-7D42CCBED3C6">Guide</a>,
+ * "Send ClientHello Message" section.
+ * </p>
+ * > Client version: For TLS 1.3, this has a fixed value, TLSv1.2; TLS 1.3 uses the extension supported_versions and not this field to negotiate protocol version
+ * ...
+ * > supported_versions: Lists which versions of TLS the client supports. In particular, if the client
+ * > requests TLS 1.3, then the client version field has the value TLSv1.2 and this extension
+ * > contains the value TLSv1.3; if the client requests TLS 1.2, then the client version field has the
+ * > value TLSv1.2 and this extension either doesn’t exist or contains the value TLSv1.2 but not the value TLSv1.3.
+ *
+ * This means that a TLSv1.3 client can fall back to TLSv1.2, but a TLSv1.2 client can't upgrade to TLSv1.3.
+ *
+ * @param serverProtocols Server protocols.
+ * @param clientProtocols Client protocols.
+ * @return {@code true} if the client should be able to connect to the server.
+ */
+private boolean isCompatible(List<String> serverProtocols, List<String> clientProtocols) {
+    return serverProtocols.contains(clientProtocols.get(0)) ||
+        (clientProtocols.get(0).equals("TLSv1.3") && clientProtocols.contains("TLSv1.2"));

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness

2020-06-02 Thread GitBox


vvcephei commented on pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#issuecomment-637578416


   Ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #8771: MINOR: Add explanation for disabling forwarding from value transformers

2020-06-02 Thread GitBox


bbejeck commented on pull request #8771:
URL: https://github.com/apache/kafka/pull/8771#issuecomment-637586221


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

2020-06-02 Thread GitBox


ijuma commented on pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#issuecomment-637591555


   Did we check the build before merging this? It seems to have broken it:
   https://github.com/apache/kafka/pull/8779



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma edited a comment on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

2020-06-02 Thread GitBox


ijuma edited a comment on pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#issuecomment-637592541


   @guozhangwang Looks like 2.6, 2.5 and 2.4 are broken too. You should 
generally also build locally when cherry-picking.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

2020-06-02 Thread GitBox


ijuma commented on pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#issuecomment-637592541


   @guozhangwang Looks like 2.6, 2.5 and 2.4 are broken too. You should 
generally build locally when cherry-picking.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc edited a comment on pull request #8421: KAFKA-9800: [KIP-580] Admin Client Exponential Backoff Implementation

2020-06-02 Thread GitBox


d8tltanc edited a comment on pull request #8421:
URL: https://github.com/apache/kafka/pull/8421#issuecomment-637300554


   @skaundinya15 @ijuma @abbccdda Thanks for all the feedback and comments. 
This patch was made when I was new to Kafka, and it looks somewhat naive to me 
now that I have gained more insight into Kafka. Let me talk about two of my 
major concerns and thoughts about implementing the universal client 
exponential backoff.
   
   **AdminClient logic redundancy**
   
   NetworkClient has request timeout handlers. Producer / Consumer use 
NetworkClient to handle timeouts, but AdminClient doesn't. The reason, to my 
understanding, is that AdminClient implements a per-request timeout.
   
   For example,
   
   1. Wrapping the request builder into a new class `Call` (the construction 
lambda adds tons of lines to AdminClient.java that should probably live in the 
individual AbstractRequest implementation classes)
   2. Re-writing the request queues for different request states, while the 
normal clients fully rely on the NetworkClient.
   
   After we add per-request retry backoff support to all clients, we can 
implement the per-request timeout along the way, and thus clean up the 
redundant request handling logic in AdminClient.
   
   Are we considering refactoring the AdminClient further and removing all the 
redundant logic that belongs in the networking layer and the AbstractRequest 
implementation classes?
   
   **Flexible backoff modes**
   
   Let's analyze the request backoff demands of all the types of clients at 
this point. In my opinion, there are simply two:
   
   1. Requests that do not need exponential backoff. These need to be sent 
ASAP to avoid dataflow performance degradation, such as the `ProduceRequest` 
and its related/preceding metadata requests.
   
   2. Requests that do need exponential backoff. These are “second-class 
citizens” and can be throttled to avoid request storms on the broker side, 
such as the metadata-related requests in AdminClient.
   
   Now the question comes: even when two requests are of the same request 
type, one may have to be sent ASAP while the other may wait, depending on the 
use case. We need to think more deeply about how to make that classification.
   
   But the implementation would be simple. We can utilize the existing 
AbstractRequest builder pattern and build the request flexibly upon a given 
retry_backoff mode. For example, 
   
   1. AbstractRequest.Builder will interact with a new abstract class 
specifying the retry_backoff option, static or exponential. 
   2. AbstractRequest will have some new interfaces controlling the backoff. 
   
   Then, we can control whether the request should have a static backoff or an 
exponential backoff when we construct each implementation instance of 
AbstractRequest.Builder; a rough sketch of the exponential case is below.
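   
   For concreteness, a minimal hedged sketch of computing an exponential, 
jittered per-request backoff (the class name and constants are illustrative, 
not Kafka's actual configs or classes):
   ```java
   import java.util.concurrent.ThreadLocalRandom;
   
   final class ExponentialBackoffSketch {
       private final long initialMs;    // analogous to a retry.backoff.ms-style setting
       private final long maxMs;        // upper bound on any single backoff
       private final double multiplier; // growth factor per failed attempt
       private final double jitter;     // e.g. 0.2 -> +/-20% randomization
   
       ExponentialBackoffSketch(long initialMs, long maxMs, double multiplier, double jitter) {
           this.initialMs = initialMs;
           this.maxMs = maxMs;
           this.multiplier = multiplier;
           this.jitter = jitter;
       }
   
       // attempts = number of failures so far for this request; the delay
       // grows exponentially, is capped, then randomized to avoid retry storms.
       long backoffMs(int attempts) {
           double exp = initialMs * Math.pow(multiplier, attempts);
           double capped = Math.min(exp, (double) maxMs);
           double rand = 1 + jitter * (2 * ThreadLocalRandom.current().nextDouble() - 1);
           return (long) (capped * rand);
       }
   }
   ```
   A static backoff is then just the special case multiplier = 1, which is one 
way the two modes could share a single abstraction.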
   
   
   I'll include more details in the Jira ticket and rewrite this PR. Before we 
talk more about the code details and start the new implementation, please let 
me know what you think about the AdminClient refactor and static/exponential 
retry_backoff classification rule. 
   
   As @abbccdda suggests, let's re-direct our further discussion to 
[Jira](https://issues.apache.org/jira/browse/KAFKA-9800) Thanks.
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc edited a comment on pull request #8421: KAFKA-9800: [KIP-580] Admin Client Exponential Backoff Implementation

2020-06-02 Thread GitBox


d8tltanc edited a comment on pull request #8421:
URL: https://github.com/apache/kafka/pull/8421#issuecomment-637300554


   @skaundinya15 @ijuma @abbccdda Thanks for all the feedback and comments. 
This patch was made when I was new to Kafka, and it looks somewhat naive to me 
now that I have gained more insight into Kafka. Let me talk about two of my 
major concerns and thoughts about implementing the universal client 
exponential backoff.
   
   **AdminClient logic redundancy**
   
   NetworkClient has request timeout handlers. Producer / Consumer use 
NetworkClient to handle timeouts, but AdminClient doesn't. The reason, to my 
understanding, is that AdminClient implements a per-request timeout.
   
   For example,
   
   1. Wrapping the request builder into a new class `Call` (the construction 
lambda adds tons of lines to AdminClient.java that should probably live in the 
individual AbstractRequest implementation classes)
   2. Re-writing the request queues for different request states, while the 
normal clients fully rely on the NetworkClient.
   
   After we add per-request retry backoff support to all clients, we can 
implement the per-request timeout along the way, and thus clean up the 
redundant request handling logic in AdminClient.
   
   Are we considering refactoring the AdminClient further and removing all the 
redundant logic that belongs in the networking layer and the AbstractRequest 
implementation classes?
   
   **Flexible backoff modes**
   
   Let's analyze the request backoff demands of all the types of clients at 
this point. In my opinion, there are simply two:
   
   1. Requests that do not need exponential backoff. These need to be sent 
ASAP to avoid dataflow performance degradation, such as the `ProduceRequest` 
and its related/preceding metadata requests.
   
   2. Requests that do need exponential backoff. These are “second-class 
citizens” and can be throttled to avoid request storms on the broker side, 
such as the metadata-related requests in AdminClient.
   
   Now the question comes: even when two requests are of the same request 
type, one may have to be sent ASAP while the other may wait, depending on the 
use case. We need to think more deeply about how to make that classification.
   
   But the implementation would be simple. We can utilize the existing 
AbstractRequest builder pattern and build the request flexibly upon a given 
retry_backoff mode. For example, 
   
   1. AbstractRequest.Builder will interact with a new abstract class 
specifying the retry_backoff option, static or exponential. 
   2. AbstractRequest will have some new interfaces controlling the backoff. 
   
   Then, we can control whether the request should have a static backoff or an 
exponential backoff when we construct each implementation instance of 
AbstractRequest.Builder. 
   
   
   I'll include more details in the Jira ticket and rewrite this PR. Before we 
talk more about the code details and start the new implementation, please let 
me know what you think about the AdminClient refactor and static/exponential 
retry_backoff classification rule. 
   
   As @abbccdda suggests, let's re-direct our further discussion to 
[Jira](https://issues.apache.org/jira/browse/KAFKA-9800) Thanks.
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2020-06-02 Thread Cheng Tan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123377#comment-17123377
 ] 

Cheng Tan edited comment on KAFKA-9800 at 6/2/20, 3:12 PM:
---

Recapping the discussion on GitHub: we want to implement a per-request backoff 
for all types of clients.

 

Let me talk about two of my major concerns and thoughts about implementing the 
universal client exponential backoff.

 

**AdminClient logic redundancy**

NetworkClient has request timeout handlers. Producer / Consumer use 
NetworkClient to handle timeouts, but AdminClient doesn't. The reason, to my 
understanding, is that AdminClient implements a per-request timeout.

For example,

1. Wrapping the request builder into a new class `Call` (the construction 
lambda adds tons of lines to AdminClient.java that should probably live in the 
individual AbstractRequest implementation classes)
2. Re-writing the request queues for different request states, while the 
normal clients fully rely on the NetworkClient.

After we add per-request retry backoff support to all clients, we can 
implement the per-request timeout along the way, and thus clean up the 
redundant request handling logic in AdminClient.

Are we considering refactoring the AdminClient further and removing all the 
redundant logic that belongs in the networking layer and the AbstractRequest 
implementation classes?

**Flexible backoff modes**

Let's analyze the request backoff demands of all the types of clients at this 
point. In my opinion, there are simply two:

1. Requests that do not need exponential backoff. These need to be sent ASAP 
to avoid dataflow performance degradation, such as the `ProduceRequest` and 
its related/preceding metadata requests.

2. Requests that do need exponential backoff. These are “second-class 
citizens” and can be throttled to avoid request storms on the broker side, 
such as the metadata-related requests in AdminClient.

Now the question comes: even when two requests are of the same request type, 
one may have to be sent ASAP while the other may wait, depending on the use 
case. We need to think more deeply about how to make that classification.

But the implementation would be simple. We can utilize the existing 
AbstractRequest builder pattern and build the request flexibly upon a given 
retry_backoff mode. For example,

1. AbstractRequest.Builder will interact with a new abstract class specifying 
the retry_backoff option, static or exponential. 
2. AbstractRequest will have some new interfaces controlling the backoff.

Then, we can control whether the request should have a static backoff or an 
exponential backoff when we construct each implementation instance of 
AbstractRequest.Builder.


I'll include more details in the Jira ticket and rewrite this PR. Before we 
talk more about the code details and start the new implementation, please let 
me know what you think about the AdminClient refactor and static/exponential 
retry_backoff classification rule.


was (Author: d8tltanc):
Recap the discussion in Github. We want to implement a per-request backoff for 
all types of clients.

 

Let me talk about two of my major concerns and thoughts about implementing the 
universal client exponential backoff.

 

*AdminClient logic redundant*

NetworkClient has request timeout handlers. Producer / Consumer are using 
NetworkClient to help handle timeout but AdminClient doesn’t. The reason, to my 
understanding, is that AdminClient is implementing the per-request timeout.

For example,
 # Wrapping the request builder into a new class {{Call}}, (the construction 
lambda adds tons of lines into the AdminClient.java, which should probably have 
been living in each AbstractRequest implementation classes files)
 # Re-writing the request queues for different request status, while normal 
clients are fully using the NetworkClient.

After we add support to the per-request timeout to all clients, the AdminClient 
per-request timeout demand won’t be special anymore. Thus, the code for 
supporting the per-request timeout in AdminClient is not useful anymore and 
might be removed.

Are we considering refactoring the AdminClient further and remove all the 
redundant logic which should have belonged to the networking layer and the 
AbstractRequest implementation classes?

*Flexible backoff modes*

Let's analyze the request backoff demands of all the types of clients at this 
point. In my opinion, there are simply two:
 # Requests do not need exponential backoff. These requests need to be sent 
ASAP to avoid dataflow performance degradation, such as the {{ProduceRequest}} 
and its related/preceding metadata requests.

 # Request do need exponential backoff. These requests are “second-class 
citizens” and can be throttled to avoid request storms on the broker side. Such 
as metadata related

[jira] [Commented] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation

2020-06-02 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123935#comment-17123935
 ] 

Ismael Juma commented on KAFKA-9800:


I think you should apply the same backoff strategy for all request types. I 
don't see much benefit in the more complex approach.

> [KIP-580] Client Exponential Backoff Implementation
> ---
>
> Key: KAFKA-9800
> URL: https://issues.apache.org/jira/browse/KAFKA-9800
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Cheng Tan
>Assignee: Cheng Tan
>Priority: Major
>  Labels: KIP-580
>
> In {{KafkaAdminClient}}, we will have to modify the way the retry backoff is 
> calculated for the calls that have failed and need to be retried. >From the 
> current static retry backoff, we have to introduce a mechanism for all calls 
> that upon failure, the next retry time is dynamically calculated.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] collabH closed pull request #8780: Read kafka

2020-06-02 Thread GitBox


collabH closed pull request #8780:
URL: https://github.com/apache/kafka/pull/8780


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] collabH opened a new pull request #8780: Read kafka

2020-06-02 Thread GitBox


collabH opened a new pull request #8780:
URL: https://github.com/apache/kafka/pull/8780


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] collabH opened a new pull request #8781: Read kafka

2020-06-02 Thread GitBox


collabH opened a new pull request #8781:
URL: https://github.com/apache/kafka/pull/8781


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] collabH closed pull request #8781: Read kafka

2020-06-02 Thread GitBox


collabH closed pull request #8781:
URL: https://github.com/apache/kafka/pull/8781


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)

2020-06-02 Thread GitBox


ijuma commented on a change in pull request #8695:
URL: https://github.com/apache/kafka/pull/8695#discussion_r433981892



##
File path: clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
##
@@ -49,11 +50,12 @@
 
 public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol";
 public static final String SSL_PROTOCOL_DOC = "The SSL protocol used to 
generate the SSLContext. "
-+ "Default setting is TLSv1.2, which is fine for most cases. "
++ "Default setting is TLSv1.2(TLSv1.3 for modern JVM), which is 
fine for most cases. "
 + "Allowed values in recent JVMs are TLSv1.2 and TLSv1.3. TLS, 
TLSv1.1, SSL, SSLv2 and SSLv3 "
-+ "may be supported in older JVMs, but their usage is discouraged 
due to known security vulnerabilities.";
++ "may be supported in older JVMs, but their usage is discouraged 
due to known security vulnerabilities."
++ "Please, note, TLSv1.2 clients can't connect to the servers with 
TLSv1.3 only even if ssl.enabled.protocols contains TLSv1.3";

Review comment:
   How about:
   
   ```java
   "The SSL protocol used to generate the SSLContext. "
   + "The default is TLSv1.3 when running with Java 11 or newer, 
TLSv1.2 otherwise. "
   + "This value should be fine for most use cases. "
   + "Allowed values in recent JVMs are TLSv1.2 and TLSv1.3. TLS, 
TLSv1.1, SSL, SSLv2 and SSLv3 "
   + "may be supported in older JVMs, but their usage is 
discouraged due to known security vulnerabilities. ";
   + "With the default value for this config and 
ssl.enabled.protocols, clients will downgrade to TLSv1.2 if "
   + "the server does not support TLSv1.3. If this config is set to 
TLSv1.2, clients will not use TLSv1.3 even "
   + "if it is one of the values in ssl.enabled.protocols and the 
server only supports TLSv1.3."
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)

2020-06-02 Thread GitBox


ijuma commented on a change in pull request #8695:
URL: https://github.com/apache/kafka/pull/8695#discussion_r433987384



##
File path: clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
##
@@ -64,7 +66,17 @@
 
 public static final String SSL_ENABLED_PROTOCOLS_CONFIG = 
"ssl.enabled.protocols";
 public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of 
protocols enabled for SSL connections.";

Review comment:
   How about:
   
   ```
   The list of protocols enabled for SSL connections. The default is 
'TLSv1.2,TLSv1.3' when running with Java 11 or newer, 'TLSv1.2' otherwise. With 
the default value for Java 11, clients and servers will prefer TLSv1.3 if both 
support it and fallback to TLSv1.2 otherwise (assuming both support at least 
TLSv1.2). This default should be fine for most cases.
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)

2020-06-02 Thread GitBox


ijuma commented on a change in pull request #8695:
URL: https://github.com/apache/kafka/pull/8695#discussion_r433987384



##
File path: clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
##
@@ -64,7 +66,17 @@
 
 public static final String SSL_ENABLED_PROTOCOLS_CONFIG = 
"ssl.enabled.protocols";
 public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of 
protocols enabled for SSL connections.";

Review comment:
   How about:
   
   ```
   The list of protocols enabled for SSL connections. The default is 
'TLSv1.2,TLSv1.3' when running with Java 11 or newer, 'TLSv1.2' otherwise. With 
the default value for Java 11, clients and servers will prefer TLSv1.3 if both 
support it and fallback to TLSv1.2 otherwise (assuming both support at least 
TLSv1.2). This default should be fine for most cases. Also see the 
`ssl.protocol` config documentation.
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)

2020-06-02 Thread GitBox


ijuma commented on a change in pull request #8695:
URL: https://github.com/apache/kafka/pull/8695#discussion_r433990762



##
File path: docs/upgrade.html
##
@@ -18,6 +18,10 @@
 
 
 

[GitHub] [kafka] ijuma merged pull request #8779: [MINOR] Fixing spotbug fail - removing unused variable.

2020-06-02 Thread GitBox


ijuma merged pull request #8779:
URL: https://github.com/apache/kafka/pull/8779


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)

2020-06-02 Thread GitBox


ijuma commented on pull request #8695:
URL: https://github.com/apache/kafka/pull/8695#issuecomment-637647620


   @nizhikov I think we're good to merge this after the non-code suggestions 
above are addressed (assuming we can get a Jenkins build; I merged your other 
PR fixing the build issue).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #8774: KAFKA-10081: Remove an unused local variable to pass spotbugsMain check

2020-06-02 Thread GitBox


mjsax commented on pull request #8774:
URL: https://github.com/apache/kafka/pull/8774#issuecomment-637655627







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax removed a comment on pull request #8774: KAFKA-10081: Remove an unused local variable to pass spotbugsMain check

2020-06-02 Thread GitBox


mjsax removed a comment on pull request #8774:
URL: https://github.com/apache/kafka/pull/8774#issuecomment-637655696


   Retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #8774: KAFKA-10081: Remove an unused local variable to pass spotbugsMain check

2020-06-02 Thread GitBox


mjsax merged pull request #8774:
URL: https://github.com/apache/kafka/pull/8774


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-9943) Enable TLSv.1.3 in system tests "run all" execution.

2020-06-02 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-9943:
--

Assignee: Nikolay Izhikov

> Enable TLSv.1.3 in system tests "run all" execution.
> 
>
> Key: KAFKA-9943
> URL: https://issues.apache.org/jira/browse/KAFKA-9943
> Project: Kafka
>  Issue Type: Test
>Reporter: Nikolay Izhikov
>Assignee: Nikolay Izhikov
>Priority: Major
>
> We need to enable system tests with TLSv1.3 in "run all" execution.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9943) Enable TLSv.1.3 in system tests "run all" execution.

2020-06-02 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-9943:
---
Fix Version/s: 2.6.0

> Enable TLSv.1.3 in system tests "run all" execution.
> 
>
> Key: KAFKA-9943
> URL: https://issues.apache.org/jira/browse/KAFKA-9943
> Project: Kafka
>  Issue Type: Test
>Reporter: Nikolay Izhikov
>Assignee: Nikolay Izhikov
>Priority: Major
> Fix For: 2.6.0
>
>
> We need to enable system tests with TLSv1.3 in "run all" execution.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #8778: KAFKA-10083: fix failed testReassignmentWithRandomSubscriptionsAndChanges tests

2020-06-02 Thread GitBox


ableegoldman commented on a change in pull request #8778:
URL: https://github.com/apache/kafka/pull/8778#discussion_r434008302



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -43,7 +43,7 @@
 
 public static final int DEFAULT_GENERATION = -1;
 
-private PartitionMovements partitionMovements = new PartitionMovements();
+private PartitionMovements partitionMovements;

Review comment:
   Can we still initialize it here as well? I remember that was necessary for some tests to pass, since they might never get to the `generalAssign` method and `isSticky` would hit an NPE.
   
   On the other hand, it seems like `isSticky` is pointless to call unless we 
get to the `generalAssign` method. So maybe we should just remove that from the 
tests that only do the `constrainedAssign` and just verify the stickiness 
directly?
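   
   For what it's worth, the initialize-at-declaration-and-reassign pattern being discussed looks roughly like this (a self-contained sketch with stand-in types, not the actual assignor code):
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   
   class AssignorSketch {
       // initialized at declaration so isSticky() never sees null,
       // even if generalAssign() is never invoked
       private Deque<String> partitionMovements = new ArrayDeque<>();
   
       void generalAssign() {
           partitionMovements = new ArrayDeque<>(); // fresh state per assignment
           partitionMovements.add("tp-0 -> consumer-b");
       }
   
       boolean isSticky() {
           return partitionMovements.isEmpty(); // safe: the field is never null
       }
   }
   ```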





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8777: KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment

2020-06-02 Thread GitBox


ableegoldman commented on pull request #8777:
URL: https://github.com/apache/kafka/pull/8777#issuecomment-637664149


   cc @mjsax @guozhangwang , should be cherrypicked to 2.6, 2.5, and 2.4



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8778: KAFKA-10083: fix failed testReassignmentWithRandomSubscriptionsAndChanges tests

2020-06-02 Thread GitBox


ableegoldman commented on pull request #8778:
URL: https://github.com/apache/kafka/pull/8778#issuecomment-637665071


   cc @mjsax @guozhangwang , should be cherrypicked to 2.6, 2.5, and 2.4 (once 
my comment above is addressed)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8775: KAFKA-10079: improve thread-level stickiness

2020-06-02 Thread GitBox


vvcephei commented on a change in pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#discussion_r433923120



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -938,57 +930,9 @@ private void populatePartitionsByHostMaps(final Map<HostInfo, Set<TopicPartition>> partitionsByHost,
-    private Map<String, Assignment> versionProbingAssignment(final Map<UUID, ClientMetadata> clientsMetadata,
-                                                              final Map<TaskId, Set<TopicPartition>> partitionsForTask,
-                                                              final Map<HostInfo, Set<TopicPartition>> partitionsByHost,
-                                                              final Map<HostInfo, Set<TopicPartition>> standbyPartitionsByHost,
-                                                              final Set<TopicPartition> allOwnedPartitions,
-                                                              final int minUserMetadataVersion,
-                                                              final int minSupportedMetadataVersion) {
-        final Map<String, Assignment> assignment = new HashMap<>();
-
-        // Since we know another rebalance will be triggered anyway, just try and generate a balanced assignment
-        // (without violating cooperative protocol) now so that on the second rebalance we can just give tasks
-        // back to their previous owners
-        // within the client, distribute tasks to its owned consumers
-        for (final ClientMetadata clientMetadata : clientsMetadata.values()) {
-            final ClientState state = clientMetadata.state;
-
-            final Map<String, List<TaskId>> interleavedActive =
-                interleaveConsumerTasksByGroupId(state.activeTasks(), clientMetadata.consumers);
-            final Map<String, List<TaskId>> interleavedStandby =
-                interleaveConsumerTasksByGroupId(state.standbyTasks(), clientMetadata.consumers);
-
-            addClientAssignments(
-                assignment,
-                clientMetadata,
-                partitionsForTask,
-                partitionsByHost,
-                standbyPartitionsByHost,
-                allOwnedPartitions,
-                interleavedActive,
-                interleavedStandby,
-                minUserMetadataVersion,
-                minSupportedMetadataVersion,
-                true,
-                false);
-        }
-
-        log.info("Finished unstable assignment of tasks, a followup rebalance will be scheduled due to version probing.");
-
-        return assignment;
-    }
-
 /**
  * Adds the encoded assignment for each StreamThread consumer in the 
client to the overall assignment map
- * @return true if this client has been told to schedule a followup 
rebalance
+ * @return true if a followup rebalance will be required due to revoekd 
tasks

Review comment:
   ```suggestion
* @return true if a followup rebalance will be required due to revoked 
tasks
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##
@@ -242,8 +250,9 @@ public void addOwnedPartitions(final Collection<TopicPartition> ownedPartitions,
         }
     }
 
-    public void addPreviousTasksAndOffsetSums(final Map<TaskId, Long> taskOffsetSums) {
+    public void addPreviousTasksAndOffsetSums(final String consumerId, final Map<TaskId, Long> taskOffsetSums) {
         this.taskOffsetSums.putAll(taskOffsetSums);
+        consumerToPreviousTaskIds.put(consumerId, taskOffsetSums.keySet());

Review comment:
   We have several new methods, and also this new book-kept collection 
(`consumerToPreviousTaskIds`), but no new tests for them in ClientStateTest. 
Can you add the missing coverage?
   
   The new methods are more a matter of principle; I'm really concerned that we 
should have good coverage on the bookkeeping aspect of 
`consumerToPreviousTaskIds` because I fear future regressions when we have to 
maintain two data structures in a consistent fashion





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-06-02 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-637668386


   @junrao Could you take a look? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10063) UnsupportedOperation when querying cleaner metrics after shutdown

2020-06-02 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-10063:
--

Assignee: Chia-Ping Tsai

> UnsupportedOperation when querying cleaner metrics after shutdown
> -
>
> Key: KAFKA-10063
> URL: https://issues.apache.org/jira/browse/KAFKA-10063
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> We have a few log cleaner metrics which iterate the set of cleaners. For 
> example:
> {code}
>   newGauge("max-clean-time-secs", () => 
> cleaners.iterator.map(_.lastStats.elapsedSecs).max.toInt)
> {code}
> It seems possible currently for LogCleaner metrics to get queried after 
> shutdown of the log cleaner, which clears the `cleaners` collection. This can 
> lead to the following error:
> {code}
> java.lang.UnsupportedOperationException: empty.max
>   at scala.collection.IterableOnceOps.max(IterableOnce.scala:952)
>   at scala.collection.IterableOnceOps.max$(IterableOnce.scala:950)
>   at scala.collection.AbstractIterator.max(Iterator.scala:1279)
>   at 
> kafka.log.LogCleaner.kafka$log$LogCleaner$$$anonfun$new$9(LogCleaner.scala:132)
>   at kafka.log.LogCleaner$$anonfun$4.value(LogCleaner.scala:132)
>   at kafka.log.LogCleaner$$anonfun$4.value(LogCleaner.scala:132)
> {code}
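
A gauge can be made robust to this by guarding against the empty collection. A minimal sketch in Java (hypothetical names; the actual LogCleaner code is Scala):
{code:java}
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CleanerMetricsSketch {
    static class CleanerStats {
        final int elapsedSecs;
        CleanerStats(int elapsedSecs) { this.elapsedSecs = elapsedSecs; }
    }

    private final List<CleanerStats> cleaners = new CopyOnWriteArrayList<>();

    // max() on an empty stream yields an empty OptionalInt instead of
    // throwing, so the gauge stays safe after shutdown clears the list
    int maxCleanTimeSecs() {
        return cleaners.stream()
                .mapToInt(stats -> stats.elapsedSecs)
                .max()
                .orElse(0);
    }
}
{code}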



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #8775: KAFKA-10079: improve thread-level stickiness

2020-06-02 Thread GitBox


ableegoldman commented on a change in pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#discussion_r434018826



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##
@@ -242,8 +250,9 @@ public void addOwnedPartitions(final Collection<TopicPartition> ownedPartitions,
         }
     }
 
-    public void addPreviousTasksAndOffsetSums(final Map<TaskId, Long> taskOffsetSums) {
+    public void addPreviousTasksAndOffsetSums(final String consumerId, final Map<TaskId, Long> taskOffsetSums) {
         this.taskOffsetSums.putAll(taskOffsetSums);
+        consumerToPreviousTaskIds.put(consumerId, taskOffsetSums.keySet());

Review comment:
   Definitely. I meant to write tests but then I took Luna for a walk and 
forgot 😄 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

2020-06-02 Thread GitBox


ableegoldman commented on pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#issuecomment-637672028


   Sorry @ijuma, I think I only ever ran the local tests + checkstyle, not the 
full suite. My mistake



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case

2020-06-02 Thread GitBox


ableegoldman commented on a change in pull request #8668:
URL: https://github.com/apache/kafka/pull/8668#discussion_r434021704



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -65,9 +72,206 @@ public MemberData(List<TopicPartition> partitions, Optional<Integer> generation)
 
     @Override
     public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                     Map<String, Subscription> subscriptions) {
+        Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
+        if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions)) {
+            log.debug("Detected that all consumers were subscribed to same set of topics, invoking the "
+                          + "optimized assignment algorithm");
+            partitionsTransferringOwnership = new HashMap<>();
+            return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions);
+        } else {
+            log.debug("Detected that not all consumers were subscribed to same set of topics, falling back to the "
+                          + "general case assignment algorithm");
+            partitionsTransferringOwnership = null;
+            return generalAssign(partitionsPerTopic, subscriptions);
+        }
+    }
+
+    /**
+     * Returns true iff all consumers have an identical subscription. Also fills out the passed in
+     * {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed partitions
+     */
+    private boolean allSubscriptionsEqual(Set<String> allTopics,
+                                          Map<String, Subscription> subscriptions,
+                                          Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
+        Set<String> membersWithOldGeneration = new HashSet<>();
+        Set<String> membersOfCurrentHighestGeneration = new HashSet<>();
+        int maxGeneration = DEFAULT_GENERATION;
+
+        Set<String> subscribedTopics = new HashSet<>();
+
+        for (Map.Entry<String, Subscription> subscriptionEntry : subscriptions.entrySet()) {
+            String consumer = subscriptionEntry.getKey();
+            Subscription subscription = subscriptionEntry.getValue();
+
+            // initialize the subscribed topics set if this is the first subscription
+            if (subscribedTopics.isEmpty()) {
+                subscribedTopics.addAll(subscription.topics());
+            } else if (!(subscription.topics().size() == subscribedTopics.size()
+                && subscribedTopics.containsAll(subscription.topics()))) {
+                return false;
+            }
+
+            MemberData memberData = memberData(subscription);
+
+            List<TopicPartition> ownedPartitions = new ArrayList<>();
+            consumerToOwnedPartitions.put(consumer, ownedPartitions);
+
+            // Only consider this consumer's owned partitions as valid if it is a member of the current highest
+            // generation, or its generation is not present but we have not seen any known generation so far
+            if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration
+                || !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) {
+
+                membersOfCurrentHighestGeneration.add(consumer);
+                for (final TopicPartition tp : memberData.partitions) {
+                    // filter out any topics that no longer exist or aren't part of the current subscription
+                    if (allTopics.contains(tp.topic())) {
+                        ownedPartitions.add(tp);
+                    }
+                }
+
+                // If the current member's generation is higher, all the previous owned partitions are invalid
+                if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) {
+                    membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration);
+                    membersOfCurrentHighestGeneration.clear();

Review comment:
   Just FYI, I introduced this bug right before merging. Luckily the tests 
caught it -- fix is https://github.com/apache/kafka/pull/8777





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-7599) Trogdor - Allow configuration for not throttling Benchmark Workers and expose messages per second in task status

2020-06-02 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski reassigned KAFKA-7599:
--

Assignee: (was: Stanislav Kozlovski)

> Trogdor - Allow configuration for not throttling Benchmark Workers and expose 
> messages per second in task status
> 
>
> Key: KAFKA-7599
> URL: https://issues.apache.org/jira/browse/KAFKA-7599
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Priority: Major
>
> In Trogdor, the ConsumeBench, ProduceBench and RoundTrip workers all take in 
> an argument called "targetMessagesPerSec". That argument works as an upper 
> bound on the number of messages that can be consumed/produced per second in 
> that worker.
> It is useful to support infinite messages per second. Currently, if the 
> `targetMessagesPerSec` field is not present in the request, the 
> RoundTripWorker will raise an exception, whereas the ConsumeBench and 
> ProduceBench workers will work as if they had `targetMessagesPerSec=10`.
> I propose we allow for unbounded `targetMessagesPerSec` if the field is not 
> present.
> Further, it would be very useful if some of these workers showed the 
> `messagesPerSecond` they have been producing/consuming at. 
> Even now, giving the worker a `targetMessagesPerSec` does not guarantee that 
> the worker will reach the needed `targetMessagesPerSec`. There is no easy way 
> of knowing how the worker performed - you have to subtract the status fields 
> `startedMs` and `doneMs` to get the total duration of the task, convert it to 
> seconds and then divide the `maxMessages` field by that duration.
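
For reference, the manual rate computation described above boils down to this (a sketch with made-up numbers):
{code:java}
public class TrogdorRateSketch {
    public static void main(String[] args) {
        // hypothetical values read from the worker's task status
        long startedMs = 1_590_000_000_000L;
        long doneMs = 1_590_000_060_000L;  // 60 seconds later
        long maxMessages = 600_000L;

        double durationSecs = (doneMs - startedMs) / 1000.0;
        double messagesPerSec = maxMessages / durationSecs;
        System.out.printf("achieved rate: %.1f messages/sec%n", messagesPerSec); // 10000.0
    }
}
{code}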



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-8264) Flaky Test PlaintextConsumerTest#testLowMaxFetchSizeForRequestAndPartition

2020-06-02 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski reassigned KAFKA-8264:
--

Assignee: (was: Stanislav Kozlovski)

> Flaky Test PlaintextConsumerTest#testLowMaxFetchSizeForRequestAndPartition
> --
>
> Key: KAFKA-8264
> URL: https://issues.apache.org/jira/browse/KAFKA-8264
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.0.1, 2.3.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.6.0
>
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-2.0-jdk8/detail/kafka-2.0-jdk8/252/tests]
> {quote}org.apache.kafka.common.errors.TopicExistsException: Topic 'topic3' 
> already exists.{quote}
> STDOUT
>  
> {quote}[2019-04-19 03:54:20,080] ERROR [ReplicaFetcher replicaId=1, 
> leaderId=0, fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:20,080] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:20,312] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:20,313] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition topic-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:20,994] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:21,727] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error for partition topicWithNewMessageFormat-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:28,696] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:28,699] ERROR [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:29,246] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error for partition topic-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:29,247] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:29,287] ERROR [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Error for partition topic-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:33,408] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:33,408] ERROR [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:33,655] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error 

[GitHub] [kafka] guozhangwang commented on pull request #8779: [MINOR] Fixing spotbug fail - removing unused variable.

2020-06-02 Thread GitBox


guozhangwang commented on pull request #8779:
URL: https://github.com/apache/kafka/pull/8779#issuecomment-637676075


   Cherry-picked to 2.6 / 2.5 / 2.4



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #8777: KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment

2020-06-02 Thread GitBox


guozhangwang merged pull request #8777:
URL: https://github.com/apache/kafka/pull/8777


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8777: KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment

2020-06-02 Thread GitBox


guozhangwang commented on pull request #8777:
URL: https://github.com/apache/kafka/pull/8777#issuecomment-637677433


   Cherry-picked to 2.6 / 2.5 / 2.4



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #8782: KAFKA-10080; Fix race condition on txn completion which can cause duplicate appends

2020-06-02 Thread GitBox


hachikuji opened a new pull request #8782:
URL: https://github.com/apache/kafka/pull/8782


   The method `maybeWriteTxnCompletion` is unsafe for concurrent calls. This can cause duplicate attempts to write the completion record to the log, which can ultimately lead to illegal state errors and possibly to correctness violations if another transaction had been started before the duplicate was written. This patch fixes the problem by ensuring only one thread can successfully remove the pending completion from the map.
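   
   The shape of the fix can be sketched like this (Java for illustration with hypothetical names; the real code lives in the Scala `TransactionMarkerChannelManager`):
   
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   
   class TxnCompletionSketch {
       static class PendingCompleteTxn { }
   
       private final ConcurrentHashMap<String, PendingCompleteTxn> pendingTxns = new ConcurrentHashMap<>();
   
       void maybeWriteTxnCompletion(String transactionalId) {
           final PendingCompleteTxn pending = pendingTxns.get(transactionalId);
           // remove(key, value) is atomic and succeeds for exactly one caller,
           // so only that thread proceeds to write the completion record
           if (pending != null && pendingTxns.remove(transactionalId, pending)) {
               writeTxnCompletion(pending);
           }
       }
   
       private void writeTxnCompletion(PendingCompleteTxn txn) {
           // append the completion record to the log (elided)
       }
   }
   ```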
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-8723) flaky test LeaderElectionCommandTest#testAllTopicPartition

2020-06-02 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski resolved KAFKA-8723.

Resolution: Fixed

> flaky test LeaderElectionCommandTest#testAllTopicPartition
> --
>
> Key: KAFKA-8723
> URL: https://issues.apache.org/jira/browse/KAFKA-8723
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Stanislav Kozlovski
>Priority: Major
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/23737/console]
>  
> *15:52:26* kafka.admin.LeaderElectionCommandTest > testAllTopicPartition 
> STARTED*15:53:08* kafka.admin.LeaderElectionCommandTest.testAllTopicPartition 
> failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11@2/core/build/reports/testOutput/kafka.admin.LeaderElectionCommandTest.testAllTopicPartition.test.stdout*15:53:08*
>  *15:53:08* kafka.admin.LeaderElectionCommandTest > testAllTopicPartition 
> FAILED*15:53:08* kafka.common.AdminCommandFailedException: Timeout 
> waiting for election results*15:53:08* at 
> kafka.admin.LeaderElectionCommand$.electLeaders(LeaderElectionCommand.scala:133)*15:53:08*
>  at 
> kafka.admin.LeaderElectionCommand$.run(LeaderElectionCommand.scala:88)*15:53:08*
>  at 
> kafka.admin.LeaderElectionCommand$.main(LeaderElectionCommand.scala:41)*15:53:08*
>  at 
> kafka.admin.LeaderElectionCommandTest$$anonfun$testAllTopicPartition$1.apply(LeaderElectionCommandTest.scala:91)*15:53:08*
>  at 
> kafka.admin.LeaderElectionCommandTest$$anonfun$testAllTopicPartition$1.apply(LeaderElectionCommandTest.scala:74)*15:53:08*
>  at kafka.utils.TestUtils$.resource(TestUtils.scala:1588)*15:53:08*   
>   at 
> kafka.admin.LeaderElectionCommandTest.testAllTopicPartition(LeaderElectionCommandTest.scala:74)*15:53:08*
>  *15:53:08* Caused by:*15:53:08* 
> org.apache.kafka.common.errors.TimeoutException: Aborted due to 
> timeout.*15:53:08*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-7940) Flaky Test CustomQuotaCallbackTest#testCustomQuotaCallback

2020-06-02 Thread Stanislav Kozlovski (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stanislav Kozlovski reassigned KAFKA-7940:
--

Assignee: (was: Stanislav Kozlovski)

> Flaky Test CustomQuotaCallbackTest#testCustomQuotaCallback
> --
>
> Key: KAFKA-7940
> URL: https://issues.apache.org/jira/browse/KAFKA-7940
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.2.0, 2.4.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.2.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/14/]
> {quote}java.lang.AssertionError: Too many quotaLimit calls Map(PRODUCE -> 1, 
> FETCH -> 1, REQUEST -> 4) at org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback(CustomQuotaCallbackTest.scala:105){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8406) kafka-topics throws wrong error on invalid configuration with bootstrap-server and alter config

2020-06-02 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17124052#comment-17124052
 ] 

Stanislav Kozlovski commented on KAFKA-8406:


[~savulchik] are you sure? Can you try the exact same commands I listed in the 
description? I just tested this in 2.5 and it is still an issue

> kafka-topics throws wrong error on invalid configuration with 
> bootstrap-server and alter config
> ---
>
> Key: KAFKA-8406
> URL: https://issues.apache.org/jira/browse/KAFKA-8406
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
>
> Running
> {code:java}
> ./kafka-topics --bootstrap-server  --alter --config 
> retention.ms=360 --topic topic{code}
> Results in
> {code:java}
> Missing required argument "[partitions]"{code}
> Running
> {code:java}
> ./kafka-topics --bootstrap-server  --alter --config 
> retention.ms=360 --topic topic --partitions 25{code}
> Results in
> {code:java}
> Option combination "[bootstrap-server],[config]" can't be used with option 
> "[alter]"{code}
> For better clarity, we should just throw the last error outright.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bob-barrett opened a new pull request #8784: KAFKA-9788: Use distinct names for transaction and group load time se…

2020-06-02 Thread GitBox


bob-barrett opened a new pull request #8784:
URL: https://github.com/apache/kafka/pull/8784


   …nsors
   
   Sensor objects are stored in the Kafka metrics registry and keyed by name. 
If a new sensor is created with the same name as an existing one, the existing 
one is returned rather than a new object being created. The partition load time 
sensors for the transaction and group coordinators used the same name, so data 
recorded to either was stored in the same object. This meant that the metrics 
values for both metrics were identical and consisted of the combined data. This 
patch changes the names to be distinct so that the data will be stored in 
separate Sensor objects.
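   
   To illustrate the name-collision behavior (a minimal sketch; the sensor names here are made up):
   
   ```java
   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;
   
   public class SensorNameSketch {
       public static void main(String[] args) throws Exception {
           try (Metrics metrics = new Metrics()) {
               // the same name returns the same Sensor object
               Sensor first = metrics.sensor("PartitionLoadTime");
               Sensor second = metrics.sensor("PartitionLoadTime");
               System.out.println(first == second); // true: data recorded to either is combined
   
               // distinct names yield distinct sensors
               Sensor group = metrics.sensor("GroupPartitionLoadTime");
               Sensor txn = metrics.sensor("TransactionPartitionLoadTime");
               System.out.println(group == txn); // false
           }
       }
   }
   ```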
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 opened a new pull request #8783: KAFKA-10063 UnsupportedOperation when querying cleaner metrics after …

2020-06-02 Thread GitBox


chia7712 opened a new pull request #8783:
URL: https://github.com/apache/kafka/pull/8783


   https://issues.apache.org/jira/browse/KAFKA-10063
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #8117: KAFKA-8403: Suppress needs a Materialized variant

2020-06-02 Thread GitBox


dongjinleekr commented on pull request #8117:
URL: https://github.com/apache/kafka/pull/8117#issuecomment-637683206


   @vvcephei Sorry for being late, I just got out from my last project; I will 
have a look.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #7898: KAFKA-9366: please consider upgrade log4j to log4j2 due to critical security problem CVE-2019-17571

2020-06-02 Thread GitBox


dongjinleekr commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-637684403


   All // Sorry for being late, I just got out from my last project; I will 
have a look at this PR this weekend.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8782: KAFKA-10080; Fix race condition on txn completion which can cause duplicate appends

2020-06-02 Thread GitBox


chia7712 commented on a change in pull request #8782:
URL: https://github.com/apache/kafka/pull/8782#discussion_r434038469



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
##
@@ -215,8 +215,6 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
   }
 
   private def writeTxnCompletion(pendingCommitTxn: PendingCompleteTxn): Unit = 
{

Review comment:
   How about renaming ```pendingCommitTxn``` to ```pendingCompleteTxn```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-8480) Clients may fetch incomplete set of topic partitions during cluster startup

2020-06-02 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio reassigned KAFKA-8480:
-

Assignee: Jose Armando Garcia Sancio  (was: Anna Povzner)

> Clients may fetch incomplete set of topic partitions during cluster startup
> ---
>
> Key: KAFKA-8480
> URL: https://issues.apache.org/jira/browse/KAFKA-8480
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.1
>Reporter: Anna Povzner
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> KafkaConsumer#partitionsFor() or AdminClient#describeTopics() may not return 
> all partitions for a given topic when the cluster is starting up (after the 
> cluster was down). 
> The cause is controller, on becoming a controller, sending 
> UpdateMetadataRequest for all partitions with at least one online replica, 
> and then a separate UpdateMetadataRequest for all partitions with at least 
> one offline replica. If client sends metadata request in between broker 
> processing those two update metadata requests, clients will get incomplete 
> set of partitions.
> Proposed fix: controller should send one UpdateMetadataRequest (containing 
> all partitions) in  ReplicaStateMachine#startup().



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-02 Thread GitBox


ableegoldman commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434051190



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
 "state manager close",
 log
 );
+} else if (state() == State.CLOSED) {

Review comment:
   Should we switch to `switch` here as well? 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -218,6 +218,10 @@ public void initializeIfNeeded() {
  */
 @Override
 public void completeRestoration() {
+if (state() == State.RUNNING) {
+return;
+}
+

Review comment:
   Can we use if/else if here for consistency?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -466,6 +510,11 @@ public void closeAndRecycleState() {
 stateMgr.recycle();
 recordCollector.close();
 break;
+
+case CLOSED:
+log.trace("Skip close since state is {}", state());

Review comment:
   I think this might be one of those exceptions where we should still 
enforce that the state is not `CLOSED` (ie throw `IllegalStateException`) since 
there are related actions that occur outside of the Task implementation that 
will fail if we try to recycle a CLOSED task. Similar to prepare/post commit, 
resume, etc

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -390,19 +387,17 @@ boolean tryToCompleteRestoration() {
 
 final List<Task> restoringTasks = new LinkedList<>();
 for (final Task task : tasks.values()) {
-if (task.state() == CREATED) {
-try {
-task.initializeIfNeeded();
-} catch (final LockException | TimeoutException e) {
-// it is possible that if there are multiple threads 
within the instance that one thread
-// trying to grab the task from the other, while the other 
has not released the lock since
-// it did not participate in the rebalance. In this case 
we can just retry in the next iteration
-log.debug("Could not initialize {} due to the following 
exception; will retry", task.id(), e);
-allRunning = false;
-}
+try {
+task.initializeIfNeeded();
+} catch (final LockException | TimeoutException e) {
+// it is possible that if there are multiple threads within 
the instance that one thread
+// trying to grab the task from the other, while the other has 
not released the lock since
+// it did not participate in the rebalance. In this case we 
can just retry in the next iteration
+log.debug("Could not initialize {} due to the following 
exception; will retry", task.id(), e);
+allRunning = false;
 }
 
-if (task.state() == RESTORING) {
+if (task.isActive()) {

Review comment:
   Can we add a comment or rename `restoringTasks` to clarify that it's ok 
to put an active-but-not-restoring task in here since 
`Task#completeRestoration` is idempotent/no-op for RUNNING tasks?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -218,6 +218,10 @@ public void initializeIfNeeded() {
  */
 @Override
 public void completeRestoration() {
+if (state() == State.RUNNING) {
+return;
+}
+
 if (state() == State.RESTORING) {
 initializeMetadata();
 initializeTopology();

Review comment:
   github won't let me leave a comment below this line, but can we use the 
`"Illegal state"`/`"Unknown state"` improvement in this method as well?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
 "state manager close",
 log
 );
+} else if (state() == State.CLOSED) {
+log.trace("Skip closing since state is {}", state());
+return;
 } else {
 throw new IllegalStateException("Illegal state " + state() + " 
while closing standby task " + id);

Review comment:
   `Illegal state` -> `Unknown state`? 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Long> changelogOffsets) {
         for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
             final long offset = changelogEntry.getValue();

[jira] [Resolved] (KAFKA-9987) Improve sticky partition assignor algorithm

2020-06-02 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman resolved KAFKA-9987.

Fix Version/s: 2.5.1
   2.4.2
   2.6.0
   Resolution: Fixed

> Improve sticky partition assignor algorithm
> ---
>
> Key: KAFKA-9987
> URL: https://issues.apache.org/jira/browse/KAFKA-9987
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0, 2.4.2, 2.5.1
>
>
> In 
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  we added the new CooperativeStickyAssignor which leverages the underlying 
> sticky assignment algorithm of the existing StickyAssignor (moved to 
> AbstractStickyAssignor). The algorithm is fairly complex as it tries to 
> optimize stickiness while satisfying perfect balance _in the case individual 
> consumers may be subscribed to different subsets of the topics._ While it 
> does a pretty good job at what it promises to do, it doesn't scale well with 
> large numbers of consumers and partitions.
> To give a concrete example, users have reported that it takes 2.5 minutes for 
> the assignment to complete with just 2100 consumers reading from 2100 
> partitions. Since partitions revoked during the first of two cooperative 
> rebalances will remain unassigned until the end of the second rebalance, it's 
> important for the rebalance to be as fast as possible. And since one of the 
> primary improvements of the cooperative rebalancing protocol is better 
> scaling experience, the only OOTB cooperative assignor should not itself 
> scale poorly
> If we can constrain the problem a bit, we can simplify the algorithm greatly. 
> In many cases the individual consumers won't be subscribed to some random 
> subset of the total subscription, they will all be subscribed to the same set 
> of topics and rely on the assignor to balance the partition workload.
> We can detect this case by checking the group's individual subscriptions and 
> call on a more efficient assignment algorithm. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9987) Improve sticky partition assignor algorithm

2020-06-02 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9987:
---
Component/s: consumer

> Improve sticky partition assignor algorithm
> ---
>
> Key: KAFKA-9987
> URL: https://issues.apache.org/jira/browse/KAFKA-9987
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0, 2.4.2, 2.5.1
>
>
> In 
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  we added the new CooperativeStickyAssignor which leverages the underlying 
> sticky assignment algorithm of the existing StickyAssignor (moved to 
> AbstractStickyAssignor). The algorithm is fairly complex as it tries to 
> optimize stickiness while satisfying perfect balance _in the case individual 
> consumers may be subscribed to different subsets of the topics._ While it 
> does a pretty good job at what it promises to do, it doesn't scale well with 
> large numbers of consumers and partitions.
> To give a concrete example, users have reported that it takes 2.5 minutes for 
> the assignment to complete with just 2100 consumers reading from 2100 
> partitions. Since partitions revoked during the first of two cooperative 
> rebalances will remain unassigned until the end of the second rebalance, it's 
> important for the rebalance to be as fast as possible. And since one of the 
> primary improvements of the cooperative rebalancing protocol is better 
> scaling experience, the only OOTB cooperative assignor should not itself 
> scale poorly
> If we can constrain the problem a bit, we can simplify the algorithm greatly. 
> In many cases the individual consumers won't be subscribed to some random 
> subset of the total subscription, they will all be subscribed to the same set 
> of topics and rely on the assignor to balance the partition workload.
> We can detect this case by checking the group's individual subscriptions and 
> call on a more efficient assignment algorithm. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #8782: KAFKA-10080; Fix race condition on txn completion which can cause duplicate appends

2020-06-02 Thread GitBox


hachikuji commented on a change in pull request #8782:
URL: https://github.com/apache/kafka/pull/8782#discussion_r434098545



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
##
@@ -285,15 +280,16 @@ class TransactionMarkerChannelManager(config: KafkaConfig,
 }
   }
 
-  private def maybeWriteTxnCompletion(transactionalId: String): Unit = {
-Option(transactionsWithPendingMarkers.get(transactionalId)).foreach { 
pendingCommitTxn =>

Review comment:
   Multiple threads may see the transaction still as pending and attempt 
completion.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-02 Thread GitBox


mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434110314



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
 "state manager close",
 log
 );
+} else if (state() == State.CLOSED) {

Review comment:
   We could... (cf. comment below)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-02 Thread GitBox


mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434111426



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
 "state manager close",
 log
 );
+} else if (state() == State.CLOSED) {
+log.trace("Skip closing since state is {}", state());
+return;
 } else {
 throw new IllegalStateException("Illegal state " + state() + " 
while closing standby task " + id);

Review comment:
   The state could be `RESTORING`, which is _illegal_ but not _unknown_ -- we would need more conditions to distinguish both cases (introducing `switch()` would be helpful for this case). Thoughts?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-02 Thread GitBox


mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434111426



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
 "state manager close",
 log
 );
+} else if (state() == State.CLOSED) {
+log.trace("Skip closing since state is {}", state());
+return;
 } else {
 throw new IllegalStateException("Illegal state " + state() + " 
while closing standby task " + id);

Review comment:
   The state could be `RESTORING`, which is _illegal_ but not _unknown_ -- we would need more conditions to distinguish both cases (introducing `switch()` would be helpful for this case). Thoughts?
   
   I guess this applies to other places in the code, too. I am happy to update all; I was just hesitant.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10084) System test failure: StreamsEosTest.test_failure_and_recovery_complex

2020-06-02 Thread John Roesler (Jira)
John Roesler created KAFKA-10084:


 Summary: System test failure: 
StreamsEosTest.test_failure_and_recovery_complex
 Key: KAFKA-10084
 URL: https://issues.apache.org/jira/browse/KAFKA-10084
 Project: Kafka
  Issue Type: Task
Reporter: John Roesler
Assignee: John Roesler


The test failed with message:
{code:java}
RemoteCommandError: ubuntu@worker14: Command 'grep ALL-RECORDS-DELIVERED 
/mnt/streams/streams.stdout' returned non-zero exit status 1.{code}
And I found in the verifier's stderr:
{code:java}
java.lang.IllegalStateException: Offset for partition echo-1 is larger than 
topic endOffset: 2422 > 2421
at 
org.apache.kafka.streams.tests.EosTestDriver.verifyAllTransactionFinished(EosTestDriver.java:604)
at 
org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:184)
at 
org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:82){code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10084) System test failure: StreamsEosTest.test_failure_and_recovery_complex

2020-06-02 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10084:
-
Affects Version/s: 2.7.0

> System test failure: StreamsEosTest.test_failure_and_recovery_complex
> -
>
> Key: KAFKA-10084
> URL: https://issues.apache.org/jira/browse/KAFKA-10084
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 2.6.0, 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.6.0, 2.7.0
>
>
> The test failed with message:
> {code:java}
> RemoteCommandError: ubuntu@worker14: Command 'grep ALL-RECORDS-DELIVERED 
> /mnt/streams/streams.stdout' returned non-zero exit status 1.{code}
> And I found in the verifier's stderr:
> {code:java}
> java.lang.IllegalStateException: Offset for partition echo-1 is larger than 
> topic endOffset: 2422 > 2421
>   at 
> org.apache.kafka.streams.tests.EosTestDriver.verifyAllTransactionFinished(EosTestDriver.java:604)
>   at 
> org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:184)
>   at 
> org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:82){code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10084) System test failure: StreamsEosTest.test_failure_and_recovery_complex

2020-06-02 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10084:
-
Fix Version/s: 2.6.0
   2.7.0

> System test failure: StreamsEosTest.test_failure_and_recovery_complex
> -
>
> Key: KAFKA-10084
> URL: https://issues.apache.org/jira/browse/KAFKA-10084
> Project: Kafka
>  Issue Type: Task
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.6.0, 2.7.0
>
>
> The test failed with message:
> {code:java}
> RemoteCommandError: ubuntu@worker14: Command 'grep ALL-RECORDS-DELIVERED 
> /mnt/streams/streams.stdout' returned non-zero exit status 1.{code}
> And I found in the verifier's stderr:
> {code:java}
> java.lang.IllegalStateException: Offset for partition echo-1 is larger than 
> topic endOffset: 2422 > 2421
>   at 
> org.apache.kafka.streams.tests.EosTestDriver.verifyAllTransactionFinished(EosTestDriver.java:604)
>   at 
> org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:184)
>   at 
> org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:82){code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10084) System test failure: StreamsEosTest.test_failure_and_recovery_complex

2020-06-02 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10084:
-
Affects Version/s: 2.6.0

> System test failure: StreamsEosTest.test_failure_and_recovery_complex
> -
>
> Key: KAFKA-10084
> URL: https://issues.apache.org/jira/browse/KAFKA-10084
> Project: Kafka
>  Issue Type: Task
>Affects Versions: 2.6.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.6.0, 2.7.0
>
>
> The test failed with message:
> {code:java}
> RemoteCommandError: ubuntu@worker14: Command 'grep ALL-RECORDS-DELIVERED 
> /mnt/streams/streams.stdout' returned non-zero exit status 1.{code}
> And I found in the verifier's stderr:
> {code:java}
> java.lang.IllegalStateException: Offset for partition echo-1 is larger than 
> topic endOffset: 2422 > 2421
>   at 
> org.apache.kafka.streams.tests.EosTestDriver.verifyAllTransactionFinished(EosTestDriver.java:604)
>   at 
> org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:184)
>   at 
> org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:82){code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management

2020-06-02 Thread GitBox


abbccdda commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r434107127



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -1799,8 +1799,8 @@ public void shouldThrowIfClosingOnIllegalState() {
 task.closeClean(checkpoint);
 
 // close call are not idempotent since we are already in closed

Review comment:
   nit: call -> calls

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##
@@ -247,6 +247,9 @@ private void close(final boolean clean) {
 "state manager close",
 log
 );
+} else if (state() == State.CLOSED) {
+log.trace("Skip closing since state is {}", state());

Review comment:
   We could just say `Skip closing since state is closed` here

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -493,28 +542,45 @@ public void closeAndRecycleState() {
 private Map<TopicPartition, Long> prepareClose(final boolean clean) {
     final Map<TopicPartition, Long> checkpoint;
 
-if (state() == State.CREATED) {
-// the task is created and not initialized, just re-write the 
checkpoint file
-checkpoint = Collections.emptyMap();
-} else if (state() == State.RUNNING) {
-closeTopology(clean);
+switch (state()) {
+case CREATED:
+// the task is created and not initialized, just re-write the 
checkpoint file
+checkpoint = Collections.emptyMap();
 
-if (clean) {
-stateMgr.flush();
-recordCollector.flush();
-checkpoint = checkpointableOffsets();
-} else {
+break;
+
+case RUNNING:
+closeTopology(clean);
+
+if (clean) {
+stateMgr.flush();
+recordCollector.flush();
+checkpoint = checkpointableOffsets();
+} else {
+checkpoint = null; // `null` indicates to not write a 
checkpoint
+executeAndMaybeSwallow(false, stateMgr::flush, "state 
manager flush", log);
+}
+
+break;
+
+case RESTORING:
+executeAndMaybeSwallow(clean, stateMgr::flush, "state manager 
flush", log);
+checkpoint = Collections.emptyMap();
+
+break;
+
+case SUSPENDED:
+// if `SUSPENDED` do not need to checkpoint, since when 
suspending we've already committed the state
 checkpoint = null; // `null` indicates to not write a 
checkpoint
-executeAndMaybeSwallow(false, stateMgr::flush, "state manager 
flush", log);
-}
-} else if (state() == State.RESTORING) {
-executeAndMaybeSwallow(clean, stateMgr::flush, "state manager 
flush", log);
-checkpoint = Collections.emptyMap();
-} else if (state() == State.SUSPENDED) {
-// if `SUSPENDED` do not need to checkpoint, since when suspending 
we've already committed the state
-checkpoint = null; // `null` indicates to not write a checkpoint
-} else {
-throw new IllegalStateException("Illegal state " + state() + " 
while prepare closing active task " + id);
+
+break;
+case CLOSED:

Review comment:
   Could we merge the `CLOSED` and `CREATED` cases? Also, could you elaborate on why we use an empty checkpoint map instead of null?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Long> changelogOffsets) {
         for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
             final long offset = changelogEntry.getValue();
 
-            offsetSum += offset;
-            if (offsetSum < 0) {
-                log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
-                return Long.MAX_VALUE;
+            if (offset == Task.LATEST_OFFSET) {

Review comment:
   Should we also check `task.isActive` here?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10085) Compute lag correctly for optimized source changelogs

2020-06-02 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10085:
---

 Summary: Compute lag correctly for optimized source changelogs
 Key: KAFKA-10085
 URL: https://issues.apache.org/jira/browse/KAFKA-10085
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Sophie Blee-Goldman
Assignee: Sophie Blee-Goldman
 Fix For: 2.6.0


During KIP-441 we originally decided to leave the special handling of optimized 
source changelogs as a potential future improvement, since over-estimating lag 
was acceptable.

 

But as always, things have changed during the course of implementing this KIP, and the algorithm we ended up with requires accurate computation of lag. We should branch the lag computation in the assignor and use the correct end offset sum when computing lag.
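
A hedged sketch of the branching described above (all names hypothetical, not the actual assignor code):
{code:java}
class LagComputationSketch {
    static long taskLag(boolean optimizedSourceChangelog,
                        long logEndOffsetSum,
                        long committedOffsetSum,
                        long taskOffsetSum) {
        // for an optimized source changelog, the committed position of the source
        // topic (not its log-end offset) bounds what a task can have restored
        final long endOffsetSum = optimizedSourceChangelog ? committedOffsetSum : logEndOffsetSum;
        return Math.max(0L, endOffsetSum - taskOffsetSum);
    }
}
{code}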



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

