[GitHub] [kafka] chia7712 commented on pull request #10567: Ensure `ignorable` is a boolean value.

2021-04-21 Thread GitBox


chia7712 commented on pull request #10567:
URL: https://github.com/apache/kafka/pull/10567#issuecomment-823837778


   > Does it cause any failures?
   
   According to the source code 
(https://github.com/FasterXML/jackson-databind/blob/2.10/src/main/java/com/fasterxml/jackson/databind/deser/std/NumberDeserializers.java#L242), 
it should be fine to use "true" (string type). However, I prefer a consistent 
value (boolean type) :)
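
   For illustration, a minimal Java sketch (not from the PR) of the Jackson 
behaviour referenced above: a JSON string "true" is coerced into a boolean 
field, which is why the string form parses but is less consistent than a plain 
boolean. The `Flag` class is hypothetical.
   
   ```java
   import com.fasterxml.jackson.databind.ObjectMapper;
   
   public class IgnorableCoercion {
       public static class Flag {
           public boolean ignorable; // Jackson coerces the string "true" into this boolean
       }
   
       public static void main(String[] args) throws Exception {
           ObjectMapper mapper = new ObjectMapper();
           Flag fromString = mapper.readValue("{\"ignorable\": \"true\"}", Flag.class);
           Flag fromBoolean = mapper.readValue("{\"ignorable\": true}", Flag.class);
           System.out.println(fromString.ignorable + " " + fromBoolean.ignorable); // prints: true true
       }
   }
   ```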


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12700) The admin.listeners config has wonky valid values in the docs

2021-04-21 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-12700:
-

Assignee: Luke Chen

> The admin.listeners config has wonky valid values in the docs
> -
>
> Key: KAFKA-12700
> URL: https://issues.apache.org/jira/browse/KAFKA-12700
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, KafkaConnect
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Major
>
> Noticed this while updating the docs for the 2.6.2 release: the docs for 
> these configs are generated from the config definition, including info such 
> as default, type, valid values, etc. 
> When defining WorkerConfig.ADMIN_LISTENERS_CONFIG we seem to pass an actual 
> `new AdminListenersValidator()` object in for the "valid values" parameter, 
> causing this field to display some wonky useless object reference in the 
> docs. See 
> https://kafka.apache.org/documentation/#connectconfigs_admin.listeners:
> Valid Values: 
> org.apache.kafka.connect.runtime.WorkerConfig$AdminListenersValidator@383534aa
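
For context, a hedged sketch (parameter values illustrative, not copied from 
WorkerConfig) of the kind of define() call involved: ConfigDef renders the 
"Valid Values" column in the generated docs by calling toString() on the 
validator, so a validator without a toString() override prints as an object 
reference.

{noformat}
// Hedged sketch: the Validator passed to define() supplies the "Valid Values"
// text in the generated docs via its toString().
new ConfigDef().define(ADMIN_LISTENERS_CONFIG, ConfigDef.Type.LIST,
    null,                           // default value
    new AdminListenersValidator(),  // its toString() becomes "Valid Values"
    ConfigDef.Importance.LOW, ADMIN_LISTENERS_DOC);
{noformat}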



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #10446: KAFKA-12661 ConfigEntry#equal does not compare other fields when value is NOT null

2021-04-21 Thread GitBox


chia7712 commented on pull request #10446:
URL: https://github.com/apache/kafka/pull/10446#issuecomment-823838830


   ```
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplication()
   ```
   
   Unrelated errors. Merging trunk to trigger QA again.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12701) NPE in MetadataRequest when using topic IDs

2021-04-21 Thread Travis Bischel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326316#comment-17326316
 ] 

Travis Bischel commented on KAFKA-12701:


Metadata v11 is supported in 2.8.0. I've added support for specifying topic IDs 
in my {{kcl}} command line client 
[here|https://github.com/twmb/kcl/commit/a09f6f8cca4f87b878d943a2bed4ef8ed2e10a7e]
 (you'd need to build off of master if you want to test this yourself).

{noformat}
[01:20:51]
twmb@h4x3r:~
$ kcl admin topic create foo
NAME  ID                                MESSAGE
foo   6a62c01129a341c8a0231f7b3c21bc9b  OK

[01:20:57]
twmb@h4x3r:~
$ kcl metadata -t --ids 6a62c01129a341c8a0231f7b3c21bc9b
^C

[01:21:11]
twmb@h4x3r:~
$ kcl metadata -t foo
NAME  ID                                PARTITIONS  REPLICAS
foo   6a62c01129a341c8a0231f7b3c21bc9b  20          1

[01:21:16]
twmb@h4x3r:~
$ kcl admin topic delete --ids 6a62c01129a341c8a0231f7b3c21bc9b
foo   OK
{noformat}

What this does is issue a metadata request with a single topic whose name is 
null and whose topic ID is the hex-decoded ID passed in.
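
In Java client terms, a hedged sketch of the same request using Kafka's 
generated protocol classes (the literal ID here is taken from the error log in 
this issue):

{noformat}
import java.util.Collections;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.MetadataRequestData;

// Hedged sketch: a Metadata v11 request with a single topic whose name is
// null and whose topic ID is set, mirroring what kcl sends above.
MetadataRequestData data = new MetadataRequestData()
    .setAllowAutoTopicCreation(false)
    .setTopics(Collections.singletonList(
        new MetadataRequestData.MetadataRequestTopic()
            .setName(null)
            .setTopicId(Uuid.fromString("LmqOoFOASnqQp_4-oJgeKA"))));
{noformat}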

> NPE in MetadataRequest when using topic IDs
> ---
>
> Key: KAFKA-12701
> URL: https://issues.apache.org/jira/browse/KAFKA-12701
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Travis Bischel
>Assignee: dengziming
>Priority: Major
>
> Authorization result checking relies on the topic name not being null, 
> which, when using topic IDs, it is.
> Unlike the logic in handleDeleteTopicsRequest, handleMetadataRequest does not 
> check ZooKeeper for the names corresponding to topic IDs when topic IDs are 
> present.
> {noformat}
> [2021-04-21 05:53:01,463] ERROR [KafkaApi-1] Error when handling request: 
> clientId=kgo, correlationId=1, api=METADATA, version=11, 
> body=MetadataRequestData(topics=[MetadataRequestTopic(topicId=LmqOoFOASnqQp_4-oJgeKA,
>  name=null)], allowAutoTopicCreation=false, 
> includeClusterAuthorizedOperations=false, 
> includeTopicAuthorizedOperations=false) (kafka.server.RequestHandlerHelper)
> java.lang.NullPointerException: name
>   at java.base/java.util.Objects.requireNonNull(Unknown Source)
>   at 
> org.apache.kafka.common.resource.ResourcePattern.<init>(ResourcePattern.java:50)
>   at 
> kafka.server.AuthHelper.$anonfun$filterByAuthorized$3(AuthHelper.scala:121)
>   at scala.collection.Iterator$$anon$9.next(Iterator.scala:575)
>   at scala.collection.mutable.Growable.addAll(Growable.scala:62)
>   at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
>   at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:142)
>   at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:42)
>   at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:258)
>   at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:247)
>   at scala.collection.SeqFactory$Delegate.from(Factory.scala:306)
>   at scala.collection.IterableOnceOps.toBuffer(IterableOnce.scala:1270)
>   at scala.collection.IterableOnceOps.toBuffer$(IterableOnce.scala:1270)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1288)
>   at kafka.server.AuthHelper.filterByAuthorized(AuthHelper.scala:120)
>   at 
> kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:1146)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:170)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> [2021-04-21 05:53:01,464] ERROR [Kafka Request Handler 1 on Broker 1], 
> Exception when handling request (kafka.server.KafkaRequestHandler)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.message.MetadataResponseData$MetadataResponseTopic.addSize(MetadataResponseData.java:1247)
>   at 
> org.apache.kafka.common.message.MetadataResponseData.addSize(MetadataResponseData.java:417)
>   at 
> org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
>   at 
> org.apache.kafka.common.protocol.SendBuilder.buildResponseSend(SendBuilder.java:200)
>   at 
> org.apache.kafka.common.requests.AbstractResponse.toSend(AbstractResponse.java:43)
>   at 
> org.apache.kafka.common.requests.RequestContext.buildResponseSend(RequestContext.java:111)
>   at 
> kafka.network.RequestChannel$Request.buildResponseSend(RequestChannel.scala:132)
>   at 
> kafka.server.RequestHandlerHelper.sendResponse(RequestHandlerHelper.scala:185)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorOrCloseConnection(RequestHandlerHelper.scala:155)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorResponseMaybeThrottle(RequestHandlerHelper.scala:109)
>   at 
> kafka.server.RequestHandlerHelper.handleError(RequestHandlerHelper.scala:79)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:229)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}

[GitHub] [kafka] showuon opened a new pull request #10574: KAFKA-12700: override toString method to show correct value in doc

2021-04-21 Thread GitBox


showuon opened a new pull request #10574:
URL: https://github.com/apache/kafka/pull/10574


   We use a customized validator here, but forgot to override the `toString` 
method to show the correct value in the docs.
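
   A minimal sketch of the kind of fix, assuming the usual 
`ConfigDef.Validator` shape (the doc string below is illustrative, not the 
exact wording in the PR):
   
   ```java
   // Hedged sketch: overriding toString() so ConfigDef prints readable
   // "Valid Values" text instead of the default object reference.
   static class AdminListenersValidator implements ConfigDef.Validator {
       @Override
       public void ensureValid(String name, Object value) {
           // existing validation logic unchanged
       }
   
       @Override
       public String toString() {
           return "List of comma-separated URIs, ex: http://localhost:8080,https://localhost:8443.";
       }
   }
   ```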
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10574: KAFKA-12700: override toString method to show correct value in doc

2021-04-21 Thread GitBox


showuon commented on pull request #10574:
URL: https://github.com/apache/kafka/pull/10574#issuecomment-823848128


   @rhauch @wicknicks, could you help review this PR to update the doc? Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12700) The admin.listeners config has wonky valid values in the docs

2021-04-21 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326325#comment-17326325
 ] 

Luke Chen commented on KAFKA-12700:
---

Nice catch!!

> The admin.listeners config has wonky valid values in the docs
> -
>
> Key: KAFKA-12700
> URL: https://issues.apache.org/jira/browse/KAFKA-12700
> Project: Kafka
>  Issue Type: Bug
>  Components: docs, KafkaConnect
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Major
>
> Noticed this while updating the docs for the 2.6.2 release: the docs for 
> these configs are generated from the config definition, including info such 
> as default, type, valid values, etc. 
> When defining WorkerConfig.ADMIN_LISTENERS_CONFIG we seem to pass an actual 
> `new AdminListenersValidator()` object in for the "valid values" parameter, 
> causing this field to display some wonky useless object reference in the 
> docs. See 
> https://kafka.apache.org/documentation/#connectconfigs_admin.listeners:
> Valid Values: 
> org.apache.kafka.connect.runtime.WorkerConfig$AdminListenersValidator@383534aa



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on pull request #10567: Ensure `ignorable` is a boolean value.

2021-04-21 Thread GitBox


dajac commented on pull request #10567:
URL: https://github.com/apache/kafka/pull/10567#issuecomment-823855071


   > > Does it cause any failures?
   > 
   > According to the source code 
(https://github.com/FasterXML/jackson-databind/blob/2.10/src/main/java/com/fasterxml/jackson/databind/deser/std/NumberDeserializers.java#L242), 
it should be fine to use "true" (string type). However, I prefer a consistent 
value (boolean type) :)
   
   Sounds good, thanks for the clarification.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12702) Unhandled exception caught in InterBrokerSendThread

2021-04-21 Thread Wenbing Shen (Jira)
Wenbing Shen created KAFKA-12702:


 Summary: Unhandled exception caught in InterBrokerSendThread
 Key: KAFKA-12702
 URL: https://issues.apache.org/jira/browse/KAFKA-12702
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.8.0
Reporter: Wenbing Shen
 Attachments: afterFixing.png, beforeFixing.png

In kraft mode, if listeners and advertised.listeners are not configured with 
host addresses, the host parameter value of Listener in 
BrokerRegistrationRequestData will be null. When the broker is started, a null 
pointer exception will be thrown, causing startup failure.

A feasible solution is to replace the empty host of endPoint in 
advertisedListeners with InetAddress.getLocalHost.getCanonicalHostName in 
Broker Server when building networkListeners.
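
A hedged Java sketch of that fallback (the actual fix is Scala, in 
BrokerServer when building networkListeners; "advertisedHost" is a 
hypothetical stand-in for ep.host):

{noformat}
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hedged sketch: fall back to the canonical host name when the advertised
// host is null or blank, so the registration request never carries host=null.
static String effectiveHost(String advertisedHost) throws UnknownHostException {
    if (advertisedHost == null || advertisedHost.trim().isEmpty()) {
        return InetAddress.getLocalHost().getCanonicalHostName();
    }
    return advertisedHost;
}
{noformat}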

The following is the debug log:

before fixing:

[2021-04-21 14:15:20,032] DEBUG (broker-2-to-controller-send-thread 
org.apache.kafka.clients.NetworkClient 522) [broker-2-to-controller] Sending 
BROKER_REGISTRATION request with header RequestHeader(apiKey=BROKER_REGIS
TRATION, apiVersion=0, clientId=2, correlationId=6) and timeout 3 to node 
2: BrokerRegistrationRequestData(brokerId=2, 
clusterId='nCqve6D1TEef3NpQniA0Mg', incarnationId=X8w4_1DFT2yUjOm6asPjIQ, 
listeners=[Listener(n
ame='PLAINTEXT', host=null, port=9092, 
securityProtocol=0)], features=[], rack=null)
[2021-04-21 14:15:20,033] ERROR (broker-2-to-controller-send-thread 
kafka.server.BrokerToControllerRequestThread 76) 
[broker-2-to-controller-send-thread]: unhandled exception caught in 
InterBrokerSendThread
java.lang.NullPointerException
 at 
org.apache.kafka.common.message.BrokerRegistrationRequestData$Listener.addSize(BrokerRegistrationRequestData.java:515)
 at 
org.apache.kafka.common.message.BrokerRegistrationRequestData.addSize(BrokerRegistrationRequestData.java:216)
 at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
 at 
org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
 at 
org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
 at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:525)
 at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:501)
 at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:461)
 at 
kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1(InterBrokerSendThread.scala:104)
 at 
kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1$adapted(InterBrokerSendThread.scala:99)
 at kafka.common.InterBrokerSendThread$$Lambda$259/910445654.apply(Unknown 
Source)
 at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
 at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
 at 
kafka.common.InterBrokerSendThread.sendRequests(InterBrokerSendThread.scala:99)
 at kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:73)
 at 
kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManager.scala:368)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-04-21 14:15:20,034] INFO (broker-2-to-controller-send-thread 
kafka.server.BrokerToControllerRequestThread 66) 
[broker-2-to-controller-send-thread]: Stopped



after fixing:

[2021-04-21 15:05:01,095] DEBUG (BrokerToControllerChannelManager broker=2 
name=heartbeat org.apache.kafka.clients.NetworkClient 512) 
[BrokerToControllerChannelManager broker=2 name=heartbeat] Sending 
BROKER_REGISTRATI
ON request with header RequestHeader(apiKey=BROKER_REGISTRATION, apiVersion=0, 
clientId=2, correlationId=0) and timeout 3 to node 2: 
BrokerRegistrationRequestData(brokerId=2, clusterId='nCqve6D1TEef3NpQniA0Mg', 
inc
arnationId=xF29h_IRR1KzrERWwssQ2w, listeners=[Listener(name='PLAINTEXT', 
host='hdpxxx.cn', port=9092, securityProtocol=0)], features=[], rack=null)

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on a change in pull request #10564: MINOR: clean up some replication code

2021-04-21 Thread GitBox


chia7712 commented on a change in pull request #10564:
URL: https://github.com/apache/kafka/pull/10564#discussion_r617297824



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -321,22 +339,17 @@ public void replay(PartitionRecord record) {
 }
 PartitionControlInfo newPartInfo = new PartitionControlInfo(record);
 PartitionControlInfo prevPartInfo = 
topicInfo.parts.get(record.partitionId());
+String topicPart = topicInfo.name + "-" + record.partitionId();
 if (prevPartInfo == null) {
-log.info("Created partition {}:{} with {}.", record.topicId(),
-record.partitionId(), newPartInfo.toString());
+log.info("Created partition {} with {}.", topicPart, 
newPartInfo.toString());

Review comment:
   Is it intentional to use "name" to replace "id"?

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1119,6 +1053,62 @@ void validateManualPartitionAssignment(List<Integer> 
assignment,
 }
 }
 
+void generateLeaderAndIsrUpdates(String context,
+ int brokerToRemoveFromIsr,
+ List<ApiMessageAndVersion> records,
+ Iterator<TopicIdPartition> iterator) {
+int oldSize = records.size();
+while (iterator.hasNext()) {
+TopicIdPartition topicIdPart = iterator.next();
+TopicControlInfo topic = topics.get(topicIdPart.topicId());
+if (topic == null) {
+throw new RuntimeException("Topic ID " + topicIdPart.topicId() 
+ " existed in " +
+"isrMembers, but not in the topics map.");
+}
+PartitionControlInfo partition = 
topic.parts.get(topicIdPart.partitionId());
+if (partition == null) {
+throw new RuntimeException("Partition " + topicIdPart +
+" existed in isrMembers, but not in the partitions map.");
+}
+int[] newIsr = Replicas.copyWithout(partition.isr, 
brokerToRemoveFromIsr);
+int newLeader = Replicas.contains(newIsr, partition.leader) ? 
partition.leader :
+bestLeader(partition.replicas, newIsr, false);
+boolean unclean = newLeader != NO_LEADER && 
!Replicas.contains(newIsr, newLeader);

Review comment:
   Could we reuse `wasCleanlyDerivedFrom` to get "unclean"?

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -907,13 +843,13 @@ ApiError electLeader(String topic, int partitionId, 
boolean unclean,
 }
 PartitionChangeRecord record = new PartitionChangeRecord().
 setPartitionId(partitionId).
-setTopicId(topicId);
-if (unclean && !Replicas.contains(partitionInfo.isr, newLeader)) {
-// If the election was unclean, we may have to forcibly add the 
replica to
-// the ISR.  This can result in data loss!
+setTopicId(topicId).
+setLeader(newLeader);
+if (!Replicas.contains(partitionInfo.isr, newLeader)) {
+// If the election was unclean, we have to forcibly set the ISR to 
just the
+// new leader.  This can result in data loss!

Review comment:
   redundant "space"

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1119,6 +1053,62 @@ void validateManualPartitionAssignment(List<Integer> 
assignment,
 }
 }
 
+void generateLeaderAndIsrUpdates(String context,
+ int brokerToRemoveFromIsr,
+ List<ApiMessageAndVersion> records,
+ Iterator<TopicIdPartition> iterator) {
+int oldSize = records.size();
+while (iterator.hasNext()) {
+TopicIdPartition topicIdPart = iterator.next();
+TopicControlInfo topic = topics.get(topicIdPart.topicId());
+if (topic == null) {
+throw new RuntimeException("Topic ID " + topicIdPart.topicId() 
+ " existed in " +
+"isrMembers, but not in the topics map.");
+}
+PartitionControlInfo partition = 
topic.parts.get(topicIdPart.partitionId());
+if (partition == null) {
+throw new RuntimeException("Partition " + topicIdPart +
+" existed in isrMembers, but not in the partitions map.");
+}
+int[] newIsr = Replicas.copyWithout(partition.isr, 
brokerToRemoveFromIsr);
+int newLeader = Replicas.contains(newIsr, partition.leader) ? 
partition.leader :
+bestLeader(partition.replicas, newIsr, false);
+boolean unclean = newLeader != NO_LEADER && 
!Replicas.contains(newIsr, newLeader);
+if (unclean) {
+// After an unclean leader ele

[jira] [Updated] (KAFKA-12702) Unhandled exception caught in InterBrokerSendThread

2021-04-21 Thread Wenbing Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbing Shen updated KAFKA-12702:
-
Attachment: image-2021-04-21-17-12-28-471.png

> Unhandled exception caught in InterBrokerSendThread
> ---
>
> Key: KAFKA-12702
> URL: https://issues.apache.org/jira/browse/KAFKA-12702
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Wenbing Shen
>Priority: Blocker
> Attachments: afterFixing.png, beforeFixing.png, 
> image-2021-04-21-17-12-28-471.png
>
>
> In kraft mode, if listeners and advertised.listeners are not configured with 
> host addresses, the host parameter value of Listener in 
> BrokerRegistrationRequestData will be null. When the broker is started, a 
> null pointer exception will be thrown, causing startup failure.
> A feasible solution is to replace the empty host of endPoint in 
> advertisedListeners with InetAddress.getLocalHost.getCanonicalHostName in 
> Broker Server when building networkListeners.
> The following is the debug log:
> before fixing:
> [2021-04-21 14:15:20,032] DEBUG (broker-2-to-controller-send-thread 
> org.apache.kafka.clients.NetworkClient 522) [broker-2-to-controller] Sending 
> BROKER_REGISTRATION request with header RequestHeader(apiKey=BROKER_REGIS
> TRATION, apiVersion=0, clientId=2, correlationId=6) and timeout 3 to node 
> 2: BrokerRegistrationRequestData(brokerId=2, 
> clusterId='nCqve6D1TEef3NpQniA0Mg', incarnationId=X8w4_1DFT2yUjOm6asPjIQ, 
> listeners=[Listener(n
> ame='PLAINTEXT', host=null, port=9092, 
> securityProtocol=0)], features=[], rack=null)
> [2021-04-21 14:15:20,033] ERROR (broker-2-to-controller-send-thread 
> kafka.server.BrokerToControllerRequestThread 76) 
> [broker-2-to-controller-send-thread]: unhandled exception caught in 
> InterBrokerSendThread
> java.lang.NullPointerException
>  at 
> org.apache.kafka.common.message.BrokerRegistrationRequestData$Listener.addSize(BrokerRegistrationRequestData.java:515)
>  at 
> org.apache.kafka.common.message.BrokerRegistrationRequestData.addSize(BrokerRegistrationRequestData.java:216)
>  at 
> org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
>  at 
> org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
>  at 
> org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
>  at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:525)
>  at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:501)
>  at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:461)
>  at 
> kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1(InterBrokerSendThread.scala:104)
>  at 
> kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1$adapted(InterBrokerSendThread.scala:99)
>  at kafka.common.InterBrokerSendThread$$Lambda$259/910445654.apply(Unknown 
> Source)
>  at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
>  at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
>  at 
> kafka.common.InterBrokerSendThread.sendRequests(InterBrokerSendThread.scala:99)
>  at 
> kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:73)
>  at 
> kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManager.scala:368)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
> [2021-04-21 14:15:20,034] INFO (broker-2-to-controller-send-thread 
> kafka.server.BrokerToControllerRequestThread 66) 
> [broker-2-to-controller-send-thread]: Stopped
> after fixing:
> [2021-04-21 15:05:01,095] DEBUG (BrokerToControllerChannelManager broker=2 
> name=heartbeat org.apache.kafka.clients.NetworkClient 512) 
> [BrokerToControllerChannelManager broker=2 name=heartbeat] Sending 
> BROKER_REGISTRATI
> ON request with header RequestHeader(apiKey=BROKER_REGISTRATION, 
> apiVersion=0, clientId=2, correlationId=0) and timeout 3 to node 2: 
> BrokerRegistrationRequestData(brokerId=2, clusterId='nCqve6D1TEef3NpQniA0Mg', 
> inc
> arnationId=xF29h_IRR1KzrERWwssQ2w, listeners=[Listener(name='PLAINTEXT', 
> host='hdpxxx.cn', port=9092, securityProtocol=0)], features=[], rack=null)
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12702) Unhandled exception caught in InterBrokerSendThread

2021-04-21 Thread Wenbing Shen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326387#comment-17326387
 ] 

Wenbing Shen commented on KAFKA-12702:
--

We need to fix this problem: according to the comments in the configuration 
file (config/kraft/server.properties), if listeners and advertised.listeners 
are not configured with an address, the program will automatically fall back to 
java.net.InetAddress.getCanonicalHostName(), but currently this actually causes 
the service to fail to start. !image-2021-04-21-17-12-28-471.png!

> Unhandled exception caught in InterBrokerSendThread
> ---
>
> Key: KAFKA-12702
> URL: https://issues.apache.org/jira/browse/KAFKA-12702
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Wenbing Shen
>Priority: Blocker
> Attachments: afterFixing.png, beforeFixing.png, 
> image-2021-04-21-17-12-28-471.png
>
>
> In kraft mode, if listeners and advertised.listeners are not configured with 
> host addresses, the host parameter value of Listener in 
> BrokerRegistrationRequestData will be null. When the broker is started, a 
> null pointer exception will be thrown, causing startup failure.
> A feasible solution is to replace the empty host of endPoint in 
> advertisedListeners with InetAddress.getLocalHost.getCanonicalHostName in 
> Broker Server when building networkListeners.
> The following is the debug log:
> before fixing:
> [2021-04-21 14:15:20,032] DEBUG (broker-2-to-controller-send-thread 
> org.apache.kafka.clients.NetworkClient 522) [broker-2-to-controller] Sending 
> BROKER_REGISTRATION request with header RequestHeader(apiKey=BROKER_REGIS
> TRATION, apiVersion=0, clientId=2, correlationId=6) and timeout 3 to node 
> 2: BrokerRegistrationRequestData(brokerId=2, 
> clusterId='nCqve6D1TEef3NpQniA0Mg', incarnationId=X8w4_1DFT2yUjOm6asPjIQ, 
> listeners=[Listener(n
> ame='PLAINTEXT', host=null, port=9092, 
> securityProtocol=0)], features=[], rack=null)
> [2021-04-21 14:15:20,033] ERROR (broker-2-to-controller-send-thread 
> kafka.server.BrokerToControllerRequestThread 76) 
> [broker-2-to-controller-send-thread]: unhandled exception caught in 
> InterBrokerSendThread
> java.lang.NullPointerException
>  at 
> org.apache.kafka.common.message.BrokerRegistrationRequestData$Listener.addSize(BrokerRegistrationRequestData.java:515)
>  at 
> org.apache.kafka.common.message.BrokerRegistrationRequestData.addSize(BrokerRegistrationRequestData.java:216)
>  at 
> org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
>  at 
> org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
>  at 
> org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
>  at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:525)
>  at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:501)
>  at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:461)
>  at 
> kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1(InterBrokerSendThread.scala:104)
>  at 
> kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1$adapted(InterBrokerSendThread.scala:99)
>  at kafka.common.InterBrokerSendThread$$Lambda$259/910445654.apply(Unknown 
> Source)
>  at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
>  at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
>  at 
> kafka.common.InterBrokerSendThread.sendRequests(InterBrokerSendThread.scala:99)
>  at 
> kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:73)
>  at 
> kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManager.scala:368)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
> [2021-04-21 14:15:20,034] INFO (broker-2-to-controller-send-thread 
> kafka.server.BrokerToControllerRequestThread 66) 
> [broker-2-to-controller-send-thread]: Stopped
> after fixing:
> [2021-04-21 15:05:01,095] DEBUG (BrokerToControllerChannelManager broker=2 
> name=heartbeat org.apache.kafka.clients.NetworkClient 512) 
> [BrokerToControllerChannelManager broker=2 name=heartbeat] Sending 
> BROKER_REGISTRATI
> ON request with header RequestHeader(apiKey=BROKER_REGISTRATION, 
> apiVersion=0, clientId=2, correlationId=0) and timeout 3 to node 2: 
> BrokerRegistrationRequestData(brokerId=2, clusterId='nCqve6D1TEef3NpQniA0Mg', 
> inc
> arnationId=xF29h_IRR1KzrERWwssQ2w, listeners=[Listener(name='PLAINTEXT', 
> host='hdpxxx.cn', port=9092, securityProtocol=0)], features=[], rack=null)
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12701) NPE in MetadataRequest when using topic IDs

2021-04-21 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326392#comment-17326392
 ] 

dengziming commented on KAFKA-12701:


In fact, we first planned to support topic IDs in MetadataRequest and 
MetadataResponse, and we added 2 tickets: KAFKA-10547 and KAFKA-10774.

Though KAFKA-10547 added topic IDs to MetadataResponse and MetadataRequest, 
describing topics by topic ID is supported by KAFKA-10774.

Sadly, the KAFKA-10547 PR was merged but the KAFKA-10774 PR was held off; for 
more details, see [https://github.com/apache/kafka/pull/9769#issuecomment-772830472]

I think this is a bug, so we should fix it for 2.8.0. I will submit a PR 
targeting 2.8.0 to remove topic IDs from MetadataRequest since they aren't 
supported.

> NPE in MetadataRequest when using topic IDs
> ---
>
> Key: KAFKA-12701
> URL: https://issues.apache.org/jira/browse/KAFKA-12701
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Travis Bischel
>Assignee: dengziming
>Priority: Major
>
> Authorization result checking relies on the topic name not being null, 
> which, when using topic IDs, it is.
> Unlike the logic in handleDeleteTopicsRequest, handleMetadataRequest does not 
> check ZooKeeper for the names corresponding to topic IDs when topic IDs are 
> present.
> {noformat}
> [2021-04-21 05:53:01,463] ERROR [KafkaApi-1] Error when handling request: 
> clientId=kgo, correlationId=1, api=METADATA, version=11, 
> body=MetadataRequestData(topics=[MetadataRequestTopic(topicId=LmqOoFOASnqQp_4-oJgeKA,
>  name=null)], allowAutoTopicCreation=false, 
> includeClusterAuthorizedOperations=false, 
> includeTopicAuthorizedOperations=false) (kafka.server.RequestHandlerHelper)
> java.lang.NullPointerException: name
>   at java.base/java.util.Objects.requireNonNull(Unknown Source)
>   at 
> org.apache.kafka.common.resource.ResourcePattern.<init>(ResourcePattern.java:50)
>   at 
> kafka.server.AuthHelper.$anonfun$filterByAuthorized$3(AuthHelper.scala:121)
>   at scala.collection.Iterator$$anon$9.next(Iterator.scala:575)
>   at scala.collection.mutable.Growable.addAll(Growable.scala:62)
>   at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
>   at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:142)
>   at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:42)
>   at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:258)
>   at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:247)
>   at scala.collection.SeqFactory$Delegate.from(Factory.scala:306)
>   at scala.collection.IterableOnceOps.toBuffer(IterableOnce.scala:1270)
>   at scala.collection.IterableOnceOps.toBuffer$(IterableOnce.scala:1270)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1288)
>   at kafka.server.AuthHelper.filterByAuthorized(AuthHelper.scala:120)
>   at 
> kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:1146)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:170)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> [2021-04-21 05:53:01,464] ERROR [Kafka Request Handler 1 on Broker 1], 
> Exception when handling request (kafka.server.KafkaRequestHandler)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.message.MetadataResponseData$MetadataResponseTopic.addSize(MetadataResponseData.java:1247)
>   at 
> org.apache.kafka.common.message.MetadataResponseData.addSize(MetadataResponseData.java:417)
>   at 
> org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
>   at 
> org.apache.kafka.common.protocol.SendBuilder.buildResponseSend(SendBuilder.java:200)
>   at 
> org.apache.kafka.common.requests.AbstractResponse.toSend(AbstractResponse.java:43)
>   at 
> org.apache.kafka.common.requests.RequestContext.buildResponseSend(RequestContext.java:111)
>   at 
> kafka.network.RequestChannel$Request.buildResponseSend(RequestChannel.scala:132)
>   at 
> kafka.server.RequestHandlerHelper.sendResponse(RequestHandlerHelper.scala:185)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorOrCloseConnection(RequestHandlerHelper.scala:155)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorResponseMaybeThrottle(RequestHandlerHelper.scala:109)
>   at 
> kafka.server.RequestHandlerHelper.handleError(RequestHandlerHelper.scala:79)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:229)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wenbingshen opened a new pull request #10575: KAFKA-12702: Fix NPE in networkListeners from BrokerServer

2021-04-21 Thread GitBox


wenbingshen opened a new pull request #10575:
URL: https://github.com/apache/kafka/pull/10575


   According to the comments in the configuration file 
(config/kraft/server.properties), if listeners and advertised.listeners are not 
configured with an address, the program will automatically fall back to 
java.net.InetAddress.getCanonicalHostName(), but this will actually cause the 
service to fail to start, because the host field of each Listener in 
BrokerRegistrationRequestData will be null.
   For the full stack trace, see 
[KAFKA-12702](https://issues.apache.org/jira/browse/KAFKA-12702).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10567: Ensure `ignorable` is a boolean value.

2021-04-21 Thread GitBox


chia7712 commented on pull request #10567:
URL: https://github.com/apache/kafka/pull/10567#issuecomment-823919500


   ```
   Build / JDK 11 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
   
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicationWithEmptyPartition()
   Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplication()
   ```
   
   Unrelated errors.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12680) Failed to restart the broker in kraft mode

2021-04-21 Thread Wenbing Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbing Shen resolved KAFKA-12680.
--
Resolution: Cannot Reproduce

> Failed to restart the broker in kraft mode
> --
>
> Key: KAFKA-12680
> URL: https://issues.apache.org/jira/browse/KAFKA-12680
> Project: Kafka
>  Issue Type: Bug
>Reporter: Wenbing Shen
>Priority: Major
>
> I tested kraft mode for the first time today, I deployed a single node kraft 
> mode broker according to the documentation:
> [https://github.com/apache/kafka/blob/6d1d68617ecd023b787f54aafc24a4232663428d/config/kraft/README.md]
>  
> first step: ./bin/kafka-storage.sh random-uuid
> Second step: Use the uuid generated above to execute the following commands:
> ./bin/kafka-storage.sh format -t  -c ./config/kraft/server.properties
>  
> third step: ./bin/kafka-server-start.sh ./config/kraft/server.properties
>  
> Then I created two topics with two partitions and a single replica.
> ./bin/kafka-topics.sh --create --topic test-01 --partitions 2 
> --replication-factor 1 --bootstrap-server localhost:9092
> Verify that there is no problem with production and consumption, but when I 
> call kafka-server-stop.sh, when I call the start command again, the broker 
> starts to report an error.
> I am not sure if it is a known bug or a problem with my usage
>  
> [2021-04-18 00:19:37,443] ERROR Exiting Kafka due to fatal exception 
> (kafka.Kafka$)
> java.io.IOException: Invalid argument
>  at java.io.RandomAccessFile.setLength(Native Method)
>  at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:189)
>  at kafka.log.AbstractIndex.resize(AbstractIndex.scala:175)
>  at 
> kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:241)
>  at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:241)
>  at kafka.log.LogSegment.recover(LogSegment.scala:385)
>  at kafka.log.Log.recoverSegment(Log.scala:741)
>  at kafka.log.Log.recoverLog(Log.scala:894)
>  at kafka.log.Log.$anonfun$loadSegments$2(Log.scala:816)
>  at kafka.log.Log$$Lambda$153/391630194.apply$mcJ$sp(Unknown Source)
>  at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17)
>  at kafka.log.Log.retryOnOffsetOverflow(Log.scala:2456)
>  at kafka.log.Log.loadSegments(Log.scala:816)
>  at kafka.log.Log.<init>(Log.scala:326)
>  at kafka.log.Log$.apply(Log.scala:2593)
>  at kafka.raft.KafkaMetadataLog$.apply(KafkaMetadataLog.scala:358)
>  at kafka.raft.KafkaRaftManager.buildMetadataLog(RaftManager.scala:253)
>  at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:127)
>  at kafka.server.KafkaRaftServer.<init>(KafkaRaftServer.scala:74)
>  at kafka.Kafka$.buildServer(Kafka.scala:79)
>  at kafka.Kafka$.main(Kafka.scala:87)
>  at kafka.Kafka.main(Kafka.scala)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on a change in pull request #10575: KAFKA-12702: Fix NPE in networkListeners from BrokerServer

2021-04-21 Thread GitBox


chia7712 commented on a change in pull request #10575:
URL: https://github.com/apache/kafka/pull/10575#discussion_r617365981



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -272,7 +273,7 @@ class BrokerServer(
   val networkListeners = new ListenerCollection()
   config.advertisedListeners.foreach { ep =>
 networkListeners.add(new Listener().
-  setHost(ep.host).
+  setHost(if (ep.host == null || ep.host.trim.isEmpty) 
InetAddress.getLocalHost.getCanonicalHostName else ep.host).

Review comment:
   Could you use `Utils.isBlank(ep.host)`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wenbingshen commented on a change in pull request #10575: KAFKA-12702: Fix NPE in networkListeners from BrokerServer

2021-04-21 Thread GitBox


wenbingshen commented on a change in pull request #10575:
URL: https://github.com/apache/kafka/pull/10575#discussion_r617379329



##
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##
@@ -272,7 +273,7 @@ class BrokerServer(
   val networkListeners = new ListenerCollection()
   config.advertisedListeners.foreach { ep =>
 networkListeners.add(new Listener().
-  setHost(ep.host).
+  setHost(if (ep.host == null || ep.host.trim.isEmpty) 
InetAddress.getLocalHost.getCanonicalHostName else ep.host).

Review comment:
   Good idea. Thank you for your guidance and feedback. :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming opened a new pull request #10576: KAFKA-12701: Remove topicId from MetadataReq since it was not supported in 2.8.0

2021-04-21 Thread GitBox


dengziming opened a new pull request #10576:
URL: https://github.com/apache/kafka/pull/10576


   #9622 added topic IDs to MetadataResponse and MetadataRequest, but 
describing topics by topic ID is supported by #9769.
   
   Sadly, the #9622 PR was merged while the #9769 PR was held off; for more 
details, see https://github.com/apache/kafka/pull/9769#issuecomment-772830472
   I think this is a bug, so we should fix it for 2.8.0.
   
   This PR should be cherry-picked to 2.8.0.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12702) Unhandled exception caught in InterBrokerSendThread

2021-04-21 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-12702:
--

Assignee: Wenbing Shen

> Unhandled exception caught in InterBrokerSendThread
> ---
>
> Key: KAFKA-12702
> URL: https://issues.apache.org/jira/browse/KAFKA-12702
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Wenbing Shen
>Assignee: Wenbing Shen
>Priority: Blocker
> Attachments: afterFixing.png, beforeFixing.png, 
> image-2021-04-21-17-12-28-471.png
>
>
> In kraft mode, if listeners and advertised.listeners are not configured with 
> host addresses, the host parameter value of Listener in 
> BrokerRegistrationRequestData will be null. When the broker is started, a 
> null pointer exception will be thrown, causing startup failure.
> A feasible solution is to replace the empty host of endPoint in 
> advertisedListeners with InetAddress.getLocalHost.getCanonicalHostName in 
> Broker Server when building networkListeners.
> The following is the debug log:
> before fixing:
> [2021-04-21 14:15:20,032] DEBUG (broker-2-to-controller-send-thread 
> org.apache.kafka.clients.NetworkClient 522) [broker-2-to-controller] Sending 
> BROKER_REGISTRATION request with header RequestHeader(apiKey=BROKER_REGIS
> TRATION, apiVersion=0, clientId=2, correlationId=6) and timeout 3 to node 
> 2: BrokerRegistrationRequestData(brokerId=2, 
> clusterId='nCqve6D1TEef3NpQniA0Mg', incarnationId=X8w4_1DFT2yUjOm6asPjIQ, 
> listeners=[Listener(n
> ame='PLAINTEXT', host=null, port=9092, 
> securityProtocol=0)], features=[], rack=null)
> [2021-04-21 14:15:20,033] ERROR (broker-2-to-controller-send-thread 
> kafka.server.BrokerToControllerRequestThread 76) 
> [broker-2-to-controller-send-thread]: unhandled exception caught in 
> InterBrokerSendThread
> java.lang.NullPointerException
>  at 
> org.apache.kafka.common.message.BrokerRegistrationRequestData$Listener.addSize(BrokerRegistrationRequestData.java:515)
>  at 
> org.apache.kafka.common.message.BrokerRegistrationRequestData.addSize(BrokerRegistrationRequestData.java:216)
>  at 
> org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
>  at 
> org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
>  at 
> org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
>  at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:525)
>  at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:501)
>  at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:461)
>  at 
> kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1(InterBrokerSendThread.scala:104)
>  at 
> kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1$adapted(InterBrokerSendThread.scala:99)
>  at kafka.common.InterBrokerSendThread$$Lambda$259/910445654.apply(Unknown 
> Source)
>  at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
>  at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
>  at 
> kafka.common.InterBrokerSendThread.sendRequests(InterBrokerSendThread.scala:99)
>  at 
> kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:73)
>  at 
> kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManager.scala:368)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
> [2021-04-21 14:15:20,034] INFO (broker-2-to-controller-send-thread 
> kafka.server.BrokerToControllerRequestThread 66) 
> [broker-2-to-controller-send-thread]: Stopped
> after fixing:
> [2021-04-21 15:05:01,095] DEBUG (BrokerToControllerChannelManager broker=2 
> name=heartbeat org.apache.kafka.clients.NetworkClient 512) 
> [BrokerToControllerChannelManager broker=2 name=heartbeat] Sending 
> BROKER_REGISTRATI
> ON request with header RequestHeader(apiKey=BROKER_REGISTRATION, 
> apiVersion=0, clientId=2, correlationId=0) and timeout 3 to node 2: 
> BrokerRegistrationRequestData(brokerId=2, clusterId='nCqve6D1TEef3NpQniA0Mg', 
> inc
> arnationId=xF29h_IRR1KzrERWwssQ2w, listeners=[Listener(name='PLAINTEXT', 
> host='hdpxxx.cn', port=9092, securityProtocol=0)], features=[], rack=null)
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 opened a new pull request #10577: MINOR: check duplicate advertised listeners based on resolved host

2021-04-21 Thread GitBox


chia7712 opened a new pull request #10577:
URL: https://github.com/apache/kafka/pull/10577


   Related to #4897.
   
   I noticed this issue when reviewing #10575. With this patch, a listener 
such as `:12345` fails fast if its resolved host and port are already 
registered.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Iskuskov commented on a change in pull request #9541: KAFKA-10675: Add schema name to ConnectSchema.validateValue() error message

2021-04-21 Thread GitBox


Iskuskov commented on a change in pull request #9541:
URL: https://github.com/apache/kafka/pull/9541#discussion_r617423452



##
File path: 
connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
##
@@ -266,7 +267,7 @@ public static void validateValue(String name, Schema 
schema, Object value) {
 private static List<Class> expectedClassesFor(Schema schema) {
 List<Class> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());
 if (expectedClasses == null)
-expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
+expectedClasses = SCHEMA_TYPE_CLASSES.getOrDefault(schema.type(), 
Collections.emptyList());

Review comment:
   Thank you! I highly appreciate your comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-21 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r617482721



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataContextSerdes.java
##
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import 
org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataContext;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class provides serialization and deserialization for {@link 
RemoteLogMetadataContext}. This is the root serdes
+ * for the messages that are stored in internal remote log metadata topic.
+ */
+public class RemoteLogMetadataContextSerdes implements 
Serde<RemoteLogMetadataContext> {
+
+public static final byte REMOTE_LOG_SEGMENT_METADATA_API_KEY = (byte) new 
RemoteLogSegmentMetadataRecord().apiKey();
+public static final byte REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = 
(byte) new RemoteLogSegmentMetadataUpdateRecord().apiKey();
+public static final byte REMOTE_PARTITION_DELETE_API_KEY = (byte) new 
RemotePartitionDeleteMetadataRecord().apiKey();
+
+private static final Map KEY_TO_SERDES = 
createInternalSerde();
+
+private final Deserializer<RemoteLogMetadataContext> rootDeserializer;
+private final Serializer<RemoteLogMetadataContext> rootSerializer;
+
+public RemoteLogMetadataContextSerdes() {
+rootSerializer = (topic, data) -> serialize(data);

Review comment:
   The deserializer can be used by devs to build any tools they need based on 
the remote log metadata topic. They can also use the deserializer directly and 
invoke it for each message. 
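
   For example, a hedged sketch (method names follow the `Serde` interface; 
the `consumerRecord` and `metadataTopicName` variables are hypothetical) of 
how a tool could decode one message from the metadata topic:
   
   ```java
   // Hedged sketch: turn one consumed record value back into a
   // RemoteLogMetadataContext using the root serde.
   Serde<RemoteLogMetadataContext> serde = new RemoteLogMetadataContextSerdes();
   byte[] payload = consumerRecord.value();  // hypothetical ConsumerRecord<byte[], byte[]>
   RemoteLogMetadataContext context =
       serde.deserializer().deserialize(metadataTopicName, payload);
   ```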




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-21 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r617483955



##
File path: build.gradle
##
@@ -1409,6 +1409,7 @@ project(':storage') {
   dependencies {
 implementation project(':storage:api')
 implementation project(':clients')
+implementation project(':metadata')

Review comment:
   I will remove this dependency once I move `ApiMessageAndVersion` to the 
`clients` module. This is done in a follow-up PR. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd opened a new pull request #10578: MINOR Moved ApiMessageAndVersion and AbstractApiMessageAndVersionSerde to clients module.

2021-04-21 Thread GitBox


satishd opened a new pull request #10578:
URL: https://github.com/apache/kafka/pull/10578


   MINOR Moved ApiMessageAndVersion and AbstractApiMessageAndVersionSerde to 
clients module.
   
   Existing unit tests should be sufficient, as this change is mostly about 
moving classes into the `clients` module.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on pull request #10578: MINOR Moved ApiMessageAndVersion and AbstractApiMessageAndVersionSerde to clients module.

2021-04-21 Thread GitBox


satishd commented on pull request #10578:
URL: https://github.com/apache/kafka/pull/10578#issuecomment-824024671


   @junrao  This PR is based on https://github.com/apache/kafka/pull/10271. So, 
this should be merged only after https://github.com/apache/kafka/pull/10271 is 
merged. 
   Review can be done starting with the commit 
https://github.com/apache/kafka/pull/10578/commits/223ce42ea2a9f18798b5300f386df1cc15d60c31


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-21 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r617483955



##
File path: build.gradle
##
@@ -1409,6 +1409,7 @@ project(':storage') {
   dependencies {
 implementation project(':storage:api')
 implementation project(':clients')
+implementation project(':metadata')

Review comment:
   I will remove this dependency once I move `ApiMessageAndVersion` to the 
`clients` module. This is done in a follow-up PR: 
https://github.com/apache/kafka/pull/10578. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12680) Failed to restart the broker in kraft mode

2021-04-21 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326499#comment-17326499
 ] 

Ismael Juma commented on KAFKA-12680:
-

Hi [~wenbing.shen]. Thanks for the ticket. Can you please explain why you 
closed it? You were unable to reproduce it after seeing this once?

> Failed to restart the broker in kraft mode
> --
>
> Key: KAFKA-12680
> URL: https://issues.apache.org/jira/browse/KAFKA-12680
> Project: Kafka
>  Issue Type: Bug
>Reporter: Wenbing Shen
>Priority: Major
>
> I tested kraft mode for the first time today, I deployed a single node kraft 
> mode broker according to the documentation:
> [https://github.com/apache/kafka/blob/6d1d68617ecd023b787f54aafc24a4232663428d/config/kraft/README.md]
>  
> first step: ./bin/kafka-storage.sh random-uuid
> Second step: Use the uuid generated above to execute the following commands:
> ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
>  
> third step: ./bin/kafka-server-start.sh ./config/kraft/server.properties
>  
> Then I created two topics with two partitions and a single replica.
> ./bin/kafka-topics.sh --create --topic test-01 --partitions 2 
> --replication-factor 1 --bootstrap-server localhost:9092
> Verify that there is no problem with production and consumption, but when I 
> call kafka-server-stop.sh, when I call the start command again, the broker 
> starts to report an error.
> I am not sure if it is a known bug or a problem with my usage
>  
> [2021-04-18 00:19:37,443] ERROR Exiting Kafka due to fatal exception 
> (kafka.Kafka$)
> java.io.IOException: Invalid argument
>  at java.io.RandomAccessFile.setLength(Native Method)
>  at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:189)
>  at kafka.log.AbstractIndex.resize(AbstractIndex.scala:175)
>  at 
> kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:241)
>  at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:241)
>  at kafka.log.LogSegment.recover(LogSegment.scala:385)
>  at kafka.log.Log.recoverSegment(Log.scala:741)
>  at kafka.log.Log.recoverLog(Log.scala:894)
>  at kafka.log.Log.$anonfun$loadSegments$2(Log.scala:816)
>  at kafka.log.Log$$Lambda$153/391630194.apply$mcJ$sp(Unknown Source)
>  at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17)
>  at kafka.log.Log.retryOnOffsetOverflow(Log.scala:2456)
>  at kafka.log.Log.loadSegments(Log.scala:816)
>  at kafka.log.Log.<init>(Log.scala:326)
>  at kafka.log.Log$.apply(Log.scala:2593)
>  at kafka.raft.KafkaMetadataLog$.apply(KafkaMetadataLog.scala:358)
>  at kafka.raft.KafkaRaftManager.buildMetadataLog(RaftManager.scala:253)
>  at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:127)
>  at kafka.server.KafkaRaftServer.<init>(KafkaRaftServer.scala:74)
>  at kafka.Kafka$.buildServer(Kafka.scala:79)
>  at kafka.Kafka$.main(Kafka.scala:87)
>  at kafka.Kafka.main(Kafka.scala)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] satishd opened a new pull request #10579: KAFKA-9555 Initial version of default RLMM implementation.

2021-04-21 Thread GitBox


satishd opened a new pull request #10579:
URL: https://github.com/apache/kafka/pull/10579


   KAFKA-9555 Initial version of default RLMM implementation. 
   This is still in draft mode. 
   
   This includes the default RLMM configs, the RLMM implementation, and the 
producer/consumer managers.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10564: MINOR: clean up some replication code

2021-04-21 Thread GitBox


cmccabe commented on a change in pull request #10564:
URL: https://github.com/apache/kafka/pull/10564#discussion_r617505731



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -321,22 +339,17 @@ public void replay(PartitionRecord record) {
 }
 PartitionControlInfo newPartInfo = new PartitionControlInfo(record);
 PartitionControlInfo prevPartInfo = 
topicInfo.parts.get(record.partitionId());
+String topicPart = topicInfo.name + "-" + record.partitionId();
 if (prevPartInfo == null) {
-log.info("Created partition {}:{} with {}.", record.topicId(),
-record.partitionId(), newPartInfo.toString());
+log.info("Created partition {} with {}.", topicPart, 
newPartInfo.toString());

Review comment:
   Yes, this was an intentional change to make it clearer which topic was 
getting created.  The ID is still available in the metadata log, of course.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10564: MINOR: clean up some replication code

2021-04-21 Thread GitBox


cmccabe commented on a change in pull request #10564:
URL: https://github.com/apache/kafka/pull/10564#discussion_r617507396



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1119,6 +1053,62 @@ void validateManualPartitionAssignment(List<Integer> 
assignment,
 }
 }
 
+void generateLeaderAndIsrUpdates(String context,
+ int brokerToRemoveFromIsr,
+ List<ApiMessageAndVersion> records,
+ Iterator<TopicIdPartition> iterator) {
+int oldSize = records.size();
+while (iterator.hasNext()) {
+TopicIdPartition topicIdPart = iterator.next();
+TopicControlInfo topic = topics.get(topicIdPart.topicId());
+if (topic == null) {
+throw new RuntimeException("Topic ID " + topicIdPart.topicId() 
+ " existed in " +
+"isrMembers, but not in the topics map.");
+}
+PartitionControlInfo partition = 
topic.parts.get(topicIdPart.partitionId());
+if (partition == null) {
+throw new RuntimeException("Partition " + topicIdPart +
+" existed in isrMembers, but not in the partitions map.");
+}
+int[] newIsr = Replicas.copyWithout(partition.isr, 
brokerToRemoveFromIsr);
+int newLeader = Replicas.contains(newIsr, partition.leader) ? 
partition.leader :
+bestLeader(partition.replicas, newIsr, false);
+boolean unclean = newLeader != NO_LEADER && 
!Replicas.contains(newIsr, newLeader);
+if (unclean) {
+// After an unclean leader election, the ISR is reset to just 
the new leader.
+newIsr = new int[] {newLeader};
+} else if (newIsr.length == 0) {
+// We never want to shrink the ISR to size 0.

Review comment:
   I agree that if `newIsr.length` is 0 on line 1080, then `newLeader` must 
be `NO_LEADER`.  However, it's not necessary to add any special logic for this 
since `bestLeader` already handles this case.
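
   A minimal sketch of that argument, with the bestLeader/NO_LEADER semantics 
inferred from the surrounding diff rather than copied from the real 
ReplicationControlManager:

public class BestLeaderSketch {
    static final int NO_LEADER = -1;

    // Prefer the first replica (in assignment order) that is in the ISR.
    // With unclean == false and an empty ISR, no replica qualifies, so the
    // method falls through to NO_LEADER; no special case is required.
    static int bestLeader(int[] replicas, int[] isr, boolean unclean) {
        for (int replica : replicas) {
            if (contains(isr, replica)) {
                return replica;
            }
        }
        return (unclean && replicas.length > 0) ? replicas[0] : NO_LEADER;
    }

    static boolean contains(int[] values, int target) {
        for (int v : values) {
            if (v == target) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Empty ISR with clean election only: NO_LEADER comes out naturally.
        System.out.println(bestLeader(new int[]{1, 2, 3}, new int[]{}, false)); // -1
    }
}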




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12703) Unencrypted PEM files don't work

2021-04-21 Thread Brian Bascoy (Jira)
Brian Bascoy created KAFKA-12703:


 Summary: Unencrypted PEM files don't work
 Key: KAFKA-12703
 URL: https://issues.apache.org/jira/browse/KAFKA-12703
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.8.0
Reporter: Brian Bascoy


Unencrypted PEM files seem to be internally [supported in the 
codebase|https://github.com/apache/kafka/blob/a46beb9d29781e0709baf596601122f770a5fa31/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L509]
 but setting an ssl.key.password is currently enforced by createKeystore (on 
DefaultSslEngineFactory). I was unable to find a reason for this, so I wonder 
if this limitation could simply be removed:
 [https://github.com/pera/kafka/commit/8df2feab5fc6955cf8c89a7d132f05d8f562e16b]

 

Thanks
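
For context, a minimal sketch of the client configuration in question, 
assuming the PEM config keys from KIP-651; the PEM strings are placeholders, 
not real key material:

import java.util.Properties;

public class PemConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("security.protocol", "SSL");
        props.put("ssl.keystore.type", "PEM");
        props.put("ssl.keystore.certificate.chain", "-----BEGIN CERTIFICATE-----\n...");
        props.put("ssl.keystore.key", "-----BEGIN PRIVATE KEY-----\n...");
        // Omitting ssl.key.password is exactly what createKeystore currently
        // rejects, even though the key above is unencrypted.
        System.out.println(props);
    }
}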



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tombentley commented on a change in pull request #10530: KAFKA-10231 fail bootstrap of Rest server if the host name in the adv…

2021-04-21 Thread GitBox


tombentley commented on a change in pull request #10530:
URL: https://github.com/apache/kafka/pull/10530#discussion_r616833243



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
##
@@ -369,9 +370,34 @@ else if (serverConnector != null && 
serverConnector.getHost() != null && serverC
 else if (serverConnector != null && serverConnector.getPort() > 0)
 builder.port(serverConnector.getPort());
 
-log.info("Advertised URI: {}", builder.build());
+URI uri = builder.build();
+validateUriHost(uri);
+log.info("Advertised URI: {}", uri);
 
-return builder.build();
+return uri;
+}
+
+/**
+ * Parses the uri and throws a more definitive error
+ * when the internal node to node communication can't happen due to an 
invalid host name.
+ */
+static void validateUriHost(URI uri) {
+//java URI parsing will fail silently returning null in the host if 
the host name contains invalid characters like _
+//this bubbles up later when the Herder tries to communicate on the 
advertised url and the current HttpClient fails with an ambiguous message
+//we need to revisit this when we upgrade to a better HttpClient that 
can communicate with such host names or throws a better error message

Review comment:
   The place for plans for future changes is JIRA. Statements about changes 
which may or may not happen in the future means the comment could become stale 
(in this case when the client does get changed), or at least means someone has 
to know to amend the comment when the client does change. So best drop the last 
line imo.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on a change in pull request #10530: KAFKA-10231 fail bootstrap of Rest server if the host name in the adv…

2021-04-21 Thread GitBox


kpatelatwork commented on a change in pull request #10530:
URL: https://github.com/apache/kafka/pull/10530#discussion_r617556919



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
##
@@ -369,9 +370,34 @@ else if (serverConnector != null && 
serverConnector.getHost() != null && serverC
 else if (serverConnector != null && serverConnector.getPort() > 0)
 builder.port(serverConnector.getPort());
 
-log.info("Advertised URI: {}", builder.build());
+URI uri = builder.build();
+validateUriHost(uri);
+log.info("Advertised URI: {}", uri);
 
-return builder.build();
+return uri;
+}
+
+/**
+ * Parses the uri and throws a more definitive error
+ * when the internal node to node communication can't happen due to an 
invalid host name.
+ */
+static void validateUriHost(URI uri) {
+//java URI parsing will fail silently returning null in the host if 
the host name contains invalid characters like _
+//this bubbles up later when the Herder tries to communicate on the 
advertised url and the current HttpClient fails with an ambiguous message
+//we need to revisit this when we upgrade to a better HttpClient that 
can communicate with such host names or throws a better error message

Review comment:
   @tombentley  excellent points about not parsing the hostname from the URI. 
I also felt Utils.getHost was broken, as it didn't work on a URL with a path, 
which is why I passed the authority to it.
   
   Let me work on a change using the approach you suggested and get back to 
you. I was doing the uri.getHost null check to limit the blast radius in case 
we hadn't anticipated some pattern, so I think we should still keep the null 
check, but, as you said, use the hostname that is already available to us 
instead of parsing it from the authority.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12703) Unencrypted PEM files can't be loaded

2021-04-21 Thread Brian Bascoy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brian Bascoy updated KAFKA-12703:
-
Summary: Unencrypted PEM files can't be loaded  (was: Unencrypted PEM files 
don't work)

> Unencrypted PEM files can't be loaded
> -
>
> Key: KAFKA-12703
> URL: https://issues.apache.org/jira/browse/KAFKA-12703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.8.0
>Reporter: Brian Bascoy
>Priority: Major
>
> Unencrypted PEM files seem to be internally [supported in the 
> codebase|https://github.com/apache/kafka/blob/a46beb9d29781e0709baf596601122f770a5fa31/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L509]
>  but setting an ssl.key.password is currently enforced by createKeystore (on 
> DefaultSslEngineFactory). I was unable to find a reason for this, so I wonder 
> if this limitation could simply be removed:
>  
> [https://github.com/pera/kafka/commit/8df2feab5fc6955cf8c89a7d132f05d8f562e16b]
>  
> Thanks



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12680) Failed to restart the broker in kraft mode

2021-04-21 Thread Wenbing Shen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326589#comment-17326589
 ] 

Wenbing Shen commented on KAFKA-12680:
--

Hi [~ijuma] Sorry, this problem was caused by my environment; it is actually a 
WSL bug. The link for that bug is here: 
[https://github.com/microsoft/WSL/issues/2281]
I could not reproduce the problem in a test on my CentOS physical machine today.

I should have classified this as not a problem, and I will change that now. 
Sorry for taking your precious time; this was my first time using WSL, and I 
did not expect it to have this problem. Please put your time on the important 
issues.

I wish the KRaft mode great success in the end, and I will do my best to help 
accomplish that. :)

> Failed to restart the broker in kraft mode
> --
>
> Key: KAFKA-12680
> URL: https://issues.apache.org/jira/browse/KAFKA-12680
> Project: Kafka
>  Issue Type: Bug
>Reporter: Wenbing Shen
>Priority: Major
>
> I tested kraft mode for the first time today, I deployed a single node kraft 
> mode broker according to the documentation:
> [https://github.com/apache/kafka/blob/6d1d68617ecd023b787f54aafc24a4232663428d/config/kraft/README.md]
>  
> first step: ./bin/kafka-storage.sh random-uuid
> Second step: Use the uuid generated above to execute the following commands:
> ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
>  
> third step: ./bin/kafka-server-start.sh ./config/kraft/server.properties
>  
> Then I created two topics with two partitions and a single replica.
> ./bin/kafka-topics.sh --create --topic test-01 --partitions 2 
> --replication-factor 1 --bootstrap-server localhost:9092
> Verify that there is no problem with production and consumption, but when I 
> call kafka-server-stop.sh, when I call the start command again, the broker 
> starts to report an error.
> I am not sure if it is a known bug or a problem with my usage
>  
> [2021-04-18 00:19:37,443] ERROR Exiting Kafka due to fatal exception 
> (kafka.Kafka$)
> java.io.IOException: Invalid argument
>  at java.io.RandomAccessFile.setLength(Native Method)
>  at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:189)
>  at kafka.log.AbstractIndex.resize(AbstractIndex.scala:175)
>  at 
> kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:241)
>  at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:241)
>  at kafka.log.LogSegment.recover(LogSegment.scala:385)
>  at kafka.log.Log.recoverSegment(Log.scala:741)
>  at kafka.log.Log.recoverLog(Log.scala:894)
>  at kafka.log.Log.$anonfun$loadSegments$2(Log.scala:816)
>  at kafka.log.Log$$Lambda$153/391630194.apply$mcJ$sp(Unknown Source)
>  at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17)
>  at kafka.log.Log.retryOnOffsetOverflow(Log.scala:2456)
>  at kafka.log.Log.loadSegments(Log.scala:816)
>  at kafka.log.Log.<init>(Log.scala:326)
>  at kafka.log.Log$.apply(Log.scala:2593)
>  at kafka.raft.KafkaMetadataLog$.apply(KafkaMetadataLog.scala:358)
>  at kafka.raft.KafkaRaftManager.buildMetadataLog(RaftManager.scala:253)
>  at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:127)
>  at kafka.server.KafkaRaftServer.<init>(KafkaRaftServer.scala:74)
>  at kafka.Kafka$.buildServer(Kafka.scala:79)
>  at kafka.Kafka$.main(Kafka.scala:87)
>  at kafka.Kafka.main(Kafka.scala)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-12680) Failed to restart the broker in kraft mode

2021-04-21 Thread Wenbing Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbing Shen reopened KAFKA-12680:
--

> Failed to restart the broker in kraft mode
> --
>
> Key: KAFKA-12680
> URL: https://issues.apache.org/jira/browse/KAFKA-12680
> Project: Kafka
>  Issue Type: Bug
>Reporter: Wenbing Shen
>Priority: Major
>
> I tested kraft mode for the first time today, I deployed a single node kraft 
> mode broker according to the documentation:
> [https://github.com/apache/kafka/blob/6d1d68617ecd023b787f54aafc24a4232663428d/config/kraft/README.md]
>  
> first step: ./bin/kafka-storage.sh random-uuid
> Second step: Use the uuid generated above to execute the following commands:
> ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
>  
> third step: ./bin/kafka-server-start.sh ./config/kraft/server.properties
>  
> Then I created two topics with two partitions and a single replica.
> ./bin/kafka-topics.sh --create --topic test-01 --partitions 2 
> --replication-factor 1 --bootstrap-server localhost:9092
> Verify that there is no problem with production and consumption, but when I 
> call kafka-server-stop.sh, when I call the start command again, the broker 
> starts to report an error.
> I am not sure if it is a known bug or a problem with my usage
>  
> [2021-04-18 00:19:37,443] ERROR Exiting Kafka due to fatal exception 
> (kafka.Kafka$)
> java.io.IOException: Invalid argument
>  at java.io.RandomAccessFile.setLength(Native Method)
>  at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:189)
>  at kafka.log.AbstractIndex.resize(AbstractIndex.scala:175)
>  at 
> kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:241)
>  at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:241)
>  at kafka.log.LogSegment.recover(LogSegment.scala:385)
>  at kafka.log.Log.recoverSegment(Log.scala:741)
>  at kafka.log.Log.recoverLog(Log.scala:894)
>  at kafka.log.Log.$anonfun$loadSegments$2(Log.scala:816)
>  at kafka.log.Log$$Lambda$153/391630194.apply$mcJ$sp(Unknown Source)
>  at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17)
>  at kafka.log.Log.retryOnOffsetOverflow(Log.scala:2456)
>  at kafka.log.Log.loadSegments(Log.scala:816)
>  at kafka.log.Log.<init>(Log.scala:326)
>  at kafka.log.Log$.apply(Log.scala:2593)
>  at kafka.raft.KafkaMetadataLog$.apply(KafkaMetadataLog.scala:358)
>  at kafka.raft.KafkaRaftManager.buildMetadataLog(RaftManager.scala:253)
>  at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:127)
>  at kafka.server.KafkaRaftServer.<init>(KafkaRaftServer.scala:74)
>  at kafka.Kafka$.buildServer(Kafka.scala:79)
>  at kafka.Kafka$.main(Kafka.scala:87)
>  at kafka.Kafka.main(Kafka.scala)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12680) Failed to restart the broker in kraft mode

2021-04-21 Thread Wenbing Shen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenbing Shen resolved KAFKA-12680.
--
Resolution: Not A Problem

> Failed to restart the broker in kraft mode
> --
>
> Key: KAFKA-12680
> URL: https://issues.apache.org/jira/browse/KAFKA-12680
> Project: Kafka
>  Issue Type: Bug
>Reporter: Wenbing Shen
>Priority: Major
>
> I tested kraft mode for the first time today, I deployed a single node kraft 
> mode broker according to the documentation:
> [https://github.com/apache/kafka/blob/6d1d68617ecd023b787f54aafc24a4232663428d/config/kraft/README.md]
>  
> first step: ./bin/kafka-storage.sh random-uuid
> Second step: Use the uuid generated above to execute the following commands:
> ./bin/kafka-storage.sh format -t <uuid> -c ./config/kraft/server.properties
>  
> third step: ./bin/kafka-server-start.sh ./config/kraft/server.properties
>  
> Then I created two topics with two partitions and a single replica.
> ./bin/kafka-topics.sh --create --topic test-01 --partitions 2 
> --replication-factor 1 --bootstrap-server localhost:9092
> Verify that there is no problem with production and consumption, but when I 
> call kafka-server-stop.sh, when I call the start command again, the broker 
> starts to report an error.
> I am not sure if it is a known bug or a problem with my usage
>  
> [2021-04-18 00:19:37,443] ERROR Exiting Kafka due to fatal exception 
> (kafka.Kafka$)
> java.io.IOException: Invalid argument
>  at java.io.RandomAccessFile.setLength(Native Method)
>  at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:189)
>  at kafka.log.AbstractIndex.resize(AbstractIndex.scala:175)
>  at 
> kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:241)
>  at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:241)
>  at kafka.log.LogSegment.recover(LogSegment.scala:385)
>  at kafka.log.Log.recoverSegment(Log.scala:741)
>  at kafka.log.Log.recoverLog(Log.scala:894)
>  at kafka.log.Log.$anonfun$loadSegments$2(Log.scala:816)
>  at kafka.log.Log$$Lambda$153/391630194.apply$mcJ$sp(Unknown Source)
>  at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17)
>  at kafka.log.Log.retryOnOffsetOverflow(Log.scala:2456)
>  at kafka.log.Log.loadSegments(Log.scala:816)
>  at kafka.log.Log.<init>(Log.scala:326)
>  at kafka.log.Log$.apply(Log.scala:2593)
>  at kafka.raft.KafkaMetadataLog$.apply(KafkaMetadataLog.scala:358)
>  at kafka.raft.KafkaRaftManager.buildMetadataLog(RaftManager.scala:253)
>  at kafka.raft.KafkaRaftManager.<init>(RaftManager.scala:127)
>  at kafka.server.KafkaRaftServer.<init>(KafkaRaftServer.scala:74)
>  at kafka.Kafka$.buildServer(Kafka.scala:79)
>  at kafka.Kafka$.main(Kafka.scala:87)
>  at kafka.Kafka.main(Kafka.scala)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10773) When I execute the below command, Kafka cannot start in local.

2021-04-21 Thread Wenbing Shen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326600#comment-17326600
 ] 

Wenbing Shen commented on KAFKA-10773:
--

Similar to KAFKA-12680. This is in fact a WSL bug; the link is here: 
[https://github.com/microsoft/WSL/issues/2281]

> When I execute the below command, Kafka cannot start in local.
> --
>
> Key: KAFKA-10773
> URL: https://issues.apache.org/jira/browse/KAFKA-10773
> Project: Kafka
>  Issue Type: Bug
>Reporter: NAYUSIK
>Priority: Critical
> Attachments: image-2020-11-27-19-09-49-389.png
>
>
> When I execute the below command, Kafka cannot start in local.
> *confluent local services start*
> *Please check the below error log and let me know how I modify it.*
> [2020-11-27 18:52:21,019] ERROR Error while deleting the clean shutdown file 
> in dir /tmp/confluent.056805/kafka/data (kafka.server.LogDirFailureChannel)
> java.io.IOException: Invalid argument
>  at java.io.RandomAccessFile.setLength(Native Method)
>  at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:190)
>  at kafka.log.AbstractIndex.resize(AbstractIndex.scala:176)
>  at 
> kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:242)
>  at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:242)
>  at kafka.log.LogSegment.recover(LogSegment.scala:380)
>  at kafka.log.Log.recoverSegment(Log.scala:692)
>  at kafka.log.Log.recoverLog(Log.scala:830)
>  at kafka.log.Log.$anonfun$loadSegments$3(Log.scala:767)
>  at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17)
>  at kafka.log.Log.retryOnOffsetOverflow(Log.scala:2626)
>  at kafka.log.Log.loadSegments(Log.scala:767)
>  at kafka.log.Log.<init>(Log.scala:313)
>  at kafka.log.MergedLog$.apply(MergedLog.scala:796)
>  at kafka.log.LogManager.loadLog(LogManager.scala:294)
>  at kafka.log.LogManager.$anonfun$loadLogs$12(LogManager.scala:373)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> [2020-11-27 18:52:21,019] ERROR Error while deleting the clean shutdown file 
> in dir /tmp/confluent.056805/kafka/data (kafka.server.LogDirFailureChannel)
> java.io.IOException: Invalid argument
>  at java.io.RandomAccessFile.setLength(Native Method)
>  at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:190)
>  at kafka.log.AbstractIndex.resize(AbstractIndex.scala:176)
>  at 
> kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:242)
>  at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:242)
>  at kafka.log.LogSegment.recover(LogSegment.scala:380)
>  at kafka.log.Log.recoverSegment(Log.scala:692)
>  at kafka.log.Log.recoverLog(Log.scala:830)
>  at kafka.log.Log.$anonfun$loadSegments$3(Log.scala:767)
>  at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17)
>  at kafka.log.Log.retryOnOffsetOverflow(Log.scala:2626)
>  at kafka.log.Log.loadSegments(Log.scala:767)
>  at kafka.log.Log.<init>(Log.scala:313)
>  at kafka.log.MergedLog$.apply(MergedLog.scala:796)
>  at kafka.log.LogManager.loadLog(LogManager.scala:294)
>  at kafka.log.LogManager.$anonfun$loadLogs$12(LogManager.scala:373)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
>  
>  
> [2020-11-27 18:52:22,261] ERROR Shutdown broker because all log dirs in 
> /tmp/confluent.056805/kafka/data have failed (kafka.log.LogManager)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10800) Validate the snapshot id when the state machine creates a snapshot

2021-04-21 Thread Haoran Xuan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326611#comment-17326611
 ] 

Haoran Xuan commented on KAFKA-10800:
-

Hi, [~jagsancio], I just started working on this task, and want to make sure I 
have the correct understanding before going too far.

-

"When the state machine attempts to create a snapshot writer we should validate 
that the following is true:
 # The end offset and epoch of the snapshot is less than the high-watermark.
 # The end offset and epoch of the snapshot is valid based on the leader epoch 
cache.

Note that this validation should not be performed when the raft client creates 
the snapshot writer because in that case the local log is out of date and the 
follower should trust the snapshot id sent by the partition leader."



Questions are:

1) What does "the state machine" mean here?  I assume it's the KafkaRaftClient? 
And "attempts to create a snapshot writer", I assume this refers to 
`log.createSnapshot(snapshotId)`?

2) "The end offset and epoch of the snapshot is less than the high-watermark", 
does the "high-watermark" refer to the leader's highwatermark or the follower's 
highwatermark? If it is the former, shouldn't it be the leader's responsibility 
to satisfy this ? If it's the latter, then I think the snapshotId can actually 
be larger than itself's highwatermark, say the follower has been lagged too 
much, and its highwatermark == its logEndOffset, which is smaller than the 
leader's logStartOffset, in this case, the follower's highwatermark will be 
updated to the snapshotId's endOffset when the snapshot fetching has completed, 
did I miss anything?

3) "validation should not be performed when the raft client creates the 
snapshot writer ", if my assumption in Question 1) is correct, then this seems 
to be in conflict with 1)

 

Thanks!
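
For concreteness, a minimal sketch of the first validation rule under 
discussion (names hypothetical; the real check would live where the state 
machine creates a snapshot writer):

public class SnapshotIdValidationSketch {
    // Reject a snapshot whose end offset is beyond the high-watermark.
    static void validateSnapshotId(long snapshotEndOffset, long highWatermark) {
        if (snapshotEndOffset > highWatermark) {
            throw new IllegalArgumentException("snapshot end offset "
                + snapshotEndOffset + " is beyond the high-watermark " + highWatermark);
        }
        // A real implementation would also validate the snapshot epoch
        // against the leader epoch cache, per the second bullet above.
    }

    public static void main(String[] args) {
        validateSnapshotId(100L, 150L);   // passes
        try {
            validateSnapshotId(200L, 150L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}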

 

 

> Validate the snapshot id when the state machine creates a snapshot
> --
>
> Key: KAFKA-10800
> URL: https://issues.apache.org/jira/browse/KAFKA-10800
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: Haoran Xuan
>Priority: Major
>
> When the state machine attempts to create a snapshot writer we should 
> validate that the following is true:
>  # The end offset and epoch of the snapshot is less than the high-watermark.
>  # The end offset and epoch of the snapshot is valid based on the leader 
> epoch cache.
> Note that this validation should not be performed when the raft client 
> creates the snapshot writer because in that case the local log is out of date 
> and the follower should trust the snapshot id sent by the partition leader.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] Nathan22177 commented on pull request #10548: KAFKA-12396 added a nullcheck before trying to retrieve a key

2021-04-21 Thread GitBox


Nathan22177 commented on pull request #10548:
URL: https://github.com/apache/kafka/pull/10548#issuecomment-824141690


   @mjsax 
   > MeteredSessionStore:
   > 
   > * put() (should verify `sessionKey != null` and `sessionKey.key() != null`)
   > * remove()
   > * fetchSession()
   > * fetch(K)
   > * backwardFetch(K)
   > * fetch(K, K)
   > * backwardFetch(K, K)
   > * findSession(K, long, long)
   > * backwardFindSession
   > * findSession(K, K, long, long)
   > * backwardFindSession(K, K, long, long)
   
   All of them already had the checks. Some lacked test coverage, though. I 
made changes according to your comments and added some tests.
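
   For readers skimming the thread, a minimal sketch of the kind of guard 
being discussed; SessionKey here is a hypothetical stand-in for Kafka Streams' 
Windowed<K>:

import java.util.Objects;

public class NullCheckSketch {
    // Hypothetical stand-in for Windowed<K>.
    static final class SessionKey<K> {
        private final K key;
        SessionKey(K key) { this.key = key; }
        K key() { return key; }
    }

    // Mirrors the checks requested for put(): both the wrapper and the
    // wrapped key must be non-null before touching the inner store.
    static <K> void validateKey(SessionKey<K> sessionKey) {
        Objects.requireNonNull(sessionKey, "sessionKey cannot be null");
        Objects.requireNonNull(sessionKey.key(), "sessionKey.key() cannot be null");
    }

    public static void main(String[] args) {
        validateKey(new SessionKey<>("user-1"));   // passes
        try {
            validateKey(new SessionKey<>(null));   // rejected
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}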
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kpatelatwork commented on pull request #10530: KAFKA-10231 fail bootstrap of Rest server if the host name in the adv…

2021-04-21 Thread GitBox


kpatelatwork commented on pull request #10530:
URL: https://github.com/apache/kafka/pull/10530#issuecomment-824141840


   @tombentley I took another shot at implementing the PR using the above 
suggestions.
   
   Could you please check if it still needs improvement?
   
   I didn't do explicit IPv4 and IPv6 checks because `uri.getHost()` won't 
be null if the address is valid.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12701) NPE in MetadataRequest when using topic IDs

2021-04-21 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326628#comment-17326628
 ] 

Justine Olshan commented on KAFKA-12701:


Thanks [~twmb] and [~dengziming] for taking a look at this. I am not sure this 
is something for 2.8.0 since that release has already passed the vote. Since 
this is only an issue with v11 and there are no public apis supporting this 
usage, I'm thinking that this is ok. We should still fix for 3.0 and possibly 
2.8.x though. Does that make sense?

> NPE in MetadataRequest when using topic IDs
> ---
>
> Key: KAFKA-12701
> URL: https://issues.apache.org/jira/browse/KAFKA-12701
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Travis Bischel
>Assignee: dengziming
>Priority: Major
>
> Authorized result checking relies on the topic name not being null, which, 
> when using topic IDs, it is.
> Unlike the logic in handleDeleteTopicsRequest, handleMetadataRequest does not 
> check ZK for the names corresponding to topic IDs when topic IDs are present.
> {noformat}
> [2021-04-21 05:53:01,463] ERROR [KafkaApi-1] Error when handling request: 
> clientId=kgo, correlationId=1, api=METADATA, version=11, 
> body=MetadataRequestData(topics=[MetadataRequestTopic(topicId=LmqOoFOASnqQp_4-oJgeKA,
>  name=null)], allowAutoTopicCreation=false, 
> includeClusterAuthorizedOperations=false, 
> includeTopicAuthorizedOperations=false) (kafka.server.RequestHandlerHelper)
> java.lang.NullPointerException: name
>   at java.base/java.util.Objects.requireNonNull(Unknown Source)
>   at 
> org.apache.kafka.common.resource.ResourcePattern.<init>(ResourcePattern.java:50)
>   at 
> kafka.server.AuthHelper.$anonfun$filterByAuthorized$3(AuthHelper.scala:121)
>   at scala.collection.Iterator$$anon$9.next(Iterator.scala:575)
>   at scala.collection.mutable.Growable.addAll(Growable.scala:62)
>   at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
>   at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:142)
>   at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:42)
>   at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:258)
>   at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:247)
>   at scala.collection.SeqFactory$Delegate.from(Factory.scala:306)
>   at scala.collection.IterableOnceOps.toBuffer(IterableOnce.scala:1270)
>   at scala.collection.IterableOnceOps.toBuffer$(IterableOnce.scala:1270)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1288)
>   at kafka.server.AuthHelper.filterByAuthorized(AuthHelper.scala:120)
>   at 
> kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:1146)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:170)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> [2021-04-21 05:53:01,464] ERROR [Kafka Request Handler 1 on Broker 1], 
> Exception when handling request (kafka.server.KafkaRequestHandler)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.message.MetadataResponseData$MetadataResponseTopic.addSize(MetadataResponseData.java:1247)
>   at 
> org.apache.kafka.common.message.MetadataResponseData.addSize(MetadataResponseData.java:417)
>   at 
> org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
>   at 
> org.apache.kafka.common.protocol.SendBuilder.buildResponseSend(SendBuilder.java:200)
>   at 
> org.apache.kafka.common.requests.AbstractResponse.toSend(AbstractResponse.java:43)
>   at 
> org.apache.kafka.common.requests.RequestContext.buildResponseSend(RequestContext.java:111)
>   at 
> kafka.network.RequestChannel$Request.buildResponseSend(RequestChannel.scala:132)
>   at 
> kafka.server.RequestHandlerHelper.sendResponse(RequestHandlerHelper.scala:185)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorOrCloseConnection(RequestHandlerHelper.scala:155)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorResponseMaybeThrottle(RequestHandlerHelper.scala:109)
>   at 
> kafka.server.RequestHandlerHelper.handleError(RequestHandlerHelper.scala:79)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:229)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-21 Thread GitBox


satishd commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r617482721



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataContextSerdes.java
##
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import 
org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataContext;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class provides serialization and deserialization for {@link 
RemoteLogMetadataContext}. This is the root serdes
+ * for the messages that are stored in internal remote log metadata topic.
+ */
+public class RemoteLogMetadataContextSerdes implements 
Serde<RemoteLogMetadataContext> {
+
+public static final byte REMOTE_LOG_SEGMENT_METADATA_API_KEY = (byte) new 
RemoteLogSegmentMetadataRecord().apiKey();
+public static final byte REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = 
(byte) new RemoteLogSegmentMetadataUpdateRecord().apiKey();
+public static final byte REMOTE_PARTITION_DELETE_API_KEY = (byte) new 
RemotePartitionDeleteMetadataRecord().apiKey();
+
+private static final Map KEY_TO_SERDES = 
createInternalSerde();
+
+private final Deserializer<RemoteLogMetadataContext> rootDeserializer;
+private final Serializer<RemoteLogMetadataContext> rootSerializer;
+
+public RemoteLogMetadataContextSerdes() {
+rootSerializer = (topic, data) -> serialize(data);

Review comment:
   The Serde's deserializer can be used by devs to build any tools they need 
on top of the remote log metadata topic, by setting it as the value 
deserializer for a consumer. But they can also use a byte-array deserializer 
on the consumer and invoke RemoteLogContextDeserializer on each message to 
convert the byte array into a RemoteLogContext. I updated the PR with the 
suggested changes.
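
   A hedged sketch of the second option; the topic name "__remote_log_metadata" 
is the internal topic proposed in KIP-405 (an assumption here), and the serde 
class/package follow this draft PR and may change before merge:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataContext;
import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataContextSerdes;

public class RemoteLogMetadataInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "rlmm-inspector");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

        RemoteLogMetadataContextSerdes serdes = new RemoteLogMetadataContextSerdes();
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("__remote_log_metadata"));
            // Raw bytes in, decoded RemoteLogMetadataContext out.
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofSeconds(5))) {
                RemoteLogMetadataContext ctx =
                    serdes.deserializer().deserialize(record.topic(), record.value());
                System.out.println(record.offset() + ": " + ctx);
            }
        }
    }
}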




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12701) NPE in MetadataRequest when using topic IDs

2021-04-21 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326633#comment-17326633
 ] 

Ismael Juma commented on KAFKA-12701:
-

I suggest fixing it in 3.0.0 and 2.8.1.

> NPE in MetadataRequest when using topic IDs
> ---
>
> Key: KAFKA-12701
> URL: https://issues.apache.org/jira/browse/KAFKA-12701
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Travis Bischel
>Assignee: dengziming
>Priority: Major
>
> Authorized result checking relies on the topic name not being null, which, 
> when using topic IDs, it is.
> Unlike the logic in handleDeleteTopicsRequest, handleMetadataRequest does not 
> check ZK for the names corresponding to topic IDs when topic IDs are present.
> {noformat}
> [2021-04-21 05:53:01,463] ERROR [KafkaApi-1] Error when handling request: 
> clientId=kgo, correlationId=1, api=METADATA, version=11, 
> body=MetadataRequestData(topics=[MetadataRequestTopic(topicId=LmqOoFOASnqQp_4-oJgeKA,
>  name=null)], allowAutoTopicCreation=false, 
> includeClusterAuthorizedOperations=false, 
> includeTopicAuthorizedOperations=false) (kafka.server.RequestHandlerHelper)
> java.lang.NullPointerException: name
>   at java.base/java.util.Objects.requireNonNull(Unknown Source)
>   at 
> org.apache.kafka.common.resource.ResourcePattern.<init>(ResourcePattern.java:50)
>   at 
> kafka.server.AuthHelper.$anonfun$filterByAuthorized$3(AuthHelper.scala:121)
>   at scala.collection.Iterator$$anon$9.next(Iterator.scala:575)
>   at scala.collection.mutable.Growable.addAll(Growable.scala:62)
>   at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
>   at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:142)
>   at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:42)
>   at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:258)
>   at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:247)
>   at scala.collection.SeqFactory$Delegate.from(Factory.scala:306)
>   at scala.collection.IterableOnceOps.toBuffer(IterableOnce.scala:1270)
>   at scala.collection.IterableOnceOps.toBuffer$(IterableOnce.scala:1270)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1288)
>   at kafka.server.AuthHelper.filterByAuthorized(AuthHelper.scala:120)
>   at 
> kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:1146)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:170)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> [2021-04-21 05:53:01,464] ERROR [Kafka Request Handler 1 on Broker 1], 
> Exception when handling request (kafka.server.KafkaRequestHandler)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.message.MetadataResponseData$MetadataResponseTopic.addSize(MetadataResponseData.java:1247)
>   at 
> org.apache.kafka.common.message.MetadataResponseData.addSize(MetadataResponseData.java:417)
>   at 
> org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
>   at 
> org.apache.kafka.common.protocol.SendBuilder.buildResponseSend(SendBuilder.java:200)
>   at 
> org.apache.kafka.common.requests.AbstractResponse.toSend(AbstractResponse.java:43)
>   at 
> org.apache.kafka.common.requests.RequestContext.buildResponseSend(RequestContext.java:111)
>   at 
> kafka.network.RequestChannel$Request.buildResponseSend(RequestChannel.scala:132)
>   at 
> kafka.server.RequestHandlerHelper.sendResponse(RequestHandlerHelper.scala:185)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorOrCloseConnection(RequestHandlerHelper.scala:155)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorResponseMaybeThrottle(RequestHandlerHelper.scala:109)
>   at 
> kafka.server.RequestHandlerHelper.handleError(RequestHandlerHelper.scala:79)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:229)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12696) Add standard getters to LagInfo class to allow automatic serialization

2021-04-21 Thread Mikhail Panchenko (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326635#comment-17326635
 ] 

Mikhail Panchenko commented on KAFKA-12696:
---

Ok. Just to confirm, the Confluence where KIPs are created uses a separate 
login, so I should sign up?

Re: the naming convention, is this documented somewhere that I can reference in 
the KIP? I'm inclined to make the KIP a bit broader than "change this one 
interface," so it would be nice to be able to reference the convention directly 
when proposing to change it. I see an "avoid getters and setters" note in the 
[Coding Guide|http://kafka.apache.org/coding-guide.html], but that appears to 
be directed at Scala, not Java code.

> Add standard getters to LagInfo class to allow automatic serialization
> --
>
> Key: KAFKA-12696
> URL: https://issues.apache.org/jira/browse/KAFKA-12696
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mikhail Panchenko
>Assignee: Mikhail Panchenko
>Priority: Trivial
>  Labels: needs-kip
>   Original Estimate: 0.5h
>  Remaining Estimate: 0.5h
>
> The LagInfo class has non-standard getters for its member variables. This 
> means that Jackson and other serialization frameworks do not know how to 
> serialize them without additional annotations. So when implementing the sort 
> of system that KAFKA-6144 is meant to enable, as documented here in docs like 
> [Kafka Streams Interactive 
> Queries|https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html#querying-state-stores-during-a-rebalance],
>  one has to either inject a bunch of custom serialization logic into Jersey 
> or wrap this class.
> The patch to fix this is trivial, and I will be putting one up shortly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12696) Add standard getters to LagInfo class to allow automatic serialization

2021-04-21 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326641#comment-17326641
 ] 

Ismael Juma commented on KAFKA-12696:
-

The "avoid getters and setters" note applies to both Java and Scala. We may 
need to make the docs clearer. Note that even Java itself doesn't use the 
getter/setter convention for new APIs.
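
To make the convention concrete, a small hypothetical class contrasting the two 
styles; Jackson only auto-detects the JavaBean form without extra configuration:

public class AccessorStyleSketch {
    private final long offsetLag;

    public AccessorStyleSketch(long offsetLag) {
        this.offsetLag = offsetLag;
    }

    // Kafka-style accessor: no "get" prefix, as LagInfo uses today.
    public long offsetLag() {
        return offsetLag;
    }

    // JavaBean-style getter: what Jackson and similar frameworks pick up
    // automatically without annotations.
    public long getOffsetLag() {
        return offsetLag;
    }
}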

> Add standard getters to LagInfo class to allow automatic serialization
> --
>
> Key: KAFKA-12696
> URL: https://issues.apache.org/jira/browse/KAFKA-12696
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mikhail Panchenko
>Assignee: Mikhail Panchenko
>Priority: Trivial
>  Labels: needs-kip
>   Original Estimate: 0.5h
>  Remaining Estimate: 0.5h
>
> The LagInfo class has non-standard getters for its member variables. This 
> means that Jackson and other serialization frameworks do not know how to 
> serialize them without additional annotations. So when implementing the sort 
> of system that KAFKA-6144 is meant to enable, as documented here in docs like 
> [Kafka Streams Interactive 
> Queries|https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html#querying-state-stores-during-a-rebalance],
>  one has to either inject a bunch of custom serialization logic into Jersey 
> or wrap this class.
> The patch to fix this is trivial, and I will be putting one up shortly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12701) NPE in MetadataRequest when using topic IDs

2021-04-21 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326652#comment-17326652
 ] 

Justine Olshan commented on KAFKA-12701:


[~dengziming] Just to clarify, we have not yet implemented functionality to 
handle topic IDs in MetadataRequest. So even if we get past the NPE, we 
wouldn't handle the topic ID yet. That will be added in 
https://github.com/apache/kafka/pull/9769, correct? Maybe for 2.8.1 we can 
return an error that the operation is not yet supported.

> NPE in MetadataRequest when using topic IDs
> ---
>
> Key: KAFKA-12701
> URL: https://issues.apache.org/jira/browse/KAFKA-12701
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Travis Bischel
>Assignee: dengziming
>Priority: Major
>
> Authorized result checking relies on the topic name not being null, which, 
> when using topic IDs, it is.
> Unlike the logic in handleDeleteTopicsRequest, handleMetadataRequest does not 
> check ZK for the names corresponding to topic IDs when topic IDs are present.
> {noformat}
> [2021-04-21 05:53:01,463] ERROR [KafkaApi-1] Error when handling request: 
> clientId=kgo, correlationId=1, api=METADATA, version=11, 
> body=MetadataRequestData(topics=[MetadataRequestTopic(topicId=LmqOoFOASnqQp_4-oJgeKA,
>  name=null)], allowAutoTopicCreation=false, 
> includeClusterAuthorizedOperations=false, 
> includeTopicAuthorizedOperations=false) (kafka.server.RequestHandlerHelper)
> java.lang.NullPointerException: name
>   at java.base/java.util.Objects.requireNonNull(Unknown Source)
>   at 
> org.apache.kafka.common.resource.ResourcePattern.<init>(ResourcePattern.java:50)
>   at 
> kafka.server.AuthHelper.$anonfun$filterByAuthorized$3(AuthHelper.scala:121)
>   at scala.collection.Iterator$$anon$9.next(Iterator.scala:575)
>   at scala.collection.mutable.Growable.addAll(Growable.scala:62)
>   at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
>   at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:142)
>   at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:42)
>   at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:258)
>   at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:247)
>   at scala.collection.SeqFactory$Delegate.from(Factory.scala:306)
>   at scala.collection.IterableOnceOps.toBuffer(IterableOnce.scala:1270)
>   at scala.collection.IterableOnceOps.toBuffer$(IterableOnce.scala:1270)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1288)
>   at kafka.server.AuthHelper.filterByAuthorized(AuthHelper.scala:120)
>   at 
> kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:1146)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:170)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> [2021-04-21 05:53:01,464] ERROR [Kafka Request Handler 1 on Broker 1], 
> Exception when handling request (kafka.server.KafkaRequestHandler)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.message.MetadataResponseData$MetadataResponseTopic.addSize(MetadataResponseData.java:1247)
>   at 
> org.apache.kafka.common.message.MetadataResponseData.addSize(MetadataResponseData.java:417)
>   at 
> org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
>   at 
> org.apache.kafka.common.protocol.SendBuilder.buildResponseSend(SendBuilder.java:200)
>   at 
> org.apache.kafka.common.requests.AbstractResponse.toSend(AbstractResponse.java:43)
>   at 
> org.apache.kafka.common.requests.RequestContext.buildResponseSend(RequestContext.java:111)
>   at 
> kafka.network.RequestChannel$Request.buildResponseSend(RequestChannel.scala:132)
>   at 
> kafka.server.RequestHandlerHelper.sendResponse(RequestHandlerHelper.scala:185)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorOrCloseConnection(RequestHandlerHelper.scala:155)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorResponseMaybeThrottle(RequestHandlerHelper.scala:109)
>   at 
> kafka.server.RequestHandlerHelper.handleError(RequestHandlerHelper.scala:79)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:229)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12701) NPE in MetadataRequest when using topic IDs

2021-04-21 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326628#comment-17326628
 ] 

Justine Olshan edited comment on KAFKA-12701 at 4/21/21, 3:49 PM:
--

Thanks [~twmb] and [~dengziming] for taking a look at this. I am not sure this 
is something for 2.8.0 since that release has already passed the vote. Since 
this is only an issue with v11 and there are admin client public apis 
supporting this usage, I'm thinking that this is ok. We should still fix for 
3.0 and possibly 2.8.x though. Does that make sense?


was (Author: jolshan):
Thanks [~twmb] and [~dengziming] for taking a look at this. I am not sure this 
is something for 2.8.0 since that release has already passed the vote. Since 
this is only an issue with v11 and there are no public apis supporting this 
usage, I'm thinking that this is ok. We should still fix for 3.0 and possibly 
2.8.x though. Does that make sense?

> NPE in MetadataRequest when using topic IDs
> ---
>
> Key: KAFKA-12701
> URL: https://issues.apache.org/jira/browse/KAFKA-12701
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Travis Bischel
>Assignee: dengziming
>Priority: Major
>
> Authorization result checking relies on the topic name being non-null, but 
> when topic IDs are used, the name is null.
> Unlike the logic in handleDeleteTopicsRequest, handleMetadataRequest does not 
> check ZooKeeper for the names corresponding to topic IDs when topic IDs are 
> present.
> {noformat}
> [2021-04-21 05:53:01,463] ERROR [KafkaApi-1] Error when handling request: 
> clientId=kgo, correlationId=1, api=METADATA, version=11, 
> body=MetadataRequestData(topics=[MetadataRequestTopic(topicId=LmqOoFOASnqQp_4-oJgeKA,
>  name=null)], allowAutoTopicCreation=false, 
> includeClusterAuthorizedOperations=false, 
> includeTopicAuthorizedOperations=false) (kafka.server.RequestHandlerHelper)
> java.lang.NullPointerException: name
>   at java.base/java.util.Objects.requireNonNull(Unknown Source)
>   at 
> org.apache.kafka.common.resource.ResourcePattern.(ResourcePattern.java:50)
>   at 
> kafka.server.AuthHelper.$anonfun$filterByAuthorized$3(AuthHelper.scala:121)
>   at scala.collection.Iterator$$anon$9.next(Iterator.scala:575)
>   at scala.collection.mutable.Growable.addAll(Growable.scala:62)
>   at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
>   at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:142)
>   at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:42)
>   at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:258)
>   at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:247)
>   at scala.collection.SeqFactory$Delegate.from(Factory.scala:306)
>   at scala.collection.IterableOnceOps.toBuffer(IterableOnce.scala:1270)
>   at scala.collection.IterableOnceOps.toBuffer$(IterableOnce.scala:1270)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1288)
>   at kafka.server.AuthHelper.filterByAuthorized(AuthHelper.scala:120)
>   at 
> kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:1146)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:170)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> [2021-04-21 05:53:01,464] ERROR [Kafka Request Handler 1 on Broker 1], 
> Exception when handling request (kafka.server.KafkaRequestHandler)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.message.MetadataResponseData$MetadataResponseTopic.addSize(MetadataResponseData.java:1247)
>   at 
> org.apache.kafka.common.message.MetadataResponseData.addSize(MetadataResponseData.java:417)
>   at 
> org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
>   at 
> org.apache.kafka.common.protocol.SendBuilder.buildResponseSend(SendBuilder.java:200)
>   at 
> org.apache.kafka.common.requests.AbstractResponse.toSend(AbstractResponse.java:43)
>   at 
> org.apache.kafka.common.requests.RequestContext.buildResponseSend(RequestContext.java:111)
>   at 
> kafka.network.RequestChannel$Request.buildResponseSend(RequestChannel.scala:132)
>   at 
> kafka.server.RequestHandlerHelper.sendResponse(RequestHandlerHelper.scala:185)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorOrCloseConnection(RequestHandlerHelper.scala:155)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorResponseMaybeThrottle(RequestHandlerHelper.scala:109)
>   at 
> kafka.server.RequestHandlerHelper.handleError(RequestHandlerHelper.scala:79)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:229)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}

[jira] [Comment Edited] (KAFKA-12701) NPE in MetadataRequest when using topic IDs

2021-04-21 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326628#comment-17326628
 ] 

Justine Olshan edited comment on KAFKA-12701 at 4/21/21, 3:49 PM:
--

Thanks [~twmb] and [~dengziming] for taking a look at this. I am not sure this 
is something for 2.8.0 since that release has already passed the vote. I'm 
thinking that this is ok. We should still fix for 3.0 and possibly 2.8.x 
though. Does that make sense?


was (Author: jolshan):
Thanks [~twmb] and [~dengziming] for taking a look at this. I am not sure this 
is something for 2.8.0 since that release has already passed the vote. Since 
this is only an issue with v11 and there are admin client public apis 
supporting this usage, I'm thinking that this is ok. We should still fix for 
3.0 and possibly 2.8.x though. Does that make sense?

> NPE in MetadataRequest when using topic IDs
> ---
>
> Key: KAFKA-12701
> URL: https://issues.apache.org/jira/browse/KAFKA-12701
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Travis Bischel
>Assignee: dengziming
>Priority: Major
>
> Authorization result checking relies on the topic name being non-null, but 
> when topic IDs are used, the name is null.
> Unlike the logic in handleDeleteTopicsRequest, handleMetadataRequest does not 
> check ZooKeeper for the names corresponding to topic IDs when topic IDs are 
> present.
> {noformat}
> [2021-04-21 05:53:01,463] ERROR [KafkaApi-1] Error when handling request: 
> clientId=kgo, correlationId=1, api=METADATA, version=11, 
> body=MetadataRequestData(topics=[MetadataRequestTopic(topicId=LmqOoFOASnqQp_4-oJgeKA,
>  name=null)], allowAutoTopicCreation=false, 
> includeClusterAuthorizedOperations=false, 
> includeTopicAuthorizedOperations=false) (kafka.server.RequestHandlerHelper)
> java.lang.NullPointerException: name
>   at java.base/java.util.Objects.requireNonNull(Unknown Source)
>   at 
> org.apache.kafka.common.resource.ResourcePattern.(ResourcePattern.java:50)
>   at 
> kafka.server.AuthHelper.$anonfun$filterByAuthorized$3(AuthHelper.scala:121)
>   at scala.collection.Iterator$$anon$9.next(Iterator.scala:575)
>   at scala.collection.mutable.Growable.addAll(Growable.scala:62)
>   at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
>   at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:142)
>   at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:42)
>   at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:258)
>   at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:247)
>   at scala.collection.SeqFactory$Delegate.from(Factory.scala:306)
>   at scala.collection.IterableOnceOps.toBuffer(IterableOnce.scala:1270)
>   at scala.collection.IterableOnceOps.toBuffer$(IterableOnce.scala:1270)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1288)
>   at kafka.server.AuthHelper.filterByAuthorized(AuthHelper.scala:120)
>   at 
> kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:1146)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:170)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> [2021-04-21 05:53:01,464] ERROR [Kafka Request Handler 1 on Broker 1], 
> Exception when handling request (kafka.server.KafkaRequestHandler)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.message.MetadataResponseData$MetadataResponseTopic.addSize(MetadataResponseData.java:1247)
>   at 
> org.apache.kafka.common.message.MetadataResponseData.addSize(MetadataResponseData.java:417)
>   at 
> org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
>   at 
> org.apache.kafka.common.protocol.SendBuilder.buildResponseSend(SendBuilder.java:200)
>   at 
> org.apache.kafka.common.requests.AbstractResponse.toSend(AbstractResponse.java:43)
>   at 
> org.apache.kafka.common.requests.RequestContext.buildResponseSend(RequestContext.java:111)
>   at 
> kafka.network.RequestChannel$Request.buildResponseSend(RequestChannel.scala:132)
>   at 
> kafka.server.RequestHandlerHelper.sendResponse(RequestHandlerHelper.scala:185)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorOrCloseConnection(RequestHandlerHelper.scala:155)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorResponseMaybeThrottle(RequestHandlerHelper.scala:109)
>   at 
> kafka.server.RequestHandlerHelper.handleError(RequestHandlerHelper.scala:79)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:229)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}

[jira] [Comment Edited] (KAFKA-12701) NPE in MetadataRequest when using topic IDs

2021-04-21 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326652#comment-17326652
 ] 

Justine Olshan edited comment on KAFKA-12701 at 4/21/21, 3:58 PM:
--

[~dengziming] Just to clarify, we did not yet implement functionality to handle 
using topic IDs in MetadataRequest. So even if we get past the NPE, we wouldn't 
handle the topic ID yet. That will be added in 
https://github.com/apache/kafka/pull/9769, correct? Maybe for 2.8.1 we can 
return an error that the operation is not yet supported for v11 requests. Then 
in https://github.com/apache/kafka/pull/9769, we bump the protocol to v12 so 
that the clients know that topic IDs are supported.


was (Author: jolshan):
[~dengziming] Just to clarify, we did not yet implement functionality to handle 
using topic IDs in MetadataRequest. So even if we get past the NPE, we wouldn't 
handle the topic ID yet. That will be added in 
https://github.com/apache/kafka/pull/9769, correct? Maybe for 2.8.1 we can 
return an error that the operation is not yet supported

> NPE in MetadataRequest when using topic IDs
> ---
>
> Key: KAFKA-12701
> URL: https://issues.apache.org/jira/browse/KAFKA-12701
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Travis Bischel
>Assignee: dengziming
>Priority: Major
>
> Authorization result checking relies on the topic name being non-null, but 
> when topic IDs are used, the name is null.
> Unlike the logic in handleDeleteTopicsRequest, handleMetadataRequest does not 
> check ZooKeeper for the names corresponding to topic IDs when topic IDs are 
> present.
> {noformat}
> [2021-04-21 05:53:01,463] ERROR [KafkaApi-1] Error when handling request: 
> clientId=kgo, correlationId=1, api=METADATA, version=11, 
> body=MetadataRequestData(topics=[MetadataRequestTopic(topicId=LmqOoFOASnqQp_4-oJgeKA,
>  name=null)], allowAutoTopicCreation=false, 
> includeClusterAuthorizedOperations=false, 
> includeTopicAuthorizedOperations=false) (kafka.server.RequestHandlerHelper)
> java.lang.NullPointerException: name
>   at java.base/java.util.Objects.requireNonNull(Unknown Source)
>   at 
> org.apache.kafka.common.resource.ResourcePattern.(ResourcePattern.java:50)
>   at 
> kafka.server.AuthHelper.$anonfun$filterByAuthorized$3(AuthHelper.scala:121)
>   at scala.collection.Iterator$$anon$9.next(Iterator.scala:575)
>   at scala.collection.mutable.Growable.addAll(Growable.scala:62)
>   at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
>   at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:142)
>   at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:42)
>   at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:258)
>   at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:247)
>   at scala.collection.SeqFactory$Delegate.from(Factory.scala:306)
>   at scala.collection.IterableOnceOps.toBuffer(IterableOnce.scala:1270)
>   at scala.collection.IterableOnceOps.toBuffer$(IterableOnce.scala:1270)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1288)
>   at kafka.server.AuthHelper.filterByAuthorized(AuthHelper.scala:120)
>   at 
> kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:1146)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:170)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> [2021-04-21 05:53:01,464] ERROR [Kafka Request Handler 1 on Broker 1], 
> Exception when handling request (kafka.server.KafkaRequestHandler)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.message.MetadataResponseData$MetadataResponseTopic.addSize(MetadataResponseData.java:1247)
>   at 
> org.apache.kafka.common.message.MetadataResponseData.addSize(MetadataResponseData.java:417)
>   at 
> org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
>   at 
> org.apache.kafka.common.protocol.SendBuilder.buildResponseSend(SendBuilder.java:200)
>   at 
> org.apache.kafka.common.requests.AbstractResponse.toSend(AbstractResponse.java:43)
>   at 
> org.apache.kafka.common.requests.RequestContext.buildResponseSend(RequestContext.java:111)
>   at 
> kafka.network.RequestChannel$Request.buildResponseSend(RequestChannel.scala:132)
>   at 
> kafka.server.RequestHandlerHelper.sendResponse(RequestHandlerHelper.scala:185)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorOrCloseConnection(RequestHandlerHelper.scala:155)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorResponseMaybeThrottle(RequestHandlerHelper.scala:109)
>   at 
> kafka.server.RequestHandlerHelper.handleError(RequestHandlerHelper.scala:79)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:229)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}

[jira] [Created] (KAFKA-12704) Concurrent calls to AbstractHerder::getConnector can potentially create two connector instances

2021-04-21 Thread Kalpesh Patel (Jira)
Kalpesh Patel created KAFKA-12704:
-

 Summary: Concurrent calls to AbstractHerder::getConnector can 
potentially create two connector instances
 Key: KAFKA-12704
 URL: https://issues.apache.org/jira/browse/KAFKA-12704
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Kalpesh Patel


Requests to the {{PUT /connector-plugins/\{connectorType}/config/validate}} 
endpoint are [delegated to the 
herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81],
 which [caches connector 
instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544]
 that are used [during config 
validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310].
 This has the effect that, should concurrent requests to that endpoint occur 
for the same connector type, the same connector instance may be responsible for 
[validating those 
configurations|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L334]
 concurrently _(may_ instead of _will_ because there is also a race condition 
in the {{AbstractHerder::getConnector}} method that potentially fails to detect 
that an instance of the connector has already been created and, as a result, 
creates a second instance).

This is slightly problematic because the 
[Connector::validate|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java#L122-L127]
 method is not marked as thread-safe. However, because a lot of connectors out 
there tend to use the default implementation for that method, it's probably not 
super urgent that we patch this immediately.

A couple of options are:
 # Update the docs for that method to specify that it must be thread-safe
 # Rewrite the connector validation logic in the framework to avoid 
concurrently invoking {{Connector::validate}} on the same instance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12704) Concurrent calls to AbstractHerder::getConnector can potentially create two connector instances

2021-04-21 Thread Kalpesh Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kalpesh Patel reassigned KAFKA-12704:
-

Assignee: Kalpesh Patel

> Concurrent calls to AbstractHerder::getConnector can potentially create two 
> connector instances
> ---
>
> Key: KAFKA-12704
> URL: https://issues.apache.org/jira/browse/KAFKA-12704
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Kalpesh Patel
>Assignee: Kalpesh Patel
>Priority: Minor
>
> Requests to the {{PUT /connector-plugins/\{connectorType}/config/validate}} 
> endpoint are [delegated to the 
> herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81],
>  which [caches connector 
> instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544]
>  that are used [during config 
> validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310].
>  This has the effect that, should concurrent requests to that endpoint occur 
> for the same connector type, the same connector instance may be responsible 
> for [validating those 
> configurations|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L334]
>  concurrently _(may_ instead of _will_ because there is also a race condition 
> in the {{AbstractHerder::getConnector}} method that potentially fails to 
> detect that an instance of the connector has already been created and, as a 
> result, creates a second instance).
> This is slightly problematic because the 
> [Connector::validate|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java#L122-L127]
>  method is not marked as thread-safe. However, because a lot of connectors 
> out there tend to use the default implementation for that method, it's 
> probably not super urgent that we patch this immediately.
> A couple of options are:
>  # Update the docs for that method to specify that it must be thread-safe
>  # Rewrite the connector validation logic in the framework to avoid 
> concurrently invoking {{Connector::validate}} on the same instance.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12704) Concurrent calls to AbstractHerder::getConnector can potentially create two connector instances

2021-04-21 Thread Kalpesh Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kalpesh Patel updated KAFKA-12704:
--
Description: 
Concurrent requests to the {{PUT 
/connector-plugins/\{connectorType}/config/validate}} endpoint are [delegated 
to the 
herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81],
 which [caches connector 
instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544]
 that are used [during config 
validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310].
 This has the effect that, should concurrent requests to that endpoint occur 
for the same connector type before the connector has been created, a race 
condition in the {{AbstractHerder::getConnector}} method can fail to detect that 
an instance of the connector has already been created and, as a result, create 
another instance.

 

This can be solved by using computeIfAbsent to create the connector

  was:
Requests to the {{PUT /connector-plugins/\{connectorType}/config/validate}} 
endpoint are [delegated to the 
herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81],
 which [caches connector 
instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544]
 that are used [during config 
validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310].
 This has the effect that, should concurrent requests to that endpoint occur 
for the same connector type, the same connector instance may be responsible for 
[validating those 
configurations|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L334]
 concurrently _(may_ instead of _will_ because there is also a race condition 
in the {{AbstractHerder::getConnector}} method that potentially fails to detect 
that an instance of the connector has already been created and, as a result, 
creates a second instance).

This is slightly problematic because the 
[Connector::validate|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java#L122-L127]
 method is not marked as thread-safe. However, because a lot of connectors out 
there tend to use the default implementation for that method, it's probably not 
super urgent that we patch this immediately.

A couple of options are:
 # Update the docs for that method to specify that it must be thread-safe
 # Rewrite the connector validation logic in the framework to avoid 
concurrently invoking {{Connector::validate}} on the same instance.


> Concurrent calls to AbstractHerder::getConnector can potentially create two 
> connector instances
> ---
>
> Key: KAFKA-12704
> URL: https://issues.apache.org/jira/browse/KAFKA-12704
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Kalpesh Patel
>Assignee: Kalpesh Patel
>Priority: Minor
>
> Concurrent requests to the {{PUT 
> /connector-plugins/\{connectorType}/config/validate}} endpoint are [delegated 
> to the 
> herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81],
>  which [caches connector 
> instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544]
>  that are used [during config 
> validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310].
>  This has the effect that, should concurrent requests to that endpoint occur 
> for the same connector type before the connector has been created, a race 
> condition in the {{AbstractHerder::getConnector}} method can fail to detect 
> that an instance of the connector has already been created and, as a result, 
> create another instance.

[jira] [Updated] (KAFKA-12704) Concurrent calls to AbstractHerder::getConnector can potentially create two connector instances

2021-04-21 Thread Kalpesh Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kalpesh Patel updated KAFKA-12704:
--
Description: 
As discovered in KAFKA-9560, concurrent requests to the {{PUT 
/connector-plugins/\{connectorType}/config/validate}} endpoint are [delegated 
to the 
herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81],
 which [caches connector 
instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544]
 that are used [during config 
validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310].
 This has the effect that, should concurrent requests to that endpoint occur 
for the same connector type before the connector has been cached, a race 
condition in the {{AbstractHerder::getConnector}} method can fail to detect that 
an instance of the connector has already been created and, as a result, create 
another instance.

 

This can be solved by using Map.computeIfAbsent to create the connector

  was:
Concurrent requests to the {{PUT 
/connector-plugins/\{connectorType}/config/validate}} endpoint are [delegated 
to the 
herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81],
 which [caches connector 
instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544]
 that are used [during config 
validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310].
 This has the effect that, should concurrent requests to that endpoint occur 
for the same connector type before the connector has been created, a race 
condition in the {{AbstractHerder::getConnector}} method can fail to detect that 
an instance of the connector has already been created and, as a result, create 
another instance.

 

This can be solved by using computeIfAbsent to create the connector


> Concurrent calls to AbstractHerder::getConnector can potentially create two 
> connector instances
> ---
>
> Key: KAFKA-12704
> URL: https://issues.apache.org/jira/browse/KAFKA-12704
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Kalpesh Patel
>Assignee: Kalpesh Patel
>Priority: Minor
>
> As discovered in KAFKA-9560, concurrent requests to the {{PUT 
> /connector-plugins/\{connectorType}/config/validate}} endpoint are [delegated 
> to the 
> herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81],
>  which [caches connector 
> instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544]
>  that are used [during config 
> validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310].
>  This has the effect that, should concurrent requests to that endpoint occur 
> for the same connector type before the connector has been cached, a race 
> condition in the {{AbstractHerder::getConnector}} method can fail to detect 
> that an instance of the connector has already been created and, as a result, 
> create another instance.
>  
> This can be solved by using Map.computeIfAbsent to create the connector



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
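
For illustration, a minimal sketch of the computeIfAbsent approach described 
above, assuming a ConcurrentHashMap-backed cache. ConnectorCache and its factory 
parameter are illustrative stand-ins for AbstractHerder's tempConnectors map and 
plugins().newConnector; this is not the actual patch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Illustrative stand-in for AbstractHerder's connector cache.
class ConnectorCache<C> {
    private final ConcurrentMap<String, C> tempConnectors = new ConcurrentHashMap<>();
    private final Function<String, C> factory; // e.g. type -> plugins().newConnector(type)

    ConnectorCache(Function<String, C> factory) {
        this.factory = factory;
    }

    C getConnector(String connType) {
        // computeIfAbsent runs the factory at most once per key, so two requests
        // racing on the same connector type can no longer each create an instance.
        return tempConnectors.computeIfAbsent(connType, factory);
    }
}
```

The previous containsKey/put sequence leaves a window between the check and the 
insert; computeIfAbsent makes the check-and-create atomic on a ConcurrentHashMap.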


[jira] [Updated] (KAFKA-9560) Connector::validate is utilized concurrently by the framework, but not documented as thread-safe

2021-04-21 Thread Kalpesh Patel (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kalpesh Patel updated KAFKA-9560:
-
Description: 
Requests to the {{PUT /connector-plugins/\{connectorType}/config/validate}} 
endpoint are [delegated to the 
herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81],
 which [caches connector 
instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544]
 that are used [during config 
validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310].
 This has the effect that, should concurrent requests to that endpoint occur 
for the same connector type, the same connector instance may be responsible for 
[validating those 
configurations|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L334]
 concurrently _(may_ instead of _will_ because there is also a race condition 
in the {{AbstractHerder::getConnector}} method that potentially fails to detect 
that an instance of the connector has already been created and, as a result, 
creates a second instance; see KAFKA-12704).

This is slightly problematic because the 
[Connector::validate|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java#L122-L127]
 method is not marked as thread-safe. However, because a lot of connectors out 
there tend to use the default implementation for that method, it's probably not 
super urgent that we patch this immediately.

A couple of options are:
 # Update the docs for that method to specify that it must be thread-safe
 # Rewrite the connector validation logic in the framework to avoid 
concurrently invoking {{Connector::validate}} on the same instance.

  was:
Requests to the {{PUT /connector-plugins/\{connectorType}/config/validate}} 
endpoint are [delegated to the 
herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81],
 which [caches connector 
instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544]
 that are used [during config 
validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310].
 This has the effect that, should concurrent requests to that endpoint occur 
for the same connector type, the same connector instance may be responsible for 
[validating those 
configurations|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L334]
 concurrently _(may_ instead of _will_ because there is also a race condition 
in the {{AbstractHerder::getConnector}} method that potentially fails to detect 
that an instance of the connector has already been created and, as a result, 
creates a second instance).

This is slightly problematic because the 
[Connector::validate|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java#L122-L127]
 method is not marked as thread-safe. However, because a lot of connectors out 
there tend to use the default implementation for that method, it's probably not 
super urgent that we patch this immediately.

A couple of options are:
 # Update the docs for that method to specify that it must be thread-safe
 # Rewrite the connector validation logic in the framework to avoid 
concurrently invoking {{Connector::validate}} on the same instance.


> Connector::validate is utilized concurrently by the framework, but not 
> documented as thread-safe
> 
>
> Key: KAFKA-9560
> URL: https://issues.apache.org/jira/browse/KAFKA-9560
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Minor
>
> Requests to the {{PUT /connector-plugins/\{connectorType}/config/validate}} 
> endpoint are [delegated to the 
> herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81],

[GitHub] [kafka] junrao commented on pull request #10576: KAFKA-12701: Remove topicId from MetadataReq since it was not supported in 2.8.0

2021-04-21 Thread GitBox


junrao commented on pull request #10576:
URL: https://github.com/apache/kafka/pull/10576#issuecomment-824228739


   @dengziming : We probably can't remove a field from an existing protocol 
without changing the version. Otherwise, the client will be confused about the 
exact protocol for a particular version. We could bump up the version (keeping 
all the fields the same) in 3.0 and implement the functionality for topicId 
there. For 2.8, we probably want to at least document in the protocol that 
topicId is not supported.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-21 Thread GitBox


junrao commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r617765100



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataContextSerdes.java
##
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import 
org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataContext;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class provides serialization and deserialization for {@link 
RemoteLogMetadataContext}. This is the root serdes
+ * for the messages that are stored in internal remote log metadata topic.
+ */
+public class RemoteLogMetadataContextSerdes implements Serde<RemoteLogMetadataContext> {
+
+public static final byte REMOTE_LOG_SEGMENT_METADATA_API_KEY = (byte) new 
RemoteLogSegmentMetadataRecord().apiKey();
+public static final byte REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = 
(byte) new RemoteLogSegmentMetadataUpdateRecord().apiKey();
+public static final byte REMOTE_PARTITION_DELETE_API_KEY = (byte) new 
RemotePartitionDeleteMetadataRecord().apiKey();
+
+private static final Map KEY_TO_SERDES = 
createInternalSerde();
+
+private final Deserializer<RemoteLogMetadataContext> rootDeserializer;
+private final Serializer<RemoteLogMetadataContext> rootSerializer;
+
+public RemoteLogMetadataContextSerdes() {
+rootSerializer = (topic, data) -> serialize(data);

Review comment:
   @satishd : What you said makes sense. It's useful to be able to use the 
same Serde in tools or client applications. Also, it seems that it's possible 
to make a generic Serde for all ApiMessage since the apiKey is unique within a 
topic and the topic is passed into Serde.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
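
A rough sketch of that generic-Serde idea, tagging each record with its apiKey; 
MessageCodec and GenericApiMessageSerde are illustrative names under these 
assumptions, not types from the PR.

```java
import java.nio.ByteBuffer;
import java.util.Map;

import org.apache.kafka.common.protocol.ApiMessage;

// Hypothetical per-type codec, registered under the message's apiKey.
interface MessageCodec {
    byte[] write(ApiMessage message);
    ApiMessage read(ByteBuffer payload);
}

class GenericApiMessageSerde {
    private final Map<Short, MessageCodec> codecs; // keyed by apiKey, unique within the topic

    GenericApiMessageSerde(Map<Short, MessageCodec> codecs) {
        this.codecs = codecs;
    }

    byte[] serialize(ApiMessage msg) {
        byte[] payload = codecs.get(msg.apiKey()).write(msg);
        ByteBuffer buf = ByteBuffer.allocate(2 + payload.length);
        buf.putShort(msg.apiKey()); // leading type tag
        buf.put(payload);
        return buf.array();
    }

    ApiMessage deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        short apiKey = buf.getShort(); // dispatch on the type tag
        return codecs.get(apiKey).read(buf.slice());
    }
}
```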




[GitHub] [kafka] kpatelatwork opened a new pull request #10580: KAFKA-12704 Fixed potential concurrency issue in connector creation

2021-04-21 Thread GitBox


kpatelatwork opened a new pull request #10580:
URL: https://github.com/apache/kafka/pull/10580


   *Concurrent requests to the validate endpoint for the same connector type call 
AbstractHerder::getConnector to get the cached connector instance, and if the 
connector hasn't been cached yet, a race condition in the 
AbstractHerder::getConnector method can fail to detect that an instance of the 
connector has already been created and, as a result, create another instance.*
   
   *Existing tests provide enough coverage, so no new tests are added.*
   
   @C0urante  @rhauch  Could you please review if this fix is acceptable?
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10580: KAFKA-12704 Fixed potential concurrency issue in connector creation

2021-04-21 Thread GitBox


C0urante commented on pull request #10580:
URL: https://github.com/apache/kafka/pull/10580#issuecomment-824267968


   We may want to upgrade `tempConnectors` from a regular `Map` to a 
`ConcurrentMap` as well, LGTM otherwise though.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10564: MINOR: clean up some replication code

2021-04-21 Thread GitBox


junrao commented on a change in pull request #10564:
URL: https://github.com/apache/kafka/pull/10564#discussion_r617773890



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -321,22 +339,17 @@ public void replay(PartitionRecord record) {
 }
 PartitionControlInfo newPartInfo = new PartitionControlInfo(record);
 PartitionControlInfo prevPartInfo = 
topicInfo.parts.get(record.partitionId());
+String topicPart = topicInfo.name + "-" + record.partitionId();

Review comment:
   It's probably useful to log the topicId too?

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -356,7 +369,8 @@ public void replay(PartitionChangeRecord record) {
 brokersToIsrs.update(record.topicId(), record.partitionId(),
 prevPartitionInfo.isr, newPartitionInfo.isr, 
prevPartitionInfo.leader,
 newPartitionInfo.leader);
-log.debug("Applied ISR change record: {}", record.toString());
+String topicPart = topicInfo.name + "-" + record.partitionId();

Review comment:
   It's probably useful to log the topicId too?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10564: MINOR: clean up some replication code

2021-04-21 Thread GitBox


cmccabe commented on a change in pull request #10564:
URL: https://github.com/apache/kafka/pull/10564#discussion_r617785244



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -1119,6 +1053,62 @@ void validateManualPartitionAssignment(List<Integer> assignment,
 }
 }
 
+void generateLeaderAndIsrUpdates(String context,
+ int brokerToRemoveFromIsr,
+ List<ApiMessageAndVersion> records,
+ Iterator<TopicIdPartition> iterator) {
+int oldSize = records.size();
+while (iterator.hasNext()) {
+TopicIdPartition topicIdPart = iterator.next();
+TopicControlInfo topic = topics.get(topicIdPart.topicId());
+if (topic == null) {
+throw new RuntimeException("Topic ID " + topicIdPart.topicId() 
+ " existed in " +
+"isrMembers, but not in the topics map.");
+}
+PartitionControlInfo partition = 
topic.parts.get(topicIdPart.partitionId());
+if (partition == null) {
+throw new RuntimeException("Partition " + topicIdPart +
+" existed in isrMembers, but not in the partitions map.");
+}
+int[] newIsr = Replicas.copyWithout(partition.isr, 
brokerToRemoveFromIsr);
+int newLeader = Replicas.contains(newIsr, partition.leader) ? 
partition.leader :
+bestLeader(partition.replicas, newIsr, false);
+boolean unclean = newLeader != NO_LEADER && 
!Replicas.contains(newIsr, newLeader);

Review comment:
   Yes, I think we can reuse some code here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10570: MINOR: Bump to latest version 2.6.2

2021-04-21 Thread GitBox


ableegoldman merged pull request #10570:
URL: https://github.com/apache/kafka/pull/10570


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Naros commented on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values

2021-04-21 Thread GitBox


Naros commented on pull request #10566:
URL: https://github.com/apache/kafka/pull/10566#issuecomment-824277958


   @C0urante @rhauch, could either of you review this to see if it's acceptable?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10565: KAFKA-12691: Add case where task can be considered idling

2021-04-21 Thread GitBox


ableegoldman commented on a change in pull request #10565:
URL: https://github.com/apache/kafka/pull/10565#discussion_r617805969



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -667,8 +668,15 @@ public boolean isProcessable(final long wallClockTime) {
 // thus, the task is not processable, even if there is available 
data in the record queue
 return false;
 }
-
-return partitionGroup.readyToProcess(wallClockTime);
+final boolean readyToProcess = 
partitionGroup.readyToProcess(wallClockTime);
+if (!readyToProcess) {
+if (!timeCurrentIdlingStarted.isPresent()) {
+timeCurrentIdlingStarted = Optional.of(wallClockTime);
+}
+} else {
+timeCurrentIdlingStarted = Optional.empty();

Review comment:
   Just want to make sure I understand: previously we only considered a 
task as idling if it was suspended, so we're just fixing it up to track the 
actual idling. And while, since KIP-429, suspension is just a transient state 
that the task passes through right before being closed, it's still used during 
an upgrade from EAGER. So we're going to keep considering suspension as idling 
until we can finally drop support for EAGER -- does that sound right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values

2021-04-21 Thread GitBox


C0urante commented on pull request #10566:
URL: https://github.com/apache/kafka/pull/10566#issuecomment-824292010


   Hmm... I'm wondering if this might break existing setups. Since the 
`SchemaBuilder` class does implement the `Schema` interface, it's currently 
possible to do something like this:
   
   ```java
   import org.apache.kafka.connect.data.Schema;
   import org.apache.kafka.connect.data.SchemaBuilder;
   import org.apache.kafka.connect.data.Struct;
   
   SchemaBuilder builder = SchemaBuilder.struct()
   .field("f1", Schema.BOOLEAN_SCHEMA);
   
   Struct defaultValue = new Struct(builder);
   defaultValue.put("f1", true);
   
   Schema schema = builder.defaultValue(defaultValue).build();
   ```
   
   Since validation currently uses the equality method of the `Struct`'s schema 
(https://github.com/apache/kafka/blob/87b24025cedf08c550a180819b2a5a2dbb75f020/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L247),
 which in this case is a `SchemaBuilder` instance that doesn't have an 
overridden `equals` method, I think the change proposed here might cause the 
above example to start to fail.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante edited a comment on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values

2021-04-21 Thread GitBox


C0urante edited a comment on pull request #10566:
URL: https://github.com/apache/kafka/pull/10566#issuecomment-824292010


   Hmm... I'm wondering if this might break existing setups. Since the 
`SchemaBuilder` class does implement the `Schema` interface, it's currently 
possible to do something like this:
   
   ```java
   import org.apache.kafka.connect.data.Schema;
   import org.apache.kafka.connect.data.SchemaBuilder;
   import org.apache.kafka.connect.data.Struct;
   
   SchemaBuilder builder = SchemaBuilder.struct()
   .field("f1", Schema.BOOLEAN_SCHEMA);
   
   Struct defaultValue = new Struct(builder);
   defaultValue.put("f1", true);
   
   Schema schema = builder.defaultValue(defaultValue).build();
   ```
   
   Validation currently uses the equality method of the `Struct`'s schema: 
https://github.com/apache/kafka/blob/87b24025cedf08c550a180819b2a5a2dbb75f020/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L247
   which in this case is a `SchemaBuilder` instance that doesn't have an 
overridden `equals` method, so I think the change proposed here might cause the 
above example to start to fail.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante edited a comment on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values

2021-04-21 Thread GitBox


C0urante edited a comment on pull request #10566:
URL: https://github.com/apache/kafka/pull/10566#issuecomment-824292010


   Hmm... I'm wondering if this might break existing setups. Since the 
`SchemaBuilder` class does implement the `Schema` interface, it's currently 
possible to do something like this:
   
   ```java
   import org.apache.kafka.connect.data.Schema;
   import org.apache.kafka.connect.data.SchemaBuilder;
   import org.apache.kafka.connect.data.Struct;
   
   SchemaBuilder builder = SchemaBuilder.struct()
   .field("f1", Schema.BOOLEAN_SCHEMA);
   
   Struct defaultValue = new Struct(builder);
   defaultValue.put("f1", true);
   
   Schema schema = builder.defaultValue(defaultValue).build();
   ```
   
   Validation currently uses the equality method of the `Struct`'s schema: 
https://github.com/apache/kafka/blob/87b24025cedf08c550a180819b2a5a2dbb75f020/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L247
   which in this case is a `SchemaBuilder` instance that doesn't have an 
overridden `equals` method, so I think the change proposed here might cause the 
above example to start to fail.
   
   A naive solution could be to modify the validation to be something like `if 
(!schema.equals(struct.schema()))` but that may introduce other edge cases. Can 
you provide a brief example of what sort of connector code ran into this bug in 
the first place? That might help guide the path forward.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12696) Add standard getters to LagInfo class to allow automatic serialization

2021-04-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326845#comment-17326845
 ] 

Matthias J. Sax commented on KAFKA-12696:
-

{quote}Ok. Just to confirm, the confluence where KIPs are created is a separate 
login, so I should sign up?
{quote}
Yes. Just share your account name here, and we can grant write access so you 
can create a KIP.

Just want to clarify again: it will be difficult to get this KIP approved, and 
it could easily happen that it will be rejected (just want to make sure you 
understand this and won't be disappointed in case it happens).

> Add standard getters to LagInfo class to allow automatic serialization
> --
>
> Key: KAFKA-12696
> URL: https://issues.apache.org/jira/browse/KAFKA-12696
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mikhail Panchenko
>Assignee: Mikhail Panchenko
>Priority: Trivial
>  Labels: needs-kip
>   Original Estimate: 0.5h
>  Remaining Estimate: 0.5h
>
> The LagInfo class has non-standard getters for its member variables. This 
> means that Jackson and other serialization frameworks do not know how to 
> serialize them without additional annotations. So when implementing the sort 
> of system that KAFKA-6144 is meant to enable, as documented here in docs like 
> [Kafka Streams Interactive 
> Queries|https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html#querying-state-stores-during-a-rebalance],
>  one has to either inject a bunch of custom serialization logic into Jersey 
> or wrap this class.
> The patch to fix this is trivial, and I will be putting one up shortly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
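
A minimal sketch of the "wrap this class" workaround mentioned above, assuming 
Jackson's default bean introspection; LagInfoDto is an illustrative name, and the 
getters mirror LagInfo's currentOffsetPosition()/endOffsetPosition()/offsetLag() 
accessors.

```java
import org.apache.kafka.streams.LagInfo;

// Jackson-friendly wrapper: standard JavaBean getters need no extra annotations.
public class LagInfoDto {
    private final long currentOffsetPosition;
    private final long endOffsetPosition;
    private final long offsetLag;

    public LagInfoDto(LagInfo lag) {
        this.currentOffsetPosition = lag.currentOffsetPosition();
        this.endOffsetPosition = lag.endOffsetPosition();
        this.offsetLag = lag.offsetLag();
    }

    public long getCurrentOffsetPosition() { return currentOffsetPosition; }
    public long getEndOffsetPosition() { return endOffsetPosition; }
    public long getOffsetLag() { return offsetLag; }
}
```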


[GitHub] [kafka] kpatelatwork commented on pull request #10580: KAFKA-12704 Fixed potential concurrency issue in connector creation

2021-04-21 Thread GitBox


kpatelatwork commented on pull request #10580:
URL: https://github.com/apache/kafka/pull/10580#issuecomment-824299926


   > We may want to upgrade `tempConnectors` from a regular `Map` to a 
`ConcurrentMap` as well, LGTM otherwise though.
   
   it's already a ConcurrentHashMap :).
   `private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10580: KAFKA-12704 Fixed potential concurrency issue in connector creation

2021-04-21 Thread GitBox


C0urante commented on pull request #10580:
URL: https://github.com/apache/kafka/pull/10580#issuecomment-824300835


   Sure, I mean this:
   
   ```java
   private ConcurrentMap<String, Connector> tempConnectors = new ConcurrentHashMap<>();
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10493) KTable out-of-order updates are not being ignored

2021-04-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326860#comment-17326860
 ] 

Matthias J. Sax commented on KAFKA-10493:
-

Thinking about this ticket once more, I actually have one more concern: if we 
start to drop out-of-order updates, we might actually end up in an inconsistent 
state, because topic compaction is offset based... (at least when we apply the 
source-topic optimization – without the optimization and a dedicated changelog 
topic, it would be safe.)

After 
[KIP-280|https://cwiki.apache.org/confluence/display/KAFKA/KIP-280%3A+Enhanced+log+compaction]
 is done, we could drop out-of-order records also with source-table 
materialization enabled.

Thoughts?

> KTable out-of-order updates are not being ignored
> -
>
> Key: KAFKA-10493
> URL: https://issues.apache.org/jira/browse/KAFKA-10493
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Pedro Gontijo
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 3.0.0
>
> Attachments: KTableOutOfOrderBug.java
>
>
> On a materialized KTable, out-of-order records for a given key (records which 
> timestamp are older than the current value in store) are not being ignored 
> but used to update the local store value and also being forwarded.
> I believe the bug is here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77]
>  It should return true, not false (see javadoc)
> The bug impacts here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148]
> I have attached a simple stream app that shows the issue happening.
> Thank you!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kpatelatwork commented on pull request #10580: KAFKA-12704 Fixed potential concurrency issue in connector creation

2021-04-21 Thread GitBox


kpatelatwork commented on pull request #10580:
URL: https://github.com/apache/kafka/pull/10580#issuecomment-824306271


   done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle

2021-04-21 Thread GitBox


ableegoldman opened a new pull request #10581:
URL: https://github.com/apache/kafka/pull/10581


   Seems adding this was missed during the original 2.6.0 release


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #10582: MINOR: Bump latest 2.6 version to 2.6.2

2021-04-21 Thread GitBox


ableegoldman opened a new pull request #10582:
URL: https://github.com/apache/kafka/pull/10582


   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10582: MINOR: Bump latest 2.6 version to 2.6.2

2021-04-21 Thread GitBox


ableegoldman merged pull request #10582:
URL: https://github.com/apache/kafka/pull/10582


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-5761) Serializer API should support ByteBuffer

2021-04-21 Thread Kirill Rodionov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326876#comment-17326876
 ] 

Kirill Rodionov commented on KAFKA-5761:


If we're talking pooled/reused buffers (like Netty's ByteBuf), then allocating 
them in the serializer is not going to achieve much, because the data is copied 
to the accumulator (in KafkaProducer.doSend) after the serializer is invoked, 
and there's no way to release the buffer back to the pool other than in the 
finalize() method.

There appears to be more sense in explicit support for ByteBuffer as a value, 
so that user code can allocate it before calling producer.send and release it 
afterwards, since it's guaranteed that the contents have been copied to the 
producer's accumulator by then.

The only hiccup here is that the "value.serializer" property is mandatory and 
its presence is checked at KafkaProducer's construction time, when there's no 
value to query its type.

If you make the value.serializer property optional, then there's a possibility 
of an error at the first send() invocation if the value happens to be anything 
other than a ByteBuffer.

The other option is to keep the value serializer mandatory as it is now, but 
ignore it if the value is a ByteBuffer, whose contents can be copied without 
any serializer.

valueBytes would have to be removed from the Partitioner's signature (no 
implementation I know of uses that argument anyway).

WDYT?
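
A sketch of the usage pattern described above, assuming an application-side
pool (the pool and fill helpers are illustrative) together with the existing
ByteBufferSerializer:

{code:java}
// Sketch: caller-managed buffer, released right after send() returns, which is
// safe because doSend() has already copied the bytes into the accumulator.
ByteBuffer buf = pool.acquire();          // 'pool' is an assumed pooling utility
try {
    fillWithPayload(buf);                 // assumed application-level encoding
    buf.flip();
    producer.send(new ProducerRecord<>("topic", buf));
} finally {
    pool.release(buf);
}
{code}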

> Serializer API should support ByteBuffer
> 
>
> Key: KAFKA-5761
> URL: https://issues.apache.org/jira/browse/KAFKA-5761
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.11.0.0
>Reporter: Bhaskar Gollapudi
>Assignee: Chia-Ping Tsai
>Priority: Major
>  Labels: features, performance
>
> Consider the Serializer: its main method is:
> byte[] serialize(String topic, T data);
> Producer applications create an implementation that takes in an instance (of
> T) and converts it to a byte[]. This byte array is allocated anew for this
> message. The byte array is then handed over to the Kafka Producer API
> internals, which write the bytes to the buffer/network socket. When the next
> message arrives, the serializer, instead of creating a new byte[], should
> try to reuse the existing byte[] for the new message. This requires two
> things:
> 1. The process of handing off the bytes to the buffer/socket and reusing
> the byte[] must happen on the same thread.
> 2. There should be a way of marking the end of available bytes in the
> byte[].
> The first is reasonably simple to understand. If this does not happen, and
> without other necessary synchronization, the byte[] gets corrupted and so
> does the message written to the buffer/socket. However, this requirement is
> easy to meet for a producer application, because it controls the threads on
> which the serializer is invoked.
> The second is where the problem lies with the current API. It does not
> allow a variable number of bytes to be read from a container. It is limited
> by the byte[]'s length. This forces the producer to
> 1. either create a new byte[] for a message that is bigger than the previous
> one,
> OR
> 2. decide a max size and use padding.
> Both are cumbersome and error prone, and may waste network bandwidth.
> Instead, if there is a Serializer with this method:
> ByteBuffer serialize(String topic, T data);
> it helps to implement a reusable bytes container for clients, avoiding
> allocations for each message.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-5761) Serializer API should support ByteBuffer

2021-04-21 Thread Kirill Rodionov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326876#comment-17326876
 ] 

Kirill Rodionov edited comment on KAFKA-5761 at 4/21/21, 7:54 PM:
--

If we're talking pooled/reused buffers (like Netty's ByteBuf), then allocating 
them in the serializer is not going to achieve much, because the data is copied 
to the accumulator (in KafkaProducer.doSend) after the serializer is invoked, 
and there's no way to release the buffer back to the pool other than in the 
finalize() method.

There appears to be more sense in explicit support for ByteBuffer as a value, 
so that user code can allocate it before calling producer.send and release it 
afterwards, since it's guaranteed that the contents have been copied to the 
producer's accumulator by then.

The only hiccup here is that the "value.serializer" property is mandatory and 
its presence is checked at KafkaProducer's construction time, when there's no 
value present yet and therefore no way to know if it's going to be a ByteBuffer.

If you make the value.serializer property optional, then there's a possibility 
of an error on the first send() invocation if the value happens to be anything 
other than a ByteBuffer.

The other option is to keep the value serializer mandatory as it is now, but 
ignore it if the value is a ByteBuffer, whose contents can be copied without 
any serializer.

valueBytes would have to be removed from the Partitioner's signature (no 
implementation I know of uses that argument anyway).

WDYT?


was (Author: bruto):
If we're talking pooled/reused buffers (like Netty's ByteBuf), then allocating 
them in the serializer is not going to achieve much, because the data is copied 
to the accumulator (in KafkaProducer.doSend) after the serializer is invoked, 
and there's no way to release the buffer back to the pool other than in the 
finalize() method.

There appears to be more sense in explicit support for ByteBuffer as a value, 
so that user code can allocate it before calling producer.send and release it 
afterwards, since it's guaranteed that the contents have been copied to the 
producer's accumulator by then.

The only hiccup here is that the "value.serializer" property is mandatory and 
its presence is checked at KafkaProducer's construction time, when there's no 
value to query its type.

If you make the value.serializer property optional, then there's a possibility 
of an error at the first send() invocation if the value happens to be anything 
other than a ByteBuffer.

The other option is to keep the value serializer mandatory as it is now, but 
ignore it if the value is a ByteBuffer, whose contents can be copied without 
any serializer.

valueBytes would have to be removed from the Partitioner's signature (no 
implementation I know of uses that argument anyway).

WDYT?

> Serializer API should support ByteBuffer
> 
>
> Key: KAFKA-5761
> URL: https://issues.apache.org/jira/browse/KAFKA-5761
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.11.0.0
>Reporter: Bhaskar Gollapudi
>Assignee: Chia-Ping Tsai
>Priority: Major
>  Labels: features, performance
>
> Consider the Serializer: its main method is:
> byte[] serialize(String topic, T data);
> Producer applications create an implementation that takes in an instance (of
> T) and converts it to a byte[]. This byte array is allocated anew for this
> message. The byte array is then handed over to the Kafka Producer API
> internals, which write the bytes to the buffer/network socket. When the next
> message arrives, the serializer, instead of creating a new byte[], should
> try to reuse the existing byte[] for the new message. This requires two
> things:
> 1. The process of handing off the bytes to the buffer/socket and reusing
> the byte[] must happen on the same thread.
> 2. There should be a way of marking the end of available bytes in the
> byte[].
> The first is reasonably simple to understand. If this does not happen, and
> without other necessary synchronization, the byte[] gets corrupted and so
> does the message written to the buffer/socket. However, this requirement is
> easy to meet for a producer application, because it controls the threads on
> which the serializer is invoked.
> The second is where the problem lies with the current API. It does not
> allow a variable number of bytes to be read from a container. It is limited
> by the byte[]'s length. This forces the producer to
> 1. either create a new byte[] for a message that is bigger than the previous
> one,
> OR
> 2. decide a max size and use padding.
> Both are cumbersome and error prone, and may waste network bandwidth.
> Instead, if there is a Serializer with this method:
> ByteBuffer serialize(String topic, T data);
> it helps to implement a reusable bytes container for clients, avoiding
> allocations for each message.

[GitHub] [kafka] ableegoldman commented on a change in pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle

2021-04-21 Thread GitBox


ableegoldman commented on a change in pull request #10581:
URL: https://github.com/apache/kafka/pull/10581#discussion_r617836508



##
File path: gradle/dependencies.gradle
##
@@ -166,6 +166,7 @@ libs += [
   kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23",
   kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24",
   kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25",
+  kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26",

Review comment:
   I'm not 100% sure if this is actually supposed to be on the 2.6 branch 
or not, but the `kafka_26` was already defined and it seems odd to have one but 
not the other. If adding this is wrong, I can take out the `kafka_26` instead 
🤷‍♀️ 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle

2021-04-21 Thread GitBox


ableegoldman commented on a change in pull request #10581:
URL: https://github.com/apache/kafka/pull/10581#discussion_r617836508



##
File path: gradle/dependencies.gradle
##
@@ -166,6 +166,7 @@ libs += [
   kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23",
   kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24",
   kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25",
+  kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26",

Review comment:
   I'm not 100% sure if this is actually supposed to be on the 2.6 branch 
or not, but the `kafka_26` was already defined and it seems odd to have one but 
not the other. If adding this is wrong, I can take out the `kafka_26` instead 
🤷‍♀️ . @mjsax ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10582: MINOR: Bump latest 2.6 version to 2.6.2

2021-04-21 Thread GitBox


ableegoldman commented on pull request #10582:
URL: https://github.com/apache/kafka/pull/10582#issuecomment-824320067


   Cherrypicked back to 2.8 and 2.7


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2

2021-04-21 Thread GitBox


mjsax commented on a change in pull request #10573:
URL: https://github.com/apache/kafka/pull/10573#discussion_r617826977



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -642,9 +647,13 @@ public void beginTransaction() throws 
ProducerFencedException {
  * to the partition leader. See the exception for more details
  * @throws KafkaException if the producer has encountered a previous fatal 
or abortable error, or for any
  * other unexpected error
+ *
+ * @deprecated Since 3.0.0, will be removed in 4.0. Use {@link 
#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} instead.
  */
+@Deprecated
 public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
  String consumerGroupId) throws 
ProducerFencedException {
+log.warn("This method has been deprecated and will be removed in 4.0, 
please use #sendOffsetsToTransaction(Map, ConsumerGroupMetadata) instead");

Review comment:
   Do we really need to log a WARN? We never did anything like this in the KS 
code base in the past. Marking the method as `@Deprecated` should be sufficient 
IMHO? Or is such a WARN log customary in the client code base?
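   
   For reference, the deprecated overload can simply delegate to the new one,
so the `@Deprecated` annotation alone may carry enough signal. A sketch,
assuming the old `consumerGroupId` maps one-to-one onto a
`ConsumerGroupMetadata`:
   
   ```java
   @Deprecated
   public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                        String consumerGroupId) throws ProducerFencedException {
       // One-line delegation to the non-deprecated overload; behavior is
       // identical whether or not an extra WARN is logged here.
       sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata(consumerGroupId));
   }
   ```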

##
File path: clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
##
@@ -49,7 +49,10 @@
 
 /**
  * See {@link KafkaProducer#sendOffsetsToTransaction(Map, String)}
+ *
+

Review comment:
   nit: needs cleanup

##
File path: docs/streams/core-concepts.html
##
@@ -291,16 +291,18 @@ https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics";>KIP-129.
 
-As of the 2.6.0 release, Kafka Streams supports an improved 
implementation of exactly-once processing, named "exactly-once beta", 
+As of the 2.6.0 release, Kafka Streams supports an improved 
implementation of exactly-once processing, named "exactly-once v2",
 which requires broker version 2.5.0 or newer.
 This implementation is more efficient, because it reduces client and 
broker resource utilization, like client threads and used network connections, 
 and it enables higher throughput and improved scalability.
+As of the 3.0.0 release, the old "alpha" version of exactly-once has 
been deprecated. Users are encouraged to use exactly-once v2 for

Review comment:
   We never used "alpha" anywhere in public, so might be better to avoid 
this term at all?
   
   `old "alpha" -> "first"

##
File path: docs/streams/developer-guide/config-streams.html
##
@@ -667,12 +668,14 @@ probing.rebalance.interval.msprocessing.guarantee
   
 The processing guarantee that should be used.
-  Possible values are "at_least_once" (default),
-  "exactly_once" (for EOS version 1),
-  and "exactly_once_beta" (for EOS version 2).
-  Using "exactly_once" requires broker
-  version 0.11.0 or newer, while using "exactly_once_beta"
-  requires broker version 2.5 or newer.
+  Possible values are "at_least_once" (default)
+  and "exactly_once_v2" (for EOS version 2).
+  Deprecated config options are "exactly_once" (for EOS alpha),

Review comment:
   Do we need to elaborate on all deprecated configs?

##
File path: docs/streams/upgrade-guide.html
##
@@ -93,6 +95,12 @@ Upgrade Guide and API Changes
 
 
 Streams API changes in 3.0.0
+
+  The StreamsConfig.EXACTLY_ONCE and 
StreamsConfig.EXACTLY_ONCE_BETA configs have been deprecated, and 
a new StreamsConfig.EXACTLY_ONCE_V2 config has been

Review comment:
   Might be good to highlight that `beta -> v2` is just a renaming?

##
File path: docs/streams/upgrade-guide.html
##
@@ -53,17 +53,19 @@ Upgrade Guide and API Changes
 
 
 
-Starting in Kafka Streams 2.6.x, a new processing mode is available, 
named EOS version 2, which is configurable by setting 
-processing.guarantee to "exactly_once_beta".
-NOTE: The "exactly_once_beta" processing mode is 
ready for production (i.e., it's not "beta" software). 
+Starting in Kafka Streams 2.6.x, a new processing mode is available, 
named EOS version 2. This can be configured
+by setting StreamsConfig.PROCESSING_GUARANTEE to 
StreamsConfig.EXACTLY_ONCE_V2 for
+application versions 3.0+, or setting it to 
StreamsConfig.EXACTLY_ONCE_BETA for versions between 2.6 and 3.0.
 To use this new feature, your brokers must be on version 2.5.x or 
newer.
-A switch from "exactly_once" to 
"exactly_once_beta" (or the other way around) is
-only possible if the application is on version 2.6.x.
-If you want to upgrade your application from an older version and 
enable this feature,
-you first need to upgrade your application to version 2.6.x, stayin

[GitHub] [kafka] mjsax commented on a change in pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle

2021-04-21 Thread GitBox


mjsax commented on a change in pull request #10581:
URL: https://github.com/apache/kafka/pull/10581#discussion_r617839610



##
File path: gradle/dependencies.gradle
##
@@ -166,6 +166,7 @@ libs += [
   kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23",
   kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24",
   kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25",
+  kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26",

Review comment:
   We don't need this for the `2.6` branch: when we test an upgrade from an 
older version to `2.6`, we just use the jar we built from the `2.6` branch -- 
we only need to pull in older versions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle

2021-04-21 Thread GitBox


ableegoldman commented on a change in pull request #10581:
URL: https://github.com/apache/kafka/pull/10581#discussion_r617842172



##
File path: gradle/dependencies.gradle
##
@@ -166,6 +166,7 @@ libs += [
   kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23",
   kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24",
   kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25",
+  kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26",

Review comment:
   So I can/should remove the `kafka_26` from this branch as well, yes?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #10548: KAFKA-12396 added a nullcheck before trying to retrieve a key

2021-04-21 Thread GitBox


mjsax commented on pull request #10548:
URL: https://github.com/apache/kafka/pull/10548#issuecomment-824325232


   > All of them already had the checks. 
   
   Sweet. I did not double check the code before.
   
   Seems `put(final Windowed<K> sessionKey, ...)` and `remove(final Windowed<K> 
sessionKey)` don't check for `sessionKey.key() != null`, though? Can we add this 
check?
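   
   A minimal sketch of the suggested guard (the method shape and the `AGG`
type parameter are assumed from the session-store API):
   
   ```java
   public void put(final Windowed<K> sessionKey, final AGG aggregate) {
       Objects.requireNonNull(sessionKey, "sessionKey cannot be null");
       Objects.requireNonNull(sessionKey.key(), "sessionKey.key() cannot be null");
       // ... existing put logic ...
   }
   ```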


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle

2021-04-21 Thread GitBox


mjsax commented on a change in pull request #10581:
URL: https://github.com/apache/kafka/pull/10581#discussion_r617844560



##
File path: gradle/dependencies.gradle
##
@@ -166,6 +166,7 @@ libs += [
   kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23",
   kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24",
   kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25",
+  kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26",

Review comment:
   Well, this PR adds `kafka_26` but we don't need to add it -- not sure 
what you want to remove?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle

2021-04-21 Thread GitBox


ableegoldman commented on a change in pull request #10581:
URL: https://github.com/apache/kafka/pull/10581#discussion_r617845243



##
File path: gradle/dependencies.gradle
##
@@ -166,6 +166,7 @@ libs += [
   kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23",
   kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24",
   kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25",
+  kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26",

Review comment:
   not `kafkaStreams_26`, `kafka_26` (defined above) -- it just seems 
weird to me that we would define one but not the other? But I guess it doesn't 
hurt, so I can close this PR




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman closed pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle

2021-04-21 Thread GitBox


ableegoldman closed pull request #10581:
URL: https://github.com/apache/kafka/pull/10581


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #10565: KAFKA-12691: Add case where task can be considered idling

2021-04-21 Thread GitBox


wcarlson5 commented on a change in pull request #10565:
URL: https://github.com/apache/kafka/pull/10565#discussion_r617857575



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -667,8 +668,15 @@ public boolean isProcessable(final long wallClockTime) {
 // thus, the task is not processable, even if there is available 
data in the record queue
 return false;
 }
-
-return partitionGroup.readyToProcess(wallClockTime);
+final boolean readyToProcess = 
partitionGroup.readyToProcess(wallClockTime);
+if (!readyToProcess) {
+if (!timeCurrentIdlingStarted.isPresent()) {
+timeCurrentIdlingStarted = Optional.of(wallClockTime);
+}
+} else {
+timeCurrentIdlingStarted = Optional.empty();

Review comment:
   Yes that sounds like what I was thinking
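   
   For the record, the agreed-upon bookkeeping can be written compactly. An
equivalent sketch of the logic in the diff above:
   
   ```java
   // Remember the wall-clock time when idling began, keep the original value
   // while the task is still idling, and clear it once processable again.
   final boolean readyToProcess = partitionGroup.readyToProcess(wallClockTime);
   timeCurrentIdlingStarted = readyToProcess
       ? Optional.empty()
       : Optional.of(timeCurrentIdlingStarted.orElse(wallClockTime));
   return readyToProcess;
   ```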




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12701) NPE in MetadataRequest when using topic IDs

2021-04-21 Thread Justine Olshan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326909#comment-17326909
 ] 

Justine Olshan commented on KAFKA-12701:


[~dengziming] I can work on the 2.8 version, where we return an error, if you 
haven't started on that already.
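
A hedged sketch of that 2.8-style handling: resolve each requested topic ID up 
front and answer with an error code instead of letting a null name reach 
authorization. The helper shape is an assumption for illustration; the real 
handler (KafkaApis) is Scala:

{code:java}
// Sketch: report an unresolved topic ID back to the client rather than
// building a ResourcePattern from a null name.
MetadataResponseData.MetadataResponseTopic unknownTopicIdResponse(Uuid topicId) {
    return new MetadataResponseData.MetadataResponseTopic()
        .setTopicId(topicId)
        .setErrorCode(Errors.UNKNOWN_TOPIC_ID.code());
}
{code}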

> NPE in MetadataRequest when using topic IDs
> ---
>
> Key: KAFKA-12701
> URL: https://issues.apache.org/jira/browse/KAFKA-12701
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Travis Bischel
>Assignee: dengziming
>Priority: Major
>
> Authorized result checking relies on the topic name not being null, which, 
> when using topic IDs, it is.
> Unlike the logic in handleDeleteTopicsRequest, handleMetadataRequest does not 
> check ZK for the names corresponding to topic IDs if topic IDs are present.
> {noformat}
> [2021-04-21 05:53:01,463] ERROR [KafkaApi-1] Error when handling request: 
> clientId=kgo, correlationId=1, api=METADATA, version=11, 
> body=MetadataRequestData(topics=[MetadataRequestTopic(topicId=LmqOoFOASnqQp_4-oJgeKA,
>  name=null)], allowAutoTopicCreation=false, 
> includeClusterAuthorizedOperations=false, 
> includeTopicAuthorizedOperations=false) (kafka.server.RequestHandlerHelper)
> java.lang.NullPointerException: name
>   at java.base/java.util.Objects.requireNonNull(Unknown Source)
>   at 
> org.apache.kafka.common.resource.ResourcePattern.(ResourcePattern.java:50)
>   at 
> kafka.server.AuthHelper.$anonfun$filterByAuthorized$3(AuthHelper.scala:121)
>   at scala.collection.Iterator$$anon$9.next(Iterator.scala:575)
>   at scala.collection.mutable.Growable.addAll(Growable.scala:62)
>   at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
>   at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:142)
>   at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:42)
>   at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:258)
>   at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:247)
>   at scala.collection.SeqFactory$Delegate.from(Factory.scala:306)
>   at scala.collection.IterableOnceOps.toBuffer(IterableOnce.scala:1270)
>   at scala.collection.IterableOnceOps.toBuffer$(IterableOnce.scala:1270)
>   at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1288)
>   at kafka.server.AuthHelper.filterByAuthorized(AuthHelper.scala:120)
>   at 
> kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:1146)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:170)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> [2021-04-21 05:53:01,464] ERROR [Kafka Request Handler 1 on Broker 1], 
> Exception when handling request (kafka.server.KafkaRequestHandler)
> java.lang.NullPointerException
>   at 
> org.apache.kafka.common.message.MetadataResponseData$MetadataResponseTopic.addSize(MetadataResponseData.java:1247)
>   at 
> org.apache.kafka.common.message.MetadataResponseData.addSize(MetadataResponseData.java:417)
>   at 
> org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
>   at 
> org.apache.kafka.common.protocol.SendBuilder.buildResponseSend(SendBuilder.java:200)
>   at 
> org.apache.kafka.common.requests.AbstractResponse.toSend(AbstractResponse.java:43)
>   at 
> org.apache.kafka.common.requests.RequestContext.buildResponseSend(RequestContext.java:111)
>   at 
> kafka.network.RequestChannel$Request.buildResponseSend(RequestChannel.scala:132)
>   at 
> kafka.server.RequestHandlerHelper.sendResponse(RequestHandlerHelper.scala:185)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorOrCloseConnection(RequestHandlerHelper.scala:155)
>   at 
> kafka.server.RequestHandlerHelper.sendErrorResponseMaybeThrottle(RequestHandlerHelper.scala:109)
>   at 
> kafka.server.RequestHandlerHelper.handleError(RequestHandlerHelper.scala:79)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:229)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
>   at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #10564: MINOR: clean up some replication code

2021-04-21 Thread GitBox


junrao commented on a change in pull request #10564:
URL: https://github.com/apache/kafka/pull/10564#discussion_r617858509



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -172,47 +174,54 @@ String diff(PartitionControlInfo prev) {
 StringBuilder builder = new StringBuilder();
 String prefix = "";
 if (!Arrays.equals(replicas, prev.replicas)) {
-
builder.append(prefix).append("oldReplicas=").append(Arrays.toString(prev.replicas));
+builder.append(prefix).append("replicas: ").
+append(Arrays.toString(prev.replicas)).
+append(" -> ").append(Arrays.toString(replicas));
 prefix = ", ";
-
builder.append(prefix).append("newReplicas=").append(Arrays.toString(replicas));
 }
 if (!Arrays.equals(isr, prev.isr)) {
-
builder.append(prefix).append("oldIsr=").append(Arrays.toString(prev.isr));
+builder.append(prefix).append("isr: ").
+append(Arrays.toString(prev.isr)).
+append(" -> ").append(Arrays.toString(isr));
 prefix = ", ";
-
builder.append(prefix).append("newIsr=").append(Arrays.toString(isr));
 }
 if (!Arrays.equals(removingReplicas, prev.removingReplicas)) {
-builder.append(prefix).append("oldRemovingReplicas=").
-append(Arrays.toString(prev.removingReplicas));
+builder.append(prefix).append("removingReplicas: ").
+append(Arrays.toString(prev.removingReplicas)).
+append(" -> ").append(Arrays.toString(removingReplicas));
 prefix = ", ";
-builder.append(prefix).append("newRemovingReplicas=").
-append(Arrays.toString(removingReplicas));
 }
 if (!Arrays.equals(addingReplicas, prev.addingReplicas)) {
-builder.append(prefix).append("oldAddingReplicas=").
-append(Arrays.toString(prev.addingReplicas));
+builder.append(prefix).append("addingReplicas: ").
+append(Arrays.toString(prev.addingReplicas)).
+append(" -> ").append(Arrays.toString(addingReplicas));
 prefix = ", ";
-builder.append(prefix).append("newAddingReplicas=").
-append(Arrays.toString(addingReplicas));
 }
 if (leader != prev.leader) {
-
builder.append(prefix).append("oldLeader=").append(prev.leader);
+builder.append(prefix).append("leader: ").
+append(prev.leader).append(" -> ").append(leader);
 prefix = ", ";
-builder.append(prefix).append("newLeader=").append(leader);
 }
 if (leaderEpoch != prev.leaderEpoch) {
-
builder.append(prefix).append("oldLeaderEpoch=").append(prev.leaderEpoch);
+builder.append(prefix).append("leaderEpoch: ").
+append(prev.leaderEpoch).append(" -> 
").append(leaderEpoch);
 prefix = ", ";
-
builder.append(prefix).append("newLeaderEpoch=").append(leaderEpoch);
 }
 if (partitionEpoch != prev.partitionEpoch) {
-
builder.append(prefix).append("oldPartitionEpoch=").append(prev.partitionEpoch);
-prefix = ", ";
-
builder.append(prefix).append("newPartitionEpoch=").append(partitionEpoch);
+builder.append(prefix).append("partitionEpoch: ").
+append(prev.partitionEpoch).append(" -> 
").append(partitionEpoch);
 }
 return builder.toString();
 }
 
+void maybeLogPartitionChange(Logger log, String description, 
PartitionControlInfo prev) {
+if (!electionWasClean(leader, prev.isr)) {
+log.info("UNCLEAN partition change for {}: {}", description, 
diff(prev));
+} else if (log.isDebugEnabled()) {
+log.debug("partition change for {}: {}", description, 
diff(prev));

Review comment:
   We used to log all leader and ISR changes at INFO, even for clean leader 
election.
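   
   A sketch of keeping clean elections at INFO as well, per this suggestion:
   
   ```java
   // Sketch: log every leader/ISR change at INFO, flagging unclean elections.
   if (!electionWasClean(leader, prev.isr)) {
       log.info("UNCLEAN partition change for {}: {}", description, diff(prev));
   } else {
       log.info("partition change for {}: {}", description, diff(prev));
   }
   ```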

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -321,22 +336,18 @@ public void replay(PartitionRecord record) {
 }
 PartitionControlInfo newPartInfo = new PartitionControlInfo(record);
 PartitionControlInfo prevPartInfo = 
topicInfo.parts.get(record.partitionId());
+String description = topicInfo.name + "-" + record.partitionId() +
+" with ID " + record.topicId();

Review comment:
   with ID => with topic ID to make it clear?

##
File path: 
metadata/src/main/java/org/apache/kafka/c

[GitHub] [kafka] junrao commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-04-21 Thread GitBox


junrao commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r617904827



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/AbstractApiMessageAndVersionSerde.java
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This class provides serialization/deserialization of {@code 
ApiMessageAndVersion}.
+ * 
+ * Implementors need to extend this class and implement {@link 
#apiMessageFor(short)} method to return a respective
+ * {@code ApiMessage} for the given {@code apiKey}. This is required to 
deserialize the bytes to build the respective
+ * {@code ApiMessage} instance.
+ */
+public abstract class AbstractApiMessageAndVersionSerde  {
+
+public byte[] serialize(ApiMessageAndVersion messageAndVersion) {
+ObjectSerializationCache cache = new ObjectSerializationCache();
+short version = messageAndVersion.version();
+ApiMessage message = messageAndVersion.message();
+
+// Add header containing apiKey and apiVersion,
+// headerSize is 1 byte for apiKey and 1 byte for apiVersion
+int headerSize = 1 + 1;
+int messageSize = message.size(cache, version);
+ByteBufferAccessor writable = new 
ByteBufferAccessor(ByteBuffer.allocate(headerSize + messageSize));
+
+// Write apiKey and version
+writable.writeUnsignedVarint(message.apiKey());

Review comment:
   In 
https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java#L44,
 the serialization adds a frame version. The intention is to use that as the 
header version. Currently, the only fields in the header are apikey and 
version. The frame version allows us to change that in the future. We need to 
think through whether we should adopt the same strategy here. If we do, perhaps 
we could just reuse MetadataRecordSerde somehow. Also, whether we should share 
the apiKey space with the raft messages.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12705) Task idling is not sufficiently tested

2021-04-21 Thread Walker Carlson (Jira)
Walker Carlson created KAFKA-12705:
--

 Summary: Task idling is not sufficiently tested
 Key: KAFKA-12705
 URL: https://issues.apache.org/jira/browse/KAFKA-12705
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Walker Carlson


The tests for task idling are a bit sparse. When I changed it so that 
isProcessable always returns true, only one test failed. That means the entire 
code path is hinging on one unit test 
(shouldBeProcessableIfAllPartitionsBuffered) that does not cover all branches 
of the logic.
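
As an illustration of the missing coverage, a sketch of a branch test for the 
idling path (helper names mirror existing StreamTaskTest utilities but are 
assumptions here):

{code:java}
// Hedged sketch, not actual StreamTaskTest code: with data buffered for only
// one of two input partitions, the task should idle instead of processing.
@Test
public void shouldNotBeProcessableWhileIdlingOnEmptyPartition() {
    task.addRecords(partition1, singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, 0L)));
    assertFalse(task.isProcessable(time.milliseconds()));
}
{code}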



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle

2021-04-21 Thread GitBox


mjsax commented on a change in pull request #10581:
URL: https://github.com/apache/kafka/pull/10581#discussion_r617935620



##
File path: gradle/dependencies.gradle
##
@@ -166,6 +166,7 @@ libs += [
   kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23",
   kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24",
   kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25",
+  kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26",

Review comment:
   Ah. Yes. Could be removed. It's unused. But also does not matter to have 
an unused var defined...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle

2021-04-21 Thread GitBox


mjsax commented on a change in pull request #10581:
URL: https://github.com/apache/kafka/pull/10581#discussion_r617936154



##
File path: gradle/dependencies.gradle
##
@@ -166,6 +166,7 @@ libs += [
   kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23",
   kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24",
   kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25",
+  kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26",

Review comment:
   Seems this PR added it for no reason (but also no harm): 
https://github.com/apache/kafka/commit/9ebc71e3d7d1e361b3844d907c4ae935fcda1240




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle

2021-04-21 Thread GitBox


mjsax commented on a change in pull request #10581:
URL: https://github.com/apache/kafka/pull/10581#discussion_r617936154



##
File path: gradle/dependencies.gradle
##
@@ -166,6 +166,7 @@ libs += [
   kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23",
   kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24",
   kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25",
+  kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26",

Review comment:
   Seems this PR added it for no reason (but also no harm): 
https://github.com/apache/kafka/commit/9ebc71e3d7d1e361b3844d907c4ae935fcda1240




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12706) Consider adding reason and source of error in APPLICATION_SHUTDOWN

2021-04-21 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12706:
--

 Summary: Consider adding reason and source of error in 
APPLICATION_SHUTDOWN
 Key: KAFKA-12706
 URL: https://issues.apache.org/jira/browse/KAFKA-12706
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman


At the moment when a user opts to shut down the application in the streams 
uncaught exception handler, we just send a signal to all members of the group 
who then shut down. If there are a large number of application instances 
running it can be annoying and time consuming to locate the client that hit 
this error.

It would be nice if we could let each client log the exception that triggered 
this, and possibly also the client who requested the shutdown. That will make 
it much easier to identify the problem, and figure out which set of logs to 
look into for further information.
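
Until then, applications can at least log the triggering exception locally 
inside the handler. A minimal sketch using the standard handler API, assuming 
an slf4j-style `log` and a `KafkaStreams` instance named `streams`:

{code:java}
// Sketch: log locally before requesting a cluster-wide shutdown, so this
// client's logs identify the instance that hit the error.
streams.setUncaughtExceptionHandler(exception -> {
    log.error("Requesting application shutdown due to unrecoverable error", exception);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
});
{code}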



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9772) Transactional offset commit fails with IllegalStateException

2021-04-21 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-9772:
--

Assignee: Jason Gustafson

> Transactional offset commit fails with IllegalStateException
> 
>
> Key: KAFKA-9772
> URL: https://issues.apache.org/jira/browse/KAFKA-9772
> Project: Kafka
>  Issue Type: Bug
>Reporter: Dhruvil Shah
>Assignee: Jason Gustafson
>Priority: Major
>
> {code:java}
> Trying to complete a transactional offset commit for producerId 7090 and 
> groupId application-id even though the offset commit record itself hasn't 
> been appended to the log.{code}
> {code:java}
> java.lang.IllegalStateException: Trying to complete a transactional offset 
> commit for producerId 7090 and groupId application-id even though the offset 
> commit record itself hasn't been appended to the log. at 
> kafka.coordinator.group.GroupMetadata.$anonfun$completePendingTxnOffsetCommit$2(GroupMetadata.scala:677)
>  at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) at 
> scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) at 
> scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) at 
> scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44) at 
> scala.collection.mutable.HashMap.foreach(HashMap.scala:149) at 
> kafka.coordinator.group.GroupMetadata.$anonfun$completePendingTxnOffsetCommit$1(GroupMetadata.scala:674)
>  at 
> kafka.coordinator.group.GroupMetadata.completePendingTxnOffsetCommit(GroupMetadata.scala:673)
>  at 
> kafka.coordinator.group.GroupMetadataManager.$anonfun$handleTxnCompletion$2(GroupMetadataManager.scala:874)
>  at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:228) at 
> kafka.coordinator.group.GroupMetadataManager.$anonfun$handleTxnCompletion$1(GroupMetadataManager.scala:873)
>  at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at 
> kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:870)
>  at 
> kafka.coordinator.group.GroupMetadataManager.$anonfun$scheduleHandleTxnCompletion$1(GroupMetadataManager.scala:865)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) 
> at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:834){code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

