[GitHub] [kafka] jlprat commented on a change in pull request #10768: MINOR: fix code listings for ops.html

2021-05-27 Thread GitBox


jlprat commented on a change in pull request #10768:
URL: https://github.com/apache/kafka/pull/10768#discussion_r640340174



##
File path: docs/ops.html
##
@@ -248,98 +248,98 @@ only exist on brokers 5,6.
   
   Since the tool accepts the input list of topics as a json file, you first 
need to identify the topics you want to move and create the json file as 
follows:
-> cat 
topics-to-move.json
-  {"topics": [{"topic": "foo1"},
-  {"topic": "foo2"}],
-  "version":1
-  }
+  > cat 
topics-to-move.json

Review comment:
   Good catch! Pushed a fix




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10769: MINOR: fix code listings connect.html

2021-05-27 Thread GitBox


jlprat commented on pull request #10769:
URL: https://github.com/apache/kafka/pull/10769#issuecomment-849385403


   Thanks for the review @showuon !






[GitHub] [kafka] showuon commented on a change in pull request #10770: MINOR: fix code listings security.html

2021-05-27 Thread GitBox


showuon commented on a change in pull request #10770:
URL: https://github.com/apache/kafka/pull/10770#discussion_r640339525



##
File path: docs/security.html
##
@@ -1835,28 +1834,28 @@ Authentication using Delegation Tokens 
section.
+Authentication using Delegation Tokens 
section.

Review comment:
   Nice fix!

##
File path: docs/security.html
##
@@ -428,12 +428,12 @@ zookeeper.sasl.client.username to the appropriate name
 (e.g., -Dzookeeper.sasl.client.username=zk).
 
-Brokers may also configure JAAS using the broker configuration 
property sasl.jaas.config.
-The property name must be prefixed with the listener prefix 
including the SASL mechanism,
-i.e. 
listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config. 
Only one
-login module may be specified in the config value. If multiple 
mechanisms are configured on a
-listener, configs must be provided for each mechanism using the 
listener and mechanism prefix.
-For example,
+Brokers may also configure JAAS using the broker 
configuration property sasl.jaas.config.

Review comment:
   Lines 402-408 have `ol` and `li` tags that were not handled. Please help. Thanks.
   
https://github.com/apache/kafka/pull/10770/files#diff-3485b37e32662f4925ee0374c4f6eb1d4dfa589bbb1c148b2a70e92b4acbe8bdR402-R408








[GitHub] [kafka] socutes commented on pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException

2021-05-27 Thread GitBox


socutes commented on pull request #10749:
URL: https://github.com/apache/kafka/pull/10749#issuecomment-849390939


   @showuon Please review the changes again! Thanks.






[GitHub] [kafka] jlprat commented on a change in pull request #10770: MINOR: fix code listings security.html

2021-05-27 Thread GitBox


jlprat commented on a change in pull request #10770:
URL: https://github.com/apache/kafka/pull/10770#discussion_r640352656



##
File path: docs/security.html
##
@@ -428,12 +428,12 @@ zookeeper.sasl.client.username to the appropriate name
 (e.g., -Dzookeeper.sasl.client.username=zk).
 
-Brokers may also configure JAAS using the broker configuration 
property sasl.jaas.config.
-The property name must be prefixed with the listener prefix 
including the SASL mechanism,
-i.e. 
listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config. 
Only one
-login module may be specified in the config value. If multiple 
mechanisms are configured on a
-listener, configs must be provided for each mechanism using the 
listener and mechanism prefix.
-For example,
+Brokers may also configure JAAS using the broker 
configuration property sasl.jaas.config.

Review comment:
   That `li` tag is currently closed at line 508. It's a three-level nested 
ordered list. Between lines 402 and 508 we have the whole set of sublists as part of 
that `li` element. That's why the closing `li` tag seems to be 
missing; it's just far down the page.
   
   
![image](https://user-images.githubusercontent.com/3337739/119782497-8e017c80-becc-11eb-9813-c644076ac862.png)
   








[GitHub] [kafka] showuon commented on a change in pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException

2021-05-27 Thread GitBox


showuon commented on a change in pull request #10749:
URL: https://github.com/apache/kafka/pull/10749#discussion_r640356874



##
File path: raft/src/test/java/org/apache/kafka/raft/QuorumStateTest.java
##
@@ -945,11 +946,10 @@ public void testObserverUnattachedToFollower() throws 
IOException {
 }
 
 @Test
-public void testInitializeWithCorruptedStore() throws IOException {
-QuorumStateStore stateStore = Mockito.mock(QuorumStateStore.class);
-
Mockito.doThrow(IOException.class).when(stateStore).readElectionState();
+public void testInitializeWithCorruptedStore() {
 QuorumState state = buildQuorumState(Utils.mkSet(localId));
-
+QuorumStateStore stateStore = Mockito.mock(QuorumStateStore.class);
+
Mockito.doThrow(UncheckedIOException.class).when(stateStore).readElectionState();

Review comment:
   `nit`: I think it'd be better to put the two-line mock at the beginning of 
the test as before (i.e. before `QuorumState state = 
buildQuorumState(Utils.mkSet(localId));`).

##
File path: raft/src/main/java/org/apache/kafka/raft/FileBasedStateStore.java
##
@@ -146,25 +150,36 @@ private void writeElectionStateToFile(final File 
stateFile, QuorumStateData stat
 writer.flush();
 fileOutputStream.getFD().sync();
 Utils.atomicMoveWithFallback(temp.toPath(), stateFile.toPath());
+} catch (IOException e) {
+throw new UncheckedIOException(
+String.format("Error while writing the Quorum status from 
the file %s", stateFile.getAbsolutePath()), e);
 } finally {
 // cleanup the temp file when the write finishes (either success 
or fail).
-Files.deleteIfExists(temp.toPath());
+deleteFileIfExists(temp);
 }
 }
 
 /**
  * Clear state store by deleting the local quorum state file
- *
- * @throws IOException if there is any IO exception during delete
  */
 @Override
-public void clear() throws IOException {
-Files.deleteIfExists(stateFile.toPath());
-Files.deleteIfExists(new File(stateFile.getAbsolutePath() + 
".tmp").toPath());
+public void clear() {
+deleteFileIfExists(stateFile);
+deleteFileIfExists(new File(stateFile.getAbsolutePath() + ".tmp"));
 }
 
 @Override
 public String toString() {
 return "Quorum state filepath: " + stateFile.getAbsolutePath();
 }
+
+private void deleteFileIfExists(File file) {
+try {
+Files.deleteIfExists(file.toPath());
+} catch (IOException e) {
+throw new UncheckedIOException(
+String.format("Error deleting file %s", 
file.getAbsoluteFile()), e

Review comment:
   Maybe this is better: `Error while deleting file...`
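
For reference, the wrapping pattern under review can be sketched in isolation as follows. This is a minimal sketch, not the actual `FileBasedStateStore` code: the class name `FileDeleter` and the use of `Path` instead of `File` are assumptions made for illustration, and the message uses the wording suggested above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper class for illustration; not the actual FileBasedStateStore.
class FileDeleter {
    // Deletes the file if it exists and reports whether a file was removed.
    // Any IOException is rethrown as an UncheckedIOException, so callers
    // do not need a `throws IOException` clause.
    static boolean deleteFileIfExists(Path path) {
        try {
            return Files.deleteIfExists(path);
        } catch (IOException e) {
            throw new UncheckedIOException(
                String.format("Error while deleting file %s", path.toAbsolutePath()), e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path temp = Files.createTempFile("quorum-state", ".tmp");
        System.out.println(deleteFileIfExists(temp)); // the file existed
        System.out.println(deleteFileIfExists(temp)); // the file is already gone
    }
}
```

The original `IOException` is kept as the cause, so the stack trace of the underlying failure is not lost.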








[jira] [Commented] (KAFKA-12849) Consider migrating TaskMetadata to interface with internal implementation

2021-05-27 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352338#comment-17352338
 ] 

Josep Prat commented on KAFKA-12849:


KIP created https://cwiki.apache.org/confluence/x/XIrOCg

> Consider migrating TaskMetadata to interface with internal implementation
> -
>
> Key: KAFKA-12849
> URL: https://issues.apache.org/jira/browse/KAFKA-12849
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Josep Prat
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> In KIP-740 we had to go through a deprecation cycle in order to change the 
> constructor from the original one which accepted the taskId parameter as a 
> string, to the new one which takes a TaskId object directly. We had 
> considered just changing the signature directly without deprecation as this 
> was never intended to be instantiated by users, rather it just acts as a 
> pass-through metadata class. Sort of by definition if there is no reason to 
> ever instantiate it, this seems to indicate it may be better suited as a 
> public interface with the implementation and constructor as internal APIs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on pull request #10467: KAFKA-12609: Rewrite ListOffsets using AdminApiDriver

2021-05-27 Thread GitBox


dajac commented on pull request #10467:
URL: https://github.com/apache/kafka/pull/10467#issuecomment-849464267


   @dengziming Sure. I have added it to my review backlog.






[jira] [Commented] (KAFKA-12629) Failing Test: RaftClusterTest

2021-05-27 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352353#comment-17352353
 ] 

Bruno Cadonna commented on KAFKA-12629:
---

Failed on: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10710/2/testReport/

{code:java}
java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.UnknownServerException: The server experienced 
an unexpected error when processing the request.
at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at 
kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic(RaftClusterTest.scala:94)
{code}

> Failing Test: RaftClusterTest
> -
>
> Key: KAFKA-12629
> URL: https://issues.apache.org/jira/browse/KAFKA-12629
> Project: Kafka
>  Issue Type: Test
>  Components: core, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Blocker
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> {quote} {{java.util.concurrent.ExecutionException: 
> java.lang.ClassNotFoundException: 
> org.apache.kafka.controller.NoOpSnapshotWriterBuilder
>   at java.base/java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.base/java.util.concurrent.FutureTask.get(FutureTask.java:191)
>   at 
> kafka.testkit.KafkaClusterTestKit.startup(KafkaClusterTestKit.java:364)
>   at 
> kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions(RaftClusterTest.scala:181)}}{quote}





[GitHub] [kafka] cadonna commented on pull request #10710: KAFKA-12796: Removal of deprecated classes under `streams-scala`

2021-05-27 Thread GitBox


cadonna commented on pull request #10710:
URL: https://github.com/apache/kafka/pull/10710#issuecomment-849484765


   Failed test is unrelated and known to be flaky:
   ```
   kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
   ```






[GitHub] [kafka] cadonna merged pull request #10710: KAFKA-12796: Removal of deprecated classes under `streams-scala`

2021-05-27 Thread GitBox


cadonna merged pull request #10710:
URL: https://github.com/apache/kafka/pull/10710


   






[GitHub] [kafka] jlprat commented on pull request #10710: KAFKA-12796: Removal of deprecated classes under `streams-scala`

2021-05-27 Thread GitBox


jlprat commented on pull request #10710:
URL: https://github.com/apache/kafka/pull/10710#issuecomment-849486858


   Thanks for the review @cadonna !






[GitHub] [kafka] dajac commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-05-27 Thread GitBox


dajac commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r640459341



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1316,6 +1316,16 @@ class Log(@volatile private var _dir: File,
 val latestEpochOpt = 
leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
 val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
 Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, 
epochOptional))
+  } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+// Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
+// constant time access while being safe to use with concurrent 
collections unlike `toArray`.
+val segmentsCopy = logSegments.toBuffer
+val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
+val latestEpochOpt = 
leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
+val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
+Some(new TimestampAndOffset(latestTimestampSegment.maxTimestampSoFar,
+  latestTimestampSegment.offsetOfMaxTimestampSoFar,
+  epochOptional))

Review comment:
   Could we get a `maxTimestampSoFar` and an `offsetOfMaxTimestampSoFar` that 
do not correspond to each other? It seems that we have no guarantee here. Is 
it an issue? 
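
To make the concern concrete: if the timestamp and its offset are stored as two separate fields, a reader could see one updated and the other not. One possible way to rule that out, shown here only as a sketch and not as what the PR does, is to publish both values together as a single immutable pair. All names below (`MaxTimestampSketch`, `Segment`, `TimestampOffset`) are hypothetical stand-ins and not Kafka's `Log` or `LogSegment` API.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-ins for illustration; these are not Kafka's Log or LogSegment classes.
class MaxTimestampSketch {
    // Immutable pair, so a timestamp is never observed without its own offset.
    static final class TimestampOffset {
        final long timestamp;
        final long offset;

        TimestampOffset(long timestamp, long offset) {
            this.timestamp = timestamp;
            this.offset = offset;
        }
    }

    static final class Segment {
        // A single volatile reference is swapped per update, so a reader sees
        // either the old pair or the new pair, never a mix of the two.
        private volatile TimestampOffset max = new TimestampOffset(-1L, -1L);

        // Assumes a single writer thread per segment.
        void append(long timestamp, long offset) {
            if (timestamp > max.timestamp) {
                max = new TimestampOffset(timestamp, offset);
            }
        }

        TimestampOffset maxSoFar() {
            return max;
        }
    }

    // Reads each segment's pair exactly once, then picks the pair with the largest timestamp.
    static TimestampOffset latest(List<Segment> segments) {
        return segments.stream()
            .map(Segment::maxSoFar)
            .max(Comparator.comparingLong((TimestampOffset p) -> p.timestamp))
            .orElseThrow(() -> new IllegalStateException("no segments"));
    }

    public static void main(String[] args) {
        Segment first = new Segment();
        first.append(100L, 0L);
        first.append(300L, 1L);
        Segment second = new Segment();
        second.append(200L, 2L);
        TimestampOffset result = latest(Arrays.asList(first, second));
        System.out.println(result.timestamp + " @ " + result.offset);
    }
}
```

Whether the extra allocation per update is acceptable on the append path is a separate question that this sketch does not answer.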

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4225,76 +4232,84 @@ public ListOffsetsResult 
listOffsets(Map topicPartit
 }
 }
 
-for (final Map.Entry> entry : 
leaders.entrySet()) {
-final int brokerId = entry.getKey().id();
+for (final Map.Entry>> versionedEntry : leaders.entrySet()) {
+for (final Map.Entry> entry : versionedEntry.getValue().entrySet()) {
+final int brokerId = versionedEntry.getKey().id();
 
-calls.add(new Call("listOffsets on broker " + brokerId, 
context.deadline(), new ConstantNodeIdProvider(brokerId)) {
+calls.add(new Call("listOffsets on broker " + brokerId, 
context.deadline(), new ConstantNodeIdProvider(brokerId)) {
 
-final List partitionsToQuery = new 
ArrayList<>(entry.getValue().values());
+final List partitionsToQuery = new 
ArrayList<>(entry.getValue().values());
 
-@Override
-ListOffsetsRequest.Builder createRequest(int timeoutMs) {
-return ListOffsetsRequest.Builder
+@Override
+ListOffsetsRequest.Builder createRequest(int timeoutMs) {
+ListOffsetRequestVersion requestVersion = 
entry.getKey();
+if (requestVersion == 
ListOffsetRequestVersion.V7AndAbove) {
+return ListOffsetsRequest.Builder
+
.forMaxTimestamp(context.options().isolationLevel())
+.setTargetTimes(partitionsToQuery);
+}

Review comment:
   I'd like to better understand how we handle a broker that does not 
support the version we need. 
   
   `ListOffsetsRequest.Builder.forMaxTimestamp` constrains the version to 7 and 
above when we have at least one max timestamp spec. If the broker does not 
support version 7, the request fails with an `UnsupportedVersionException` 
and we fail all the futures for that broker with it in `handleFailure`.
   
   Now, let's imagine a case where the user does not include only "max 
timestamp specs" in their request. At the moment, we fail all of them 
irrespective of their type. I wonder if we should retry the other specs in 
this particular case. Have we considered doing this? 
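
The behaviour being asked about can be modelled in a few lines. This is a simplified sketch under stated assumptions: the types `Spec` and `Outcome`, the method `listOffsets`, and the map-based result are all hypothetical and are not part of `KafkaAdminClient`; the only detail taken from the discussion is that max timestamp lookups need request version 7 or above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model for illustration; none of these names are Kafka APIs.
class ListOffsetsFallbackSketch {
    enum Spec { EARLIEST, LATEST, MAX_TIMESTAMP }

    enum Outcome { OK, UNSUPPORTED_VERSION }

    // Minimum request version for max timestamp lookups, as stated in the review comment.
    static final int MAX_TIMESTAMP_MIN_VERSION = 7;

    // Computes a per-partition outcome instead of failing the whole batch.
    static Map<String, Outcome> listOffsets(Map<String, Spec> specs, int brokerMaxVersion) {
        Map<String, Outcome> results = new LinkedHashMap<>();
        boolean brokerSupportsMax = brokerMaxVersion >= MAX_TIMESTAMP_MIN_VERSION;
        for (Map.Entry<String, Spec> entry : specs.entrySet()) {
            boolean needsNewVersion = entry.getValue() == Spec.MAX_TIMESTAMP;
            // Only specs that need the newer version fail on an older broker;
            // the other specs would be resent without the version constraint.
            Outcome outcome = needsNewVersion && !brokerSupportsMax
                ? Outcome.UNSUPPORTED_VERSION
                : Outcome.OK;
            results.put(entry.getKey(), outcome);
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, Spec> specs = new LinkedHashMap<>();
        specs.put("topic-0", Spec.LATEST);
        specs.put("topic-1", Spec.MAX_TIMESTAMP);
        System.out.println(listOffsets(specs, 6));
        System.out.println(listOffsets(specs, 7));
    }
}
```

In a real client the retry would be a second request to the same broker, which adds a round trip; the sketch only shows which futures would fail and which would not.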

##
File path: 
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.admin
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org

[GitHub] [kafka] mdedetrich commented on pull request #10652: KAFKA-9726 IdentityReplicationPolicy

2021-05-27 Thread GitBox


mdedetrich commented on pull request #10652:
URL: https://github.com/apache/kafka/pull/10652#issuecomment-849508419


   @ryannedolan Since https://github.com/apache/kafka/pull/10762 was merged, 
maybe it makes sense to rebase against the current `trunk`? Some of the tests 
in this PR have assert statements without the failure messages, which have 
just been fixed (I believe you can just copy some of those assert failure 
messages to make sure it's consistent).






[GitHub] [kafka] dgd-contributor opened a new pull request #10774: KAFKA-12380 Executor in Connect's Worker is not shut down when the worker is

2021-05-27 Thread GitBox


dgd-contributor opened a new pull request #10774:
URL: https://github.com/apache/kafka/pull/10774


   The Worker class has an executor field that the public constructor 
initializes with a new cached thread pool 
(https://github.com/apache/kafka/blob/02226fa090513882b9229ac834fd493d71ae6d96/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L127).
   
   When the worker is stopped, it does not shut down this executor. This is 
normally okay in the Connect and MirrorMaker 2 runtimes, because the 
worker is stopped only when the JVM is stopped (via the shutdown hook in the 
herders).
   
   However, we instantiate and stop the herder many times in our integration 
tests, and this means we're not necessarily shutting down the herder's 
executor. Normally this won't hurt, as long as all of the runnables that the 
executor threads run actually do terminate. But it's possible those threads 
might not terminate in all tests.
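
A common way to stop such an executor is the two-phase shutdown idiom from the `ExecutorService` documentation. The sketch below is only an illustration of that idiom: `WorkerSketch`, `submit`, `stop`, and the timeout parameter are hypothetical names and do not reflect the actual change to the Connect `Worker` class.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for illustration; not the actual Connect Worker class.
class WorkerSketch {
    private final ExecutorService executor = Executors.newCachedThreadPool();

    void submit(Runnable task) {
        executor.submit(task);
    }

    // Stops the executor in two phases: first wait for running tasks to finish,
    // then interrupt whatever is still running. Returns true if everything
    // terminated within the given timeouts.
    boolean stop(long timeoutMs) {
        executor.shutdown(); // reject new tasks, let submitted ones finish
        try {
            if (!executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                executor.shutdownNow(); // interrupt tasks that are still running
                return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
            }
            return true;
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
            return false;
        }
    }

    boolean isShutdown() {
        return executor.isShutdown();
    }

    public static void main(String[] args) {
        WorkerSketch worker = new WorkerSketch();
        worker.submit(() -> System.out.println("task ran"));
        System.out.println("stopped cleanly: " + worker.stop(1000L));
    }
}
```

Calling `stop` from each integration test's teardown would bound how long a stuck runnable can keep threads alive, at the cost of interrupting tasks that ignore the first phase.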
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Commented] (KAFKA-9531) java.net.UnknownHostException loop on VM rolling update using CNAME

2021-05-27 Thread Rui Abreu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352386#comment-17352386
 ] 

Rui Abreu commented on KAFKA-9531:
--

[~tcsantos] and [~Bolen] 

What helped us in our specific case was setting *advertised.host.name* 
in the Kafka brokers' properties.
{quote}node.internal is a CNAME to either nodeA.internal or nodeB.internal
{quote}
We added *advertised.host.name = node.internal* to the server properties, and 
the issue no longer seems to manifest itself.

But this is only a workaround. The underlying issue in Kafka's client code still needs to 
be addressed. 

> java.net.UnknownHostException loop on VM rolling update using CNAME
> ---
>
> Key: KAFKA-9531
> URL: https://issues.apache.org/jira/browse/KAFKA-9531
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, controller, network, producer 
>Affects Versions: 2.4.0
>Reporter: Rui Abreu
>Priority: Major
>
> Hello,
>  
> My cluster setup is based on VMs behind a DNS CNAME.
> Example:  node.internal is a CNAME to either nodeA.internal or nodeB.internal
> Since kafka-client 1.2.1,  it has been observed that sometimes Kafka clients 
> get stuck on a loop with the exception:
> Example after nodeB.internal is replaced with nodeA.internal 
>  
> {code:java}
> 2020-02-10T12:11:28.181Z o.a.k.c.NetworkClient [WARN]- [Consumer 
> clientId=consumer-6, groupId=consumer.group] Error connecting to node 
> nodeB.internal:9092 (id: 2 rack: null)
> java.net.UnknownHostException: nodeB.internal:9092
>   at java.net.InetAddress.getAllByName0(InetAddress.java:1281) 
> ~[?:1.8.0_222]
>   at java.net.InetAddress.getAllByName(InetAddress.java:1193) 
> ~[?:1.8.0_222]
>   at java.net.InetAddress.getAllByName(InetAddress.java:1127) 
> ~[?:1.8.0_222]
>   at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:104) 
> ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:403)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:363)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:151)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:943)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:68) 
> ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1114)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1005)
>  ~[stormjar.jar:?]
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:537) 
> ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:366)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
>  ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220) 
> ~[stormjar.jar:?]
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159) 
> ~[stormjar.jar:?]
>   at 
> org.apache.storm.kafka.spout.KafkaSpout.pollKafkaBroker(KafkaSpout.java:365) 
> ~[stormjar.jar:?]
>   at 
> org.apache.storm.kafka.spout.KafkaSpout.nextTuple(KafkaSpout.java:294) 
> ~[stormjar.jar:?]
>   at 
> org.apache.storm.daemon.executor$fn__10715$fn__10730$fn__10761.invoke(executor.clj:649)
>  ~[storm-core-1.1.3.jar:1.1.3]
>   at org.apache.storm.util$async_loop$fn__553.invoke(util.clj:484) 
> ~[storm-core-1.1.3.jar:1.1.3]
>   at clojure.lang.AFn.run(AFn.java:22) ~[clojure-1.7.0.jar:?]
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
> {code}
>  
> The time it spends in the loop is arbitrary, but it seems the client 
> effectively stops while this is happening.
> This error contrasts with instances where the client is able to recover on 
> its o

[GitHub] [kafka] cadonna commented on a change in pull request #10770: MINOR: fix code listings security.html

2021-05-27 Thread GitBox


cadonna commented on a change in pull request #10770:
URL: https://github.com/apache/kafka/pull/10770#discussion_r640479762



##
File path: docs/security.html
##
@@ -208,25 +208,25 @@ Host Name Verification
 Then create a database and serial number file, these will be used 
to keep track of which certificates were signed with this CA. Both of
 these are simply text files that reside in the same directory as 
your CA keys.
 
-echo 01 > 
serial.txt
+> echo 01 
> serial.txt
 touch index.txt

Review comment:
   If we decide to keep the `>`, then it is missing here.

##
File path: docs/security.html
##
@@ -384,16 +384,16 @@ SSL key and certificates in PEM format
 ssl.key.password=test1234
 
 Other configuration settings that may also be needed depending on 
our requirements and the broker configuration:
-
-ssl.provider (Optional). The name of the security 
provider used for SSL connections. Default value is the default security 
provider of the JVM.
-ssl.cipher.suites (Optional). A cipher suite is a 
named combination of authentication, encryption, MAC and key exchange algorithm 
used to negotiate the security settings for a network connection using TLS or 
SSL network protocol.
-ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1. It should 
list at least one of the protocols configured on the broker side
-ssl.truststore.type=JKS
-ssl.keystore.type=JKS
-
-
+
+ssl.provider (Optional). The name of the security provider 
used for SSL connections. Default value is the default security provider of the 
JVM.
+ssl.cipher.suites (Optional). A cipher suite is a named 
combination of authentication, encryption, MAC and key exchange algorithm used 
to negotiate the security settings for a network connection using TLS or SSL 
network protocol.
+ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1. It should 
list at least one of the protocols configured on the broker side
+ssl.truststore.type=JKS
+ssl.keystore.type=JKS
+
+
 Examples using console-producer and console-consumer:
-kafka-console-producer.sh --bootstrap-server 
localhost:9093 --topic test --producer.config client-ssl.properties
+> 
kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test 
--producer.config client-ssl.properties
 kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test 
--consumer.config client-ssl.properties

Review comment:
   If we decide to keep the `>`, then it is missing here.

##
File path: docs/security.html
##
@@ -47,7 +47,7 @@ keytool 
-keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg 
RSA -storetype pkcs12
+> keytool 
-keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg 
RSA -storetype pkcs12

Review comment:
   Now I see that it has already been used further down. I am fine with 
keeping it to better distinguish between configs and commands. However, I found 
some places where `>` is still missing (see further down).  

##
File path: docs/security.html
##
@@ -47,7 +47,7 @@ keytool 
-keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg 
RSA -storetype pkcs12
+> keytool 
-keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg 
RSA -storetype pkcs12

Review comment:
   I am wondering if it is a good idea to add `>` in front of the commands. 
It is something additional that users need to remove when they copy & paste the 
commands.

##
File path: docs/security.html
##
@@ -428,12 +428,12 @@ zookeeper.sasl.client.username to the appropriate name
 (e.g., -Dzookeeper.sasl.client.username=zk).
 
-Brokers may also configure JAAS using the broker configuration 
property sasl.jaas.config.
-The property name must be prefixed with the listener prefix 
including the SASL mechanism,
-i.e. 
listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config. 
Only one
-login module may be specified in the config value. If multiple 
mechanisms are configured on a
-listener, configs must be provided for each mechanism using the 
listener and mechanism prefix.
-For example,
+Brokers may also configure JAAS using the broker 
configuration property sasl.jaas.config.
+The property name must be prefixed with the listener 
prefix including the SASL mechanism,
+i.e. 
listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config. 
Only one
+login module may be specified in the config value. If 
multiple mechanisms are conf

[GitHub] [kafka] feyman2016 commented on pull request #10377: KAFKA-12515 ApiVersionManager should create response based on request version

2021-05-27 Thread GitBox


feyman2016 commented on pull request #10377:
URL: https://github.com/apache/kafka/pull/10377#issuecomment-849526883


   @rajinisivaram Thanks for the review, I will address it later~






[GitHub] [kafka] jlprat commented on a change in pull request #10770: MINOR: fix code listings security.html

2021-05-27 Thread GitBox


jlprat commented on a change in pull request #10770:
URL: https://github.com/apache/kafka/pull/10770#discussion_r640512166



##
File path: docs/security.html
##
@@ -47,7 +47,7 @@ keytool 
-keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg 
RSA -storetype pkcs12
+> keytool 
-keystore {keystorefile} -alias localhost -validity {validity} -genkey -keyalg 
RSA -storetype pkcs12

Review comment:
   The examples in the documentation most frequently use the `>` 
character when showing commands. Sometimes a `$` is shown, and very 
occasionally no prompt is shown at all.
   As `>` was already the most commonly used one in the codebase, I decided to 
unify on this character.








[GitHub] [kafka] jlprat commented on a change in pull request #10770: MINOR: fix code listings security.html

2021-05-27 Thread GitBox


jlprat commented on a change in pull request #10770:
URL: https://github.com/apache/kafka/pull/10770#discussion_r640515305



##
File path: docs/security.html
##
@@ -428,12 +428,12 @@ zookeeper.sasl.client.username to the appropriate name
 (e.g., -Dzookeeper.sasl.client.username=zk).
 
-Brokers may also configure JAAS using the broker configuration 
property sasl.jaas.config.
-The property name must be prefixed with the listener prefix 
including the SASL mechanism,
-i.e. 
listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config. 
Only one
-login module may be specified in the config value. If multiple 
mechanisms are configured on a
-listener, configs must be provided for each mechanism using the 
listener and mechanism prefix.
-For example,
+Brokers may also configure JAAS using the broker 
configuration property sasl.jaas.config.
+The property name must be prefixed with the listener 
prefix including the SASL mechanism,
+i.e. 
listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config. 
Only one
+login module may be specified in the config value. If 
multiple mechanisms are configured on a
+listener, configs must be provided for each mechanism 
using the listener and mechanism prefix.
+For example,

Review comment:
   Actually, it seems the above paragraphs should be indented further to the 
right; several nested HTML elements did not follow the indentation.
   I'm fixing these now.








[GitHub] [kafka] jlprat commented on a change in pull request #10770: MINOR: fix code listings security.html

2021-05-27 Thread GitBox


jlprat commented on a change in pull request #10770:
URL: https://github.com/apache/kafka/pull/10770#discussion_r640516748



##
File path: docs/security.html
##
@@ -443,134 +443,133 @@ 
 
-If JAAS configuration is defined at different levels, the order of 
precedence used is:
-
-  Broker configuration property 
listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
-  {listenerName}.KafkaServer section of static 
JAAS configuration
-  KafkaServer section of static JAAS 
configuration
-
-Note that ZooKeeper JAAS config may only be configured using 
static JAAS configuration.
-
-See GSSAPI 
(Kerberos),
-PLAIN,
-SCRAM or
-OAUTHBEARER 
for example broker configurations.
-
-
-JAAS configuration for Kafka 
clients
-
-Clients may configure JAAS using the client configuration 
property
-sasl.jaas.config
-or using the static JAAS 
config file
-similar to brokers.
-
-
-JAAS configuration using 
client configuration property
-Clients may specify JAAS configuration as a producer or 
consumer property without
-creating a physical configuration file. This mode also enables 
different producers
-and consumers within the same JVM to use different credentials 
by specifying
-different properties for each client. If both static JAAS 
configuration system property
-java.security.auth.login.config and client 
property sasl.jaas.config
-are specified, the client property will be used.
-
-See GSSAPI 
(Kerberos),
-PLAIN,
-SCRAM or
-OAUTHBEARER for example 
configurations.
-
-JAAS configuration using static config 
file
-To configure SASL authentication on the clients using static 
JAAS config file:
-
-Add a JAAS config file with a client login section named 
KafkaClient. Configure
-a login module in KafkaClient for the selected 
mechanism as described in the examples
-for setting up GSSAPI (Kerberos),
-PLAIN,
-SCRAM or
-OAUTHBEARER.
-For example, GSSAPI
-credentials may be configured as:
-KafkaClient {
+If JAAS configuration is defined at different levels, the 
order of precedence used is:

Review comment:
   Same as previous, you are right, it needs to be aligned with the 
contents of the header `JAAS configuration for Kafka brokers`. However, the 
problem here was that the previous content was misaligned.








[GitHub] [kafka] jlprat commented on pull request #10770: MINOR: fix code listings security.html

2021-05-27 Thread GitBox


jlprat commented on pull request #10770:
URL: https://github.com/apache/kafka/pull/10770#issuecomment-849539579


   Hi @cadonna, thanks a lot for the feedback. I fixed the missing `>` 
characters. About the misalignment, you were right, but it came from the 
previous lines rather than the ones you mention. 






[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-05-27 Thread GitBox


thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r640520669



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4225,76 +4232,84 @@ public ListOffsetsResult 
listOffsets(Map topicPartit
 }
 }
 
-for (final Map.Entry> entry : 
leaders.entrySet()) {
-final int brokerId = entry.getKey().id();
+for (final Map.Entry>> versionedEntry : leaders.entrySet()) {
+for (final Map.Entry> entry : versionedEntry.getValue().entrySet()) {
+final int brokerId = versionedEntry.getKey().id();
 
-calls.add(new Call("listOffsets on broker " + brokerId, 
context.deadline(), new ConstantNodeIdProvider(brokerId)) {
+calls.add(new Call("listOffsets on broker " + brokerId, 
context.deadline(), new ConstantNodeIdProvider(brokerId)) {
 
-final List partitionsToQuery = new 
ArrayList<>(entry.getValue().values());
+final List partitionsToQuery = new 
ArrayList<>(entry.getValue().values());
 
-@Override
-ListOffsetsRequest.Builder createRequest(int timeoutMs) {
-return ListOffsetsRequest.Builder
+@Override
+ListOffsetsRequest.Builder createRequest(int timeoutMs) {
+ListOffsetRequestVersion requestVersion = 
entry.getKey();
+if (requestVersion == 
ListOffsetRequestVersion.V7AndAbove) {
+return ListOffsetsRequest.Builder
+
.forMaxTimestamp(context.options().isolationLevel())
+.setTargetTimes(partitionsToQuery);
+}

Review comment:
   At present the only way requestVersion could be V7AndAbove is if we were 
issuing MAX_TIMESTAMP requests because of the way the calls are parsed earlier:
   
   `
   ListOffsetRequestVersion requiredRequestVersion = offsetQuery == 
ListOffsetsRequest.MAX_TIMESTAMP
   ? ListOffsetRequestVersion.V7AndAbove :
   ListOffsetRequestVersion.V0AndAbove;
   `
   
   All non-max timestamp requests are built using forConsumer rather than 
forMaxTimestamp and so should succeed against older brokers. Maybe the enums 
are a bit misleading in this regard. I'll see if I can come up with something 
better.








[GitHub] [kafka] jlprat edited a comment on pull request #10770: MINOR: fix code listings security.html

2021-05-27 Thread GitBox


jlprat edited a comment on pull request #10770:
URL: https://github.com/apache/kafka/pull/10770#issuecomment-849539579


   Hi @cadonna, thanks a lot for the feedback. I fixed the missing `>` 
characters. About the misalignment, you were right; however, it came not from 
the lines you mention but from the previous ones. 






[jira] [Created] (KAFKA-12855) Update ssl certificates of kafka connect worker runtime without restarting the worker process.

2021-05-27 Thread kaushik srinivas (Jira)
kaushik srinivas created KAFKA-12855:


 Summary: Update ssl certificates of kafka connect worker runtime 
without restarting the worker process.
 Key: KAFKA-12855
 URL: https://issues.apache.org/jira/browse/KAFKA-12855
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: kaushik srinivas


Is it possible to update the SSL certificates of a Kafka Connect worker 
dynamically, similar to what the kafka-configs script offers for Kafka? Or is 
restarting the worker processes the only way to update the certificates?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-05-27 Thread GitBox


thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r640543594



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1316,6 +1316,16 @@ class Log(@volatile private var _dir: File,
 val latestEpochOpt = 
leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
 val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
 Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, 
epochOptional))
+  } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+// Cache to avoid race conditions. `toBuffer` is faster than most 
alternatives and provides
+// constant time access while being safe to use with concurrent 
collections unlike `toArray`.
+val segmentsCopy = logSegments.toBuffer
+val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
+val latestEpochOpt = 
leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
+val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
+Some(new TimestampAndOffset(latestTimestampSegment.maxTimestampSoFar,
+  latestTimestampSegment.offsetOfMaxTimestampSoFar,
+  epochOptional))

Review comment:
   In all the cases I can find, the two are updated together, so I think we can 
assume consistency. For the topic liveness case in the KIP, absolute consistency 
is not required, but there will be other cases that need it (e.g. topic 
inspection).
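
The snapshot-then-scan approach in the hunk above can be sketched in Java roughly as follows. This is a minimal illustration, not the `Log` implementation: `MaxTimestampLookup`, `Segment`, and `latest` are hypothetical names, and `Segment` only models the two values discussed here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MaxTimestampLookup {
    // Hypothetical stand-in for a log segment; it carries only the two values
    // that the review comment says are updated together.
    static final class Segment {
        final long maxTimestampSoFar;
        final long offsetOfMaxTimestampSoFar;

        Segment(long maxTimestampSoFar, long offsetOfMaxTimestampSoFar) {
            this.maxTimestampSoFar = maxTimestampSoFar;
            this.offsetOfMaxTimestampSoFar = offsetOfMaxTimestampSoFar;
        }
    }

    // Copy the live collection first so the scan runs over a stable view,
    // then pick the segment with the largest timestamp.
    static Optional<Segment> latest(Iterable<Segment> liveSegments) {
        List<Segment> snapshot = new ArrayList<>();
        for (Segment s : liveSegments) {
            snapshot.add(s);
        }
        return snapshot.stream()
                .max(Comparator.comparingLong((Segment s) -> s.maxTimestampSoFar));
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<Segment> segments = new ConcurrentLinkedQueue<>();
        segments.add(new Segment(100L, 0L));
        segments.add(new Segment(300L, 1L));
        segments.add(new Segment(200L, 2L));
        Segment best = latest(segments).get();
        // The timestamp and the offset are read from the same segment object.
        System.out.println(best.maxTimestampSoFar + "@" + best.offsetOfMaxTimestampSoFar);
    }
}
```

Reading both values from one segment object only gives a consistent pair if the writer updates them together, which is the assumption stated in the comment.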








[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-05-27 Thread GitBox


thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r640544656



##
File path: 
core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
##
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.admin
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
+
+class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
+
+  val topicName = "foo"
+  var adminClient: Admin = null
+
+  @BeforeEach
+  override def setUp(): Unit = {
+super.setUp()
+createTopic(topicName,1,1.asInstanceOf[Short])
+produceMessages()
+adminClient = Admin.create(Map[String, Object](
+  AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList
+).asJava)
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
+super.tearDown()
+  }
+
+  @Test
+  def testEarliestOffset(): Unit = {
+val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
+assertEquals(0,earliestOffset.offset())
+  }
+
+  @Test
+  def testLatestOffset(): Unit = {
+val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
+assertEquals(3,latestOffset.offset())
+  }
+
+  @Test
+  def testMaxTimestampOffset(): Unit = {
+val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp())
+assertEquals(1,maxTimestampOffset.offset())
+  }
+
+  private def runFetchOffsets(adminClient: Admin,
+  offsetSpec: OffsetSpec): 
ListOffsetsResult.ListOffsetsResultInfo = {
+println(s"==> listOffsets(${topicName} -> ${offsetSpec}, new 
ListOffsetsOptions())")

Review comment:
   I used ReassignPartitionsIntegrationTest as a base for creating this, and 
it has similar messages. I can remove them if needed.








[GitHub] [kafka] dajac commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-05-27 Thread GitBox


dajac commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r640544984



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4225,76 +4232,84 @@ public ListOffsetsResult 
listOffsets(Map topicPartit
 }
 }
 
-for (final Map.Entry> entry : 
leaders.entrySet()) {
-final int brokerId = entry.getKey().id();
+for (final Map.Entry>> versionedEntry : leaders.entrySet()) {
+for (final Map.Entry> entry : versionedEntry.getValue().entrySet()) {
+final int brokerId = versionedEntry.getKey().id();
 
-calls.add(new Call("listOffsets on broker " + brokerId, 
context.deadline(), new ConstantNodeIdProvider(brokerId)) {
+calls.add(new Call("listOffsets on broker " + brokerId, 
context.deadline(), new ConstantNodeIdProvider(brokerId)) {
 
-final List partitionsToQuery = new 
ArrayList<>(entry.getValue().values());
+final List partitionsToQuery = new 
ArrayList<>(entry.getValue().values());
 
-@Override
-ListOffsetsRequest.Builder createRequest(int timeoutMs) {
-return ListOffsetsRequest.Builder
+@Override
+ListOffsetsRequest.Builder createRequest(int timeoutMs) {
+ListOffsetRequestVersion requestVersion = 
entry.getKey();
+if (requestVersion == 
ListOffsetRequestVersion.V7AndAbove) {
+return ListOffsetsRequest.Builder
+
.forMaxTimestamp(context.options().isolationLevel())
+.setTargetTimes(partitionsToQuery);
+}

Review comment:
   Oh... I see. So you are saying that if we have two specs for a given 
leader, say one with MAX_TIMESTAMP and another one with EARLIEST_TIMESTAMP, we 
send two separate requests to that leader, right?
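
As a rough illustration of the question above, the sketch below groups offset queries first by leader and then by the request version they require, so a leader with mixed specs ends up with two buckets. It does not use any Kafka classes: `ListOffsetsGrouping`, `SpecKind`, `RequestVersion`, and `Query` are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ListOffsetsGrouping {
    // Hypothetical stand-ins for the kinds of offset spec discussed in the thread.
    enum SpecKind { EARLIEST, LATEST, MAX_TIMESTAMP }

    // Mirrors the two buckets named in the diff (V0AndAbove / V7AndAbove).
    enum RequestVersion { V0_AND_ABOVE, V7_AND_ABOVE }

    static final class Query {
        final int leaderId;
        final String partition;
        final SpecKind kind;

        Query(int leaderId, String partition, SpecKind kind) {
            this.leaderId = leaderId;
            this.partition = partition;
            this.kind = kind;
        }
    }

    // Only MAX_TIMESTAMP queries require the newer request version.
    static RequestVersion requiredVersion(SpecKind kind) {
        return kind == SpecKind.MAX_TIMESTAMP
                ? RequestVersion.V7_AND_ABOVE
                : RequestVersion.V0_AND_ABOVE;
    }

    // Group by leader, then by required version: one request per inner bucket.
    static Map<Integer, Map<RequestVersion, List<Query>>> group(List<Query> queries) {
        Map<Integer, Map<RequestVersion, List<Query>>> byLeader = new TreeMap<>();
        for (Query q : queries) {
            byLeader
                .computeIfAbsent(q.leaderId,
                    id -> new EnumMap<RequestVersion, List<Query>>(RequestVersion.class))
                .computeIfAbsent(requiredVersion(q.kind), v -> new ArrayList<Query>())
                .add(q);
        }
        return byLeader;
    }

    // Number of requests that would be sent under this grouping.
    static int requestCount(Map<Integer, Map<RequestVersion, List<Query>>> grouped) {
        int count = 0;
        for (Map<RequestVersion, List<Query>> perVersion : grouped.values()) {
            count += perVersion.size();
        }
        return count;
    }

    public static void main(String[] args) {
        List<Query> queries = Arrays.asList(
            new Query(1, "foo-0", SpecKind.EARLIEST),
            new Query(1, "foo-1", SpecKind.MAX_TIMESTAMP),
            new Query(2, "foo-2", SpecKind.LATEST));
        // Leader 1 has mixed specs and gets two requests; leader 2 gets one.
        System.out.println(requestCount(group(queries)));
    }
}
```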








[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-05-27 Thread GitBox


thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r640552093



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4225,76 +4232,84 @@ public ListOffsetsResult 
listOffsets(Map topicPartit
 }
 }
 
-for (final Map.Entry> entry : 
leaders.entrySet()) {
-final int brokerId = entry.getKey().id();
+for (final Map.Entry>> versionedEntry : leaders.entrySet()) {
+for (final Map.Entry> entry : versionedEntry.getValue().entrySet()) {
+final int brokerId = versionedEntry.getKey().id();
 
-calls.add(new Call("listOffsets on broker " + brokerId, 
context.deadline(), new ConstantNodeIdProvider(brokerId)) {
+calls.add(new Call("listOffsets on broker " + brokerId, 
context.deadline(), new ConstantNodeIdProvider(brokerId)) {
 
-final List partitionsToQuery = new 
ArrayList<>(entry.getValue().values());
+final List partitionsToQuery = new 
ArrayList<>(entry.getValue().values());
 
-@Override
-ListOffsetsRequest.Builder createRequest(int timeoutMs) {
-return ListOffsetsRequest.Builder
+@Override
+ListOffsetsRequest.Builder createRequest(int timeoutMs) {
+ListOffsetRequestVersion requestVersion = 
entry.getKey();
+if (requestVersion == 
ListOffsetRequestVersion.V7AndAbove) {
+return ListOffsetsRequest.Builder
+
.forMaxTimestamp(context.options().isolationLevel())
+.setTargetTimes(partitionsToQuery);
+}

Review comment:
   That's right, as I read it we would send separate requests even under 
the old logic (i.e. for LATEST_TIMESTAMP and EARLIEST_TIMESTAMP). The only 
difference here is we limit the versions for MAX_TIMESTAMP.








[GitHub] [kafka] ijuma merged pull request #10759: MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout`

2021-05-27 Thread GitBox


ijuma merged pull request #10759:
URL: https://github.com/apache/kafka/pull/10759


   






[GitHub] [kafka] iakunin opened a new pull request #10775: KAFKA-12668: Making MockScheduler.schedule safe to use in concurrent code

2021-05-27 Thread GitBox


iakunin opened a new pull request #10775:
URL: https://github.com/apache/kafka/pull/10775


   Making MockScheduler.schedule safe to use in concurrent code
   by removing the `tick()` call inside MockScheduler.schedule.
   
   To reproduce the bug, I wrote a unit test, MockSchedulerTest. 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Comment Edited] (KAFKA-12668) MockScheduler is not safe to use in concurrent code.

2021-05-27 Thread Maksim Iakunin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12668?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352476#comment-17352476
 ] 

Maksim Iakunin edited comment on KAFKA-12668 at 5/27/21, 1:29 PM:
--

GitHub pull-request with bug-fix: https://github.com/apache/kafka/pull/10775

[~jagsancio], could you have a look, please?


was (Author: iakunin):
GitHub pull-request: https://github.com/apache/kafka/pull/10775

> MockScheduler is not safe to use in concurrent code.
> 
>
> Key: KAFKA-12668
> URL: https://issues.apache.org/jira/browse/KAFKA-12668
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Reporter: Jose Armando Garcia Sancio
>Assignee: Maksim Iakunin
>Priority: Major
>  Labels: newbie
>
> The current implementation of {{MockScheduler}} executes tasks in the same 
> stack when {{schedule}} is called. This violates {{Log}}'s assumption since 
> {{Log}} calls {{schedule}} while holding a lock. This can cause deadlock in 
> tests.
> One solution is to change {{MockScheduler}}'s {{schedule}} method so that 
> {{tick}} is not called. {{tick}} should be called by a stack (thread) that 
> doesn't hold any locks.
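
The proposed fix can be sketched as a scheduler whose `schedule` only enqueues, leaving execution to an explicit `tick`. This is a minimal sketch under stated assumptions and not Kafka's `MockScheduler`: `SimpleMockScheduler`, its `Task` class, and the millisecond mock clock are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class SimpleMockScheduler {
    // A pending task and the mock time at which it becomes due.
    private static final class Task {
        final long dueMs;
        final Runnable body;

        Task(long dueMs, Runnable body) {
            this.dueMs = dueMs;
            this.body = body;
        }
    }

    private final PriorityQueue<Task> tasks =
        new PriorityQueue<>(Comparator.comparingLong((Task t) -> t.dueMs));
    private long nowMs = 0L;

    // Only enqueue. The task never runs on the caller's stack, so a caller
    // that holds a lock cannot end up running task code under that lock.
    public synchronized void schedule(Runnable body, long delayMs) {
        tasks.add(new Task(nowMs + delayMs, body));
    }

    // Advance the mock clock and run every task that is now due.
    // Intended to be called from a thread that holds no other locks.
    public void tick(long advanceMs) {
        List<Runnable> due = new ArrayList<>();
        synchronized (this) {
            nowMs += advanceMs;
            while (!tasks.isEmpty() && tasks.peek().dueMs <= nowMs) {
                due.add(tasks.poll().body);
            }
        }
        // Run outside the scheduler's monitor so a slow task does not block
        // other threads that call schedule().
        for (Runnable r : due) {
            r.run();
        }
    }

    public static void main(String[] args) {
        SimpleMockScheduler scheduler = new SimpleMockScheduler();
        int[] runs = {0};
        scheduler.schedule(() -> runs[0]++, 0L);
        System.out.println(runs[0]); // 0: nothing ran inside schedule()
        scheduler.tick(0L);
        System.out.println(runs[0]); // 1: the task ran during tick()
    }
}
```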





[GitHub] [kafka] ijuma opened a new pull request #10776: MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout`

2021-05-27 Thread GitBox


ijuma opened a new pull request #10776:
URL: https://github.com/apache/kafka/pull/10776


   New parameters in overloaded methods should appear after the existing ones,
   except for lambdas, which should always be last.
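
A small sketch of this ordering rule, using a hypothetical `Waiter.waitFor` helper rather than the actual `waitForCondition` or `retryOnExceptionWithTimeout` signatures, which are not shown in this message:

```java
import java.util.function.BooleanSupplier;

public class Waiter {
    // Original overload: plain parameters first, lambda last.
    static boolean waitFor(long timeoutMs, BooleanSupplier condition) {
        return waitFor(timeoutMs, 10L, condition);
    }

    // Newer overload: the added parameter (pollIntervalMs) follows the
    // existing timeoutMs, while the lambda keeps its place at the end.
    static boolean waitFor(long timeoutMs, long pollIntervalMs, BooleanSupplier condition) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // With the lambda last, call sites read naturally for both overloads.
        System.out.println(waitFor(100L, () -> true));
        System.out.println(waitFor(20L, 5L, () -> false));
    }
}
```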
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] ijuma opened a new pull request #10777: MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout`

2021-05-27 Thread GitBox


ijuma opened a new pull request #10777:
URL: https://github.com/apache/kafka/pull/10777


   New parameters in overloaded methods should appear after the existing ones,
   except for lambdas, which should always be last.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] ijuma commented on pull request #10759: MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout`

2021-05-27 Thread GitBox


ijuma commented on pull request #10759:
URL: https://github.com/apache/kafka/pull/10759#issuecomment-849642509


   PRs for 2.8 and 2.7:
   * https://github.com/apache/kafka/pull/10777
   * https://github.com/apache/kafka/pull/10776






[GitHub] [kafka] ijuma commented on a change in pull request #10761: MINOR: Don't ignore deletion of partition metadata file and log topic id clean-ups

2021-05-27 Thread GitBox


ijuma commented on a change in pull request #10761:
URL: https://github.com/apache/kafka/pull/10761#discussion_r640639035



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -325,19 +325,25 @@ class Log(@volatile private var _dir: File,
 // Ensure we do not try to assign a provided topicId that is inconsistent 
with the ID on file.
 if (partitionMetadataFile.exists()) {
 if (!keepPartitionMetadataFile)
-  partitionMetadataFile.delete()
+  try partitionMetadataFile.delete()
+  catch {
+case e: IOException =>
+  error(s"Error while trying to delete partition metadata file 
${partitionMetadataFile}", e)

Review comment:
   Good point, addressed.








[GitHub] [kafka] dajac commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-05-27 Thread GitBox


dajac commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r640640161



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4225,76 +4232,84 @@ public ListOffsetsResult 
listOffsets(Map topicPartit
 }
 }
 
-for (final Map.Entry> entry : 
leaders.entrySet()) {
-final int brokerId = entry.getKey().id();
+for (final Map.Entry>> versionedEntry : leaders.entrySet()) {
+for (final Map.Entry> entry : versionedEntry.getValue().entrySet()) {
+final int brokerId = versionedEntry.getKey().id();
 
-calls.add(new Call("listOffsets on broker " + brokerId, 
context.deadline(), new ConstantNodeIdProvider(brokerId)) {
+calls.add(new Call("listOffsets on broker " + brokerId, 
context.deadline(), new ConstantNodeIdProvider(brokerId)) {
 
-final List partitionsToQuery = new 
ArrayList<>(entry.getValue().values());
+final List partitionsToQuery = new 
ArrayList<>(entry.getValue().values());
 
-@Override
-ListOffsetsRequest.Builder createRequest(int timeoutMs) {
-return ListOffsetsRequest.Builder
+@Override
+ListOffsetsRequest.Builder createRequest(int timeoutMs) {
+ListOffsetRequestVersion requestVersion = 
entry.getKey();
+if (requestVersion == 
ListOffsetRequestVersion.V7AndAbove) {
+return ListOffsetsRequest.Builder
+
.forMaxTimestamp(context.options().isolationLevel())
+.setTargetTimes(partitionsToQuery);
+}

Review comment:
   It seems that the current logic sends only one request per broker/leader, 
whereas we could send up to two requests with your PR because specs are 
partitioned by `Node` and `ListOffsetRequestVersion`. Previously, they were 
only partitioned by `Node`.
   
   Intuitively, I would have approached the problem differently. I would have 
put all the specs in the same request and constrained its version to 7 and 
above if there is at least one `MAX_TIMESTAMP`. If the request succeeds, all 
good. If the request fails with an `UnsupportedVersionException`, I would have 
retried with all the specs but the `MAX_TIMESTAMP` ones and I would have failed 
the future of the `MAX_TIMESTAMP` specs.
   
   In case of an `UnsupportedVersionException`, the admin client calls the 
`handleUnsupportedVersionException` method of the `Call`. This gives you an 
opportunity to downgrade and to retry the `Call`. There are a couple of examples 
in the `KafkaAdminClient`.
   
   I wonder if we could rely on a similar pattern and avoid sending two 
requests per leader in the worst case. What do you think?
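
The downgrade-and-retry idea described in this comment could look roughly like the sketch below. It is self-contained and does not use `KafkaAdminClient`, `Call`, or `handleUnsupportedVersionException`; `DowngradeAndRetry`, `Broker`, `UnsupportedVersion`, `SpecKind`, and the string results are hypothetical stand-ins chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DowngradeAndRetry {
    enum SpecKind { EARLIEST, LATEST, MAX_TIMESTAMP }

    // Hypothetical stand-in for the error raised when a broker is too old.
    static final class UnsupportedVersion extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    // Hypothetical broker: returns an offset per partition, or throws
    // UnsupportedVersion if it cannot serve the requested minimum version.
    interface Broker {
        Map<String, Long> listOffsets(Map<String, SpecKind> specs, int minVersion);
    }

    static Map<String, String> listOffsets(Broker broker, Map<String, SpecKind> specs) {
        Map<String, String> results = new LinkedHashMap<>();
        boolean needsV7 = specs.containsValue(SpecKind.MAX_TIMESTAMP);
        try {
            // First attempt: every spec in a single request, constrained to
            // version 7 and above only if a MAX_TIMESTAMP spec is present.
            broker.listOffsets(specs, needsV7 ? 7 : 0)
                  .forEach((tp, offset) -> results.put(tp, "offset=" + offset));
        } catch (UnsupportedVersion e) {
            // Downgrade: fail the MAX_TIMESTAMP specs, retry the rest unconstrained.
            Map<String, SpecKind> retry = new LinkedHashMap<>();
            specs.forEach((tp, kind) -> {
                if (kind == SpecKind.MAX_TIMESTAMP) {
                    results.put(tp, "failed: unsupported version");
                } else {
                    retry.put(tp, kind);
                }
            });
            if (!retry.isEmpty()) {
                broker.listOffsets(retry, 0)
                      .forEach((tp, offset) -> results.put(tp, "offset=" + offset));
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // An "old" broker that rejects anything requiring version 7 or above.
        Broker oldBroker = (specs, minVersion) -> {
            if (minVersion > 6) {
                throw new UnsupportedVersion();
            }
            Map<String, Long> out = new LinkedHashMap<>();
            for (String tp : specs.keySet()) {
                out.put(tp, 42L);
            }
            return out;
        };
        Map<String, SpecKind> specs = new LinkedHashMap<>();
        specs.put("foo-0", SpecKind.EARLIEST);
        specs.put("foo-1", SpecKind.MAX_TIMESTAMP);
        Map<String, String> results = listOffsets(oldBroker, specs);
        System.out.println(results.get("foo-0"));
        System.out.println(results.get("foo-1"));
    }
}
```

In this sketch a broker that supports the newer version is contacted once, and an older broker is contacted twice only when a `MAX_TIMESTAMP` spec is present.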








[GitHub] [kafka] ijuma commented on pull request #10176: KAFKA-12359: Update Jetty to 11

2021-05-27 Thread GitBox


ijuma commented on pull request #10176:
URL: https://github.com/apache/kafka/pull/10176#issuecomment-849652409


   We are not planning to upgrade to Java 11 any time soon. Can you please 
submit a PR for the latest Jetty version that still supports Java 8?






[GitHub] [kafka] ijuma closed pull request #10176: KAFKA-12359: Update Jetty to 11

2021-05-27 Thread GitBox


ijuma closed pull request #10176:
URL: https://github.com/apache/kafka/pull/10176


   






[jira] [Commented] (KAFKA-12774) kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through log4j

2021-05-27 Thread Almog Gavra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352499#comment-17352499
 ] 

Almog Gavra commented on KAFKA-12774:
-

I went back to triage the severity of this because if things aren't getting 
logged via Log4j then our applications would be missing log-shipping, which is 
critical in our production environments.

To confirm nothing was being logged outside of Log4J I hacked together 
something to try to reproduce this and I also added the following line any time 
a record was to be sent in RecordCollectorImpl to make sure I had exactly the 
same type of error that you encountered:
{code:java}
recordSendError(topic, new InvalidPidMappingException("foo"), serializedRecord);
{code}
I could not reproduce it. The only things that were logged (note that all Log4j 
loggers are turned OFF, so only direct writes to stdout appear) 
were what I specifically logged to stdout. This was the stdout (I printed to 
stdout in the uncaught handler instead of logging):
{code:java}
SEEN: 0,0
(HERE) Uncaught exception handled - replacing thread Error encountered sending 
record to topic bar for task 0_0 due to:
org.apache.kafka.common.errors.InvalidPidMappingException: foo
Exception handler choose to FAIL the processing, no more records would be sent.
{code}
We can leave this ticket open in case anyone else has any ideas.

Here is the app:
{code:java}
public static void main(String[] args) throws InterruptedException, IOException {
  // Turn off every Log4j logger so that only direct writes to stdout are visible.
  LogManager.getRootLogger().setLevel(Level.OFF);
  @SuppressWarnings("unchecked")
  Enumeration<Logger> loggers = LogManager.getCurrentLoggers();
  while (loggers.hasMoreElements()) {
    loggers.nextElement().setLevel(Level.OFF);
  }

  final EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster(1);
  cluster.start();
  cluster.createTopic("foo");

  final Properties config = new Properties();
  config.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE);
  config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");
  config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + 123);
  config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
  config.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
  config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
  config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
  config.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
  config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
  config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);
  config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 1000);
  config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);

  // Copy records from "foo" to "bar", printing each one to stdout.
  final CountDownLatch sawKey = new CountDownLatch(1);
  final StreamsBuilder builder = new StreamsBuilder();
  builder.stream("foo")
      .filter((k, v) -> k != null)
      .peek((k, v) -> System.out.println("SEEN: " + k + "," + v))
      .peek((k, v) -> {
        if ((int) k == 0) sawKey.countDown();
      })
      .to("bar");

  final Topology build = builder.build(config);
  final KafkaStreams app = new KafkaStreams(build, config);

  // Print from the uncaught exception handler instead of logging.
  app.setUncaughtExceptionHandler(exception -> {
    System.out.println("(HERE) Uncaught exception handled - replacing thread " + exception.getMessage());
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
  });

  // Wait until the application reaches RUNNING before producing.
  final CountDownLatch startLatch = new CountDownLatch(1);
  app.setStateListener((newState, oldState) -> {
    if (newState == State.RUNNING) {
      startLatch.countDown();
    }
  });
  app.start();
  startLatch.await();

  final Properties producerProps = new Properties();
  producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
  producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
  producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);

  // Produce a single record with key 0.
  IntegrationTestUtils.produceKeyValuesSynchronously(
      "foo",
      IntStream.range(0, 1)
          .mapToObj(i -> KeyValue.pair(0, i))
          .collect(Collectors.toList()),
      producerProps,
      Time.SYSTEM);

  sawKey.await();
  app.close();
  app.cleanUp();
  cluster.after();
}{code}
 

> kafka-streams 2.8: logging in uncaught-exceptionhandler doesn't go through 
> log4j
> 
>
> Key: KAFKA-12774
> URL: https://issues.apache.org/jira/browse/KAFKA-12774
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Jørgen
>Priority: Minor
> Fix For: 3.0.0, 2.8.1
>
>
> When exceptions are handled in the uncaught-exception handler introduced in 
> KS2.8,

[jira] [Created] (KAFKA-12856) Upgrade Jackson to 2.12.3

2021-05-27 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-12856:
---

 Summary: Upgrade Jackson to 2.12.3
 Key: KAFKA-12856
 URL: https://issues.apache.org/jira/browse/KAFKA-12856
 Project: Kafka
  Issue Type: Bug
Reporter: Ismael Juma
Assignee: Ismael Juma


2.10.x is no longer supported, so we should move to 2.12 for the 3.0 release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] iakunin commented on pull request #10775: KAFKA-12668: Making MockScheduler.schedule safe to use in concurrent code

2021-05-27 Thread GitBox


iakunin commented on pull request #10775:
URL: https://github.com/apache/kafka/pull/10775#issuecomment-849675662


   @jsancio  could you have a look, please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma opened a new pull request #10778: KAFKA-12856: Upgrade Jackson to 2.12.3

2021-05-27 Thread GitBox


ijuma opened a new pull request #10778:
URL: https://github.com/apache/kafka/pull/10778


   2.10.x is no longer supported, so we should move to 2.12 for the 3.0
   release.
   
   ScalaObjectMapper has been deprecated and it looks like we don't
   actually need it, so remove its usage.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Commented] (KAFKA-12465) Decide whether inconsistent cluster id error are fatal

2021-05-27 Thread Omnia Ibrahim (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352518#comment-17352518
 ] 

Omnia Ibrahim commented on KAFKA-12465:
---

I have been testing KRaft with the following scenario: I set up a cluster with 
3 combined nodes (broker, controller) and 3 broker-only nodes, then at some 
later point added 2 extra nodes with a different cluster id. In a real 
production deployment, I would expect these 2 nodes with the wrong cluster id 
to crash immediately, so that we can tell something is wrong during the 
deployment. 

The scenario I was testing is the following:
 * Set up a cluster with 3 combined raft nodes (broker, controller mode) + 3 
broker nodes with cluster id {{CLUSTER_ID_1}}; they elected {{raft-node-1}} 
as the leader.
 * Later added 2 extra nodes to the raft cluster with a different cluster id 
{{WRONG_CLUSTER_ID}}.
 * The extra nodes don't crash; instead they stay in running mode and keep 
logging this error:
{code:java}
 {"level":"ERROR","message":"[RaftManager nodeId=8] Unexpected error 
INCONSISTENT_CLUSTER_ID in FETCH response: InboundResponse(correlationId=16699, 
data=FetchResponseData(throttleTimeMs=0, errorCode=104, sessionId=0, 
responses=[]), 
sourceId=2)","logger":"org.apache.kafka.raft.KafkaRaftClient"}{code}
 * {{raft-node-1}} doesn't throw errors; it only logs warnings about connection 
issues:

{code:java}
{"level":"WARN","message":"[RaftManager nodeId=1] Error connecting to node 
raft-node-4:9093 (id: 8 rack: 
null)","logger":"org.apache.kafka.clients.NetworkClient","throwable":{"class":"java.net.UnknownHostException","msg":"raft-node-4","stack":["java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)","java.net.InetAddress.getAllByName0(InetAddress.java:1505)","java.net.InetAddress.getAllByName(InetAddress.java:1364)","java.net.InetAddress.getAllByName(InetAddress.java:1298)","org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)","org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:111)","org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:512)","org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:466)","org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:172)","org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:985)","org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:311)","kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1(InterBrokerSendThread.scala:103)","kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1$adapted(InterBrokerSendThread.scala:99)","scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)","scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)","scala.collection.AbstractIterable.foreach(Iterable.scala:920)","kafka.common.InterBrokerSendThread.sendRequests(InterBrokerSendThread.scala:99)","kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:73)","kafka.common.InterBrokerSendThread.doWork(InterBrokerSendThread.scala:94)","kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)"]}}{code}

If this is a real deployment, the error `INCONSISTENT_CLUSTER_ID` should be 
fatal at all times; otherwise, how can we tell that these nodes are failing to 
join the active raft quorum? 

> Decide whether inconsistent cluster id error are fatal
> --
>
> Key: KAFKA-12465
> URL: https://issues.apache.org/jira/browse/KAFKA-12465
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: dengziming
>Priority: Major
>
> Currently, we just log an error when an inconsistent cluster-id occurred. We 
> should set a window during startup when these errors are fatal but after that 
> window, we no longer treat them to be fatal. see 
> https://github.com/apache/kafka/pull/10289#discussion_r592853088
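
One way to picture the "fatal only during a startup window" idea from the issue 
description is sketched below. This is an illustration under stated 
assumptions, not Kafka's implementation: the class {{ClusterIdErrorPolicy}}, 
its method names, and the window length passed in by the caller are all 
hypothetical.

```java
import java.time.Duration;

// Hypothetical policy object; nothing here is part of the Kafka code base.
final class ClusterIdErrorPolicy {
    private final long startNanos;
    private final long windowNanos;

    // startNanos is a monotonic-clock reading taken at startup,
    // for example the value of System.nanoTime() at that moment.
    ClusterIdErrorPolicy(long startNanos, Duration fatalWindow) {
        this.startNanos = startNanos;
        this.windowNanos = fatalWindow.toNanos();
    }

    // An inconsistent cluster id seen inside the window is treated as fatal;
    // after the window has elapsed it would only be logged.
    boolean isFatal(long nowNanos) {
        return nowNanos - startNanos < windowNanos;
    }
}
```

The comment above argues for treating the error as fatal at all times; in terms 
of this sketch, that would mean isFatal always returning true.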





[jira] [Comment Edited] (KAFKA-12465) Decide whether inconsistent cluster id error are fatal

2021-05-27 Thread Omnia Ibrahim (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352518#comment-17352518
 ] 

Omnia Ibrahim edited comment on KAFKA-12465 at 5/27/21, 2:26 PM:
-

I have been testing KRaft with the following scenario: I set up a cluster with 
3 combined nodes (broker, controller) and 3 broker-only nodes, then at some 
later point added 2 extra nodes with a different cluster id. In a real 
production deployment, I would expect these 2 nodes with the wrong cluster id 
to crash immediately, so that we can tell something is wrong during the 
deployment. 

The scenario I was testing is the following:
 * Set up a cluster with 3 combined raft nodes (broker, controller mode) + 3 
broker nodes with cluster id {{CLUSTER_ID_1}}; they elected {{raft-node-1}} 
as the leader.
 * Later added 2 extra nodes to the raft cluster with a different cluster id 
{{WRONG_CLUSTER_ID}}.
 * The extra nodes don't crash; instead they stay in running mode and keep 
logging this error:
{code:java}
 {"level":"ERROR","message":"[RaftManager nodeId=8] Unexpected error 
INCONSISTENT_CLUSTER_ID in FETCH response: InboundResponse(correlationId=16699, 
data=FetchResponseData(throttleTimeMs=0, errorCode=104, sessionId=0, 
responses=[]), 
sourceId=2)","logger":"org.apache.kafka.raft.KafkaRaftClient"}{code}

 * {{raft-node-1}} doesn't throw errors; it only logs warnings about connection 
issues:

{code:java}
{"level":"WARN","message":"[RaftManager nodeId=1] Error connecting to node 
raft-node-4:9093 (id: 8 rack: 
null)","logger":"org.apache.kafka.clients.NetworkClient","throwable":{"class":"java.net.UnknownHostException","msg":"raft-node-4","stack":["java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)","java.net.InetAddress.getAllByName0(InetAddress.java:1505)","java.net.InetAddress.getAllByName(InetAddress.java:1364)","java.net.InetAddress.getAllByName(InetAddress.java:1298)","org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)","org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:111)","org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:512)","org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:466)","org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:172)","org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:985)","org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:311)","kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1(InterBrokerSendThread.scala:103)","kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1$adapted(InterBrokerSendThread.scala:99)","scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)","scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)","scala.collection.AbstractIterable.foreach(Iterable.scala:920)","kafka.common.InterBrokerSendThread.sendRequests(InterBrokerSendThread.scala:99)","kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:73)","kafka.common.InterBrokerSendThread.doWork(InterBrokerSendThread.scala:94)","kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)"]}}{code}
If this is a real deployment, the error `INCONSISTENT_CLUSTER_ID` should be 
fatal at all times; otherwise, how can we tell that these nodes are failing to 
join the active raft quorum? 



[GitHub] [kafka] iakunin edited a comment on pull request #10775: KAFKA-12668: Making MockScheduler.schedule safe to use in concurrent code

2021-05-27 Thread GitBox


iakunin edited a comment on pull request #10775:
URL: https://github.com/apache/kafka/pull/10775#issuecomment-849675662


   @jsancio hi!
   
   Could you have a look at this PR, please?






[GitHub] [kafka] cadonna commented on a change in pull request #10765: KAFKA-12519: Remove built-in Streams metrics for versions 0.10.0-2.4

2021-05-27 Thread GitBox


cadonna commented on a change in pull request #10765:
URL: https://github.com/apache/kafka/pull/10765#discussion_r640686291



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
##
@@ -272,21 +230,8 @@ public void shouldRecordRestoreLatencyOnInit() {
 expectLastCall();
 replay(innerStoreMock);
 store.init((StateStoreContext) context, store);
-final Map metrics = 
context.metrics().metrics();
-if (StreamsConfig.METRICS_0100_TO_24.equals(builtInMetricsVersion)) {
-assertEquals(1.0, getMetricByNameFilterByTags(
-metrics,
-"restore-total",
-storeLevelGroup,
-singletonMap(STORE_TYPE + "-state-id", STORE_NAME)
-).metricValue());
-assertEquals(1.0, getMetricByNameFilterByTags(
-metrics,
-"restore-total",
-storeLevelGroup,
-singletonMap(STORE_TYPE + "-state-id", ROLLUP_VALUE)
-).metricValue());
-}
+
+verify(innerStoreMock);

Review comment:
   Good that you mention that! I checked the metered state store tests and 
found some testing holes.








[GitHub] [kafka] ijuma commented on a change in pull request #10764: MINOR: make sure all fiedls of o.p.k.s.a.Action are NOT null

2021-05-27 Thread GitBox


ijuma commented on a change in pull request #10764:
URL: https://github.com/apache/kafka/pull/10764#discussion_r640705924



##
File path: clients/src/main/java/org/apache/kafka/server/authorizer/Action.java
##
@@ -36,22 +36,22 @@ public Action(AclOperation operation,
   int resourceReferenceCount,
   boolean logIfAllowed,
   boolean logIfDenied) {
-this.operation = operation;
-this.resourcePattern = resourcePattern;
+this.operation = Objects.requireNonNull(operation, "operation can't be 
null");
+this.resourcePattern = Objects.requireNonNull(resourcePattern, 
"resourcePattern can't be null");
 this.logIfAllowed = logIfAllowed;
 this.logIfDenied = logIfDenied;
 this.resourceReferenceCount = resourceReferenceCount;
 }
 
 /**
- * Resource on which action is being performed.
+ * Resource on which action is being performed. never null

Review comment:
   I would include this in the sentence, "Returns a non-null resource 
pattern on which this action is being performed".








[GitHub] [kafka] ijuma commented on a change in pull request #10764: MINOR: make sure all fiedls of o.p.k.s.a.Action are NOT null

2021-05-27 Thread GitBox


ijuma commented on a change in pull request #10764:
URL: https://github.com/apache/kafka/pull/10764#discussion_r640705924



##
File path: clients/src/main/java/org/apache/kafka/server/authorizer/Action.java
##
@@ -36,22 +36,22 @@ public Action(AclOperation operation,
   int resourceReferenceCount,
   boolean logIfAllowed,
   boolean logIfDenied) {
-this.operation = operation;
-this.resourcePattern = resourcePattern;
+this.operation = Objects.requireNonNull(operation, "operation can't be 
null");
+this.resourcePattern = Objects.requireNonNull(resourcePattern, 
"resourcePattern can't be null");
 this.logIfAllowed = logIfAllowed;
 this.logIfDenied = logIfDenied;
 this.resourceReferenceCount = resourceReferenceCount;
 }
 
 /**
- * Resource on which action is being performed.
+ * Resource on which action is being performed. never null

Review comment:
   I would include the non-null constraint in the sentence, e.g. "Returns a 
non-null resource pattern on which this action is being performed".

##
File path: clients/src/main/java/org/apache/kafka/server/authorizer/Action.java
##
@@ -36,22 +36,22 @@ public Action(AclOperation operation,
   int resourceReferenceCount,
   boolean logIfAllowed,
   boolean logIfDenied) {
-this.operation = operation;
-this.resourcePattern = resourcePattern;
+this.operation = Objects.requireNonNull(operation, "operation can't be 
null");
+this.resourcePattern = Objects.requireNonNull(resourcePattern, 
"resourcePattern can't be null");

Review comment:
   Shall we mention this in the javadoc too?
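
The two review comments above (fail fast on null, and say so in the javadoc) 
can be combined as in the following minimal sketch. `ActionSketch` is a 
hypothetical stand-in with plain `String` fields, not the real 
`org.apache.kafka.server.authorizer.Action` class; only the 
`Objects.requireNonNull` calls and their messages are taken from the diff.

```java
import java.util.Objects;

/** Hypothetical stand-in for a value class whose fields must never be null. */
final class ActionSketch {
    private final String operation;
    private final String resourcePattern;

    /**
     * @param operation       non-null operation being performed
     * @param resourcePattern non-null resource pattern the operation applies to
     * @throws NullPointerException if either argument is null
     */
    ActionSketch(String operation, String resourcePattern) {
        // requireNonNull throws a NullPointerException carrying the given message.
        this.operation = Objects.requireNonNull(operation, "operation can't be null");
        this.resourcePattern = Objects.requireNonNull(resourcePattern, "resourcePattern can't be null");
    }

    /** Returns a non-null operation that is being performed. */
    String operation() {
        return operation;
    }

    /** Returns a non-null resource pattern on which this action is being performed. */
    String resourcePattern() {
        return resourcePattern;
    }
}
```

Checking in the constructor means a null is reported where the object is 
built rather than at some later read of the field.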








[GitHub] [kafka] lbradstreet commented on a change in pull request #10704: KAFKA-12791: ConcurrentModificationException in AbstractConfig use by KafkaProducer

2021-05-27 Thread GitBox


lbradstreet commented on a change in pull request #10704:
URL: https://github.com/apache/kafka/pull/10704#discussion_r640706594



##
File path: 
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
##
@@ -106,7 +107,7 @@ public AbstractConfig(ConfigDef definition, Map 
originals,  Map
 
 this.originals = resolveConfigVariables(configProviderProps, 
(Map) originals);
 this.values = definition.parse(this.originals);
-this.used = Collections.synchronizedSet(new HashSet<>());
+this.used = ConcurrentHashMap.newKeySet();

Review comment:
   Done
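
For readers following the diff above, here is a minimal sketch of the 
difference between the two set implementations. It assumes the failure mode 
named in the PR title, that is, the set being modified while it is iterated; 
the class `UsedKeysSketch`, its method, and the config key strings are 
illustrative and are not Kafka code. A single thread adds during its own 
iteration to stand in for a second thread, which keeps the outcome 
deterministic.

```java
import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class UsedKeysSketch {
    // Iterates the set and adds a key mid-iteration, standing in for another
    // thread that records a "used" key while this one is reading the set.
    // Returns true if the iteration failed with ConcurrentModificationException.
    static boolean failsWhenModifiedDuringIteration(Set<String> used) {
        used.add("bootstrap.servers");
        used.add("key.serializer");
        used.add("value.serializer");
        try {
            for (String key : used) {
                used.add("acks"); // structural modification while iterating
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // synchronizedSet locks individual calls only; its iterator is the
        // fail-fast iterator of the wrapped HashSet.
        System.out.println("synchronizedSet fails: "
            + failsWhenModifiedDuringIteration(Collections.synchronizedSet(new HashSet<>())));
        // newKeySet is backed by ConcurrentHashMap, whose iterators are
        // weakly consistent and do not throw on concurrent modification.
        System.out.println("newKeySet fails: "
            + failsWhenModifiedDuringIteration(ConcurrentHashMap.newKeySet()));
    }
}
```

With `Collections.synchronizedSet`, callers are expected to synchronize on the 
set for the whole iteration themselves; the concurrent key set removes that 
requirement.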








[GitHub] [kafka] feyman2016 commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-05-27 Thread GitBox


feyman2016 commented on a change in pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#discussion_r640723618



##
File path: 
raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
##
@@ -135,4 +141,18 @@ public static void assertSnapshot(List> 
batches, SnapshotReader

[jira] [Comment Edited] (KAFKA-12465) Decide whether inconsistent cluster id error are fatal

2021-05-27 Thread Omnia Ibrahim (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12465?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352518#comment-17352518
 ] 

Omnia Ibrahim edited comment on KAFKA-12465 at 5/27/21, 3:40 PM:
-

I have been testing KRaft with the following scenario: I set up a cluster with 
3 combined nodes (broker, controller) and 3 broker-only nodes, then at some 
later point added 2 extra nodes with a different cluster id. In a real 
production deployment, I would expect these 2 nodes with the wrong cluster id 
to crash immediately, so that we can tell something is wrong during the 
deployment. 

The scenario I was testing is the following:
 * Set up a cluster with 3 combined raft nodes (broker, controller mode) + 3 
broker nodes with cluster id {{CLUSTER_ID_1}}; they elected {{raft-node-1}} 
as the leader.
 * Later added 2 extra nodes to the raft cluster with a different cluster id 
{{WRONG_CLUSTER_ID}}.
 * The extra nodes don't crash; instead they stay in running mode and keep 
logging this error:
{code:java}
 {"level":"ERROR","message":"[RaftManager nodeId=8] Unexpected error 
INCONSISTENT_CLUSTER_ID in FETCH response: InboundResponse(correlationId=16699, 
data=FetchResponseData(throttleTimeMs=0, errorCode=104, sessionId=0, 
responses=[]), 
sourceId=2)","logger":"org.apache.kafka.raft.KafkaRaftClient"}{code}

 * {{raft-node-1}} doesn't throw errors; it only logs warnings about connection 
issues:

{code:java}
{"level":"WARN","message":"[RaftManager nodeId=1] Error connecting to node 
raft-node-4:9093 (id: 8 rack: 
null)","logger":"org.apache.kafka.clients.NetworkClient","throwable":{"class":"java.net.UnknownHostException","msg":"raft-node-4","stack":["java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)","java.net.InetAddress.getAllByName0(InetAddress.java:1505)","java.net.InetAddress.getAllByName(InetAddress.java:1364)","java.net.InetAddress.getAllByName(InetAddress.java:1298)","org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)","org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:111)","org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:512)","org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:466)","org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:172)","org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:985)","org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:311)","kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1(InterBrokerSendThread.scala:103)","kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1$adapted(InterBrokerSendThread.scala:99)","scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)","scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)","scala.collection.AbstractIterable.foreach(Iterable.scala:920)","kafka.common.InterBrokerSendThread.sendRequests(InterBrokerSendThread.scala:99)","kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:73)","kafka.common.InterBrokerSendThread.doWork(InterBrokerSendThread.scala:94)","kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)"]}}{code}
If this is a real deployment, the error `INCONSISTENT_CLUSTER_ID` should be 
fatal at all times; otherwise, how can we tell that these nodes are failing to 
join the active raft quorum? 



[GitHub] [kafka] junrao merged pull request #10684: MINOR: Improve Log layer segment iteration logic and few other areas

2021-05-27 Thread GitBox


junrao merged pull request #10684:
URL: https://github.com/apache/kafka/pull/10684


   






[GitHub] [kafka] feyman2016 commented on pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-05-27 Thread GitBox


feyman2016 commented on pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#issuecomment-849753337


   @jsancio Addressed the comments, but I just found that I need to generalize 
`context.advanceLeaderHighWatermarkToEndOffset()`; will update later.






[GitHub] [kafka] abbccdda commented on a change in pull request #10744: KAFKA-8410: KTableProcessor migration groundwork

2021-05-27 Thread GitBox


abbccdda commented on a change in pull request #10744:
URL: https://github.com/apache/kafka/pull/10744#discussion_r640767449



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##
@@ -124,7 +124,10 @@
 private static final String TOPIC_SUFFIX = "-topic";
 private static final String SINK_NAME = "KTABLE-SINK-";
 
-private final ProcessorSupplier processorSupplier;
+// Temporarily setting the processorSupplier to type Object so that we can 
transition from the

Review comment:
   s/transition/transit

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/To.java
##
@@ -89,4 +89,11 @@ public int hashCode() {
 throw new UnsupportedOperationException("To is unsafe for use in Hash 
collections");
 }
 
+@Override
+public String toString() {
+return "To{" +

Review comment:
   nit: could we use a string format here to make this easier to read?
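
A minimal sketch of what the format-based version might look like. The diff 
above is cut off after `"To{" +`, so the field names `childName` and 
`timestamp` and the exact output layout are assumptions, and `ToSketch` is a 
hypothetical class rather than the real `To`.

```java
import java.util.Locale;

// Hypothetical stand-in; field names and output layout are assumed.
final class ToSketch {
    private final String childName;
    private final long timestamp;

    ToSketch(String childName, long timestamp) {
        this.childName = childName;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        // Locale.ROOT keeps the digits of %d independent of the default locale.
        return String.format(Locale.ROOT, "To{childName='%s', timestamp=%d}", childName, timestamp);
    }
}
```

The single format string shows the whole output shape at a glance, which is 
the readability gain the nit is after.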

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
##
@@ -19,30 +19,45 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 
-class SessionCacheFlushListener implements 
CacheFlushListener, V> {
-private final InternalProcessorContext context;
+class SessionCacheFlushListener implements 
CacheFlushListener, VOut> {
+private final InternalProcessorContext, Change> 
context;
+
+@SuppressWarnings("rawtypes")
 private final ProcessorNode myNode;
 
+@SuppressWarnings("unchecked")
 SessionCacheFlushListener(final ProcessorContext context) {
-this.context = (InternalProcessorContext) context;
+this.context = (InternalProcessorContext, 
Change>) context;
 myNode = this.context.currentNode();
 }
 
 @Override
-public void apply(final Windowed key,
-  final V newValue,
-  final V oldValue,
+public void apply(final Windowed key,
+  final VOut newValue,
+  final VOut oldValue,
   final long timestamp) {
-final ProcessorNode prev = context.currentNode();
+@SuppressWarnings("rawtypes") final ProcessorNode prev = 
context.currentNode();

Review comment:
   Why do we put the suppression inline instead of at the top of the 
function?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableNewProcessorSupplier.java
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+
+public interface KTableNewProcessorSupplier extends 
ProcessorSupplier, KOut, Change> {
+
+KTableValueGetterSupplier view();
+
+/**
+ * Potentially enables sending old values.
+ * 
+ * If {@code forceMaterialization} is {@code true}, the method will force 
the materialization of upstream nodes to
+ * enable sending old values.
+ * 
+ * If {@code forceMaterialization} is {@code false}, the method will only 
enable the sending of old values if
+ * an upstream node is already materialized.
+ *
+ * @param forceMaterialization indicates if an upstream node should be 
forced to materialize to enable sending old
+ * values.
+ * @return {@code true} is sending old values is enabled, i.e. either 
because {@code forceMaterialization} was

Review comment:
   ...if sending old values

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
##
@@ -24,14 +24,14 @@
 import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerializer;
 import static 
org.apache.kafka.stre

[GitHub] [kafka] ijuma commented on a change in pull request #10704: KAFKA-12791: ConcurrentModificationException in AbstractConfig use by KafkaProducer

2021-05-27 Thread GitBox


ijuma commented on a change in pull request #10704:
URL: https://github.com/apache/kafka/pull/10704#discussion_r640788402



##
File path: 
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
##
@@ -508,6 +509,55 @@ public void testDocumentationOfExpectNull() {
 assertNull(config.documentationOf("xyz"));
 }
 
+@Test
+public void testConcurrentUnusedUse() throws InterruptedException {

Review comment:
   I think I'd remove it or at least tag it as an integration test.








[GitHub] [kafka] chia7712 commented on pull request #10764: MINOR: make sure all fiedls of o.p.k.s.a.Action are NOT null

2021-05-27 Thread GitBox


chia7712 commented on pull request #10764:
URL: https://github.com/apache/kafka/pull/10764#issuecomment-849784432


   @ijuma thanks for your suggestion. I have addressed all comments. PTAL






[GitHub] [kafka] jsancio commented on pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-05-27 Thread GitBox


jsancio commented on pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#issuecomment-849787584


   > @jsancio Addressed the comments but just found that I would need to 
generalize context.advanceLeaderHighWatermarkToEndOffset (), will update later~
   
   I needed something similar in a PR I am currently working on, and this is
   what I have, if you want to adapt it to your PR:
   
   ```java
   public void advanceLocalLeaderHighWatermarkToLogEndOffset() throws InterruptedException {
       assertEquals(localId, currentLeader());
       long localLogEndOffset = log.endOffset().offset;
       Set<Integer> followers = voters.stream()
           .filter(voter -> voter != localId.getAsInt())
           .collect(Collectors.toSet());

       // Send a fetch request from every follower
       for (int follower : followers) {
           deliverRequest(
               fetchRequest(currentEpoch(), follower, localLogEndOffset, currentEpoch(), 0)
           );
           pollUntilResponse();
           assertSentFetchPartitionResponse(Errors.NONE, currentEpoch(), localId);
       }

       // Wait until the leader's high watermark reaches the log end offset
       pollUntil(() -> OptionalLong.of(localLogEndOffset).equals(client.highWatermark()));
   }
   ```






[GitHub] [kafka] rondagostino opened a new pull request #10779: CONFLUENT: Complete version regex check in system tests

2021-05-27 Thread GitBox


rondagostino opened a new pull request #10779:
URL: https://github.com/apache/kafka/pull/10779


   A couple of the sanity-check system tests confirm that the version of Kafka
   on the CLASSPATH during the test is the expected one. These tests were
   failing for the 2.8 release because the version `6.2.0-ccs` was not accounted
   for correctly in the various regexes. This patch revamps the regex set to be
   more organized and complete, with the various possibilities clearly identified.
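
As a rough illustration of the kind of check described above, the following sketch parses a version string with an optional qualifier. The pattern and the `parse_version` helper are hypothetical and are not the regexes used in the Kafka system tests; they only show how a suffix such as `-ccs` can be accounted for explicitly.

```python
import re

# Hypothetical pattern, for illustration only: three numeric components,
# optionally followed by a dash-separated qualifier such as "-ccs" or "-SNAPSHOT".
VERSION_RE = re.compile(r"^(\d+)\.(\d+)\.(\d+)(?:-([A-Za-z0-9.]+))?$")


def parse_version(version):
    """Return (major, minor, patch, qualifier), or None if the string does not match."""
    match = VERSION_RE.match(version)
    if match is None:
        return None
    major, minor, patch, qualifier = match.groups()
    return int(major), int(minor), int(patch), qualifier


print(parse_version("6.2.0-ccs"))
print(parse_version("2.8.0"))
print(parse_version("not-a-version"))
```

Keeping the qualifier in its own optional group means a new suffix does not require a separate regex per release flavour.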
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] rondagostino closed pull request #10779: CONFLUENT: Complete version regex check in system tests

2021-05-27 Thread GitBox


rondagostino closed pull request #10779:
URL: https://github.com/apache/kafka/pull/10779


   






[GitHub] [kafka] rondagostino commented on pull request #10779: CONFLUENT: Complete version regex check in system tests

2021-05-27 Thread GitBox


rondagostino commented on pull request #10779:
URL: https://github.com/apache/kafka/pull/10779#issuecomment-849788603


   Wrong base






[GitHub] [kafka] ijuma commented on pull request #10778: KAFKA-12856: Upgrade Jackson to 2.12.3

2021-05-27 Thread GitBox


ijuma commented on pull request #10778:
URL: https://github.com/apache/kafka/pull/10778#issuecomment-849789835


   Tests look good, one job passed, the others had unrelated failures:
   
   > Build / JDK 15 and Scala 2.13 / 
testCreateClusterAndCreateAndManyTopicsWithManyPartitions() – 
kafka.server.RaftClusterTest
   > 23s
   > Build / JDK 15 and Scala 2.13 / 
testCreateClusterAndCreateAndManyTopicsWithManyPartitions() – 
kafka.server.RaftClusterTest
   > 18s
   > Build / JDK 8 and Scala 2.12 / 
testCreateClusterAndCreateAndManyTopicsWithManyPartitions() – 
kafka.server.RaftClusterTest
   > 25s
   > Build / JDK 8 and Scala 2.12 / 
testCreateClusterAndCreateAndManyTopicsWithManyPartitions() – 
kafka.server.RaftClusterTest






[GitHub] [kafka] ijuma merged pull request #10778: KAFKA-12856: Upgrade Jackson to 2.12.3

2021-05-27 Thread GitBox


ijuma merged pull request #10778:
URL: https://github.com/apache/kafka/pull/10778


   






[GitHub] [kafka] ijuma commented on pull request #10776: MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout`

2021-05-27 Thread GitBox


ijuma commented on pull request #10776:
URL: https://github.com/apache/kafka/pull/10776#issuecomment-849790914


   Created the backport PR just to verify that the tests pass; merging.






[GitHub] [kafka] ijuma merged pull request #10776: MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout`

2021-05-27 Thread GitBox


ijuma merged pull request #10776:
URL: https://github.com/apache/kafka/pull/10776


   






[GitHub] [kafka] jsancio commented on a change in pull request #10593: KAFKA-10800 Validate the snapshot id when the state machine creates a snapshot

2021-05-27 Thread GitBox


jsancio commented on a change in pull request #10593:
URL: https://github.com/apache/kafka/pull/10593#discussion_r640805609



##
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##
@@ -963,6 +952,20 @@ static void verifyLeaderChangeMessage(
 new HashSet<>(leaderChangeMessage.grantingVoters()));
 }
 
+public void advanceLeaderHighWatermarkToEndOffset() throws 
InterruptedException {
+Integer replicaId = null;
+for (Integer voter: voters) {
+if (voter != localId.getAsInt()) {
+replicaId = voter;
+break;
+}
+}
+deliverRequest(fetchRequest(currentEpoch(), replicaId, 
log.endOffset().offset, currentEpoch(), 0));
+pollUntilResponse();
+assertSentFetchPartitionResponse(Errors.NONE, currentEpoch(), localId);
+assertEquals(log.endOffset().offset, 
client.highWatermark().getAsLong());
+}

Review comment:
   I needed something similar in a PR I am currently working on, and this is
   what I have, if you want to adapt it to your PR:
   
   ```java
   public void advanceLocalLeaderHighWatermarkToLogEndOffset() throws InterruptedException {
       assertEquals(localId, currentLeader());
       long localLogEndOffset = log.endOffset().offset;
       Set<Integer> followers = voters.stream()
           .filter(voter -> voter != localId.getAsInt())
           .collect(Collectors.toSet());

       // Send a fetch request from every follower
       for (int follower : followers) {
           deliverRequest(
               fetchRequest(currentEpoch(), follower, localLogEndOffset, currentEpoch(), 0)
           );
           pollUntilResponse();
           assertSentFetchPartitionResponse(Errors.NONE, currentEpoch(), localId);
       }

       // Wait until the leader's high watermark reaches the log end offset
       pollUntil(() -> OptionalLong.of(localLogEndOffset).equals(client.highWatermark()));
   }
   ```








[GitHub] [kafka] ijuma commented on pull request #10777: MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout`

2021-05-27 Thread GitBox


ijuma commented on pull request #10777:
URL: https://github.com/apache/kafka/pull/10777#issuecomment-849791655


   Created the backport PR just to check the tests.






[GitHub] [kafka] ijuma merged pull request #10777: MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout`

2021-05-27 Thread GitBox


ijuma merged pull request #10777:
URL: https://github.com/apache/kafka/pull/10777


   






[jira] [Resolved] (KAFKA-12856) Upgrade Jackson to 2.12.3

2021-05-27 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-12856.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

> Upgrade Jackson to 2.12.3
> -
>
> Key: KAFKA-12856
> URL: https://issues.apache.org/jira/browse/KAFKA-12856
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 3.0.0
>
>
> 2.10.x is no longer supported, so we should move to 2.12 for the 3.0 release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL

2021-05-27 Thread GitBox


ijuma commented on pull request #10758:
URL: https://github.com/apache/kafka/pull/10758#issuecomment-849798924


   Thanks for looking into this. Can we switch the release requirement to be 
Java 15 instead of Java 11 then? We can then switch to Java 17 once that's out 
and stick to that for a while.






[GitHub] [kafka] lbradstreet commented on a change in pull request #10704: KAFKA-12791: ConcurrentModificationException in AbstractConfig use by KafkaProducer

2021-05-27 Thread GitBox


lbradstreet commented on a change in pull request #10704:
URL: https://github.com/apache/kafka/pull/10704#discussion_r640820748



##
File path: 
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
##
@@ -508,6 +509,55 @@ public void testDocumentationOfExpectNull() {
 assertNull(config.documentationOf("xyz"));
 }
 
+@Test
+public void testConcurrentUnusedUse() throws InterruptedException {

Review comment:
   I don't think it adds much value so I'd rather delete it.








[GitHub] [kafka] jlprat commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL

2021-05-27 Thread GitBox


jlprat commented on pull request #10758:
URL: https://github.com/apache/kafka/pull/10758#issuecomment-849805196


   Hi @ijuma, yes: generating the Javadocs with JDK 15 (or 16) instead of 11
   works (I tried it locally). Only the `release.py` file needs to be modified
   to accomplish this.
   I could create a PR with those changes if you like.






[GitHub] [kafka] lbradstreet commented on a change in pull request #10704: KAFKA-12791: ConcurrentModificationException in AbstractConfig use by KafkaProducer

2021-05-27 Thread GitBox


lbradstreet commented on a change in pull request #10704:
URL: https://github.com/apache/kafka/pull/10704#discussion_r640823494



##
File path: 
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
##
@@ -508,6 +509,55 @@ public void testDocumentationOfExpectNull() {
 assertNull(config.documentationOf("xyz"));
 }
 
+@Test
+public void testConcurrentUnusedUse() throws InterruptedException {

Review comment:
   Done.








[GitHub] [kafka] ijuma commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL

2021-05-27 Thread GitBox


ijuma commented on pull request #10758:
URL: https://github.com/apache/kafka/pull/10758#issuecomment-849813756


   Yes, let's go with 15 since Kafka doesn't work with 16 yet.






[GitHub] [kafka] jlprat commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL

2021-05-27 Thread GitBox


jlprat commented on pull request #10758:
URL: https://github.com/apache/kafka/pull/10758#issuecomment-849814665


   Shall I close this PR and open a new one with the modified `release.py`?






[GitHub] [kafka] vvcephei commented on a change in pull request #10744: KAFKA-8410: KTableProcessor migration groundwork

2021-05-27 Thread GitBox


vvcephei commented on a change in pull request #10744:
URL: https://github.com/apache/kafka/pull/10744#discussion_r640835700



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableNewProcessorSupplier.java
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+
+public interface KTableNewProcessorSupplier extends 
ProcessorSupplier, KOut, Change> {

Review comment:
   I'll leave it to @jeqo to make sure this is done at the end of the
   migration. I don't think a TODO in the code is much use, but a subtask in
   @jeqo's ticket would be effective.








[GitHub] [kafka] vvcephei commented on a change in pull request #10744: KAFKA-8410: KTableProcessor migration groundwork

2021-05-27 Thread GitBox


vvcephei commented on a change in pull request #10744:
URL: https://github.com/apache/kafka/pull/10744#discussion_r640835922



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionCacheFlushListener.java
##
@@ -19,30 +19,45 @@
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.To;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.state.internals.CacheFlushListener;
 
-class SessionCacheFlushListener implements 
CacheFlushListener, V> {
-private final InternalProcessorContext context;
+class SessionCacheFlushListener implements 
CacheFlushListener, VOut> {
+private final InternalProcessorContext, Change> 
context;
+
+@SuppressWarnings("rawtypes")
 private final ProcessorNode myNode;
 
+@SuppressWarnings("unchecked")
 SessionCacheFlushListener(final ProcessorContext context) {
-this.context = (InternalProcessorContext) context;
+this.context = (InternalProcessorContext, 
Change>) context;
 myNode = this.context.currentNode();
 }
 
 @Override
-public void apply(final Windowed key,
-  final V newValue,
-  final V oldValue,
+public void apply(final Windowed key,
+  final VOut newValue,
+  final VOut oldValue,
   final long timestamp) {
-final ProcessorNode prev = context.currentNode();
+@SuppressWarnings("rawtypes") final ProcessorNode prev = 
context.currentNode();

Review comment:
   Just to limit the scope of the suppression and not mask other mistakes.








[GitHub] [kafka] vvcephei commented on a change in pull request #10744: KAFKA-8410: KTableProcessor migration groundwork

2021-05-27 Thread GitBox


vvcephei commented on a change in pull request #10744:
URL: https://github.com/apache/kafka/pull/10744#discussion_r640837000



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableNewProcessorSupplier.java
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+
+public interface KTableNewProcessorSupplier extends 
ProcessorSupplier, KOut, Change> {
+
+KTableValueGetterSupplier view();
+
+/**
+ * Potentially enables sending old values.
+ * 
+ * If {@code forceMaterialization} is {@code true}, the method will force 
the materialization of upstream nodes to
+ * enable sending old values.
+ * 
+ * If {@code forceMaterialization} is {@code false}, the method will only 
enable the sending of old values if
+ * an upstream node is already materialized.
+ *
+ * @param forceMaterialization indicates if an upstream node should be 
forced to materialize to enable sending old
+ * values.
+ * @return {@code true} is sending old values is enabled, i.e. either 
because {@code forceMaterialization} was

Review comment:
   ```suggestion
* @return {@code true} if sending old values is enabled, i.e. either 
because {@code forceMaterialization} was
   ```








[GitHub] [kafka] vvcephei commented on a change in pull request #10744: KAFKA-8410: KTableProcessor migration groundwork

2021-05-27 Thread GitBox


vvcephei commented on a change in pull request #10744:
URL: https://github.com/apache/kafka/pull/10744#discussion_r640837839



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/To.java
##
@@ -89,4 +89,11 @@ public int hashCode() {
 throw new UnsupportedOperationException("To is unsafe for use in Hash 
collections");
 }
 
+@Override
+public String toString() {
+return "To{" +

Review comment:
   I just didn't bother because there's no place it would actually be 
printed unless a test is failing. We can give more thought to the string format 
later on as needed.








[GitHub] [kafka] ijuma commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL

2021-05-27 Thread GitBox


ijuma commented on pull request #10758:
URL: https://github.com/apache/kafka/pull/10758#issuecomment-849820046


   Or just replace the code in this branch. Whatever you prefer.






[GitHub] [kafka] vvcephei commented on a change in pull request #10744: KAFKA-8410: KTableProcessor migration groundwork

2021-05-27 Thread GitBox


vvcephei commented on a change in pull request #10744:
URL: https://github.com/apache/kafka/pull/10744#discussion_r640838987



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CacheFlushListener.java
##
@@ -31,4 +34,9 @@
  * @param timestamp   timestamp of new value
  */
 void apply(final K key, final V newValue, final V oldValue, final long 
timestamp);
+
+/**
+ * Called when records are flushed from the {@link ThreadCache}
+ */
+void apply(final Record> record);

Review comment:
   I don't want to deprecate it right now, but as with the rest of the 
"compatibility mode" changes, the old member should become unused by the time 
@jeqo is done and we can remove it at that time.








[GitHub] [kafka] jlprat opened a new pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK

2021-05-27 Thread GitBox


jlprat opened a new pull request #10780:
URL: https://github.com/apache/kafka/pull/10780


   This upgrades the JDK to version 15 for docs generation, so that we
   can circumvent bug https://bugs.openjdk.java.net/browse/JDK-8215291,
   which is present in JDK 11.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] jlprat closed pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL

2021-05-27 Thread GitBox


jlprat closed pull request #10758:
URL: https://github.com/apache/kafka/pull/10758


   






[GitHub] [kafka] jlprat commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL

2021-05-27 Thread GitBox


jlprat commented on pull request #10758:
URL: https://github.com/apache/kafka/pull/10758#issuecomment-849820849


   Closed in favor of https://github.com/apache/kafka/pull/10780






[GitHub] [kafka] jlprat commented on a change in pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK

2021-05-27 Thread GitBox


jlprat commented on a change in pull request #10780:
URL: https://github.com/apache/kafka/pull/10780#discussion_r640840181



##
File path: release.py
##
@@ -265,7 +265,7 @@ def command_stage_docs():
 # version due to already having bumped the bugfix version number.
 gradle_version_override = docs_release_version(version)
 
-cmd("Building docs", "./gradlew -Pversion=%s clean siteDocsTar 
aggregatedJavadoc" % gradle_version_override, cwd=REPO_HOME, env=jdk11_env)
+cmd("Building docs", "./gradlew -Pversion=%s clean siteDocsTar 
aggregatedJavadoc" % gradle_version_override, cwd=REPO_HOME, env=jdk15_env)

Review comment:
   This, together with the change at line 259, is what is needed.

##
File path: release.py
##
@@ -600,7 +601,7 @@ def select_gpg_key():
 
 cmd("Building artifacts", "./gradlew clean && ./gradlewAll releaseTarGz", 
cwd=kafka_dir, env=jdk8_env, shell=True)
 cmd("Copying artifacts", "cp %s/core/build/distributions/* %s" % (kafka_dir, 
artifacts_dir), shell=True)
-cmd("Building docs", "./gradlew clean aggregatedJavadoc", cwd=kafka_dir, 
env=jdk11_env)
+cmd("Building docs", "./gradlew clean aggregatedJavadoc", cwd=kafka_dir, 
env=jdk15_env)

Review comment:
   I'm not sure this one is really needed, as I have never executed that part
   of the script.
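
For readers unfamiliar with the `env=jdk15_env` arguments in the diffs above, here is a minimal sketch of the general idea: build one environment mapping per JDK and hand it to whatever runs the build command. The `env_with_java_home` helper and the install paths are assumptions made for illustration; this is not the `get_jdk` function from `release.py`.

```python
import os


def env_with_java_home(java_home, base_env=None):
    """Return a copy of the environment with JAVA_HOME pointing at java_home.

    Hypothetical helper for illustration; it is not the get_jdk function
    from release.py.
    """
    env = dict(os.environ if base_env is None else base_env)
    env["JAVA_HOME"] = java_home
    return env


# Assumed install locations; adjust to the local machine.
jdk8_env = env_with_java_home("/opt/jdk8", base_env={"PATH": "/usr/bin"})
jdk15_env = env_with_java_home("/opt/jdk15", base_env={"PATH": "/usr/bin"})
print(jdk15_env["JAVA_HOME"])
```

A mapping like this can be passed as the `env` argument of `subprocess.run`, so that only the docs build sees the newer JDK while the artifact build keeps using JDK 8.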








[GitHub] [kafka] vvcephei commented on a change in pull request #10744: KAFKA-8410: KTableProcessor migration groundwork

2021-05-27 Thread GitBox


vvcephei commented on a change in pull request #10744:
URL: https://github.com/apache/kafka/pull/10744#discussion_r640840509



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
##
@@ -24,14 +24,14 @@
 import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerializer;
 import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerializer;
 
-public class SinkNode extends ProcessorNode {
+public class SinkNode extends ProcessorNode {

Review comment:
   We already throw an exception if you try and add a child. I think it 
would complicate any of our processor graph traversal algorithms if we made it 
illegal to even call getChildren, as they would have to type-check the nodes 
before traversing.








[GitHub] [kafka] jlprat commented on pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK

2021-05-27 Thread GitBox


jlprat commented on pull request #10780:
URL: https://github.com/apache/kafka/pull/10780#issuecomment-849822082


   Hi @ijuma, this is ready for you to review.






[GitHub] [kafka] ijuma commented on a change in pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK

2021-05-27 Thread GitBox


ijuma commented on a change in pull request #10780:
URL: https://github.com/apache/kafka/pull/10780#discussion_r640841685



##
File path: release.py
##
@@ -512,6 +512,7 @@ def command_release_announcement_email():
 
 jdk8_env = get_jdk(prefs, 8)
 jdk11_env = get_jdk(prefs, 11)

Review comment:
   Can we remove this?








[GitHub] [kafka] jlprat commented on a change in pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK

2021-05-27 Thread GitBox


jlprat commented on a change in pull request #10780:
URL: https://github.com/apache/kafka/pull/10780#discussion_r640841969



##
File path: release.py
##
@@ -512,6 +512,7 @@ def command_release_announcement_email():
 
 jdk8_env = get_jdk(prefs, 8)
 jdk11_env = get_jdk(prefs, 11)

Review comment:
   Most probably, but I never executed that part of the script.








[GitHub] [kafka] jlprat commented on a change in pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK

2021-05-27 Thread GitBox


jlprat commented on a change in pull request #10780:
URL: https://github.com/apache/kafka/pull/10780#discussion_r640842878



##
File path: release.py
##
@@ -512,6 +512,7 @@ def command_release_announcement_email():
 
 jdk8_env = get_jdk(prefs, 8)
 jdk11_env = get_jdk(prefs, 11)

Review comment:
   I will push a commit without this line








[GitHub] [kafka] ijuma commented on a change in pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK

2021-05-27 Thread GitBox


ijuma commented on a change in pull request #10780:
URL: https://github.com/apache/kafka/pull/10780#discussion_r640842976



##
File path: release.py
##
@@ -512,6 +512,7 @@ def command_release_announcement_email():
 
 jdk8_env = get_jdk(prefs, 8)
 jdk11_env = get_jdk(prefs, 11)

Review comment:
   Can you search the file for references to this? If there are none, 
please remove it.








[GitHub] [kafka] jlprat commented on a change in pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK

2021-05-27 Thread GitBox


jlprat commented on a change in pull request #10780:
URL: https://github.com/apache/kafka/pull/10780#discussion_r640843605



##
File path: release.py
##
@@ -512,6 +512,7 @@ def command_release_announcement_email():
 
 jdk8_env = get_jdk(prefs, 8)
 jdk11_env = get_jdk(prefs, 11)

Review comment:
   The only reference was at line 604, which I replaced with JDK 15.








[GitHub] [kafka] jlprat commented on a change in pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK

2021-05-27 Thread GitBox


jlprat commented on a change in pull request #10780:
URL: https://github.com/apache/kafka/pull/10780#discussion_r640844354



##
File path: release.py
##
@@ -512,6 +512,7 @@ def command_release_announcement_email():
 
 jdk8_env = get_jdk(prefs, 8)
 jdk11_env = get_jdk(prefs, 11)

Review comment:
   Pushed








[GitHub] [kafka] jlprat commented on pull request #10780: KAFKA-12782: Fix Javadocs generation by upgrading JDK

2021-05-27 Thread GitBox


jlprat commented on pull request #10780:
URL: https://github.com/apache/kafka/pull/10780#issuecomment-849834440


   An easy way to test this locally is to search for a Kafka class that contains a 
Javadoc link to a standard Java class.
   For example, search for `KafkaFuture` and then click on `Throwable`. 
Both the search result link and the `Throwable` link should work.






[jira] [Commented] (KAFKA-8295) Optimize count() using RocksDB merge operator

2021-05-27 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352731#comment-17352731
 ] 

A. Sophie Blee-Goldman commented on KAFKA-8295:
---

Thanks for the initial results. I think it would be valuable to try plugging 
it into Kafka Streams with a basic POC and then run some throughput 
benchmarks. Even very rough benchmarks should give some idea of how well this 
works: for example, load an input topic with a very large amount of data, then 
use the TopologyTestDriver to compare how many records the POC and the original 
can each process within a fixed time (e.g. 5 minutes). As long as there is 
enough input data that neither run exhausts its records before the time limit 
is up, this should give us a good sense of how the merge operator compares. 
Does that make sense? The jmh benchmarks for the ByteBuffer optimization could 
possibly be reused for this too.
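
The time-boxed comparison described above could be sketched roughly as follows. This is only an illustration in Python, not Kafka Streams code: `count_processed` and `endless_input` are hypothetical names, and the `process` callable stands in for whatever drives records through the topology (for example via the TopologyTestDriver). The clock is injectable so the harness itself can be checked deterministically.

```python
import time
from typing import Callable, Iterable, Iterator


def count_processed(process: Callable[[bytes], None],
                    records: Iterable[bytes],
                    time_limit_s: float,
                    clock: Callable[[], float] = time.monotonic) -> int:
    """Return how many records `process` handled before the time limit expired."""
    deadline = clock() + time_limit_s
    processed = 0
    for record in records:
        # Stop as soon as the time budget is used up.
        if clock() >= deadline:
            break
        process(record)
        processed += 1
    return processed


def endless_input() -> Iterator[bytes]:
    """Hypothetical input source that never runs dry within the time limit."""
    while True:
        yield b"key"
```

Running `count_processed` once with the original implementation and once with the POC, using the same time limit and the same input, gives two record counts that can be compared directly.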

> Optimize count() using RocksDB merge operator
> -
>
> Key: KAFKA-8295
> URL: https://issues.apache.org/jira/browse/KAFKA-8295
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Major
>
> In addition to the regular put/get/delete, RocksDB provides a fourth operation, 
> merge. This essentially provides an optimized read/update/write path in a 
> single operation. One of the built-in (C++) merge operators exposed over the 
> Java API is a counter. We should be able to leverage this for a more 
> efficient implementation of count().
>  
> (Note: Unfortunately it seems unlikely we can use this to optimize general 
> aggregations, even if RocksJava allowed for a custom merge operator, unless 
> we provide a way for the user to specify and connect a C++-implemented 
> aggregator; otherwise the cost of crossing the JNI boundary would outweigh 
> any performance benefit.)
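
To illustrate the difference between the two count() paths described in the issue, here is a minimal sketch in Python. It uses a toy in-memory store, not the RocksDB or RocksJava API: `ToyStore`, `count_with_get_put`, and `count_with_merge` are hypothetical names, and the 8-byte little-endian counter encoding is an assumption made for the example. The point is only that the merge path issues one store operation per record where the read/update/write path issues two.

```python
import struct


class ToyStore:
    """In-memory stand-in for a key-value store; not the RocksDB API."""

    def __init__(self):
        self._data = {}
        self.calls = 0  # number of store operations issued by the caller

    def get(self, key):
        self.calls += 1
        return self._data.get(key)

    def put(self, key, value):
        self.calls += 1
        self._data[key] = value

    def merge(self, key, operand):
        # Counter-style merge: the addition happens inside the store.
        self.calls += 1
        current = struct.unpack("<Q", self._data.get(key, bytes(8)))[0]
        delta = struct.unpack("<Q", operand)[0]
        self._data[key] = struct.pack("<Q", current + delta)


ONE = struct.pack("<Q", 1)


def count_with_get_put(store, key):
    """Read/update/write: two store operations per record."""
    old = store.get(key)
    current = struct.unpack("<Q", old)[0] if old is not None else 0
    store.put(key, struct.pack("<Q", current + 1))


def count_with_merge(store, key):
    """Merge: a single store operation per record."""
    store.merge(key, ONE)
```

Both functions leave the same value in the store; only the number of calls that cross the store boundary differs, which is where the issue expects the saving to come from.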



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-5892) Connector property override does not work unless setting ALL converter properties

2021-05-27 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17352737#comment-17352737
 ] 

Randall Hauch commented on KAFKA-5892:
--

[~dc-heros], I've added your account as a contributor to the project, so you 
should now be able to self-assign KAFKA issues. Please try to assign this to 
yourself, and let me know here if you cannot. Thanks for volunteering to take 
this up!

> Connector property override does not work unless setting ALL converter 
> properties
> -
>
> Key: KAFKA-5892
> URL: https://issues.apache.org/jira/browse/KAFKA-5892
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Yeva Byzek
>Assignee: Jitendra Sahu
>Priority: Minor
>  Labels: newbie
>
> A single connector setting override {{value.converter.schemas.enable=false}} 
> only takes effect if ALL of the converter properties are overridden in the 
> connector.
> At minimum, we should give the user a warning or error that this will be ignored.
> We should also consider changing the behavior to allow the single property 
> override even if not all the converter properties are specified, but this 
> requires discussion to evaluate the impact of this change.





[GitHub] [kafka] ijuma commented on pull request #10761: MINOR: Don't ignore deletion of partition metadata file and log topic id clean-ups

2021-05-27 Thread GitBox


ijuma commented on pull request #10761:
URL: https://github.com/apache/kafka/pull/10761#issuecomment-849931983


   The JDK 11 build passed; there were a few unrelated failures in the other builds:
   
   > Build / JDK 8 and Scala 2.12 / testMetricsDuringTopicCreateDelete() – 
kafka.integration.MetricsDuringTopicCreationDeletionTest
   > 16s
   > Build / JDK 15 and Scala 2.13 / testMetricsDuringTopicCreateDelete() – 
kafka.integration.MetricsDuringTopicCreationDeletionTest
   > 7s
   > Build / JDK 15 and Scala 2.13 / 
testCreateClusterAndCreateAndManyTopicsWithManyPartitions() – 
kafka.server.RaftClusterTest
   > 19s
   > Build / JDK 15 and Scala 2.13 / 
testCreateClusterAndWaitForBrokerInRunningState() – kafka.server.RaftClusterTest
   > 1m 11s
   > Build / JDK 15 and Scala 2.13 / 
testCreateClusterAndCreateListDeleteTopic() – kafka.server.RaftClusterTest
   > 16s
   > Build / JDK 15 and Scala 2.13 / 
testCreateClusterAndCreateListDeleteTopic() – kafka.server.RaftClusterTest






[GitHub] [kafka] ijuma merged pull request #10761: MINOR: Don't ignore deletion of partition metadata file and log topic id clean-ups

2021-05-27 Thread GitBox


ijuma merged pull request #10761:
URL: https://github.com/apache/kafka/pull/10761


   






[GitHub] [kafka] ijuma commented on pull request #10704: KAFKA-12791: ConcurrentModificationException in AbstractConfig use by KafkaProducer

2021-05-27 Thread GitBox


ijuma commented on pull request #10704:
URL: https://github.com/apache/kafka/pull/10704#issuecomment-849934626


   Unrelated flaky tests:
   
   > Build / JDK 15 and Scala 2.13 / 
kafka.network.ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
   > Build / JDK 11 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
   > Build / JDK 11 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   > Build / JDK 8 and Scala 2.12 / 
kafka.api.PlaintextConsumerTest.testMultiConsumerDefaultAssignment()
   > Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable






[GitHub] [kafka] ijuma merged pull request #10704: KAFKA-12791: ConcurrentModificationException in AbstractConfig use by KafkaProducer

2021-05-27 Thread GitBox


ijuma merged pull request #10704:
URL: https://github.com/apache/kafka/pull/10704


   





