[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12954: KAFKA-14352: Rack-aware consumer partition assignment protocol changes (KIP-881)
rajinisivaram commented on code in PR #12954: URL: https://github.com/apache/kafka/pull/12954#discussion_r1040685932

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java:

@@ -47,23 +51,37 @@ public abstract class AbstractPartitionAssignor implements ConsumerPartitionAssi
 public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic, Map<String, Subscription> subscriptions);

+/**
+ * Default implementation of assignPartitions() that does not include racks. This is only
+ * included to avoid breaking any custom implementation that extends AbstractPartitionAssignor.
+ * Note that this class is internal, but to be safe, we are maintaining compatibility.
+ */
+public Map<String, List<TopicPartition>> assignPartitions(Map<String, List<PartitionInfo>> partitionsPerTopic,

Review Comment: Removed.

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocolTest.java:

@@ -56,7 +56,7 @@ public void serializeDeserializeSubscriptionAllVersions() {
             new TopicPartition("foo", 0), new TopicPartition("bar", 0));
         Subscription subscription = new Subscription(Arrays.asList("foo", "bar"),
-            ByteBuffer.wrap("hello".getBytes()), ownedPartitions, generationId);
+            ByteBuffer.wrap("hello".getBytes()), ownedPartitions, generationId, Optional.empty());

Review Comment: Done.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14440) Local state wipeout with EOS
[ https://issues.apache.org/jira/browse/KAFKA-14440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17643737#comment-17643737 ]

Abdullah alkhawatrah commented on KAFKA-14440:
----------------------------------------------

Makes sense. Thanks!

> Local state wipeout with EOS
> ----------------------------
>
> Key: KAFKA-14440
> URL: https://issues.apache.org/jira/browse/KAFKA-14440
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.2.3
> Reporter: Abdullah alkhawatrah
> Priority: Major
> Attachments: Screenshot 2022-12-02 at 09.26.27.png
>
> Hey,
> I have a Kafka Streams service that aggregates events from multiple input topics (running in a k8s cluster). The topology has multiple FKJs. The input topics had around 7 billion events when the service was started from `earliest`.
> The service has EOS enabled and
> {code:java}
> transaction.timeout.ms: 60{code}
> The problem I am having is frequent local state wipe-outs, which lead to very long rebalances. As can be seen from the attached images, local disk sizes go to ~0 very often. These wipe-outs are part of the EOS guarantee, based on this log message:
> {code:java}
> State store transfer-store did not find checkpoint offsets while stores are not empty, since under EOS it has the risk of getting uncommitted data in stores we have to treat it as a task corruption error and wipe out the local state of task 1_8 before re-bootstrapping{code}
> I noticed that this happens as a result of one of the following:
> * The process gets SIGKILL when running out of memory, or on failure to shut down gracefully on pod rotation for example. This explains the missing local checkpoint file, but I thought local checkpoint updates were frequent, so I expected part of the state to be reset but not the whole local state.
> * Although we have a long transaction timeout config, the following appears many times in the logs, after which Kafka Streams gets into the error state. On startup, the local checkpoint file is not found:
> {code:java}
> Transiting to abortable error state due to org.apache.kafka.common.errors.InvalidProducerEpochException: Producer attempted to produce with an old epoch.{code}
> The service has 10 instances, all having the same behaviour. The issue disappears when EOS is disabled.
> The Kafka cluster runs Kafka 2.6, with a minimum ISR of 3.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
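For readers unfamiliar with the setup described in the ticket, the sketch below shows how a Streams application with EOS and an overridden producer transaction timeout, as described in the report, would typically be configured. This is illustrative only: the application id and bootstrap servers are placeholders, and the 60000 ms timeout is an example value because the figure in the report is truncated.

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class EosStreamsConfigExample {

    public static Properties buildConfig() {
        final Properties props = new Properties();
        // Placeholder identifiers; not taken from the report.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transfer-aggregator");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Exactly-once processing, as enabled by the service in the report.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // The report overrides transaction.timeout.ms on the embedded producer;
        // the exact value is truncated in the ticket, so 60000 ms is only an example.
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 60000);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig());
    }
}
{code}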
[GitHub] [kafka] rajinisivaram commented on a diff in pull request #12954: KAFKA-14352: Rack-aware consumer partition assignment protocol changes (KIP-881)
rajinisivaram commented on code in PR #12954: URL: https://github.com/apache/kafka/pull/12954#discussion_r1040687385 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -245,7 +248,8 @@ protected JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() { Subscription subscription = new Subscription(topics, assignor.subscriptionUserData(joinedSubscription), subscriptions.assignedPartitionsList(), - generation().generationId); + generation().generationId, + rackId); Review Comment: Added a unit test. Also added an integration test for verifying propagation since we haven't yet added a rack-aware assignor. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
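As a concrete illustration of the protocol change exercised by the tests above, the standalone sketch below builds a KIP-881-style Subscription that carries the consumer's rack id. The rack value and generation id are made-up values; callers without rack information can pass `Optional.empty()`, as in the updated unit test.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.common.TopicPartition;

public class RackAwareSubscriptionExample {
    public static void main(String[] args) {
        List<TopicPartition> ownedPartitions = Arrays.asList(
                new TopicPartition("foo", 0),
                new TopicPartition("bar", 0));

        // Rack id of the consumer; in a real client this comes from the client.rack config.
        // "rack-a" is just an illustrative value.
        Optional<String> rackId = Optional.of("rack-a");

        // KIP-881 adds the trailing rackId argument to the Subscription constructor.
        Subscription subscription = new Subscription(
                Arrays.asList("foo", "bar"),
                ByteBuffer.wrap("hello".getBytes()),
                ownedPartitions,
                1,        // generation id (illustrative)
                rackId);

        System.out.println(subscription);
    }
}
```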
[GitHub] [kafka] rajinisivaram commented on pull request #12954: KAFKA-14352: Rack-aware consumer partition assignment protocol changes (KIP-881)
rajinisivaram commented on PR #12954: URL: https://github.com/apache/kafka/pull/12954#issuecomment-1339003475 @dajac Thanks for the review, have addressed the comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14445) Producer doesn't request metadata update on REQUEST_TIMED_OUT
Haruki Okada created KAFKA-14445:
------------------------------------

             Summary: Producer doesn't request metadata update on REQUEST_TIMED_OUT
                 Key: KAFKA-14445
                 URL: https://issues.apache.org/jira/browse/KAFKA-14445
             Project: Kafka
          Issue Type: Improvement
            Reporter: Haruki Okada

Produce requests may fail with a timeout governed by `request.timeout.ms` in the two cases below:
 * The producer didn't receive a produce response within `request.timeout.ms`
 * A produce response was received, but it ended up as `REQUEST_TIMEOUT_MS` in the broker

The former case usually happens when a broker machine fails or there is a network glitch. In that case, the connection is disconnected and a metadata update is requested to discover the new leader: https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L556

The problem is the latter case (REQUEST_TIMED_OUT on the broker). In that case, the produce request ends up with a TimeoutException, which doesn't inherit InvalidMetadataException, so it doesn't trigger a metadata update.

A typical cause of REQUEST_TIMED_OUT is replication delay due to a follower-side problem, where a metadata update indeed would not help much.

However, we found that in some cases, stale metadata on REQUEST_TIMED_OUT can cause produce requests to retry unnecessarily, which may end up with batch expiration due to the delivery timeout. Below is the scenario we experienced:
 * Environment:
 ** Partition tp-0 has 3 replicas: 1, 2, 3. The leader is 1.
 ** min.insync.replicas=2
 ** acks=all
 * Scenario:
 ** Broker 1 "partially" failed:
 *** It lost its ZooKeeper connection and was kicked out of the cluster. There was a controller log like:
{code:java}
[2022-12-04 08:01:04,013] INFO [Controller id=XX] Newly added brokers: , deleted brokers: 1, bounced brokers: {code}
 *** However, somehow the broker continued to receive produce requests. We're still investigating how this is possible; indeed, broker 1 was somewhat "alive" and kept working according to server.log.
 *** In other words, broker 1 became a "zombie".
 ** Broker 2 was elected as the new leader:
 *** Broker 3 became a follower of broker 2.
 *** However, since broker 1 was still out of the cluster, it didn't receive LeaderAndIsr, so broker 1 kept considering itself the leader of tp-0.
 ** Meanwhile, the producer kept sending produce requests to broker 1, and the requests failed with REQUEST_TIMED_OUT because no brokers replicate from broker 1.
 *** REQUEST_TIMED_OUT doesn't trigger a metadata update, so the producer didn't have a chance to update its stale metadata.

So I suggest requesting a metadata update even on a REQUEST_TIMED_OUT exception, for the case where the old leader became a "zombie".

-- This message was sent by Atlassian Jira (v8.20.10#820010)
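A minimal sketch of the proposed behaviour (not the actual Sender code): the helper below mirrors the decision of whether a produce error should trigger a metadata refresh, with REQUEST_TIMED_OUT added to the conditions as suggested in this ticket. The class and method names are illustrative.

{code:java}
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.protocol.Errors;

public class MetadataRefreshPolicy {

    // Returns true if the client should ask for a metadata update before retrying.
    static boolean shouldRefreshMetadata(Errors error) {
        // Today only InvalidMetadataException subclasses (NOT_LEADER_OR_FOLLOWER,
        // UNKNOWN_TOPIC_OR_PARTITION, ...) trigger a refresh.
        boolean invalidMetadata = error.exception() instanceof InvalidMetadataException;
        // Proposed addition from this ticket: a broker-side REQUEST_TIMED_OUT also
        // triggers a refresh, in case the current "leader" is a zombie.
        return invalidMetadata || error == Errors.REQUEST_TIMED_OUT;
    }

    public static void main(String[] args) {
        System.out.println(shouldRefreshMetadata(Errors.NOT_LEADER_OR_FOLLOWER)); // true today
        System.out.println(shouldRefreshMetadata(Errors.REQUEST_TIMED_OUT));      // true under the proposal
    }
}
{code}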
[GitHub] [kafka] ableegoldman merged pull request #12803: KAFKA-13602: Adding ability to multicast records.
ableegoldman merged PR #12803: URL: https://github.com/apache/kafka/pull/12803 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14445) Producer doesn't request metadata update on REQUEST_TIMED_OUT
[ https://issues.apache.org/jira/browse/KAFKA-14445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17643768#comment-17643768 ] Haruki Okada commented on KAFKA-14445: -- If the suggestion makes sense, we're happy to send a patch. > Producer doesn't request metadata update on REQUEST_TIMED_OUT > - > > Key: KAFKA-14445 > URL: https://issues.apache.org/jira/browse/KAFKA-14445 > Project: Kafka > Issue Type: Improvement >Reporter: Haruki Okada >Priority: Major > > Produce requests may fail with timeout by `request.timeout.ms` in below two > cases: > * Didn't receive produce response within `request.timeout.ms` > * Produce response received, but it ended up with `REQUEST_TIMEOUT_MS` in > the broker > Former case usually happens when a broker-machine got failed or there's > network glitch etc. > In this case, the connection will be disconnected and metadata-update will be > requested to discover new leader: > [https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L556] > > The problem is in latter case (REQUEST_TIMED_OUT on the broker). > In this case, the produce request will be ended up with TimeoutException, > which doesn't inherit InvalidMetadataException so it doesn't trigger metadata > update. > > Typical cause of REQUEST_TIMED_OUT is replication delay due to follower-side > problem, that metadata-update doesn't make much sense indeed. > > However, we found that in some cases, stale metadata on REQUEST_TIMED_OUT > could cause produce requests to retry unnecessarily , which may end up with > batch expiration due to delivery timeout. > Below is the scenario we experienced: > * Environment: > ** Partition tp-0 has 3 replicas, 1, 2, 3. Leader is 1 > ** min.insync.replicas=2 > ** acks=all > * Scenario: > ** broker 1 "partially" failed > *** It lost ZooKeeper connection and kicked out from the cluster > There was controller log like: > * > {code:java} > [2022-12-04 08:01:04,013] INFO [Controller id=XX] Newly added brokers: , > deleted brokers: 1, bounced brokers: {code} > *** However, somehow the broker was able continued to receive produce > requests > We're still working on investigating how this is possible though. > Indeed, broker 1 was somewhat "alive" and keeps working according to > server.log > *** In other words, broker 1 became "zombie" > ** broker 2 was elected as new leader > *** broker 3 became follower of broker 2 > *** However, since broker 1 was still out of cluster, it didn't receive > LeaderAndIsr so 1 kept thinking itself as the leader of tp-0 > ** Meanwhile, producer keeps sending produce requests to broker 1 and > requests were failed due to REQUEST_TIMED_OUT because no brokers replicates > from broker 1. > *** REQUEST_TIMED_OUT doesn't trigger metadata update, so produce didn't > have a change to update its stale metadata > > So I suggest to request metadata update even on REQUEST_TIMED_OUT exception, > for the case that the old leader became "zombie" -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14445) Producer doesn't request metadata update on REQUEST_TIMED_OUT
[ https://issues.apache.org/jira/browse/KAFKA-14445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haruki Okada updated KAFKA-14445: - Description: Produce requests may fail with timeout by `request.timeout.ms` in below two cases: * Didn't receive produce response within `request.timeout.ms` * Produce response received, but it ended up with `REQUEST_TIMED_OUT` in the broker Former case usually happens when a broker-machine got failed or there's network glitch etc. In this case, the connection will be disconnected and metadata-update will be requested to discover new leader: [https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L556] The problem is in latter case (REQUEST_TIMED_OUT on the broker). In this case, the produce request will be ended up with TimeoutException, which doesn't inherit InvalidMetadataException so it doesn't trigger metadata update. Typical cause of REQUEST_TIMED_OUT is replication delay due to follower-side problem, that metadata-update doesn't make much sense indeed. However, we found that in some cases, stale metadata on REQUEST_TIMED_OUT could cause produce requests to retry unnecessarily , which may end up with batch expiration due to delivery timeout. Below is the scenario we experienced: * Environment: ** Partition tp-0 has 3 replicas, 1, 2, 3. Leader is 1 ** min.insync.replicas=2 ** acks=all * Scenario: ** broker 1 "partially" failed *** It lost ZooKeeper connection and kicked out from the cluster There was controller log like: * {code:java} [2022-12-04 08:01:04,013] INFO [Controller id=XX] Newly added brokers: , deleted brokers: 1, bounced brokers: {code} * ** *** However, somehow the broker was able continued to receive produce requests We're still working on investigating how this is possible though. Indeed, broker 1 was somewhat "alive" and keeps working according to server.log *** In other words, broker 1 became "zombie" ** broker 2 was elected as new leader *** broker 3 became follower of broker 2 *** However, since broker 1 was still out of cluster, it didn't receive LeaderAndIsr so 1 kept thinking itself as the leader of tp-0 ** Meanwhile, producer keeps sending produce requests to broker 1 and requests were failed due to REQUEST_TIMED_OUT because no brokers replicates from broker 1. *** REQUEST_TIMED_OUT doesn't trigger metadata update, so produce didn't have a change to update its stale metadata So I suggest to request metadata update even on REQUEST_TIMED_OUT exception, for the case that the old leader became "zombie" was: Produce requests may fail with timeout by `request.timeout.ms` in below two cases: * Didn't receive produce response within `request.timeout.ms` * Produce response received, but it ended up with `REQUEST_TIMEOUT_MS` in the broker Former case usually happens when a broker-machine got failed or there's network glitch etc. In this case, the connection will be disconnected and metadata-update will be requested to discover new leader: [https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L556] The problem is in latter case (REQUEST_TIMED_OUT on the broker). In this case, the produce request will be ended up with TimeoutException, which doesn't inherit InvalidMetadataException so it doesn't trigger metadata update. Typical cause of REQUEST_TIMED_OUT is replication delay due to follower-side problem, that metadata-update doesn't make much sense indeed. 
However, we found that in some cases, stale metadata on REQUEST_TIMED_OUT could cause produce requests to retry unnecessarily , which may end up with batch expiration due to delivery timeout. Below is the scenario we experienced: * Environment: ** Partition tp-0 has 3 replicas, 1, 2, 3. Leader is 1 ** min.insync.replicas=2 ** acks=all * Scenario: ** broker 1 "partially" failed *** It lost ZooKeeper connection and kicked out from the cluster There was controller log like: * {code:java} [2022-12-04 08:01:04,013] INFO [Controller id=XX] Newly added brokers: , deleted brokers: 1, bounced brokers: {code} *** However, somehow the broker was able continued to receive produce requests We're still working on investigating how this is possible though. Indeed, broker 1 was somewhat "alive" and keeps working according to server.log *** In other words, broker 1 became "zombie" ** broker 2 was elected as new leader *** broker 3 became follower of broker 2 *** However, since broker 1 was still out of cluster, it didn't receive LeaderAndIsr so 1 kept thinking itself as the leader of tp-0 ** Meanwhile, producer keeps sending produce requests to broker 1 and requests were failed due to REQUEST_TIMED_OUT because no brokers replicates from broker 1. *** REQUEST_TIMED_OUT doe
[GitHub] [kafka] lucasbru commented on a diff in pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory
lucasbru commented on code in PR #12935: URL: https://github.com/apache/kafka/pull/12935#discussion_r1040792337 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ## @@ -526,6 +521,9 @@ public synchronized void close() { fOptions.close(); filter.close(); cache.close(); +if (statistics != null) { +statistics.close(); +} Review Comment: The user still needs to close the statistics he provided. This will just close the statistics handle. I added a test for this. It doesn't check everything, but that is hard to do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
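To make the comment above concrete: when users attach their own `Statistics` object through a `RocksDBConfigSetter`, they own that native object and should free it in the setter's `close()` callback; `RocksDBStore` only closes the statistics handle it created itself. The sketch below is illustrative (the class name is made up) and would be registered via `StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG`.

```java
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;
import org.rocksdb.Statistics;

/**
 * Hypothetical config setter: it attaches its own Statistics object to the
 * store's Options, so it is responsible for closing that object itself.
 */
public class StatisticsConfigSetter implements RocksDBConfigSetter {

    private Statistics statistics;

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        statistics = new Statistics();
        options.setStatistics(statistics);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // User-provided native objects must be freed here, otherwise they leak.
        if (statistics != null) {
            statistics.close();
        }
    }
}
```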
[GitHub] [kafka] lucasbru commented on a diff in pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory
lucasbru commented on code in PR #12935: URL: https://github.com/apache/kafka/pull/12935#discussion_r1040807515 ## streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java: ## @@ -346,20 +346,6 @@ public void shouldNotAddItselfToRecordingTriggerWhenNotEmpty() { verifyNoMoreInteractions(recordingTrigger); } -@Test -public void shouldCloseStatisticsWhenValueProvidersAreRemoved() { -recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); -recorder.removeValueProviders(SEGMENT_STORE_NAME_1); -verify(statisticsToAdd1).close(); -} - -@Test -public void shouldNotCloseStatisticsWhenValueProvidersWithoutStatisticsAreRemoved() { -recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null); -recorder.removeValueProviders(SEGMENT_STORE_NAME_1); -verify(statisticsToAdd1, never()).close(); -} Review Comment: Yes, done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on a diff in pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory
lucasbru commented on code in PR #12935: URL: https://github.com/apache/kafka/pull/12935#discussion_r1040808154 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ## @@ -229,40 +230,34 @@ void openDB(final Map configs, final File stateDir) { // Setup statistics before the database is opened, otherwise the statistics are not updated // with the measurements from Rocks DB -maybeSetUpStatistics(configs); - +statistics = userSpecifiedOptions.statistics(); +if (statistics == null) { +if (RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) { +statistics = new Statistics(); +dbOptions.setStatistics(statistics); +} +userSpecifiedStatistics = false; +} else { +userSpecifiedStatistics = true; +} Review Comment: For me personally, spreading linear code across methods with only one call-site does not help readability. Anyway, going with the PMC member on this one, done ;) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory
cadonna commented on code in PR #12935: URL: https://github.com/apache/kafka/pull/12935#discussion_r1040845929 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ## @@ -229,40 +230,34 @@ void openDB(final Map configs, final File stateDir) { // Setup statistics before the database is opened, otherwise the statistics are not updated // with the measurements from Rocks DB -maybeSetUpStatistics(configs); - +statistics = userSpecifiedOptions.statistics(); +if (statistics == null) { +if (RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) { +statistics = new Statistics(); +dbOptions.setStatistics(statistics); +} +userSpecifiedStatistics = false; +} else { +userSpecifiedStatistics = true; +} Review Comment: While I get the joke, it is not a matter of PMC member or not. If you have good arguments, I am happy to be convinced. When I read code, I do not want to be distracted by every detail right away. For the first overview when reading a method -- for me -- it is enough to know that the statistics are set up. So reading `setupStatistics()` would be enough for me. Then, if I need to understand how the statistics are setup I would look into setupStatistics(). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on a diff in pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory
lucasbru commented on code in PR #12935: URL: https://github.com/apache/kafka/pull/12935#discussion_r1040889287 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ## @@ -229,40 +230,34 @@ void openDB(final Map configs, final File stateDir) { // Setup statistics before the database is opened, otherwise the statistics are not updated // with the measurements from Rocks DB -maybeSetUpStatistics(configs); - +statistics = userSpecifiedOptions.statistics(); +if (statistics == null) { +if (RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) { +statistics = new Statistics(); +dbOptions.setStatistics(statistics); +} +userSpecifiedStatistics = false; +} else { +userSpecifiedStatistics = true; +} Review Comment: Yes, I was just joking. Normally, I'd be happy to convince you with arguments, but I think in this case it's really a matter of personal preference or convention among the engineers in the project (and you know those better, honestly!). In this particular case, I think a comment and newlines help to split the function into correlated "blocks". ``` // setup statistics ... ... // next block ``` Arguably, this is not as clearly scoped as a separate method, as you say. But it does give me extra information when reading: I do not at all have to worry about this code if I am trying to understand any of the other methods, because the code cannot be called from other methods. Also, I don't have to use my IDE to jump around to understand the flow of execution. Everyone will make these kinds of trade-offs slightly differently, I am happy to err on the side of project convention here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru closed pull request #12925: Revert "[KAFKA-14324] Upgrade RocksDB to 7.1.2 (#12809)"
lucasbru closed pull request #12925: Revert "[KAFKA-14324] Upgrade RocksDB to 7.1.2 (#12809)" URL: https://github.com/apache/kafka/pull/12925 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #12800: KAFKA-14342: KafkaOffsetBackingStore should clear offsets for source partitions on tombstone messages
C0urante commented on code in PR #12800: URL: https://github.com/apache/kafka/pull/12800#discussion_r1039745954 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java: ## @@ -329,6 +329,60 @@ public void testGetSetNull() throws Exception { PowerMock.verifyAll(); } +@Test +public void testTombstoneOffset() throws Exception { +expectConfigure(); +expectStart(Collections.singletonList(new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), TP0_VALUE.array(), +new RecordHeaders(), Optional.empty(; + +Capture producerCallback = EasyMock.newCapture(); +storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.isNull(byte[].class), EasyMock.capture(producerCallback)); +PowerMock.expectLastCall(); + +final Capture> readToEndCallback = EasyMock.newCapture(); +storeLog.readToEnd(EasyMock.capture(readToEndCallback)); +PowerMock.expectLastCall().andAnswer(() -> { +capturedConsumedCallback.getValue().onCompletion(null, +new ConsumerRecord<>(TOPIC, 1, 0, 0L, TimestampType.CREATE_TIME, 0, 0, TP0_KEY.array(), null, +new RecordHeaders(), Optional.empty())); +readToEndCallback.getValue().onCompletion(null, null); +return null; +}); + +expectStop(); +expectClusterId(); + +PowerMock.replayAll(); + +store.configure(DEFAULT_DISTRIBUTED_CONFIG); +store.start(); + +// Write tombstone offset +Map toSet = new HashMap<>(); +toSet.put(TP0_KEY, null); + +final AtomicBoolean invoked = new AtomicBoolean(false); +Future setFuture = store.set(toSet, (error, result) -> invoked.set(true)); +assertFalse(setFuture.isDone()); +producerCallback.getValue().onCompletion(null, null); +setFuture.get(1, TimeUnit.MILLISECONDS); +assertTrue(invoked.get()); + +// Getting data should read to end of our published data and return it +Map offsets = store.get(Collections.singletonList(TP0_KEY)).get(1, TimeUnit.MILLISECONDS); +assertNull(offsets.get(TP0_KEY)); + +// Just verifying that KafkaOffsetBackingStore::get returns null isn't enough, we also need to verify that the mapping for the source partition key is removed. +// This is because KafkaOffsetBackingStore::get returns null if either there is no existing offset for the source partition or if there is an offset with null value. +// We need to make sure that tombstoned offsets are removed completely (i.e. that the mapping for the corresponding source partition is removed). +HashMap data = Whitebox.getInternalState(store, "data"); +assertFalse(data.containsKey(TP0_KEY)); Review Comment: Sorry, what exactly is clearer about writing a new test case instead of modifying the existing one? The logic we're testing here is directly related to handling `null` values since that's how tombstones are represented in Java. Plus, we're going to have to add logic for writing a non-null value with a given key no matter what, but with a new test case, we also have to duplicate the existing logic for writing a null value for that same key. Finally, considering how much overhead is involved in the setup of every test case in this class, we should be trying to minimize the number of test cases where possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
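For clarity, here is a minimal, self-contained sketch of the behaviour the assertions above check: a tombstone (null value) read back from the offsets topic must remove the in-memory mapping for that key rather than store a key-to-null entry. This is not the actual `KafkaOffsetBackingStore` code; the class and method are hypothetical stand-ins.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class TombstoneHandlingSketch {

    private final Map<ByteBuffer, ByteBuffer> data = new HashMap<>();

    // Hypothetical stand-in for the consumed-record callback in the offset store.
    void onRecord(ByteBuffer key, ByteBuffer value) {
        if (value == null) {
            data.remove(key);   // tombstone: drop the mapping entirely
        } else {
            data.put(key, value);
        }
    }

    public static void main(String[] args) {
        TombstoneHandlingSketch store = new TombstoneHandlingSketch();
        ByteBuffer key = ByteBuffer.wrap("tp0-key".getBytes());
        store.onRecord(key, ByteBuffer.wrap("offset-1".getBytes()));
        store.onRecord(key, null);
        System.out.println(store.data.containsKey(key)); // false: entry removed, not mapped to null
    }
}
```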
[jira] [Updated] (KAFKA-14445) Producer doesn't request metadata update on REQUEST_TIMED_OUT
[ https://issues.apache.org/jira/browse/KAFKA-14445?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Haruki Okada updated KAFKA-14445: - Description: Produce requests may fail with timeout by `request.timeout.ms` in below two cases: * Didn't receive produce response within `request.timeout.ms` * Produce response received, but it ended up with `REQUEST_TIMED_OUT` in the broker Former case usually happens when a broker-machine got failed or there's network glitch etc. In this case, the connection will be disconnected and metadata-update will be requested to discover new leader: [https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L556] The problem is in latter case (REQUEST_TIMED_OUT on the broker). In this case, the produce request will be ended up with TimeoutException, which doesn't inherit InvalidMetadataException so it doesn't trigger metadata update. Typical cause of REQUEST_TIMED_OUT is replication delay due to follower-side problem, that metadata-update doesn't make much sense indeed. However, we found that in some cases, stale metadata on REQUEST_TIMED_OUT could cause produce requests to retry unnecessarily , which may end up with batch expiration due to delivery timeout. Below is the scenario we experienced: * Environment: ** Partition tp-0 has 3 replicas, 1, 2, 3. Leader is 1 ** min.insync.replicas=2 ** acks=all * Scenario: ** broker 1 "partially" failed *** It lost ZooKeeper connection and kicked out from the cluster There was controller log like: * {code:java} [2022-12-04 08:01:04,013] INFO [Controller id=XX] Newly added brokers: , deleted brokers: 1, bounced brokers: {code} * ** *** However, somehow the broker was able continued to receive produce requests We're still working on investigating how this is possible though. Indeed, broker 1 was somewhat "alive" and keeps working according to server.log *** In other words, broker 1 became "zombie" ** broker 2 was elected as new leader *** broker 3 became follower of broker 2 *** However, since broker 1 was still out of cluster, it didn't receive LeaderAndIsr so 1 kept thinking itself as the leader of tp-0 ** Meanwhile, producer keeps sending produce requests to broker 1 and requests were failed due to REQUEST_TIMED_OUT because no brokers replicates from broker 1. *** REQUEST_TIMED_OUT doesn't trigger metadata update, so produce didn't have a change to update its stale metadata So I suggest to request metadata update even on REQUEST_TIMED_OUT exception, to address the case that the old leader became "zombie" was: Produce requests may fail with timeout by `request.timeout.ms` in below two cases: * Didn't receive produce response within `request.timeout.ms` * Produce response received, but it ended up with `REQUEST_TIMED_OUT` in the broker Former case usually happens when a broker-machine got failed or there's network glitch etc. In this case, the connection will be disconnected and metadata-update will be requested to discover new leader: [https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L556] The problem is in latter case (REQUEST_TIMED_OUT on the broker). In this case, the produce request will be ended up with TimeoutException, which doesn't inherit InvalidMetadataException so it doesn't trigger metadata update. Typical cause of REQUEST_TIMED_OUT is replication delay due to follower-side problem, that metadata-update doesn't make much sense indeed. 
However, we found that in some cases, stale metadata on REQUEST_TIMED_OUT could cause produce requests to retry unnecessarily , which may end up with batch expiration due to delivery timeout. Below is the scenario we experienced: * Environment: ** Partition tp-0 has 3 replicas, 1, 2, 3. Leader is 1 ** min.insync.replicas=2 ** acks=all * Scenario: ** broker 1 "partially" failed *** It lost ZooKeeper connection and kicked out from the cluster There was controller log like: * {code:java} [2022-12-04 08:01:04,013] INFO [Controller id=XX] Newly added brokers: , deleted brokers: 1, bounced brokers: {code} * ** *** However, somehow the broker was able continued to receive produce requests We're still working on investigating how this is possible though. Indeed, broker 1 was somewhat "alive" and keeps working according to server.log *** In other words, broker 1 became "zombie" ** broker 2 was elected as new leader *** broker 3 became follower of broker 2 *** However, since broker 1 was still out of cluster, it didn't receive LeaderAndIsr so 1 kept thinking itself as the leader of tp-0 ** Meanwhile, producer keeps sending produce requests to broker 1 and requests were failed due to REQUEST_TIMED_OUT because no brokers replicates from broker 1. *** REQUES
[GitHub] [kafka] cadonna commented on a diff in pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory
cadonna commented on code in PR #12935: URL: https://github.com/apache/kafka/pull/12935#discussion_r1041000955 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ## @@ -229,40 +230,34 @@ void openDB(final Map configs, final File stateDir) { // Setup statistics before the database is opened, otherwise the statistics are not updated // with the measurements from Rocks DB -maybeSetUpStatistics(configs); - +statistics = userSpecifiedOptions.statistics(); +if (statistics == null) { +if (RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) { +statistics = new Statistics(); +dbOptions.setStatistics(statistics); +} +userSpecifiedStatistics = false; +} else { +userSpecifiedStatistics = true; +} Review Comment: I am not a big fan of comments. They eventually start to lie because they are not maintained as well as executable parts of the code. I prefer to extract a code block into a method with a meaningful name instead of using a inline comment. One could argue, that the name of a method might also start to lie, but IMO that is less likely compared to an inline comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on pull request #12935: KAFKA-14432: RocksDBStore relies on finalizers to not leak memory
lucasbru commented on PR #12935: URL: https://github.com/apache/kafka/pull/12935#issuecomment-1339425004 Failing tests are unrelated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante merged pull request #12800: KAFKA-14342: KafkaOffsetBackingStore should clear offsets for source partitions on tombstone messages
C0urante merged PR #12800: URL: https://github.com/apache/kafka/pull/12800 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #12937: KAFKA-13881: Add Connect package infos
C0urante commented on code in PR #12937: URL: https://github.com/apache/kafka/pull/12937#discussion_r1041131493 ## connect/api/src/main/java/org/apache/kafka/connect/header/package-info.java: ## @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Provides API for application-defined metadata attached to Connect records. Review Comment: Nit: ```suggestion * Provides an API for application-defined metadata attached to Connect records. ``` ## connect/api/src/main/java/org/apache/kafka/connect/sink/package-info.java: ## @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Provides API for implementing connectors which write Kafka records to external applications. Review Comment: Nits: ```suggestion * Provides an API for implementing connectors which write Kafka records to external applications, * also known as sink connectors. ``` ## connect/api/src/main/java/org/apache/kafka/connect/connector/package-info.java: ## @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Provides pluggable interfaces for Connector and Task implementations. Review Comment: Nit: is "pluggable" really necessary here? 
I've always used that term to identify parts of a code base or application that can be swapped out for something else, but only on an opt-in basis, and with a reasonable default available out of the box. Since there is no default `Connector` or `Task` implementation and it's required to write one in order to run a Kafka Connect connector, I'd opt to leave this bit out. Also worth noting that the `ConnectorContext` and `ConnectRecord` classes aren't really pluggable; they're defined by the Connect framework and users have no chance to swap in their own implementation (at least, not in a way that alters how they are used by the Connect framework). ## connect/api/src/main/java/org/apache/kafka/connect/transforms/package-info.java: ## @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, +
[jira] [Updated] (KAFKA-14443) Mirror Maker Connectors leak admin clients used for topic creation
[ https://issues.apache.org/jira/browse/KAFKA-14443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14443: -- Affects Version/s: 3.4.0 > Mirror Maker Connectors leak admin clients used for topic creation > -- > > Key: KAFKA-14443 > URL: https://issues.apache.org/jira/browse/KAFKA-14443 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.4.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Critical > > the MirrorMaker connectors are each responsible for creating internal topics. > For example, the Checkpoint connector creates a forwarding admin and passes > it to a method to create the topic, but never closes the ForwardingAdmin or > delegate objects: > [https://github.com/apache/kafka/blob/13c9c78a1f4ad92023e8354069c6817b44c89ce6/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java#L161-L164] > Instead, this object should be intentionally closed when it is no longer > needed, to prevent consuming resources in a running MM2 application. -- This message was sent by Atlassian Jira (v8.20.10#820010)
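A minimal sketch of the fix pattern described in the ticket: scope the admin client used for internal-topic creation to the call and close it deterministically. This uses the generic `Admin` interface rather than MM2's `ForwardingAdmin`, and the bootstrap address and topic name are placeholders.

{code:java}
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateInternalTopicExample {
    public static void main(String[] args) throws Exception {
        Map<String, Object> config =
                Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // try-with-resources guarantees the underlying clients are released,
        // unlike the leaked admin described in the ticket.
        try (Admin admin = Admin.create(config)) {
            admin.createTopics(Collections.singletonList(
                    new NewTopic("mm2-checkpoints.internal", 1, (short) 1))).all().get();
        }
    }
}
{code}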
[jira] [Updated] (KAFKA-14443) Mirror Maker Connectors leak admin clients used for topic creation
[ https://issues.apache.org/jira/browse/KAFKA-14443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-14443: -- Priority: Critical (was: Major) > Mirror Maker Connectors leak admin clients used for topic creation > -- > > Key: KAFKA-14443 > URL: https://issues.apache.org/jira/browse/KAFKA-14443 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Critical > > the MirrorMaker connectors are each responsible for creating internal topics. > For example, the Checkpoint connector creates a forwarding admin and passes > it to a method to create the topic, but never closes the ForwardingAdmin or > delegate objects: > [https://github.com/apache/kafka/blob/13c9c78a1f4ad92023e8354069c6817b44c89ce6/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java#L161-L164] > Instead, this object should be intentionally closed when it is no longer > needed, to prevent consuming resources in a running MM2 application. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mumrah commented on a diff in pull request #12958: MINOR: Move dynamic config logic to DynamicConfigPublisher
mumrah commented on code in PR #12958: URL: https://github.com/apache/kafka/pull/12958#discussion_r1041165700 ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -71,7 +71,7 @@ class BrokerServer( val initialOfflineDirs: Seq[String], ) extends KafkaBroker { val threadNamePrefix = sharedServer.threadNamePrefix - val config = sharedServer.config + val config = new KafkaConfig(sharedServer.sharedServerConfig.props, false, None) Review Comment: Could we add a method on SharedServer like `createKafkaConfig` rather than expose `sharedServerConfig` to BrokerServer/ControllerServer? I worry someone down the line might read from the shared config rather than the local one thats getting dynamic updates. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on a diff in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.
satishd commented on code in PR #11390: URL: https://github.com/apache/kafka/pull/11390#discussion_r1041219212 ## core/src/main/scala/kafka/server/ReplicaFetcherThread.scala: ## @@ -386,11 +397,147 @@ class ReplicaFetcherThread(name: String, } /** - * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list, - * the quota is exceeded and the replica is not in sync. + * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list, + * the quota is exceeded and the replica is not in sync. */ private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = { !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded } + /** + * It tries to build the required state for this partition from leader and remote storage so that it can start + * fetching records from the leader. + */ + override protected def buildRemoteLogAuxState(partition: TopicPartition, +currentLeaderEpoch: Int, +leaderLocalLogStartOffset: Long, + epochForLeaderLocalLogStartOffset: Int, +leaderLogStartOffset: Long): Long = { + +def fetchEarlierEpochEndOffset(epoch: Int): EpochEndOffset = { + val previousEpoch = epoch - 1 + // Find the end-offset for the epoch earlier to the given epoch from the leader + val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition()) +.setCurrentLeaderEpoch(currentLeaderEpoch) +.setLeaderEpoch(previousEpoch)) + val maybeEpochEndOffset = fetchEpochEndOffsetsFromLeader(partitionsWithEpochs).get(partition) + if (maybeEpochEndOffset.isEmpty) { +throw new KafkaException("No response received for partition: " + partition); + } + + val epochEndOffset = maybeEpochEndOffset.get + if (epochEndOffset.errorCode() != Errors.NONE.code()) { +throw Errors.forCode(epochEndOffset.errorCode()).exception() + } + + epochEndOffset +} + +val log = replicaMgr.localLogOrException(partition) +val nextOffset = { + if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) { +if (replicaMgr.remoteLogManager.isEmpty) throw new IllegalStateException("RemoteLogManager is not yet instantiated") + +val rlm = replicaMgr.remoteLogManager.get + +// Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache +// until that offset +val previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1 +val targetEpoch: Int = { + // If the existing epoch is 0, no need to fetch from earlier epoch as the desired offset(leaderLogStartOffset - 1) + // will have the same epoch. + if (epochForLeaderLocalLogStartOffset == 0) { +epochForLeaderLocalLogStartOffset + } else { +// Fetch the earlier epoch/end-offset(exclusive) from the leader. +val earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset) +// Check if the target offset lies with in the range of earlier epoch. Here, epoch's end-offset is exclusive. +if (earlierEpochEndOffset.endOffset > previousOffsetToLeaderLocalLogStartOffset) { + // Always use the leader epoch from returned earlierEpochEndOffset. + // This gives the respective leader epoch, that will handle any gaps in epochs. + // For ex, leader epoch cache contains: + // leader-epoch start-offset + // 0 20 + // 1 85 + // <2> - gap no messages were appended in this leader epoch. + // 3 90 + // 4 98 + // There is a gap in leader epoch. For leaderLocalLogStartOffset as 90, leader-epoch is 3. 
+ // fetchEarlierEpochEndOffset(2) will return leader-epoch as 1, end-offset as 90. + // So, for offset 89, we should return leader epoch as 1 like below. + earlierEpochEndOffset.leaderEpoch() +} else epochForLeaderLocalLogStartOffset + } +} + +val maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(partition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset) + +if (maybeRlsm.isPresent) { + val remoteLogSegmentMetadata = maybeRlsm.get() + // Build leader epoch cache, producer snapshots until remoteLogSegmentMetadata.endOffset() and start + // segments from (remoteLogSegmentMetadata.endOffse
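The epoch-gap example in the code comment above can be checked with a small standalone calculation (this is not the broker's leader-epoch-cache implementation): with epochs {0→20, 1→85, 3→90, 4→98} and no entry for epoch 2, the offset just below the leader's local log start offset (89) resolves to epoch 1.

```java
import java.util.Map;
import java.util.TreeMap;

public class LeaderEpochGapExample {

    // Returns the epoch whose range contains the given offset (floor lookup on start offsets).
    static int epochForOffset(TreeMap<Integer, Long> epochStartOffsets, long offset) {
        int result = epochStartOffsets.firstKey();
        for (Map.Entry<Integer, Long> entry : epochStartOffsets.entrySet()) {
            if (entry.getValue() <= offset) {
                result = entry.getKey();
            } else {
                break;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        TreeMap<Integer, Long> cache = new TreeMap<>();
        cache.put(0, 20L);
        cache.put(1, 85L);
        // epoch 2 is missing: no records were appended in that epoch
        cache.put(3, 90L);
        cache.put(4, 98L);

        // leaderLocalLogStartOffset = 90, so the offset to resolve is 89.
        System.out.println(epochForOffset(cache, 89L)); // prints 1, not 2
    }
}
```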
[GitHub] [kafka] lucasbru opened a new pull request, #12959: MINOR: Fix various memory leaks in tests
lucasbru opened a new pull request, #12959: URL: https://github.com/apache/kafka/pull/12959 Various tests in the streams park were leaking native memory. Most tests were fixed by closing the corresponding rocksdb resource. I tested that the corresponding leak is gone by using a previous rocksdb release with finalizers and checking if the finalizers would be called at some point. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #12932: KAFKA-14425; The Kafka protocol should support nullable structs
dajac commented on PR #12932: URL: https://github.com/apache/kafka/pull/12932#issuecomment-1339754347 @cmccabe Could you review this one please? The vote is not closed yet but we have all the votes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #12946: KAFKA-14427 ZK client support for migrations
cmccabe commented on code in PR #12946: URL: https://github.com/apache/kafka/pull/12946#discussion_r1041287139 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -54,6 +54,19 @@ import org.apache.kafka.common.utils.Sanitizer import scala.collection.{Map, mutable, _} import scala.jdk.CollectionConverters._ +object ZkAdminManager { + def clientQuotaPropsToDoubleMap(props: Map[String, String]): Map[String, Double] = { Review Comment: can we have a unit test for this function? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #12946: KAFKA-14427 ZK client support for migrations
cmccabe commented on code in PR #12946: URL: https://github.com/apache/kafka/pull/12946#discussion_r1041288418 ## metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java: ## @@ -100,26 +106,30 @@ public boolean zkMigrationComplete() { @Override public String toString() { return "ZkMigrationLeadershipState{" + -"kraftControllerId=" + kraftControllerId + -", kraftControllerEpoch=" + kraftControllerEpoch + -", kraftMetadataOffset=" + kraftMetadataOffset + -", kraftMetadataEpoch=" + kraftMetadataEpoch + -", lastUpdatedTimeMs=" + lastUpdatedTimeMs + -", migrationZkVersion=" + migrationZkVersion + -", controllerZkVersion=" + controllerZkVersion + -'}'; +"kraftControllerId=" + kraftControllerId + +", kraftControllerEpoch=" + kraftControllerEpoch + +", kraftMetadataOffset=" + kraftMetadataOffset + +", kraftMetadataEpoch=" + kraftMetadataEpoch + +", lastUpdatedTimeMs=" + lastUpdatedTimeMs + +", migrationZkVersion=" + migrationZkVersion + +", controllerZkVersion=" + controllerZkVersion + +'}'; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ZkMigrationLeadershipState that = (ZkMigrationLeadershipState) o; -return kraftControllerId == that.kraftControllerId && kraftControllerEpoch == that.kraftControllerEpoch && kraftMetadataOffset == that.kraftMetadataOffset && kraftMetadataEpoch == that.kraftMetadataEpoch && lastUpdatedTimeMs == that.lastUpdatedTimeMs && migrationZkVersion == that.migrationZkVersion && controllerZkVersion == that.controllerZkVersion; +return kraftControllerId == that.kraftControllerId && kraftControllerEpoch == that.kraftControllerEpoch Review Comment: can we do one line per comparison? a little hard to read otherwise ## metadata/src/main/java/org/apache/kafka/metadata/migration/ZkMigrationLeadershipState.java: ## @@ -100,26 +106,30 @@ public boolean zkMigrationComplete() { @Override public String toString() { return "ZkMigrationLeadershipState{" + -"kraftControllerId=" + kraftControllerId + -", kraftControllerEpoch=" + kraftControllerEpoch + -", kraftMetadataOffset=" + kraftMetadataOffset + -", kraftMetadataEpoch=" + kraftMetadataEpoch + -", lastUpdatedTimeMs=" + lastUpdatedTimeMs + -", migrationZkVersion=" + migrationZkVersion + -", controllerZkVersion=" + controllerZkVersion + -'}'; +"kraftControllerId=" + kraftControllerId + +", kraftControllerEpoch=" + kraftControllerEpoch + +", kraftMetadataOffset=" + kraftMetadataOffset + +", kraftMetadataEpoch=" + kraftMetadataEpoch + +", lastUpdatedTimeMs=" + lastUpdatedTimeMs + +", migrationZkVersion=" + migrationZkVersion + +", controllerZkVersion=" + controllerZkVersion + +'}'; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; ZkMigrationLeadershipState that = (ZkMigrationLeadershipState) o; -return kraftControllerId == that.kraftControllerId && kraftControllerEpoch == that.kraftControllerEpoch && kraftMetadataOffset == that.kraftMetadataOffset && kraftMetadataEpoch == that.kraftMetadataEpoch && lastUpdatedTimeMs == that.lastUpdatedTimeMs && migrationZkVersion == that.migrationZkVersion && controllerZkVersion == that.controllerZkVersion; +return kraftControllerId == that.kraftControllerId && kraftControllerEpoch == that.kraftControllerEpoch +&& kraftMetadataOffset == that.kraftMetadataOffset && kraftMetadataEpoch == that.kraftMetadataEpoch +&& lastUpdatedTimeMs == that.lastUpdatedTimeMs && migrationZkVersion == that.migrationZkVersion 
+&& controllerZkVersion == that.controllerZkVersion; } @Override public int hashCode() { -return Objects.hash(kraftControllerId, kraftControllerEpoch, kraftMetadataOffset, kraftMetadataEpoch, lastUpdatedTimeMs, migrationZkVersion, controllerZkVersion); +return Objects.hash(kraftControllerId, kraftControllerEpoch, kraftMetadataOffset, kraftMetadataEpoch, Review Comment: can we do one line per item -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apa
[GitHub] [kafka] cmccabe commented on pull request #12946: KAFKA-14427 ZK client support for migrations
cmccabe commented on PR #12946: URL: https://github.com/apache/kafka/pull/12946#issuecomment-1339772134 We do need a temporary workaround for KAFKA-14436. We discussed offline about just adding a big constant to it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on pull request #12946: KAFKA-14427 ZK client support for migrations
cmccabe commented on PR #12946: URL: https://github.com/apache/kafka/pull/12946#issuecomment-1339779087 * need to add idempotence handling (handle the error where we create something that already exists, or delete something that is already gone) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
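A minimal sketch of the idempotence handling described above, written against a raw ZooKeeper client for illustration only (Kafka's migration code goes through KafkaZkClient, so just the error-handling idea carries over): treat "already exists" on create and "already gone" on delete as success.
```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

final class IdempotentZkOps {
    // Create the znode, treating NodeExistsException as "already done".
    static void createIfAbsent(ZooKeeper zk, String path, byte[] data)
            throws KeeperException, InterruptedException {
        try {
            zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } catch (KeeperException.NodeExistsException e) {
            // A previous (possibly retried) attempt already created it: nothing to do.
        }
    }

    // Delete the znode, treating NoNodeException as "already done".
    static void deleteIfPresent(ZooKeeper zk, String path)
            throws KeeperException, InterruptedException {
        try {
            zk.delete(path, -1); // -1 matches any version
        } catch (KeeperException.NoNodeException e) {
            // Already deleted: nothing to do.
        }
    }
}
```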
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #12802: KAFKA-14311: Connect Worker clean shutdown does not cleanly stop connectors/tasks
vamossagar12 commented on code in PR #12802: URL: https://github.com/apache/kafka/pull/12802#discussion_r1041307783 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java: ## @@ -1645,6 +1646,8 @@ private void startAndStop(Collection> callables) { startAndStopExecutor.invokeAll(callables); } catch (InterruptedException e) { // ignore +} catch (RejectedExecutionException re) { +log.error("startAndStopExecutor already shutdown or full. Not invoking explicit connector/task shutdown"); Review Comment: Thanks @C0urante. I see what you mean here. I guess if we are to give a higher timeout to `herderExecutor::awaitTermination`, that needs an update to the worker config `task.shutdown.graceful.timeout.ms` which is defaulted to 5s. Are you suggesting we increase the default? I would have thought we would need a KIP for this considering it's a public interface. Or maybe I am wrong. Also, I was going through this KIP of yours: https://cwiki.apache.org/confluence/display/KAFKA/KIP-611%3A+Improved+Handling+of+Abandoned+Connectors+and+Tasks and it seems to suggest: ``` The task.shutdown.graceful.timeout.ms property will be deprecated and scheduled for removal at the next major release. At the time of writing, the next major release is 3.0. ``` but that doesn't seem to be the case. Am I missing something here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
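For context, the property under discussion can already be overridden per worker without any code change; a hypothetical configuration sketch (the 30s value is just an example, not a recommended default):
```java
import java.util.HashMap;
import java.util.Map;

final class WorkerShutdownTimeoutExample {
    // Builds example worker properties that raise the graceful shutdown timeout
    // from the 5s default mentioned above to 30s.
    static Map<String, String> workerProps() {
        Map<String, String> props = new HashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "connect-cluster");
        props.put("task.shutdown.graceful.timeout.ms", "30000");
        return props;
    }
}
```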
[jira] [Comment Edited] (KAFKA-14445) Producer doesn't request metadata update on REQUEST_TIMED_OUT
[ https://issues.apache.org/jira/browse/KAFKA-14445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17644001#comment-17644001 ] Kirk True edited comment on KAFKA-14445 at 12/6/22 6:42 PM: [~ocadaruma] - I modified the code in {{completeBatch}} to request a metadata refresh on timeouts as part of KAFKA-14317: Jira: https://issues.apache.org/jira/browse/KAFKA-14317 PR: [https://github.com/apache/kafka/pull/12813] Is there more involved in your patch that is not in the above PR? Thanks! was (Author: kirktrue): [~ocadaruma] - I modified the code in `completeBatch` to request a metadata refresh on timeouts as part of KAFKA-14317: Jira: https://issues.apache.org/jira/browse/KAFKA-14317 PR: [https://github.com/apache/kafka/pull/12813] Is there more involved in your patch that is not in the above PR? Thanks! > Producer doesn't request metadata update on REQUEST_TIMED_OUT > - > > Key: KAFKA-14445 > URL: https://issues.apache.org/jira/browse/KAFKA-14445 > Project: Kafka > Issue Type: Improvement >Reporter: Haruki Okada >Priority: Major > > Produce requests may fail with timeout by `request.timeout.ms` in below two > cases: > * Didn't receive produce response within `request.timeout.ms` > * Produce response received, but it ended up with `REQUEST_TIMED_OUT` in the > broker > Former case usually happens when a broker-machine got failed or there's > network glitch etc. > In this case, the connection will be disconnected and metadata-update will be > requested to discover new leader: > [https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L556] > > The problem is in latter case (REQUEST_TIMED_OUT on the broker). > In this case, the produce request will be ended up with TimeoutException, > which doesn't inherit InvalidMetadataException so it doesn't trigger metadata > update. > > Typical cause of REQUEST_TIMED_OUT is replication delay due to follower-side > problem, that metadata-update doesn't make much sense indeed. > > However, we found that in some cases, stale metadata on REQUEST_TIMED_OUT > could cause produce requests to retry unnecessarily , which may end up with > batch expiration due to delivery timeout. > Below is the scenario we experienced: > * Environment: > ** Partition tp-0 has 3 replicas, 1, 2, 3. Leader is 1 > ** min.insync.replicas=2 > ** acks=all > * Scenario: > ** broker 1 "partially" failed > *** It lost ZooKeeper connection and kicked out from the cluster > There was controller log like: > * > {code:java} > [2022-12-04 08:01:04,013] INFO [Controller id=XX] Newly added brokers: , > deleted brokers: 1, bounced brokers: {code} > * > ** > *** However, somehow the broker was able continued to receive produce > requests > We're still working on investigating how this is possible though. > Indeed, broker 1 was somewhat "alive" and keeps working according to > server.log > *** In other words, broker 1 became "zombie" > ** broker 2 was elected as new leader > *** broker 3 became follower of broker 2 > *** However, since broker 1 was still out of cluster, it didn't receive > LeaderAndIsr so 1 kept thinking itself as the leader of tp-0 > ** Meanwhile, producer keeps sending produce requests to broker 1 and > requests were failed due to REQUEST_TIMED_OUT because no brokers replicates > from broker 1. 
> *** REQUEST_TIMED_OUT doesn't trigger a metadata update, so the producer didn't > have a chance to update its stale metadata > > So I suggest requesting a metadata update even on REQUEST_TIMED_OUT exceptions, > to address the case where the old leader became a "zombie" -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14445) Producer doesn't request metadata update on REQUEST_TIMED_OUT
[ https://issues.apache.org/jira/browse/KAFKA-14445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17644001#comment-17644001 ] Kirk True commented on KAFKA-14445: --- [~ocadaruma] - I modified the code in `completeBatch` to request a metadata refresh on timeouts as part of KAFKA-14317: Jira: https://issues.apache.org/jira/browse/KAFKA-14317 PR: [https://github.com/apache/kafka/pull/12813] Is there more involved in your patch that is not in the above PR? Thanks! > Producer doesn't request metadata update on REQUEST_TIMED_OUT > - > > Key: KAFKA-14445 > URL: https://issues.apache.org/jira/browse/KAFKA-14445 > Project: Kafka > Issue Type: Improvement >Reporter: Haruki Okada >Priority: Major > > Produce requests may fail with timeout by `request.timeout.ms` in below two > cases: > * Didn't receive produce response within `request.timeout.ms` > * Produce response received, but it ended up with `REQUEST_TIMED_OUT` in the > broker > Former case usually happens when a broker-machine got failed or there's > network glitch etc. > In this case, the connection will be disconnected and metadata-update will be > requested to discover new leader: > [https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L556] > > The problem is in latter case (REQUEST_TIMED_OUT on the broker). > In this case, the produce request will be ended up with TimeoutException, > which doesn't inherit InvalidMetadataException so it doesn't trigger metadata > update. > > Typical cause of REQUEST_TIMED_OUT is replication delay due to follower-side > problem, that metadata-update doesn't make much sense indeed. > > However, we found that in some cases, stale metadata on REQUEST_TIMED_OUT > could cause produce requests to retry unnecessarily , which may end up with > batch expiration due to delivery timeout. > Below is the scenario we experienced: > * Environment: > ** Partition tp-0 has 3 replicas, 1, 2, 3. Leader is 1 > ** min.insync.replicas=2 > ** acks=all > * Scenario: > ** broker 1 "partially" failed > *** It lost ZooKeeper connection and kicked out from the cluster > There was controller log like: > * > {code:java} > [2022-12-04 08:01:04,013] INFO [Controller id=XX] Newly added brokers: , > deleted brokers: 1, bounced brokers: {code} > * > ** > *** However, somehow the broker was able continued to receive produce > requests > We're still working on investigating how this is possible though. > Indeed, broker 1 was somewhat "alive" and keeps working according to > server.log > *** In other words, broker 1 became "zombie" > ** broker 2 was elected as new leader > *** broker 3 became follower of broker 2 > *** However, since broker 1 was still out of cluster, it didn't receive > LeaderAndIsr so 1 kept thinking itself as the leader of tp-0 > ** Meanwhile, producer keeps sending produce requests to broker 1 and > requests were failed due to REQUEST_TIMED_OUT because no brokers replicates > from broker 1. > *** REQUEST_TIMED_OUT doesn't trigger metadata update, so produce didn't > have a change to update its stale metadata > > So I suggest to request metadata update even on REQUEST_TIMED_OUT exception, > to address the case that the old leader became "zombie" -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] gharris1727 commented on pull request #12955: KAFKA-14443: Close topic creation Admin clients in MM2 connectors
gharris1727 commented on PR #12955: URL: https://github.com/apache/kafka/pull/12955#issuecomment-1339870223 @OmniaGM I believe that failure is flaky, as I cannot reproduce it locally. Those sorts of flaky failures are what got me looking at this code in the first place, coincidentally. @C0urante Thanks for your suggestion on making these sorts of leaks more difficult; given the complexity and number of resources being managed by the connector/task instances, we should certainly look into it. I thought about the `Mirror*Config::close` solution that you proposed, and didn't like the idea of the config container also being responsible for managing the lifetime of the objects it creates. I didn't like it being responsible for creating the resources either, as you pointed out: it's too easy to leak the reference. I experimented with an alternative solution: a reusable (across task/connector classes) BackgroundResources class which handles all instantiation and cleanup of the AutoCloseable resources used in an MM2 connector/task. It simplifies the `stop()` methods, and doesn't appear to incur too much boilerplate in the `start()` methods; maybe with a shorter name it could be even better. I have two concerns here: 1. This change may be scope-creep for this PR, and may be better handled as a lateral refactor after the release blocker bug is resolved. I am happy to continue iterating on the wider solution in this PR or a separate one, so let me know if you think the scope is still reasonable. 2. I wonder if the BackgroundResources class is in danger of becoming a "god object" which does everything. At the moment everything passes checkstyle, but if we add too many other pluggable interfaces or closeable resources, this class could be doing too much. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
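A rough sketch of the kind of helper described in the comment above; the class name comes from the comment, but the API is an assumption, not merged code. It only registers AutoCloseable resources created in start() and releases them in reverse order on close().
```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class BackgroundResources implements AutoCloseable {
    private final List<AutoCloseable> resources = new ArrayList<>();

    // Register a resource created in start() so it is released in stop()/close().
    public <T extends AutoCloseable> T add(T resource) {
        resources.add(resource);
        return resource;
    }

    @Override
    public void close() {
        // Close in reverse creation order, swallowing individual failures so one
        // bad resource does not prevent the rest from being released.
        Collections.reverse(resources);
        for (AutoCloseable resource : resources) {
            try {
                resource.close();
            } catch (Exception e) {
                // In the real connector this would be logged.
            }
        }
        resources.clear();
    }
}
```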
[GitHub] [kafka] gharris1727 commented on a diff in pull request #12937: KAFKA-13881: Add Connect package infos
gharris1727 commented on code in PR #12937: URL: https://github.com/apache/kafka/pull/12937#discussion_r1041375736 ## connect/api/src/main/java/org/apache/kafka/connect/connector/policy/package-info.java: ## @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Provides pluggable interfaces for connector security policies. Review Comment: I think that this is wordy and unnecessary. Why should the description for a package with a single class be longer than a more complex package with multiple classes? I don't think there's a lot of real-estate to display these descriptions, so we have to be somewhat terse. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji merged pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error
hachikuji merged PR #12915: URL: https://github.com/apache/kafka/pull/12915 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #12915: KAFKA-14417: Producer doesn't handle REQUEST_TIMED_OUT for InitProducerIdRequest, treats as fatal error
jolshan commented on PR #12915: URL: https://github.com/apache/kafka/pull/12915#issuecomment-1339938684 Did we want to cherry pick this back to older branches @hachikuji? Or wait for the server side version for that? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kirktrue commented on a diff in pull request #12945: KAFKA-14365: Refactor Fetcher to allow different implementations
kirktrue commented on code in PR #12945: URL: https://github.com/apache/kafka/pull/12945#discussion_r1041415886 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsFinder.java: ## @@ -0,0 +1,805 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.StaleMetadataException; +import org.apache.kafka.clients.consumer.LogTruncationException; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult; +import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; +import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition; +import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse; +import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import 
java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class OffsetsFinder { +private final Logger log; +private final ConsumerNetworkClient client; +private final Time time; +private final long retryBackoffMs; +private final long requestTimeoutMs; +private final ConsumerMetadata metadata; +private final SubscriptionState subscriptions; +private final IsolationLevel isolationLevel; +private final AtomicReference cachedListOffsetsException = new AtomicReference<>(); +private final AtomicReference cachedOffsetForLeaderException = new AtomicReference<>(); +private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient; +private final ApiVersions apiVersions; +private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1); + +public OffsetsFinder(LogContext logContext, + ConsumerNetworkClient client, + ConsumerMetadata metadata, + SubscriptionState subscriptions, + Time time, + long retryBackoffMs, + long requestTimeoutMs, + IsolationLevel isolationLevel, + ApiVersions apiVersions) { +this.log = logContext.logger(Fetcher.class); +this.time = time; +this.client = c
[GitHub] [kafka] jsancio merged pull request #12892: KAFKA-14386: Return TopicAssignment from the ReplicaPlacer
jsancio merged PR #12892: URL: https://github.com/apache/kafka/pull/12892 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on a diff in pull request #12903: KAFKA-14415: Faster ThreadCache
lucasbru commented on code in PR #12903: URL: https://github.com/apache/kafka/pull/12903#discussion_r1040930847 ## streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java: ## @@ -133,7 +139,12 @@ public void flush(final String namespace) { if (cache == null) { return; } -cache.flush(); + +synchronized (cache) { +final long oldSize = cache.sizeInBytes(); +cache.flush(); Review Comment: See above ## streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java: ## @@ -81,9 +85,11 @@ public synchronized void resize(final long newCacheSizeBytes) { return; } final CircularIterator circularIterator = new CircularIterator<>(caches.values()); -while (sizeBytes() > maxCacheSizeBytes) { +while (sizeInBytes.get() > maxCacheSizeBytes) { final NamedCache cache = circularIterator.next(); +final long oldSize = cache.sizeInBytes(); Review Comment: I considered this, but I would probably add more complicated logic to just compute this number (evict calls flush, flush calls delete - delete requires relatively brittle calculation, see last comment). You are right of course that we shouldn't acquire the lock 3 times here, but how about I just wrap those three into a `synchronized (cache)` block? We would only acquire the lock once and do not need to calculate the size of the evicted data inside `NamedCache`. I will do only the lock change for now, but let me know if you still prefer to shift the delta computation to `NamedCache` - it can certainly also be done the other way. ## streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java: ## @@ -186,7 +206,13 @@ public LRUCacheEntry delete(final String namespace, final Bytes key) { return null; } -return cache.delete(key); +final LRUCacheEntry entry; +synchronized (cache) { +final long oldSize = cache.sizeInBytes(); +entry = cache.delete(key); Review Comment: We can calculate the size, but it's definitely error-prone. It would currently be defined as: ``` if (entry != null) 0 else { key.get().length + 8 + // entry 8 + // previous 8 + // next entry.size(); } ``` We can calculate it here, but we would create quite tight coupling between this function and the internals of the `NamedCache` implementation. Since we already need to acquire the lock for `cache`, it should have no performance benefits to try to calculate this precisely here. ## streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java: ## @@ -241,35 +267,30 @@ private boolean isOverflowing(final long size) { } long sizeBytes() { -long sizeInBytes = 0; -for (final NamedCache namedCache : caches.values()) { -sizeInBytes += namedCache.sizeInBytes(); -if (isOverflowing(sizeInBytes)) { -return Long.MAX_VALUE; -} -} -return sizeInBytes; +return sizeInBytes.get(); } synchronized void close(final String namespace) { final NamedCache removed = caches.remove(namespace); if (removed != null) { +sizeInBytes.getAndAdd(-removed.sizeInBytes()); removed.close(); } } -private void maybeEvict(final String namespace) { +private void maybeEvict(final String namespace, final NamedCache cache) { Review Comment: It's used in the log message, where it's nice to have it. 
## streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java: ## @@ -158,15 +169,24 @@ public void put(final String namespace, final Bytes key, final LRUCacheEntry val numPuts++; final NamedCache cache = getOrCreateCache(namespace); -cache.put(key, value); -maybeEvict(namespace); + +synchronized (cache) { +final long oldSize = cache.sizeInBytes(); +cache.put(key, value); Review Comment: See above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
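A compact sketch of the pattern being converged on in this thread, using delete as the example: lock the NamedCache once and let the size delta fall out of before/after measurements, rather than re-deriving NamedCache's internal accounting. It assumes the surrounding ThreadCache fields from the quoted diff and is illustrative only.
```java
// Fragment of ThreadCache (sizeInBytes is the AtomicLong from the diff above).
private LRUCacheEntry deleteAndTrackSize(final NamedCache cache, final Bytes key) {
    synchronized (cache) {
        final long oldSize = cache.sizeInBytes();
        final LRUCacheEntry entry = cache.delete(key);
        // Delta is 0 when the key was absent, and the full entry footprint otherwise.
        sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
        return entry;
    }
}
```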
[GitHub] [kafka] cmccabe commented on a diff in pull request #12958: MINOR: Move dynamic config logic to DynamicConfigPublisher
cmccabe commented on code in PR #12958: URL: https://github.com/apache/kafka/pull/12958#discussion_r1041503192 ## core/src/main/scala/kafka/server/BrokerServer.scala: ## @@ -71,7 +71,7 @@ class BrokerServer( val initialOfflineDirs: Seq[String], ) extends KafkaBroker { val threadNamePrefix = sharedServer.threadNamePrefix - val config = sharedServer.config + val config = new KafkaConfig(sharedServer.sharedServerConfig.props, false, None) Review Comment: Good idea -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #12958: MINOR: Move dynamic config logic to DynamicConfigPublisher
mumrah commented on code in PR #12958: URL: https://github.com/apache/kafka/pull/12958#discussion_r1041510271 ## core/src/main/scala/kafka/server/metadata/DynamicConfigPublisher.scala: ## @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server.metadata + +import java.util.Properties +import kafka.server.ConfigAdminManager.toLoggableProps +import kafka.server.{ConfigEntityName, ConfigHandler, ConfigType, KafkaConfig} +import kafka.utils.Logging +import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, TOPIC} +import org.apache.kafka.image.{MetadataDelta, MetadataImage} +import org.apache.kafka.server.fault.FaultHandler + + +class DynamicConfigPublisher( + conf: KafkaConfig, + faultHandler: FaultHandler, + dynamicConfigHandlers: Map[String, ConfigHandler], + nodeType: String +) extends Logging { + logIdent = s"[DynamicConfigPublisher nodeType=${nodeType} id=${conf.nodeId}] " + + def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = { +val deltaName = s"MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}" +try { + // Apply configuration deltas. + Option(delta.configsDelta()).foreach { configsDelta => +configsDelta.changes().keySet().forEach { resource => + val props = newImage.configs().configProperties(resource) + resource.`type`() match { +case TOPIC => + dynamicConfigHandlers.get(ConfigType.Topic).foreach(topicConfigHandler => +try { + // Apply changes to a topic's dynamic configuration. + info(s"Updating topic ${resource.name()} with new configuration : " + +toLoggableProps(resource, props).mkString(",")) + topicConfigHandler.processConfigChanges(resource.name(), props) +} catch { + case t: Throwable => faultHandler.handleFault("Error updating topic " + +s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " + +s"in ${deltaName}", t) +} + ) +case BROKER => Review Comment: Unrelated to this change, but do we support BROKER_LOGGER configs in KRaft? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on pull request #12946: KAFKA-14427 ZK client support for migrations
mumrah commented on PR #12946: URL: https://github.com/apache/kafka/pull/12946#issuecomment-1340102935 @cmccabe I've added a large constant (10M) to the controller epoch when persisting it in ZK. I think this workaround should allow us to move forward with integration/system testing prior to getting KAFKA-14436 finished. WDYT about adding the idempotent stuff as a follow-on PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
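A hypothetical illustration of the workaround (names and rationale are assumptions based on the discussion, not the actual code): shift the KRaft controller epoch by a large constant before persisting it to the ZK controller-epoch znode, presumably so it always compares higher than any epoch written by a ZK controller.
```java
final class MigrationEpochOffset {
    // Large offset mentioned in the comment above (10M); the constant name is made up here.
    static final int KRAFT_CONTROLLER_EPOCH_OFFSET = 10_000_000;

    // Epoch value to persist in ZooKeeper for a given KRaft controller epoch.
    static int zkEpochFor(final int kraftControllerEpoch) {
        return KRAFT_CONTROLLER_EPOCH_OFFSET + kraftControllerEpoch;
    }
}
```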
[jira] [Commented] (KAFKA-14445) Producer doesn't request metadata update on REQUEST_TIMED_OUT
[ https://issues.apache.org/jira/browse/KAFKA-14445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17644055#comment-17644055 ] Haruki Okada commented on KAFKA-14445: -- [~kirktrue] Oh I was not aware of KAFKA-14317. Thanks. > Is there more involved in your patch that is not in the above PR No. However, as mentioned in KAFKA-10228, changing error type could be considered as breaking change so may need more discussions I guess. My plan was just requesting metadata update on REQUEST_TIMED_OUT response as well, without changing the error type so more trivial. > Producer doesn't request metadata update on REQUEST_TIMED_OUT > - > > Key: KAFKA-14445 > URL: https://issues.apache.org/jira/browse/KAFKA-14445 > Project: Kafka > Issue Type: Improvement >Reporter: Haruki Okada >Priority: Major > > Produce requests may fail with timeout by `request.timeout.ms` in below two > cases: > * Didn't receive produce response within `request.timeout.ms` > * Produce response received, but it ended up with `REQUEST_TIMED_OUT` in the > broker > Former case usually happens when a broker-machine got failed or there's > network glitch etc. > In this case, the connection will be disconnected and metadata-update will be > requested to discover new leader: > [https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java#L556] > > The problem is in latter case (REQUEST_TIMED_OUT on the broker). > In this case, the produce request will be ended up with TimeoutException, > which doesn't inherit InvalidMetadataException so it doesn't trigger metadata > update. > > Typical cause of REQUEST_TIMED_OUT is replication delay due to follower-side > problem, that metadata-update doesn't make much sense indeed. > > However, we found that in some cases, stale metadata on REQUEST_TIMED_OUT > could cause produce requests to retry unnecessarily , which may end up with > batch expiration due to delivery timeout. > Below is the scenario we experienced: > * Environment: > ** Partition tp-0 has 3 replicas, 1, 2, 3. Leader is 1 > ** min.insync.replicas=2 > ** acks=all > * Scenario: > ** broker 1 "partially" failed > *** It lost ZooKeeper connection and kicked out from the cluster > There was controller log like: > * > {code:java} > [2022-12-04 08:01:04,013] INFO [Controller id=XX] Newly added brokers: , > deleted brokers: 1, bounced brokers: {code} > * > ** > *** However, somehow the broker was able continued to receive produce > requests > We're still working on investigating how this is possible though. > Indeed, broker 1 was somewhat "alive" and keeps working according to > server.log > *** In other words, broker 1 became "zombie" > ** broker 2 was elected as new leader > *** broker 3 became follower of broker 2 > *** However, since broker 1 was still out of cluster, it didn't receive > LeaderAndIsr so 1 kept thinking itself as the leader of tp-0 > ** Meanwhile, producer keeps sending produce requests to broker 1 and > requests were failed due to REQUEST_TIMED_OUT because no brokers replicates > from broker 1. > *** REQUEST_TIMED_OUT doesn't trigger metadata update, so produce didn't > have a change to update its stale metadata > > So I suggest to request metadata update even on REQUEST_TIMED_OUT exception, > to address the case that the old leader became "zombie" -- This message was sent by Atlassian Jira (v8.20.10#820010)
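A sketch of the change being proposed in this ticket (not the code from either PR): alongside the existing InvalidMetadataException cases, also request a metadata refresh when a produce response comes back with REQUEST_TIMED_OUT, so a "zombie" leader is abandoned sooner.
```java
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.protocol.Errors;

// Hypothetical helper illustrating the proposed handling; the real change would
// live inside Sender.completeBatch rather than a standalone class.
final class TimedOutMetadataRefresh {
    static void maybeRequestUpdate(final Errors error, final Metadata metadata) {
        // InvalidMetadataException subtypes already trigger a refresh today;
        // the proposal is to treat REQUEST_TIMED_OUT the same way.
        if (error == Errors.REQUEST_TIMED_OUT || error.exception() instanceof InvalidMetadataException) {
            metadata.requestUpdate();
        }
    }
}
```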
[GitHub] [kafka] satishd commented on a diff in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.
satishd commented on code in PR #11390: URL: https://github.com/apache/kafka/pull/11390#discussion_r1033516642 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -289,13 +296,27 @@ class UnifiedLog(@volatile var logStartOffset: Long, @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None + //todo-tier it needs to be updated. + private[kafka] var _localLogStartOffset: Long = logStartOffset Review Comment: Please ignore the stale comment, removed with the latest commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14379) consumer should refresh preferred read replica on update metadata
[ https://issues.apache.org/jira/browse/KAFKA-14379?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-14379: --- Priority: Blocker (was: Critical) > consumer should refresh preferred read replica on update metadata > - > > Key: KAFKA-14379 > URL: https://issues.apache.org/jira/browse/KAFKA-14379 > Project: Kafka > Issue Type: Bug >Reporter: Jeff Kim >Assignee: Jeff Kim >Priority: Blocker > Fix For: 3.4.0 > > > The consumer (fetcher) refreshes the preferred read replica only on three > conditions: > # the consumer receives an OFFSET_OUT_OF_RANGE error > # the follower does not exist in the client's metadata (i.e., offline) > # after metadata.max.age.ms (5 min default) > For other errors, it will continue to reach to the possibly unavailable > follower and only after 5 minutes will it refresh the preferred read replica > and go back to the leader. > A specific example is when a partition is reassigned. the consumer will get > NOT_LEADER_OR_FOLLOWER which triggers a metadata update but the preferred > read replica will not be refreshed as the follower is still online. it will > continue to reach out to the old follower until the preferred read replica > expires. > the consumer can instead refresh its preferred read replica whenever it makes > a metadata update request. so when the consumer receives i.e. > NOT_LEADER_OR_FOLLOWER it can find the new preferred read replica without > waiting for the expiration. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
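A self-contained, hypothetical sketch of the idea in this ticket (none of these names correspond to actual consumer internals): remember which metadata update the preferred read replica was selected under, and drop the selection once newer metadata has been seen rather than waiting out metadata.max.age.ms.
```java
final class PreferredReadReplicaTracker {
    private Integer preferredReplicaId;          // null = fetch from the leader
    private int selectedAtMetadataVersion = -1;

    // Record the replica chosen from a fetch response and the metadata version at that time.
    void select(final int replicaId, final int metadataVersion) {
        this.preferredReplicaId = replicaId;
        this.selectedAtMetadataVersion = metadataVersion;
    }

    // Return the replica to fetch from, discarding the selection if metadata has since changed.
    Integer replicaToFetchFrom(final int currentMetadataVersion) {
        if (preferredReplicaId != null && currentMetadataVersion > selectedAtMetadataVersion) {
            preferredReplicaId = null; // metadata changed: fall back to the leader until re-selected
        }
        return preferredReplicaId;
    }
}
```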
[GitHub] [kafka] cmccabe commented on pull request #12946: KAFKA-14427 ZK client support for migrations
cmccabe commented on PR #12946: URL: https://github.com/apache/kafka/pull/12946#issuecomment-1340220864 > WDYT about adding the idempotent stuff as a follow-on PR? +1 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a diff in pull request #12903: KAFKA-14415: Faster ThreadCache
ableegoldman commented on code in PR #12903: URL: https://github.com/apache/kafka/pull/12903#discussion_r1041646981 ## streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java: ## @@ -81,9 +85,11 @@ public synchronized void resize(final long newCacheSizeBytes) { return; } final CircularIterator circularIterator = new CircularIterator<>(caches.values()); -while (sizeBytes() > maxCacheSizeBytes) { +while (sizeInBytes.get() > maxCacheSizeBytes) { final NamedCache cache = circularIterator.next(); +final long oldSize = cache.sizeInBytes(); Review Comment: that sounds good to me -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a diff in pull request #12903: KAFKA-14415: Faster ThreadCache
ableegoldman commented on code in PR #12903: URL: https://github.com/apache/kafka/pull/12903#discussion_r1041647625 ## streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java: ## @@ -241,35 +267,30 @@ private boolean isOverflowing(final long size) { } long sizeBytes() { -long sizeInBytes = 0; -for (final NamedCache namedCache : caches.values()) { -sizeInBytes += namedCache.sizeInBytes(); -if (isOverflowing(sizeInBytes)) { -return Long.MAX_VALUE; -} -} -return sizeInBytes; +return sizeInBytes.get(); } synchronized void close(final String namespace) { final NamedCache removed = caches.remove(namespace); if (removed != null) { +sizeInBytes.getAndAdd(-removed.sizeInBytes()); removed.close(); } } -private void maybeEvict(final String namespace) { +private void maybeEvict(final String namespace, final NamedCache cache) { Review Comment: ah I guess that was obscured in the Github view, makes sense -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a diff in pull request #12903: KAFKA-14415: Faster ThreadCache
ableegoldman commented on code in PR #12903: URL: https://github.com/apache/kafka/pull/12903#discussion_r1041650741 ## streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java: ## @@ -81,9 +85,13 @@ public synchronized void resize(final long newCacheSizeBytes) { return; } final CircularIterator circularIterator = new CircularIterator<>(caches.values()); -while (sizeBytes() > maxCacheSizeBytes) { +while (sizeInBytes.get() > maxCacheSizeBytes) { final NamedCache cache = circularIterator.next(); -cache.evict(); +synchronized (cache) { Review Comment: don't worry about this now as I want to merge your PR today, but WDYT about extracting this pattern of `lock - action - update_size` into a helper method where you can just pass in the action eg as `() -> flush()`? Not in a style/simplify the code way, but having a method like `#updateCacheAndSize` or similar might help future devs/us remember to follow this pattern instead of just directly invoking `flush`/`evict`/etc -- thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a diff in pull request #12903: KAFKA-14415: Faster ThreadCache
ableegoldman commented on code in PR #12903: URL: https://github.com/apache/kafka/pull/12903#discussion_r1041651458 ## streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java: ## @@ -81,9 +85,13 @@ public synchronized void resize(final long newCacheSizeBytes) { return; } final CircularIterator circularIterator = new CircularIterator<>(caches.values()); -while (sizeBytes() > maxCacheSizeBytes) { +while (sizeInBytes.get() > maxCacheSizeBytes) { final NamedCache cache = circularIterator.next(); -cache.evict(); +synchronized (cache) { Review Comment: If that makes sense to you feel free to do in a followup PR or just file a ticket, there are plenty of people looking to dip their toes into AK with simple patches like that. Not at all married to the idea, though, so also feel free to ignore 🙂 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
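Sketching the helper suggested above, as a fragment of ThreadCache using the field names from the quoted diff (purely illustrative, not merged code): one method owns the lock-act-update-size sequence and callers pass the cache operation as a lambda.
```java
private void updateCacheAndSize(final NamedCache cache, final Runnable action) {
    synchronized (cache) {
        final long oldSize = cache.sizeInBytes();
        action.run();                                  // flush, evict, put, ...
        sizeInBytes.getAndAdd(cache.sizeInBytes() - oldSize);
    }
}

// Hypothetical call sites:
//   updateCacheAndSize(cache, cache::evict);
//   updateCacheAndSize(cache, cache::flush);
//   updateCacheAndSize(cache, () -> cache.put(key, value));
```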
[GitHub] [kafka] ableegoldman merged pull request #12903: KAFKA-14415: Faster ThreadCache
ableegoldman merged PR #12903: URL: https://github.com/apache/kafka/pull/12903 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14415) ThreadCache is getting slower with every additional state store
[ https://issues.apache.org/jira/browse/KAFKA-14415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-14415: --- Fix Version/s: 3.4.0 > ThreadCache is getting slower with every additional state store > --- > > Key: KAFKA-14415 > URL: https://issues.apache.org/jira/browse/KAFKA-14415 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > Fix For: 3.4.0 > > > There are a few lines in `ThreadCache` that I think should be optimized. > `sizeBytes` is called at least once, and potentially many times in every > `put` and is linear in the number of caches (= number of state stores, so > typically proportional to number of tasks). That means, with every additional > task, every put gets a little slower.Compare the throughput of TIME_ROCKS on > trunk (green graph): > [http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-3-4-0-51b7eb7937-jenkins-20221113214104-streamsbench/] > This is the throughput of TIME_ROCKS is 20% higher when a constant time > `sizeBytes` implementation is used: > [http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-LUCASCOMPARE-lucas-20221122140846-streamsbench/] > The same seems to apply for the MEM backend (initial throughput >8000 instead > of 6000), however, I cannot run the same benchmark here because the memory is > filled too quickly. > [http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-LUCASSTATE-lucas-20221121231632-streamsbench/] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14415) ThreadCache is getting slower with every additional state store
[ https://issues.apache.org/jira/browse/KAFKA-14415?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-14415. Resolution: Fixed > ThreadCache is getting slower with every additional state store > --- > > Key: KAFKA-14415 > URL: https://issues.apache.org/jira/browse/KAFKA-14415 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Lucas Brutschy >Assignee: Lucas Brutschy >Priority: Major > Fix For: 3.4.0 > > > There are a few lines in `ThreadCache` that I think should be optimized. > `sizeBytes` is called at least once, and potentially many times in every > `put` and is linear in the number of caches (= number of state stores, so > typically proportional to number of tasks). That means, with every additional > task, every put gets a little slower.Compare the throughput of TIME_ROCKS on > trunk (green graph): > [http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-3-4-0-51b7eb7937-jenkins-20221113214104-streamsbench/] > This is the throughput of TIME_ROCKS is 20% higher when a constant time > `sizeBytes` implementation is used: > [http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-LUCASCOMPARE-lucas-20221122140846-streamsbench/] > The same seems to apply for the MEM backend (initial throughput >8000 instead > of 6000), however, I cannot run the same benchmark here because the memory is > filled too quickly. > [http://kstreams-benchmark-results.s3-website-us-west-2.amazonaws.com/experiments/stateheavy-3-5-LUCASSTATE-lucas-20221121231632-streamsbench/] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] ableegoldman opened a new pull request, #12960: MINOR: Bump trunk to 3.5.0-SNAPSHOT
ableegoldman opened a new pull request, #12960: URL: https://github.com/apache/kafka/pull/12960 Version bumps in trunk after the creation of the 3.4 branch. Also fixed the comment in `gradle.properties` which was missing several files related to the streams quickstart from the list of what to update when bumping the version. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14443) Mirror Maker Connectors leak admin clients used for topic creation
[ https://issues.apache.org/jira/browse/KAFKA-14443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-14443: --- Fix Version/s: 3.4.0 > Mirror Maker Connectors leak admin clients used for topic creation > -- > > Key: KAFKA-14443 > URL: https://issues.apache.org/jira/browse/KAFKA-14443 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 3.4.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Critical > Fix For: 3.4.0 > > > the MirrorMaker connectors are each responsible for creating internal topics. > For example, the Checkpoint connector creates a forwarding admin and passes > it to a method to create the topic, but never closes the ForwardingAdmin or > delegate objects: > [https://github.com/apache/kafka/blob/13c9c78a1f4ad92023e8354069c6817b44c89ce6/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java#L161-L164] > Instead, this object should be intentionally closed when it is no longer > needed, to prevent consuming resources in a running MM2 application. -- This message was sent by Atlassian Jira (v8.20.10#820010)
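A minimal illustration of the fix direction (the connectors actually build a ForwardingAdmin through their config classes, so treat this purely as a sketch of the resource-handling pattern): scope the admin client to the topic-creation call so it cannot leak.
```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

final class TopicSetupSketch {
    // Creates the internal topic and guarantees the Admin is closed afterwards.
    static void createInternalTopic(final Map<String, Object> adminConfig, final NewTopic topic) throws Exception {
        try (Admin admin = Admin.create(adminConfig)) {
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```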
[GitHub] [kafka] ableegoldman commented on pull request #12955: KAFKA-14443: Close topic creation Admin clients in MM2 connectors
ableegoldman commented on PR #12955: URL: https://github.com/apache/kafka/pull/12955#issuecomment-1340276915 > This change may be scope-creep for this PR, and maybe better handled as a lateral refactor after the release blocker bug is resolved. Just my 2 cents as the 3.4 release manager -- unless this refactor helps to stabilize the actual feature as-is or fixes a suspected issue, please save this for a followup PR and avoid increasing the scope here: both to reduce the possibility of introducing new bugs with a nontrivial last-minute refactor, and of course to help ensure this fix is merged in a timely manner. Happy to discuss further especially since I have absolutely zero context on the particular issue or this part of the codebase in general, but just keep in mind how close we are to the end of the 3.4 release. Thanks 🙂 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13602) Allow to broadcast a result record
[ https://issues.apache.org/jira/browse/KAFKA-13602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-13602: --- Fix Version/s: 3.4.0 > Allow to broadcast a result record > -- > > Key: KAFKA-13602 > URL: https://issues.apache.org/jira/browse/KAFKA-13602 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Matthias J. Sax >Assignee: Sagar Rao >Priority: Major > Labels: needs-kip, newbie++ > Fix For: 3.4.0 > > > From time to time, users ask how they can send a record to more than one > partition in a sink topic. Currently, this is only possible by replicate the > message N times before the sink and use a custom partitioner to write the N > messages into the N different partitions. > It might be worth to make this easier and add a new feature for it. There are > multiple options: > * extend `to()` / `addSink()` with a "broadcast" option/config > * add `toAllPartitions()` / `addBroadcastSink()` methods > * allow StreamPartitioner to return `-1` for "all partitions" > * extend `StreamPartitioner` to allow returning more than one partition (ie > a list/collection of integers instead of a single int) > The first three options imply that a "full broadcast" is supported only, so > it's less flexible. On the other hand, it's easier to use (especially the > first two options are easy as they do not require to implement a custom > partitioner). > The last option would be most flexible and also allow for a "partial > broadcast" (aka multi-cast) pattern. It might also be possible to combine two > options, or maye even a totally different idea. -- This message was sent by Atlassian Jira (v8.20.10#820010)
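Purely to visualize the last option in the list above (a partitioner that can return several partitions); this is not the interface that was ultimately added for this ticket, just an illustration of full vs. partial broadcast.
```java
import java.util.List;

interface MultiCastStreamPartitioner<K, V> {
    // Returning every partition id gives a full broadcast; returning a subset
    // gives the "partial broadcast" (multi-cast) pattern described above.
    List<Integer> partitions(String topic, K key, V value, int numPartitions);
}
```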
[jira] [Created] (KAFKA-14446) API forwarding support in ZkBrokers
Akhilesh Chaganti created KAFKA-14446: - Summary: API forwarding support in ZkBrokers Key: KAFKA-14446 URL: https://issues.apache.org/jira/browse/KAFKA-14446 Project: Kafka Issue Type: Sub-task Reporter: Akhilesh Chaganti Assignee: Akhilesh Chaganti To support migration, zkBrokers should be able to forward API requests to the Controller, whether it is zkController or kraftController. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] akhileshchg opened a new pull request, #12961: KAFKA-14446: API forwarding support from zkBrokers to the Controller
akhileshchg opened a new pull request, #12961: URL: https://github.com/apache/kafka/pull/12961 KAFKA-14446: API forwarding support from zkBrokers to the controller **Changes** * Enable `EnvelopeRequest` to support zkBroker to zkController forwarding. * Changes to `BrokerToControllerChannelManager`, which we use in every broker to maintain a channel to the controller node. This node and security protocol to connect to the node can keep changing during zk -> kraft migration. Also, added support to reinitialize the request thread used in `BrokerToControllerChannelManager` when the controller switches from zkBroker to kraftController and vice-versa. * Other peripheral changes required to support the above changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-13602) Allow to broadcast a result record
[ https://issues.apache.org/jira/browse/KAFKA-13602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-13602. Resolution: Fixed > Allow to broadcast a result record > -- > > Key: KAFKA-13602 > URL: https://issues.apache.org/jira/browse/KAFKA-13602 > Project: Kafka > Issue Type: New Feature > Components: streams >Reporter: Matthias J. Sax >Assignee: Sagar Rao >Priority: Major > Labels: needs-kip, newbie++ > Fix For: 3.4.0 > > > From time to time, users ask how they can send a record to more than one > partition in a sink topic. Currently, this is only possible by replicate the > message N times before the sink and use a custom partitioner to write the N > messages into the N different partitions. > It might be worth to make this easier and add a new feature for it. There are > multiple options: > * extend `to()` / `addSink()` with a "broadcast" option/config > * add `toAllPartitions()` / `addBroadcastSink()` methods > * allow StreamPartitioner to return `-1` for "all partitions" > * extend `StreamPartitioner` to allow returning more than one partition (ie > a list/collection of integers instead of a single int) > The first three options imply that a "full broadcast" is supported only, so > it's less flexible. On the other hand, it's easier to use (especially the > first two options are easy as they do not require to implement a custom > partitioner). > The last option would be most flexible and also allow for a "partial > broadcast" (aka multi-cast) pattern. It might also be possible to combine two > options, or maye even a totally different idea. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] ableegoldman merged pull request #12893: KAFKA-14260: add `synchronized` to `prefixScan` method
ableegoldman merged PR #12893: URL: https://github.com/apache/kafka/pull/12893 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #12893: KAFKA-14260: add `synchronized` to `prefixScan` method
ableegoldman commented on PR #12893: URL: https://github.com/apache/kafka/pull/12893#issuecomment-1340330339 Merged to trunk and cherrypicked to 3.4 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14260) InMemoryKeyValueStore iterator still throws ConcurrentModificationException
[ https://issues.apache.org/jira/browse/KAFKA-14260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-14260: --- Fix Version/s: 3.4.0 > InMemoryKeyValueStore iterator still throws ConcurrentModificationException > --- > > Key: KAFKA-14260 > URL: https://issues.apache.org/jira/browse/KAFKA-14260 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.1, 3.2.3 >Reporter: Avi Cherry >Assignee: Lucia Cerchie >Priority: Major > Fix For: 3.4.0 > > > This is the same bug as KAFKA-7912 which was then re-introduced by KAFKA-8802. > Any iterator returned from {{InMemoryKeyValueStore}} may end up throwing a > ConcurrentModificationException because the backing map is not concurrent > safe. I expect that this only happens when the store is retrieved from > {{KafkaStreams.store()}} from outside of the topology since any usage of the > store from inside of the topology should be naturally single-threaded. > To start off, a reminder that this behaviour explicitly violates the > interface contract for {{ReadOnlyKeyValueStore}} which states > {quote}The returned iterator must be safe from > java.util.ConcurrentModificationExceptions > {quote} > It is often complicated to make code to demonstrate concurrency bugs, but > thankfully it is trivial to reason through the source code in > {{InMemoryKeyValueStore.java}} to show why this happens: > * All of the InMemoryKeyValueStore methods that return iterators do so by > passing a keySet based on the backing TreeMap to the InMemoryKeyValueIterator > constructor. > * These keySets are all VIEWS of the backing map, not copies. > * The InMemoryKeyValueIterator then makes a private copy of the keySet by > passing the original keySet into the constructor for TreeSet. This copying > was implemented in KAFKA-8802, incorrectly intending it to fix the > concurrency problem. > * TreeSet then iterates over the keySet to make a copy. If the original > backing TreeMap in InMemoryKeyValueStore is changed while this copy is being > created it will fail-fast a ConcurrentModificationException. > This bug should be able to be trivially fixed by replacing the backing > TreeMap with a ConcurrentSkipListMap but here's the rub: > This bug has already been found in KAFKA-7912 and the TreeMap was replaced > with a ConcurrentSkipListMap. It was then reverted back to a TreeMap in > KAFKA-8802 because of the performance regression. I can [see from one of the > PRs|https://github.com/apache/kafka/pull/7212/commits/384c12e40f3a59591f897d916f92253e126820ed] > that it was believed the concurrency problem with the TreeMap implementation > was fixed by copying the keyset when the iterator is created but the problem > remains, plus the fix creates an extra copy of the iterated portion of the > set in memory. > For what it's worth, the performance difference between TreeMap and > ConcurrentSkipListMap do not extend into complexity. TreeMap enjoys a similar > ~2x speed through all operations with any size of data, but at the cost of > what turned out to be an easy-to-encounter bug. > This is all unfortunate since the only time the state stores ever get > accessed concurrently is through the `KafkaStreams.store()` mechanism, but I > would imagine that "correct and slightly slower) is better than "incorrect > and faster". > Too bad BoilerBay's AirConcurrentMap is closed-source and patented. -- This message was sent by Atlassian Jira (v8.20.10#820010)
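A standalone demonstration (plain JDK code, no Kafka types) of the failure mode the ticket describes: copying a TreeMap keySet view while another thread mutates the map can fail fast with ConcurrentModificationException, because the "copy" still iterates the live view.
```java
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

public class KeySetCopyRace {
    public static void main(String[] args) throws InterruptedException {
        final Map<Integer, String> map = new TreeMap<>();
        for (int i = 0; i < 1_000_000; i++) {
            map.put(i, "value-" + i);
        }
        // Concurrently add more entries while the main thread copies the keySet.
        Thread writer = new Thread(() -> {
            for (int i = 1_000_000; i < 2_000_000; i++) {
                map.put(i, "value-" + i);
            }
        });
        writer.start();
        try {
            // Mirrors what InMemoryKeyValueIterator does: copy the keySet view into a TreeSet.
            new TreeSet<>(map.keySet());
            System.out.println("Copy happened to finish before a conflicting modification.");
        } catch (ConcurrentModificationException e) {
            System.out.println("Copy failed while the map was being modified: " + e);
        }
        writer.join();
    }
}
```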
[jira] [Resolved] (KAFKA-14260) InMemoryKeyValueStore iterator still throws ConcurrentModificationException
[ https://issues.apache.org/jira/browse/KAFKA-14260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-14260. Resolution: Fixed -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14260) InMemoryKeyValueStore iterator still throws ConcurrentModificationException
[ https://issues.apache.org/jira/browse/KAFKA-14260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17644129#comment-17644129 ] A. Sophie Blee-Goldman commented on KAFKA-14260: OK, I did merge a patch to fix a spot where we forgot to synchronize, which is certainly a bug leading to a potential CME, but I realize that's not what this ticket was about, so I want to explain why I resolved it: i.e. that synchronization is sufficient for avoiding CMEs. I do think you pointed out something of note here, though, which is worth following up on, perhaps tracked separately. In the IMKVIterator constructor from [~guozhang]'s snippet above, it's true we get an iterator based on the original map, but it's still just a copy of that map: so this iterator doesn't pin any part of the original map and just happily returns the set of keys that were in the original map when the range API was invoked. There's no way to modify the contents of this copy as it's internal to the (also internal) iterator, and even if you delete a record with a given key in that store, the actual key object itself still exists (and can/will still be returned by that iterator). So I really don't see how a CME is possible if we properly synchronize the APIs to enforce single-threaded access while that copy is being made. Which we do (now, since merging [~Cerchie]'s PR). That said, it still feels a bit awkward because the keyset-copy iterator can return keys that no longer exist in the actual store. In this case, when we issue a get on that key it'll return null, and the range read will have an entry with a null value. Technically Streams makes no guarantees about whether a range scan will reflect only the original state store contents or only the latest contents or anything in between, and I'm not sure there's even a "right" answer there. Still, returning a KeyValue("key1", null) is pretty awkward and likely unexpected by most users, so I _can_ see this resulting in an NPE. Fortunately that's a much easier fix, as we can just toss out that result and return whatever is next. I think it's worth filing a separate ticket for that one, though. [~aviperksy], thoughts? Did I miss something obvious here? Also note that the code has changed a lot over the years, so it's possible that what you described does affect some older branch(es).
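To make the two points above concrete, here is a simplified sketch of a store whose iterator copies the keys under synchronization. This is a hypothetical stand-in, not the actual InMemoryKeyValueStore code: no CME is possible while the copy is made, but a key deleted after the snapshot still comes back from the iterator paired with a null value.
{code:java}
import java.util.Iterator;
import java.util.TreeMap;
import java.util.TreeSet;

public class SyncedInMemoryStoreSketch {
    private final TreeMap<String, String> map = new TreeMap<>();

    public synchronized void put(String key, String value) { map.put(key, value); }

    public synchronized void delete(String key) { map.remove(key); }

    public synchronized String get(String key) { return map.get(key); }

    // The key snapshot is taken while holding the lock, so no other thread can mutate
    // the TreeMap mid-copy and no ConcurrentModificationException can be thrown here.
    public synchronized Iterator<String[]> all() {
        final Iterator<String> keys = new TreeSet<>(map.keySet()).iterator();
        return new Iterator<String[]>() {
            @Override
            public boolean hasNext() { return keys.hasNext(); }

            @Override
            public String[] next() {
                String key = keys.next();
                // A key deleted after the snapshot was taken still shows up here, but the
                // lookup now returns null -- the "KeyValue(key1, null)" case discussed above.
                return new String[] {key, get(key)};
            }
        };
    }

    public static void main(String[] args) {
        SyncedInMemoryStoreSketch store = new SyncedInMemoryStoreSketch();
        store.put("key1", "a");
        store.put("key2", "b");

        Iterator<String[]> range = store.all();   // snapshot of {key1, key2}
        store.delete("key1");                     // deletion after the snapshot

        while (range.hasNext()) {
            String[] kv = range.next();
            System.out.println(kv[0] + " -> " + kv[1]);   // prints "key1 -> null", then "key2 -> b"
        }
    }
}
{code}
-- This message was sent by Atlassian Jira (v8.20.10#820010)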
[GitHub] [kafka] ableegoldman opened a new pull request, #12962: KAFKA-14318: KIP-878, Introduce partition autoscaling configs
ableegoldman opened a new pull request, #12962: URL: https://github.com/apache/kafka/pull/12962 First PR for [KIP-878: Internal Topic Autoscaling for Kafka Streams](https://cwiki.apache.org/confluence/display/KAFKA/KIP-878%3A+Internal+Topic+Autoscaling+for+Kafka+Streams) Introduces two new configs related to autoscaling in Streams: a feature flag and a retry timeout. This PR just adds the configs and gets them passed through to the Streams assignor, where they'll ultimately be needed/used; a sketch of what the definitions could look like follows below. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
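For context, a rough sketch of what the two StreamsConfig definitions could look like. The config names (`partition.autoscaling.enabled`, `partition.autoscaling.timeout.ms`), defaults, and doc strings here are illustrative assumptions, not necessarily what the KIP or this PR ends up using:
```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class AutoscalingConfigSketch {
    // Placeholder names -- see KIP-878 / this PR for the actual config keys.
    public static final String PARTITION_AUTOSCALING_ENABLED_CONFIG = "partition.autoscaling.enabled";
    public static final String PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG = "partition.autoscaling.timeout.ms";

    public static ConfigDef withAutoscalingConfigs(ConfigDef configDef) {
        return configDef
            .define(PARTITION_AUTOSCALING_ENABLED_CONFIG,
                    Type.BOOLEAN,
                    false,                        // feature flag, off by default
                    Importance.LOW,
                    "Whether Streams may expand internal topic partition counts to match its input topics.")
            .define(PARTITION_AUTOSCALING_TIMEOUT_MS_CONFIG,
                    Type.LONG,
                    15 * 60 * 1000L,              // retry window for failed autoscaling attempts
                    Importance.LOW,
                    "Maximum time to keep retrying a failed partition autoscaling operation.");
    }
}
```
Both values would then be read in the assignor's configuration path, which is the plumbing this PR adds.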
[GitHub] [kafka] ableegoldman commented on a diff in pull request #12960: MINOR: Bump trunk to 3.5.0-SNAPSHOT
ableegoldman commented on code in PR #12960: URL: https://github.com/apache/kafka/pull/12960#discussion_r1041793005 ## tests/kafkatest/version.py: ## @@ -119,7 +119,7 @@ def get_version(node=None): return DEV_BRANCH DEV_BRANCH = KafkaVersion("dev") -DEV_VERSION = KafkaVersion("3.4.0-SNAPSHOT") +DEV_VERSION = KafkaVersion("3.5.0-SNAPSHOT") LATEST_METADATA_VERSION = "3.3" Review Comment: @cmccabe can you just confirm whether this is correct & hasn't been bumped in 3.5? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a diff in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.
showuon commented on code in PR #11390: URL: https://github.com/apache/kafka/pull/11390#discussion_r1041821188

## core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
## @@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.cluster.Partition
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.KafkaConfig
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.utils.Logging
+import org.apache.kafka.common._
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
+import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
+import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
+import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager}
+
+import java.io.{Closeable, InputStream}
+import java.security.{AccessController, PrivilegedAction}
+import java.util
+import java.util.Optional
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import scala.collection.Set
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class is responsible for
+ * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances.
+ * - receives any leader and follower replica events and partition stop events and act on them
+ * - also provides APIs to fetch indexes, metadata about remote log segments.
+ *
+ * @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level.
+ * @param brokerId  id of the current broker.
+ * @param logDir    directory of Kafka log segments.
+ */
+class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
+                       brokerId: Int,
+                       logDir: String) extends Logging with Closeable with KafkaMetricsGroup {
+
+  // topic ids received on leadership changes
+  private val topicPartitionIds: ConcurrentMap[TopicPartition, Uuid] = new ConcurrentHashMap[TopicPartition, Uuid]()
+
+  private val remoteLogStorageManager: RemoteStorageManager = createRemoteStorageManager()
+  private val remoteLogMetadataManager: RemoteLogMetadataManager = createRemoteLogMetadataManager()
+
+  private val indexCache = new RemoteIndexCache(remoteStorageManager = remoteLogStorageManager, logDir = logDir)
+
+  private var closed = false
+
+  private[remote] def createRemoteStorageManager(): RemoteStorageManager = {
+    def createDelegate(classLoader: ClassLoader): RemoteStorageManager = {
+      classLoader.loadClass(rlmConfig.remoteStorageManagerClassName())
+        .getDeclaredConstructor().newInstance().asInstanceOf[RemoteStorageManager]
+    }
+
+    AccessController.doPrivileged(new PrivilegedAction[RemoteStorageManager] {
+      private val classPath = rlmConfig.remoteStorageManagerClassPath()
+
+      override def run(): RemoteStorageManager = {
+        if (classPath != null && classPath.trim.nonEmpty) {
+          val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader)
+          val delegate = createDelegate(classLoader)
+          new ClassLoaderAwareRemoteStorageManager(delegate, classLoader)
+        } else {
+          createDelegate(this.getClass.getClassLoader)
+        }
+      }
+    })
+  }
+
+  private def configureRSM(): Unit = {
+    val rsmProps = new util.HashMap[String, Any]()
+    rlmConfig.remoteStorageManagerProps().asScala.foreach { case (k, v) => rsmProps.put(k, v) }
+    rsmProps.put(KafkaConfig.BrokerIdProp, brokerId)
+    remoteLogStorageManager.configure(rsmProps)
+  }
+
+  private[remote] def createRemoteLogMetadataManager(): RemoteLogMetadataManager = {
+    def createDelegate(classLoader: ClassLoader) = {
+      classLoader.loadClass(rlmConfig.remoteLogMetadataManagerClassName())
+        .getDeclaredConstructor()
+        .newInstance()
+        .asInstanceOf[RemoteLogMetadataManager]
+    }
+
+    AccessController.doPrivileged(new PrivilegedAction[RemoteLogMetadataManager] {
+      private val classPath = rlmConfig.remoteLogMetadataManagerClassPath
+
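As a usage sketch, the settings this class reads would be supplied as broker properties along the following lines. The property keys follow KIP-405 and should be treated as assumptions here; the plugin class name and class path are hypothetical placeholders, not anything referenced in this PR:
```java
import java.util.Properties;

public class TieredStorageBrokerPropsSketch {
    public static Properties remoteLogProps() {
        Properties props = new Properties();
        // Enable the remote log (tiered storage) subsystem on the broker.
        props.put("remote.log.storage.system.enable", "true");
        // Custom RemoteStorageManager plugin; when a class path is given, it is loaded
        // through the child-first classloader created in createRemoteStorageManager().
        props.put("remote.log.storage.manager.class.name", "com.example.MyRemoteStorageManager");
        props.put("remote.log.storage.manager.class.path", "/opt/kafka/plugins/my-rsm/*");
        // Topic-based RemoteLogMetadataManager shipped with Kafka.
        props.put("remote.log.metadata.manager.class.name",
                "org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager");
        return props;
    }
}
```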
[GitHub] [kafka] showuon commented on pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.
showuon commented on PR #11390: URL: https://github.com/apache/kafka/pull/11390#issuecomment-1340522323 @junrao, do you want to have another review? Since branch 3.4 has been created, and this PR blocks some subsequent tiered storage feature development (e.g. copying segments to tiered storage, retention checks to clean local and remote log segments), we might need to consider merging it first and having follow-up PRs for the more complicated changes. WDYT? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org