[GitHub] [kafka] jlprat commented on a change in pull request #10768: MINOR: fix code listings for ops.html
jlprat commented on a change in pull request #10768: URL: https://github.com/apache/kafka/pull/10768#discussion_r640340174 ## File path: docs/ops.html ## @@ -248,98 +248,98 @@ only exist on brokers 5,6. Since the tool accepts the input list of topics as a json file, you first need to identify the topics you want to move and create the json file as follows: -> cat topics-to-move.json - {"topics": [{"topic": "foo1"}, - {"topic": "foo2"}], - "version":1 - } + > cat topics-to-move.json Review comment: Good catch! Pushed a fix -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on pull request #10769: MINOR: fix code listings connect.html
jlprat commented on pull request #10769: URL: https://github.com/apache/kafka/pull/10769#issuecomment-849385403 Thanks for the review @showuon!
[GitHub] [kafka] showuon commented on a change in pull request #10770: MINOR: fix code listings security.html
showuon commented on a change in pull request #10770: URL: https://github.com/apache/kafka/pull/10770#discussion_r640339525 ## File path: docs/security.html ## @@ -1835,28 +1834,28 @@ Authentication using Delegation Tokens section. +Authentication using Delegation Tokens section. Review comment: Nice fix! ## File path: docs/security.html ## @@ -428,12 +428,12 @@ zookeeper.sasl.client.username to the appropriate name (e.g., -Dzookeeper.sasl.client.username=zk). -Brokers may also configure JAAS using the broker configuration property sasl.jaas.config. -The property name must be prefixed with the listener prefix including the SASL mechanism, -i.e. listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config. Only one -login module may be specified in the config value. If multiple mechanisms are configured on a -listener, configs must be provided for each mechanism using the listener and mechanism prefix. -For example, +Brokers may also configure JAAS using the broker configuration property sasl.jaas.config. Review comment: Lines 402-408 have `ol` and `li` tags that were not handled. Please help. Thanks. https://github.com/apache/kafka/pull/10770/files#diff-3485b37e32662f4925ee0374c4f6eb1d4dfa589bbb1c148b2a70e92b4acbe8bdR402-R408
[GitHub] [kafka] socutes commented on pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException
socutes commented on pull request #10749: URL: https://github.com/apache/kafka/pull/10749#issuecomment-849390939 @showuon Please review the changes again! Thanks.
[GitHub] [kafka] jlprat commented on a change in pull request #10770: MINOR: fix code listings security.html
jlprat commented on a change in pull request #10770: URL: https://github.com/apache/kafka/pull/10770#discussion_r640352656 ## File path: docs/security.html ## @@ -428,12 +428,12 @@ zookeeper.sasl.client.username to the appropriate name (e.g., -Dzookeeper.sasl.client.username=zk). -Brokers may also configure JAAS using the broker configuration property sasl.jaas.config. -The property name must be prefixed with the listener prefix including the SASL mechanism, -i.e. listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config. Only one -login module may be specified in the config value. If multiple mechanisms are configured on a -listener, configs must be provided for each mechanism using the listener and mechanism prefix. -For example, +Brokers may also configure JAAS using the broker configuration property sasl.jaas.config. Review comment: That `li` tag is currently closed at line 508. It's a three-level nested ordered list. Between lines 402 and 508 we have the whole set of sublists as part of that `li` element. That's why the closing `li` tag seems to be missing: it is just much further down the page.
[GitHub] [kafka] showuon commented on a change in pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException
showuon commented on a change in pull request #10749: URL: https://github.com/apache/kafka/pull/10749#discussion_r640356874

## File path: raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java ##
@@ -945,11 +946,10 @@ public void testObserverUnattachedToFollower() throws IOException {
     }

     @Test
-    public void testInitializeWithCorruptedStore() throws IOException {
-        QuorumStateStore stateStore = Mockito.mock(QuorumStateStore.class);
-        Mockito.doThrow(IOException.class).when(stateStore).readElectionState();
+    public void testInitializeWithCorruptedStore() {
         QuorumState state = buildQuorumState(Utils.mkSet(localId));
-
+        QuorumStateStore stateStore = Mockito.mock(QuorumStateStore.class);
+        Mockito.doThrow(UncheckedIOException.class).when(stateStore).readElectionState();

Review comment: `nit`: I think it'd be better to put the two mock lines at the beginning of the test, as before (i.e. before `QuorumState state = buildQuorumState(Utils.mkSet(localId));`).

## File path: raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java ##
@@ -146,25 +150,36 @@ private void writeElectionStateToFile(final File stateFile, QuorumStateData stat
             writer.flush();
             fileOutputStream.getFD().sync();
             Utils.atomicMoveWithFallback(temp.toPath(), stateFile.toPath());
+        } catch (IOException e) {
+            throw new UncheckedIOException(
+                String.format("Error while writing the Quorum status from the file %s", stateFile.getAbsolutePath()), e);
         } finally {
             // cleanup the temp file when the write finishes (either success or fail).
-            Files.deleteIfExists(temp.toPath());
+            deleteFileIfExists(temp);
         }
     }

     /**
      * Clear state store by deleting the local quorum state file
-     *
-     * @throws IOException if there is any IO exception during delete
      */
     @Override
-    public void clear() throws IOException {
-        Files.deleteIfExists(stateFile.toPath());
-        Files.deleteIfExists(new File(stateFile.getAbsolutePath() + ".tmp").toPath());
+    public void clear() {
+        deleteFileIfExists(stateFile);
+        deleteFileIfExists(new File(stateFile.getAbsolutePath() + ".tmp"));
     }

     @Override
     public String toString() {
         return "Quorum state filepath: " + stateFile.getAbsolutePath();
     }
+
+    private void deleteFileIfExists(File file) {
+        try {
+            Files.deleteIfExists(file.toPath());
+        } catch (IOException e) {
+            throw new UncheckedIOException(
+                String.format("Error deleting file %s", file.getAbsoluteFile()), e

Review comment: Maybe this is better: `Error while deleting file...`
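For readers following the thread, the wrapping pattern under review can be sketched in isolation. This is a minimal stand-alone example, not the actual `FileBasedStateStore` code: the class name `FileCleanup` and the boolean return value are assumptions made for illustration, and the message uses the wording suggested in the review comment.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

// Hypothetical stand-alone helper; not the actual FileBasedStateStore code.
final class FileCleanup {
    private FileCleanup() { }

    /** Deletes the file if present; returns true when something was deleted. */
    static boolean deleteFileIfExists(File file) {
        try {
            return Files.deleteIfExists(file.toPath());
        } catch (IOException e) {
            // Keep the checked exception as the cause so callers can still inspect it.
            throw new UncheckedIOException(
                String.format("Error while deleting file %s", file.getAbsolutePath()), e);
        }
    }
}
```

With this shape, callers such as `clear()` no longer need a `throws IOException` clause, while the original `IOException` stays reachable through `getCause()`.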
[jira] [Commented] (KAFKA-12849) Consider migrating TaskMetadata to interface with internal implementation
[ https://issues.apache.org/jira/browse/KAFKA-12849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352338#comment-17352338 ] Josep Prat commented on KAFKA-12849: KIP created https://cwiki.apache.org/confluence/x/XIrOCg > Consider migrating TaskMetadata to interface with internal implementation > - > > Key: KAFKA-12849 > URL: https://issues.apache.org/jira/browse/KAFKA-12849 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Josep Prat >Priority: Major > Labels: needs-kip, newbie, newbie++ > > In KIP-740 we had to go through a deprecation cycle in order to change the > constructor from the original one which accepted the taskId parameter as a > string, to the new one which takes a TaskId object directly. We had > considered just changing the signature directly without deprecation as this > was never intended to be instantiated by users, rather it just acts as a > pass-through metadata class. Sort of by definition if there is no reason to > ever instantiate it, this seems to indicate it may be better suited as a > public interface with the implementation and constructor as internal APIs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
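The idea in the issue description, a public interface backed by an internal implementation, might look roughly like the sketch below. The names `TaskMetadataView` and `TaskMetadataViewImpl` and the two accessors are hypothetical and are not taken from the KIP or from Kafka Streams.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Public contract: callers can read task metadata but have no constructor to call.
// The name TaskMetadataView is hypothetical and is not taken from the KIP.
interface TaskMetadataView {
    String taskId();

    Set<String> topicPartitions();
}

// Internal implementation; a real project would keep it in an internals package.
final class TaskMetadataViewImpl implements TaskMetadataView {
    private final String taskId;
    private final Set<String> topicPartitions;

    TaskMetadataViewImpl(String taskId, Set<String> topicPartitions) {
        this.taskId = taskId;
        // Defensive copy so later changes to the caller's set are not visible here.
        this.topicPartitions = Collections.unmodifiableSet(new LinkedHashSet<>(topicPartitions));
    }

    @Override
    public String taskId() {
        return taskId;
    }

    @Override
    public Set<String> topicPartitions() {
        return topicPartitions;
    }
}
```

Because users only ever see the interface, the constructor signature can change later without a deprecation cycle, which is the motivation given in the issue.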
[GitHub] [kafka] dajac commented on pull request #10467: KAFKA-12609: Rewrite ListOffsets using AdminApiDriver
dajac commented on pull request #10467: URL: https://github.com/apache/kafka/pull/10467#issuecomment-849464267 @dengziming Sure. I have added it to my review backlog.
[jira] [Commented] (KAFKA-12629) Failing Test: RaftClusterTest
[ https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352353#comment-17352353 ] Bruno Cadonna commented on KAFKA-12629: --- Failed on: https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10710/2/testReport/ {code:java} java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request. at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic(RaftClusterTest.scala:94) {code} > Failing Test: RaftClusterTest > - > > Key: KAFKA-12629 > URL: https://issues.apache.org/jira/browse/KAFKA-12629 > Project: Kafka > Issue Type: Test > Components: core, unit tests >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Blocker > Labels: flaky-test > Fix For: 3.0.0 > > > {quote} {{java.util.concurrent.ExecutionException: > java.lang.ClassNotFoundException: > org.apache.kafka.controller.NoOpSnapshotWriterBuilder > at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191) > at > kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:364) > at > kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:181)}}{quote}
[GitHub] [kafka] cadonna commented on pull request #10710: KAFKA-12796: Removal of deprecated classes under `streams-scala`
cadonna commented on pull request #10710: URL: https://github.com/apache/kafka/pull/10710#issuecomment-849484765 Failed test is unrelated and known to be flaky: ``` kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic() ```
[GitHub] [kafka] cadonna merged pull request #10710: KAFKA-12796: Removal of deprecated classes under `streams-scala`
cadonna merged pull request #10710: URL: https://github.com/apache/kafka/pull/10710
[GitHub] [kafka] jlprat commented on pull request #10710: KAFKA-12796: Removal of deprecated classes under `streams-scala`
jlprat commented on pull request #10710: URL: https://github.com/apache/kafka/pull/10710#issuecomment-849486858 Thanks for the review @cadonna!
[GitHub] [kafka] dajac commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
dajac commented on a change in pull request #10760: URL: https://github.com/apache/kafka/pull/10760#discussion_r640459341

## File path: core/src/main/scala/kafka/log/Log.scala ##
@@ -1316,6 +1316,16 @@ class Log(@volatile private var _dir: File,
         val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
         val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
         Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional))
+      } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+        // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+        // constant time access while being safe to use with concurrent collections unlike `toArray`.
+        val segmentsCopy = logSegments.toBuffer
+        val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
+        val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
+        val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
+        Some(new TimestampAndOffset(latestTimestampSegment.maxTimestampSoFar,
+          latestTimestampSegment.offsetOfMaxTimestampSoFar,
+          epochOptional))

Review comment: Could we get a `maxTimestampSoFar` and an `offsetOfMaxTimestampSoFar` that do not correspond to each other? It seems that we have no guarantee here. Is it an issue?
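One generic way to rule out a mismatched timestamp/offset pair is to publish both values together in a single immutable object, so that a reader either sees the old pair or the new pair. The sketch below illustrates that technique only; it is not how Kafka's `LogSegment` is implemented, and the names `MaxTimestampTracker` and `MaxEntry` are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical tracker; illustrates atomic publication of a (timestamp, offset) pair.
final class MaxTimestampTracker {

    /** Immutable pair: both fields always come from the same update. */
    static final class MaxEntry {
        final long timestamp;
        final long offset;

        MaxEntry(long timestamp, long offset) {
            this.timestamp = timestamp;
            this.offset = offset;
        }
    }

    private final AtomicReference<MaxEntry> max =
        new AtomicReference<>(new MaxEntry(-1L, -1L));

    /** Replaces the stored pair only when the candidate timestamp is strictly larger. */
    void maybeUpdate(long timestamp, long offset) {
        max.accumulateAndGet(new MaxEntry(timestamp, offset),
            (current, candidate) -> candidate.timestamp > current.timestamp ? candidate : current);
    }

    /** Returns a pair whose timestamp and offset are guaranteed to belong together. */
    MaxEntry snapshot() {
        return max.get();
    }
}
```

Reading two separate mutable fields one after the other gives no such guarantee, which is the concern raised in the review comment.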
## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ##
@@ -4225,76 +4232,84 @@ public ListOffsetsResult listOffsets(Map topicPartit
             }
         }

-        for (final Map.Entry> entry : leaders.entrySet()) {
-            final int brokerId = entry.getKey().id();
+        for (final Map.Entry>> versionedEntry : leaders.entrySet()) {
+            for (final Map.Entry> entry : versionedEntry.getValue().entrySet()) {
+                final int brokerId = versionedEntry.getKey().id();

-            calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
+                calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {

-                final List partitionsToQuery = new ArrayList<>(entry.getValue().values());
+                    final List partitionsToQuery = new ArrayList<>(entry.getValue().values());

-                @Override
-                ListOffsetsRequest.Builder createRequest(int timeoutMs) {
-                    return ListOffsetsRequest.Builder
+                    @Override
+                    ListOffsetsRequest.Builder createRequest(int timeoutMs) {
+                        ListOffsetRequestVersion requestVersion = entry.getKey();
+                        if (requestVersion == ListOffsetRequestVersion.V7AndAbove) {
+                            return ListOffsetsRequest.Builder
+                                .forMaxTimestamp(context.options().isolationLevel())
+                                .setTargetTimes(partitionsToQuery);
+                        }

Review comment: I'd like to better understand how we handle a broker which would not support the version that we need. `ListOffsetsRequest.Builder.forMaxTimestamp` constrains the version to 7 and above when we have at least one max timestamp spec. If the broker does not support version 7, the request is failed with an `UnsupportedVersionException` and we fail all the futures of the broker with it in `handleFailure`. Now, let's imagine a case where the user does not only include "max timestamp specs" in their request. At the moment, we fail all of them irrespective of their type. I wonder if we should retry the other specs in this particular case. Have we considered doing this?
## File path: core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package integration.kafka.admin + +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org
[GitHub] [kafka] mdedetrich commented on pull request #10652: KAFKA-9726 IdentityReplicationPolicy
mdedetrich commented on pull request #10652: URL: https://github.com/apache/kafka/pull/10652#issuecomment-849508419 @ryannedolan Since https://github.com/apache/kafka/pull/10762 was merged, maybe it makes sense to rebase against the current `trunk`? Some of the tests in this PR have assert statements without the failure messages, which have just been fixed (I believe you can just copy some of those assert failure messages to make sure it's consistent).
[GitHub] [kafka] dgd-contributor opened a new pull request #10774: KAFKA-12380 Executor in Connect's Worker is not shut down when the worker is
dgd-contributor opened a new pull request #10774: URL: https://github.com/apache/kafka/pull/10774 The Worker class has an executor field that the public constructor initializes with a new cached thread pool (https://github.com/apache/kafka/blob/02226fa090513882b9229ac834fd493d71ae6d96/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L127). When the worker is stopped, it does not shut down this executor. This is normally okay in the Connect and MirrorMaker 2 runtimes, because the worker is stopped only when the JVM is stopped (via the shutdown hook in the herders). However, we instantiate and stop the herder many times in our integration tests, and this means we're not necessarily shutting down the herder's executor. Normally this won't hurt, as long as all of the runnables that the executor threads run actually do terminate. But it's possible those threads might not terminate in all tests. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
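The kind of change described above can be sketched with the standard `ExecutorService` shutdown sequence. This is a simplified illustration rather than the patch in the pull request: `StoppableWorker`, its `stop(long)` signature, and the timeout handling are assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a component that owns a cached thread pool.
final class StoppableWorker {
    private final ExecutorService executor = Executors.newCachedThreadPool();

    void submit(Runnable task) {
        executor.submit(task);
    }

    /** Shuts the pool down; returns true once the executor has terminated. */
    boolean stop(long timeoutMs) {
        // Stop accepting new tasks and let the running ones finish.
        executor.shutdown();
        try {
            if (executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                return true;
            }
            // Tasks did not finish in time: interrupt them and wait once more.
            executor.shutdownNow();
            return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            // Restore the interrupt flag for the caller.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    boolean isShutdown() {
        return executor.isShutdown();
    }
}
```

Calling `stop` from each test's teardown keeps threads from one test from leaking into the next, provided the submitted tasks respond to interruption.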
[jira] [Commented] (KAFKA-9531) java.net.UnknownHostException loop on VM rolling update using CNAME
[ https://issues.apache.org/jira/browse/KAFKA-9531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352386#comment-17352386 ] Rui Abreu commented on KAFKA-9531: -- [~tcsantos] and [~Bolen] What helped us in our specific case was to start using *advertised.host.name* in the Kafka brokers' properties. {quote}node.internal is a CNAME to either nodeA.internal or nodeB.internal {quote} We added *advertised.host.name = node.internal* in the server properties and the issue no longer seems to manifest itself. But this is a workaround. The underlying issue in Kafka's client code needs to be addressed. > java.net.UnknownHostException loop on VM rolling update using CNAME > --- > > Key: KAFKA-9531 > URL: https://issues.apache.org/jira/browse/KAFKA-9531 > Project: Kafka > Issue Type: Bug > Components: clients, controller, network, producer >Affects Versions: 2.4.0 >Reporter: Rui Abreu >Priority: Major > > Hello, > > My cluster setup is based on VMs behind a DNS CNAME. > Example: node.internal is a CNAME to either nodeA.internal or nodeB.internal > Since kafka-client 1.2.1, it has been observed that sometimes Kafka clients > get stuck in a loop with the exception: > Example after nodeB.internal is replaced with nodeA.internal > > {code:java} > 2020-02-10T12:11:28.181Z o.a.k.c.NetworkClient [WARN]- [Consumer > clientId=consumer-6, groupId=consumer.group] Error connecting to node > nodeB.internal:9092 (id: 2 rack: null) > java.net.UnknownHostException: nodeB.internal:9092 > at java.net.InetAddress.getAllByName0(InetAddress.java:1281) > ~[?:1.8.0_222] > at java.net.InetAddress.getAllByName(InetAddress.java:1193) > ~[?:1.8.0_222] > at java.net.InetAddress.getAllByName(InetAddress.java:1127) > ~[?:1.8.0_222] > at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403) > ~[stormjar.jar:?] 
> at > org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:943) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:68) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1114) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1005) > ~[stormjar.jar:?] > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:537) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:366) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) > ~[stormjar.jar:?] > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) > ~[stormjar.jar:?] > at > org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:365) > ~[stormjar.jar:?] 
> at > org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:294) > ~[stormjar.jar:?] > at > org.apache.storm.daemon.executor$fn__10715$fn__10730$fn__10761.invoke(executor.clj:649) > ~[storm-core-1.1.3.jar:1.1.3] > at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) > ~[storm-core-1.1.3.jar:1.1.3] > at clojure.lang.AFn.run(AFn.java:22) ~[clojure-1.7.0.jar:?] > at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222] > {code} > > The time it spends in the loop is arbitrary, but it seems the client > effectively stops while this is happening. > This error contrasts with instances where the client is able to recover on > its o
[GitHub] [kafka] cadonna commented on a change in pull request #10770: MINOR: fix code listings security.html
cadonna commented on a change in pull request #10770: URL: https://github.com/apache/kafka/pull/10770#discussion_r640479762 ## File path: docs/security.html ## @@ -208,25 +208,25 @@ Host Name Verification Then create a database and serial number file, these will be used to keep track of which certificates were signed with this CA. Both of these are simply text files that reside in the same directory as your CA keys. -echo 01 > serial.txt +> echo 01 > serial.txt touch index.txt Review comment: If we decide to keep the `>`, then it is missing here. ## File path: docs/security.html ## @@ -384,16 +384,16 @@ SSL key and certificates in PEM format ssl.key.password=test1234 Other configuration settings that may also be needed depending on our requirements and the broker configuration: - -ssl.provider (Optional). The name of the security provider used for SSL connections. Default value is the default security provider of the JVM. -ssl.cipher.suites (Optional). A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. -ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1. It should list at least one of the protocols configured on the broker side -ssl.truststore.type=JKS -ssl.keystore.type=JKS - - + +ssl.provider (Optional). The name of the security provider used for SSL connections. Default value is the default security provider of the JVM. +ssl.cipher.suites (Optional). A cipher suite is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. +ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1. 
It should list at least one of the protocols configured on the broker side +ssl.truststore.type=JKS +ssl.keystore.type=JKS + + Examples using console-producer and console-consumer: -kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config client-ssl.properties +> kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config client-ssl.properties kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties Review comment: If we decide to keep the `>`, then it is missing here. ## File path: docs/security.html ## @@ -47,7 +47,7 @@ keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12 +> keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12 Review comment: Now I see that it is already used further down. I am fine with keeping it to better distinguish between configs and commands. However, I found some places where `>` is still missing (see further down). ## File path: docs/security.html ## @@ -47,7 +47,7 @@ keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12 +> keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12 Review comment: I am wondering if it is a good idea to add `>` in front of the commands. It is one more thing users need to remove when they copy & paste the commands. ## File path: docs/security.html ## @@ -428,12 +428,12 @@ zookeeper.sasl.client.username to the appropriate name (e.g., -Dzookeeper.sasl.client.username=zk). -Brokers may also configure JAAS using the broker configuration property sasl.jaas.config. -The property name must be prefixed with the listener prefix including the SASL mechanism, -i.e. listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config. 
Only one -login module may be specified in the config value. If multiple mechanisms are configured on a -listener, configs must be provided for each mechanism using the listener and mechanism prefix. -For example, +Brokers may also configure JAAS using the broker configuration property sasl.jaas.config. +The property name must be prefixed with the listener prefix including the SASL mechanism, +i.e. listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config. Only one +login module may be specified in the config value. If multiple mechanisms are conf
[GitHub] [kafka] feyman2016 commented on pull request #10377: KAFKA-12515 ApiVersionManager should create response based on request version
feyman2016 commented on pull request #10377: URL: https://github.com/apache/kafka/pull/10377#issuecomment-849526883 @rajinisivaram Thanks for the review, I will address it later~
[GitHub] [kafka] jlprat commented on a change in pull request #10770: MINOR: fix code listings security.html
jlprat commented on a change in pull request #10770: URL: https://github.com/apache/kafka/pull/10770#discussion_r640512166 ## File path: docs/security.html ## @@ -47,7 +47,7 @@ keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12 +> keytool -keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg RSA -storetype pkcs12 Review comment: The examples in the documentation most often use the `>` character when showing commands. Sometimes a `$` is shown, and occasionally there is no prompt at all. As `>` was already the most commonly used one in the codebase, I decided to unify on this character.
[GitHub] [kafka] jlprat commented on a change in pull request #10770: MINOR: fix code listings security.html
jlprat commented on a change in pull request #10770: URL: https://github.com/apache/kafka/pull/10770#discussion_r640515305 ## File path: docs/security.html ## @@ -428,12 +428,12 @@ zookeeper.sasl.client.username to the appropriate name (e.g., -Dzookeeper.sasl.client.username=zk). -Brokers may also configure JAAS using the broker configuration property sasl.jaas.config. -The property name must be prefixed with the listener prefix including the SASL mechanism, -i.e. listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config. Only one -login module may be specified in the config value. If multiple mechanisms are configured on a -listener, configs must be provided for each mechanism using the listener and mechanism prefix. -For example, +Brokers may also configure JAAS using the broker configuration property sasl.jaas.config. +The property name must be prefixed with the listener prefix including the SASL mechanism, +i.e. listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config. Only one +login module may be specified in the config value. If multiple mechanisms are configured on a +listener, configs must be provided for each mechanism using the listener and mechanism prefix. +For example, Review comment: Actually, it seems the above paragraphs should be indented to the right. There are several nested `` and `` and `` that didn't follow indentation. I'm fixing these now.
[GitHub] [kafka] jlprat commented on a change in pull request #10770: MINOR: fix code listings security.html
jlprat commented on a change in pull request #10770: URL: https://github.com/apache/kafka/pull/10770#discussion_r640516748 ## File path: docs/security.html ## @@ -443,134 +443,133 @@ -If JAAS configuration is defined at different levels, the order of precedence used is: - - Broker configuration property listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config - {listenerName}.KafkaServer section of static JAAS configuration - KafkaServer section of static JAAS configuration - -Note that ZooKeeper JAAS config may only be configured using static JAAS configuration. - -See GSSAPI (Kerberos), -PLAIN, -SCRAM or -OAUTHBEARER for example broker configurations. - - -JAAS configuration for Kafka clients - -Clients may configure JAAS using the client configuration property -sasl.jaas.config -or using the static JAAS config file -similar to brokers. - - -JAAS configuration using client configuration property -Clients may specify JAAS configuration as a producer or consumer property without -creating a physical configuration file. This mode also enables different producers -and consumers within the same JVM to use different credentials by specifying -different properties for each client. If both static JAAS configuration system property -java.security.auth.login.config and client property sasl.jaas.config -are specified, the client property will be used. - -See GSSAPI (Kerberos), -PLAIN, -SCRAM or -OAUTHBEARER for example configurations. - -JAAS configuration using static config file -To configure SASL authentication on the clients using static JAAS config file: - -Add a JAAS config file with a client login section named KafkaClient. Configure -a login module in KafkaClient for the selected mechanism as described in the examples -for setting up GSSAPI (Kerberos), -PLAIN, -SCRAM or -OAUTHBEARER. 
-For example, GSSAPI -credentials may be configured as: -KafkaClient { +If JAAS configuration is defined at different levels, the order of precedence used is: Review comment: Same as previous, you are right, it needs to be aligned with the contents of the header `JAAS configuration for Kafka brokers`. However, the problem here was that the previous content was misaligned.
[GitHub] [kafka] jlprat commented on pull request #10770: MINOR: fix code listings security.html
jlprat commented on pull request #10770: URL: https://github.com/apache/kafka/pull/10770#issuecomment-849539579 Hi @cadonna thanks a lot for the feedback. I fixed the missing `>` characters. About the misalignment, you were right that there was one, though it came not from the lines you mention but from the previous ones.
[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
thomaskwscott commented on a change in pull request #10760: URL: https://github.com/apache/kafka/pull/10760#discussion_r640520669 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4225,76 +4232,84 @@ public ListOffsetsResult listOffsets(Map topicPartit } } -for (final Map.Entry> entry : leaders.entrySet()) { -final int brokerId = entry.getKey().id(); +for (final Map.Entry>> versionedEntry : leaders.entrySet()) { +for (final Map.Entry> entry : versionedEntry.getValue().entrySet()) { +final int brokerId = versionedEntry.getKey().id(); -calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) { +calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) { -final List partitionsToQuery = new ArrayList<>(entry.getValue().values()); +final List partitionsToQuery = new ArrayList<>(entry.getValue().values()); -@Override -ListOffsetsRequest.Builder createRequest(int timeoutMs) { -return ListOffsetsRequest.Builder +@Override +ListOffsetsRequest.Builder createRequest(int timeoutMs) { +ListOffsetRequestVersion requestVersion = entry.getKey(); +if (requestVersion == ListOffsetRequestVersion.V7AndAbove) { +return ListOffsetsRequest.Builder + .forMaxTimestamp(context.options().isolationLevel()) +.setTargetTimes(partitionsToQuery); +} Review comment: At present the only way requestVersion could be V7AndAbove is if we were issuing MAX_TIMESTAMP requests because of the way the calls are parsed earlier: ` ListOffsetRequestVersion requiredRequestVersion = offsetQuery == ListOffsetsRequest.MAX_TIMESTAMP ? ListOffsetRequestVersion.V7AndAbove : ListOffsetRequestVersion.V0AndAbove; ` All non-max timestamp requests are built using forConsumer rather than forMaxTimestamp and so should succeed against older brokers. Maybe the enums are a bit misleading in this regard. I'll see if i can come up with something better. 
[GitHub] [kafka] jlprat edited a comment on pull request #10770: MINOR: fix code listings security.html
jlprat edited a comment on pull request #10770: URL: https://github.com/apache/kafka/pull/10770#issuecomment-849539579 Hi @cadonna thanks a lot for the feedback. I fixed the missing `>` characters. About the misalignment, you were right; however, it came not from the lines you mention but from the previous ones.
[jira] [Created] (KAFKA-12855) Update ssl certificates of kafka connect worker runtime without restarting the worker process.
kaushik srinivas created KAFKA-12855: Summary: Update ssl certificates of kafka connect worker runtime without restarting the worker process. Key: KAFKA-12855 URL: https://issues.apache.org/jira/browse/KAFKA-12855 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: kaushik srinivas Is it possible to update the SSL certificates of a Kafka Connect worker dynamically, similar to what the kafka-configs script does for Kafka? Or is the only way to update the certificates to restart the worker processes? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
thomaskwscott commented on a change in pull request #10760: URL: https://github.com/apache/kafka/pull/10760#discussion_r640543594 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -1316,6 +1316,16 @@ class Log(@volatile private var _dir: File, val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer]) val epochOptional = Optional.ofNullable(latestEpochOpt.orNull) Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional)) + } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) { +// Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides +// constant time access while being safe to use with concurrent collections unlike `toArray`. +val segmentsCopy = logSegments.toBuffer +val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar) +val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer]) +val epochOptional = Optional.ofNullable(latestEpochOpt.orNull) +Some(new TimestampAndOffset(latestTimestampSegment.maxTimestampSoFar, + latestTimestampSegment.offsetOfMaxTimestampSoFar, + epochOptional)) Review comment: In all cases I can find the 2 are updated together so I think we can assume consistency. For the topic liveness case in the KIP absolute consistency is not required but there will be other cases that will need this (e.g. topic inspection). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
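The `MAX_TIMESTAMP` branch in the diff above can be sketched in plain Java. This is an illustration only, not the broker code: `Segment` is a hypothetical stand-in for a log segment that keeps just the two fields the lookup reads, and `latestTimestampSegment` is an assumed helper name. It copies the segments into a snapshot first and then picks the one with the largest timestamp.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

public class MaxTimestampLookup {

    /** Hypothetical stand-in for a log segment; only the two fields the lookup reads. */
    static final class Segment {
        final long maxTimestampSoFar;
        final long offsetOfMaxTimestampSoFar;

        Segment(long maxTimestampSoFar, long offsetOfMaxTimestampSoFar) {
            this.maxTimestampSoFar = maxTimestampSoFar;
            this.offsetOfMaxTimestampSoFar = offsetOfMaxTimestampSoFar;
        }
    }

    /** Returns the segment with the largest timestamp, or empty when there are no segments. */
    static Optional<Segment> latestTimestampSegment(Iterable<Segment> liveSegments) {
        // Copy first so the scan runs over a fixed snapshot rather than the live collection.
        List<Segment> snapshot = new ArrayList<>();
        liveSegments.forEach(snapshot::add);
        return snapshot.stream().max(Comparator.comparingLong(s -> s.maxTimestampSoFar));
    }

    public static void main(String[] args) {
        List<Segment> segments = Arrays.asList(
            new Segment(100L, 0L), new Segment(300L, 1L), new Segment(200L, 2L));
        latestTimestampSegment(segments).ifPresent(s ->
            System.out.println(s.maxTimestampSoFar + " @ offset " + s.offsetOfMaxTimestampSoFar));
    }
}
```

Unlike Scala's `maxBy`, which throws on an empty collection, the sketch returns an `Optional` so the empty-log case is explicit. Whether the timestamp and its offset are read consistently is the separate question raised in the review comment.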
[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
thomaskwscott commented on a change in pull request #10760: URL: https://github.com/apache/kafka/pull/10760#discussion_r640544656 ## File path: core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala ## @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package integration.kafka.admin + +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.admin._ +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.utils.Utils +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} + +import scala.collection.{Map, Seq} +import scala.jdk.CollectionConverters._ + +class ListOffsetsIntegrationTest extends KafkaServerTestHarness { + + val topicName = "foo" + var adminClient: Admin = null + + @BeforeEach + override def setUp(): Unit = { +super.setUp() +createTopic(topicName,1,1.asInstanceOf[Short]) +produceMessages() +adminClient = Admin.create(Map[String, Object]( + AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList +).asJava) + } + + @AfterEach + override def tearDown(): Unit = { +Utils.closeQuietly(adminClient, "ListOffsetsAdminClient") +super.tearDown() + } + + @Test + def testEarliestOffset(): Unit = { +val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest()) +assertEquals(0,earliestOffset.offset()) + } + + @Test + def testLatestOffset(): Unit = { +val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest()) +assertEquals(3,latestOffset.offset()) + } + + @Test + def testMaxTimestampOffset(): Unit = { +val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp()) +assertEquals(1,maxTimestampOffset.offset()) + } + + private def runFetchOffsets(adminClient: Admin, + offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = { +println(s"==> listOffsets(${topicName} -> ${offsetSpec}, new ListOffsetsOptions())") Review comment: I used ReassignPartitionsIntegrationTest as a base for creating this and this has similar messages, I can remove if needed. -- This is an automated message from the Apache Git Service. 
[GitHub] [kafka] dajac commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
dajac commented on a change in pull request #10760: URL: https://github.com/apache/kafka/pull/10760#discussion_r640544984 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4225,76 +4232,84 @@ public ListOffsetsResult listOffsets(Map topicPartit } } -for (final Map.Entry> entry : leaders.entrySet()) { -final int brokerId = entry.getKey().id(); +for (final Map.Entry>> versionedEntry : leaders.entrySet()) { +for (final Map.Entry> entry : versionedEntry.getValue().entrySet()) { +final int brokerId = versionedEntry.getKey().id(); -calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) { +calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) { -final List partitionsToQuery = new ArrayList<>(entry.getValue().values()); +final List partitionsToQuery = new ArrayList<>(entry.getValue().values()); -@Override -ListOffsetsRequest.Builder createRequest(int timeoutMs) { -return ListOffsetsRequest.Builder +@Override +ListOffsetsRequest.Builder createRequest(int timeoutMs) { +ListOffsetRequestVersion requestVersion = entry.getKey(); +if (requestVersion == ListOffsetRequestVersion.V7AndAbove) { +return ListOffsetsRequest.Builder + .forMaxTimestamp(context.options().isolationLevel()) +.setTargetTimes(partitionsToQuery); +} Review comment: Oh... I see. So you are saying that if we have two specs for a given leader, say one with MAX_TIMESTAMP and another one with EARLIEST_TIMESTAMP, we send two separate requests to that leader, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
thomaskwscott commented on a change in pull request #10760: URL: https://github.com/apache/kafka/pull/10760#discussion_r640552093 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4225,76 +4232,84 @@ public ListOffsetsResult listOffsets(Map topicPartit } } -for (final Map.Entry> entry : leaders.entrySet()) { -final int brokerId = entry.getKey().id(); +for (final Map.Entry>> versionedEntry : leaders.entrySet()) { +for (final Map.Entry> entry : versionedEntry.getValue().entrySet()) { +final int brokerId = versionedEntry.getKey().id(); -calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) { +calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) { -final List partitionsToQuery = new ArrayList<>(entry.getValue().values()); +final List partitionsToQuery = new ArrayList<>(entry.getValue().values()); -@Override -ListOffsetsRequest.Builder createRequest(int timeoutMs) { -return ListOffsetsRequest.Builder +@Override +ListOffsetsRequest.Builder createRequest(int timeoutMs) { +ListOffsetRequestVersion requestVersion = entry.getKey(); +if (requestVersion == ListOffsetRequestVersion.V7AndAbove) { +return ListOffsetsRequest.Builder + .forMaxTimestamp(context.options().isolationLevel()) +.setTargetTimes(partitionsToQuery); +} Review comment: That's right, as I read it we would send separate requests even under the old logic (i.e. for LATEST_TIMESTAMP and EARLIEST_TIMESTAMP). The only difference here is we limit the versions for MAX_TIMESTAMP. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma merged pull request #10759: MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout`
ijuma merged pull request #10759: URL: https://github.com/apache/kafka/pull/10759
[GitHub] [kafka] iakunin opened a new pull request #10775: KAFKA-12668: Making MockScheduler.schedule safe to use in concurrent code
iakunin opened a new pull request #10775: URL: https://github.com/apache/kafka/pull/10775 Making MockScheduler.schedule safe to use in concurrent code by removing the `tick()` call inside MockScheduler.schedule. To reproduce the bug I wrote a unit test, MockSchedulerTest. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Comment Edited] (KAFKA-12668) MockScheduler is not safe to use in concurrent code.
[ https://issues.apache.org/jira/browse/KAFKA-12668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352476#comment-17352476 ] Maksim Iakunin edited comment on KAFKA-12668 at 5/27/21, 1:29 PM: -- GitHub pull-request with bug-fix: https://github.com/apache/kafka/pull/10775 [~jagsancio], could you have a look, please? was (Author: iakunin): GitHub pull-request: https://github.com/apache/kafka/pull/10775 > MockScheduler is not safe to use in concurrent code. > > > Key: KAFKA-12668 > URL: https://issues.apache.org/jira/browse/KAFKA-12668 > Project: Kafka > Issue Type: Improvement > Components: unit tests >Reporter: Jose Armando Garcia Sancio >Assignee: Maksim Iakunin >Priority: Major > Labels: newbie > > The current implementation of {{MockScheduler}} executes tasks in the same > stack when {{schedule}} is called. This violates {{Log}}'s assumption since > {{Log}} calls {{schedule}} while holding a lock. This can cause deadlock in > tests. > One solution is to change {{MockScheduler}}'s {{schedule}} method so that > {{tick}} is not called. {{tick}} should be called by a stack (thread) that > doesn't hold any locks.
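The solution proposed in the ticket above can be sketched as follows. This is a minimal illustration under stated assumptions, not Kafka's `MockScheduler`: the class name `DeferredMockScheduler`, the `Task` holder, and the method signatures are all hypothetical. The point it shows is that `schedule` only enqueues, and tasks run only when some thread calls `tick`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class DeferredMockScheduler {

    /** Hypothetical holder pairing a task with the mock time at which it becomes due. */
    private static final class Task {
        final long dueMs;
        final Runnable body;

        Task(long dueMs, Runnable body) {
            this.dueMs = dueMs;
            this.body = body;
        }
    }

    private final PriorityQueue<Task> tasks =
        new PriorityQueue<>(Comparator.comparingLong((Task t) -> t.dueMs));
    private long nowMs = 0L;

    /** Only enqueues the task; it is never run on the caller's stack. */
    public synchronized void schedule(Runnable body, long delayMs) {
        tasks.add(new Task(nowMs + delayMs, body));
    }

    /** Advances the mock clock and runs every due task. Call from a thread that holds no locks. */
    public void tick(long advanceMs) {
        List<Task> due = new ArrayList<>();
        synchronized (this) {
            nowMs += advanceMs;
            while (!tasks.isEmpty() && tasks.peek().dueMs <= nowMs) {
                due.add(tasks.poll());
            }
        }
        // Run outside the scheduler's monitor so a task can take other locks
        // without the scheduler lock being held at the same time.
        for (Task task : due) {
            task.body.run();
        }
    }

    public static void main(String[] args) {
        DeferredMockScheduler scheduler = new DeferredMockScheduler();
        scheduler.schedule(() -> System.out.println("task ran"), 10L);
        System.out.println("scheduled, nothing has run yet");
        scheduler.tick(10L);
    }
}
```

With this shape, a caller that holds a lock while calling `schedule` never executes the task under that lock; the test thread decides when to call `tick`.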
[GitHub] [kafka] ijuma opened a new pull request #10776: MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout`
ijuma opened a new pull request #10776: URL: https://github.com/apache/kafka/pull/10776 New parameters in overloaded methods should appear later, apart from lambdas, which should always be last. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
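The ordering rule in the description above can be illustrated with a small sketch. The method `awaitTrue` and its parameters are hypothetical and are not the signatures of `waitForCondition` or `retryOnExceptionWithTimeout`; the sketch only shows a new parameter (`pollIntervalMs`) being added after the existing one while the lambda stays last.

```java
import java.util.function.BooleanSupplier;

public class OverloadOrdering {

    /** Existing overload: the lambda is the last parameter. */
    static boolean awaitTrue(long maxWaitMs, BooleanSupplier condition) {
        return awaitTrue(maxWaitMs, 10L, condition);
    }

    /** Newer overload: pollIntervalMs comes after maxWaitMs, and the lambda is still last. */
    static boolean awaitTrue(long maxWaitMs, long pollIntervalMs, BooleanSupplier condition) {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and report the condition as not met.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Keeping the lambda last lets call sites end with the multi-line block.
        boolean met = awaitTrue(200L, 5L, () -> {
            return System.currentTimeMillis() - start >= 20L;
        });
        System.out.println("condition met: " + met);
    }
}
```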
[GitHub] [kafka] ijuma opened a new pull request #10777: MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout`
ijuma opened a new pull request #10777: URL: https://github.com/apache/kafka/pull/10777 New parameters in overloaded methods should appear later, apart from lambdas, which should always be last. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] ijuma commented on pull request #10759: MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout`
ijuma commented on pull request #10759: URL: https://github.com/apache/kafka/pull/10759#issuecomment-849642509 PRs for 2.8 and 2.7: * https://github.com/apache/kafka/pull/10777 * https://github.com/apache/kafka/pull/10776
[GitHub] [kafka] ijuma commented on a change in pull request #10761: MINOR: Don't ignore deletion of partition metadata file and log topic id clean-ups
ijuma commented on a change in pull request #10761: URL: https://github.com/apache/kafka/pull/10761#discussion_r640639035 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -325,19 +325,25 @@ class Log(@volatile private var _dir: File, // Ensure we do not try to assign a provided topicId that is inconsistent with the ID on file. if (partitionMetadataFile.exists()) { if (!keepPartitionMetadataFile) - partitionMetadataFile.delete() + try partitionMetadataFile.delete() + catch { +case e: IOException => + error(s"Error while trying to delete partition metadata file ${partitionMetadataFile}", e) Review comment: Good point, addressed.
[GitHub] [kafka] dajac commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
dajac commented on a change in pull request #10760: URL: https://github.com/apache/kafka/pull/10760#discussion_r640640161 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -4225,76 +4232,84 @@ public ListOffsetsResult listOffsets(Map topicPartit } } -for (final Map.Entry> entry : leaders.entrySet()) { -final int brokerId = entry.getKey().id(); +for (final Map.Entry>> versionedEntry : leaders.entrySet()) { +for (final Map.Entry> entry : versionedEntry.getValue().entrySet()) { +final int brokerId = versionedEntry.getKey().id(); -calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) { +calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) { -final List partitionsToQuery = new ArrayList<>(entry.getValue().values()); +final List partitionsToQuery = new ArrayList<>(entry.getValue().values()); -@Override -ListOffsetsRequest.Builder createRequest(int timeoutMs) { -return ListOffsetsRequest.Builder +@Override +ListOffsetsRequest.Builder createRequest(int timeoutMs) { +ListOffsetRequestVersion requestVersion = entry.getKey(); +if (requestVersion == ListOffsetRequestVersion.V7AndAbove) { +return ListOffsetsRequest.Builder + .forMaxTimestamp(context.options().isolationLevel()) +.setTargetTimes(partitionsToQuery); +} Review comment: It seems that the current logic sends only one request per broker/leader, whereas we could send up to two requests with your PR because specs are partitioned by `Node` and `ListOffsetRequestVersion`. Previously, they were only partitioned by `Node`. Intuitively, I would have approached the problem differently. I would have put all the specs in the same request and constrained its version to 7 and above if there is at least one `MAX_TIMESTAMP`. If the request succeeds, all good.
If the request fails with an `UnsupportedVersionException`, I would have retried with all the specs except the `MAX_TIMESTAMP` ones and failed the futures of the `MAX_TIMESTAMP` specs. In case of an `UnsupportedVersionException`, the admin client calls the `handleUnsupportedVersionException` method of the `Call`. This gives you an opportunity to downgrade and retry the `Call`. There are a couple of examples in the `KafkaAdminClient`. I wonder if we could rely on a similar pattern and avoid sending two requests per leader in the worst case. What do you think?
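The single-request-then-downgrade idea from the comment above could look roughly like this. It is a sketch under stated assumptions and does not use the admin client's real classes: `Spec`, `Broker`, `UnsupportedVersion`, `query`, and `oldBroker` are all hypothetical names, and results are plain strings rather than futures. All specs go into one request; the version is constrained to 7 only when a `MAX_TIMESTAMP` spec is present, and on a version error the remaining specs are retried without it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DowngradeRetrySketch {

    enum Spec { EARLIEST, LATEST, MAX_TIMESTAMP }

    /** Hypothetical error raised when the broker cannot serve the requested minimum version. */
    static final class UnsupportedVersion extends RuntimeException { }

    /** Hypothetical broker stand-in: maps each requested partition to an offset. */
    interface Broker {
        Map<String, Long> listOffsets(Map<String, Spec> specs, int minVersion);
    }

    static Map<String, String> query(Broker broker, Map<String, Spec> specs) {
        // Constrain the version only if at least one spec needs it.
        boolean needsV7 = specs.containsValue(Spec.MAX_TIMESTAMP);
        Map<String, String> results = new LinkedHashMap<>();
        try {
            broker.listOffsets(specs, needsV7 ? 7 : 0)
                  .forEach((partition, offset) -> results.put(partition, "offset=" + offset));
        } catch (UnsupportedVersion e) {
            if (!needsV7) {
                throw e; // nothing to downgrade to
            }
            // Fail only the MAX_TIMESTAMP specs and retry the rest without the constraint.
            Map<String, Spec> downgraded = new LinkedHashMap<>();
            specs.forEach((partition, spec) -> {
                if (spec == Spec.MAX_TIMESTAMP) {
                    results.put(partition, "failed: unsupported version");
                } else {
                    downgraded.put(partition, spec);
                }
            });
            if (!downgraded.isEmpty()) {
                broker.listOffsets(downgraded, 0)
                      .forEach((partition, offset) -> results.put(partition, "offset=" + offset));
            }
        }
        return results;
    }

    /** Hypothetical old broker: rejects version 7 and above, answers offset 0 otherwise. */
    static Broker oldBroker() {
        return (specs, minVersion) -> {
            if (minVersion >= 7) {
                throw new UnsupportedVersion();
            }
            Map<String, Long> offsets = new LinkedHashMap<>();
            specs.keySet().forEach(partition -> offsets.put(partition, 0L));
            return offsets;
        };
    }

    public static void main(String[] args) {
        Map<String, Spec> specs = new LinkedHashMap<>();
        specs.put("foo-0", Spec.EARLIEST);
        specs.put("foo-1", Spec.MAX_TIMESTAMP);
        System.out.println(query(oldBroker(), specs));
    }
}
```

Against a broker that supports the newer version this costs one request per leader; the second request is only sent after a version failure.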
[GitHub] [kafka] ijuma commented on pull request #10176: KAFKA-12359: Update Jetty to 11
ijuma commented on pull request #10176: URL: https://github.com/apache/kafka/pull/10176#issuecomment-849652409 We are not planning to upgrade to Java 11 any time soon. Can you please submit a PR for the latest Jetty version that still supports Java 8?
[GitHub] [kafka] ijuma closed pull request #10176: KAFKA-12359: Update Jetty to 11
ijuma closed pull request #10176: URL: https://github.com/apache/kafka/pull/10176
[jira] [Commented] (KAFKA-12774) kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j
[ https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352499#comment-17352499 ] Almog Gavra commented on KAFKA-12774: - I went back to triage the severity of this because if things aren't getting logged via Log4j then our applications would be missing log-shipping, which is critical in our production environments. To confirm nothing was being logged outside of Log4j I hacked together something to try to reproduce this, and I also added the following line any time a record was to be sent in RecordCollectorImpl to make sure I had exactly the same type of error that you encountered: {code:java} recordSendError(topic, new InvalidPidMappingException("foo"), serializedRecord); {code} I could not reproduce it. The only things that were logged (notice that all Log4j loggers are turned OFF so only things that get logged to stdout get logged) were what I specifically logged to stdout. This was the stdout (I printed to stdout in the uncaught handler instead of logging): {code:java} SEEN: 0,0 (HERE) Uncaught exception handled - replacing thread Error encountered sending record to topic bar for task 0_0 due to: org.apache.kafka.common.errors.InvalidPidMappingException: foo Exception handler choose to FAIL the processing, no more records would be sent. {code} We can leave this ticket open in case anyone else has any ideas.
Here is the app: {code:java} public static void main(String[] args) throws InterruptedException, IOException { LogManager.getRootLogger().setLevel(Level.OFF); @SuppressWarnings("unchecked") Enumeration loggers = LogManager.getCurrentLoggers(); while (loggers.hasMoreElements()) { loggers.nextElement().setLevel(Level.OFF); } final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(1); cluster.start(); cluster.createTopic("foo"); final Properties config = new Properties(); config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app"); config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + 123); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1); config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200); config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000); config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); final CountDownLatch sawKey = new CountDownLatch(1); final StreamsBuilder builder = new StreamsBuilder(); builder.stream("foo") .filter((k, v) -> k != null) .peek((k, v) -> System.out.println("SEEN: " + k + "," + v)) .peek((k ,v) -> { if ((int) k == 0) sawKey.countDown(); }) .to("bar"); final Topology build = builder.build(config); final KafkaStreams app = new KafkaStreams(build, config); app.setUncaughtExceptionHandler(exception -> { System.out.println("(HERE) Uncaught exception handled - replacing thread " + exception.getMessage()); return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD; } ); final CountDownLatch startLatch = new 
CountDownLatch(1); app.setStateListener((newState, oldState) -> { if (newState == State.RUNNING) { startLatch.countDown(); } }); app.start(); startLatch.await(); final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); IntegrationTestUtils.produceKeyValuesSynchronously( "foo", IntStream.range(0, 1) .mapToObj(i -> KeyValue.pair(0, i)) .collect(Collectors.toList()), producerProps, Time.SYSTEM); sawKey.await(); app.close(); app.cleanUp(); cluster.after(); }{code} > kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through > log4j > > > Key: KAFKA-12774 > URL: https://issues.apache.org/jira/browse/KAFKA-12774 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Jørgen >Priority: Minor > Fix For: 3.0.0, 2.8.1 > > > When exceptions is handled in the uncaught-exception handler introduced in > KS2.8,
[jira] [Created] (KAFKA-12856) Upgrade Jackson to 2.12.3
Ismael Juma created KAFKA-12856: --- Summary: Upgrade Jackson to 2.12.3 Key: KAFKA-12856 URL: https://issues.apache.org/jira/browse/KAFKA-12856 Project: Kafka Issue Type: Bug Reporter: Ismael Juma Assignee: Ismael Juma 2.10.x is no longer supported, so we should move to 2.12 for the 3.0 release.
[GitHub] [kafka] iakunin commented on pull request #10775: KAFKA-12668: Making MockScheduler.schedule safe to use in concurrent code
iakunin commented on pull request #10775: URL: https://github.com/apache/kafka/pull/10775#issuecomment-849675662 @jsancio could you have a look, please?
[GitHub] [kafka] ijuma opened a new pull request #10778: KAFKA-12856: Upgrade Jackson to 2.12.3
ijuma opened a new pull request #10778: URL: https://github.com/apache/kafka/pull/10778 2.10.x is no longer supported, so we should move to 2.12 for the 3.0 release. ScalaObjectMapper has been deprecated and it looks like we don't actually need it, so remove its usage. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-12465) Decide whether inconsistent cluster id error are fatal
[ https://issues.apache.org/jira/browse/KAFKA-12465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352518#comment-17352518 ]

Omnia Ibrahim commented on KAFKA-12465:
---------------------------------------

I have been testing KRaft with the following scenario: I set up a cluster with 3 combined nodes (broker, controller) and 3 broker-only nodes, and later added 2 extra nodes to the KRaft cluster with a different cluster id. In a real production deployment, I would expect these 2 nodes with the wrong cluster id to crash immediately, so that we can tell that something went wrong during the deployment.

The scenario I tested is the following:
* Set up a cluster with 3 combined raft nodes (broker, controller mode) + 3 broker nodes with cluster id {{CLUSTER_ID_1}}; they elected {{raft-node-1}} as the leader.
* Later added 2 extra nodes to the raft cluster with a different cluster id, {{WRONG_CLUSTER_ID}}.
* The extra nodes don't crash; they stay in running mode and keep logging this error:
{code:java}
{"level":"ERROR","message":"[RaftManager nodeId=8] Unexpected error INCONSISTENT_CLUSTER_ID in FETCH response: InboundResponse(correlationId=16699, data=FetchResponseData(throttleTimeMs=0, errorCode=104, sessionId=0, responses=[]), sourceId=2)","logger":"org.apache.kafka.raft.KafkaRaftClient"}{code}
* {{raft-node-1}} doesn't throw errors, only warnings about connection issues:
{code:java}
{"level":"WARN","message":"[RaftManager nodeId=1] Error connecting to node raft-node-4:9093 (id: 8 rack: null)","logger":"org.apache.kafka.clients.NetworkClient","throwable":{"class":"java.net.UnknownHostException","msg":"raft-node-4","stack":["java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)","java.net.InetAddress.getAllByName0(InetAddress.java:1505)","java.net.InetAddress.getAllByName(InetAddress.java:1364)","java.net.InetAddress.getAllByName(InetAddress.java:1298)","org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)","org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:111)","org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:512)","org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:466)","org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:172)","org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:985)","org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:311)","kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1(InterBrokerSendThread.scala:103)","kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1$adapted(InterBrokerSendThread.scala:99)","scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)","scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)","scala.collection.AbstractIterable.foreach(Iterable.scala:920)","kafka.common.InterBrokerSendThread.sendRequests(InterBrokerSendThread.scala:99)","kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:73)","kafka.common.InterBrokerSendThread.doWork(InterBrokerSendThread.scala:94)","kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)"]}}{code}

If this is a real deployment, the error `INCONSISTENT_CLUSTER_ID` should be fatal at all times; otherwise, how can we tell that these nodes are failing to join the active raft quorum?
> Decide whether inconsistent cluster id error are fatal
> ------------------------------------------------------
>
>                 Key: KAFKA-12465
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12465
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: dengziming
>            Priority: Major
>
> Currently, we just log an error when an inconsistent cluster id occurs. We
> should set a window during startup in which these errors are fatal; after that
> window, we no longer treat them as fatal. See
> https://github.com/apache/kafka/pull/10289#discussion_r592853088
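
The startup-window idea in the issue description could be sketched roughly as follows. This is only an illustration: `ClusterIdMismatchPolicy`, its constructor parameters, and the 30-second window are hypothetical names and values chosen for the example, not part of Kafka or of the linked PR.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper (not a Kafka class): decides whether an
// INCONSISTENT_CLUSTER_ID error should stop the node or only be logged.
final class ClusterIdMismatchPolicy {
    private final Instant startedAt;
    private final Duration fatalWindow;

    ClusterIdMismatchPolicy(Instant startedAt, Duration fatalWindow) {
        this.startedAt = startedAt;
        this.fatalWindow = fatalWindow;
    }

    // Fatal only while the node is still inside the startup window.
    boolean isFatal(Instant now) {
        Duration sinceStart = Duration.between(startedAt, now);
        return sinceStart.compareTo(fatalWindow) < 0;
    }

    public static void main(String[] args) {
        Instant start = Instant.now();
        ClusterIdMismatchPolicy policy =
            new ClusterIdMismatchPolicy(start, Duration.ofSeconds(30)); // assumed window length
        System.out.println(policy.isFatal(start.plusSeconds(5)));  // inside the window: true
        System.out.println(policy.isFatal(start.plusSeconds(60))); // after the window: false
    }
}
```

Under such a policy, a node that starts with the wrong cluster id would stop during startup instead of logging the error indefinitely. Whether the error should stay fatal after the window is the question this ticket leaves open.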
[GitHub] [kafka] iakunin edited a comment on pull request #10775: KAFKA-12668: Making MockScheduler.schedule safe to use in concurrent code
iakunin edited a comment on pull request #10775: URL: https://github.com/apache/kafka/pull/10775#issuecomment-849675662 @jsancio hi! Could you have a look at this PR, please?
[GitHub] [kafka] cadonna commented on a change in pull request #10765: KAFKA-12519: Remove built-in Streams metrics for versions 0.10.0-2.4
cadonna commented on a change in pull request #10765:
URL: https://github.com/apache/kafka/pull/10765#discussion_r640686291

## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java

@@ -272,21 +230,8 @@ public void shouldRecordRestoreLatencyOnInit() {
         expectLastCall();
         replay(innerStoreMock);
         store.init((StateStoreContext) context, store);
-        final Map metrics = context.metrics().metrics();
-        if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
-            assertEquals(1.0, getMetricByNameFilterByTags(
-                metrics,
-                "restore-total",
-                storeLevelGroup,
-                singletonMap(STORE_TYPE + "-state-id", STORE_NAME)
-            ).metricValue());
-            assertEquals(1.0, getMetricByNameFilterByTags(
-                metrics,
-                "restore-total",
-                storeLevelGroup,
-                singletonMap(STORE_TYPE + "-state-id", ROLLUP_VALUE)
-            ).metricValue());
-        }
+
+        verify(innerStoreMock);

Review comment:
Good that you mention that! I checked the metered state store tests and found some testing holes.
[GitHub] [kafka] ijuma commented on a change in pull request #10764: MINOR: make sure all fiedls of o.p.k.s.a.Action are NOT null
ijuma commented on a change in pull request #10764:
URL: https://github.com/apache/kafka/pull/10764#discussion_r640705924

## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Action.java

@@ -36,22 +36,22 @@ public Action(AclOperation operation,
                   int resourceReferenceCount,
                   boolean logIfAllowed,
                   boolean logIfDenied) {
-        this.operation = operation;
-        this.resourcePattern = resourcePattern;
+        this.operation = Objects.requireNonNull(operation, "operation can't be null");
+        this.resourcePattern = Objects.requireNonNull(resourcePattern, "resourcePattern can't be null");
         this.logIfAllowed = logIfAllowed;
         this.logIfDenied = logIfDenied;
         this.resourceReferenceCount = resourceReferenceCount;
     }

     /**
-     * Resource on which action is being performed.
+     * Resource on which action is being performed. never null

Review comment:
I would include this in the sentence, "Returns a non-null resource pattern on which this action is being performed".
[GitHub] [kafka] ijuma commented on a change in pull request #10764: MINOR: make sure all fiedls of o.p.k.s.a.Action are NOT null
ijuma commented on a change in pull request #10764:
URL: https://github.com/apache/kafka/pull/10764#discussion_r640705924

## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Action.java

@@ -36,22 +36,22 @@ public Action(AclOperation operation,
                   int resourceReferenceCount,
                   boolean logIfAllowed,
                   boolean logIfDenied) {
-        this.operation = operation;
-        this.resourcePattern = resourcePattern;
+        this.operation = Objects.requireNonNull(operation, "operation can't be null");
+        this.resourcePattern = Objects.requireNonNull(resourcePattern, "resourcePattern can't be null");
         this.logIfAllowed = logIfAllowed;
         this.logIfDenied = logIfDenied;
         this.resourceReferenceCount = resourceReferenceCount;
     }

     /**
-     * Resource on which action is being performed.
+     * Resource on which action is being performed. never null

Review comment:
I would include the non-null constraint in the sentence, e.g. "Returns a non-null resource pattern on which this action is being performed".

## File path: clients/src/main/java/org/apache/kafka/server/authorizer/Action.java

@@ -36,22 +36,22 @@ public Action(AclOperation operation,
                   int resourceReferenceCount,
                   boolean logIfAllowed,
                   boolean logIfDenied) {
-        this.operation = operation;
-        this.resourcePattern = resourcePattern;
+        this.operation = Objects.requireNonNull(operation, "operation can't be null");
+        this.resourcePattern = Objects.requireNonNull(resourcePattern, "resourcePattern can't be null");

Review comment:
Shall we mention this in the javadoc too?
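
The fail-fast null checks and the suggested javadoc wording can be tried in isolation with a small sketch. `ActionSketch` is a hypothetical stand-in, not the real `Action` class: plain strings replace `AclOperation` and `ResourcePattern` so that the example compiles on its own.

```java
import java.util.Objects;

// Simplified, hypothetical stand-in for the reviewed class.
final class ActionSketch {
    private final String operation;
    private final String resourcePattern;

    ActionSketch(String operation, String resourcePattern) {
        // Fail fast in the constructor, naming the offending argument.
        this.operation = Objects.requireNonNull(operation, "operation can't be null");
        this.resourcePattern = Objects.requireNonNull(resourcePattern, "resourcePattern can't be null");
    }

    /** Returns the non-null operation being performed. */
    String operation() {
        return operation;
    }

    /** Returns a non-null resource pattern on which this action is being performed. */
    String resourcePattern() {
        return resourcePattern;
    }

    public static void main(String[] args) {
        try {
            new ActionSketch("READ", null);
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // prints "resourcePattern can't be null"
        }
    }
}
```

Because the check runs at construction time, the getters can document a non-null return value without any further guard.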
[GitHub] [kafka] lbradstreet commented on a change in pull request #10704: KAFKA-12791: ConcurrentModificationException in AbstractConfig use by KafkaProducer
lbradstreet commented on a change in pull request #10704:
URL: https://github.com/apache/kafka/pull/10704#discussion_r640706594

## File path: clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java

@@ -106,7 +107,7 @@ public AbstractConfig(ConfigDef definition, Map originals, Map
         this.originals = resolveConfigVariables(configProviderProps, (Map) originals);
         this.values = definition.parse(this.originals);
-        this.used = Collections.synchronizedSet(new HashSet<>());
+        this.used = ConcurrentHashMap.newKeySet();

Review comment:
Done
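
The difference between the two set implementations in this diff can be shown with a small sketch. It is an illustration under stated assumptions: `UsedKeysDemo` and the config keys are made up, and the mutation happens on the same thread so that the outcome is deterministic, whereas the failure described in KAFKA-12791 involves several threads. The iterator of a `HashSet` wrapped by `Collections.synchronizedSet` is still fail-fast, while the iterator of a `ConcurrentHashMap.newKeySet()` set is weakly consistent and does not throw.

```java
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class, not part of Kafka.
final class UsedKeysDemo {
    // Adds to the set while iterating over it; returns false if the
    // iterator fails fast with ConcurrentModificationException.
    static boolean survivesMutationDuringIteration(Set<String> used) {
        used.add("bootstrap.servers");
        used.add("key.serializer");
        used.add("value.serializer");
        try {
            for (String key : used) {
                used.add("acks"); // structural change on the first pass
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(survivesMutationDuringIteration(
            Collections.synchronizedSet(new HashSet<>()))); // false
        System.out.println(survivesMutationDuringIteration(
            ConcurrentHashMap.newKeySet()));                 // true
    }
}
```

The synchronized wrapper only makes individual calls atomic; iterating over it safely still requires the caller to hold the set's lock for the whole loop.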
[GitHub] [kafka] feyman2016 commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
feyman2016 commented on a change in pull request #10593: URL: https://github.com/apache/kafka/pull/10593#discussion_r640723618 ## File path: raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java ## @@ -135,4 +141,18 @@ public static void assertSnapshot(List> batches, SnapshotReader
[GitHub] [kafka] junrao merged pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas
junrao merged pull request #10684: URL: https://github.com/apache/kafka/pull/10684
[GitHub] [kafka] feyman2016 commented on pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
feyman2016 commented on pull request #10593: URL: https://github.com/apache/kafka/pull/10593#issuecomment-849753337 @jsancio Addressed the comments, but I just found that I would need to generalize `context.advanceLeaderHighWatermarkToEndOffset()`; will update later.
[GitHub] [kafka] abbccdda commented on a change in pull request #10744: KAFKA-8410: KTableProcessor migration groundwork
abbccdda commented on a change in pull request #10744:
URL: https://github.com/apache/kafka/pull/10744#discussion_r640767449

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java

@@ -124,7 +124,10 @@
     private static final String TOPIC_SUFFIX = "-topic";
     private static final String SINK_NAME = "KTABLE-SINK-";

-    private final ProcessorSupplier processorSupplier;
+    // Temporarily setting the processorSupplier to type Object so that we can transition from the

Review comment:
s/transition/transit

## File path: streams/src/main/java/org/apache/kafka/streams/processor/To.java

@@ -89,4 +89,11 @@ public int hashCode() {
         throw new UnsupportedOperationException("To is unsafe for use in Hash collections");
     }

+    @Override
+    public String toString() {
+        return "To{" +

Review comment:
nit: could we do a string format for this to read easier?

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java

@@ -19,30 +19,45 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;

-class SessionCacheFlushListener implements CacheFlushListener, V> {
-    private final InternalProcessorContext context;
+class SessionCacheFlushListener implements CacheFlushListener, VOut> {
+    private final InternalProcessorContext, Change> context;
+
+    @SuppressWarnings("rawtypes")
     private final ProcessorNode myNode;

+    @SuppressWarnings("unchecked")
     SessionCacheFlushListener(final ProcessorContext context) {
-        this.context = (InternalProcessorContext) context;
+        this.context = (InternalProcessorContext, Change>) context;
         myNode = this.context.currentNode();
     }

     @Override
-    public void apply(final Windowed key,
-                      final V newValue,
-                      final V oldValue,
+    public void apply(final Windowed key,
+                      final VOut newValue,
+                      final VOut oldValue,
                       final long timestamp) {
-        final ProcessorNode prev = context.currentNode();
+        @SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode();

Review comment:
Why do we put suppression inline, instead of putting it on the top of function?

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableNewProcessorSupplier.java

@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+
+public interface KTableNewProcessorSupplier extends ProcessorSupplier, KOut, Change> {
+
+    KTableValueGetterSupplier view();
+
+    /**
+     * Potentially enables sending old values.
+     *
+     * If {@code forceMaterialization} is {@code true}, the method will force the materialization of upstream nodes to
+     * enable sending old values.
+     *
+     * If {@code forceMaterialization} is {@code false}, the method will only enable the sending of old values if
+     * an upstream node is already materialized.
+     *
+     * @param forceMaterialization indicates if an upstream node should be forced to materialize to enable sending old
+     *                             values.
+     * @return {@code true} is sending old values is enabled, i.e. either because {@code forceMaterialization} was

Review comment:
...if sending old values

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java

@@ -24,14 +24,14 @@
 import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerializer;
 import static org.apache.kafka.stre
[GitHub] [kafka] ijuma commented on a change in pull request #10704: KAFKA-12791: ConcurrentModificationException in AbstractConfig use by KafkaProducer
ijuma commented on a change in pull request #10704:
URL: https://github.com/apache/kafka/pull/10704#discussion_r640788402

## File path: clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java

@@ -508,6 +509,55 @@ public void testDocumentationOfExpectNull() {
         assertNull(config.documentationOf("xyz"));
     }

+    @Test
+    public void testConcurrentUnusedUse() throws InterruptedException {

Review comment:
I think I'd remove it or at least tag it as an integration test.
[GitHub] [kafka] chia7712 commented on pull request #10764: MINOR: make sure all fiedls of o.p.k.s.a.Action are NOT null
chia7712 commented on pull request #10764: URL: https://github.com/apache/kafka/pull/10764#issuecomment-849784432 @ijuma thanks for your suggestion. I have addressed all comments. PTAL
[GitHub] [kafka] jsancio commented on pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
jsancio commented on pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#issuecomment-849787584

> @jsancio Addressed the comments but just found that I would need to generalize context.advanceLeaderHighWatermarkToEndOffset (), will update later~

I needed something similar in a PR I am currently working on, and this is what I have if you want to adapt it to your PR:

```java
public void advanceLocalLeaderHighWatermarkToLogEndOffset() throws InterruptedException {
    assertEquals(localId, currentLeader());
    long localLogEndOffset = log.endOffset().offset;
    Set<Integer> followers = voters.stream().filter(voter -> voter != localId.getAsInt()).collect(Collectors.toSet());

    // Send a request from every follower
    for (int follower : followers) {
        deliverRequest(
            fetchRequest(currentEpoch(), follower, localLogEndOffset, currentEpoch(), 0)
        );
        pollUntilResponse();
        assertSentFetchPartitionResponse(Errors.NONE, currentEpoch(), localId);
    }

    pollUntil(() -> OptionalLong.of(localLogEndOffset).equals(client.highWatermark()));
}
```
[GitHub] [kafka] rondagostino opened a new pull request #10779: CONFLUENT: Complete version regex check in system tests
rondagostino opened a new pull request #10779:
URL: https://github.com/apache/kafka/pull/10779

A couple of the sanity-check system tests confirm that the version of Kafka on the CLASSPATH during the test is the one that is expected. These tests were failing for the 2.8 release because the version `6.2.0-ccs` was not accounted for correctly in the various regexes. This patch revamps the regex set to be more organized and complete, with the various possibilities clearly identified.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
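
To illustrate the kind of pattern the description refers to, here is a rough sketch of a version check that accepts a hyphenated qualifier such as `-ccs`. Everything in it is an assumption made for the example: the real system tests are not written in Java, and the class name, method name, and regex shown here are hypothetical rather than taken from the patch.

```java
import java.util.regex.Pattern;

// Hypothetical sketch, not the regex set from the pull request.
final class VersionPatternSketch {
    // Assumed shape: MAJOR.MINOR.PATCH, optionally followed by a
    // hyphenated qualifier such as "-ccs" or "-SNAPSHOT".
    static final Pattern VERSION =
        Pattern.compile("\\d+\\.\\d+\\.\\d+(-[A-Za-z0-9]+)?");

    static boolean isRecognized(String version) {
        // matches() requires the whole string to fit the pattern.
        return VERSION.matcher(version).matches();
    }

    public static void main(String[] args) {
        System.out.println(isRecognized("2.8.0"));     // true
        System.out.println(isRecognized("6.2.0-ccs")); // true
        System.out.println(isRecognized("6.2"));       // false
    }
}
```

Keeping the qualifier in one optional group makes it easy to see which version shapes are accepted and which are not.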
[GitHub] [kafka] rondagostino closed pull request #10779: CONFLUENT: Complete version regex check in system tests
rondagostino closed pull request #10779: URL: https://github.com/apache/kafka/pull/10779
[GitHub] [kafka] rondagostino commented on pull request #10779: CONFLUENT: Complete version regex check in system tests
rondagostino commented on pull request #10779: URL: https://github.com/apache/kafka/pull/10779#issuecomment-849788603 Wrong base
[GitHub] [kafka] ijuma commented on pull request #10778: KAFKA-12856: Upgrade Jackson to 2.12.3
ijuma commented on pull request #10778:
URL: https://github.com/apache/kafka/pull/10778#issuecomment-849789835

Tests look good, one job passed, the others had unrelated failures:

> Build / JDK 15 and Scala 2.13 / testCreateClusterAndCreateAndManyTopicsWithManyPartitions() – kafka.server.RaftClusterTest
> 23s
> Build / JDK 15 and Scala 2.13 / testCreateClusterAndCreateAndManyTopicsWithManyPartitions() – kafka.server.RaftClusterTest
> 18s
> Build / JDK 8 and Scala 2.12 / testCreateClusterAndCreateAndManyTopicsWithManyPartitions() – kafka.server.RaftClusterTest
> 25s
> Build / JDK 8 and Scala 2.12 / testCreateClusterAndCreateAndManyTopicsWithManyPartitions() – kafka.server.RaftClusterTest
[GitHub] [kafka] ijuma merged pull request #10778: KAFKA-12856: Upgrade Jackson to 2.12.3
ijuma merged pull request #10778: URL: https://github.com/apache/kafka/pull/10778
[GitHub] [kafka] ijuma commented on pull request #10776: MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout`
ijuma commented on pull request #10776: URL: https://github.com/apache/kafka/pull/10776#issuecomment-849790914 Created the backport PR just to verify that tests passed, merging.
[GitHub] [kafka] ijuma merged pull request #10776: MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout`
ijuma merged pull request #10776: URL: https://github.com/apache/kafka/pull/10776
[GitHub] [kafka] jsancio commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot
jsancio commented on a change in pull request #10593: URL: https://github.com/apache/kafka/pull/10593#discussion_r640805609
## File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java ##
@@ -963,6 +952,20 @@ static void verifyLeaderChangeMessage(
             new HashSet<>(leaderChangeMessage.grantingVoters()));
     }
+    public void advanceLeaderHighWatermarkToEndOffset() throws InterruptedException {
+        Integer replicaId = null;
+        for (Integer voter: voters) {
+            if (voter != localId.getAsInt()) {
+                replicaId = voter;
+                break;
+            }
+        }
+        deliverRequest(fetchRequest(currentEpoch(), replicaId, log.endOffset().offset, currentEpoch(), 0));
+        pollUntilResponse();
+        assertSentFetchPartitionResponse(Errors.NONE, currentEpoch(), localId);
+        assertEquals(log.endOffset().offset, client.highWatermark().getAsLong());
+    }
Review comment: I needed something similar in a PR I am currently working on; this is what I have, if you want to adapt it to your PR:
```java
public void advanceLocalLeaderHighWatermarkToLogEndOffset() throws InterruptedException {
    assertEquals(localId, currentLeader());
    long localLogEndOffset = log.endOffset().offset;
    Set<Integer> followers = voters.stream().filter(voter -> voter != localId.getAsInt()).collect(Collectors.toSet());

    // Send a request from every follower
    for (int follower : followers) {
        deliverRequest(
            fetchRequest(currentEpoch(), follower, localLogEndOffset, currentEpoch(), 0)
        );
        pollUntilResponse();
        assertSentFetchPartitionResponse(Errors.NONE, currentEpoch(), localId);
    }

    pollUntil(() -> OptionalLong.of(localLogEndOffset).equals(client.highWatermark()));
}
```
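For context on what the helper in the comment above waits for: in a Raft-style log, the leader can only advance its high watermark to an offset once a majority of voters have replicated up to it, which is why the test delivers fetch requests from the followers. A minimal Python sketch of that rule, assuming a plain map from voter id to log end offset. The name `high_watermark` and its argument are illustrative and not part of Kafka's `raft` module, and the sketch ignores epoch checks.

```python
def high_watermark(end_offsets):
    """Return the largest offset that a majority of voters have reached.

    `end_offsets` maps a voter id to that voter's log end offset
    (hypothetical input shape, chosen for this sketch only).
    """
    if not end_offsets:
        raise ValueError("at least one voter is required")
    # Sort descending: with n voters, the value at index n // 2 is an offset
    # that at least (n // 2) + 1 voters, i.e. a majority, have reached.
    ordered = sorted(end_offsets.values(), reverse=True)
    return ordered[len(ordered) // 2]
```

With three voters, one follower catching up to the leader is enough to move the high watermark, while a lone leader cannot move it by itself.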
[GitHub] [kafka] ijuma commented on pull request #10777: MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout`
ijuma commented on pull request #10777: URL: https://github.com/apache/kafka/pull/10777#issuecomment-849791655 Created the backport PR just to check the tests.
[GitHub] [kafka] ijuma merged pull request #10777: MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout`
ijuma merged pull request #10777: URL: https://github.com/apache/kafka/pull/10777
[jira] [Resolved] (KAFKA-12856) Upgrade Jackson to 2.12.3
[ https://issues.apache.org/jira/browse/KAFKA-12856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma resolved KAFKA-12856. - Fix Version/s: 3.0.0 Resolution: Fixed > Upgrade Jackson to 2.12.3 > - > > Key: KAFKA-12856 > URL: https://issues.apache.org/jira/browse/KAFKA-12856 > Project: Kafka > Issue Type: Bug >Reporter: Ismael Juma >Assignee: Ismael Juma >Priority: Major > Fix For: 3.0.0 > > > 2.10.x is no longer supported, so we should move to 2.12 for the 3.0 release. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL
ijuma commented on pull request #10758: URL: https://github.com/apache/kafka/pull/10758#issuecomment-849798924 Thanks for looking into this. Can we switch the release requirement to be Java 15 instead of Java 11 then? We can then switch to Java 17 once that's out and stick to that for a while.
[GitHub] [kafka] lbradstreet commented on a change in pull request #10704: KAFKA-12791: ConcurrentModificationException in AbstractConfig use by KafkaProducer
lbradstreet commented on a change in pull request #10704: URL: https://github.com/apache/kafka/pull/10704#discussion_r640820748 ## File path: clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java ## @@ -508,6 +509,55 @@ public void testDocumentationOfExpectNull() { assertNull(config.documentationOf("xyz")); } +@Test +public void testConcurrentUnusedUse() throws InterruptedException { Review comment: I don't think it adds much value so I'd rather delete it.
[GitHub] [kafka] jlprat commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL
jlprat commented on pull request #10758: URL: https://github.com/apache/kafka/pull/10758#issuecomment-849805196 Hi @ijuma, yes: generating the Javadocs with JDK 15 (or 16) instead of 11 works (I tried it locally). Only the `release.py` file needs to be modified to accomplish this. I could create a PR with those changes if you like.
[GitHub] [kafka] lbradstreet commented on a change in pull request #10704: KAFKA-12791: ConcurrentModificationException in AbstractConfig use by KafkaProducer
lbradstreet commented on a change in pull request #10704: URL: https://github.com/apache/kafka/pull/10704#discussion_r640823494 ## File path: clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java ## @@ -508,6 +509,55 @@ public void testDocumentationOfExpectNull() { assertNull(config.documentationOf("xyz")); } +@Test +public void testConcurrentUnusedUse() throws InterruptedException { Review comment: Done.
[GitHub] [kafka] ijuma commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL
ijuma commented on pull request #10758: URL: https://github.com/apache/kafka/pull/10758#issuecomment-849813756 Yes, let's go with 15 since Kafka doesn't work with 16 yet.
[GitHub] [kafka] jlprat commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL
jlprat commented on pull request #10758: URL: https://github.com/apache/kafka/pull/10758#issuecomment-849814665 Shall I close this PR and open a new one with the modified `release.py`?
[GitHub] [kafka] vvcephei commented on a change in pull request #10744: KAFKA-8410: KTableProcessor migration groundwork
vvcephei commented on a change in pull request #10744: URL: https://github.com/apache/kafka/pull/10744#discussion_r640835700 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableNewProcessorSupplier.java ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.processor.api.ProcessorSupplier; + +public interface KTableNewProcessorSupplier extends ProcessorSupplier, KOut, Change> { Review comment: I'll leave it to @jeqo to make sure this is done at the end of the migration. I don't think a TODO in the code is much use, but a subtask in @jeqo's ticket would be effective.
[GitHub] [kafka] vvcephei commented on a change in pull request #10744: KAFKA-8410: KTableProcessor migration groundwork
vvcephei commented on a change in pull request #10744: URL: https://github.com/apache/kafka/pull/10744#discussion_r640835922 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java ## @@ -19,30 +19,45 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.To; +import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.state.internals.CacheFlushListener; -class SessionCacheFlushListener implements CacheFlushListener, V> { -private final InternalProcessorContext context; +class SessionCacheFlushListener implements CacheFlushListener, VOut> { +private final InternalProcessorContext, Change> context; + +@SuppressWarnings("rawtypes") private final ProcessorNode myNode; +@SuppressWarnings("unchecked") SessionCacheFlushListener(final ProcessorContext context) { -this.context = (InternalProcessorContext) context; +this.context = (InternalProcessorContext, Change>) context; myNode = this.context.currentNode(); } @Override -public void apply(final Windowed key, - final V newValue, - final V oldValue, +public void apply(final Windowed key, + final VOut newValue, + final VOut oldValue, final long timestamp) { -final ProcessorNode prev = context.currentNode(); +@SuppressWarnings("rawtypes") final ProcessorNode prev = context.currentNode(); Review comment: Just to limit the scope of the suppression and not mask other mistakes.
[GitHub] [kafka] vvcephei commented on a change in pull request #10744: KAFKA-8410: KTableProcessor migration groundwork
vvcephei commented on a change in pull request #10744: URL: https://github.com/apache/kafka/pull/10744#discussion_r640837000
## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableNewProcessorSupplier.java ##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+
+public interface KTableNewProcessorSupplier extends ProcessorSupplier, KOut, Change> {
+
+    KTableValueGetterSupplier view();
+
+    /**
+     * Potentially enables sending old values.
+     *
+     * If {@code forceMaterialization} is {@code true}, the method will force the materialization of upstream nodes to
+     * enable sending old values.
+     *
+     * If {@code forceMaterialization} is {@code false}, the method will only enable the sending of old values if
+     * an upstream node is already materialized.
+     *
+     * @param forceMaterialization indicates if an upstream node should be forced to materialize to enable sending old
+     * values.
+     * @return {@code true} is sending old values is enabled, i.e. either because {@code forceMaterialization} was
Review comment:
```suggestion
     * @return {@code true} if sending old values is enabled, i.e. either because {@code forceMaterialization} was
```
[GitHub] [kafka] vvcephei commented on a change in pull request #10744: KAFKA-8410: KTableProcessor migration groundwork
vvcephei commented on a change in pull request #10744: URL: https://github.com/apache/kafka/pull/10744#discussion_r640837839 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/To.java ## @@ -89,4 +89,11 @@ public int hashCode() { throw new UnsupportedOperationException("To is unsafe for use in Hash collections"); } +@Override +public String toString() { +return "To{" + Review comment: I just didn't bother because there's no place it would actually be printed unless a test is failing. We can give more thought to the string format later on as needed.
[GitHub] [kafka] ijuma commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL
ijuma commented on pull request #10758: URL: https://github.com/apache/kafka/pull/10758#issuecomment-849820046 Or just replace the code in this branch. Whatever you prefer.
[GitHub] [kafka] vvcephei commented on a change in pull request #10744: KAFKA-8410: KTableProcessor migration groundwork
vvcephei commented on a change in pull request #10744: URL: https://github.com/apache/kafka/pull/10744#discussion_r640838987 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFlushListener.java ## @@ -31,4 +34,9 @@ * @param timestamp timestamp of new value */ void apply(final K key, final V newValue, final V oldValue, final long timestamp); + +/** + * Called when records are flushed from the {@link ThreadCache} + */ +void apply(final Record> record); Review comment: I don't want to deprecate it right now, but as with the rest of the "compatibility mode" changes, the old member should become unused by the time @jeqo is done and we can remove it at that time.
[GitHub] [kafka] jlprat opened a new pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK
jlprat opened a new pull request #10780: URL: https://github.com/apache/kafka/pull/10780
This upgrades the JDK used for docs generation to version 15, which works around the bug https://bugs.openjdk.java.net/browse/JDK-8215291 present in JDK 11.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jlprat closed pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL
jlprat closed pull request #10758: URL: https://github.com/apache/kafka/pull/10758
[GitHub] [kafka] jlprat commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL
jlprat commented on pull request #10758: URL: https://github.com/apache/kafka/pull/10758#issuecomment-849820849 Closed in favor of https://github.com/apache/kafka/pull/10780
[GitHub] [kafka] jlprat commented on a change in pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK
jlprat commented on a change in pull request #10780: URL: https://github.com/apache/kafka/pull/10780#discussion_r640840181
## File path: release.py ##
@@ -265,7 +265,7 @@ def command_stage_docs():
     # version due to already having bumped the bugfix version number.
     gradle_version_override = docs_release_version(version)
-    cmd("Building docs", "./gradlew -Pversion=%s clean siteDocsTar aggregatedJavadoc" % gradle_version_override, cwd=REPO_HOME, env=jdk11_env)
+    cmd("Building docs", "./gradlew -Pversion=%s clean siteDocsTar aggregatedJavadoc" % gradle_version_override, cwd=REPO_HOME, env=jdk15_env)
Review comment: This, together with the change at line 259, is what is needed.
## File path: release.py ##
@@ -600,7 +601,7 @@ def select_gpg_key():
 cmd("Building artifacts", "./gradlew clean && ./gradlewAll releaseTarGz", cwd=kafka_dir, env=jdk8_env, shell=True)
 cmd("Copying artifacts", "cp %s/core/build/distributions/* %s" % (kafka_dir, artifacts_dir), shell=True)
-cmd("Building docs", "./gradlew clean aggregatedJavadoc", cwd=kafka_dir, env=jdk11_env)
+cmd("Building docs", "./gradlew clean aggregatedJavadoc", cwd=kafka_dir, env=jdk15_env)
Review comment: I'm not sure this one is really needed, as I never executed that part of the script.
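The diff above only swaps which environment is handed to the docs build (`env=jdk11_env` becomes `env=jdk15_env`). As a rough sketch of what such a per-JDK environment can look like, the hypothetical helper below copies an environment mapping and points `JAVA_HOME` and `PATH` at a given JDK. The name `jdk_env` and the path `/opt/jdk-15` are assumptions made for illustration; this is not the `get_jdk` function from `release.py`.

```python
import os


def jdk_env(java_home, base_env=None):
    """Return a copy of the environment that selects the JDK at `java_home`.

    Hypothetical helper for illustration; `base_env` defaults to os.environ.
    """
    env = dict(os.environ if base_env is None else base_env)
    env["JAVA_HOME"] = java_home
    # Put the JDK's bin directory first so `java` and `javadoc` resolve to it.
    env["PATH"] = os.path.join(java_home, "bin") + os.pathsep + env.get("PATH", "")
    return env


# Example: an environment that would select a JDK 15 install (path is assumed).
jdk15_example_env = jdk_env("/opt/jdk-15", {"PATH": "/usr/bin"})
```

A mapping like this can be passed as the `env` argument of `subprocess.run`, so only the docs build sees the newer JDK while other steps keep their own environment.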
[GitHub] [kafka] vvcephei commented on a change in pull request #10744: KAFKA-8410: KTableProcessor migration groundwork
vvcephei commented on a change in pull request #10744: URL: https://github.com/apache/kafka/pull/10744#discussion_r640840509 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java ## @@ -24,14 +24,14 @@ import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerializer; import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerializer; -public class SinkNode extends ProcessorNode { +public class SinkNode extends ProcessorNode { Review comment: We already throw an exception if you try and add a child. I think it would complicate any of our processor graph traversal algorithms if we made it illegal to even call getChildren, as they would have to type-check the nodes before traversing.
[GitHub] [kafka] jlprat commented on pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK
jlprat commented on pull request #10780: URL: https://github.com/apache/kafka/pull/10780#issuecomment-849822082 Hi @ijuma, this is ready for you to review.
[GitHub] [kafka] ijuma commented on a change in pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK
ijuma commented on a change in pull request #10780: URL: https://github.com/apache/kafka/pull/10780#discussion_r640841685 ## File path: release.py ## @@ -512,6 +512,7 @@ def command_release_announcement_email(): jdk8_env = get_jdk(prefs, 8) jdk11_env = get_jdk(prefs, 11) Review comment: Can we remove this?
[GitHub] [kafka] jlprat commented on a change in pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK
jlprat commented on a change in pull request #10780: URL: https://github.com/apache/kafka/pull/10780#discussion_r640841969 ## File path: release.py ## @@ -512,6 +512,7 @@ def command_release_announcement_email(): jdk8_env = get_jdk(prefs, 8) jdk11_env = get_jdk(prefs, 11) Review comment: Most probably, but I never executed that part of the script.
[GitHub] [kafka] jlprat commented on a change in pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK
jlprat commented on a change in pull request #10780: URL: https://github.com/apache/kafka/pull/10780#discussion_r640842878 ## File path: release.py ## @@ -512,6 +512,7 @@ def command_release_announcement_email(): jdk8_env = get_jdk(prefs, 8) jdk11_env = get_jdk(prefs, 11) Review comment: I will push a commit without this line
[GitHub] [kafka] ijuma commented on a change in pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK
ijuma commented on a change in pull request #10780: URL: https://github.com/apache/kafka/pull/10780#discussion_r640842976 ## File path: release.py ## @@ -512,6 +512,7 @@ def command_release_announcement_email(): jdk8_env = get_jdk(prefs, 8) jdk11_env = get_jdk(prefs, 11) Review comment: Can you search the file for references to this? If there are none, please remove it.
[GitHub] [kafka] jlprat commented on a change in pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK
jlprat commented on a change in pull request #10780: URL: https://github.com/apache/kafka/pull/10780#discussion_r640843605 ## File path: release.py ## @@ -512,6 +512,7 @@ def command_release_announcement_email(): jdk8_env = get_jdk(prefs, 8) jdk11_env = get_jdk(prefs, 11) Review comment: The only reference was at line 604, which I replaced with JDK 15.
[GitHub] [kafka] jlprat commented on a change in pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK
jlprat commented on a change in pull request #10780: URL: https://github.com/apache/kafka/pull/10780#discussion_r640844354 ## File path: release.py ## @@ -512,6 +512,7 @@ def command_release_announcement_email(): jdk8_env = get_jdk(prefs, 8) jdk11_env = get_jdk(prefs, 11) Review comment: Pushed
[GitHub] [kafka] jlprat commented on pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK
jlprat commented on pull request #10780: URL: https://github.com/apache/kafka/pull/10780#issuecomment-849834440 An easy way to test this locally is to search for a Kafka class whose Javadoc links to a standard Java class. For example, search for `KafkaFuture` and then click on `Throwable`. Both the search result and the `Throwable` link should work.
[jira] [Commented] (KAFKA-8295) Optimize count() using RocksDB merge operator
[ https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352731#comment-17352731 ]
A. Sophie Blee-Goldman commented on KAFKA-8295:
Thanks for the initial results – I think it would be valuable to try plugging it into Kafka Streams with a basic POC and then running some kind of throughput benchmarks. I imagine you can get some idea of how well this works even with some very rough benchmarks, for example loading up an input topic with a very large amount of data and then using the TopologyTestDriver to compare how many records can be processed within some constant time (e.g. 5 minutes) between the POC and the original. As long as there is enough input data to ensure it won't run out of records to process before that time limit is up, this should give us a good sense of how the merge operator compares. Does that make sense? It may be that the JMH benchmarks for the ByteBuffer optimization could be reused for this too.
> Optimize count() using RocksDB merge operator
>
> Key: KAFKA-8295
> URL: https://issues.apache.org/jira/browse/KAFKA-8295
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: A. Sophie Blee-Goldman
> Assignee: Sagar Rao
> Priority: Major
>
> In addition to regular put/get/delete RocksDB provides a fourth operation,
> merge. This essentially provides an optimized read/update/write path in a
> single operation. One of the built-in (C++) merge operators exposed over the
> Java API is a counter. We should be able to leverage this for a more
> efficient implementation of count()
>
> (Note: Unfortunately it seems unlikely we can use this to optimize general
> aggregations, even if RocksJava allowed for a custom merge operator, unless
> we provide a way for the user to specify and connect a C++-implemented
> aggregator – otherwise we incur too much cost crossing the JNI for a net
> performance benefit)
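The rough benchmark suggested in the comment above (count how many records each variant processes within a fixed time budget) can be sketched in a few lines. This is only an illustration in Python, not a Kafka Streams or TopologyTestDriver harness: `records_processed_within`, `process`, and the injectable `clock` parameter are all names assumed for this sketch.

```python
import time


def records_processed_within(process, records, time_limit_s, clock=time.monotonic):
    """Feed `records` to `process` until `time_limit_s` elapses; return the count.

    `clock` is injectable so the loop can be exercised deterministically.
    """
    deadline = clock() + time_limit_s
    processed = 0
    for record in records:
        # Stop as soon as the time budget is used up.
        if clock() >= deadline:
            break
        process(record)
        processed += 1
    return processed
```

Running it once per variant (the POC and the original) over the same input, with an input large enough that neither run exhausts it before the limit, gives two counts that can be compared directly.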
[jira] [Commented] (KAFKA-5892) Connector property override does not work unless setting ALL converter properties
[ https://issues.apache.org/jira/browse/KAFKA-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352737#comment-17352737 ]
Randall Hauch commented on KAFKA-5892:
[~dc-heros], I've added your account as a contributor to the project, so you should now be able to self-assign KAFKA issues. Please try to assign this to yourself, and let me know here if you cannot. Thanks for volunteering to take this up!
> Connector property override does not work unless setting ALL converter
> properties
>
> Key: KAFKA-5892
> URL: https://issues.apache.org/jira/browse/KAFKA-5892
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Reporter: Yeva Byzek
> Assignee: Jitendra Sahu
> Priority: Minor
> Labels: newbie
>
> A single connector setting override {{value.converter.schemas.enable=false}}
> only takes effect if ALL of the converter properties are overridden in the
> connector.
> At minimum, we should give the user a warning or error that this will be ignored.
> We should also consider changing the behavior to allow the single property
> override even if all the converter properties are not specified, but this
> requires discussion to evaluate the impact of this change.
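The warning proposed in the issue above could be driven by a check along these lines. This is a sketch of the idea only, written in Python rather than in Connect's Java code, and it assumes the behavior as the issue describes it: a `value.converter.*` (or `key.converter.*`) setting in a connector config has no effect unless the connector also sets the converter class itself. The function name `ignored_converter_overrides` is hypothetical.

```python
CONVERTER_PREFIXES = ("key.converter", "value.converter")


def ignored_converter_overrides(connector_config):
    """Return connector settings that would be ignored per the issue's description.

    A prefixed setting such as `value.converter.schemas.enable` is reported
    when the connector does not also set `value.converter` itself.
    """
    ignored = []
    for prefix in CONVERTER_PREFIXES:
        if prefix in connector_config:
            # The converter class is overridden, so its sub-properties apply.
            continue
        ignored.extend(
            sorted(key for key in connector_config if key.startswith(prefix + "."))
        )
    return ignored
```

A caller could log one warning per returned key when a connector is created or reconfigured, which covers the "at minimum" part of the issue without changing the override behavior.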
[GitHub] [kafka] ijuma commented on pull request #10761: MINOR: Don't ignore deletion of partition metadata file and log topic id clean-ups
ijuma commented on pull request #10761:
URL: https://github.com/apache/kafka/pull/10761#issuecomment-849931983

JDK 11 build passed, a few unrelated failures for the other builds:

> Build / JDK 8 and Scala 2.12 / testMetricsDuringTopicCreateDelete() – kafka.integration.MetricsDuringTopicCreationDeletionTest
> 16s
> Build / JDK 15 and Scala 2.13 / testMetricsDuringTopicCreateDelete() – kafka.integration.MetricsDuringTopicCreationDeletionTest
> 7s
> Build / JDK 15 and Scala 2.13 / testCreateClusterAndCreateAndManyTopicsWithManyPartitions() – kafka.server.RaftClusterTest
> 19s
> Build / JDK 15 and Scala 2.13 / testCreateClusterAndWaitForBrokerInRunningState() – kafka.server.RaftClusterTest
> 1m 11s
> Build / JDK 15 and Scala 2.13 / testCreateClusterAndCreateListDeleteTopic() – kafka.server.RaftClusterTest
> 16s
> Build / JDK 15 and Scala 2.13 / testCreateClusterAndCreateListDeleteTopic() – kafka.server.RaftClusterTest
[GitHub] [kafka] ijuma merged pull request #10761: MINOR: Don't ignore deletion of partition metadata file and log topic id clean-ups
ijuma merged pull request #10761: URL: https://github.com/apache/kafka/pull/10761
[GitHub] [kafka] ijuma commented on pull request #10704: KAFKA-12791: ConcurrentModificationException in AbstractConfig use by KafkaProducer
ijuma commented on pull request #10704:
URL: https://github.com/apache/kafka/pull/10704#issuecomment-849934626

Unrelated flaky tests:

> Build / JDK 15 and Scala 2.13 / kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
> Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
> Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
> Build / JDK 8 and Scala 2.12 / kafka.api.PlaintextConsumerTest.testMultiConsumerDefaultAssignment()
> Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable
[GitHub] [kafka] ijuma merged pull request #10704: KAFKA-12791: ConcurrentModificationException in AbstractConfig use by KafkaProducer
ijuma merged pull request #10704: URL: https://github.com/apache/kafka/pull/10704