[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433660276

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

## @@ -390,19 +387,17 @@ boolean tryToCompleteRestoration() {
     final List<Task> restoringTasks = new LinkedList<>();
     for (final Task task : tasks.values()) {
-        if (task.state() == CREATED) {
-            try {
-                task.initializeIfNeeded();
-            } catch (final LockException | TimeoutException e) {
-                // it is possible that if there are multiple threads within the instance that one thread
-                // trying to grab the task from the other, while the other has not released the lock since
-                // it did not participate in the rebalance. In this case we can just retry in the next iteration
-                log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
-                allRunning = false;
-            }
+        try {
+            task.initializeIfNeeded();
+        } catch (final LockException | TimeoutException e) {
+            // it is possible that if there are multiple threads within the instance that one thread
+            // trying to grab the task from the other, while the other has not released the lock since
+            // it did not participate in the rebalance. In this case we can just retry in the next iteration
+            log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
+            allRunning = false;
         }
-        if (task.state() == RESTORING) {
+        if (task.isActive()) {

Review comment:
StandbyTasks are never in `RESTORING` state.
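For context, a minimal sketch of how the loop reads after this change, reconstructed from the diff (surrounding members such as `tasks`, `allRunning`, and `restoringTasks` are taken from the diff context; this is not the verbatim TaskManager code):

```java
for (final Task task : tasks.values()) {
    try {
        task.initializeIfNeeded();
    } catch (final LockException | TimeoutException e) {
        // another thread may still hold the state-directory lock; retry next iteration
        log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
        allRunning = false;
    }
    // standby tasks jump straight from CREATED to RUNNING, so only active
    // tasks can ever be in RESTORING and need to be tracked here
    if (task.isActive()) {
        restoringTasks.add(task);
    }
}
```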
[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433660588

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

## @@ -529,11 +524,7 @@ void handleLostAll() {
     for (final TaskId id : lockedTaskDirectories) {
         final Task task = tasks.get(id);
         if (task != null) {
-            if (task.isActive() && task.state() == RUNNING) {
-                taskOffsetSums.put(id, Task.LATEST_OFFSET);
-            } else {
-                taskOffsetSums.put(id, sumOfChangelogOffsets(id, task.changelogOffsets()));
-            }
+            taskOffsetSums.put(id, sumOfChangelogOffsets(id, task.changelogOffsets()));

Review comment:
Make TM agnostic to task state -- putting some more logic into `sumOfChangelogOffsets` to make this work -- note that `task.changelogOffsets()` sets offsets to `LATEST_OFFSET` for `StreamTasks` that are `RUNNING`.
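A hedged sketch of the behaviour the comment describes (this is not the actual `StreamTask` implementation; `restoredOffsets` is a placeholder name for whatever the task tracks while restoring): once an active task is `RUNNING`, every changelog partition reports the `LATEST_OFFSET` sentinel, so callers never need to inspect the task state.

```java
@Override
public Map<TopicPartition, Long> changelogOffsets() {
    if (state() == State.RUNNING) {
        // report the sentinel for every changelog partition of a running active task
        final Map<TopicPartition, Long> offsets = new HashMap<>();
        for (final TopicPartition partition : changelogPartitions()) {
            offsets.put(partition, Task.LATEST_OFFSET);
        }
        return offsets;
    }
    // while restoring, report the offsets actually restored so far
    return Collections.unmodifiableMap(restoredOffsets);
}
```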
[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433661972

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

## @@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Long> changelogOffsets) {
     for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
         final long offset = changelogEntry.getValue();
-        offsetSum += offset;
-        if (offsetSum < 0) {
-            log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
-            return Long.MAX_VALUE;
+        if (offset == Task.LATEST_OFFSET) {

Review comment:
If an active task is `RUNNING`, the offsets are set to `LATEST_OFFSET` in `task.changelogOffsets()` that is passed as parameter.
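A sketch of the sentinel handling after the change, reconstructed from the diff (helper names follow the surrounding code; not guaranteed to match the merged version line for line): any `LATEST_OFFSET` entry means the task was a `RUNNING` active task, so the whole sum becomes the sentinel.

```java
private long sumOfChangelogOffsets(final TaskId id, final Map<TopicPartition, Long> changelogOffsets) {
    long offsetSum = 0L;
    for (final Map.Entry<TopicPartition, Long> changelogEntry : changelogOffsets.entrySet()) {
        final long offset = changelogEntry.getValue();
        if (offset == Task.LATEST_OFFSET) {
            return Task.LATEST_OFFSET;   // RUNNING active task: report the sentinel directly
        }
        offsetSum += offset;
        if (offsetSum < 0) {
            log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
            return Long.MAX_VALUE;
        }
    }
    return offsetSum;
}
```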
[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433662342

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

## @@ -2637,11 +2646,17 @@ private File getCheckpointFile(final TaskId task) {
     public void initializeIfNeeded() {
         if (state() == State.CREATED) {
             transitionTo(State.RESTORING);
+            if (!active) {
+                transitionTo(State.RUNNING);

Review comment:
A "standby" must transition to `RUNNING` here (cf. `StandbyTask`)
[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management
mjsax commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r433662426

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

## @@ -2637,11 +2646,17 @@ private File getCheckpointFile(final TaskId task) {
     public void initializeIfNeeded() {
         if (state() == State.CREATED) {
             transitionTo(State.RESTORING);
+            if (!active) {
+                transitionTo(State.RUNNING);
+            }
         }
     }
 
     @Override
     public void completeRestoration() {
+        if (state() == State.RUNNING) {

Review comment:
Must be idempotent.
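A sketch of how the test stub can satisfy that idempotency requirement (reconstructed from the diff context; not necessarily the exact merged test code): a standby already reached `RUNNING` inside `initializeIfNeeded()`, so a second call must be a no-op rather than an illegal `RUNNING` -> `RUNNING` transition.

```java
@Override
public void completeRestoration() {
    if (state() == State.RUNNING) {
        return; // already running (standby); nothing to complete
    }
    transitionTo(State.RUNNING);
}
```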
[jira] [Commented] (KAFKA-7538) Improve locking model used to update ISRs and HW
[ https://issues.apache.org/jira/browse/KAFKA-7538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123415#comment-17123415 ]

Viktor Somogyi-Vass commented on KAFKA-7538:
--------------------------------------------

[~rsivaram] have subcases 1 and 3 been fixed since resolving this jira, or are they still pending?

> Improve locking model used to update ISRs and HW
>
>                 Key: KAFKA-7538
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7538
>             Project: Kafka
>          Issue Type: Improvement
>          Components: core
>    Affects Versions: 2.1.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 2.5.0
>
> We currently use a ReadWriteLock in Partition to update ISRs and high water mark for the partition. This can result in severe lock contention if there are multiple producers writing a large amount of data into a single partition.
> The current locking model is:
> # read lock while appending to log on every Produce request on the request handler thread
> # write lock on leader change, updating ISRs etc. on request handler or scheduler thread
> # write lock on every replica fetch request to check if ISRs need to be updated and to update HW and ISR on the request handler thread
> 2) is infrequent, but 1) and 3) may be frequent and can result in lock contention. If there are lots of produce requests to a partition from multiple processes, on the leader broker we may see:
> # one slow log append locks up one request thread for that produce while holding onto the read lock
> # (replicationFactor-1) request threads can be blocked waiting for write lock to process replica fetch request
> # potentially several other request threads processing Produce may be queued up to acquire read lock because of the waiting writers.
> In a thread dump with this issue, we noticed several request threads blocked waiting for write, possibly due to replication fetch retries.
>
> Possible fixes:
> # Process `Partition#maybeExpandIsr` on a single scheduler thread similar to `Partition#maybeShrinkIsr` so that only a single thread is blocked on the write lock. But this will delay updating ISRs and HW.
> # Change locking in `Partition#maybeExpandIsr` so that only read lock is acquired to check if ISR needs updating and write lock is acquired only to update ISRs. Also use a different lock for updating HW (perhaps just the Partition object lock) so that typical replica fetch requests complete without acquiring Partition write lock on the request handler thread.
> I will submit a PR for 2), but other suggestions to fix this are welcome.
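For illustration, an idealized Java rendering of fix option 2 above (the real code is Scala in Partition.scala; all names here, including `isrNeedsExpansion` and `expandIsr`, are hypothetical stand-ins): take the read lock for the cheap check, and the write lock only when a mutation is actually needed, re-checking under the write lock.

{code:java}
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class IsrState {
    private final ReadWriteLock leaderIsrUpdateLock = new ReentrantReadWriteLock();

    void maybeExpandIsr(int replicaId) {
        boolean needsUpdate;
        leaderIsrUpdateLock.readLock().lock();       // cheap check under the read lock
        try {
            needsUpdate = isrNeedsExpansion(replicaId);
        } finally {
            leaderIsrUpdateLock.readLock().unlock();
        }
        if (needsUpdate) {
            leaderIsrUpdateLock.writeLock().lock();  // write lock only to mutate
            try {
                if (isrNeedsExpansion(replicaId)) {  // re-check: state may have changed
                    expandIsr(replicaId);
                }
            } finally {
                leaderIsrUpdateLock.writeLock().unlock();
            }
        }
    }

    private boolean isrNeedsExpansion(int replicaId) { return false; } // placeholder
    private void expandIsr(int replicaId) { }                          // placeholder
}
{code}

This is the classic check-then-act pattern: most replica fetch requests take only the read lock, so they no longer queue behind writers.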
[jira] [Created] (KAFKA-10082) fix failed kafka.api.PlaintextConsumerTest.testMultiConsumerStickyAssignment
Luke Chen created KAFKA-10082:
---------------------------------

             Summary: fix failed kafka.api.PlaintextConsumerTest.testMultiConsumerStickyAssignment
                 Key: KAFKA-10082
                 URL: https://issues.apache.org/jira/browse/KAFKA-10082
             Project: Kafka
          Issue Type: Bug
            Reporter: Luke Chen
            Assignee: Luke Chen

[https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/153/log/?start=0]
[https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk8/runs/4596/log/?start=0]
[https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk11/runs/1523/log/?start=0]

kafka.api.PlaintextConsumerTest > testMultiConsumerStickyAssignment FAILED
    java.lang.AssertionError: Expected only two topic partitions that have switched to other consumers. expected:<9> but was:<14>
        at org.junit.Assert.fail(Assert.java:89)
        at org.junit.Assert.failNotEquals(Assert.java:835)
        at org.junit.Assert.assertEquals(Assert.java:647)
        at kafka.api.PlaintextConsumerTest.testMultiConsumerStickyAssignment(PlaintextConsumerTest.scala:929)
[GitHub] [kafka] showuon opened a new pull request #8777: KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment
showuon opened a new pull request #8777:
URL: https://github.com/apache/kafka/pull/8777

Fix the failed `testMultiConsumerStickyAssignment` by fixing a logic error in the `allSubscriptionsEqual` method.

We create the `consumerToOwnedPartitions` map to keep the set of previously owned partitions encoded in each Subscription; it is the basis for the reassignment. In `allSubscriptionsEqual`, we read the member generation of each subscription, and when a subscription carries a higher generation we invalidate the previously owned partitions of members on older generations. However, the logic before my fix also removed the member holding the current highest generation from `consumerToOwnedPartitions`. This PR fixes that logic error.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
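A hedged sketch of the generation handling the PR text describes (variable names such as `maxGeneration`, `membersOfMaxGeneration`, and `ownedPartitions` are illustrative, not the actual assignor code): a higher generation invalidates the claims of *older* members only; the member that carries the new highest generation keeps its owned partitions.

```java
if (memberGeneration > maxGeneration) {
    // invalidate claims from members of older generations, but keep this member
    for (final String older : membersOfMaxGeneration) {
        consumerToOwnedPartitions.get(older).clear();
    }
    membersOfMaxGeneration.clear();
    maxGeneration = memberGeneration;
}
membersOfMaxGeneration.add(consumer);
consumerToOwnedPartitions.put(consumer, ownedPartitions);
```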
[GitHub] [kafka] showuon commented on pull request #8777: KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment
showuon commented on pull request #8777:
URL: https://github.com/apache/kafka/pull/8777#issuecomment-637344872

@ableegoldman , could you check this PR to fix the failed tests? Thanks.
[GitHub] [kafka] kkonstantine commented on a change in pull request #8630: KAFKA-9969: Exclude ConnectorClientConfigRequest from class loading isolation
kkonstantine commented on a change in pull request #8630:
URL: https://github.com/apache/kafka/pull/8630#discussion_r433640144

## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java

## @@ -114,68 +90,264 @@ public void testConnectFrameworkClasses() {
         assertFalse(PluginUtils.shouldLoadInIsolation(
                 "org.apache.kafka.clients.admin.KafkaAdminClient")
         );
-        assertFalse(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.rest.ConnectRestExtension")
-        );
     }
 
     @Test
-    public void testAllowedConnectFrameworkClasses() {
-        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.transforms."));
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.transforms.ExtractField")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.transforms.ExtractField$Key")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.json."));
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.json.JsonConverter")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.json.JsonConverter$21")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.file."));
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.file.FileStreamSourceTask")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.file.FileStreamSinkConnector")
-        );
+    public void testConnectApiClasses() {
+        String[] apiClasses = new String[] {

Review comment:
nit: do you mind using `List` and `Arrays.asList(...)`? I don't think array declaration is better if the result is not going to be used as an array. Also, it won't work if you try to reinitialize a declared variable.

## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginUtilsTest.java

## @@ -114,68 +90,264 @@ public void testConnectFrameworkClasses() {
         assertFalse(PluginUtils.shouldLoadInIsolation(
                 "org.apache.kafka.clients.admin.KafkaAdminClient")
         );
-        assertFalse(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.rest.ConnectRestExtension")
-        );
     }
 
     @Test
-    public void testAllowedConnectFrameworkClasses() {
-        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.transforms."));
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.transforms.ExtractField")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.transforms.ExtractField$Key")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.json."));
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.json.JsonConverter")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.json.JsonConverter$21")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation("org.apache.kafka.connect.file."));
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.file.FileStreamSourceTask")
-        );
-        assertTrue(PluginUtils.shouldLoadInIsolation(
-                "org.apache.kafka.connect.file.FileStreamSinkConnector")
-        );
+    public void testConnectApiClasses() {
+        String[] apiClasses = new String[] {
+                // Enumerate all packages and classes
+                "org.apache.kafka.connect.",
+                "org.apache.kafka.connect.components.",
+                "org.apache.kafka.connect.components.Versioned",
+                //"org.apache.kafka.connect.connector.policy.", isolated by default
+                "org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy",
+                "org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest",
+                "org.apache.kafka.connect.connector.",
+                "org.apache.kafka.connect.connector.Connector",
+                "org.apache.kafka.connect.connector.ConnectorContext",
+                "org.apache.kafka.connect.connector.ConnectRecord",
+                "org.apache.kafka.connect.connector.Task",
+                "org.apache.kafka.connect.data.",
+                "org.apache.kafka.connect.data.ConnectSchema",
+                "org.apache.kafka.connect.data.Date",
+                "org.apache.kafka.connect.data.Decimal",
+                "org.apache.kafka.connect.data.Field",
+                "org.apache.kafka.connect.data.Schema",
+                "org.apache.kafka.connect.data.SchemaAndValue",
+
[GitHub] [kafka] d8tltanc edited a comment on pull request #8421: KAFKA-9800: [KIP-580] Admin Client Exponential Backoff Implementation
d8tltanc edited a comment on pull request #8421:
URL: https://github.com/apache/kafka/pull/8421#issuecomment-637300554

@skaundinya15 @ijuma @abbccdda Thanks for all the feedback and comments. This patch was made when I was new to Kafka, and it looks kind of naive to me now that I have gained more insight into Kafka. Let me talk about two of my major concerns and thoughts about implementing the universal client exponential backoff.

**AdminClient logic redundant**

NetworkClient has request timeout handlers. Producer / Consumer use NetworkClient to help handle timeouts but AdminClient doesn't. The reason, to my understanding, is that AdminClient implements a per-request timeout. For example,
1. Wrapping the request builder into a new class `Call` (the construction lambdas add tons of lines to AdminClient.java which should probably have lived in each AbstractRequest implementation class file)
2. Re-writing the request queues for the different request statuses, while normal clients fully rely on the NetworkClient.

After we add per-request timeout support to all clients, the AdminClient per-request timeout demand won't be special anymore. Thus, the code supporting the per-request timeout in AdminClient will no longer be useful and might be removed. Are we considering refactoring the AdminClient further and removing all the redundant logic which should have belonged to the networking layer and the AbstractRequest implementation classes?

**Flexible backoff modes**

Let's analyze the request backoff demands of all the types of clients at this point. In my opinion, there are simply two:
1. Requests that do not need exponential backoff. These requests need to be sent ASAP to avoid dataflow performance degradation, such as the `ProduceRequest` and its related/preceding metadata requests.
2. Requests that do need exponential backoff. These requests are "second-class citizens" and can be throttled to avoid request storms on the broker side, such as metadata-related requests in AdminClient.

Now the question comes. Even when two requests are of the same request type, one may have to be sent ASAP while the other may wait, depending on the use case. We need to think deeper about how to make this classification. But the implementation would be simple. We can utilize the existing builder pattern of AbstractRequest and build the request flexibly upon a given retry_backoff mode. For example,
1. AbstractRequest.Builder will interact with a new abstract class specifying the retry_backoff option, static or exponential.
2. AbstractRequest will have some new interfaces controlling the backoff.

Then, we can control whether the request should have a static backoff or an exponential backoff when we construct each implementation instance of AbstractRequest.Builder.

I'll include more details in the Jira ticket and rewrite this PR. Before we talk more about the code details and start the new implementation, please let me know what you think about the AdminClient refactor and the static/exponential retry_backoff classification rule. As @abbccdda suggests, let's re-direct our further discussion to [Jira](https://issues.apache.org/jira/browse/KAFKA-9800)

Thanks.
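For the exponential mode, an illustrative backoff computation with jitter in the spirit of KIP-580 (the constants, method name, and jitter factor are examples, not the KIP's final values):

```java
static long backoffMs(int attempts, long initialBackoffMs, long maxBackoffMs) {
    // double the delay on each attempt, capped at the configured maximum
    double exp = initialBackoffMs * Math.pow(2, attempts);
    double capped = Math.min((double) maxBackoffMs, exp);
    // randomize by +/-20% so retries from many clients do not synchronize
    double jitter = 0.2;
    return (long) (capped * (1.0 - jitter + 2.0 * jitter * Math.random()));
}
```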
[jira] [Comment Edited] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation
[ https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123377#comment-17123377 ]

Cheng Tan edited comment on KAFKA-9800 at 6/2/20, 7:53 AM:
-----------------------------------------------------------

To recap the discussion on GitHub: we want to implement a per-request backoff for all types of clients. Let me talk about two of my major concerns and thoughts about implementing the universal client exponential backoff.

*AdminClient logic redundant*

NetworkClient has request timeout handlers. Producer / Consumer use NetworkClient to help handle timeouts but AdminClient doesn't. The reason, to my understanding, is that AdminClient implements a per-request timeout. For example,
# Wrapping the request builder into a new class {{Call}} (the construction lambdas add tons of lines to AdminClient.java which should probably have lived in each AbstractRequest implementation class file)
# Re-writing the request queues for the different request statuses, while normal clients fully rely on the NetworkClient.

After we add per-request timeout support to all clients, the AdminClient per-request timeout demand won't be special anymore. Thus, the code supporting the per-request timeout in AdminClient will no longer be useful and might be removed. Are we considering refactoring the AdminClient further and removing all the redundant logic which should have belonged to the networking layer and the AbstractRequest implementation classes?

*Flexible backoff modes*

Let's analyze the request backoff demands of all the types of clients at this point. In my opinion, there are simply two:
# Requests that do not need exponential backoff. These requests need to be sent ASAP to avoid dataflow performance degradation, such as the {{ProduceRequest}} and its related/preceding metadata requests.
# Requests that do need exponential backoff. These requests are "second-class citizens" and can be throttled to avoid request storms on the broker side, such as metadata-related requests in AdminClient.

Now the question comes. Even when two requests are of the same request type, one may have to be sent ASAP while the other may wait, depending on the use case. We need to think deeper about how to make this classification. But the implementation would be simple. We can utilize the existing builder pattern of AbstractRequest and build the request flexibly upon a given retry_backoff mode. For example,
# AbstractRequest.Builder will interact with a new abstract class specifying the retry_backoff option, static or exponential.
# AbstractRequest will have some new interfaces controlling the backoff.

Then, we can control whether the request should have a static backoff or an exponential backoff when we construct each implementation instance of AbstractRequest.Builder.

I'll include more details in the Jira ticket and rewrite this PR. Before we talk more about the code details and start the new implementation, please let me know what you think about the AdminClient refactor and the static/exponential retry_backoff classification rule.

was (Author: d8tltanc):

To recap the discussion on GitHub: we want to implement a per-request backoff for all types of clients. Let me talk about two of my major concerns and thoughts about implementing the universal client exponential backoff.

*AdminClient logic redundant*

The main request-flow difference between AdminClient and normal clients (e.g. Producer and Consumer) is that AdminClient wants a per-request timeout while normal clients are okay with a static default timeout. Thus, AdminClient rewrote quite an amount of NetworkClient's functionality. For example,
# Wrapping the request builder into a new class {{Call}} (the construction lambdas add tons of lines to AdminClient.java which should probably have lived in each AbstractRequest implementation class file)
# Re-writing the request queues for the different request statuses, while normal clients fully rely on the NetworkClient.

These logics will become redundant after we support exponential backoff in NetworkClient for all types of clients. Are we considering refactoring the AdminClient further and removing all the redundant logic which should have belonged to the networking layer and the AbstractRequest implementation classes?

*Flexible backoff modes*

Let's analyze the request backoff demands of all the types of clients at this point. In my opinion, there are simply two:
# Requests that do not need exponential backoff. These requests need to be sent ASAP to avoid dataflow performance degradation, such as the {{ProduceRequest}} and its related/preceding metadata requests.
# Requests that do need exponential backoff. These requests are "second-class citizens" and can be throttled to avoid request storms on the broker side, such as metadata-related requests in AdminClient.

Now the qu
[GitHub] [kafka] dajac commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…
dajac commented on a change in pull request #8768:
URL: https://github.com/apache/kafka/pull/8768#discussion_r433696374

## File path: core/src/main/scala/kafka/network/SocketServer.scala

## @@ -1287,15 +1309,97 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def maxListenerConnections(listenerName: ListenerName): Int =
     maxConnectionsPerListener.get(listenerName).map(_.maxConnections).getOrElse(Int.MaxValue)
 
+  /**
+   * Calculates the delay needed to bring the observed connection creation rate to listener-level limit or to broker-wide
+   * limit, whichever the longest. The delay is capped to the quota window size defined by QuotaWindowSizeSecondsProp
+   *
+   * @param listenerName listener for which calculate the delay
+   * @param timeMs current time in milliseconds
+   * @return delay in milliseconds
+   */
+  private def recordConnectionAndGetThrottleTimeMs(listenerName: ListenerName, timeMs: Long): Long = {
+    val listenerThrottleTimeMs = maxConnectionsPerListener
+      .get(listenerName)
+      .map(listenerQuota => recordAndGetThrottleTimeMs(listenerQuota.connectionRateSensor, timeMs))
+      .getOrElse(0)
+
+    if (protectedListener(listenerName)) {
+      listenerThrottleTimeMs
+    } else {
+      val brokerThrottleTimeMs = recordAndGetThrottleTimeMs(brokerConnectionRateSensor, timeMs)
+      val throttleTimeMs = math.max(brokerThrottleTimeMs, listenerThrottleTimeMs)

Review comment:
nit: This variable is not really needed.

## File path: core/src/main/scala/kafka/network/SocketServer.scala

## @@ -1306,18 +1410,26 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
       val value = maxConnections(configs)
       if (value <= 0)
         throw new ConfigException("Invalid max.connections $listenerMax")
+
+      val rate = maxConnectionCreationRate(configs)
+      if (rate <= 0)
+        throw new ConfigException(s"Invalid ${KafkaConfig.MaxConnectionCreationRateProp} $rate")
     }
 
     override def reconfigure(configs: util.Map[String, _]): Unit = {
       lock.synchronized {
         _maxConnections = maxConnections(configs)
+        updateConnectionRateQuota(maxConnectionCreationRate(configs), Some(listener.value()))

Review comment:
nit: `()` can be removed after `value`.

## File path: core/src/main/scala/kafka/network/SocketServer.scala

## @@ -1256,11 +1272,17 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
   private def waitForConnectionSlot(listenerName: ListenerName,
                                     acceptorBlockedPercentMeter: com.yammer.metrics.core.Meter): Unit = {
     counts.synchronized {
-      if (!connectionSlotAvailable(listenerName)) {
+      val startTimeMs = time.milliseconds()
+      val throttleTimeMs = math.max(recordConnectionAndGetThrottleTimeMs(listenerName, startTimeMs), 0)
+
+      if (throttleTimeMs > 0 || !connectionSlotAvailable(listenerName)) {
         val startNs = time.nanoseconds
+        val endThrottleTimeMs = startTimeMs + throttleTimeMs
+        var remainingThrottleTimeMs = throttleTimeMs
         do {
-          counts.wait()
-        } while (!connectionSlotAvailable(listenerName))
+          counts.wait(remainingThrottleTimeMs)

Review comment:
A thread waiting here will be notified when a connection is closed (when `dec` is called). As connections in AK are long lived, couldn't we end up in a case where a connection is throttled for a longer period than the computed `throttleTimeMs` if no connection is closed in between?

## File path: clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java

## @@ -161,6 +162,9 @@ private KafkaMbean removeAttribute(KafkaMetric metric, String mBeanName) {
     private String addAttribute(KafkaMetric metric) {
         try {
             MetricName metricName = metric.metricName();
+            if (metricName.tags().containsKey(DO_NOT_REPORT_TAG)) {
+                return null;
+            }

Review comment:
I am not convinced by this. The main issue being that other reporters will report the metric. If we really want to not report a metric, I think that we need a solution which works for all reporters. Could you perhaps elaborate more on the need here? I can think of the following alternatives:
* add a flag to the sensor to indicate if it must be reported or not.
* don't rely on metrics to create/store the sensor but have a local reference.

## File path: core/src/main/scala/kafka/network/SocketServer.scala

## @@ -1306,18 +1410,26 @@ class ConnectionQuotas(config: KafkaConfig, time: Time) extends Logging {
       val value = maxConnections(configs)
       if (value <= 0)
         throw new ConfigException("Invalid max.connections $listenerMax")
+
+      val rate = maxConnectionCreationRate(configs)
+      if (rate <= 0)
+        throw new Config
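On the wait-loop concern above, a Java rendering of a deadline-aware version (the actual code is Scala in SocketServer.scala; `counts`, `time`, and `connectionSlotAvailable()` stand in for the corresponding `ConnectionQuotas` members, and `InterruptedException` handling is omitted): bounding `wait()` by the remaining throttle time keeps the acceptor from sleeping past the computed deadline even if no connection close ever calls `notify()`, while a zero remaining time with no free slot falls back to the original unbounded wait-for-slot behaviour.

```java
long endThrottleTimeMs = startTimeMs + throttleTimeMs;
long remainingThrottleTimeMs = throttleTimeMs;
synchronized (counts) {
    do {
        counts.wait(remainingThrottleTimeMs);   // wait(0) waits indefinitely for a slot
        remainingThrottleTimeMs = Math.max(endThrottleTimeMs - time.milliseconds(), 0);
    } while (remainingThrottleTimeMs > 0 || !connectionSlotAvailable(listenerName));
}
```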
[GitHub] [kafka] chia7712 commented on a change in pull request #8755: KAFKA-10069 The user-defined "predicate" and "negate" are not removed…
chia7712 commented on a change in pull request #8755:
URL: https://github.com/apache/kafka/pull/8755#discussion_r433724296

## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java

## @@ -434,5 +436,52 @@ public void configure(Map<String, ?> configs) {
         }
     }
 
+    @Test
+    public void testEnrichedConfigDef() {
+        String alias = "hdt";
+        String prefix = ConnectorConfig.TRANSFORMS_CONFIG + "." + alias + ".";
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.TRANSFORMS_CONFIG, alias);
+        props.put(prefix + "type", HasDuplicateConfigTransformation.class.getName());
+        ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS, new ConfigDef(), props, false);
+        assertEnrichedConfigDef(def, prefix, HasDuplicateConfigTransformation.MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN);
+        assertEnrichedConfigDef(def, prefix, PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.STRING);
+        assertEnrichedConfigDef(def, prefix, PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
+    }
+
+    private static void assertEnrichedConfigDef(ConfigDef def, String prefix, String keyName, ConfigDef.Type expectedType) {

Review comment:
@kkonstantine Please take a look at this new name
[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas
[ https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123542#comment-17123542 ]

Mateusz Jadczyk commented on KAFKA-9891:
----------------------------------------

[~bchen225242] yes, it should be materialized. The duplicate will, however, be hit only once for keyOne (during the very first processing), as this is thrown only for the poisonKey:
{code:java}
throw new IllegalStateException("Throw on " + poisonKey + " to trigger rebalance");
{code}

> Invalid state store content after task migration with exactly_once and standby replicas
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9891
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9891
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.1, 2.4.1
>            Reporter: Mateusz Jadczyk
>            Assignee: Boyang Chen
>            Priority: Blocker
>         Attachments: failedtest, failedtest2, failedtest3, failedtest3_bug, state_store_operations.txt, tasks_assignment.txt
>
> We have a simple command id deduplication mechanism (very similar to the one from Kafka Streams examples) based on Kafka Streams State Stores. It stores command ids from the past hour in _persistentWindowStore_. We encountered a problem with the store if there's an exception thrown later in that topology.
> We run 3 nodes using docker, each with multiple threads set for this particular Streams Application.
> The business flow is as follows (performed within a single subtopology):
> * a valid command is sent with command id (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active task 1_2. First node in the topology analyses if this is a duplicate by checking in the state store (_COMMAND_ID_STORE_), if not puts the command id in the state store and processes the command properly.
> * an invalid command is sent with the same key but new command id (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the duplicated command id is performed, it's not a duplicate, command id is put into the state store. Next node in the topology throws an exception which causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, offsets are not committed. I double checked for the changelog topic - relevant messages are not committed. Therefore, the changelog topic contains only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and not the one which caused a failure.
> * in the meantime a standby task 1_2 running on NODE 3 replicated _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local _COMMAND_ID_STORE_
> * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. It checks if this command id is a duplicate - no, it isn't - tries to process the faulty command and throws an exception. Again, transaction aborted, all looks fine.
> * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, *it is a duplicate!* Even though the transaction has been aborted and the changelog doesn't contain this command id: _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._
>
> After digging into the Streams logs and some discussion on ([Stack Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan]) we concluded it has something to do with checkpoint files. Here are the detailed logs relevant to checkpoint files.
>
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic Processor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : stream-thread [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp /tmp/kafka-streams/Processor/1_2/.checkpoint
> NODE_3 2020-
[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas
[ https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123544#comment-17123544 ]

Mateusz Jadczyk commented on KAFKA-9891:
----------------------------------------

The reason we need at least one materialized key is that we then have something on the changelog topic, and some checkpoint files are used, which mess things up.

> Invalid state store content after task migration with exactly_once and standby replicas
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-9891
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9891
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.3.1, 2.4.1
>            Reporter: Mateusz Jadczyk
>            Assignee: Boyang Chen
>            Priority: Blocker
>         Attachments: failedtest, failedtest2, failedtest3, failedtest3_bug, state_store_operations.txt, tasks_assignment.txt
>
> We have a simple command id deduplication mechanism (very similar to the one from Kafka Streams examples) based on Kafka Streams State Stores. It stores command ids from the past hour in _persistentWindowStore_. We encountered a problem with the store if there's an exception thrown later in that topology.
> We run 3 nodes using docker, each with multiple threads set for this particular Streams Application.
> The business flow is as follows (performed within a single subtopology):
> * a valid command is sent with command id (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active task 1_2. First node in the topology analyses if this is a duplicate by checking in the state store (_COMMAND_ID_STORE_), if not puts the command id in the state store and processes the command properly.
> * an invalid command is sent with the same key but new command id (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the duplicated command id is performed, it's not a duplicate, command id is put into the state store. Next node in the topology throws an exception which causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, offsets are not committed. I double checked for the changelog topic - relevant messages are not committed. Therefore, the changelog topic contains only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and not the one which caused a failure.
> * in the meantime a standby task 1_2 running on NODE 3 replicated _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local _COMMAND_ID_STORE_
> * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. It checks if this command id is a duplicate - no, it isn't - tries to process the faulty command and throws an exception. Again, transaction aborted, all looks fine.
> * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, *it is a duplicate!* Even though the transaction has been aborted and the changelog doesn't contain this command id: _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._
>
> After digging into the Streams logs and some discussion on ([Stack Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan]) we concluded it has something to do with checkpoint files. Here are the detailed logs relevant to checkpoint files.
>
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] o.a.k.s.p.i.ProcessorStateManager : stream-thread [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task [1_2] Restoring state store COMMAND_ID_STORE from changelog topic Processor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] o.a.k.s.p.i.ProcessorStateManager : stream-thread [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp /tmp/kafka-streams/Processor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file /tmp/kafka-
[jira] [Commented] (KAFKA-9148) Consider forking RocksDB for Streams
[ https://issues.apache.org/jira/browse/KAFKA-9148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123597#comment-17123597 ]

Adam Retter commented on KAFKA-9148:
------------------------------------

Thanks [~ableegoldman]. I just wanted to understand further the issues that make it difficult to upgrade RocksDB in Kafka. I was wondering, is it one or more of the following:
1. The RocksDB API has changed, so changes need to be made to Kafka.
2. You directly expose the RocksDB API to users of Kafka, therefore users' code may also have to change.
3. Not enough resources to work on updating Kafka for a new version of RocksDB.
4. Other...

> Consider forking RocksDB for Streams
> -------------------------------------
>
>                 Key: KAFKA-9148
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9148
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Sophie Blee-Goldman
>            Priority: Major
>
> We recently upgraded our RocksDB dependency to 5.18 for its memory-management abilities (namely the WriteBufferManager, see KAFKA-8215). Unfortunately, someone from Flink recently discovered a ~8% [performance regression|https://github.com/facebook/rocksdb/issues/5774] that exists in all versions 5.18+ (up through the current newest version, 6.2.2). Flink was able to react to this by downgrading to 5.17 and [picking the WriteBufferManage|https://github.com/dataArtisans/frocksdb/pull/4]r to their fork (fRocksDB).
> Due to this and other reasons enumerated below, we should consider also forking our own RocksDB for Streams.
> Pros:
> * We can avoid passing sudden breaking changes on to our users, such as removal of methods with no deprecation period (see discussion on KAFKA-8897)
> * We can pick whichever version has the best performance for our needs, and pick over any new features, metrics, etc that we need to use rather than being forced to upgrade (and breaking user code, introducing regression, etc)
> * Support for some architectures does not exist in all RocksDB versions, making Streams completely unusable for some users until we can upgrade the rocksdb dependency to one that supports their specific case. It's worth noting that we've only had [one user|https://issues.apache.org/jira/browse/KAFKA-9225] hit this so far (that we know of), and some workarounds have been discussed on the ticket.
> * The Java API seems to be a very low priority to the rocksdb folks.
> ** They leave out critical functionality, features, and configuration options that have been in the c++ API for a very long time
> ** Those that do make it over often have random gaps in the API such as setters but no getters (see [rocksdb PR #5186|https://github.com/facebook/rocksdb/pull/5186])
> ** Others are poorly designed and require too many trips across the JNI, making otherwise incredibly useful features prohibitively expensive.
> *** [Custom Comparator|https://github.com/facebook/rocksdb/issues/538#issuecomment-83145980]: a custom comparator could significantly improve the performance of session windows. This is trivial to do, but given the high performance cost of crossing the JNI, it is currently only practical to use a c++ comparator
> *** [Prefix Seek|https://github.com/facebook/rocksdb/issues/6004]: not currently used by Streams but a commonly requested feature, and may also allow improved range queries
> ** Even when an external contributor develops a solution for poorly performing Java functionality and helpfully tries to contribute their patch back to rocksdb, it gets ignored by the rocksdb people ([rocksdb PR #2283|https://github.com/facebook/rocksdb/pull/2283])
> Cons:
> * More work (not to be trivialized; the truth is we don't and can't know how much extra work this will ultimately be)
> Given that we rarely upgrade the Rocks dependency, use only some fraction of its features, and would need or want to make only minimal changes ourselves, it seems like we could actually get away with very little extra work by forking rocksdb. Note that as of this writing the frocksdb repo has only needed to open 5 PRs on top of the actual rocksdb (two of them trivial). Of course, the LOE to maintain this will only grow over time, so we should think carefully about whether and when to start taking on this potential burden.
[jira] [Commented] (KAFKA-10075) Kafka client stucks after Kafka-cluster unavailability
[ https://issues.apache.org/jira/browse/KAFKA-10075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123656#comment-17123656 ]

Tom Bentley commented on KAFKA-10075:
-------------------------------------

Is the JVM in which you're running the client(s) caching DNS lookups? When the brokers get rescheduled on different pods (as can happen during an upgrade), their resolved IPs can change. There's a Java security property (nb, not a system property) which you can use to configure the DNS caching explicitly. See https://docs.aws.amazon.com/sdk-for-java/v1/developer-guide/java-dg-jvm-ttl.html for example.

> Kafka client stucks after Kafka-cluster unavailability
> -------------------------------------------------------
>
>                 Key: KAFKA-10075
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10075
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.4.0
>        Environment: Kafka v2.3.1 deployed by https://strimzi.io/ to Kubernetes cluster
> openjdk version "1.8.0_242"
> OpenJDK Runtime Environment (build 1.8.0_242-b08)
> OpenJDK 64-Bit Server VM (build 25.242-b08, mixed mode)
>            Reporter: Dmitry Mischenko
>            Priority: Minor
>
> Several times we got an issue with kafka-client.
> What happened:
> We have Kafka v2.3.1 deployed by [https://strimzi.io/] to Kubernetes cluster (Amazon EKS).
> # Kafka brokers were unavailable (due to cluster upgrade) and couldn't be resolved by internal hostnames
> {code:java}
> 2020-05-28 17:19:50 WARN NetworkClient:962 - [Producer clientId=change_transformer-postgres_101.public.user_storage-9a89f512-43df-4179-a80f-db74f31ac724-StreamThread-1-producer] Error connecting to node data-kafka-dev-kafka-0.data-kafka-dev-kafka-brokers.data-kafka-dev.svc.cluster.local:9092 (id: -1 rack: null)
> 2020-05-28 17:19:50 WARN NetworkClient:962 - [Producer clientId=change_transformer-postgres_101.public.user_storage-9a89f512-43df-4179-a80f-db74f31ac724-StreamThread-1-producer] Error connecting to node data-kafka-dev-kafka-0.data-kafka-dev-kafka-brokers.data-kafka-dev.svc.cluster.local:9092 (id: -1 rack: null)
> at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:289)
> at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:538)
> at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
> at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)
> " at java.base/java.net.InetAddress.getAllByName(Unknown Source)"
> at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
> " at java.base/java.net.InetAddress.getAllByName(Unknown Source)"
> " at java.base/java.net.InetAddress$CachedAddresses.get(Unknown Source)"
> at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> at java.base/java.net.InetAddress.getAllByName0(Unknown Source)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:444)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:843)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
> 2020-05-28 17:19:50 WARN NetworkClient:962 - [Producer clientId=change_transformer-postgres_101.public.user_storage-9a89f512-43df-4179-a80f-db74f31ac724-StreamThread-1-producer] Error connecting to node data-kafka-dev-kafka-1.data-kafka-dev-kafka-brokers.data-kafka-dev.svc.cluster.local:9092 (id: -2 rack: null)
> at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:955)
> " at java.base/java.net.InetAddress$CachedAddresses.get(Unknown Source)"
> at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
> {code}
> But after the moment when the cluster was repaired, kafka-admin-client couldn't restore the connection and was only throwing timeout exceptions every 120s for a long time.
>
> {code:java}
> 2020-05-28 17:21:14 INFO StreamThread:219 - stream-thread [consumer_group-101.public.user_storage-714cfbe7-f34a-466a-97e1-bb145f0e34b7-StreamThread-1] State transition from CREATED to STARTING
> 2020-05-28 17:21:14 WARN ConsumerConfig:355 - The configuration 'admin.retry.backoff.ms' was supplied but isn't a known config.
> 2020-05-28 17:21:14 INFO AppInfoParser:118 - Kafka commit
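For reference, the security property Tom's comment refers to, set programmatically (note it is a Java *security* property, not a system property, and must be set before the first lookup is cached, e.g. early in main(); the class name here is illustrative):

{code:java}
import java.security.Security;

public class DnsTtlConfig {
    public static void main(String[] args) {
        // re-resolve broker hostnames at most every 60 seconds, so pods
        // rescheduled with new IPs are picked up promptly
        Security.setProperty("networkaddress.cache.ttl", "60");
    }
}
{code}

The same setting can alternatively be made in $JAVA_HOME/conf/security/java.security so it applies JVM-wide.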
[GitHub] [kafka] astubbs commented on pull request #8771: MINOR: Add explanation for disabling forwarding from value transformers
astubbs commented on pull request #8771:
URL: https://github.com/apache/kafka/pull/8771#issuecomment-637501018

Retest this please.
[jira] [Created] (KAFKA-10083) fix failed testReassignmentWithRandomSubscriptionsAndChanges
Luke Chen created KAFKA-10083:
---------------------------------

             Summary: fix failed testReassignmentWithRandomSubscriptionsAndChanges
                 Key: KAFKA-10083
                 URL: https://issues.apache.org/jira/browse/KAFKA-10083
             Project: Kafka
          Issue Type: Bug
            Reporter: Luke Chen
            Assignee: Luke Chen

[https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/815/]

The error can also be reproduced locally.
[jira] [Updated] (KAFKA-10083) fix failed testReassignmentWithRandomSubscriptionsAndChanges
[ https://issues.apache.org/jira/browse/KAFKA-10083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-10083:
------------------------------
    Description: 
[https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/815/]

The error can also be reproduced locally.

h3. Error Message
java.lang.AssertionError

h3. Stacktrace
java.lang.AssertionError
    at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.getTheActualPartitionToBeMoved(AbstractStickyAssignor.java:836)
    at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.access$100(AbstractStickyAssignor.java:780)
    at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:699)
    at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:689)
    at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.performReassignments(AbstractStickyAssignor.java:661)
    at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.balance(AbstractStickyAssignor.java:597)
    at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.generalAssign(AbstractStickyAssignor.java:352)
    at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assign(AbstractStickyAssignor.java:85)
    at org.apache.kafka.clients.consumer.CooperativeStickyAssignor.assign(CooperativeStickyAssignor.java:64)
    at org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.verifyValidityAndBalance(CooperativeStickyAssignorTest.java:68)
    at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges(AbstractStickyAssignorTest.java:654)

  was:
[https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/815/]

The error can also be reproduced locally.

> fix failed testReassignmentWithRandomSubscriptionsAndChanges
> -------------------------------------------------------------
>
>                 Key: KAFKA-10083
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10083
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Major
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/815/]
> The error can also be reproduced locally.
>
> h3. Error Message
> java.lang.AssertionError
> h3. Stacktrace
> java.lang.AssertionError
>     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.getTheActualPartitionToBeMoved(AbstractStickyAssignor.java:836)
>     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.access$100(AbstractStickyAssignor.java:780)
>     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:699)
>     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:689)
>     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.performReassignments(AbstractStickyAssignor.java:661)
>     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.balance(AbstractStickyAssignor.java:597)
>     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.generalAssign(AbstractStickyAssignor.java:352)
>     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assign(AbstractStickyAssignor.java:85)
>     at org.apache.kafka.clients.consumer.CooperativeStickyAssignor.assign(CooperativeStickyAssignor.java:64)
>     at org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.verifyValidityAndBalance(CooperativeStickyAssignorTest.java:68)
>     at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges(AbstractStickyAssignorTest.java:654)
[GitHub] [kafka] showuon opened a new pull request #8778: KAFKA-10083: fix failed testReassignmentWithRandomSubscriptionsAndChanges tests
showuon opened a new pull request #8778:
URL: https://github.com/apache/kafka/pull/8778

The test fails because the class member `partitionMovements` is now initialized when the assignor instance is created, instead of lazily inside the `assign` method as before. This causes no issue the first time an `AbstractStickyAssignor` instance is used, but on later invocations `partitionMovements` still holds stale info from the previous assignment, which makes these tests fail. Fix it by moving the `partitionMovements` initialization back into the `assign` method.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
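A sketch of the fix described above (the signature is abbreviated and `computeAssignment()` is a hypothetical placeholder for the existing assignment logic, not the actual method name): the movement bookkeeping is re-created on every call so state cannot leak from one rebalance into the next.

```java
@Override
public Map<String, List<TopicPartition>> assign(final Map<String, Integer> partitionsPerTopic,
                                                final Map<String, Subscription> subscriptions) {
    partitionMovements = new PartitionMovements(); // fresh state per rebalance
    return computeAssignment(partitionsPerTopic, subscriptions);
}
```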
[GitHub] [kafka] showuon commented on pull request #8778: KAFKA-10083: fix failed testReassignmentWithRandomSubscriptionsAndChanges tests
showuon commented on pull request #8778: URL: https://github.com/apache/kafka/pull/8778#issuecomment-637512220 @ableegoldman , could you review this PR to fix the failed tests? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10082) fix failed kafka.api.PlaintextConsumerTest.testMultiConsumerStickyAssignment
[ https://issues.apache.org/jira/browse/KAFKA-10082?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-10082: -- Reviewer: Sophie Blee-Goldman > fix failed kafka.api.PlaintextConsumerTest.testMultiConsumerStickyAssignment > > > Key: KAFKA-10082 > URL: https://issues.apache.org/jira/browse/KAFKA-10082 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > [https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk14/runs/153/log/?start=0] > [https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk8/runs/4596/log/?start=0] > [https://builds.apache.org/blue/rest/organizations/jenkins/pipelines/kafka-trunk-jdk11/runs/1523/log/?start=0] > > kafka.api.PlaintextConsumerTest > testMultiConsumerStickyAssignment FAILED > java.lang.AssertionError: Expected only two topic partitions that have > switched to other consumers. expected:<9> but was:<14> > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.failNotEquals(Assert.java:835) > at org.junit.Assert.assertEquals(Assert.java:647) > at > kafka.api.PlaintextConsumerTest.testMultiConsumerStickyAssignment(PlaintextConsumerTest.scala:929) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10083) fix failed testReassignmentWithRandomSubscriptionsAndChanges
[ https://issues.apache.org/jira/browse/KAFKA-10083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-10083: -- Description: [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/815/] h3. Error Message java.lang.AssertionError h3. Stacktrace java.lang.AssertionError at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.getTheActualPartitionToBeMoved(AbstractStickyAssignor.java:836) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.access$100(AbstractStickyAssignor.java:780) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:699) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:689) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.performReassignments(AbstractStickyAssignor.java:661) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.balance(AbstractStickyAssignor.java:597) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.generalAssign(AbstractStickyAssignor.java:352) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assign(AbstractStickyAssignor.java:85) at org.apache.kafka.clients.consumer.CooperativeStickyAssignor.assign(CooperativeStickyAssignor.java:64) at org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.verifyValidityAndBalance(CooperativeStickyAssignorTest.java:68) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges(AbstractStickyAssignorTest.java:654) was: [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/815/] It can also locally reproduce this error. h3. Error Message java.lang.AssertionError h3. 
Stacktrace java.lang.AssertionError at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.getTheActualPartitionToBeMoved(AbstractStickyAssignor.java:836) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.access$100(AbstractStickyAssignor.java:780) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:699) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:689) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.performReassignments(AbstractStickyAssignor.java:661) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.balance(AbstractStickyAssignor.java:597) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.generalAssign(AbstractStickyAssignor.java:352) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assign(AbstractStickyAssignor.java:85) at org.apache.kafka.clients.consumer.CooperativeStickyAssignor.assign(CooperativeStickyAssignor.java:64) at org.apache.kafka.clients.consumer.CooperativeStickyAssignorTest.verifyValidityAndBalance(CooperativeStickyAssignorTest.java:68) at org.apache.kafka.clients.consumer.internals.AbstractStickyAssignorTest.testReassignmentWithRandomSubscriptionsAndChanges(AbstractStickyAssignorTest.java:654) > fix failed testReassignmentWithRandomSubscriptionsAndChanges > > > Key: KAFKA-10083 > URL: https://issues.apache.org/jira/browse/KAFKA-10083 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Luke Chen >Priority: Major > > [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/815/] > > h3. Error Message > java.lang.AssertionError > h3. Stacktrace > java.lang.AssertionError at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.getTheActualPartitionToBeMoved(AbstractStickyAssignor.java:836) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor$PartitionMovements.access$100(AbstractStickyAssignor.java:780) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:699) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.reassignPartition(AbstractStickyAssignor.java:689) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.performReassignments(AbstractStickyAssignor.java:661) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.balance(AbstractStickyAssignor.java:597) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.generalAssign(AbstractStickyAssignor.java:352) > at > org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.assign(AbstractStickyAssignor.java:85) > at > org.apache.kafka.clients.consum
[GitHub] [kafka] nizhikov commented on pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)
nizhikov commented on pull request #8695: URL: https://github.com/apache/kafka/pull/8695#issuecomment-637515040 @ijuma I found an explanation of the test behavior. Full information can be found in the [guide](https://docs.oracle.com/en/java/javase/11/security/java-secure-socket-extension-jsse-reference-guide.html#GUID-4D421910-C36D-40A2-8BA2-7D42CCBED3C6) Please navigate to the "Send ClientHello Message" section and take a look at the "client version" and "supported_versions (43)" fields. The root of the "strange" behavior is the structure of the SSL ClientHello message (quoted from the guide): > **Client version**: For TLS 1.3, this has a fixed value, TLSv1.2; TLS 1.3 uses the extension supported_versions and not this field to negotiate protocol version > ... > **supported_versions**: Lists which versions of TLS the client supports. In particular, if the client > requests TLS 1.3, then the client version field has the value TLSv1.2 and this extension > contains the value TLSv1.3; if the client requests TLS 1.2, then the client version field has the > value TLSv1.2 and this extension either doesn’t exist or contains the value TLSv1.2 but not the value TLSv1.3. This means we can't connect with the following configuration: Client: ``` ssl.protocol=TLSv1.2 # this will be used for ClientHello ssl.enabled.protocols=TLSv1.2,TLSv1.3 # TLSv1.3 will be ignored in the ClientHello message ``` Server: ``` ssl.protocol=TLSv1.3 ssl.enabled.protocols=TLSv1.3 # accept only TLSv1.3 ``` You can see all details of the SSL connection process in the javax.net log, which can be enabled like the following: ``` public SslVersionsTransportLayerTest(List<String> serverProtocols, List<String> clientProtocols) { System.setProperty("javax.net.debug", "ssl:handshake"); // This turns on the log from the JDK SSL system classes. this.serverProtocols = serverProtocols; this.clientProtocols = clientProtocols; } ``` So the correct check should be: ``` private boolean isCompatible(List<String> serverProtocols, List<String> clientProtocols) { return serverProtocols.contains(clientProtocols.get(0)) || (clientProtocols.get(0).equals("TLSv1.3") && clientProtocols.contains("TLSv1.2")); } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
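For concreteness, a small sketch of how the compatibility rule above plays out for the configurations discussed (assuming, as in the test, that the client's `ssl.protocol` is the first entry of its enabled-protocols list):

```java
import java.util.Arrays;
import java.util.List;

public class TlsCompatibilityDemo {
    // Mirrors the check proposed above: the server must support the version
    // the client advertises first, except that a TLSv1.3-first client that
    // also enables TLSv1.2 can still negotiate down to TLSv1.2.
    static boolean isCompatible(List<String> serverProtocols, List<String> clientProtocols) {
        return serverProtocols.contains(clientProtocols.get(0))
            || (clientProtocols.get(0).equals("TLSv1.3") && clientProtocols.contains("TLSv1.2"));
    }

    public static void main(String[] args) {
        // The failing combination from the comment: TLSv1.2-first client vs TLSv1.3-only server.
        System.out.println(isCompatible(Arrays.asList("TLSv1.3"), Arrays.asList("TLSv1.2", "TLSv1.3"))); // false
        // A TLSv1.3-first client that also enables TLSv1.2 can reach either server.
        System.out.println(isCompatible(Arrays.asList("TLSv1.3"), Arrays.asList("TLSv1.3", "TLSv1.2"))); // true
        System.out.println(isCompatible(Arrays.asList("TLSv1.2"), Arrays.asList("TLSv1.3", "TLSv1.2"))); // true
    }
}
```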
[GitHub] [kafka] itantiger commented on pull request #8753: KAFKA-10043:Some parameters will be overwritten which was configured …
itantiger commented on pull request #8753: URL: https://github.com/apache/kafka/pull/8753#issuecomment-637519467 @guozhangwang Can you take a look at this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #8771: MINOR: Add explanation for disabling forwarding from value transformers
bbejeck commented on pull request #8771: URL: https://github.com/apache/kafka/pull/8771#issuecomment-637540041 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)
ijuma commented on a change in pull request #8695: URL: https://github.com/apache/kafka/pull/8695#discussion_r433892269 ## File path: clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java ## @@ -117,24 +123,51 @@ public void testTlsDefaults() throws Exception { server.waitForMetric("response", 1); } else { NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED); +server.verifyAuthenticationMetrics(0, 1); } } +/** + * + * The explanation of this check in the structure of the ClientHello SSL message. + * Please, take a look at the https://docs.oracle.com/en/java/javase/11/security/java-secure-socket-extension-jsse-reference-guide.html#GUID-4D421910-C36D-40A2-8BA2-7D42CCBED3C6";>Guide, + * "Send ClientHello Message" section. + * + * > Client version: For TLS 1.3, this has a fixed value, TLSv1.2; TLS 1.3 uses the extension supported_versions and not this field to negotiate protocol version + * ... + * > supported_versions: Lists which versions of TLS the client supports. In particular, if the client + * > requests TLS 1.3, then the client version field has the value TLSv1.2 and this extension + * > contains the value TLSv1.3; if the client requests TLS 1.2, then the client version field has the + * > value TLSv1.2 and this extension either doesn’t exist or contains the value TLSv1.2 but not the value TLSv1.3. + * + * + * This mean that TLSv1.3 client can fallback to TLSv1.2 but TLSv1.2 client can't change protocol to TLSv1.3. + * + * @param serverProtocols Server protocols. + * @param clientProtocols Client protocols. + * @return {@code True} if client should be able to connect to the server. + */ +private boolean isCompatible(List serverProtocols, List clientProtocols) { +return serverProtocols.contains(clientProtocols.get(0)) || +(clientProtocols.get(0).equals("TLSv1.3") && clientProtocols.contains("TLSv1.2")); +} + private static Map getTrustingConfig(CertStores certStores, CertStores peerCertStores, List tlsProtocols) { Map configs = certStores.getTrustingConfig(peerCertStores); configs.putAll(sslConfig(tlsProtocols)); return configs; } -private static Map sslConfig(List tlsServerProtocols) { +private static Map sslConfig(List tlsProtocols) { Map sslConfig = new HashMap<>(); -sslConfig.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsServerProtocols.get(0)); -sslConfig.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, tlsServerProtocols); +sslConfig.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsProtocols.get(0)); +sslConfig.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, tlsProtocols); return sslConfig; } private Selector createSelector(Map sslClientConfigs) { Review comment: Should this be called `createClientSelector`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nizhikov commented on a change in pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)
nizhikov commented on a change in pull request #8695: URL: https://github.com/apache/kafka/pull/8695#discussion_r433899761 ## File path: clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java ## @@ -117,24 +123,51 @@ public void testTlsDefaults() throws Exception { server.waitForMetric("response", 1); } else { NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED); +server.verifyAuthenticationMetrics(0, 1); } } +/** + * + * The explanation of this check in the structure of the ClientHello SSL message. + * Please, take a look at the https://docs.oracle.com/en/java/javase/11/security/java-secure-socket-extension-jsse-reference-guide.html#GUID-4D421910-C36D-40A2-8BA2-7D42CCBED3C6";>Guide, + * "Send ClientHello Message" section. + * + * > Client version: For TLS 1.3, this has a fixed value, TLSv1.2; TLS 1.3 uses the extension supported_versions and not this field to negotiate protocol version + * ... + * > supported_versions: Lists which versions of TLS the client supports. In particular, if the client + * > requests TLS 1.3, then the client version field has the value TLSv1.2 and this extension + * > contains the value TLSv1.3; if the client requests TLS 1.2, then the client version field has the + * > value TLSv1.2 and this extension either doesn’t exist or contains the value TLSv1.2 but not the value TLSv1.3. + * + * + * This mean that TLSv1.3 client can fallback to TLSv1.2 but TLSv1.2 client can't change protocol to TLSv1.3. + * + * @param serverProtocols Server protocols. + * @param clientProtocols Client protocols. + * @return {@code True} if client should be able to connect to the server. + */ +private boolean isCompatible(List serverProtocols, List clientProtocols) { +return serverProtocols.contains(clientProtocols.get(0)) || +(clientProtocols.get(0).equals("TLSv1.3") && clientProtocols.contains("TLSv1.2")); +} + private static Map getTrustingConfig(CertStores certStores, CertStores peerCertStores, List tlsProtocols) { Map configs = certStores.getTrustingConfig(peerCertStores); configs.putAll(sslConfig(tlsProtocols)); return configs; } -private static Map sslConfig(List tlsServerProtocols) { +private static Map sslConfig(List tlsProtocols) { Map sslConfig = new HashMap<>(); -sslConfig.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsServerProtocols.get(0)); -sslConfig.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, tlsServerProtocols); +sslConfig.put(SslConfigs.SSL_PROTOCOL_CONFIG, tlsProtocols.get(0)); +sslConfig.put(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, tlsProtocols); return sslConfig; } private Selector createSelector(Map sslClientConfigs) { Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)
ijuma commented on a change in pull request #8695: URL: https://github.com/apache/kafka/pull/8695#discussion_r433903099 ## File path: clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java ## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.network; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.security.TestSecurityConfig; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.apache.kafka.common.utils.Java; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.test.TestUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Tests for the SSL transport layer. + * Checks different versions of the protocol usage on the server and client. 
+ */ +@RunWith(value = Parameterized.class) +public class SslVersionsTransportLayerTest { +private static final int BUFFER_SIZE = 4 * 1024; +private static final Time TIME = Time.SYSTEM; + +private final List serverProtocols; +private final List clientProtocols; + +@Parameterized.Parameters(name = "tlsServerProtocol={0},tlsClientProtocol={1}") +public static Collection data() { +List values = new ArrayList<>(); + +values.add(new Object[] {Collections.singletonList("TLSv1.2"), Collections.singletonList("TLSv1.2")}); + +if (Java.IS_JAVA11_COMPATIBLE) { +values.add(new Object[] {Collections.singletonList("TLSv1.2"), Collections.singletonList("TLSv1.3")}); +values.add(new Object[] {Collections.singletonList("TLSv1.3"), Collections.singletonList("TLSv1.2")}); +values.add(new Object[] {Collections.singletonList("TLSv1.3"), Collections.singletonList("TLSv1.3")}); +values.add(new Object[] {Collections.singletonList("TLSv1.2"), Arrays.asList("TLSv1.2", "TLSv1.3")}); +values.add(new Object[] {Collections.singletonList("TLSv1.2"), Arrays.asList("TLSv1.3", "TLSv1.2")}); +values.add(new Object[] {Collections.singletonList("TLSv1.3"), Arrays.asList("TLSv1.2", "TLSv1.3")}); +values.add(new Object[] {Collections.singletonList("TLSv1.3"), Arrays.asList("TLSv1.3", "TLSv1.2")}); +values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), Collections.singletonList("TLSv1.3")}); +values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), Collections.singletonList("TLSv1.2")}); +values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), Arrays.asList("TLSv1.2", "TLSv1.3")}); +values.add(new Object[] {Arrays.asList("TLSv1.3", "TLSv1.2"), Arrays.asList("TLSv1.3", "TLSv1.2")}); +values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), Collections.singletonList("TLSv1.3")}); +values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), Collections.singletonList("TLSv1.2")}); +values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), Arrays.asList("TLSv1.2", "TLSv1.3")}); +values.add(new Object[] {Arrays.asList("TLSv1.2", "TLSv1.3"), Arrays.asList("TLSv1.3", "TLSv1.2")}); +} +return values; +} + +/** + * Be aware that you can turn on debug mode for a javax.net.ssl library with the line {@code System.setProperty("javax.net.debug", "ssl:handshake");} + * @param serverProtocols Server protocols. + * @param clientProtocols Client protocols. + */ +public SslVersionsTransportLayerTest(List serverProtocols, List clientProtocols) { +this.serverProtocols = serverProtocols; +this.clientProtocols = clientProtocols; +} + +/** + * Tests that connection success with the default TLS version. + */ +@Test +public void testTlsDefaults() th
[GitHub] [kafka] ijuma commented on pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)
ijuma commented on pull request #8695: URL: https://github.com/apache/kafka/pull/8695#issuecomment-637569220 Oh, one more thing, let's please add an entry to `upgrade.html`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nizhikov opened a new pull request #8779: [MINOR] Fixing spotbug fail - removing unused variable.
nizhikov opened a new pull request #8779: URL: https://github.com/apache/kafka/pull/8779 Fixing a nit in c6633a157eec1712116d294eb3785a96cba4e331. That commit breaks the spotbugs check with "Dead store to isFreshAssignment in org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor.generalAssign(Map, Map)" ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
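For context, the pattern spotbugs flags here looks roughly like the following (a hypothetical simplified snippet, not the actual `generalAssign` code):

```java
import java.util.List;
import java.util.Map;

class DeadStoreExample {
    // A "dead store": the local is assigned but never read afterwards, so
    // spotbugs reports "Dead store to isFreshAssignment". Deleting the unused
    // variable, as this PR does, resolves the warning.
    void generalAssign(Map<String, List<String>> currentAssignment) {
        boolean isFreshAssignment = currentAssignment.isEmpty(); // never read again
        // ... assignment logic that no longer uses isFreshAssignment ...
    }
}
```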
[GitHub] [kafka] chia7712 commented on pull request #8774: KAFKA-10081: Remove an unused local variable to pass spotbugsMain check
chia7712 commented on pull request #8774: URL: https://github.com/apache/kafka/pull/8774#issuecomment-637570083 ```testReassignmentWithRandomSubscriptionsAndChanges``` is tracked by #8778 ```testMultiConsumerStickyAssignment``` is tracked by #8777 +1 to merge this hotfix :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nizhikov commented on pull request #8779: [MINOR] Fixing spotbug fail - removing unused variable.
nizhikov commented on pull request #8779: URL: https://github.com/apache/kafka/pull/8779#issuecomment-637570728 Hello @ableegoldman It looks like your patch breaks the spotbugs check. I prepared a one-liner fix for it. Can you please take a look? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)
ijuma commented on pull request #8695: URL: https://github.com/apache/kafka/pull/8695#issuecomment-637573626 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nizhikov commented on pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)
nizhikov commented on pull request #8695: URL: https://github.com/apache/kafka/pull/8695#issuecomment-637574517 I think the trunk is currently broken by c6633a157eec1712116d294eb3785a96cba4e331. I prepared a one-liner fix for it: #8779 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] nizhikov commented on a change in pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)
nizhikov commented on a change in pull request #8695: URL: https://github.com/apache/kafka/pull/8695#discussion_r433920350 ## File path: clients/src/test/java/org/apache/kafka/common/network/SslVersionsTransportLayerTest.java ## @@ -117,24 +123,51 @@ public void testTlsDefaults() throws Exception { server.waitForMetric("response", 1); } else { NetworkTestUtils.waitForChannelClose(selector, node, ChannelState.State.AUTHENTICATION_FAILED); +server.verifyAuthenticationMetrics(0, 1); } } +/** + * + * The explanation of this check in the structure of the ClientHello SSL message. + * Please, take a look at the https://docs.oracle.com/en/java/javase/11/security/java-secure-socket-extension-jsse-reference-guide.html#GUID-4D421910-C36D-40A2-8BA2-7D42CCBED3C6";>Guide, + * "Send ClientHello Message" section. + * + * > Client version: For TLS 1.3, this has a fixed value, TLSv1.2; TLS 1.3 uses the extension supported_versions and not this field to negotiate protocol version + * ... + * > supported_versions: Lists which versions of TLS the client supports. In particular, if the client + * > requests TLS 1.3, then the client version field has the value TLSv1.2 and this extension + * > contains the value TLSv1.3; if the client requests TLS 1.2, then the client version field has the + * > value TLSv1.2 and this extension either doesn’t exist or contains the value TLSv1.2 but not the value TLSv1.3. + * + * + * This mean that TLSv1.3 client can fallback to TLSv1.2 but TLSv1.2 client can't change protocol to TLSv1.3. + * + * @param serverProtocols Server protocols. + * @param clientProtocols Client protocols. + * @return {@code True} if client should be able to connect to the server. + */ +private boolean isCompatible(List serverProtocols, List clientProtocols) { +return serverProtocols.contains(clientProtocols.get(0)) || +(clientProtocols.get(0).equals("TLSv1.3") && clientProtocols.contains("TLSv1.2")); Review comment: Done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness
vvcephei commented on pull request #8775: URL: https://github.com/apache/kafka/pull/8775#issuecomment-637578416 Ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #8771: MINOR: Add explanation for disabling forwarding from value transformers
bbejeck commented on pull request #8771: URL: https://github.com/apache/kafka/pull/8771#issuecomment-637586221 Retest this please. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case
ijuma commented on pull request #8668: URL: https://github.com/apache/kafka/pull/8668#issuecomment-637591555 Did we check the build before merging this? It seems to have broken it: https://github.com/apache/kafka/pull/8779 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma edited a comment on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case
ijuma edited a comment on pull request #8668: URL: https://github.com/apache/kafka/pull/8668#issuecomment-637592541 @guozhangwang Looks like 2.6, 2.5 and 2.4 are broken too. You should generally also build locally when cherry-picking. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case
ijuma commented on pull request #8668: URL: https://github.com/apache/kafka/pull/8668#issuecomment-637592541 @guozhangwang Looks like 2.6, 2.5 and 2.4 are broken too. You should generally build locally when cherry-picking. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc edited a comment on pull request #8421: KAFKA-9800: [KIP-580] Admin Client Exponential Backoff Implementation
d8tltanc edited a comment on pull request #8421: URL: https://github.com/apache/kafka/pull/8421#issuecomment-637300554 @skaundinya15 @ijuma @abbccdda Thanks for all the feedback and comments. This patch was made when I was new to Kafka, and it looks somewhat naive to me now that I have gained more insight into Kafka. Let me talk about two of my major concerns and thoughts about implementing the universal client exponential backoff. **AdminClient logic redundant** NetworkClient has request timeout handlers. Producer / Consumer use NetworkClient to help handle timeouts, but AdminClient doesn't. The reason, to my understanding, is that AdminClient implements the per-request timeout itself. For example, 1. Wrapping the request builder into a new class `Call` (the construction lambdas add tons of lines to AdminClient.java that should probably live in the individual AbstractRequest implementation class files) 2. Re-writing the request queues for the different request statuses, while the normal clients fully rely on NetworkClient. After we add support for per-request retry backoff to all clients, we can implement the per-request timeout along the way. Thus we can clean up the redundant request handling logic in AdminClient. Are we considering refactoring the AdminClient further and removing all the redundant logic that belongs in the networking layer and the AbstractRequest implementation classes? **Flexible backoff modes** Let's analyze the request backoff demands of all the types of clients at this point. In my opinion, there are simply two: 1. Requests that do not need exponential backoff. These requests need to be sent ASAP to avoid dataflow performance degradation, such as the `ProduceRequest` and its related/preceding metadata requests. 2. Requests that do need exponential backoff. These requests are “second-class citizens” and can be throttled to avoid request storms on the broker side, such as metadata-related requests in AdminClient. Now the question arises: even when two requests are of the same request type, one may have to be sent ASAP while the other may wait, depending on the use case. We need to think more deeply about how to make this classification. But the implementation would be simple. We can utilize the existing AbstractRequest builder pattern and build the request flexibly based on a given retry_backoff mode. For example, 1. AbstractRequest.Builder will interact with a new abstract class specifying the retry_backoff option, static or exponential. 2. AbstractRequest will have some new interfaces controlling the backoff. Then, we can control whether the request should have a static backoff or an exponential backoff when we construct each implementation instance of AbstractRequest.Builder. I'll include more details in the Jira ticket and rewrite this PR. Before we talk more about the code details and start the new implementation, please let me know what you think about the AdminClient refactor and the static/exponential retry_backoff classification rule. As @abbccdda suggests, let's redirect our further discussion to [Jira](https://issues.apache.org/jira/browse/KAFKA-9800) Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] d8tltanc edited a comment on pull request #8421: KAFKA-9800: [KIP-580] Admin Client Exponential Backoff Implementation
d8tltanc edited a comment on pull request #8421: URL: https://github.com/apache/kafka/pull/8421#issuecomment-637300554 @skaundinya15 @ijuma @abbccdda Thanks for all the feedback and comments. This patch was made when I was new to Kafka, and it looks somewhat naive to me now that I have gained more insight into Kafka. Let me talk about two of my major concerns and thoughts about implementing the universal client exponential backoff. **AdminClient logic redundancy** NetworkClient has request timeout handlers. Producer / Consumer use NetworkClient to help handle timeouts, but AdminClient doesn't. The reason, to my understanding, is that AdminClient implements the per-request timeout itself. For example, 1. Wrapping the request builder into a new class `Call` (the construction lambdas add tons of lines to AdminClient.java that should probably live in the individual AbstractRequest implementation class files) 2. Re-writing the request queues for the different request statuses, while the normal clients fully rely on NetworkClient. After we add support for per-request retry backoff to all clients, we can implement the per-request timeout along the way. Thus we can clean up the redundant request handling logic in AdminClient. Are we considering refactoring the AdminClient further and removing all the redundant logic that belongs in the networking layer and the AbstractRequest implementation classes? **Flexible backoff modes** Let's analyze the request backoff demands of all the types of clients at this point. In my opinion, there are simply two: 1. Requests that do not need exponential backoff. These requests need to be sent ASAP to avoid dataflow performance degradation, such as the `ProduceRequest` and its related/preceding metadata requests. 2. Requests that do need exponential backoff. These requests are “second-class citizens” and can be throttled to avoid request storms on the broker side, such as metadata-related requests in AdminClient. Now the question arises: even when two requests are of the same request type, one may have to be sent ASAP while the other may wait, depending on the use case. We need to think more deeply about how to make this classification. But the implementation would be simple. We can utilize the existing AbstractRequest builder pattern and build the request flexibly based on a given retry_backoff mode. For example, 1. AbstractRequest.Builder will interact with a new abstract class specifying the retry_backoff option, static or exponential. 2. AbstractRequest will have some new interfaces controlling the backoff. Then, we can control whether the request should have a static backoff or an exponential backoff when we construct each implementation instance of AbstractRequest.Builder. I'll include more details in the Jira ticket and rewrite this PR. Before we talk more about the code details and start the new implementation, please let me know what you think about the AdminClient refactor and the static/exponential retry_backoff classification rule. As @abbccdda suggests, let's redirect our further discussion to [Jira](https://issues.apache.org/jira/browse/KAFKA-9800) Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation
[ https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123377#comment-17123377 ] Cheng Tan edited comment on KAFKA-9800 at 6/2/20, 3:12 PM: --- Recapping the discussion on GitHub: we want to implement a per-request backoff for all types of clients. Let me talk about two of my major concerns and thoughts about implementing the universal client exponential backoff. **AdminClient logic redundancy** NetworkClient has request timeout handlers. Producer / Consumer use NetworkClient to help handle timeouts, but AdminClient doesn't. The reason, to my understanding, is that AdminClient implements the per-request timeout itself. For example, 1. Wrapping the request builder into a new class `Call` (the construction lambdas add tons of lines to AdminClient.java that should probably live in the individual AbstractRequest implementation class files) 2. Re-writing the request queues for the different request statuses, while the normal clients fully rely on NetworkClient. After we add support for per-request retry backoff to all clients, we can implement the per-request timeout along the way. Thus we can clean up the redundant request handling logic in AdminClient. Are we considering refactoring the AdminClient further and removing all the redundant logic that belongs in the networking layer and the AbstractRequest implementation classes? **Flexible backoff modes** Let's analyze the request backoff demands of all the types of clients at this point. In my opinion, there are simply two: 1. Requests that do not need exponential backoff. These requests need to be sent ASAP to avoid dataflow performance degradation, such as the `ProduceRequest` and its related/preceding metadata requests. 2. Requests that do need exponential backoff. These requests are “second-class citizens” and can be throttled to avoid request storms on the broker side, such as metadata-related requests in AdminClient. Now the question arises: even when two requests are of the same request type, one may have to be sent ASAP while the other may wait, depending on the use case. We need to think more deeply about how to make this classification. But the implementation would be simple. We can utilize the existing AbstractRequest builder pattern and build the request flexibly based on a given retry_backoff mode. For example, 1. AbstractRequest.Builder will interact with a new abstract class specifying the retry_backoff option, static or exponential. 2. AbstractRequest will have some new interfaces controlling the backoff. Then, we can control whether the request should have a static backoff or an exponential backoff when we construct each implementation instance of AbstractRequest.Builder. I'll include more details in the Jira ticket and rewrite this PR. Before we talk more about the code details and start the new implementation, please let me know what you think about the AdminClient refactor and the static/exponential retry_backoff classification rule. was (Author: d8tltanc): Recap the discussion in Github. We want to implement a per-request backoff for all types of clients. Let me talk about two of my major concerns and thoughts about implementing the universal client exponential backoff. *AdminClient logic redundant* NetworkClient has request timeout handlers. Producer / Consumer are using NetworkClient to help handle timeout but AdminClient doesn’t. The reason, to my understanding, is that AdminClient is implementing the per-request timeout. 
For example, # Wrapping the request builder into a new class {{Call}}, (the construction lambda adds tons of lines into the AdminClient.java, which should probably have been living in each AbstractRequest implementation classes files) # Re-writing the request queues for different request status, while normal clients are fully using the NetworkClient. After we add support to the per-request timeout to all clients, the AdminClient per-request timeout demand won’t be special anymore. Thus, the code for supporting the per-request timeout in AdminClient is not useful anymore and might be removed. Are we considering refactoring the AdminClient further and remove all the redundant logic which should have belonged to the networking layer and the AbstractRequest implementation classes? *Flexible backoff modes* Let's analyze the request backoff demands of all the types of clients at this point. In my opinion, there are simply two: # Requests do not need exponential backoff. These requests need to be sent ASAP to avoid dataflow performance degradation, such as the {{ProduceRequest}} and its related/preceding metadata requests. # Request do need exponential backoff. These requests are “second-class citizens” and can be throttled to avoid request storms on the broker side. Such as metadata related
[jira] [Commented] (KAFKA-9800) [KIP-580] Client Exponential Backoff Implementation
[ https://issues.apache.org/jira/browse/KAFKA-9800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17123935#comment-17123935 ] Ismael Juma commented on KAFKA-9800: I think you should apply the same backoff strategy for all request types. I don't see much benefit in the more complex approach. > [KIP-580] Client Exponential Backoff Implementation > --- > > Key: KAFKA-9800 > URL: https://issues.apache.org/jira/browse/KAFKA-9800 > Project: Kafka > Issue Type: New Feature >Reporter: Cheng Tan >Assignee: Cheng Tan >Priority: Major > Labels: KIP-580 > > In {{KafkaAdminClient}}, we will have to modify the way the retry backoff is > calculated for the calls that have failed and need to be retried. From the > current static retry backoff, we have to introduce a mechanism for all calls > that upon failure, the next retry time is dynamically calculated. -- This message was sent by Atlassian Jira (v8.3.4#803005)
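For readers unfamiliar with the mechanism under discussion, here is a minimal sketch of per-request exponential backoff with jitter in the spirit of KIP-580 (the names, signatures, and defaults below are illustrative assumptions, not Kafka APIs):

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of per-request exponential backoff with jitter.
public class ExponentialBackoffDemo {

    // Backoff before the given retry attempt (attempt 0 = first retry):
    // initial * multiplier^attempt, capped at max, then randomized by +/-20%
    // so that many clients retrying at once do not synchronize.
    static long backoffMs(long initialMs, long maxMs, double multiplier, int attempt) {
        double exp = initialMs * Math.pow(multiplier, attempt);
        double capped = Math.min(exp, maxMs);
        double jitter = ThreadLocalRandom.current().nextDouble(0.8, 1.2);
        return (long) (capped * jitter);
    }

    public static void main(String[] args) {
        // With initial=100ms, multiplier=2, max=1000ms the waits are roughly
        // 100, 200, 400, 800, 1000, 1000 ms (each with +/-20% jitter).
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.println("retry " + attempt + " -> " + backoffMs(100, 1000, 2.0, attempt) + " ms");
        }
    }
}
```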
[GitHub] [kafka] collabH closed pull request #8780: Read kafka
collabH closed pull request #8780: URL: https://github.com/apache/kafka/pull/8780 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] collabH opened a new pull request #8780: Read kafka
collabH opened a new pull request #8780: URL: https://github.com/apache/kafka/pull/8780 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] collabH opened a new pull request #8781: Read kafka
collabH opened a new pull request #8781: URL: https://github.com/apache/kafka/pull/8781 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] collabH closed pull request #8781: Read kafka
collabH closed pull request #8781: URL: https://github.com/apache/kafka/pull/8781 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)
ijuma commented on a change in pull request #8695: URL: https://github.com/apache/kafka/pull/8695#discussion_r433981892 ## File path: clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java ## @@ -49,11 +50,12 @@ public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol"; public static final String SSL_PROTOCOL_DOC = "The SSL protocol used to generate the SSLContext. " -+ "Default setting is TLSv1.2, which is fine for most cases. " ++ "Default setting is TLSv1.2(TLSv1.3 for modern JVM), which is fine for most cases. " + "Allowed values in recent JVMs are TLSv1.2 and TLSv1.3. TLS, TLSv1.1, SSL, SSLv2 and SSLv3 " -+ "may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities."; ++ "may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities." ++ "Please, note, TLSv1.2 clients can't connect to the servers with TLSv1.3 only even if ssl.enabled.protocols contains TLSv1.3"; Review comment: How about: ```java "The SSL protocol used to generate the SSLContext. " + "The default is TLSv1.3 when running with Java 11 or newer, TLSv1.2 otherwise. " + "This value should be fine for most use cases. " + "Allowed values in recent JVMs are TLSv1.2 and TLSv1.3. TLS, TLSv1.1, SSL, SSLv2 and SSLv3 " + "may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities. " + "With the default value for this config and ssl.enabled.protocols, clients will downgrade to TLSv1.2 if " + "the server does not support TLSv1.3. If this config is set to TLSv1.2, clients will not use TLSv1.3 even " + "if it is one of the values in ssl.enabled.protocols and the server only supports TLSv1.3." ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)
ijuma commented on a change in pull request #8695: URL: https://github.com/apache/kafka/pull/8695#discussion_r433987384 ## File path: clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java ## @@ -64,7 +66,17 @@ public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols"; public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections."; Review comment: How about: ``` The list of protocols enabled for SSL connections. The default is 'TLSv1.2,TLSv1.3' when running with Java 11 or newer, 'TLSv1.2' otherwise. With the default value for Java 11, clients and servers will prefer TLSv1.3 if both support it and fallback to TLSv1.2 otherwise (assuming both support at least TLSv1.2). This default should be fine for most cases. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)
ijuma commented on a change in pull request #8695: URL: https://github.com/apache/kafka/pull/8695#discussion_r433987384 ## File path: clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java ## @@ -64,7 +66,17 @@ public static final String SSL_ENABLED_PROTOCOLS_CONFIG = "ssl.enabled.protocols"; public static final String SSL_ENABLED_PROTOCOLS_DOC = "The list of protocols enabled for SSL connections."; Review comment: How about: ``` The list of protocols enabled for SSL connections. The default is 'TLSv1.2,TLSv1.3' when running with Java 11 or newer, 'TLSv1.2' otherwise. With the default value for Java 11, clients and servers will prefer TLSv1.3 if both support it and fallback to TLSv1.2 otherwise (assuming both support at least TLSv1.2). This default should be fine for most cases. Also see the `ssl.protocol` config documentation. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
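For concreteness, the documented defaults translate into configuration like the following (these values restate the behavior described above; they are not new recommendations):

```
# Java 11 or newer (the new defaults): prefer TLSv1.3, fall back to TLSv1.2
ssl.protocol=TLSv1.3
ssl.enabled.protocols=TLSv1.2,TLSv1.3

# Older JVMs (unchanged defaults): TLSv1.2 only
# ssl.protocol=TLSv1.2
# ssl.enabled.protocols=TLSv1.2
```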
[GitHub] [kafka] ijuma commented on a change in pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)
ijuma commented on a change in pull request #8695: URL: https://github.com/apache/kafka/pull/8695#discussion_r433990762 ## File path: docs/upgrade.html ## @@ -18,6 +18,10 @@
[GitHub] [kafka] ijuma merged pull request #8779: [MINOR] Fixing spotbug fail - removing unused variable.
ijuma merged pull request #8779: URL: https://github.com/apache/kafka/pull/8779 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #8695: KAFKA-9320: Enable TLSv1.3 by default (KIP-573)
ijuma commented on pull request #8695: URL: https://github.com/apache/kafka/pull/8695#issuecomment-637647620 @nizhikov I think we're good to merge this after the non-code suggestions above are addressed (assuming we can get a Jenkins build, I merged your other PR fixing the build issue). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #8774: KAFKA-10081: Remove an unused local variable to pass spotbugsMain check
mjsax commented on pull request #8774: URL: https://github.com/apache/kafka/pull/8774#issuecomment-637655627 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax removed a comment on pull request #8774: KAFKA-10081: Remove an unused local variable to pass spotbugsMain check
mjsax removed a comment on pull request #8774: URL: https://github.com/apache/kafka/pull/8774#issuecomment-637655696 Retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #8774: KAFKA-10081: Remove an unused local variable to pass spotbugsMain check
mjsax merged pull request #8774: URL: https://github.com/apache/kafka/pull/8774 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-9943) Enable TLSv.1.3 in system tests "run all" execution.
[ https://issues.apache.org/jira/browse/KAFKA-9943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma reassigned KAFKA-9943: -- Assignee: Nikolay Izhikov > Enable TLSv.1.3 in system tests "run all" execution. > > > Key: KAFKA-9943 > URL: https://issues.apache.org/jira/browse/KAFKA-9943 > Project: Kafka > Issue Type: Test >Reporter: Nikolay Izhikov >Assignee: Nikolay Izhikov >Priority: Major > > We need to enable system tests with the TLSv1.3 in "run all" execution. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-9943) Enable TLSv.1.3 in system tests "run all" execution.
[ https://issues.apache.org/jira/browse/KAFKA-9943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-9943: --- Fix Version/s: 2.6.0 > Enable TLSv.1.3 in system tests "run all" execution. > > > Key: KAFKA-9943 > URL: https://issues.apache.org/jira/browse/KAFKA-9943 > Project: Kafka > Issue Type: Test >Reporter: Nikolay Izhikov >Assignee: Nikolay Izhikov >Priority: Major > Fix For: 2.6.0 > > > We need to enable system tests with the TLSv1.3 in "run all" execution. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #8778: KAFKA-10083: fix failed testReassignmentWithRandomSubscriptionsAndChanges tests
ableegoldman commented on a change in pull request #8778: URL: https://github.com/apache/kafka/pull/8778#discussion_r434008302 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -43,7 +43,7 @@ public static final int DEFAULT_GENERATION = -1; -private PartitionMovements partitionMovements = new PartitionMovements(); +private PartitionMovements partitionMovements; Review comment: Can we still initialize it here as well? I remember that was necessary for some tests to pass, since they might never get to the `generalAssign` method and `isSticky` would hit an NPE. On the other hand, it seems like `isSticky` is pointless to call unless we get to the `generalAssign` method. So maybe we should just remove that from the tests that only do the `constrainedAssign`, and verify the stickiness directly? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
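A sketch of the belt-and-braces option being discussed, extending the fix above (simplified stand-ins; `isSticky` here represents the test helper mentioned, not the real assignor code):

```java
import java.util.ArrayList;
import java.util.List;

// Keep the field initializer so helpers such as isSticky() never observe null
// even when assign() was never called, and still reset the bookkeeping at the
// top of assign() so instance reuse cannot leak state between rounds.
class DefensiveAssignorSketch {
    private List<String> partitionMovements = new ArrayList<>(); // never null

    List<String> assign(List<String> partitions) {
        partitionMovements = new ArrayList<>(); // fresh per assignment round
        partitionMovements.addAll(partitions);
        return partitionMovements;
    }

    boolean isSticky() { // safe to call even before any assign()
        return partitionMovements.isEmpty();
    }
}
```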
[GitHub] [kafka] ableegoldman commented on pull request #8777: KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment
ableegoldman commented on pull request #8777: URL: https://github.com/apache/kafka/pull/8777#issuecomment-637664149 cc @mjsax @guozhangwang, this should be cherry-picked to 2.6, 2.5, and 2.4 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #8778: KAFKA-10083: fix failed testReassignmentWithRandomSubscriptionsAndChanges tests
ableegoldman commented on pull request #8778: URL: https://github.com/apache/kafka/pull/8778#issuecomment-637665071 cc @mjsax @guozhangwang, this should be cherry-picked to 2.6, 2.5, and 2.4 (once my comment above is addressed) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #8775: KAFKA-10079: improve thread-level stickiness
vvcephei commented on a change in pull request #8775: URL: https://github.com/apache/kafka/pull/8775#discussion_r433923120 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -938,57 +930,9 @@ private void populatePartitionsByHostMaps(final Map versionProbingAssignment(final Map clientsMetadata, - final Map> partitionsForTask, - final Map> partitionsByHost, - final Map> standbyPartitionsByHost, - final Set allOwnedPartitions, - final int minUserMetadataVersion, - final int minSupportedMetadataVersion) { -final Map assignment = new HashMap<>(); - -// Since we know another rebalance will be triggered anyway, just try and generate a balanced assignment -// (without violating cooperative protocol) now so that on the second rebalance we can just give tasks -// back to their previous owners -// within the client, distribute tasks to its owned consumers -for (final ClientMetadata clientMetadata : clientsMetadata.values()) { -final ClientState state = clientMetadata.state; - -final Map> interleavedActive = -interleaveConsumerTasksByGroupId(state.activeTasks(), clientMetadata.consumers); -final Map> interleavedStandby = -interleaveConsumerTasksByGroupId(state.standbyTasks(), clientMetadata.consumers); - -addClientAssignments( -assignment, -clientMetadata, -partitionsForTask, -partitionsByHost, -standbyPartitionsByHost, -allOwnedPartitions, -interleavedActive, -interleavedStandby, -minUserMetadataVersion, -minSupportedMetadataVersion, -true, -false); -} - -log.info("Finished unstable assignment of tasks, a followup rebalance will be scheduled due to version probing."); - -return assignment; -} - /** * Adds the encoded assignment for each StreamThread consumer in the client to the overall assignment map - * @return true if this client has been told to schedule a followup rebalance + * @return true if a followup rebalance will be required due to revoekd tasks Review comment: ```suggestion * @return true if a followup rebalance will be required due to revoked tasks ``` ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java ## @@ -242,8 +250,9 @@ public void addOwnedPartitions(final Collection ownedPartitions, } } -public void addPreviousTasksAndOffsetSums(final Map taskOffsetSums) { +public void addPreviousTasksAndOffsetSums(final String consumerId, final Map taskOffsetSums) { this.taskOffsetSums.putAll(taskOffsetSums); +consumerToPreviousTaskIds.put(consumerId, taskOffsetSums.keySet()); Review comment: We have several new methods, and also this new book-kept collection (`consumerToPreviousTaskIds`), but no new tests for them in ClientStateTest. Can you add the missing coverage? The new methods are more a matter of principle; I'm really concerned that we should have good coverage on the bookkeeping aspect of `consumerToPreviousTaskIds` because I fear future regressions when we have to maintain two data structures in a consistent fashion This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
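For reference, the coverage being requested might look like the following JUnit 4 sketch. `addPreviousTasksAndOffsetSums` follows the diff above, but `previousTasksForConsumer` is a hypothetical accessor name, and the inlined `ClientState` stub (with String task ids) stands in for the real Streams class:

```java
import static org.junit.Assert.assertEquals;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.junit.Test;

public class ClientStateBookkeepingSketchTest {

    // minimal stand-in for o.a.k.streams.processor.internals.assignment.ClientState
    static class ClientState {
        private final Map<String, Long> taskOffsetSums = new HashMap<>();
        private final Map<String, Set<String>> consumerToPreviousTaskIds = new HashMap<>();

        void addPreviousTasksAndOffsetSums(final String consumerId, final Map<String, Long> offsetSums) {
            // the two data structures that must stay consistent, per the review above
            taskOffsetSums.putAll(offsetSums);
            consumerToPreviousTaskIds.put(consumerId, offsetSums.keySet());
        }

        Set<String> previousTasksForConsumer(final String consumerId) {
            return consumerToPreviousTaskIds.get(consumerId);
        }
    }

    @Test
    public void shouldTrackPreviousTaskIdsPerConsumer() {
        final ClientState state = new ClientState();
        state.addPreviousTasksAndOffsetSums("consumer-1", Collections.singletonMap("0_0", 100L));

        assertEquals(Collections.singleton("0_0"), state.previousTasksForConsumer("consumer-1"));
    }
}
```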
[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-637668386 @junrao Could you take a look? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-10063) UnsupportedOperation when querying cleaner metrics after shutdown
[ https://issues.apache.org/jira/browse/KAFKA-10063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-10063: -- Assignee: Chia-Ping Tsai > UnsupportedOperation when querying cleaner metrics after shutdown > - > > Key: KAFKA-10063 > URL: https://issues.apache.org/jira/browse/KAFKA-10063 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Chia-Ping Tsai >Priority: Major > > We have a few log cleaner metrics which iterate the set of cleaners. For > example: > {code} > newGauge("max-clean-time-secs", () => > cleaners.iterator.map(_.lastStats.elapsedSecs).max.toInt) > {code} > It seems possible currently for LogCleaner metrics to get queried after > shutdown of the log cleaner, which clears the `cleaners` collection. This can > lead to the following error: > {code} > java.lang.UnsupportedOperationException: empty.max > at scala.collection.IterableOnceOps.max(IterableOnce.scala:952) > at scala.collection.IterableOnceOps.max$(IterableOnce.scala:950) > at scala.collection.AbstractIterator.max(Iterator.scala:1279) > at > kafka.log.LogCleaner.kafka$log$LogCleaner$$$anonfun$new$9(LogCleaner.scala:132) > at kafka.log.LogCleaner$$anonfun$4.value(LogCleaner.scala:132) > at kafka.log.LogCleaner$$anonfun$4.value(LogCleaner.scala:132) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
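To make the failure mode concrete, here is a minimal Java sketch (the actual LogCleaner code is Scala, and this is illustrative only, not the fix from the follow-up PR #8783) contrasting the unguarded gauge with a guarded variant:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CleanerGaugeSketch {

    static class CleanerStats {
        final long elapsedSecs;
        CleanerStats(final long elapsedSecs) { this.elapsedSecs = elapsedSecs; }
    }

    // cleared on shutdown, which is what makes the unguarded gauge unsafe afterwards
    private final List<CleanerStats> cleaners = new CopyOnWriteArrayList<>();

    // mirrors the failing gauge: max() over an empty stream has no value, so this throws
    // once 'cleaners' is empty (NoSuchElementException in Java; "empty.max" in Scala)
    int maxCleanTimeSecsUnguarded() {
        return (int) cleaners.stream().mapToLong(c -> c.elapsedSecs).max().getAsLong();
    }

    // guarded variant: report 0 when no cleaner threads are registered
    int maxCleanTimeSecs() {
        return (int) cleaners.stream().mapToLong(c -> c.elapsedSecs).max().orElse(0L);
    }
}
```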
[GitHub] [kafka] ableegoldman commented on a change in pull request #8775: KAFKA-10079: improve thread-level stickiness
ableegoldman commented on a change in pull request #8775: URL: https://github.com/apache/kafka/pull/8775#discussion_r434018826 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java ## @@ -242,8 +250,9 @@ public void addOwnedPartitions(final Collection ownedPartitions, } } -public void addPreviousTasksAndOffsetSums(final Map taskOffsetSums) { +public void addPreviousTasksAndOffsetSums(final String consumerId, final Map taskOffsetSums) { this.taskOffsetSums.putAll(taskOffsetSums); +consumerToPreviousTaskIds.put(consumerId, taskOffsetSums.keySet()); Review comment: Definitely. I meant to write tests but then I took Luna for a walk and forgot 😄 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case
ableegoldman commented on pull request #8668: URL: https://github.com/apache/kafka/pull/8668#issuecomment-637672028 Sorry @ijuma, I think I only ever ran the local tests + checkstyle, not the full suite. My mistake This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #8668: KAFKA-9987: optimize sticky assignment algorithm for same-subscription case
ableegoldman commented on a change in pull request #8668: URL: https://github.com/apache/kafka/pull/8668#discussion_r434021704 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -65,9 +72,206 @@ public MemberData(List partitions, Optional generation) @Override public Map> assign(Map partitionsPerTopic, Map subscriptions) { +Map> consumerToOwnedPartitions = new HashMap<>(); +if (allSubscriptionsEqual(partitionsPerTopic.keySet(), subscriptions, consumerToOwnedPartitions)) { +log.debug("Detected that all consumers were subscribed to same set of topics, invoking the " + + "optimized assignment algorithm"); +partitionsTransferringOwnership = new HashMap<>(); +return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions); +} else { +log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the " + + "general case assignment algorithm"); +partitionsTransferringOwnership = null; +return generalAssign(partitionsPerTopic, subscriptions); +} +} + +/** + * Returns true iff all consumers have an identical subscription. Also fills out the passed in + * {@code consumerToOwnedPartitions} with each consumer's previously owned and still-subscribed partitions + */ +private boolean allSubscriptionsEqual(Set allTopics, + Map subscriptions, + Map> consumerToOwnedPartitions) { +Set membersWithOldGeneration = new HashSet<>(); +Set membersOfCurrentHighestGeneration = new HashSet<>(); +int maxGeneration = DEFAULT_GENERATION; + +Set subscribedTopics = new HashSet<>(); + +for (Map.Entry subscriptionEntry : subscriptions.entrySet()) { +String consumer = subscriptionEntry.getKey(); +Subscription subscription = subscriptionEntry.getValue(); + +// initialize the subscribed topics set if this is the first subscription +if (subscribedTopics.isEmpty()) { +subscribedTopics.addAll(subscription.topics()); +} else if (!(subscription.topics().size() == subscribedTopics.size() +&& subscribedTopics.containsAll(subscription.topics( { +return false; +} + +MemberData memberData = memberData(subscription); + +List ownedPartitions = new ArrayList<>(); +consumerToOwnedPartitions.put(consumer, ownedPartitions); + +// Only consider this consumer's owned partitions as valid if it is a member of the current highest +// generation, or it's generation is not present but we have not seen any known generation so far +if (memberData.generation.isPresent() && memberData.generation.get() >= maxGeneration +|| !memberData.generation.isPresent() && maxGeneration == DEFAULT_GENERATION) { + +membersOfCurrentHighestGeneration.add(consumer); +for (final TopicPartition tp : memberData.partitions) { +// filter out any topics that no longer exist or aren't part of the current subscription +if (allTopics.contains(tp.topic())) { +ownedPartitions.add(tp); +} +} + +// If the current member's generation is higher, all the previous owned partitions are invalid +if (memberData.generation.isPresent() && memberData.generation.get() > maxGeneration) { + membersWithOldGeneration.addAll(membersOfCurrentHighestGeneration); +membersOfCurrentHighestGeneration.clear(); Review comment: Just FYI, I introduced this bug right before merging. Luckily the tests caught it -- fix is https://github.com/apache/kafka/pull/8777 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-7599) Trogdor - Allow configuration for not throttling Benchmark Workers and expose messages per second in task status
[ https://issues.apache.org/jira/browse/KAFKA-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski reassigned KAFKA-7599: -- Assignee: (was: Stanislav Kozlovski) > Trogdor - Allow configuration for not throttling Benchmark Workers and expose > messages per second in task status > > > Key: KAFKA-7599 > URL: https://issues.apache.org/jira/browse/KAFKA-7599 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Priority: Major > > In Trogdor, the ConsumeBench, ProduceBench and RoundTrip workers all take in > an argument called "targetMessagesPerSec". That argument works as an upper > bound on the number of messages that can be consumed/produced per second in > that worker. > It is useful to support infinite messages per second. Currently, if the > `targetMessagesPerSec` field is not present in the request, the > RoundTripWorker will raise an exception, whereas the ConsumeBench and > ProduceBench workers will work as if they had `targetMessagesPerSec=10`. > I propose we allow for unbounded `targetMessagesPerSec` if the field is not > present. > Further, it would be very useful if some of these workers showed the > `messagesPerSecond` rate they have been producing/consuming at. > Even now, giving the worker a `targetMessagesPerSec` does not guarantee that > the worker will reach the needed `targetMessagesPerSec`. There is no easy way > of knowing how the worker performed - you have to subtract the status field > `startedMs` from `doneMs` to get the total duration of the task, convert to > seconds, and then divide the `maxMessages` field by that. -- This message was sent by Atlassian Jira (v8.3.4#803005)
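As a worked example of that by-hand computation, with assumed numbers:

```java
// status fields startedMs/doneMs bound the task; maxMessages is the message count
public class TrogdorThroughputSketch {
    public static void main(final String[] args) {
        final long startedMs = 1_000L;       // assumed example values
        final long doneMs = 61_000L;
        final long maxMessages = 600_000L;

        final double durationSecs = (doneMs - startedMs) / 1000.0;  // 60.0 s
        final double messagesPerSec = maxMessages / durationSecs;   // 10000.0 msg/s
        System.out.printf("messagesPerSec = %.1f%n", messagesPerSec);
    }
}
```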
[jira] [Assigned] (KAFKA-8264) Flaky Test PlaintextConsumerTest#testLowMaxFetchSizeForRequestAndPartition
[ https://issues.apache.org/jira/browse/KAFKA-8264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski reassigned KAFKA-8264: -- Assignee: (was: Stanislav Kozlovski) > Flaky Test PlaintextConsumerTest#testLowMaxFetchSizeForRequestAndPartition > -- > > Key: KAFKA-8264 > URL: https://issues.apache.org/jira/browse/KAFKA-8264 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.0.1, 2.3.0 >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > Fix For: 2.6.0 > > > [https://builds.apache.org/blue/organizations/jenkins/kafka-2.0-jdk8/detail/kafka-2.0-jdk8/252/tests] > {quote}org.apache.kafka.common.errors.TopicExistsException: Topic 'topic3' > already exists.{quote} > STDOUT > > {quote}[2019-04-19 03:54:20,080] ERROR [ReplicaFetcher replicaId=1, > leaderId=0, fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-04-19 03:54:20,080] ERROR [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-04-19 03:54:20,312] ERROR [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Error for partition topic-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-04-19 03:54:20,313] ERROR [ReplicaFetcher replicaId=2, leaderId=1, > fetcherId=0] Error for partition topic-1 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-04-19 03:54:20,994] ERROR [ReplicaFetcher replicaId=1, leaderId=0, > fetcherId=0] Error for partition topic-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-04-19 03:54:21,727] ERROR [ReplicaFetcher replicaId=0, leaderId=2, > fetcherId=0] Error for partition topicWithNewMessageFormat-1 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-04-19 03:54:28,696] ERROR [ReplicaFetcher replicaId=0, leaderId=2, > fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-04-19 03:54:28,699] ERROR [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-04-19 03:54:29,246] ERROR [ReplicaFetcher replicaId=0, leaderId=2, > fetcherId=0] Error for partition topic-1 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. 
> [2019-04-19 03:54:29,247] ERROR [ReplicaFetcher replicaId=0, leaderId=1, > fetcherId=0] Error for partition topic-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-04-19 03:54:29,287] ERROR [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Error for partition topic-1 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-04-19 03:54:33,408] ERROR [ReplicaFetcher replicaId=0, leaderId=2, > fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-04-19 03:54:33,408] ERROR [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 > (kafka.server.ReplicaFetcherThread:76) > org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server > does not host this topic-partition. > [2019-04-19 03:54:33,655] ERROR [ReplicaFetcher replicaId=0, leaderId=2, > fetcherId=0] Error
[GitHub] [kafka] guozhangwang commented on pull request #8779: [MINOR] Fixing spotbug fail - removing unused variable.
guozhangwang commented on pull request #8779: URL: https://github.com/apache/kafka/pull/8779#issuecomment-637676075 Cherry-picked to 2.6 / 2.5 / 2.4 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #8777: KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment
guozhangwang merged pull request #8777: URL: https://github.com/apache/kafka/pull/8777 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #8777: KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment
guozhangwang commented on pull request #8777: URL: https://github.com/apache/kafka/pull/8777#issuecomment-637677433 Cherry-picked to 2.6 / 2.5 / 2.4 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji opened a new pull request #8782: KAFKA-10080; Fix race condition on txn completion which can cause duplicate appends
hachikuji opened a new pull request #8782: URL: https://github.com/apache/kafka/pull/8782 The method `maybeWriteTxnCompletion` is unsafe for concurrent calls. This can cause duplicate attempts to write the completion record to the log, which can ultimately lead to illegal state errors and possibly to correctness violations if another transaction had been started before the duplicate was written. This patch fixes the problem by ensuring only one thread can successfully remove the pending completion from the map. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
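A minimal Java sketch of the single-winner pattern the description outlines (the actual change is in the Scala `TransactionMarkerChannelManager`; the names follow the PR, and the stub types are assumptions):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class TxnCompletionSketch {

    static class PendingCompleteTxn {
        final String transactionalId;
        PendingCompleteTxn(final String transactionalId) { this.transactionalId = transactionalId; }
    }

    private final ConcurrentMap<String, PendingCompleteTxn> transactionsWithPendingMarkers =
        new ConcurrentHashMap<>();

    // ConcurrentMap.remove(key) is atomic: of any number of concurrent callers, exactly one
    // gets the non-null value back, so only that thread appends the completion record.
    void maybeWriteTxnCompletion(final String transactionalId) {
        final PendingCompleteTxn pending = transactionsWithPendingMarkers.remove(transactionalId);
        if (pending != null) {
            writeTxnCompletion(pending);
        }
    }

    private void writeTxnCompletion(final PendingCompleteTxn txn) {
        // append the completion record to the transaction log (elided)
    }
}
```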
[jira] [Resolved] (KAFKA-8723) flaky test LeaderElectionCommandTest#testAllTopicPartition
[ https://issues.apache.org/jira/browse/KAFKA-8723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski resolved KAFKA-8723. Resolution: Fixed > flaky test LeaderElectionCommandTest#testAllTopicPartition > -- > > Key: KAFKA-8723 > URL: https://issues.apache.org/jira/browse/KAFKA-8723 > Project: Kafka > Issue Type: Bug >Reporter: Boyang Chen >Assignee: Stanislav Kozlovski >Priority: Major > > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/23737/console] > > *15:52:26* kafka.admin.LeaderElectionCommandTest > testAllTopicPartition > STARTED*15:53:08* kafka.admin.LeaderElectionCommandTest.testAllTopicPartition > failed, log available in > /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk8-scala2.11@2/core/build/reports/testOutput/kafka.admin.LeaderElectionCommandTest.testAllTopicPartition.test.stdout*15:53:08* > *15:53:08* kafka.admin.LeaderElectionCommandTest > testAllTopicPartition > FAILED*15:53:08* kafka.common.AdminCommandFailedException: Timeout > waiting for election results*15:53:08* at > kafka.admin.LeaderElectionCommand$.electLeaders(LeaderElectionCommand.scala:133)*15:53:08* > at > kafka.admin.LeaderElectionCommand$.run(LeaderElectionCommand.scala:88)*15:53:08* > at > kafka.admin.LeaderElectionCommand$.main(LeaderElectionCommand.scala:41)*15:53:08* > at > kafka.admin.LeaderElectionCommandTest$$anonfun$testAllTopicPartition$1.apply(LeaderElectionCommandTest.scala:91)*15:53:08* > at > kafka.admin.LeaderElectionCommandTest$$anonfun$testAllTopicPartition$1.apply(LeaderElectionCommandTest.scala:74)*15:53:08* > at kafka.utils.TestUtils$.resource(TestUtils.scala:1588)*15:53:08* > at > kafka.admin.LeaderElectionCommandTest.testAllTopicPartition(LeaderElectionCommandTest.scala:74)*15:53:08* > *15:53:08* Caused by:*15:53:08* > org.apache.kafka.common.errors.TimeoutException: Aborted due to > timeout.*15:53:08* -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-7940) Flaky Test CustomQuotaCallbackTest#testCustomQuotaCallback
[ https://issues.apache.org/jira/browse/KAFKA-7940?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stanislav Kozlovski reassigned KAFKA-7940: -- Assignee: (was: Stanislav Kozlovski) > Flaky Test CustomQuotaCallbackTest#testCustomQuotaCallback > -- > > Key: KAFKA-7940 > URL: https://issues.apache.org/jira/browse/KAFKA-7940 > Project: Kafka > Issue Type: Bug > Components: core, unit tests >Affects Versions: 2.2.0, 2.4.0 >Reporter: Matthias J. Sax >Priority: Critical > Labels: flaky-test > Fix For: 2.2.0 > > > To get stable nightly builds for `2.2` release, I create tickets for all > observed test failures. > [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/14/] > {quote}java.lang.AssertionError: Too many quotaLimit calls Map(PRODUCE -> 1, > FETCH -> 1, REQUEST -> 4) at org.junit.Assert.fail(Assert.java:88) at > org.junit.Assert.assertTrue(Assert.java:41) at > kafka.api.CustomQuotaCallbackTest.testCustomQuotaCallback(CustomQuotaCallbackTest.scala:105){quote} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8406) kafka-topics throws wrong error on invalid configuration with bootstrap-server and alter config
[ https://issues.apache.org/jira/browse/KAFKA-8406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17124052#comment-17124052 ] Stanislav Kozlovski commented on KAFKA-8406: [~savulchik] are you sure? Can you try the exact same commands I listed in the description? I just tested this in 2.5 and it is still an issue > kafka-topics throws wrong error on invalid configuration with > bootstrap-server and alter config > --- > > Key: KAFKA-8406 > URL: https://issues.apache.org/jira/browse/KAFKA-8406 > Project: Kafka > Issue Type: Improvement >Reporter: Stanislav Kozlovski >Assignee: Stanislav Kozlovski >Priority: Minor > > Running > {code:java} > ./kafka-topics --bootstrap-server --alter --config > retention.ms=360 --topic topic{code} > Results in > {code:java} > Missing required argument "[partitions]"{code} > Running > {code:java} > ./kafka-topics --bootstrap-server --alter --config > retention.ms=360 --topic topic --partitions 25{code} > Results in > {code:java} > Option combination "[bootstrap-server],[config]" can't be used with option > "[alter]"{code} > For better clarity, we should just throw the last error outright. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] bob-barrett opened a new pull request #8784: KAFKA-9788: Use distinct names for transaction and group load time se…
bob-barrett opened a new pull request #8784: URL: https://github.com/apache/kafka/pull/8784 …nsors Sensor objects are stored in the Kafka metrics registry and keyed by name. If a new sensor is created with the same name as an existing one, the existing one is returned rather than a new object being created. The partition load time sensors for the transaction and group coordinators used the same name, so data recorded to either was stored in the same object. This meant that the metrics values for both metrics were identical and consisted of the combined data. This patch changes the names to be distinct so that the data will be stored in separate Sensor objects. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
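To illustrate the keyed-by-name behavior, here is a small get-or-create registry sketch (not Kafka's actual `Metrics` class; the sensor names below are hypothetical):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SensorRegistrySketch {

    static class Sensor {
        final String name;
        Sensor(final String name) { this.name = name; }
    }

    private final Map<String, Sensor> sensors = new ConcurrentHashMap<>();

    // keyed by name: a second caller with the same name gets the SAME Sensor back
    Sensor sensor(final String name) {
        return sensors.computeIfAbsent(name, Sensor::new);
    }

    public static void main(final String[] args) {
        final SensorRegistrySketch metrics = new SensorRegistrySketch();
        final Sensor txnLoad = metrics.sensor("partition-load-time");   // transaction coordinator
        final Sensor groupLoad = metrics.sensor("partition-load-time"); // group coordinator
        System.out.println(txnLoad == groupLoad);                       // true: the data gets mixed

        // the fix: distinct names yield distinct sensors
        System.out.println(metrics.sensor("txn-partition-load-time")
            == metrics.sensor("group-partition-load-time"));            // false
    }
}
```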
[GitHub] [kafka] chia7712 opened a new pull request #8783: KAFKA-10063 UnsupportedOperation when querying cleaner metrics after …
chia7712 opened a new pull request #8783: URL: https://github.com/apache/kafka/pull/8783 https://issues.apache.org/jira/browse/KAFKA-10063 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dongjinleekr commented on pull request #8117: KAFKA-8403: Suppress needs a Materialized variant
dongjinleekr commented on pull request #8117: URL: https://github.com/apache/kafka/pull/8117#issuecomment-637683206 @vvcephei Sorry for being late, I just got out from my last project; I will have a look. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dongjinleekr commented on pull request #7898: KAFKA-9366: please consider upgrade log4j to log4j2 due to critical security problem CVE-2019-17571
dongjinleekr commented on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-637684403 All // Sorry for being late, I just got out from my last project; I will have a look at this PR this weekend. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a change in pull request #8782: KAFKA-10080; Fix race condition on txn completion which can cause duplicate appends
chia7712 commented on a change in pull request #8782: URL: https://github.com/apache/kafka/pull/8782#discussion_r434038469 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala ## @@ -215,8 +215,6 @@ class TransactionMarkerChannelManager(config: KafkaConfig, } private def writeTxnCompletion(pendingCommitTxn: PendingCompleteTxn): Unit = { Review comment: How about renaming ```pendingCommitTxn``` to ```pendingCompleteTxn``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-8480) Clients may fetch incomplete set of topic partitions during cluster startup
[ https://issues.apache.org/jira/browse/KAFKA-8480?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio reassigned KAFKA-8480: - Assignee: Jose Armando Garcia Sancio (was: Anna Povzner) > Clients may fetch incomplete set of topic partitions during cluster startup > --- > > Key: KAFKA-8480 > URL: https://issues.apache.org/jira/browse/KAFKA-8480 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.2.1 >Reporter: Anna Povzner >Assignee: Jose Armando Garcia Sancio >Priority: Major > > KafkaConsumer#partitionsFor() or AdminClient#describeTopics() may return not > all partitions for a given topic when the cluster is starting up (after > cluster was down). > The cause is controller, on becoming a controller, sending > UpdateMetadataRequest for all partitions with at least one online replica, > and then a separate UpdateMetadataRequest for all partitions with at least > one offline replica. If client sends metadata request in between broker > processing those two update metadata requests, clients will get incomplete > set of partitions. > Proposed fix: controller should send one UpdateMetadataRequest (containing > all partitions) in ReplicaStateMachine#startup(). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management
ableegoldman commented on a change in pull request #8776: URL: https://github.com/apache/kafka/pull/8776#discussion_r434051190 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ## @@ -247,6 +247,9 @@ private void close(final boolean clean) { "state manager close", log ); +} else if (state() == State.CLOSED) { Review comment: Should we switch to `switch` here as well? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -218,6 +218,10 @@ public void initializeIfNeeded() { */ @Override public void completeRestoration() { +if (state() == State.RUNNING) { +return; +} + Review comment: Can we use if/ else if here for consistency? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -466,6 +510,11 @@ public void closeAndRecycleState() { stateMgr.recycle(); recordCollector.close(); break; + +case CLOSED: +log.trace("Skip close since state is {}", state()); Review comment: I think this might be one of those exceptions where we should still enforce that the state is not `CLOSED` (ie throw `IllegalStateException`) since there are related actions that occur outside of the Task implementation that will fail if we try to recycle a CLOSED task. Similar to prepare/post commit, resume, etc ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -390,19 +387,17 @@ boolean tryToCompleteRestoration() { final List restoringTasks = new LinkedList<>(); for (final Task task : tasks.values()) { -if (task.state() == CREATED) { -try { -task.initializeIfNeeded(); -} catch (final LockException | TimeoutException e) { -// it is possible that if there are multiple threads within the instance that one thread -// trying to grab the task from the other, while the other has not released the lock since -// it did not participate in the rebalance. In this case we can just retry in the next iteration -log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e); -allRunning = false; -} +try { +task.initializeIfNeeded(); +} catch (final LockException | TimeoutException e) { +// it is possible that if there are multiple threads within the instance that one thread +// trying to grab the task from the other, while the other has not released the lock since +// it did not participate in the rebalance. In this case we can just retry in the next iteration +log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e); +allRunning = false; } -if (task.state() == RESTORING) { +if (task.isActive()) { Review comment: Can we add a comment or rename `restoringTasks` to clarify that it's ok to put an active-but-not-restoring task in here since `Task#completeRestoration` is idempotent/no-op for RUNNING tasks? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -218,6 +218,10 @@ public void initializeIfNeeded() { */ @Override public void completeRestoration() { +if (state() == State.RUNNING) { +return; +} + if (state() == State.RESTORING) { initializeMetadata(); initializeTopology(); Review comment: github won't let me leave a comment below this line, but can we use the `"Illegal state"`/`"Unknown state"` improvement in this method as well? 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ## @@ -247,6 +247,9 @@ private void close(final boolean clean) { "state manager close", log ); +} else if (state() == State.CLOSED) { +log.trace("Skip closing since state is {}", state()); +return; } else { throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id); Review comment: `Illegal state` -> `Unknown state`? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map changelogEntry : changelogOffsets.entrySet()) { final long offset = changel
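A minimal sketch of the idempotency discussed in this review, using an assumed standalone `State` enum: `completeRestoration()` becomes a no-op on a RUNNING task, so the TaskManager can route active-but-already-running tasks through the restoration path safely.

```java
public class TaskStateSketch {

    enum State { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

    private State state = State.RESTORING;

    public void completeRestoration() {
        if (state == State.RUNNING) {
            return; // idempotent: nothing to do for a task that already completed restoration
        } else if (state == State.RESTORING) {
            // initializeMetadata(); initializeTopology(); ... (elided)
            state = State.RUNNING;
        } else {
            // any other state is a programming error, not merely an unknown enum value
            throw new IllegalStateException("Illegal state " + state + " while completing restoration");
        }
    }
}
```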
[jira] [Resolved] (KAFKA-9987) Improve sticky partition assignor algorithm
[ https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman resolved KAFKA-9987. Fix Version/s: 2.5.1 2.4.2 2.6.0 Resolution: Fixed > Improve sticky partition assignor algorithm > --- > > Key: KAFKA-9987 > URL: https://issues.apache.org/jira/browse/KAFKA-9987 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.6.0, 2.4.2, 2.5.1 > > > In > [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] > we added the new CooperativeStickyAssignor, which leverages the underlying > sticky assignment algorithm of the existing StickyAssignor (moved to > AbstractStickyAssignor). The algorithm is fairly complex as it tries to > optimize stickiness while satisfying perfect balance _in the case where individual > consumers may be subscribed to different subsets of the topics._ While it > does a pretty good job at what it promises to do, it doesn't scale well with > large numbers of consumers and partitions. > To give a concrete example, users have reported that it takes 2.5 minutes for > the assignment to complete with just 2100 consumers reading from 2100 > partitions. Since partitions revoked during the first of two cooperative > rebalances will remain unassigned until the end of the second rebalance, it's > important for the rebalance to be as fast as possible. And since one of the > primary improvements of the cooperative rebalancing protocol is a better > scaling experience, the only OOTB cooperative assignor should not itself > scale poorly. > If we can constrain the problem a bit, we can simplify the algorithm greatly. > In many cases the individual consumers won't be subscribed to some random > subset of the total subscription; they will all be subscribed to the same set > of topics and rely on the assignor to balance the partition workload. > We can detect this case by checking the group's individual subscriptions and > call on a more efficient assignment algorithm. -- This message was sent by Atlassian Jira (v8.3.4#803005)
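The detection step described above distills to a subscription-equality check. A minimal sketch follows; the real implementation is `allSubscriptionsEqual` in `AbstractStickyAssignor` (quoted in an earlier review), which additionally tracks generations and owned partitions, whereas this version compares topic sets only:

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SubscriptionCheckSketch {

    static boolean allSubscriptionsEqual(final Collection<List<String>> topicsPerConsumer) {
        Set<String> reference = null;
        for (final List<String> topics : topicsPerConsumer) {
            final Set<String> topicSet = new HashSet<>(topics);
            if (reference == null) {
                reference = topicSet;                  // first member seen sets the baseline
            } else if (!reference.equals(topicSet)) {
                return false;                          // fall back to the general algorithm
            }
        }
        return true; // every member subscribes to the same topics: use the constrained path
    }
}
```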
[jira] [Updated] (KAFKA-9987) Improve sticky partition assignor algorithm
[ https://issues.apache.org/jira/browse/KAFKA-9987?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman updated KAFKA-9987: --- Component/s: consumer > Improve sticky partition assignor algorithm > --- > > Key: KAFKA-9987 > URL: https://issues.apache.org/jira/browse/KAFKA-9987 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Major > Fix For: 2.6.0, 2.4.2, 2.5.1 > > > In > [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] > we added the new CooperativeStickyAssignor, which leverages the underlying > sticky assignment algorithm of the existing StickyAssignor (moved to > AbstractStickyAssignor). The algorithm is fairly complex as it tries to > optimize stickiness while satisfying perfect balance _in the case where individual > consumers may be subscribed to different subsets of the topics._ While it > does a pretty good job at what it promises to do, it doesn't scale well with > large numbers of consumers and partitions. > To give a concrete example, users have reported that it takes 2.5 minutes for > the assignment to complete with just 2100 consumers reading from 2100 > partitions. Since partitions revoked during the first of two cooperative > rebalances will remain unassigned until the end of the second rebalance, it's > important for the rebalance to be as fast as possible. And since one of the > primary improvements of the cooperative rebalancing protocol is a better > scaling experience, the only OOTB cooperative assignor should not itself > scale poorly. > If we can constrain the problem a bit, we can simplify the algorithm greatly. > In many cases the individual consumers won't be subscribed to some random > subset of the total subscription; they will all be subscribed to the same set > of topics and rely on the assignor to balance the partition workload. > We can detect this case by checking the group's individual subscriptions and > call on a more efficient assignment algorithm. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on a change in pull request #8782: KAFKA-10080; Fix race condition on txn completion which can cause duplicate appends
hachikuji commented on a change in pull request #8782: URL: https://github.com/apache/kafka/pull/8782#discussion_r434098545 ## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala ## @@ -285,15 +280,16 @@ class TransactionMarkerChannelManager(config: KafkaConfig, } } - private def maybeWriteTxnCompletion(transactionalId: String): Unit = { -Option(transactionsWithPendingMarkers.get(transactionalId)).foreach { pendingCommitTxn => Review comment: Multiple threads may see the transaction still as pending and attempt completion. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management
mjsax commented on a change in pull request #8776: URL: https://github.com/apache/kafka/pull/8776#discussion_r434110314 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ## @@ -247,6 +247,9 @@ private void close(final boolean clean) { "state manager close", log ); +} else if (state() == State.CLOSED) { Review comment: We could... (cf. comment below) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management
mjsax commented on a change in pull request #8776: URL: https://github.com/apache/kafka/pull/8776#discussion_r434111426 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ## @@ -247,6 +247,9 @@ private void close(final boolean clean) { "state manager close", log ); +} else if (state() == State.CLOSED) { +log.trace("Skip closing since state is {}", state()); +return; } else { throw new IllegalStateException("Illegal state " + state() + " while closing standby task " + id); Review comment: The state could be `RESTORING`, which is _illegal_ but not _unknown_ -- We would need more conditions to distinguish both cases (introducing `switch()` would be helpful for this case). Thoughts? I guess this applies to other places in the code, too. I am happy to update all. Was just hesitant. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10084) System test failure: StreamsEosTest.test_failure_and_recovery_complex
John Roesler created KAFKA-10084: Summary: System test failure: StreamsEosTest.test_failure_and_recovery_complex Key: KAFKA-10084 URL: https://issues.apache.org/jira/browse/KAFKA-10084 Project: Kafka Issue Type: Task Reporter: John Roesler Assignee: John Roesler The test failed with message: {code:java} RemoteCommandError: ubuntu@worker14: Command 'grep ALL-RECORDS-DELIVERED /mnt/streams/streams.stdout' returned non-zero exit status 1.{code} And I found in the verifier's stderr: {code:java} java.lang.IllegalStateException: Offset for partition echo-1 is larger than topic endOffset: 2422 > 2421 at org.apache.kafka.streams.tests.EosTestDriver.verifyAllTransactionFinished(EosTestDriver.java:604) at org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:184) at org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:82){code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10084) System test failure: StreamsEosTest.test_failure_and_recovery_complex
[ https://issues.apache.org/jira/browse/KAFKA-10084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10084: - Affects Version/s: 2.7.0 > System test failure: StreamsEosTest.test_failure_and_recovery_complex > - > > Key: KAFKA-10084 > URL: https://issues.apache.org/jira/browse/KAFKA-10084 > Project: Kafka > Issue Type: Task >Affects Versions: 2.6.0, 2.7.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.6.0, 2.7.0 > > > The test failed with message: > {code:java} > RemoteCommandError: ubuntu@worker14: Command 'grep ALL-RECORDS-DELIVERED > /mnt/streams/streams.stdout' returned non-zero exit status 1.{code} > And I found in the verifier's stderr: > {code:java} > java.lang.IllegalStateException: Offset for partition echo-1 is larger than > topic endOffset: 2422 > 2421 > at > org.apache.kafka.streams.tests.EosTestDriver.verifyAllTransactionFinished(EosTestDriver.java:604) > at > org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:184) > at > org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:82){code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10084) System test failure: StreamsEosTest.test_failure_and_recovery_complex
[ https://issues.apache.org/jira/browse/KAFKA-10084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10084: - Fix Version/s: 2.6.0 2.7.0 > System test failure: StreamsEosTest.test_failure_and_recovery_complex > - > > Key: KAFKA-10084 > URL: https://issues.apache.org/jira/browse/KAFKA-10084 > Project: Kafka > Issue Type: Task >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.6.0, 2.7.0 > > > The test failed with message: > {code:java} > RemoteCommandError: ubuntu@worker14: Command 'grep ALL-RECORDS-DELIVERED > /mnt/streams/streams.stdout' returned non-zero exit status 1.{code} > And I found in the verifier's stderr: > {code:java} > java.lang.IllegalStateException: Offset for partition echo-1 is larger than > topic endOffset: 2422 > 2421 > at > org.apache.kafka.streams.tests.EosTestDriver.verifyAllTransactionFinished(EosTestDriver.java:604) > at > org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:184) > at > org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:82){code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10084) System test failure: StreamsEosTest.test_failure_and_recovery_complex
[ https://issues.apache.org/jira/browse/KAFKA-10084?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10084: - Affects Version/s: 2.6.0 > System test failure: StreamsEosTest.test_failure_and_recovery_complex > - > > Key: KAFKA-10084 > URL: https://issues.apache.org/jira/browse/KAFKA-10084 > Project: Kafka > Issue Type: Task >Affects Versions: 2.6.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.6.0, 2.7.0 > > > The test failed with message: > {code:java} > RemoteCommandError: ubuntu@worker14: Command 'grep ALL-RECORDS-DELIVERED > /mnt/streams/streams.stdout' returned non-zero exit status 1.{code} > And I found in the verifier's stderr: > {code:java} > java.lang.IllegalStateException: Offset for partition echo-1 is larger than > topic endOffset: 2422 > 2421 > at > org.apache.kafka.streams.tests.EosTestDriver.verifyAllTransactionFinished(EosTestDriver.java:604) > at > org.apache.kafka.streams.tests.EosTestDriver.verify(EosTestDriver.java:184) > at > org.apache.kafka.streams.tests.StreamsEosTest.main(StreamsEosTest.java:82){code} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] abbccdda commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management
abbccdda commented on a change in pull request #8776: URL: https://github.com/apache/kafka/pull/8776#discussion_r434107127 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ## @@ -1799,8 +1799,8 @@ public void shouldThrowIfClosingOnIllegalState() { task.closeClean(checkpoint); // close call are not idempotent since we are already in closed Review comment: nit: call -> calls ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ## @@ -247,6 +247,9 @@ private void close(final boolean clean) { "state manager close", log ); +} else if (state() == State.CLOSED) { +log.trace("Skip closing since state is {}", state()); Review comment: We could just say `Skip closing since state is closed` here ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -493,28 +542,45 @@ public void closeAndRecycleState() { private Map prepareClose(final boolean clean) { final Map checkpoint; -if (state() == State.CREATED) { -// the task is created and not initialized, just re-write the checkpoint file -checkpoint = Collections.emptyMap(); -} else if (state() == State.RUNNING) { -closeTopology(clean); +switch (state()) { +case CREATED: +// the task is created and not initialized, just re-write the checkpoint file +checkpoint = Collections.emptyMap(); -if (clean) { -stateMgr.flush(); -recordCollector.flush(); -checkpoint = checkpointableOffsets(); -} else { +break; + +case RUNNING: +closeTopology(clean); + +if (clean) { +stateMgr.flush(); +recordCollector.flush(); +checkpoint = checkpointableOffsets(); +} else { +checkpoint = null; // `null` indicates to not write a checkpoint +executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log); +} + +break; + +case RESTORING: +executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log); +checkpoint = Collections.emptyMap(); + +break; + +case SUSPENDED: +// if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state checkpoint = null; // `null` indicates to not write a checkpoint -executeAndMaybeSwallow(false, stateMgr::flush, "state manager flush", log); -} -} else if (state() == State.RESTORING) { -executeAndMaybeSwallow(clean, stateMgr::flush, "state manager flush", log); -checkpoint = Collections.emptyMap(); -} else if (state() == State.SUSPENDED) { -// if `SUSPENDED` do not need to checkpoint, since when suspending we've already committed the state -checkpoint = null; // `null` indicates to not write a checkpoint -} else { -throw new IllegalStateException("Illegal state " + state() + " while prepare closing active task " + id); + +break; +case CLOSED: Review comment: Could we merge the case `CLOSED` and `CREATED`? Also could you elaborate why we do empty checkpoint map instead of null? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map changelogEntry : changelogOffsets.entrySet()) { final long offset = changelogEntry.getValue(); -offsetSum += offset; -if (offsetSum < 0) { -log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id); -return Long.MAX_VALUE; +if (offset == Task.LATEST_OFFSET) { Review comment: Should we also check `task.isActive` here? This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10085) Compute lag correctly for optimized source changelogs
Sophie Blee-Goldman created KAFKA-10085: --- Summary: Compute lag correctly for optimized source changelogs Key: KAFKA-10085 URL: https://issues.apache.org/jira/browse/KAFKA-10085 Project: Kafka Issue Type: Bug Components: streams Reporter: Sophie Blee-Goldman Assignee: Sophie Blee-Goldman Fix For: 2.6.0 During KIP-441 we originally decided to leave the special handling of optimized source changelogs as a potential future improvement, since over-estimating lag was acceptable. But, as always, things changed during the course of implementing this KIP, and the algorithm we ended up with requires accurate computation of lag. We should branch the lag computation in the assignor and use the correct end offset sum when computing lag. -- This message was sent by Atlassian Jira (v8.3.4#803005)
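A minimal sketch of the per-task lag computation in question, with assumed map shapes and an assumed sentinel value (per the Task-interface diffs quoted earlier, `LATEST_OFFSET` marks a running active task):

```java
import java.util.Map;

public class TaskLagSketch {

    static final long LATEST_OFFSET = -2L; // assumed sentinel from the Task interface

    // lag = sum over changelog partitions of (end offset - checkpointed offset)
    static long computeLag(final Map<String, Long> endOffsets,
                           final Map<String, Long> checkpointedOffsets) {
        long lag = 0L;
        for (final Map.Entry<String, Long> entry : checkpointedOffsets.entrySet()) {
            final long checkpointed = entry.getValue();
            if (checkpointed == LATEST_OFFSET) {
                continue; // running active task: fully caught up, contributes no lag
            }
            final long end = endOffsets.getOrDefault(entry.getKey(), 0L);
            lag += Math.max(0L, end - checkpointed);
        }
        return lag;
    }
}
```

For an optimized source changelog, the end offset fed into a computation like this would need to reflect how far the task actually has to restore, rather than the raw log end offset of the input topic; branching to the right source of end offsets is what this ticket tracks.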