[GitHub] [kafka] chia7712 commented on pull request #10567: Ensure `ignorable` is a boolean value.
chia7712 commented on pull request #10567: URL: https://github.com/apache/kafka/pull/10567#issuecomment-823837778

> Does it cause any failures?

According to the source code (https://github.com/FasterXML/jackson-databind/blob/2.10/src/main/java/com/fasterxml/jackson/databind/deser/std/NumberDeserializers.java#L242), it should be fine to use "true" (string type). However, I prefer a consistent value (boolean type) :)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
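For context, the PR is about the `ignorable` flag in Kafka's protocol message schema JSON files. A minimal illustrative fragment (the message and field names here are hypothetical, not from the actual PR) contrasting the string form that Jackson happens to tolerate with the consistent boolean form the PR enforces:

```json
{
  "name": "ExampleRequestData",
  "fields": [
    { "name": "oldStyle", "type": "int32", "ignorable": "true" },
    { "name": "newStyle", "type": "int32", "ignorable": true }
  ]
}
```

Both forms deserialize to `true` with Jackson's lenient coercion, but only the second is a JSON boolean.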
[jira] [Assigned] (KAFKA-12700) The admin.listeners config has wonky valid values in the docs
[ https://issues.apache.org/jira/browse/KAFKA-12700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-12700:

Assignee: Luke Chen

> The admin.listeners config has wonky valid values in the docs
> Key: KAFKA-12700
> URL: https://issues.apache.org/jira/browse/KAFKA-12700
> Project: Kafka
> Issue Type: Bug
> Components: docs, KafkaConnect
> Reporter: A. Sophie Blee-Goldman
> Assignee: Luke Chen
> Priority: Major
>
> Noticed this while updating the docs for the 2.6.2 release. The docs for these configs are generated from the config definition, including info such as default, type, valid values, etc.
> When defining WorkerConfig.ADMIN_LISTENERS_CONFIG we seem to pass an actual `new AdminListenersValidator()` object in for the "valid values" parameter, causing this field to display a wonky, useless object reference in the docs. See https://kafka.apache.org/documentation/#connectconfigs_admin.listeners:
> Valid Values: org.apache.kafka.connect.runtime.WorkerConfig$AdminListenersValidator@383534aa

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on pull request #10446: KAFKA-12661 ConfigEntry#equal does not compare other fields when value is NOT null
chia7712 commented on pull request #10446: URL: https://github.com/apache/kafka/pull/10446#issuecomment-823838830

```
Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplication()
```

Unrelated errors. Merging trunk to trigger QA again.
[jira] [Commented] (KAFKA-12701) NPE in MetadataRequest when using topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326316#comment-17326316 ] Travis Bischel commented on KAFKA-12701:

Metadata v11 is supported in 2.8.0. I've added support for specifying topic IDs in my {{kcl}} command line client [here|https://github.com/twmb/kcl/commit/a09f6f8cca4f87b878d943a2bed4ef8ed2e10a7e] (you'd need to build off of master if you want to test this yourself).

{noformat}
[01:20:51] twmb@h4x3r:~ $ kcl admin topic create foo
NAME  ID                                MESSAGE
foo   6a62c01129a341c8a0231f7b3c21bc9b  OK
[01:20:57] twmb@h4x3r:~ $ kcl metadata -t --ids 6a62c01129a341c8a0231f7b3c21bc9b
^C
[01:21:11] twmb@h4x3r:~ $ kcl metadata -t foo
NAME  ID                                PARTITIONS  REPLICAS
foo   6a62c01129a341c8a0231f7b3c21bc9b  20          1
[01:21:16] twmb@h4x3r:~ $ kcl admin topic delete --ids 6a62c01129a341c8a0231f7b3c21bc9b
foo  OK
{noformat}

What this is doing is issuing a metadata request that has a single topic where the topic name is null and the topic ID is the hex-decoded ID that is passed in.

> NPE in MetadataRequest when using topic IDs
> Key: KAFKA-12701
> URL: https://issues.apache.org/jira/browse/KAFKA-12701
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.8.0
> Reporter: Travis Bischel
> Assignee: dengziming
> Priority: Major
>
> Authorization result checking relies on the topic name not being null, which, when topic IDs are used, it is.
> Unlike the logic in handleDeleteTopicsRequest, handleMetadataRequest does not check ZK for the names corresponding to topic IDs if topic IDs are present.
> {noformat} > [2021-04-21 05:53:01,463] ERROR [KafkaApi-1] Error when handling request: > clientId=kgo, correlationId=1, api=METADATA, version=11, > body=MetadataRequestData(topics=[MetadataRequestTopic(topicId=LmqOoFOASnqQp_4-oJgeKA, > name=null)], allowAutoTopicCreation=false, > includeClusterAuthorizedOperations=false, > includeTopicAuthorizedOperations=false) (kafka.server.RequestHandlerHelper) > java.lang.NullPointerException: name > at java.base/java.util.Objects.requireNonNull(Unknown Source) > at > org.apache.kafka.common.resource.ResourcePattern.(ResourcePattern.java:50) > at > kafka.server.AuthHelper.$anonfun$filterByAuthorized$3(AuthHelper.scala:121) > at scala.collection.Iterator$$anon$9.next(Iterator.scala:575) > at scala.collection.mutable.Growable.addAll(Growable.scala:62) > at scala.collection.mutable.Growable.addAll$(Growable.scala:57) > at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:142) > at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:42) > at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:258) > at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:247) > at scala.collection.SeqFactory$Delegate.from(Factory.scala:306) > at scala.collection.IterableOnceOps.toBuffer(IterableOnce.scala:1270) > at scala.collection.IterableOnceOps.toBuffer$(IterableOnce.scala:1270) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1288) > at kafka.server.AuthHelper.filterByAuthorized(AuthHelper.scala:120) > at > kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:1146) > at kafka.server.KafkaApis.handle(KafkaApis.scala:170) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74) > at java.base/java.lang.Thread.run(Unknown Source) > [2021-04-21 05:53:01,464] ERROR [Kafka Request Handler 1 on Broker 1], > Exception when handling request (kafka.server.KafkaRequestHandler) > java.lang.NullPointerException > at > 
org.apache.kafka.common.message.MetadataResponseData$MetadataResponseTopic.addSize(MetadataResponseData.java:1247) > at > org.apache.kafka.common.message.MetadataResponseData.addSize(MetadataResponseData.java:417) > at > org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218) > at > org.apache.kafka.common.protocol.SendBuilder.buildResponseSend(SendBuilder.java:200) > at > org.apache.kafka.common.requests.AbstractResponse.toSend(AbstractResponse.java:43) > at > org.apache.kafka.common.requests.RequestContext.buildResponseSend(RequestContext.java:111) > at > kafka.network.RequestChannel$Request.buildResponseSend(RequestChannel.scala:132) > at > kafka.server.RequestHandlerHelper.sendResponse(RequestHandlerHelper.scala:185) > at > kafka.server.RequestHandlerHelper.sendErrorOrCloseConnection(RequestHandlerHelper.scala:155) > at > kafka.server.RequestHandlerHelper.sendErrorResponseMaybeThrottle(RequestHandlerHelper.scala:109) > at > kafka.server.RequestHandlerHelper.handleError(RequestHandlerHelper.scala:79) > at kafka.server.KafkaApis.handle(KafkaApis.scala:229) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74) > at java.base/java.lang.Thread.run(Unknown Source) > {noformat}
[GitHub] [kafka] showuon opened a new pull request #10574: KAFKA-12700: override toString method to show correct value in doc
showuon opened a new pull request #10574: URL: https://github.com/apache/kafka/pull/10574

We use a customized validator here, but forgot to override the `toString` method to show the correct value in the docs.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
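The shape of the fix can be sketched as follows. This is a minimal illustration with a stand-in `Validator` interface rather than Kafka's actual `ConfigDef.Validator`, and the validation logic and the valid-values text are assumptions for the example, not the PR's exact code:

```java
// Stand-in for Kafka's ConfigDef.Validator interface (simplified).
interface Validator {
    void ensureValid(String name, Object value);
}

// Without an overridden toString(), docs generated from this object render
// the default Object.toString(), e.g. "AdminListenersValidator@383534aa".
class AdminListenersValidator implements Validator {
    @Override
    public void ensureValid(String name, Object value) {
        if (value == null) {
            return; // null means "fall back to the default listeners"
        }
        if (!(value instanceof java.util.List)) {
            throw new IllegalArgumentException(name + " must be a list of URLs");
        }
    }

    // Overriding toString() is the fix: the generated "Valid Values" doc
    // column now shows a human-readable description instead of an object ref.
    @Override
    public String toString() {
        return "List of comma-separated URLs, e.g. http://localhost:8080";
    }
}

public class ValidatorToStringDemo {
    public static void main(String[] args) {
        Validator v = new AdminListenersValidator();
        System.out.println(v); // human-readable valid-values text, not a hash
    }
}
```

The same pattern applies to any `Validator` passed into a `ConfigDef.define(...)` call: the doc generator stringifies the validator object, so each custom validator should describe itself.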
[GitHub] [kafka] showuon commented on pull request #10574: KAFKA-12700: override toString method to show correct value in doc
showuon commented on pull request #10574: URL: https://github.com/apache/kafka/pull/10574#issuecomment-823848128

@rhauch @wicknicks, could you help review this PR to update the doc? Thanks.
[jira] [Commented] (KAFKA-12700) The admin.listeners config has wonky valid values in the docs
[ https://issues.apache.org/jira/browse/KAFKA-12700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326325#comment-17326325 ] Luke Chen commented on KAFKA-12700:

Nice catch!!
[GitHub] [kafka] dajac commented on pull request #10567: Ensure `ignorable` is a boolean value.
dajac commented on pull request #10567: URL: https://github.com/apache/kafka/pull/10567#issuecomment-823855071

> > Does it cause any failures?
>
> According to the source code (https://github.com/FasterXML/jackson-databind/blob/2.10/src/main/java/com/fasterxml/jackson/databind/deser/std/NumberDeserializers.java#L242), it should be fine to use "true" (string type). However, I like consistent value (boolean type) :)

Sounds good, thanks for the clarification.
[jira] [Created] (KAFKA-12702) Unhandled exception caught in InterBrokerSendThread
Wenbing Shen created KAFKA-12702:

Summary: Unhandled exception caught in InterBrokerSendThread
Key: KAFKA-12702
URL: https://issues.apache.org/jira/browse/KAFKA-12702
Project: Kafka
Issue Type: Bug
Affects Versions: 2.8.0
Reporter: Wenbing Shen
Attachments: afterFixing.png, beforeFixing.png

In KRaft mode, if listeners and advertised.listeners are not configured with host addresses, the host parameter value of Listener in BrokerRegistrationRequestData will be null. When the broker is started, a null pointer exception will be thrown, causing startup failure.

A feasible solution is to replace the empty host of each endPoint in advertisedListeners with InetAddress.getLocalHost.getCanonicalHostName in BrokerServer when building networkListeners.

The following is the debug log.

Before fixing:

{noformat}
[2021-04-21 14:15:20,032] DEBUG (broker-2-to-controller-send-thread org.apache.kafka.clients.NetworkClient 522) [broker-2-to-controller] Sending BROKER_REGISTRATION request with header RequestHeader(apiKey=BROKER_REGISTRATION, apiVersion=0, clientId=2, correlationId=6) and timeout 3 to node 2: BrokerRegistrationRequestData(brokerId=2, clusterId='nCqve6D1TEef3NpQniA0Mg', incarnationId=X8w4_1DFT2yUjOm6asPjIQ, listeners=[Listener(name='PLAINTEXT', host=null, port=9092, securityProtocol=0)], features=[], rack=null)
[2021-04-21 14:15:20,033] ERROR (broker-2-to-controller-send-thread kafka.server.BrokerToControllerRequestThread 76) [broker-2-to-controller-send-thread]: unhandled exception caught in InterBrokerSendThread
java.lang.NullPointerException
    at org.apache.kafka.common.message.BrokerRegistrationRequestData$Listener.addSize(BrokerRegistrationRequestData.java:515)
    at org.apache.kafka.common.message.BrokerRegistrationRequestData.addSize(BrokerRegistrationRequestData.java:216)
    at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
    at org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187)
    at org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101)
    at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:525)
    at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:501)
    at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:461)
    at kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1(InterBrokerSendThread.scala:104)
    at kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1$adapted(InterBrokerSendThread.scala:99)
    at kafka.common.InterBrokerSendThread$$Lambda$259/910445654.apply(Unknown Source)
    at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563)
    at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:919)
    at kafka.common.InterBrokerSendThread.sendRequests(InterBrokerSendThread.scala:99)
    at kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:73)
    at kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManager.scala:368)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-04-21 14:15:20,034] INFO (broker-2-to-controller-send-thread kafka.server.BrokerToControllerRequestThread 66) [broker-2-to-controller-send-thread]: Stopped
{noformat}

After fixing:

{noformat}
[2021-04-21 15:05:01,095] DEBUG (BrokerToControllerChannelManager broker=2 name=heartbeat org.apache.kafka.clients.NetworkClient 512) [BrokerToControllerChannelManager broker=2 name=heartbeat] Sending BROKER_REGISTRATION request with header RequestHeader(apiKey=BROKER_REGISTRATION, apiVersion=0, clientId=2, correlationId=0) and timeout 3 to node 2: BrokerRegistrationRequestData(brokerId=2, clusterId='nCqve6D1TEef3NpQniA0Mg', incarnationId=xF29h_IRR1KzrERWwssQ2w, listeners=[Listener(name='PLAINTEXT', host='hdpxxx.cn', port=9092, securityProtocol=0)], features=[], rack=null)
{noformat}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on a change in pull request #10564: MINOR: clean up some replication code
chia7712 commented on a change in pull request #10564: URL: https://github.com/apache/kafka/pull/10564#discussion_r617297824 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -321,22 +339,17 @@ public void replay(PartitionRecord record) { } PartitionControlInfo newPartInfo = new PartitionControlInfo(record); PartitionControlInfo prevPartInfo = topicInfo.parts.get(record.partitionId()); +String topicPart = topicInfo.name + "-" + record.partitionId(); if (prevPartInfo == null) { -log.info("Created partition {}:{} with {}.", record.topicId(), -record.partitionId(), newPartInfo.toString()); +log.info("Created partition {} with {}.", topicPart, newPartInfo.toString()); Review comment: Is it intentional to use "name" to replace "id"? ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -1119,6 +1053,62 @@ void validateManualPartitionAssignment(List assignment, } } +void generateLeaderAndIsrUpdates(String context, + int brokerToRemoveFromIsr, + List records, + Iterator iterator) { +int oldSize = records.size(); +while (iterator.hasNext()) { +TopicIdPartition topicIdPart = iterator.next(); +TopicControlInfo topic = topics.get(topicIdPart.topicId()); +if (topic == null) { +throw new RuntimeException("Topic ID " + topicIdPart.topicId() + " existed in " + +"isrMembers, but not in the topics map."); +} +PartitionControlInfo partition = topic.parts.get(topicIdPart.partitionId()); +if (partition == null) { +throw new RuntimeException("Partition " + topicIdPart + +" existed in isrMembers, but not in the partitions map."); +} +int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemoveFromIsr); +int newLeader = Replicas.contains(newIsr, partition.leader) ? partition.leader : +bestLeader(partition.replicas, newIsr, false); +boolean unclean = newLeader != NO_LEADER && !Replicas.contains(newIsr, newLeader); Review comment: Could we reuse `wasCleanlyDerivedFrom` to get "unclean"? 
## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -907,13 +843,13 @@ ApiError electLeader(String topic, int partitionId, boolean unclean, } PartitionChangeRecord record = new PartitionChangeRecord(). setPartitionId(partitionId). -setTopicId(topicId); -if (unclean && !Replicas.contains(partitionInfo.isr, newLeader)) { -// If the election was unclean, we may have to forcibly add the replica to -// the ISR. This can result in data loss! +setTopicId(topicId). +setLeader(newLeader); +if (!Replicas.contains(partitionInfo.isr, newLeader)) { +// If the election was unclean, we have to forcibly set the ISR to just the +// new leader. This can result in data loss! Review comment: redundant "space" ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -1119,6 +1053,62 @@ void validateManualPartitionAssignment(List assignment, } } +void generateLeaderAndIsrUpdates(String context, + int brokerToRemoveFromIsr, + List records, + Iterator iterator) { +int oldSize = records.size(); +while (iterator.hasNext()) { +TopicIdPartition topicIdPart = iterator.next(); +TopicControlInfo topic = topics.get(topicIdPart.topicId()); +if (topic == null) { +throw new RuntimeException("Topic ID " + topicIdPart.topicId() + " existed in " + +"isrMembers, but not in the topics map."); +} +PartitionControlInfo partition = topic.parts.get(topicIdPart.partitionId()); +if (partition == null) { +throw new RuntimeException("Partition " + topicIdPart + +" existed in isrMembers, but not in the partitions map."); +} +int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemoveFromIsr); +int newLeader = Replicas.contains(newIsr, partition.leader) ? partition.leader : +bestLeader(partition.replicas, newIsr, false); +boolean unclean = newLeader != NO_LEADER && !Replicas.contains(newIsr, newLeader); +if (unclean) { +// After an unclean leader ele
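The review thread above concerns recomputing a partition's leader and ISR when a broker is removed from the ISR. A self-contained sketch of that computation, using stand-in helpers (`copyWithout`, `contains`, `chooseLeader`) in place of Kafka's actual `Replicas` utilities and `bestLeader` logic:

```java
import java.util.Arrays;

public class IsrUpdateSketch {
    static final int NO_LEADER = -1;

    // Stand-in for Replicas.copyWithout: the ISR minus the removed broker.
    static int[] copyWithout(int[] isr, int broker) {
        return Arrays.stream(isr).filter(b -> b != broker).toArray();
    }

    // Stand-in for Replicas.contains.
    static boolean contains(int[] replicas, int broker) {
        return Arrays.stream(replicas).anyMatch(b -> b == broker);
    }

    // Keep the current leader if it survives in the new ISR; otherwise pick
    // the first replica still in the ISR (a clean election), else NO_LEADER.
    static int chooseLeader(int[] replicas, int[] newIsr, int currentLeader) {
        if (contains(newIsr, currentLeader)) return currentLeader;
        for (int replica : replicas) {
            if (contains(newIsr, replica)) return replica;
        }
        return NO_LEADER;
    }

    public static void main(String[] args) {
        int[] replicas = {1, 2, 3};
        int[] isr = {1, 2};
        int[] newIsr = copyWithout(isr, 1);                // broker 1 leaves
        int newLeader = chooseLeader(replicas, newIsr, 1); // old leader was 1
        System.out.println(Arrays.toString(newIsr) + " leader=" + newLeader);
    }
}
```

An election is "unclean" when a leader must be chosen from outside the ISR; in the snippet above that case simply yields `NO_LEADER`, whereas the controller code being reviewed can additionally force an out-of-ISR leader (with possible data loss) when unclean election is enabled.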
[jira] [Updated] (KAFKA-12702) Unhandled exception caught in InterBrokerSendThread
[ https://issues.apache.org/jira/browse/KAFKA-12702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenbing Shen updated KAFKA-12702:

Attachment: image-2021-04-21-17-12-28-471.png
Priority: Blocker
[jira] [Commented] (KAFKA-12702) Unhandled exception caught in InterBrokerSendThread
[ https://issues.apache.org/jira/browse/KAFKA-12702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326387#comment-17326387 ] Wenbing Shen commented on KAFKA-12702:

We need to fix this problem because, according to the comments in the configuration file (config/kraft/server.properties), if listeners and advertised.listeners are not configured with an address, the program will automatically obtain java.net.InetAddress.getCanonicalHostName(); in practice, however, this causes the service to fail to start.

!image-2021-04-21-17-12-28-471.png!
[jira] [Commented] (KAFKA-12701) NPE in MetadataRequest when using topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326392#comment-17326392 ] dengziming commented on KAFKA-12701:

In fact, we first planned to support topic IDs in MetadataRequest and MetadataResponse, and we added two tickets, KAFKA-10547 and KAFKA-10774. KAFKA-10547 added topic IDs to MetadataResponse and MetadataRequest, while describing topics by topic ID is supported by KAFKA-10774. Sadly, the KAFKA-10547 PR was merged but the KAFKA-10774 PR was held off; for more details see https://github.com/apache/kafka/pull/9769#issuecomment-772830472

I think this is a bug, so we should fix it for 2.8.0. I will submit a PR targeting 2.8.0 to remove topic IDs from MetadataRequest since they aren't supported.
[GitHub] [kafka] wenbingshen opened a new pull request #10575: KAFKA-12702: Fix NPE in networkListeners from BrokerServer
wenbingshen opened a new pull request #10575: URL: https://github.com/apache/kafka/pull/10575 According to the comments in the configuration file (config/kraft/server.properties), if listeners and advertised.listeners are not configured with an address, the broker should automatically fall back to java.net.InetAddress.getCanonicalHostName(). In practice, however, the service fails to start, because the host field of the Listener in BrokerRegistrationRequestData is left null. For the full stack trace, see [KAFKA-12702](https://issues.apache.org/jira/browse/KAFKA-12702) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
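For context, the fallback behaviour the PR implements can be sketched outside of Kafka as follows; `AdvertisedHostResolver` and its method name are illustrative only, not the actual `BrokerServer` code:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class AdvertisedHostResolver {
    // Returns the configured host as-is, or falls back to the machine's
    // canonical host name when the configured host is null or blank
    // (mirroring the fix proposed for networkListeners).
    static String resolveAdvertisedHost(String configuredHost) {
        if (configuredHost != null && !configuredHost.trim().isEmpty()) {
            return configuredHost;
        }
        try {
            return InetAddress.getLocalHost().getCanonicalHostName();
        } catch (UnknownHostException e) {
            return "localhost"; // last-resort fallback for this sketch
        }
    }
}
```

The key point is that the fallback happens before the value is written into the registration request, so the request never carries a null host.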
[GitHub] [kafka] chia7712 commented on pull request #10567: Ensure `ignorable` is a boolean value.
chia7712 commented on pull request #10567: URL: https://github.com/apache/kafka/pull/10567#issuecomment-823919500 ``` Build / JDK 11 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions() org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicationWithEmptyPartition() Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplication() ``` These test failures are unrelated to this change.
[jira] [Resolved] (KAFKA-12680) Failed to restart the broker in kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-12680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenbing Shen resolved KAFKA-12680. -- Resolution: Cannot Reproduce > Failed to restart the broker in kraft mode > -- > > Key: KAFKA-12680 > URL: https://issues.apache.org/jira/browse/KAFKA-12680 > Project: Kafka > Issue Type: Bug >Reporter: Wenbing Shen >Priority: Major > > Today I tested kraft mode for the first time. I deployed a single-node kraft > mode broker according to the documentation: > [https://github.com/apache/kafka/blob/6d1d68617ecd023b787f54aafc24a4232663428d/config/kraft/README.md] > > First step: ./bin/kafka-storage.sh random-uuid > Second step: use the uuid generated above to execute the following command: > ./bin/kafka-storage.sh format -t -c ./config/kraft/server.properties > > Third step: ./bin/kafka-server-start.sh ./config/kraft/server.properties > > Then I created two topics with two partitions and a single replica. > ./bin/kafka-topics.sh --create --topic test-01 --partitions 2 > --replication-factor 1 --bootstrap-server localhost:9092 > I verified that there was no problem with production and consumption, but > after I ran kafka-server-stop.sh and then ran the start command again, the > broker failed with an error. 
> I am not sure if it is a known bug or a problem with my usage > > [2021-04-18 00:19:37,443] ERROR Exiting Kafka due to fatal exception > (kafka.Kafka$) > java.io.IOException: Invalid argument > at java.io.RandomAccessFile.setLength(Native Method) > at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:189) > at kafka.log.AbstractIndex.resize(AbstractIndex.scala:175) > at > kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:241) > at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:241) > at kafka.log.LogSegment.recover(LogSegment.scala:385) > at kafka.log.Log.recoverSegment(Log.scala:741) > at kafka.log.Log.recoverLog(Log.scala:894) > at kafka.log.Log.$anonfun$loadSegments$2(Log.scala:816) > at kafka.log.Log$$Lambda$153/391630194.apply$mcJ$sp(Unknown Source) > at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17) > at kafka.log.Log.retryOnOffsetOverflow(Log.scala:2456) > at kafka.log.Log.loadSegments(Log.scala:816) > at kafka.log.Log.(Log.scala:326) > at kafka.log.Log$.apply(Log.scala:2593) > at kafka.raft.KafkaMetadataLog$.apply(KafkaMetadataLog.scala:358) > at kafka.raft.KafkaRaftManager.buildMetadataLog(RaftManager.scala:253) > at kafka.raft.KafkaRaftManager.(RaftManager.scala:127) > at kafka.server.KafkaRaftServer.(KafkaRaftServer.scala:74) > at kafka.Kafka$.buildServer(Kafka.scala:79) > at kafka.Kafka$.main(Kafka.scala:87) > at kafka.Kafka.main(Kafka.scala) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 commented on a change in pull request #10575: KAFKA-12702: Fix NPE in networkListeners from BrokerServer
chia7712 commented on a change in pull request #10575: URL: https://github.com/apache/kafka/pull/10575#discussion_r617365981 ## File path: core/src/main/scala/kafka/server/BrokerServer.scala ## @@ -272,7 +273,7 @@ class BrokerServer( val networkListeners = new ListenerCollection() config.advertisedListeners.foreach { ep => networkListeners.add(new Listener(). - setHost(ep.host). + setHost(if (ep.host == null || ep.host.trim.isEmpty) InetAddress.getLocalHost.getCanonicalHostName else ep.host). Review comment: Could you use `Utils.isBlank(ep.host)`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wenbingshen commented on a change in pull request #10575: KAFKA-12702: Fix NPE in networkListeners from BrokerServer
wenbingshen commented on a change in pull request #10575: URL: https://github.com/apache/kafka/pull/10575#discussion_r617379329 ## File path: core/src/main/scala/kafka/server/BrokerServer.scala ## @@ -272,7 +273,7 @@ class BrokerServer( val networkListeners = new ListenerCollection() config.advertisedListeners.foreach { ep => networkListeners.add(new Listener(). - setHost(ep.host). + setHost(if (ep.host == null || ep.host.trim.isEmpty) InetAddress.getLocalHost.getCanonicalHostName else ep.host). Review comment: Good idea. Thank you for your guidance and feedback. :)
[GitHub] [kafka] dengziming opened a new pull request #10576: KAFKA-12701: Remove topicId from MetadataReq since it was not supported in 2.8.0
dengziming opened a new pull request #10576: URL: https://github.com/apache/kafka/pull/10576 #9622 added topicId to MetadataResponse and MetadataRequest, but describing topics by topicId is only supported by #9769. Sadly, the #9622 PR was merged while the #9769 PR was held off; for more details see: https://github.com/apache/kafka/pull/9769#issuecomment-772830472 I think this is a bug, so we should fix it for 2.8.0. This PR should be cherry-picked to 2.8.0. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Assigned] (KAFKA-12702) Unhandled exception caught in InterBrokerSendThread
[ https://issues.apache.org/jira/browse/KAFKA-12702?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-12702: -- Assignee: Wenbing Shen > Unhandled exception caught in InterBrokerSendThread > --- > > Key: KAFKA-12702 > URL: https://issues.apache.org/jira/browse/KAFKA-12702 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Wenbing Shen >Assignee: Wenbing Shen >Priority: Blocker > Attachments: afterFixing.png, beforeFixing.png, > image-2021-04-21-17-12-28-471.png > > > In kraft mode, if listeners and advertised.listeners are not configured with > host addresses, the host parameter value of Listener in > BrokerRegistrationRequestData will be null. When the broker is started, a > null pointer exception will be thrown, causing startup failure. > A feasible solution is to replace the empty host of endPoint in > advertisedListeners with InetAddress.getLocalHost.getCanonicalHostName in > Broker Server when building networkListeners. 
> The following is the debug log: > before fixing: > [2021-04-21 14:15:20,032] DEBUG (broker-2-to-controller-send-thread > org.apache.kafka.clients.NetworkClient 522) [broker-2-to-controller] Sending > BROKER_REGISTRATION request with header RequestHeader(apiKey=BROKER_REGIS > TRATION, apiVersion=0, clientId=2, correlationId=6) and timeout 3 to node > 2: BrokerRegistrationRequestData(brokerId=2, > clusterId='nCqve6D1TEef3NpQniA0Mg', incarnationId=X8w4_1DFT2yUjOm6asPjIQ, > listeners=[Listener(n > ame='PLAINTEXT', {color:#FF}host=null,{color} port=9092, > securityProtocol=0)], features=[], rack=null) > [2021-04-21 14:15:20,033] ERROR (broker-2-to-controller-send-thread > kafka.server.BrokerToControllerRequestThread 76) > [broker-2-to-controller-send-thread]: unhandled exception caught in > InterBrokerSendThread > java.lang.NullPointerException > at > org.apache.kafka.common.message.BrokerRegistrationRequestData$Listener.addSize(BrokerRegistrationRequestData.java:515) > at > org.apache.kafka.common.message.BrokerRegistrationRequestData.addSize(BrokerRegistrationRequestData.java:216) > at > org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218) > at > org.apache.kafka.common.protocol.SendBuilder.buildRequestSend(SendBuilder.java:187) > at > org.apache.kafka.common.requests.AbstractRequest.toSend(AbstractRequest.java:101) > at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:525) > at org.apache.kafka.clients.NetworkClient.doSend(NetworkClient.java:501) > at org.apache.kafka.clients.NetworkClient.send(NetworkClient.java:461) > at > kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1(InterBrokerSendThread.scala:104) > at > kafka.common.InterBrokerSendThread.$anonfun$sendRequests$1$adapted(InterBrokerSendThread.scala:99) > at kafka.common.InterBrokerSendThread$$Lambda$259/910445654.apply(Unknown > Source) > at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563) > at 
scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561) > at scala.collection.AbstractIterable.foreach(Iterable.scala:919) > at > kafka.common.InterBrokerSendThread.sendRequests(InterBrokerSendThread.scala:99) > at > kafka.common.InterBrokerSendThread.pollOnce(InterBrokerSendThread.scala:73) > at > kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManager.scala:368) > at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) > [2021-04-21 14:15:20,034] INFO (broker-2-to-controller-send-thread > kafka.server.BrokerToControllerRequestThread 66) > [broker-2-to-controller-send-thread]: Stopped > after fixing: > [2021-04-21 15:05:01,095] DEBUG (BrokerToControllerChannelManager broker=2 > name=heartbeat org.apache.kafka.clients.NetworkClient 512) > [BrokerToControllerChannelManager broker=2 name=heartbeat] Sending > BROKER_REGISTRATI > ON request with header RequestHeader(apiKey=BROKER_REGISTRATION, > apiVersion=0, clientId=2, correlationId=0) and timeout 3 to node 2: > BrokerRegistrationRequestData(brokerId=2, clusterId='nCqve6D1TEef3NpQniA0Mg', > inc > arnationId=xF29h_IRR1KzrERWwssQ2w, listeners=[Listener(name='PLAINTEXT', > host='hdpxxx.cn', port=9092, securityProtocol=0)], features=[], rack=null) > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 opened a new pull request #10577: MINOR: check duplicate advertised listeners based on resolved host
chia7712 opened a new pull request #10577: URL: https://github.com/apache/kafka/pull/10577 Related to #4897. I noticed this issue when reviewing #10575. With this patch, a listener such as `:12345` fails fast if its resolved host and port are already registered. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
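The duplicate check based on resolved addresses can be sketched as follows; the string parsing, wildcard handling, and exception type here are illustrative assumptions, not the actual Kafka config validation:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ListenerDuplicateCheck {
    // Fails fast when two listeners resolve to the same host:port.
    // A listener with an empty host (e.g. ":12345") is treated here as
    // binding the wildcard address.
    static void checkNoDuplicateEndpoints(List<String> listeners) {
        Set<String> seen = new HashSet<>();
        for (String listener : listeners) {
            int sep = listener.lastIndexOf(':');
            String host = listener.substring(0, sep);
            String port = listener.substring(sep + 1);
            String resolved;
            try {
                resolved = host.isEmpty()
                        ? "0.0.0.0"
                        : InetAddress.getByName(host).getHostAddress();
            } catch (UnknownHostException e) {
                resolved = host; // fall back to the literal host in this sketch
            }
            if (!seen.add(resolved + ":" + port)) {
                throw new IllegalArgumentException("Duplicate listener endpoint: " + listener);
            }
        }
    }
}
```

Comparing on the resolved address rather than the configured string is what lets two textually different listeners that point at the same endpoint be rejected at startup instead of failing later.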
[GitHub] [kafka] Iskuskov commented on a change in pull request #9541: KAFKA-10675: Add schema name to ConnectSchema.validateValue() error message
Iskuskov commented on a change in pull request #9541: URL: https://github.com/apache/kafka/pull/9541#discussion_r617423452 ## File path: connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java ## @@ -266,7 +267,7 @@ public static void validateValue(String name, Schema schema, Object value) { private static List expectedClassesFor(Schema schema) { List expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name()); if (expectedClasses == null) -expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type()); +expectedClasses = SCHEMA_TYPE_CLASSES.getOrDefault(schema.type(), Collections.emptyList()); Review comment: Thank you! Highly appreciate your comment -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
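The one-line change above swaps `get` for `getOrDefault`, so an unknown schema type yields an empty list instead of a `null` that later triggers a `NullPointerException`. A simplified, hypothetical illustration (the map contents here are not the real `SCHEMA_TYPE_CLASSES`):

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class GetOrDefaultDemo {
    // Stand-in for the schema-type lookup table; contents are illustrative.
    static final Map<String, List<Class<?>>> SCHEMA_TYPE_CLASSES =
            Map.of("STRING", List.of(String.class));

    // With get(), an unknown type would return null and any later iteration
    // would throw an NPE; getOrDefault() lets callers treat "no expected
    // classes" as an empty list.
    static List<Class<?>> expectedClassesFor(String type) {
        return SCHEMA_TYPE_CLASSES.getOrDefault(type, Collections.emptyList());
    }
}
```

Callers can then report "unknown type" through normal control flow rather than by catching an NPE.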
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r617482721 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataContextSerdes.java ## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataContext; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +/** + * This class provides serialization and deserialization for {@link RemoteLogMetadataContext}. 
This is the root serdes + * for the messages that are stored in the internal remote log metadata topic. + */ +public class RemoteLogMetadataContextSerdes implements Serde<RemoteLogMetadataContext> { + +public static final byte REMOTE_LOG_SEGMENT_METADATA_API_KEY = (byte) new RemoteLogSegmentMetadataRecord().apiKey(); +public static final byte REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = (byte) new RemoteLogSegmentMetadataUpdateRecord().apiKey(); +public static final byte REMOTE_PARTITION_DELETE_API_KEY = (byte) new RemotePartitionDeleteMetadataRecord().apiKey(); + +private static final Map KEY_TO_SERDES = createInternalSerde(); + +private final Deserializer<RemoteLogMetadataContext> rootDeserializer; +private final Serializer<RemoteLogMetadataContext> rootSerializer; + +public RemoteLogMetadataContextSerdes() { +rootSerializer = (topic, data) -> serialize(data); Review comment: Developers can use the deserializer to build any tools they need on top of the remote log metadata topic, invoking it directly for each message.
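The root serde above keys each record by a one-byte API key so that deserialization can dispatch to the matching per-record serde. A self-contained sketch of that framing idea (helper names and payload format are illustrative, not the actual record schemas):

```java
import java.nio.ByteBuffer;

public class ApiKeyFraming {
    // Serialize as: [1-byte api key][payload bytes].
    static byte[] serialize(byte apiKey, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + payload.length);
        buf.put(apiKey);
        buf.put(payload);
        return buf.array();
    }

    // Read the api key back out; a dispatcher would use this to pick
    // the per-record deserializer.
    static byte apiKeyOf(byte[] data) {
        return ByteBuffer.wrap(data).get();
    }

    // Return the payload bytes that follow the api key.
    static byte[] payloadOf(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        buf.get(); // skip the api key byte
        byte[] payload = new byte[buf.remaining()];
        buf.get(payload);
        return payload;
    }
}
```

Because every message in the topic starts with its API key, a single root deserializer can handle a topic that mixes several record types.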
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r617483955 ## File path: build.gradle ## @@ -1409,6 +1409,7 @@ project(':storage') { dependencies { implementation project(':storage:api') implementation project(':clients') +implementation project(':metadata') Review comment: I will remove this dependency once I move `ApiMessageAndVersion` to `clients` module. This is done in a followup PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd opened a new pull request #10578: MINOR Moved ApiMessageAndVersion and AbstractApiMessageAndVersionSerde to clients module.
satishd opened a new pull request #10578: URL: https://github.com/apache/kafka/pull/10578 MINOR Moved ApiMessageAndVersion and AbstractApiMessageAndVersionSerde to clients module. Existing unit tests would be sufficient as this change is more about moving classes into `client` module. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] satishd commented on pull request #10578: MINOR Moved ApiMessageAndVersion and AbstractApiMessageAndVersionSerde to clients module.
satishd commented on pull request #10578: URL: https://github.com/apache/kafka/pull/10578#issuecomment-824024671 @junrao This PR is based on https://github.com/apache/kafka/pull/10271. So, this should be merged only after https://github.com/apache/kafka/pull/10271 is merged. Review can be done starting with the commit https://github.com/apache/kafka/pull/10578/commits/223ce42ea2a9f18798b5300f386df1cc15d60c31 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12680) Failed to restart the broker in kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-12680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326499#comment-17326499 ] Ismael Juma commented on KAFKA-12680: - Hi [~wenbing.shen]. Thanks for the ticket. Can you please explain why you closed it? You were unable to reproduce it after seeing this once?
[GitHub] [kafka] satishd opened a new pull request #10579: KAFKA-9555 Initial version of default RLMM implementation.
satishd opened a new pull request #10579: URL: https://github.com/apache/kafka/pull/10579 KAFKA-9555 Initial version of default RLMM implementation. This is still a draft. It includes the default RLMM configs, the RLMM implementation, and the producer/consumer managers. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] cmccabe commented on a change in pull request #10564: MINOR: clean up some replication code
cmccabe commented on a change in pull request #10564: URL: https://github.com/apache/kafka/pull/10564#discussion_r617505731 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -321,22 +339,17 @@ public void replay(PartitionRecord record) { } PartitionControlInfo newPartInfo = new PartitionControlInfo(record); PartitionControlInfo prevPartInfo = topicInfo.parts.get(record.partitionId()); +String topicPart = topicInfo.name + "-" + record.partitionId(); if (prevPartInfo == null) { -log.info("Created partition {}:{} with {}.", record.topicId(), -record.partitionId(), newPartInfo.toString()); +log.info("Created partition {} with {}.", topicPart, newPartInfo.toString()); Review comment: Yes, this was an intentional change to make it clearer which topic was getting created. The ID is still available in the metadata log, of course. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10564: MINOR: clean up some replication code
cmccabe commented on a change in pull request #10564: URL: https://github.com/apache/kafka/pull/10564#discussion_r617507396 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -1119,6 +1053,62 @@ void validateManualPartitionAssignment(List assignment, } } +void generateLeaderAndIsrUpdates(String context, + int brokerToRemoveFromIsr, + List records, + Iterator iterator) { +int oldSize = records.size(); +while (iterator.hasNext()) { +TopicIdPartition topicIdPart = iterator.next(); +TopicControlInfo topic = topics.get(topicIdPart.topicId()); +if (topic == null) { +throw new RuntimeException("Topic ID " + topicIdPart.topicId() + " existed in " + +"isrMembers, but not in the topics map."); +} +PartitionControlInfo partition = topic.parts.get(topicIdPart.partitionId()); +if (partition == null) { +throw new RuntimeException("Partition " + topicIdPart + +" existed in isrMembers, but not in the partitions map."); +} +int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemoveFromIsr); +int newLeader = Replicas.contains(newIsr, partition.leader) ? partition.leader : +bestLeader(partition.replicas, newIsr, false); +boolean unclean = newLeader != NO_LEADER && !Replicas.contains(newIsr, newLeader); +if (unclean) { +// After an unclean leader election, the ISR is reset to just the new leader. +newIsr = new int[] {newLeader}; +} else if (newIsr.length == 0) { +// We never want to shrink the ISR to size 0. Review comment: I agree that if `newIsr.length` is 0 on line 1080, then `newLeader` must be `NO_LEADER`. However, it's not necessary to add any special logic for this since `bestLeader` already handles this case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
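The clean-election path discussed above — drop the fenced broker from the ISR, keep the current leader if it is still in-sync, otherwise pick the first replica that remains in the ISR — can be sketched as follows (a simplification; the real `bestLeader` and unclean-election handling are not reproduced here):

```java
import java.util.Arrays;

public class LeaderAndIsrUpdate {
    static final int NO_LEADER = -1;

    // Remove the given broker from the ISR.
    static int[] isrWithout(int[] isr, int broker) {
        return Arrays.stream(isr).filter(b -> b != broker).toArray();
    }

    // Keep the current leader if it is still in the new ISR; otherwise
    // choose the first replica (in preference order) that is in-sync.
    static int electLeader(int[] replicas, int[] newIsr, int currentLeader) {
        if (contains(newIsr, currentLeader)) {
            return currentLeader;
        }
        for (int replica : replicas) {
            if (contains(newIsr, replica)) {
                return replica;
            }
        }
        return NO_LEADER; // no in-sync replica left; unclean election not modeled here
    }

    static boolean contains(int[] arr, int v) {
        return Arrays.stream(arr).anyMatch(x -> x == v);
    }
}
```

This also illustrates the point in the review: when the new ISR is empty, no replica can be found, so the leader naturally becomes `NO_LEADER` without a special-case branch.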
[jira] [Created] (KAFKA-12703) Unencrypted PEM files don't work
Brian Bascoy created KAFKA-12703: Summary: Unencrypted PEM files don't work Key: KAFKA-12703 URL: https://issues.apache.org/jira/browse/KAFKA-12703 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.8.0 Reporter: Brian Bascoy Unencrypted PEM files seem to be internally [supported in the codebase|https://github.com/apache/kafka/blob/a46beb9d29781e0709baf596601122f770a5fa31/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L509] but setting an ssl.key.password is currently enforced by createKeystore (on DefaultSslEngineFactory). I was unable to find a reason for this, so I wonder if this limitation could simply be removed: [https://github.com/pera/kafka/commit/8df2feab5fc6955cf8c89a7d132f05d8f562e16b] Thanks -- This message was sent by Atlassian Jira (v8.3.4#803005)
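If the password requirement were relaxed as proposed, one way to distinguish the two cases is by the PEM header: an encrypted PKCS#8 key is written between `BEGIN ENCRYPTED PRIVATE KEY` markers, while an unencrypted one uses `BEGIN PRIVATE KEY`. A hypothetical sketch of such a check (not the actual `DefaultSslEngineFactory` logic):

```java
public class PemKeyInspector {
    // Returns true when the PEM content carries an encrypted PKCS#8 private
    // key; a key password would only need to be enforced in that case.
    static boolean isEncryptedPkcs8Pem(String pem) {
        return pem.contains("-----BEGIN ENCRYPTED PRIVATE KEY-----");
    }
}
```

A `createKeystore` that consulted a check like this could accept unencrypted PEM keys without an `ssl.key.password` while still requiring one for encrypted keys.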
[GitHub] [kafka] tombentley commented on a change in pull request #10530: KAFKA-10231 fail bootstrap of Rest server if the host name in the adv…
tombentley commented on a change in pull request #10530: URL: https://github.com/apache/kafka/pull/10530#discussion_r616833243 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java ## @@ -369,9 +370,34 @@ else if (serverConnector != null && serverConnector.getHost() != null && serverC else if (serverConnector != null && serverConnector.getPort() > 0) builder.port(serverConnector.getPort()); -log.info("Advertised URI: {}", builder.build()); +URI uri = builder.build(); +validateUriHost(uri); +log.info("Advertised URI: {}", uri); -return builder.build(); +return uri; +} + +/** + * Parses the uri and throws a more definitive error + * when the internal node to node communication can't happen due to an invalid host name. + */ +static void validateUriHost(URI uri) { +//java URI parsing will fail silently returning null in the host if the host name contains invalid characters like _ +//this bubbles up later when the Herder tries to communicate on the advertised url and the current HttpClient fails with an ambiguous message +//we need to revisit this when we upgrade to a better HttpClient that can communicate with such host names or throws a better error message Review comment: The place for plans for future changes is JIRA. Statements about changes which may or may not happen in the future means the comment could become stale (in this case when the client does get changed), or at least means someone has to know to amend the comment when the client does change. So best drop the last line imo. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kpatelatwork commented on a change in pull request #10530: KAFKA-10231 fail bootstrap of Rest server if the host name in the adv…
kpatelatwork commented on a change in pull request #10530: URL: https://github.com/apache/kafka/pull/10530#discussion_r617556919 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java ## @@ -369,9 +370,34 @@ else if (serverConnector != null && serverConnector.getHost() != null && serverC else if (serverConnector != null && serverConnector.getPort() > 0) builder.port(serverConnector.getPort()); -log.info("Advertised URI: {}", builder.build()); +URI uri = builder.build(); +validateUriHost(uri); +log.info("Advertised URI: {}", uri); -return builder.build(); +return uri; +} + +/** + * Parses the uri and throws a more definitive error + * when the internal node to node communication can't happen due to an invalid host name. + */ +static void validateUriHost(URI uri) { +//java URI parsing will fail silently returning null in the host if the host name contains invalid characters like _ +//this bubbles up later when the Herder tries to communicate on the advertised url and the current HttpClient fails with an ambiguous message +//we need to revisit this when we upgrade to a better HttpClient that can communicate with such host names or throws a better error message Review comment: @tombentley Excellent points about not parsing the hostname from the URI. I also felt Utils.getHost was broken, since it didn't work when the URL contained a path, which is why I passed the authority to it. Let me work on a change using the approach you suggested and get back to you. I was doing the uri.getHost null check to limit the blast radius in case we didn't anticipate some pattern, so I think we should still use the null check, but, as you said, use the hostname that is already available to us instead of parsing it from the authority. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
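The parsing quirk the review comment describes can be seen directly with `java.net.URI`: parsing a URL whose hostname contains an underscore succeeds, but the host component is silently dropped. A minimal sketch (the hostnames here are illustrative, not from the PR):

```java
import java.net.URI;

public class AdvertisedUriCheck {
    public static void main(String[] args) {
        // java.net.URI rejects '_' in a hostname: parsing does not throw, but
        // the host component is silently dropped (getHost() returns null)
        // while getAuthority() still carries the raw "host:port" value.
        URI bad = URI.create("http://my_worker:8083/connectors");
        System.out.println(bad.getHost());       // null
        System.out.println(bad.getAuthority());  // my_worker:8083

        // A hyphen is a legal hostname character, so the host survives.
        URI good = URI.create("http://my-worker:8083/connectors");
        System.out.println(good.getHost());      // my-worker
    }
}
```

This is why a `uri.getHost() == null` check on the advertised URI can catch the bad hostname at bootstrap instead of letting it surface later as an ambiguous HttpClient error.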
[jira] [Updated] (KAFKA-12703) Unencrypted PEM files can't be loaded
[ https://issues.apache.org/jira/browse/KAFKA-12703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Brian Bascoy updated KAFKA-12703: - Summary: Unencrypted PEM files can't be loaded (was: Unencrypted PEM files don't work) > Unencrypted PEM files can't be loaded > - > > Key: KAFKA-12703 > URL: https://issues.apache.org/jira/browse/KAFKA-12703 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.8.0 >Reporter: Brian Bascoy >Priority: Major > > Unencrypted PEM files seem to be internally [supported in the > codebase|https://github.com/apache/kafka/blob/a46beb9d29781e0709baf596601122f770a5fa31/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L509] > but setting an ssl.key.password is currently enforced by createKeystore (on > DefaultSslEngineFactory). I was unable to find a reason for this, so I wonder > if this limitation could simply be removed: > > [https://github.com/pera/kafka/commit/8df2feab5fc6955cf8c89a7d132f05d8f562e16b] > > Thanks -- This message was sent by Atlassian Jira (v8.3.4#803005)
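For context, a hedged sketch of the configuration the report is about (property names are Kafka's documented PEM settings from KIP-651; the key material placeholders are illustrative). With an unencrypted PEM key there is no passphrase to supply, which is why the reporter questions the mandatory `ssl.key.password` check:

```
security.protocol=SSL
ssl.keystore.type=PEM
ssl.keystore.certificate.chain=-----BEGIN CERTIFICATE----- ... -----END CERTIFICATE-----
ssl.keystore.key=-----BEGIN PRIVATE KEY----- ... -----END PRIVATE KEY-----
# The key above is unencrypted, so there is nothing for ssl.key.password to
# decrypt -- yet createKeystore in DefaultSslEngineFactory currently requires it.
```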
[jira] [Commented] (KAFKA-12680) Failed to restart the broker in kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-12680?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326589#comment-17326589 ] Wenbing Shen commented on KAFKA-12680: -- Hi [~ijuma] Sorry, this problem was caused by me. In fact, this is a bug in WSL; the link for that bug is here: [https://github.com/microsoft/WSL/issues/2281] I did not hit the problem in my CentOS physical-machine test today, so I should classify this as not a problem, and I will change it now. Sorry for wasting your precious time; this is my first time using WSL, and I did not expect it to have this problem. Please put your time toward important issues. I wish the KRaft mode great success in the end, and I will do my best to help accomplish this. :) > Failed to restart the broker in kraft mode > -- > > Key: KAFKA-12680 > URL: https://issues.apache.org/jira/browse/KAFKA-12680 > Project: Kafka > Issue Type: Bug >Reporter: Wenbing Shen >Priority: Major > > I tested kraft mode for the first time today, I deployed a single node kraft > mode broker according to the documentation: > [https://github.com/apache/kafka/blob/6d1d68617ecd023b787f54aafc24a4232663428d/config/kraft/README.md] > > first step: ./bin/kafka-storage.sh random-uuid > Second step: Use the uuid generated above to execute the following commands: > ./bin/kafka-storage.sh format -t -c ./config/kraft/server.properties > > third step: ./bin/kafka-server-start.sh ./config/kraft/server.properties > > Then I created two topics with two partitions and a single replica. > ./bin/kafka-topics.sh --create --topic test-01 --partitions 2 > --replication-factor 1 --bootstrap-server localhost:9092 > Verify that there is no problem with production and consumption, but when I > call kafka-server-stop.sh, when I call the start command again, the broker > starts to report an error. 
> I am not sure if it is a known bug or a problem with my usage > > [2021-04-18 00:19:37,443] ERROR Exiting Kafka due to fatal exception > (kafka.Kafka$) > java.io.IOException: Invalid argument > at java.io.RandomAccessFile.setLength(Native Method) > at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:189) > at kafka.log.AbstractIndex.resize(AbstractIndex.scala:175) > at > kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:241) > at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:241) > at kafka.log.LogSegment.recover(LogSegment.scala:385) > at kafka.log.Log.recoverSegment(Log.scala:741) > at kafka.log.Log.recoverLog(Log.scala:894) > at kafka.log.Log.$anonfun$loadSegments$2(Log.scala:816) > at kafka.log.Log$$Lambda$153/391630194.apply$mcJ$sp(Unknown Source) > at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17) > at kafka.log.Log.retryOnOffsetOverflow(Log.scala:2456) > at kafka.log.Log.loadSegments(Log.scala:816) > at kafka.log.Log.(Log.scala:326) > at kafka.log.Log$.apply(Log.scala:2593) > at kafka.raft.KafkaMetadataLog$.apply(KafkaMetadataLog.scala:358) > at kafka.raft.KafkaRaftManager.buildMetadataLog(RaftManager.scala:253) > at kafka.raft.KafkaRaftManager.(RaftManager.scala:127) > at kafka.server.KafkaRaftServer.(KafkaRaftServer.scala:74) > at kafka.Kafka$.buildServer(Kafka.scala:79) > at kafka.Kafka$.main(Kafka.scala:87) > at kafka.Kafka.main(Kafka.scala) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Reopened] (KAFKA-12680) Failed to restart the broker in kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-12680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenbing Shen reopened KAFKA-12680: -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-12680) Failed to restart the broker in kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-12680?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenbing Shen resolved KAFKA-12680. -- Resolution: Not A Problem -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10773) When I execute the below command, Kafka cannot start in local.
[ https://issues.apache.org/jira/browse/KAFKA-10773?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326600#comment-17326600 ] Wenbing Shen commented on KAFKA-10773: -- Similar to KAFKA-12680. In fact, this is a bug of wsl. The link for this bug is here: [https://github.com/microsoft/WSL/issues/2281] > When I execute the below command, Kafka cannot start in local. > -- > > Key: KAFKA-10773 > URL: https://issues.apache.org/jira/browse/KAFKA-10773 > Project: Kafka > Issue Type: Bug >Reporter: NAYUSIK >Priority: Critical > Attachments: image-2020-11-27-19-09-49-389.png > > > When I execute the below command, Kafka cannot start in local. > *confluent local services start* > *Please check the below error log and let me know how I modify it.* > [2020-11-27 18:52:21,019] ERROR Error while deleting the clean shutdown file > in dir /tmp/confluent.056805/kafka/data (kafka.server.LogDirFailureChannel) > java.io.IOException: Invalid argument > at java.io.RandomAccessFile.setLength(Native Method) > at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:190) > at kafka.log.AbstractIndex.resize(AbstractIndex.scala:176) > at > kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:242) > at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:242) > at kafka.log.LogSegment.recover(LogSegment.scala:380) > at kafka.log.Log.recoverSegment(Log.scala:692) > at kafka.log.Log.recoverLog(Log.scala:830) > at kafka.log.Log.$anonfun$loadSegments$3(Log.scala:767) > at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17) > at kafka.log.Log.retryOnOffsetOverflow(Log.scala:2626) > at kafka.log.Log.loadSegments(Log.scala:767) > at kafka.log.Log.(Log.scala:313) > at kafka.log.MergedLog$.apply(MergedLog.scala:796) > at kafka.log.LogManager.loadLog(LogManager.scala:294) > at kafka.log.LogManager.$anonfun$loadLogs$12(LogManager.scala:373) > at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > [2020-11-27 18:52:21,019] ERROR Error while deleting the clean shutdown file > in dir /tmp/confluent.056805/kafka/data (kafka.server.LogDirFailureChannel) > java.io.IOException: Invalid argument > at java.io.RandomAccessFile.setLength(Native Method) > at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:190) > at kafka.log.AbstractIndex.resize(AbstractIndex.scala:176) > at > kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:242) > at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:242) > at kafka.log.LogSegment.recover(LogSegment.scala:380) > at kafka.log.Log.recoverSegment(Log.scala:692) > at kafka.log.Log.recoverLog(Log.scala:830) > at kafka.log.Log.$anonfun$loadSegments$3(Log.scala:767) > at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.scala:17) > at kafka.log.Log.retryOnOffsetOverflow(Log.scala:2626) > at kafka.log.Log.loadSegments(Log.scala:767) > at kafka.log.Log.(Log.scala:313) > at kafka.log.MergedLog$.apply(MergedLog.scala:796) > at kafka.log.LogManager.loadLog(LogManager.scala:294) > at kafka.log.LogManager.$anonfun$loadLogs$12(LogManager.scala:373) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > > > [2020-11-27 18:52:22,261] ERROR Shutdown broker because all log dirs in > /tmp/confluent.056805/kafka/data have failed (kafka.log.LogManager) -- 
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10800) Validate the snapshot id when the state machine creates a snapshot
[ https://issues.apache.org/jira/browse/KAFKA-10800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326611#comment-17326611 ] Haoran Xuan commented on KAFKA-10800: - Hi, [~jagsancio], I just started working on this task, and want to make sure I have the correct understanding before going too far. - "When the state machine attempts to create a snapshot writer we should validate that the following is true: # The end offset and epoch of the snapshot is less than the high-watermark. # The end offset and epoch of the snapshot is valid based on the leader epoch cache. Note that this validation should not be performed when the raft client creates the snapshot writer because in that case the local log is out of date and the follower should trust the snapshot id sent by the partition leader." My questions are: 1) What does "the state machine" mean here? I assume it's the KafkaRaftClient? And "attempts to create a snapshot writer" -- I assume this refers to `log.createSnapshot(snapshotId)`? 2) "The end offset and epoch of the snapshot is less than the high-watermark": does the "high-watermark" refer to the leader's high-watermark or the follower's? If it is the former, shouldn't it be the leader's responsibility to satisfy this? If it's the latter, then I think the snapshotId can actually be larger than the follower's own high-watermark: say the follower has lagged too far behind, and its high-watermark == its logEndOffset, which is smaller than the leader's logStartOffset; in this case the follower's high-watermark will be updated to the snapshotId's endOffset when the snapshot fetch completes. Did I miss anything? 3) "validation should not be performed when the raft client creates the snapshot writer": if my assumption in Question 1) is correct, then this seems to be in conflict with 1). Thanks! 
> Validate the snapshot id when the state machine creates a snapshot > -- > > Key: KAFKA-10800 > URL: https://issues.apache.org/jira/browse/KAFKA-10800 > Project: Kafka > Issue Type: Sub-task > Components: replication >Reporter: Jose Armando Garcia Sancio >Assignee: Haoran Xuan >Priority: Major > > When the state machine attempts to create a snapshot writer we should > validate that the following is true: > # The end offset and epoch of the snapshot is less than the high-watermark. > # The end offset and epoch of the snapshot is valid based on the leader > epoch cache. > Note that this validation should not be performed when the raft client > creates the snapshot writer because in that case the local log is out of date > and the follower should trust the snapshot id sent by the partition leader. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] Nathan22177 commented on pull request #10548: KAFKA-12396 added a nullcheck before trying to retrieve a key
Nathan22177 commented on pull request #10548: URL: https://github.com/apache/kafka/pull/10548#issuecomment-824141690 @mjsax > MeteredSessionStore: > > * put() (should verify `sessionKey != null` and `sessionKey.key() != null`) > * remove() > * fetchSession() > * fetch(K) > * backwardFetch(K) > * fetch(K, K) > * backwardFetch(K, K) > * findSession(K, long, long) > * backwardFindSession > * findSession(K, K, long, long) > * backwardFindSession(K, K, long, long) All of them already had the checks. Some lacked test coverage, though. I made changes according to your comments and added some tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
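The guard the methods above share can be sketched in a few lines. This is an illustrative stand-in, not the actual Kafka Streams source: `Windowed` here is a minimal substitute for `org.apache.kafka.streams.kstream.Windowed`, and `validateKey` stands in for the checks MeteredSessionStore performs before delegating to the inner store.

```java
import java.util.Objects;

public class SessionKeyGuard {
    // Minimal stand-in for org.apache.kafka.streams.kstream.Windowed:
    // a key paired with a session window (window omitted for brevity).
    static final class Windowed<K> {
        private final K key;
        Windowed(K key) { this.key = key; }
        K key() { return key; }
    }

    // Both the windowed wrapper and the underlying key must be non-null
    // before the store is touched; otherwise fail fast with a clear message.
    static <K> void validateKey(Windowed<K> sessionKey) {
        Objects.requireNonNull(sessionKey, "sessionKey cannot be null");
        Objects.requireNonNull(sessionKey.key(), "sessionKey.key() cannot be null");
    }

    public static void main(String[] args) {
        validateKey(new Windowed<>("user-42")); // passes silently
        try {
            validateKey(new Windowed<String>(null)); // inner key is null
        } catch (NullPointerException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing fast in the metered layer keeps the error message uniform across `put()`, `remove()`, `fetchSession()`, and the various `fetch`/`findSession` overloads, instead of each inner store surfacing its own NPE later.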
[GitHub] [kafka] kpatelatwork commented on pull request #10530: KAFKA-10231 fail bootstrap of Rest server if the host name in the adv…
kpatelatwork commented on pull request #10530: URL: https://github.com/apache/kafka/pull/10530#issuecomment-824141840 @tombentley I took another shot at implementing the PR using the above suggestions. Could you please check whether it still needs improvement? I didn't add explicit IPv4 and IPv6 checks because `uri.getHost()` won't be null if it's a valid address. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12701) NPE in MetadataRequest when using topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326628#comment-17326628 ] Justine Olshan commented on KAFKA-12701: Thanks [~twmb] and [~dengziming] for taking a look at this. I am not sure this is something for 2.8.0, since that release has already passed the vote. Since this is only an issue with v11 and there are no public APIs supporting this usage, I'm thinking that this is ok. We should still fix it for 3.0 and possibly 2.8.x, though. Does that make sense? > NPE in MetadataRequest when using topic IDs > --- > > Key: KAFKA-12701 > URL: https://issues.apache.org/jira/browse/KAFKA-12701 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Travis Bischel >Assignee: dengziming >Priority: Major > > Authorized result checking relies on the topic name not being null, which it is when > topic IDs are used. > Unlike the logic in handleDeleteTopicsRequest, handleMetadataRequest does not > check zk for the names corresponding to topic IDs if topic IDs are present. 
> {noformat} > [2021-04-21 05:53:01,463] ERROR [KafkaApi-1] Error when handling request: > clientId=kgo, correlationId=1, api=METADATA, version=11, > body=MetadataRequestData(topics=[MetadataRequestTopic(topicId=LmqOoFOASnqQp_4-oJgeKA, > name=null)], allowAutoTopicCreation=false, > includeClusterAuthorizedOperations=false, > includeTopicAuthorizedOperations=false) (kafka.server.RequestHandlerHelper) > java.lang.NullPointerException: name > at java.base/java.util.Objects.requireNonNull(Unknown Source) > at > org.apache.kafka.common.resource.ResourcePattern.(ResourcePattern.java:50) > at > kafka.server.AuthHelper.$anonfun$filterByAuthorized$3(AuthHelper.scala:121) > at scala.collection.Iterator$$anon$9.next(Iterator.scala:575) > at scala.collection.mutable.Growable.addAll(Growable.scala:62) > at scala.collection.mutable.Growable.addAll$(Growable.scala:57) > at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:142) > at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:42) > at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:258) > at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:247) > at scala.collection.SeqFactory$Delegate.from(Factory.scala:306) > at scala.collection.IterableOnceOps.toBuffer(IterableOnce.scala:1270) > at scala.collection.IterableOnceOps.toBuffer$(IterableOnce.scala:1270) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1288) > at kafka.server.AuthHelper.filterByAuthorized(AuthHelper.scala:120) > at > kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:1146) > at kafka.server.KafkaApis.handle(KafkaApis.scala:170) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74) > at java.base/java.lang.Thread.run(Unknown Source) > [2021-04-21 05:53:01,464] ERROR [Kafka Request Handler 1 on Broker 1], > Exception when handling request (kafka.server.KafkaRequestHandler) > java.lang.NullPointerException > at > 
org.apache.kafka.common.message.MetadataResponseData$MetadataResponseTopic.addSize(MetadataResponseData.java:1247) > at > org.apache.kafka.common.message.MetadataResponseData.addSize(MetadataResponseData.java:417) > at > org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218) > at > org.apache.kafka.common.protocol.SendBuilder.buildResponseSend(SendBuilder.java:200) > at > org.apache.kafka.common.requests.AbstractResponse.toSend(AbstractResponse.java:43) > at > org.apache.kafka.common.requests.RequestContext.buildResponseSend(RequestContext.java:111) > at > kafka.network.RequestChannel$Request.buildResponseSend(RequestChannel.scala:132) > at > kafka.server.RequestHandlerHelper.sendResponse(RequestHandlerHelper.scala:185) > at > kafka.server.RequestHandlerHelper.sendErrorOrCloseConnection(RequestHandlerHelper.scala:155) > at > kafka.server.RequestHandlerHelper.sendErrorResponseMaybeThrottle(RequestHandlerHelper.scala:109) > at > kafka.server.RequestHandlerHelper.handleError(RequestHandlerHelper.scala:79) > at kafka.server.KafkaApis.handle(KafkaApis.scala:229) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74) > at java.base/java.lang.Thread.run(Unknown Source) > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
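The first stack trace reduces to a simple failure mode: a v11 MetadataRequest carries only a topic ID, so the topic name is null by design, yet it is fed into a constructor that demands a non-null name. A hedged stand-in sketch (`ResourcePattern` below is a simplified substitute for the Kafka class, reproducing only the `Objects.requireNonNull(name, "name")` check that throws in the report):

```java
import java.util.Objects;

public class NullNameRepro {
    // Simplified stand-in for org.apache.kafka.common.resource.ResourcePattern:
    // its constructor rejects a null resource name, which is exactly the
    // "java.lang.NullPointerException: name" in the stack trace above.
    static final class ResourcePattern {
        final String name;
        ResourcePattern(String name) {
            this.name = Objects.requireNonNull(name, "name");
        }
    }

    public static void main(String[] args) {
        try {
            // A topic resolved only by its ID has name == null.
            new ResourcePattern(null);
        } catch (NullPointerException e) {
            System.out.println("NPE: " + e.getMessage()); // NPE: name
        }
    }
}
```

The fix discussed in the ticket would resolve the ID to a name (or answer with an error for unknown IDs) before the authorization check ever constructs such a pattern.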
[GitHub] [kafka] satishd commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
satishd commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r617482721 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataContextSerdes.java ## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataContext; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +/** + * This class provides serialization and deserialization for {@link RemoteLogMetadataContext}. 
This is the root serdes + * for the messages that are stored in internal remote log metadata topic. + */ +public class RemoteLogMetadataContextSerdes implements Serde { + +public static final byte REMOTE_LOG_SEGMENT_METADATA_API_KEY = (byte) new RemoteLogSegmentMetadataRecord().apiKey(); +public static final byte REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = (byte) new RemoteLogSegmentMetadataUpdateRecord().apiKey(); +public static final byte REMOTE_PARTITION_DELETE_API_KEY = (byte) new RemotePartitionDeleteMetadataRecord().apiKey(); + +private static final Map KEY_TO_SERDES = createInternalSerde(); + +private final Deserializer rootDeserializer; +private final Serializer rootSerializer; + +public RemoteLogMetadataContextSerdes() { +rootSerializer = (topic, data) -> serialize(data); Review comment: Serde deserializer can be used by devs to build any tools that they need based on remote log metadata topic by setting it as value deserializer for consumer. But they can also use byte array deserializer on consumer and invoke RemoteLogContextDeserializer for each message to convert from byte array to RemoteLogContext. I updated the PR with the suggested changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
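The envelope scheme the quoted code implies — each record prefixed with a one-byte API key that selects the concrete deserializer — can be sketched generically. This is an illustrative sketch, not the actual `RemoteLogMetadataContextSerdes`; class and method names here are invented for the example:

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// One-byte API key + payload framing: the key, taken from each generated
// record's apiKey(), routes deserialization to the right per-type codec.
public class ApiKeyedEnvelope {
    private final Map<Byte, Function<ByteBuffer, Object>> deserializers = new HashMap<>();

    public void register(byte apiKey, Function<ByteBuffer, Object> deserializer) {
        deserializers.put(apiKey, deserializer);
    }

    // Prepend the API key so a consumer of the raw topic can dispatch later.
    public byte[] serialize(byte apiKey, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + payload.length);
        buf.put(apiKey).put(payload);
        return buf.array();
    }

    // Read the leading API key, then hand the remaining bytes to its codec.
    public Object deserialize(byte[] data) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        byte apiKey = buf.get();
        Function<ByteBuffer, Object> deserializer = deserializers.get(apiKey);
        if (deserializer == null) {
            throw new IllegalArgumentException("Unknown api key: " + apiKey);
        }
        return deserializer.apply(buf.slice());
    }
}
```

As the comment notes, a tool author can either plug such a root serde in as the consumer's value deserializer, or consume raw byte arrays and invoke the deserializer per message — both paths end up in the same dispatch.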
[jira] [Commented] (KAFKA-12701) NPE in MetadataRequest when using topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326633#comment-17326633 ] Ismael Juma commented on KAFKA-12701: - I suggest fixing it in 3.0.0 and 2.8.1.

> NPE in MetadataRequest when using topic IDs
> Key: KAFKA-12701
> URL: https://issues.apache.org/jira/browse/KAFKA-12701
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.8.0
> Reporter: Travis Bischel
> Assignee: dengziming
> Priority: Major
>
> Authorized result checking relies on the topic name not being null, which, when using topic IDs, it is.
> Unlike the logic in handleDeleteTopicsRequest, handleMetadataRequest does not check zk for the names corresponding to topic IDs if topic IDs are present.
> {noformat}
> [2021-04-21 05:53:01,463] ERROR [KafkaApi-1] Error when handling request: clientId=kgo, correlationId=1, api=METADATA, version=11, body=MetadataRequestData(topics=[MetadataRequestTopic(topicId=LmqOoFOASnqQp_4-oJgeKA, name=null)], allowAutoTopicCreation=false, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) (kafka.server.RequestHandlerHelper)
> java.lang.NullPointerException: name
> 	at java.base/java.util.Objects.requireNonNull(Unknown Source)
> 	at org.apache.kafka.common.resource.ResourcePattern.<init>(ResourcePattern.java:50)
> 	at kafka.server.AuthHelper.$anonfun$filterByAuthorized$3(AuthHelper.scala:121)
> 	at scala.collection.Iterator$$anon$9.next(Iterator.scala:575)
> 	at scala.collection.mutable.Growable.addAll(Growable.scala:62)
> 	at scala.collection.mutable.Growable.addAll$(Growable.scala:57)
> 	at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:142)
> 	at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:42)
> 	at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:258)
> 	at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:247)
> 	at scala.collection.SeqFactory$Delegate.from(Factory.scala:306)
> 	at scala.collection.IterableOnceOps.toBuffer(IterableOnce.scala:1270)
> 	at scala.collection.IterableOnceOps.toBuffer$(IterableOnce.scala:1270)
> 	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1288)
> 	at kafka.server.AuthHelper.filterByAuthorized(AuthHelper.scala:120)
> 	at kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:1146)
> 	at kafka.server.KafkaApis.handle(KafkaApis.scala:170)
> 	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
> 	at java.base/java.lang.Thread.run(Unknown Source)
> [2021-04-21 05:53:01,464] ERROR [Kafka Request Handler 1 on Broker 1], Exception when handling request (kafka.server.KafkaRequestHandler)
> java.lang.NullPointerException
> 	at org.apache.kafka.common.message.MetadataResponseData$MetadataResponseTopic.addSize(MetadataResponseData.java:1247)
> 	at org.apache.kafka.common.message.MetadataResponseData.addSize(MetadataResponseData.java:417)
> 	at org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218)
> 	at org.apache.kafka.common.protocol.SendBuilder.buildResponseSend(SendBuilder.java:200)
> 	at org.apache.kafka.common.requests.AbstractResponse.toSend(AbstractResponse.java:43)
> 	at org.apache.kafka.common.requests.RequestContext.buildResponseSend(RequestContext.java:111)
> 	at kafka.network.RequestChannel$Request.buildResponseSend(RequestChannel.scala:132)
> 	at kafka.server.RequestHandlerHelper.sendResponse(RequestHandlerHelper.scala:185)
> 	at kafka.server.RequestHandlerHelper.sendErrorOrCloseConnection(RequestHandlerHelper.scala:155)
> 	at kafka.server.RequestHandlerHelper.sendErrorResponseMaybeThrottle(RequestHandlerHelper.scala:109)
> 	at kafka.server.RequestHandlerHelper.handleError(RequestHandlerHelper.scala:79)
> 	at kafka.server.KafkaApis.handle(KafkaApis.scala:229)
> 	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
> 	at java.base/java.lang.Thread.run(Unknown Source)
> {noformat}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12696) Add standard getters to LagInfo class to allow automatic serialization
[ https://issues.apache.org/jira/browse/KAFKA-12696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326635#comment-17326635 ] Mikhail Panchenko commented on KAFKA-12696: --- Ok. Just to confirm: the Confluence instance where KIPs are created uses a separate login, so I should sign up? Re: the naming convention, is this documented somewhere that I can reference in the KIP? I'm inclined to make the KIP a bit broader than "change this one interface," and it would be nice to reference the convention directly when proposing to change it. I see an "avoid getters and setters" note in the [Coding Guide|http://kafka.apache.org/coding-guide.html], but that appears to be directed at Scala, not Java, code.

> Add standard getters to LagInfo class to allow automatic serialization
> Key: KAFKA-12696
> URL: https://issues.apache.org/jira/browse/KAFKA-12696
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Mikhail Panchenko
> Assignee: Mikhail Panchenko
> Priority: Trivial
> Labels: needs-kip
> Original Estimate: 0.5h
> Remaining Estimate: 0.5h
>
> The LagInfo class has non-standard getters for its member variables. This means that Jackson and other serialization frameworks do not know how to serialize them without additional annotations. So when implementing the sort of system that KAFKA-6144 is meant to enable, as documented in docs like [Kafka Streams Interactive Queries|https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html#querying-state-stores-during-a-rebalance], one has to either inject a bunch of custom serialization logic into Jersey or wrap this class.
> The patch to fix this is trivial, and I will be putting one up shortly.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12696) Add standard getters to LagInfo class to allow automatic serialization
[ https://issues.apache.org/jira/browse/KAFKA-12696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326641#comment-17326641 ] Ismael Juma commented on KAFKA-12696: - The "avoid getters and setters" note applies to both Java and Scala. We may need to make the docs clearer. Note that even Java itself doesn't use the getter/setter convention for new apis.

> Add standard getters to LagInfo class to allow automatic serialization
> Key: KAFKA-12696
> URL: https://issues.apache.org/jira/browse/KAFKA-12696
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Mikhail Panchenko
> Assignee: Mikhail Panchenko
> Priority: Trivial
> Labels: needs-kip

-- This message was sent by Atlassian Jira (v8.3.4#803005)
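The friction both comments describe comes from bean-style introspection: serialization frameworks such as Jackson discover properties through `get`-prefixed accessors, so Kafka's prefix-less accessors are invisible to them by default. A small JDK-only sketch (the `Lag` class and its method names are hypothetical stand-ins, not Kafka's actual `LagInfo` API) shows that `java.beans.Introspector` only discovers the conventionally named getter:

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class GetterIntrospection {
    // Hypothetical stand-in for a class mixing Kafka-style and bean-style accessors.
    public static class Lag {
        public long currentOffsetPosition() { return 5; }  // no "get" prefix: not a bean property
        public long getEndOffsetPosition() { return 10; }  // bean-style: discovered as "endOffsetPosition"
    }

    // Lists the bean properties Introspector finds, excluding those inherited from Object.
    public static List<String> beanProperties(Class<?> c) {
        try {
            return Arrays.stream(Introspector.getBeanInfo(c, Object.class).getPropertyDescriptors())
                         .map(PropertyDescriptor::getName)
                         .collect(Collectors.toList());
        } catch (IntrospectionException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Only `endOffsetPosition` surfaces as a property, which is why a class with prefix-less accessors needs extra annotations or a wrapper before Jackson or Jersey can serialize it automatically.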
[jira] [Commented] (KAFKA-12701) NPE in MetadataRequest when using topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326652#comment-17326652 ] Justine Olshan commented on KAFKA-12701: [~dengziming] Just to clarify, we did not yet implement functionality to handle using topic IDs in MetadataRequest. So even if we get past the NPE, we wouldn't handle the topic ID yet. That will be added in https://github.com/apache/kafka/pull/9769, correct? Maybe for 2.8.1 we can return an error that the operation is not yet supported

> NPE in MetadataRequest when using topic IDs
> Key: KAFKA-12701
> URL: https://issues.apache.org/jira/browse/KAFKA-12701
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.8.0
> Reporter: Travis Bischel
> Assignee: dengziming
> Priority: Major

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-12701) NPE in MetadataRequest when using topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326628#comment-17326628 ] Justine Olshan edited comment on KAFKA-12701 at 4/21/21, 3:49 PM: -- Thanks [~twmb] and [~dengziming] for taking a look at this. I am not sure this is something for 2.8.0 since that release has already passed the vote. Since this is only an issue with v11 and there are admin client public apis supporting this usage, I'm thinking that this is ok. We should still fix for 3.0 and possibly 2.8.x though. Does that make sense?

was (Author: jolshan): Thanks [~twmb] and [~dengziming] for taking a look at this. I am not sure this is something for 2.8.0 since that release has already passed the vote. Since this is only an issue with v11 and there are no public apis supporting this usage, I'm thinking that this is ok. We should still fix for 3.0 and possibly 2.8.x though. Does that make sense?

> NPE in MetadataRequest when using topic IDs
> Key: KAFKA-12701
> URL: https://issues.apache.org/jira/browse/KAFKA-12701
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.8.0
> Reporter: Travis Bischel
> Assignee: dengziming
> Priority: Major

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-12701) NPE in MetadataRequest when using topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326628#comment-17326628 ] Justine Olshan edited comment on KAFKA-12701 at 4/21/21, 3:49 PM: -- Thanks [~twmb] and [~dengziming] for taking a look at this. I am not sure this is something for 2.8.0 since that release has already passed the vote. I'm thinking that this is ok. We should still fix for 3.0 and possibly 2.8.x though. Does that make sense?

was (Author: jolshan): Thanks [~twmb] and [~dengziming] for taking a look at this. I am not sure this is something for 2.8.0 since that release has already passed the vote. Since this is only an issue with v11 and there are admin client public apis supporting this usage, I'm thinking that this is ok. We should still fix for 3.0 and possibly 2.8.x though. Does that make sense?

> NPE in MetadataRequest when using topic IDs
> Key: KAFKA-12701
> URL: https://issues.apache.org/jira/browse/KAFKA-12701
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.8.0
> Reporter: Travis Bischel
> Assignee: dengziming
> Priority: Major

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-12701) NPE in MetadataRequest when using topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326652#comment-17326652 ] Justine Olshan edited comment on KAFKA-12701 at 4/21/21, 3:58 PM: -- [~dengziming] Just to clarify, we did not yet implement functionality to handle using topic IDs in MetadataRequest. So even if we get past the NPE, we wouldn't handle the topic ID yet. That will be added in https://github.com/apache/kafka/pull/9769, correct? Maybe for 2.8.1 we can return an error that the operation is not yet supported for v11 requests. Then in https://github.com/apache/kafka/pull/9769, we bump the protocol to v12 so that the clients know that topic IDs are supported.

was (Author: jolshan): [~dengziming] Just to clarify, we did not yet implement functionality to handle using topic IDs in MetadataRequest. So even if we get past the NPE, we wouldn't handle the topic ID yet. That will be added in https://github.com/apache/kafka/pull/9769, correct? Maybe for 2.8.1 we can return an error that the operation is not yet supported

> NPE in MetadataRequest when using topic IDs
> Key: KAFKA-12701
> URL: https://issues.apache.org/jira/browse/KAFKA-12701
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.8.0
> Reporter: Travis Bischel
> Assignee: dengziming
> Priority: Major

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12704) Concurrent calls to AbstractHerder::getConnector can potentially create two connector instances
Kalpesh Patel created KAFKA-12704: - Summary: Concurrent calls to AbstractHerder::getConnector can potentially create two connector instances Key: KAFKA-12704 URL: https://issues.apache.org/jira/browse/KAFKA-12704 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Kalpesh Patel Requests to the {{PUT /connector-plugins/\{connectorType}/config/validate}} endpoint are [delegated to the herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81], which [caches connector instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544] that are used [during config validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310]. This has the effect that, should concurrent requests to that endpoint occur for the same connector type, the same connector instance may be responsible for [validating those configurations|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L334] concurrently _(may_ instead of _will_ because there is also a race condition in the {{AbstractHerder::getConnector}} method that potentially fails to detect that an instance of the connector has already been created and, as a result, creates a second instance). This is slightly problematic because the [Connector::validate|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java#L122-L127] method is not marked as thread-safe. 
However, because a lot of connectors out there tend to use the default implementation for that method, it's probably not super urgent that we patch this immediately. A couple of options are: # Update the docs for that method to specify that it must be thread-safe # Rewrite the connector validation logic in the framework to avoid concurrently invoking {{Connector::validate}} on the same instance. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-12704) Concurrent calls to AbstractHerder::getConnector can potentially create two connector instances
[ https://issues.apache.org/jira/browse/KAFKA-12704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kalpesh Patel reassigned KAFKA-12704: - Assignee: Kalpesh Patel

> Concurrent calls to AbstractHerder::getConnector can potentially create two connector instances
> Key: KAFKA-12704
> URL: https://issues.apache.org/jira/browse/KAFKA-12704
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Kalpesh Patel
> Assignee: Kalpesh Patel
> Priority: Minor

-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12704) Concurrent calls to AbstractHerder::getConnector can potentially create two connector instances
[ https://issues.apache.org/jira/browse/KAFKA-12704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kalpesh Patel updated KAFKA-12704: -- Description: Concurrent requests to the {{PUT /connector-plugins/\{connectorType}/config/validate}} endpoint are [delegated to the herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81], which [caches connector instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544] that are used [during config validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310]. This has the effect that, should concurrent requests to that endpoint occur for the same connector type and the connector isn't created yet then there is also a race condition in the {{AbstractHerder::getConnector}} method that potentially fails to detect that an instance of the connector has already been created and, as a result, creates another instance. 
This can be solved by using computeIfAbsent to create the connector.

was:

Requests to the {{PUT /connector-plugins/\{connectorType}/config/validate}} endpoint are [delegated to the herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81], which [caches connector instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544] that are used [during config validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310]. This has the effect that, should concurrent requests to that endpoint occur for the same connector type, the same connector instance may be responsible for [validating those configurations|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L334] concurrently _(may_ instead of _will_ because there is also a race condition in the {{AbstractHerder::getConnector}} method that potentially fails to detect that an instance of the connector has already been created and, as a result, creates a second instance). This is slightly problematic because the [Connector::validate|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java#L122-L127] method is not marked as thread-safe. However, because a lot of connectors out there tend to use the default implementation for that method, it's probably not super urgent that we patch this immediately. 
A couple of options are: # Update the docs for that method to specify that it must be thread-safe # Rewrite the connector validation logic in the framework to avoid concurrently invoking {{Connector::validate}} on the same instance. > Concurrent calls to AbstractHerder::getConnector can potentially create two > connector instances > --- > > Key: KAFKA-12704 > URL: https://issues.apache.org/jira/browse/KAFKA-12704 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Kalpesh Patel >Assignee: Kalpesh Patel >Priority: Minor > > Concurrent requests to the {{PUT > /connector-plugins/\{connectorType}/config/validate}} endpoint are [delegated > to the > herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81], > which [caches connector > instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544] > that are used [during config > validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310]. > This has the effect that, should concurrent requests to that endpoint occur > for the same connector type and the connector isn't created yet then there is > also a race condition in the {{AbstractHerder::getConnector}} method that > potentially fails to detect that an instance of the connector has already
[jira] [Updated] (KAFKA-12704) Concurrent calls to AbstractHerder::getConnector can potentially create two connector instances
[ https://issues.apache.org/jira/browse/KAFKA-12704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kalpesh Patel updated KAFKA-12704: -- Description: As discovered in KAFKA-9560, concurrent requests to the {{PUT /connector-plugins/\{connectorType}/config/validate}} endpoint are [delegated to the herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81], which [caches connector instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544] that are used [during config validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310]. This has the effect that, should concurrent requests to that endpoint occur for the same connector type and the connector hasn't been cached yet then there is a race condition in the {{AbstractHerder::getConnector}} method that potentially fails to detect that an instance of the connector has already been created and, as a result, creates another instance. 
This can be solved by using Map.computeIfAbsent to create the connector.

was:

Concurrent requests to the {{PUT /connector-plugins/\{connectorType}/config/validate}} endpoint are [delegated to the herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81], which [caches connector instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544] that are used [during config validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310]. This has the effect that, should concurrent requests to that endpoint occur for the same connector type and the connector isn't created yet, there is a race condition in the {{AbstractHerder::getConnector}} method that potentially fails to detect that an instance of the connector has already been created and, as a result, creates another instance. 
This can be solved by using computeIfAbsent to create the connector > Concurrent calls to AbstractHerder::getConnector can potentially create two > connector instances > --- > > Key: KAFKA-12704 > URL: https://issues.apache.org/jira/browse/KAFKA-12704 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Kalpesh Patel >Assignee: Kalpesh Patel >Priority: Minor > > As discovered in KAFKA-9560, concurrent requests to the {{PUT > /connector-plugins/\{connectorType}/config/validate}} endpoint are [delegated > to the > herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81], > which [caches connector > instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544] > that are used [during config > validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310]. > This has the effect that, should concurrent requests to that endpoint occur > for the same connector type and the connector hasn't been cached yet then > there is a race condition in the {{AbstractHerder::getConnector}} method that > potentially fails to detect that an instance of the connector has already > been created and, as a result, creates another instance. > > This can be solved by using Map.computeIfAbsent to create the connector -- This message was sent by Atlassian Jira (v8.3.4#803005)
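The check-then-act race the ticket describes, and the atomic `computeIfAbsent` fix it proposes, can be sketched as follows. This is a minimal illustration, not the actual `AbstractHerder` code; the class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ConnectorCache {
    private final ConcurrentMap<String, Object> tempConnectors = new ConcurrentHashMap<>();

    // Racy variant: two threads can both observe null between the get()
    // and the put(), and each instantiate its own connector.
    public Object getConnectorRacy(String connType) {
        Object connector = tempConnectors.get(connType);   // check
        if (connector == null) {
            connector = instantiate(connType);             // act (may run twice)
            tempConnectors.put(connType, connector);
        }
        return connector;
    }

    // Safe variant: computeIfAbsent performs the lookup and insertion
    // atomically, so at most one instance per key is ever published.
    public Object getConnector(String connType) {
        return tempConnectors.computeIfAbsent(connType, this::instantiate);
    }

    private Object instantiate(String connType) {
        return new Object(); // stand-in for real plugin instantiation
    }
}
```

Note that `computeIfAbsent` only guarantees a single *published* instance; the mapping function itself should be cheap and side-effect-free, which holds for simple connector construction.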
[jira] [Updated] (KAFKA-9560) Connector::validate is utilized concurrently by the framework, but not documented as thread-safe
[ https://issues.apache.org/jira/browse/KAFKA-9560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kalpesh Patel updated KAFKA-9560: - Description: Requests to the {{PUT /connector-plugins/\{connectorType}/config/validate}} endpoint are [delegated to the herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81], which [caches connector instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544] that are used [during config validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310]. This has the effect that, should concurrent requests to that endpoint occur for the same connector type, the same connector instance may be responsible for [validating those configurations|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L334] concurrently _(may_ instead of _will_ because there is also a race condition in the {{AbstractHerder::getConnector}} method that potentially fails to detect that an instance of the connector has already been created and, as a result, creates a second instance KAFKA-12704). This is slightly problematic because the [Connector::validate|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java#L122-L127] method is not marked as thread-safe. However, because a lot of connectors out there tend to use the default implementation for that method, it's probably not super urgent that we patch this immediately. 
A couple of options are: # Update the docs for that method to specify that it must be thread-safe # Rewrite the connector validation logic in the framework to avoid concurrently invoking {{Connector::validate}} on the same instance. was: Requests to the {{PUT /connector-plugins/\{connectorType}/config/validate}} endpoint are [delegated to the herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java#L81], which [caches connector instances|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L536-L544] that are used [during config validation|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L310]. This has the effect that, should concurrent requests to that endpoint occur for the same connector type, the same connector instance may be responsible for [validating those configurations|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L334] concurrently _(may_ instead of _will_ because there is also a race condition in the {{AbstractHerder::getConnector}} method that potentially fails to detect that an instance of the connector has already been created and, as a result, creates a second instance). This is slightly problematic because the [Connector::validate|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java#L122-L127] method is not marked as thread-safe. 
However, because a lot of connectors out there tend to use the default implementation for that method, it's probably not super urgent that we patch this immediately. A couple of options are: # Update the docs for that method to specify that it must be thread-safe # Rewrite the connector validation logic in the framework to avoid concurrently invoking {{Connector::validate}} on the same instance. > Connector::validate is utilized concurrently by the framework, but not > documented as thread-safe > > > Key: KAFKA-9560 > URL: https://issues.apache.org/jira/browse/KAFKA-9560 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chris Egerton >Priority: Minor > > Requests to the {{PUT /connector-plugins/\{connectorType}/config/validate}} > endpoint are [delegated to the > herder|https://github.com/apache/kafka/blob/16ee326755e3f13914a0ed446c34c84e65fc0bc4/connect/runtime/src/main/java/org/apache/kafk
[GitHub] [kafka] junrao commented on pull request #10576: KAFKA-12701: Remove topicId from MetadataReq since it was not supported in 2.8.0
junrao commented on pull request #10576: URL: https://github.com/apache/kafka/pull/10576#issuecomment-824228739 @dengziming : We probably can't remove a field from an existing protocol without changing the version. Otherwise, the client will be confused on the exact protocol for a particular version. We could bump up the version (keeping all the fields the same) in 3.0 and implement the functionality for topicId there. For 2.8, we probably want to at least document in the protocol that topicId is not supported. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
junrao commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r617765100 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataContextSerdes.java ## @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogMetadataContext; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord; +import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +/** + * This class provides serialization and deserialization for {@link RemoteLogMetadataContext}. 
This is the root serdes + * for the messages that are stored in internal remote log metadata topic. + */ +public class RemoteLogMetadataContextSerdes implements Serde { + +public static final byte REMOTE_LOG_SEGMENT_METADATA_API_KEY = (byte) new RemoteLogSegmentMetadataRecord().apiKey(); +public static final byte REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = (byte) new RemoteLogSegmentMetadataUpdateRecord().apiKey(); +public static final byte REMOTE_PARTITION_DELETE_API_KEY = (byte) new RemotePartitionDeleteMetadataRecord().apiKey(); + +private static final Map KEY_TO_SERDES = createInternalSerde(); + +private final Deserializer rootDeserializer; +private final Serializer rootSerializer; + +public RemoteLogMetadataContextSerdes() { +rootSerializer = (topic, data) -> serialize(data); Review comment: @satishd : What you said makes sense. It's useful to be able to use the same Serde in tools or client applications. Also, it seems that it's possible to make a generic Serde for all ApiMessage since the apiKey is unique within a topic and the topic is passed into Serde. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
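The idea discussed above — one root serde that frames every record with a one-byte api key and dispatches on that byte during deserialization — can be sketched generically. This is a toy illustration of the framing technique only; the class name, registration API, and key values are all assumptions, not the actual RLMM serdes.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Frames each payload as [apiKey byte][payload bytes] and looks up the
// matching deserializer by that leading byte, so one serde can cover
// several record types stored in the same topic.
public class ApiKeyFraming {
    private final Map<Byte, Function<byte[], Object>> deserializers = new HashMap<>();

    public void register(byte apiKey, Function<byte[], Object> deserializer) {
        deserializers.put(apiKey, deserializer);
    }

    public byte[] serialize(byte apiKey, byte[] payload) {
        return ByteBuffer.allocate(1 + payload.length).put(apiKey).put(payload).array();
    }

    public Object deserialize(byte[] framed) {
        byte apiKey = framed[0];
        Function<byte[], Object> d = deserializers.get(apiKey);
        if (d == null) {
            throw new IllegalArgumentException("Unknown api key: " + apiKey);
        }
        byte[] payload = new byte[framed.length - 1];
        System.arraycopy(framed, 1, payload, 0, payload.length);
        return d.apply(payload);
    }
}
```

This works precisely because, as junrao notes, the api key is unique within a topic, so the leading byte is enough to pick the concrete record type.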
[GitHub] [kafka] kpatelatwork opened a new pull request #10580: KAFKA-12704 Fixed potential concurrency issue in connector creation
kpatelatwork opened a new pull request #10580: URL: https://github.com/apache/kafka/pull/10580 *Concurrent requests to validate endpoint for the same connector type calls AbstractHerder::getConnector to get the cached connector instances and if the connector hasn't been cached yet then there is a race condition in the AbstractHerder::getConnector method that potentially fails to detect that an instance of the connector has already been created and, as a result, can create another instance* *Existing tests are present with enough coverage so no new tests are added.* @C0urante @rhauch Could you please review if this fix is acceptable?

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] C0urante commented on pull request #10580: KAFKA-12704 Fixed potential concurrency issue in connector creation
C0urante commented on pull request #10580: URL: https://github.com/apache/kafka/pull/10580#issuecomment-824267968 We may want to upgrade `tempConnectors` from a regular `Map` to a `ConcurrentMap` as well, LGTM otherwise though.
[GitHub] [kafka] junrao commented on a change in pull request #10564: MINOR: clean up some replication code
junrao commented on a change in pull request #10564: URL: https://github.com/apache/kafka/pull/10564#discussion_r617773890 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -321,22 +339,17 @@ public void replay(PartitionRecord record) { } PartitionControlInfo newPartInfo = new PartitionControlInfo(record); PartitionControlInfo prevPartInfo = topicInfo.parts.get(record.partitionId()); +String topicPart = topicInfo.name + "-" + record.partitionId(); Review comment: It's probably useful to log the topicId too? ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -356,7 +369,8 @@ public void replay(PartitionChangeRecord record) { brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartitionInfo.isr, newPartitionInfo.isr, prevPartitionInfo.leader, newPartitionInfo.leader); -log.debug("Applied ISR change record: {}", record.toString()); +String topicPart = topicInfo.name + "-" + record.partitionId(); Review comment: It's probably useful to log the topicId too? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10564: MINOR: clean up some replication code
cmccabe commented on a change in pull request #10564: URL: https://github.com/apache/kafka/pull/10564#discussion_r617785244 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -1119,6 +1053,62 @@ void validateManualPartitionAssignment(List assignment, } } +void generateLeaderAndIsrUpdates(String context, + int brokerToRemoveFromIsr, + List records, + Iterator iterator) { +int oldSize = records.size(); +while (iterator.hasNext()) { +TopicIdPartition topicIdPart = iterator.next(); +TopicControlInfo topic = topics.get(topicIdPart.topicId()); +if (topic == null) { +throw new RuntimeException("Topic ID " + topicIdPart.topicId() + " existed in " + +"isrMembers, but not in the topics map."); +} +PartitionControlInfo partition = topic.parts.get(topicIdPart.partitionId()); +if (partition == null) { +throw new RuntimeException("Partition " + topicIdPart + +" existed in isrMembers, but not in the partitions map."); +} +int[] newIsr = Replicas.copyWithout(partition.isr, brokerToRemoveFromIsr); +int newLeader = Replicas.contains(newIsr, partition.leader) ? partition.leader : +bestLeader(partition.replicas, newIsr, false); +boolean unclean = newLeader != NO_LEADER && !Replicas.contains(newIsr, newLeader); Review comment: Yes, I think we can reuse some code here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman merged pull request #10570: MINOR: Bump to latest version 2.6.2
ableegoldman merged pull request #10570: URL: https://github.com/apache/kafka/pull/10570
[GitHub] [kafka] Naros commented on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values
Naros commented on pull request #10566: URL: https://github.com/apache/kafka/pull/10566#issuecomment-824277958 @C0urante @rhauch, could either of you review this to see if it's acceptable?
[GitHub] [kafka] ableegoldman commented on a change in pull request #10565: KAFKA-12691: Add case where task can be considered idling
ableegoldman commented on a change in pull request #10565: URL: https://github.com/apache/kafka/pull/10565#discussion_r617805969 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -667,8 +668,15 @@ public boolean isProcessable(final long wallClockTime) { // thus, the task is not processable, even if there is available data in the record queue return false; } - -return partitionGroup.readyToProcess(wallClockTime); +final boolean readyToProcess = partitionGroup.readyToProcess(wallClockTime); +if (!readyToProcess) { +if (!timeCurrentIdlingStarted.isPresent()) { +timeCurrentIdlingStarted = Optional.of(wallClockTime); +} +} else { +timeCurrentIdlingStarted = Optional.empty(); Review comment: Just want to make sure I understand, previously we only considered a task as idling if it was suspended so we're just fixing it up to track the actual idling. And while since KIP-429 suspension is just a transient state that the task passes through right before being closed, it's still used during an upgrade from EAGER. So we're going to keep considering suspension as idling until we can finally drop support for EAGER -- does that sound right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
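The diff under review records the wall-clock time at which a task first became un-processable and clears it the moment the task is ready again. As a standalone sketch of that bookkeeping (class and method names here are illustrative, not the actual `StreamTask` code):

```java
import java.util.Optional;

// Tracks when the current idle period started: the timestamp is set on
// the first not-ready observation and cleared as soon as the task can
// process again, so idle duration is measured from the first stall.
public class IdleTracker {
    private Optional<Long> timeCurrentIdlingStarted = Optional.empty();

    public void observe(boolean readyToProcess, long wallClockTime) {
        if (!readyToProcess) {
            if (!timeCurrentIdlingStarted.isPresent()) {
                timeCurrentIdlingStarted = Optional.of(wallClockTime);
            }
        } else {
            timeCurrentIdlingStarted = Optional.empty();
        }
    }

    public Optional<Long> idlingSince() {
        return timeCurrentIdlingStarted;
    }
}
```

The key property is that repeated not-ready observations do not reset the start time, so the reported idle duration keeps growing until the task processes again.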
[GitHub] [kafka] C0urante commented on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values
C0urante commented on pull request #10566: URL: https://github.com/apache/kafka/pull/10566#issuecomment-824292010 Hmm... I'm wondering if this might break existing setups. Since the `SchemaBuilder` class does implement the `Schema` interface, it's currently possible to do something like this:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

SchemaBuilder builder = SchemaBuilder.struct()
    .field("f1", Schema.BOOLEAN_SCHEMA);
Struct defaultValue = new Struct(builder);
defaultValue.put("f1", true);
Schema schema = builder.defaultValue(defaultValue).build();
```

Since validation currently uses the equality method of the `Struct`'s schema (https://github.com/apache/kafka/blob/87b24025cedf08c550a180819b2a5a2dbb75f020/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L247), which in this case is a `SchemaBuilder` instance that doesn't have an overridden `equals` method, I think the change proposed here might cause the above example to start to fail.
[GitHub] [kafka] C0urante edited a comment on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values
C0urante edited a comment on pull request #10566: URL: https://github.com/apache/kafka/pull/10566#issuecomment-824292010 Hmm... I'm wondering if this might break existing setups. Since the `SchemaBuilder` class does implement the `Schema` interface, it's currently possible to do something like this:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

SchemaBuilder builder = SchemaBuilder.struct()
    .field("f1", Schema.BOOLEAN_SCHEMA);
Struct defaultValue = new Struct(builder);
defaultValue.put("f1", true);
Schema schema = builder.defaultValue(defaultValue).build();
```

Validation currently uses the equality method of the `Struct`'s schema: https://github.com/apache/kafka/blob/87b24025cedf08c550a180819b2a5a2dbb75f020/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java#L247 which in this case is a `SchemaBuilder` instance that doesn't have an overridden `equals` method, so I think the change proposed here might cause the above example to start to fail. A naive solution could be to modify the validation to be something like `if (!schema.equals(struct.schema()))` but that may introduce other edge cases. Can you provide a brief example of what sort of connector code ran into this bug in the first place? That might help guide the path forward.
[jira] [Commented] (KAFKA-12696) Add standard getters to LagInfo class to allow automatic serialization
[ https://issues.apache.org/jira/browse/KAFKA-12696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326845#comment-17326845 ] Matthias J. Sax commented on KAFKA-12696: - {quote}Ok. Just to confirm, the confluence where KIPs are created is a separate login, so I should sign up? {quote} Yes. Just share your account name here, and we can grant write access so you can create a KIP. Just want to clarify again: it will be difficult to get this a KIP approved, and it could easily happen that it will be rejected (just want to make sure you understand this and won't be disappointed in case it happens). > Add standard getters to LagInfo class to allow automatic serialization > -- > > Key: KAFKA-12696 > URL: https://issues.apache.org/jira/browse/KAFKA-12696 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Mikhail Panchenko >Assignee: Mikhail Panchenko >Priority: Trivial > Labels: needs-kip > Original Estimate: 0.5h > Remaining Estimate: 0.5h > > The LagInfo class has non-standard getters for its member variables. This > means that Jackson and other serialization frameworks do not know how to > serialize them without additional annotations. So when implementing the sort > of system that KAFKA-6144 is meant to enable, as documented here in docs like > [Kafka Streams Interactive > Queries|https://docs.confluent.io/platform/current/streams/developer-guide/interactive-queries.html#querying-state-stores-during-a-rebalance], > one has to either inject a bunch of custom serialization logic into Jersey > or wrap this class. > The patch to fix this is trivial, and I will be putting one up shortly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
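Until `LagInfo` grows standard getters, the wrapping approach the ticket mentions can look like the sketch below: a plain DTO with JavaBean-style `getX()` accessors that Jackson's default property discovery picks up without extra annotations. The class and accessor names here are hypothetical illustrations, not part of the Streams API.

```java
// A plain bean wrapper around lag data: standard getX() names let
// Jackson (and similar frameworks) serialize it without custom
// serializers, mixins, or annotations.
public class LagInfoDto {
    private final long currentOffsetPosition;
    private final long endOffsetPosition;
    private final long offsetLag;

    public LagInfoDto(long currentOffsetPosition, long endOffsetPosition, long offsetLag) {
        this.currentOffsetPosition = currentOffsetPosition;
        this.endOffsetPosition = endOffsetPosition;
        this.offsetLag = offsetLag;
    }

    public long getCurrentOffsetPosition() { return currentOffsetPosition; }
    public long getEndOffsetPosition() { return endOffsetPosition; }
    public long getOffsetLag() { return offsetLag; }
}
```

In a REST layer one would populate the DTO from the framework's lag object at the query boundary and return the DTO, keeping the serialization concern out of the Streams types entirely.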
[GitHub] [kafka] kpatelatwork commented on pull request #10580: KAFKA-12704 Fixed potential concurrency issue in connector creation
kpatelatwork commented on pull request #10580: URL: https://github.com/apache/kafka/pull/10580#issuecomment-824299926 > We may want to upgrade `tempConnectors` from a regular `Map` to a `ConcurrentMap` as well, LGTM otherwise though. It's already a ConcurrentHashMap :). `private Map tempConnectors = new ConcurrentHashMap<>();`
[GitHub] [kafka] C0urante commented on pull request #10580: KAFKA-12704 Fixed potential concurrency issue in connector creation
C0urante commented on pull request #10580: URL: https://github.com/apache/kafka/pull/10580#issuecomment-824300835 Sure, I mean this:

```java
private ConcurrentMap tempConnectors = new ConcurrentHashMap<>();
```
[jira] [Commented] (KAFKA-10493) KTable out-of-order updates are not being ignored
[ https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326860#comment-17326860 ] Matthias J. Sax commented on KAFKA-10493: - Thinking about this ticket once more, I actually have one more concern: if we start to drop out-of-order updates, we might actually end up in an inconsistent state, because topic compaction is offset based... (at least when we apply the source-topic optimization – without the optimization and a dedicated changelog topic, it would be safe.) After [KIP-280|https://cwiki.apache.org/confluence/display/KAFKA/KIP-280%3A+Enhanced+log+compaction] is done, we could drop out-of-order records also with source-table materialization enabled. Thoughts? > KTable out-of-order updates are not being ignored > - > > Key: KAFKA-10493 > URL: https://issues.apache.org/jira/browse/KAFKA-10493 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Pedro Gontijo >Assignee: Matthias J. Sax >Priority: Blocker > Fix For: 3.0.0 > > Attachments: KTableOutOfOrderBug.java > > > On a materialized KTable, out-of-order records for a given key (records whose > timestamps are older than the current value in the store) are not being ignored > but used to update the local store value and also being forwarded. > I believe the bug is here: > [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77] > It should return true, not false (see javadoc) > The bug impacts here: > [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148] > I have attached a simple stream app that shows the issue happening. > Thank you!
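The fix under discussion reduces to comparing the incoming record's timestamp against the stored one and dropping older updates; a simplified standalone sketch of that rule (the store and value types are stand-ins, not the Streams internals):

```java
import java.util.HashMap;
import java.util.Map;

public class TimestampedStoreSketch {
    // Stand-in for a value stored together with its record timestamp.
    record ValueAndTimestamp(String value, long timestamp) {}

    private final Map<String, ValueAndTimestamp> store = new HashMap<>();

    // Returns true if the update was applied, false if it was dropped as
    // out-of-order (its timestamp is older than the currently stored one).
    boolean maybeUpdate(String key, String value, long timestamp) {
        ValueAndTimestamp current = store.get(key);
        if (current != null && timestamp < current.timestamp()) {
            return false; // out-of-order: ignore instead of overwriting
        }
        store.put(key, new ValueAndTimestamp(value, timestamp));
        return true;
    }

    String get(String key) {
        ValueAndTimestamp v = store.get(key);
        return v == null ? null : v.value();
    }

    public static void main(String[] args) {
        TimestampedStoreSketch s = new TimestampedStoreSketch();
        s.maybeUpdate("k", "new", 100L);
        boolean applied = s.maybeUpdate("k", "stale", 50L); // older timestamp
        System.out.println(applied);    // false
        System.out.println(s.get("k")); // new
    }
}
```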
[GitHub] [kafka] kpatelatwork commented on pull request #10580: KAFKA-12704 Fixed potential concurrency issue in connector creation
kpatelatwork commented on pull request #10580: URL: https://github.com/apache/kafka/pull/10580#issuecomment-824306271 done
[GitHub] [kafka] ableegoldman opened a new pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle
ableegoldman opened a new pull request #10581: URL: https://github.com/apache/kafka/pull/10581 Seems this was missed during the original 2.6.0 release
[GitHub] [kafka] ableegoldman opened a new pull request #10582: MINOR: Bump latest 2.6 version to 2.6.2
ableegoldman opened a new pull request #10582: URL: https://github.com/apache/kafka/pull/10582
[GitHub] [kafka] ableegoldman merged pull request #10582: MINOR: Bump latest 2.6 version to 2.6.2
ableegoldman merged pull request #10582: URL: https://github.com/apache/kafka/pull/10582
[jira] [Commented] (KAFKA-5761) Serializer API should support ByteBuffer
[ https://issues.apache.org/jira/browse/KAFKA-5761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326876#comment-17326876 ] Kirill Rodionov commented on KAFKA-5761: If we're talking pooled/reused buffers (like Netty's ByteBuf), then allocating them in the serializer is not going to achieve much because the data is copied to accumulator (in KafkaProducer.doSend) after the serializer is invoked and there's no way to release the buffer back to the pool other than in finalize() method. There appears to be more sense in explicit support for ByteBuffer as a value so that user code can allocate it before calling producer.send and release after that since it's guaranteed that the contents have been copied to producer's accumulator by then The only hiccup here is that "value.serializer" property is mandatory and its presence is checked at KafkaProducer's construction time when there's no value to query its type. If you make the value.serializer property optional, then there's a possibility of an error at the first send() invocation if the value happens to be anything other than ByteBuffer The other option is to keep value serializer mandatory like now but ignore it if the value is a ByteBuffer whose contents can be copied without any serializers valueBytes would have to be removed from Partitioner's signature (no impl I know uses that argument anyway) WDYT? > Serializer API should support ByteBuffer > > > Key: KAFKA-5761 > URL: https://issues.apache.org/jira/browse/KAFKA-5761 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 0.11.0.0 >Reporter: Bhaskar Gollapudi >Assignee: Chia-Ping Tsai >Priority: Major > Labels: features, performance > > Consider the Serializer : Its main method is : > byte[] serialize(String topic, T data); > Producer applications create an implementation that takes in an instance ( > of T ) and converts that to a byte[]. 
This byte array is allocated anew for > this message. This byte array then is handed over to Kafka Producer API > internals that write the bytes to buffer/ network socket. When the next > message arrives , the serializer instead of creating a new byte[] , should > try to reuse the existing byte[] for the new message. This requires two > things : > 1. The process of handing off the bytes to the buffer/socket and reusing > the byte[] must happen on the same thread. > 2 There should be a way for marking the end of available bytes in the > byte[]. > The first is reasonably simple to understand. If this does not happen , and > without other necessary synchronization , the byte[] gets corrupted and so > is the message written to buffer/socket. However , this requirement is easy > to meet for a producer application , because it controls the threads on > which the serializer is invoked. > The second is where the problem lies with the current API. It does not > allow a variable size of bytes to be read from a container. It is limited > by the byte[]'s length. This forces the producer to > 1 either create a new byte[] for a message that is bigger than the previous > one. > OR > 2. Decide a max size and use a padding . > Both are cumbersome and error prone, and may cause wasting of network > bandwidth. > Instead , if there is a Serializer with this method : > ByteBuffer serialize(String topic, T data); > This helps to implement a reusable bytes container for clients to avoid > allocations for each message.
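The proposal boils down to returning a ByteBuffer whose position-to-limit window marks the valid bytes, so one backing array can be reused across messages; a hypothetical sketch of such an interface (this is the shape being proposed in the ticket, not the actual Kafka Serializer API):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ReusableSerializerSketch {
    // Hypothetical variant of the Serializer contract from the ticket: the
    // returned buffer's position..limit window marks the valid bytes, so the
    // backing array can be reused instead of allocating a new byte[] per message.
    interface BufferSerializer<T> {
        ByteBuffer serialize(String topic, T data);
    }

    static class StringBufferSerializer implements BufferSerializer<String> {
        // One reusable backing buffer; grown only when a message doesn't fit.
        private ByteBuffer buffer = ByteBuffer.allocate(64);

        @Override
        public ByteBuffer serialize(String topic, String data) {
            byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
            if (bytes.length > buffer.capacity()) {
                buffer = ByteBuffer.allocate(bytes.length);
            }
            buffer.clear();
            buffer.put(bytes);
            buffer.flip(); // limit now marks the end of the valid bytes
            return buffer;
        }
    }

    public static void main(String[] args) {
        StringBufferSerializer ser = new StringBufferSerializer();
        ByteBuffer first = ser.serialize("t", "hello");
        System.out.println(first.remaining()); // 5
        ByteBuffer second = ser.serialize("t", "hi");
        System.out.println(second.remaining()); // 2
        System.out.println(first == second);    // true: same buffer reused
    }
}
```

As the commenter notes, this only pays off if the bytes are consumed (copied to the accumulator) before the serializer is invoked again on the same thread.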
[jira] [Comment Edited] (KAFKA-5761) Serializer API should support ByteBuffer
[ https://issues.apache.org/jira/browse/KAFKA-5761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326876#comment-17326876 ] Kirill Rodionov edited comment on KAFKA-5761 at 4/21/21, 7:54 PM: -- If we're talking pooled/reused buffers (like Netty's ByteBuf), then allocating them in the serializer is not going to achieve much because the data is copied to accumulator (in KafkaProducer.doSend) after the serializer is invoked and there's no way to release the buffer back to the pool other than in finalize() method. There appears to be more sense in explicit support for ByteBuffer as a value so that user code can allocate it before calling producer.send and release after that since it's guaranteed that the contents have been copied to producer's accumulator by then The only hiccup here is that "value.serializer" property is mandatory and its presence is checked at KafkaProducer's construction time when there's no value present yet and therefore, no way to know if it's going to be a ByteBuffer If you make the value.serializer property optional, then there's a possibility of an error on the first send() invocation if the value happens to be anything other than ByteBuffer The other option is to keep value serializer mandatory like now but ignore it if the value is a ByteBuffer whose contents can be copied without any serializers valueBytes would have to be removed from Partitioner's signature (no impl I know uses that argument anyway) WDYT? was (Author: bruto): If we're talking pooled/reused buffers (like Netty's ByteBuf), then allocating them in the serializer is not going to achieve much because the data is copied to accumulator (in KafkaProducer.doSend) after the serializer is invoked and there's no way to release the buffer back to the pool other than in finalize() method. 
There appears to be more sense in explicit support for ByteBuffer as a value so that user code can allocate it before calling producer.send and release after that since it's guaranteed that the contents have been copied to producer's accumulator by then The only huccup here is that "value.serializer" property is mandatory and its presence is checked at KafkaProducer's contstruction time when there's no value to query its type. If you make the value.serializer property optional, then there's a possibility of an error at the first send() invocation if the value happens to be anything other than ByteBuffer The other option is to keep value serializer mandatory like now but ignore it if the value is a ByteBuffer whose contents can be copied without any serializers valueBytes would have to be removed from Partitioner's signature (no impl I know uses that argument anyway) WDYT? > Serializer API should support ByteBuffer > > > Key: KAFKA-5761 > URL: https://issues.apache.org/jira/browse/KAFKA-5761 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 0.11.0.0 >Reporter: Bhaskar Gollapudi >Assignee: Chia-Ping Tsai >Priority: Major > Labels: features, performance > > Consider the Serializer : Its main method is : > byte[] serialize(String topic, T data); > Producer applications create a implementation that takes in an instance ( > of T ) and convert that to a byte[]. This byte array is allocated a new for > this message.This byte array then is handed over to Kafka Producer API > internals that write the bytes to buffer/ network socket. When the next > message arrives , the serializer instead of creating a new byte[] , should > try to reuse the existing byte[] for the new message. This requires two > things : > 1. The process of handing off the bytes to the buffer/socket and reusing > the byte[] must happen on the same thread. > 2 There should be a way for marking the end of available bytes in the > byte[]. 
> The first is reasonably simple to understand. If this does not happen , and > without other necessary synchrinization , the byte[] get corrupted and so > is the message written to buffer/socket.However , this requirement is easy > to meet for a producer application , because it controls the threads on > which the serializer is invoked. > The second is where the problem lies with the current API. It does not > allow a variable size of bytes to be read from a container. It is limited > by the byte[]'s length. This forces the producer to > 1 either create a new byte[] for a message that is bigger than the previous > one. > OR > 2. Decide a max size and use a padding . > Both are cumbersome and error prone, and may cause wasting of network > bandwidth. > Instead , if there is an Serializer with this method : > ByteBuffer serialize(String topic, T data); > This helps to implements a reusable bytes con
[GitHub] [kafka] ableegoldman commented on a change in pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle
ableegoldman commented on a change in pull request #10581: URL: https://github.com/apache/kafka/pull/10581#discussion_r617836508 ## File path: gradle/dependencies.gradle ## @@ -166,6 +166,7 @@ libs += [ kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23", kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24", kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25", + kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26", Review comment: I'm not 100% sure if this is actually supposed to be on the 2.6 branch or not, but the `kafka_26` was already defined and it seems odd to have one but not the other. If adding this is wrong, I can take out the `kafka_26` instead 🤷♀️
[GitHub] [kafka] ableegoldman commented on a change in pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle
ableegoldman commented on a change in pull request #10581: URL: https://github.com/apache/kafka/pull/10581#discussion_r617836508 ## File path: gradle/dependencies.gradle ## @@ -166,6 +166,7 @@ libs += [ kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23", kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24", kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25", + kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26", Review comment: I'm not 100% sure if this is actually supposed to be on the 2.6 branch or not, but the `kafka_26` was already defined and it seems odd to have one but not the other. If adding this is wrong, I can take out the `kafka_26` instead 🤷♀️ . @mjsax ?
[GitHub] [kafka] ableegoldman commented on pull request #10582: MINOR: Bump latest 2.6 version to 2.6.2
ableegoldman commented on pull request #10582: URL: https://github.com/apache/kafka/pull/10582#issuecomment-824320067 Cherry-picked back to 2.8 and 2.7
[GitHub] [kafka] mjsax commented on a change in pull request #10573: KAFKA-12574: KIP-732, Deprecate eos-alpha and replace eos-beta with eos-v2
mjsax commented on a change in pull request #10573: URL: https://github.com/apache/kafka/pull/10573#discussion_r617826977 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ## @@ -642,9 +647,13 @@ public void beginTransaction() throws ProducerFencedException { * to the partition leader. See the exception for more details * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any * other unexpected error + * + * @deprecated Since 3.0.0, will be removed in 4.0. Use {@link #sendOffsetsToTransaction(Map, ConsumerGroupMetadata)} instead. */ +@Deprecated public void sendOffsetsToTransaction(Map offsets, String consumerGroupId) throws ProducerFencedException { +log.warn("This method has been deprecated and will be removed in 4.0, please use #sendOffsetsToTransaction(Map, ConsumerGroupMetadata) instead"); Review comment: Do we really need to log a WARN? We never did anything like this in the KS code base in the past. Marking the method as `@Deprecated` should be sufficient IMHO? Or is such a WARN log custom in the client code base? ## File path: clients/src/main/java/org/apache/kafka/clients/producer/Producer.java ## @@ -49,7 +49,10 @@ /** * See {@link KafkaProducer#sendOffsetsToTransaction(Map, String)} + * + Review comment: nit: needs cleanup ## File path: docs/streams/core-concepts.html ## @@ -291,16 +291,18 @@ https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics";>KIP-129. -As of the 2.6.0 release, Kafka Streams supports an improved implementation of exactly-once processing, named "exactly-once beta", +As of the 2.6.0 release, Kafka Streams supports an improved implementation of exactly-once processing, named "exactly-once v2", which requires broker version 2.5.0 or newer. 
This implementation is more efficient, because it reduces client and broker resource utilization, like client threads and used network connections, and it enables higher throughput and improved scalability. +As of the 3.0.0 release, the old "alpha" version of exactly-once has been deprecated. Users are encouraged to use exactly-once v2 for Review comment: We never used "alpha" anywhere in public, so might be better to avoid this term at all? `old "alpha" -> "first" ## File path: docs/streams/developer-guide/config-streams.html ## @@ -667,12 +668,14 @@ probing.rebalance.interval.msprocessing.guarantee The processing guarantee that should be used. - Possible values are "at_least_once" (default), - "exactly_once" (for EOS version 1), - and "exactly_once_beta" (for EOS version 2). - Using "exactly_once" requires broker - version 0.11.0 or newer, while using "exactly_once_beta" - requires broker version 2.5 or newer. + Possible values are "at_least_once" (default) + and "exactly_once_v2" (for EOS version 2). + Deprecated config options are "exactly_once" (for EOS alpha), Review comment: Do we need to elaborate on all deprecated configs? ## File path: docs/streams/upgrade-guide.html ## @@ -93,6 +95,12 @@ Upgrade Guide and API Changes Streams API changes in 3.0.0 + + The StreamsConfig.EXACTLY_ONCE and StreamsConfig.EXACTLY_ONCE_BETA configs have been deprecated, and a new StreamsConfig.EXACTLY_ONCE_V2 config has been Review comment: Might be good to highlight that `beta -> v2` is just a renaming? ## File path: docs/streams/upgrade-guide.html ## @@ -53,17 +53,19 @@ Upgrade Guide and API Changes -Starting in Kafka Streams 2.6.x, a new processing mode is available, named EOS version 2, which is configurable by setting -processing.guarantee to "exactly_once_beta". -NOTE: The "exactly_once_beta" processing mode is ready for production (i.e., it's not "beta" software). +Starting in Kafka Streams 2.6.x, a new processing mode is available, named EOS version 2. 
This can be configured +by setting StreamsConfig.PROCESSING_GUARANTEE to StreamsConfig.EXACTLY_ONCE_V2 for +application versions 3.0+, or setting it to StreamsConfig.EXACTLY_ONCE_BETA for versions between 2.6 and 3.0. To use this new feature, your brokers must be on version 2.5.x or newer. -A switch from "exactly_once" to "exactly_once_beta" (or the other way around) is -only possible if the application is on version 2.6.x. -If you want to upgrade your application from an older version and enable this feature, -you first need to upgrade your application to version 2.6.x, stayin
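For reference, the rename discussed in this review can be illustrated with a plain config sketch; string literals are used instead of the `StreamsConfig` constants so the example has no kafka-streams dependency ("exactly_once_v2" is the value behind `StreamsConfig.EXACTLY_ONCE_V2` since 3.0; the application id and bootstrap servers are hypothetical):

```java
import java.util.Properties;

public class EosConfigSketch {
    // Builds a minimal Streams-style config. "exactly_once_v2" replaces the
    // deprecated "exactly_once" (EOS v1) and "exactly_once_beta" values.
    static Properties eosV2Config(String appId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", appId);
        props.put("bootstrap.servers", bootstrapServers);
        props.put("processing.guarantee", "exactly_once_v2");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical application id and broker address.
        Properties props = eosV2Config("my-eos-app", "localhost:9092");
        System.out.println(props.getProperty("processing.guarantee")); // exactly_once_v2
    }
}
```

Note that per the upgrade guide quoted above, using this value requires brokers on 2.5.x or newer, and applications below 3.0 must use the older "exactly_once_beta" value instead.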
[GitHub] [kafka] mjsax commented on a change in pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle
mjsax commented on a change in pull request #10581: URL: https://github.com/apache/kafka/pull/10581#discussion_r617839610 ## File path: gradle/dependencies.gradle ## @@ -166,6 +166,7 @@ libs += [ kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23", kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24", kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25", + kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26", Review comment: We don't need this for the `2.6` branch, as when we test upgrade from an older version to `2.6` we just use the jar we just built based on the `2.6` branch -- we only need to pull in older versions.
[GitHub] [kafka] ableegoldman commented on a change in pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle
ableegoldman commented on a change in pull request #10581: URL: https://github.com/apache/kafka/pull/10581#discussion_r617842172 ## File path: gradle/dependencies.gradle ## @@ -166,6 +166,7 @@ libs += [ kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23", kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24", kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25", + kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26", Review comment: So I can/should remove the `kafka_26` from this branch as well, yes?
[GitHub] [kafka] mjsax commented on pull request #10548: KAFKA-12396 added a nullcheck before trying to retrieve a key
mjsax commented on pull request #10548: URL: https://github.com/apache/kafka/pull/10548#issuecomment-824325232 > All of them already had the checks. Sweet. I did not double check the code before. Seems, `put(final Windowed sessionKey,...)` and `remove(final Windowed sessionKey)` don't check for `sessionKey.key() != null` though? Can we add this check?
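The requested guard is a simple precondition on the windowed key's inner key; a standalone sketch with a stand-in `Windowed` type (not the actual Streams classes):

```java
import java.util.Objects;

public class NullKeyGuardSketch {
    // Stand-in for a Windowed<K> session key (key plus window bounds).
    record Windowed<K>(K key, long start, long end) {}

    // The guard requested in the review: reject both a null windowed key
    // and a windowed key wrapping a null inner key before touching the store.
    static <K, V> void put(Windowed<K> sessionKey, V value) {
        Objects.requireNonNull(sessionKey, "sessionKey cannot be null");
        Objects.requireNonNull(sessionKey.key(), "sessionKey.key() cannot be null");
        // ... actual store write elided ...
    }

    public static void main(String[] args) {
        put(new Windowed<>("k", 0L, 10L), "v"); // passes the guard
        try {
            put(new Windowed<String>(null, 0L, 10L), "v");
        } catch (NullPointerException e) {
            System.out.println(e.getMessage()); // sessionKey.key() cannot be null
        }
    }
}
```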
[GitHub] [kafka] mjsax commented on a change in pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle
mjsax commented on a change in pull request #10581: URL: https://github.com/apache/kafka/pull/10581#discussion_r617844560 ## File path: gradle/dependencies.gradle ## @@ -166,6 +166,7 @@ libs += [ kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23", kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24", kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25", + kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26", Review comment: Well, this PR adds `kafka_26` but we don't need to add it -- not sure what you want to remove?
[GitHub] [kafka] ableegoldman commented on a change in pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle
ableegoldman commented on a change in pull request #10581: URL: https://github.com/apache/kafka/pull/10581#discussion_r617845243 ## File path: gradle/dependencies.gradle ## @@ -166,6 +166,7 @@ libs += [ kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23", kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24", kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25", + kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26", Review comment: not ` kafkaStreams_26`, `kafka_26` (defined above) -- it just seems weird to me that we would define one but not the other? But I guess it doesn't hurt so I can close this PR
[GitHub] [kafka] ableegoldman closed pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle
ableegoldman closed pull request #10581: URL: https://github.com/apache/kafka/pull/10581
[GitHub] [kafka] wcarlson5 commented on a change in pull request #10565: KAFKA-12691: Add case where task can be considered idling
wcarlson5 commented on a change in pull request #10565: URL: https://github.com/apache/kafka/pull/10565#discussion_r617857575 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -667,8 +668,15 @@ public boolean isProcessable(final long wallClockTime) { // thus, the task is not processable, even if there is available data in the record queue return false; } - -return partitionGroup.readyToProcess(wallClockTime); +final boolean readyToProcess = partitionGroup.readyToProcess(wallClockTime); +if (!readyToProcess) { +if (!timeCurrentIdlingStarted.isPresent()) { +timeCurrentIdlingStarted = Optional.of(wallClockTime); +} +} else { +timeCurrentIdlingStarted = Optional.empty(); Review comment: Yes that sounds like what I was thinking
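The diff above records the wall-clock time at which a task first becomes un-processable and clears the marker once it can process again; a standalone sketch of that pattern:

```java
import java.util.Optional;

public class IdleTrackerSketch {
    private Optional<Long> timeCurrentIdlingStarted = Optional.empty();

    // Mirrors the PR logic: remember the first wall-clock time the task was
    // observed idle; reset the marker once it becomes processable again.
    void update(boolean readyToProcess, long wallClockTime) {
        if (!readyToProcess) {
            if (!timeCurrentIdlingStarted.isPresent()) {
                timeCurrentIdlingStarted = Optional.of(wallClockTime);
            }
        } else {
            timeCurrentIdlingStarted = Optional.empty();
        }
    }

    Optional<Long> idleSince() {
        return timeCurrentIdlingStarted;
    }

    public static void main(String[] args) {
        IdleTrackerSketch t = new IdleTrackerSketch();
        t.update(false, 100L); // becomes idle at t=100
        t.update(false, 200L); // still idle: start time must not move
        System.out.println(t.idleSince().get()); // 100
        t.update(true, 300L);  // processable again: marker cleared
        System.out.println(t.idleSince().isPresent()); // false
    }
}
```

The check for `isPresent()` before setting is what keeps the idle-start timestamp stable across repeated idle observations.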
[jira] [Commented] (KAFKA-12701) NPE in MetadataRequest when using topic IDs
[ https://issues.apache.org/jira/browse/KAFKA-12701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326909#comment-17326909 ] Justine Olshan commented on KAFKA-12701: [~dengziming] I can work on the 2.8 version where we return an error if you haven't started on that already. > NPE in MetadataRequest when using topic IDs > --- > > Key: KAFKA-12701 > URL: https://issues.apache.org/jira/browse/KAFKA-12701 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0 >Reporter: Travis Bischel >Assignee: dengziming >Priority: Major > > Authorized result checking relies on topic name to not be null, which, when > using topic IDs, it is. > Unlike the logic in handleDeleteTopicsRequest, hanelMetadataRequest does not > check zk for the names corresponding to topic IDs if topic IDs are present. > {noformat} > [2021-04-21 05:53:01,463] ERROR [KafkaApi-1] Error when handling request: > clientId=kgo, correlationId=1, api=METADATA, version=11, > body=MetadataRequestData(topics=[MetadataRequestTopic(topicId=LmqOoFOASnqQp_4-oJgeKA, > name=null)], allowAutoTopicCreation=false, > includeClusterAuthorizedOperations=false, > includeTopicAuthorizedOperations=false) (kafka.server.RequestHandlerHelper) > java.lang.NullPointerException: name > at java.base/java.util.Objects.requireNonNull(Unknown Source) > at > org.apache.kafka.common.resource.ResourcePattern.(ResourcePattern.java:50) > at > kafka.server.AuthHelper.$anonfun$filterByAuthorized$3(AuthHelper.scala:121) > at scala.collection.Iterator$$anon$9.next(Iterator.scala:575) > at scala.collection.mutable.Growable.addAll(Growable.scala:62) > at scala.collection.mutable.Growable.addAll$(Growable.scala:57) > at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:142) > at scala.collection.mutable.ArrayBuffer.addAll(ArrayBuffer.scala:42) > at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:258) > at scala.collection.mutable.ArrayBuffer$.from(ArrayBuffer.scala:247) > at 
scala.collection.SeqFactory$Delegate.from(Factory.scala:306) > at scala.collection.IterableOnceOps.toBuffer(IterableOnce.scala:1270) > at scala.collection.IterableOnceOps.toBuffer$(IterableOnce.scala:1270) > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1288) > at kafka.server.AuthHelper.filterByAuthorized(AuthHelper.scala:120) > at > kafka.server.KafkaApis.handleTopicMetadataRequest(KafkaApis.scala:1146) > at kafka.server.KafkaApis.handle(KafkaApis.scala:170) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74) > at java.base/java.lang.Thread.run(Unknown Source) > [2021-04-21 05:53:01,464] ERROR [Kafka Request Handler 1 on Broker 1], > Exception when handling request (kafka.server.KafkaRequestHandler) > java.lang.NullPointerException > at > org.apache.kafka.common.message.MetadataResponseData$MetadataResponseTopic.addSize(MetadataResponseData.java:1247) > at > org.apache.kafka.common.message.MetadataResponseData.addSize(MetadataResponseData.java:417) > at > org.apache.kafka.common.protocol.SendBuilder.buildSend(SendBuilder.java:218) > at > org.apache.kafka.common.protocol.SendBuilder.buildResponseSend(SendBuilder.java:200) > at > org.apache.kafka.common.requests.AbstractResponse.toSend(AbstractResponse.java:43) > at > org.apache.kafka.common.requests.RequestContext.buildResponseSend(RequestContext.java:111) > at > kafka.network.RequestChannel$Request.buildResponseSend(RequestChannel.scala:132) > at > kafka.server.RequestHandlerHelper.sendResponse(RequestHandlerHelper.scala:185) > at > kafka.server.RequestHandlerHelper.sendErrorOrCloseConnection(RequestHandlerHelper.scala:155) > at > kafka.server.RequestHandlerHelper.sendErrorResponseMaybeThrottle(RequestHandlerHelper.scala:109) > at > kafka.server.RequestHandlerHelper.handleError(RequestHandlerHelper.scala:79) > at kafka.server.KafkaApis.handle(KafkaApis.scala:229) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74) > at java.base/java.lang.Thread.run(Unknown 
Source) > {noformat} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] junrao commented on a change in pull request #10564: MINOR: clean up some replication code
junrao commented on a change in pull request #10564: URL: https://github.com/apache/kafka/pull/10564#discussion_r617858509 ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -172,47 +174,54 @@ String diff(PartitionControlInfo prev) { StringBuilder builder = new StringBuilder(); String prefix = ""; if (!Arrays.equals(replicas, prev.replicas)) { - builder.append(prefix).append("oldReplicas=").append(Arrays.toString(prev.replicas)); +builder.append(prefix).append("replicas: "). +append(Arrays.toString(prev.replicas)). +append(" -> ").append(Arrays.toString(replicas)); prefix = ", "; - builder.append(prefix).append("newReplicas=").append(Arrays.toString(replicas)); } if (!Arrays.equals(isr, prev.isr)) { - builder.append(prefix).append("oldIsr=").append(Arrays.toString(prev.isr)); +builder.append(prefix).append("isr: "). +append(Arrays.toString(prev.isr)). +append(" -> ").append(Arrays.toString(isr)); prefix = ", "; - builder.append(prefix).append("newIsr=").append(Arrays.toString(isr)); } if (!Arrays.equals(removingReplicas, prev.removingReplicas)) { -builder.append(prefix).append("oldRemovingReplicas="). -append(Arrays.toString(prev.removingReplicas)); +builder.append(prefix).append("removingReplicas: "). +append(Arrays.toString(prev.removingReplicas)). +append(" -> ").append(Arrays.toString(removingReplicas)); prefix = ", "; -builder.append(prefix).append("newRemovingReplicas="). -append(Arrays.toString(removingReplicas)); } if (!Arrays.equals(addingReplicas, prev.addingReplicas)) { -builder.append(prefix).append("oldAddingReplicas="). -append(Arrays.toString(prev.addingReplicas)); +builder.append(prefix).append("addingReplicas: "). +append(Arrays.toString(prev.addingReplicas)). +append(" -> ").append(Arrays.toString(addingReplicas)); prefix = ", "; -builder.append(prefix).append("newAddingReplicas="). 
-append(Arrays.toString(addingReplicas)); } if (leader != prev.leader) { - builder.append(prefix).append("oldLeader=").append(prev.leader); +builder.append(prefix).append("leader: "). +append(prev.leader).append(" -> ").append(leader); prefix = ", "; -builder.append(prefix).append("newLeader=").append(leader); } if (leaderEpoch != prev.leaderEpoch) { - builder.append(prefix).append("oldLeaderEpoch=").append(prev.leaderEpoch); +builder.append(prefix).append("leaderEpoch: "). +append(prev.leaderEpoch).append(" -> ").append(leaderEpoch); prefix = ", "; - builder.append(prefix).append("newLeaderEpoch=").append(leaderEpoch); } if (partitionEpoch != prev.partitionEpoch) { - builder.append(prefix).append("oldPartitionEpoch=").append(prev.partitionEpoch); -prefix = ", "; - builder.append(prefix).append("newPartitionEpoch=").append(partitionEpoch); +builder.append(prefix).append("partitionEpoch: "). +append(prev.partitionEpoch).append(" -> ").append(partitionEpoch); } return builder.toString(); } +void maybeLogPartitionChange(Logger log, String description, PartitionControlInfo prev) { +if (!electionWasClean(leader, prev.isr)) { +log.info("UNCLEAN partition change for {}: {}", description, diff(prev)); +} else if (log.isDebugEnabled()) { +log.debug("partition change for {}: {}", description, diff(prev)); Review comment: We used to log all leader and ISR changes at INFO, even for clean leader elections. ## File path: metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java ## @@ -321,22 +336,18 @@ public void replay(PartitionRecord record) { } PartitionControlInfo newPartInfo = new PartitionControlInfo(record); PartitionControlInfo prevPartInfo = topicInfo.parts.get(record.partitionId()); +String description = topicInfo.name + "-" + record.partitionId() + +" with ID " + record.topicId(); Review comment: with ID => with topic ID to make it clear? ## File path: metadata/src/main/java/org/apache/kafka/c
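The diff format the patch above switches to ("field: old -> new" instead of separate `oldX=`/`newX=` entries) can be sketched in a small standalone class. This is a hypothetical stand-in, not the actual `PartitionControlInfo` from `ReplicationControlManager`; it keeps only a few fields to show the pattern:

```java
import java.util.Arrays;

// Hypothetical stand-in for PartitionControlInfo, showing the
// "field: old -> new" diff format from the patch under review.
public class PartitionDiffSketch {
    final int[] replicas;
    final int[] isr;
    final int leader;

    PartitionDiffSketch(int[] replicas, int[] isr, int leader) {
        this.replicas = replicas;
        this.isr = isr;
        this.leader = leader;
    }

    // Render only the fields that changed relative to prev,
    // each as "name: oldValue -> newValue", comma-separated.
    String diff(PartitionDiffSketch prev) {
        StringBuilder builder = new StringBuilder();
        String prefix = "";
        if (!Arrays.equals(replicas, prev.replicas)) {
            builder.append(prefix).append("replicas: ")
                   .append(Arrays.toString(prev.replicas))
                   .append(" -> ").append(Arrays.toString(replicas));
            prefix = ", ";
        }
        if (!Arrays.equals(isr, prev.isr)) {
            builder.append(prefix).append("isr: ")
                   .append(Arrays.toString(prev.isr))
                   .append(" -> ").append(Arrays.toString(isr));
            prefix = ", ";
        }
        if (leader != prev.leader) {
            builder.append(prefix).append("leader: ")
                   .append(prev.leader).append(" -> ").append(leader);
        }
        return builder.toString();
    }
}
```

For an ISR shrink with a leader change, `diff(prev)` produces a single compact line such as `isr: [1, 2, 3] -> [1, 2], leader: 1 -> 2`, which is what the reviewer's `maybeLogPartitionChange` then emits at INFO (unclean election) or DEBUG (clean).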
[GitHub] [kafka] junrao commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.
junrao commented on a change in pull request #10271: URL: https://github.com/apache/kafka/pull/10271#discussion_r617904827 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/AbstractApiMessageAndVersionSerde.java ## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.remote.metadata.storage.serialization; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.ObjectSerializationCache; +import org.apache.kafka.metadata.ApiMessageAndVersion; + +import java.nio.ByteBuffer; + +/** + * This class provides serialization/deserialization of {@code ApiMessageAndVersion}. + * + * Implementors need to extend this class and implement {@link #apiMessageFor(short)} method to return a respective + * {@code ApiMessage} for the given {@code apiKey}. This is required to deserialize the bytes to build the respective + * {@code ApiMessage} instance. 
+ */ +public abstract class AbstractApiMessageAndVersionSerde { + +public byte[] serialize(ApiMessageAndVersion messageAndVersion) { +ObjectSerializationCache cache = new ObjectSerializationCache(); +short version = messageAndVersion.version(); +ApiMessage message = messageAndVersion.message(); + +// Add header containing apiKey and apiVersion, +// headerSize is 1 byte for apiKey and 1 byte for apiVersion +int headerSize = 1 + 1; +int messageSize = message.size(cache, version); +ByteBufferAccessor writable = new ByteBufferAccessor(ByteBuffer.allocate(headerSize + messageSize)); + +// Write apiKey and version +writable.writeUnsignedVarint(message.apiKey()); Review comment: In https://github.com/apache/kafka/blob/trunk/raft/src/main/java/org/apache/kafka/raft/metadata/MetadataRecordSerde.java#L44, the serialization adds a frame version. The intention is to use that as the header version. Currently, the only fields in the header are apikey and version. The frame version allows us to change that in the future. We need to think through whether we should adopt the same strategy here. If we do, perhaps we could just reuse MetadataRecordSerde somehow. Also, whether we should share the apiKey space with the raft messages. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
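The header layout discussed above (an unsigned-varint `apiKey` followed by an unsigned-varint `version`, then the message payload) can be sketched without Kafka's `ByteBufferAccessor`. This is a hypothetical helper using plain `java.nio.ByteBuffer`; note that the patch's `headerSize = 1 + 1` only holds while both values fit in a single varint byte, i.e. are below 128:

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the varint-prefixed framing described above.
public class VarintHeaderSketch {

    // Unsigned varint: 7 payload bits per byte, high bit set means
    // "more bytes follow".
    static void writeUnsignedVarint(int value, ByteBuffer buf) {
        while ((value & 0xffffff80) != 0) {
            buf.put((byte) ((value & 0x7f) | 0x80));
            value >>>= 7;
        }
        buf.put((byte) value);
    }

    static int readUnsignedVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        byte b;
        do {
            b = buf.get();
            value |= (b & 0x7f) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        return value;
    }

    // Frame a payload with an [apiKey][version] varint header.
    // Assumes apiKey and version are both < 128, so each takes one byte.
    static byte[] serialize(int apiKey, int version, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(1 + 1 + payload.length);
        writeUnsignedVarint(apiKey, buf);
        writeUnsignedVarint(version, buf);
        buf.put(payload);
        return buf.array();
    }

    // Read back the two header varints; the payload follows.
    static int[] readHeader(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        return new int[] { readUnsignedVarint(buf), readUnsignedVarint(buf) };
    }
}
```

The frame version that junrao mentions from `MetadataRecordSerde` would simply be one more varint written before `apiKey`, giving room to evolve the header fields later without breaking old readers.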
[jira] [Created] (KAFKA-12705) Task idling is not sufficiently tested
Walker Carlson created KAFKA-12705: -- Summary: Task idling is not sufficiently tested Key: KAFKA-12705 URL: https://issues.apache.org/jira/browse/KAFKA-12705 Project: Kafka Issue Type: Improvement Components: streams Reporter: Walker Carlson The tests for task idling are a bit sparse. When I changed isProcessable so that it always returns true, only one test failed. That means the entire code path hinges on one unit test (shouldBeProcessableIfAllPartitionsBuffered) that does not cover all branches of the logic. -- This message was sent by Atlassian Jira (v8.3.4#803005)
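The coverage gap described in the ticket can be illustrated with a small, hypothetical stand-in for the predicate (this is not the actual Streams `isProcessable`): a suite that only asserts the all-partitions-buffered branch would keep passing even if the idling-deadline branch were broken.

```java
// Hypothetical, simplified stand-in for a task's "processable" check:
// a task is processable when every input partition has buffered data,
// OR the configured idling deadline has passed. Each branch, and the
// deadline boundary, needs its own test.
public class TaskIdlingSketch {
    static boolean isProcessable(int bufferedPartitions, int totalPartitions,
                                 long nowMs, long idleDeadlineMs) {
        if (bufferedPartitions == totalPartitions) {
            return true;                 // branch 1: all partitions buffered
        }
        return nowMs >= idleDeadlineMs;  // branch 2: idling deadline expired
    }
}
```

A test like `shouldBeProcessableIfAllPartitionsBuffered` only pins branch 1; replacing the whole method body with `return true;` would still pass it, which is exactly the signal the ticket reports.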
[GitHub] [kafka] mjsax commented on a change in pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle
mjsax commented on a change in pull request #10581: URL: https://github.com/apache/kafka/pull/10581#discussion_r617935620 ## File path: gradle/dependencies.gradle ## @@ -166,6 +166,7 @@ libs += [ kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23", kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24", kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25", + kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26", Review comment: Ah. Yes. It could be removed; it's unused. But it also does no harm to have an unused var defined... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a change in pull request #10581: HOTFIX: kafka streams lib missing in dependencies.gradle
mjsax commented on a change in pull request #10581: URL: https://github.com/apache/kafka/pull/10581#discussion_r617936154 ## File path: gradle/dependencies.gradle ## @@ -166,6 +166,7 @@ libs += [ kafkaStreams_23: "org.apache.kafka:kafka-streams:$versions.kafka_23", kafkaStreams_24: "org.apache.kafka:kafka-streams:$versions.kafka_24", kafkaStreams_25: "org.apache.kafka:kafka-streams:$versions.kafka_25", + kafkaStreams_26: "org.apache.kafka:kafka-streams:$versions.kafka_26", Review comment: Seems this PR added if for no reason (but also no harm): https://github.com/apache/kafka/commit/9ebc71e3d7d1e361b3844d907c4ae935fcda1240 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12706) Consider adding reason and source of error in APPLICATION_SHUTDOWN
A. Sophie Blee-Goldman created KAFKA-12706: -- Summary: Consider adding reason and source of error in APPLICATION_SHUTDOWN Key: KAFKA-12706 URL: https://issues.apache.org/jira/browse/KAFKA-12706 Project: Kafka Issue Type: Improvement Components: streams Reporter: A. Sophie Blee-Goldman At the moment, when a user opts to shut down the application in the streams uncaught exception handler, we just send a signal to all members of the group, who then shut down. If there are a large number of application instances running, it can be annoying and time-consuming to locate the client that hit this error. It would be nice if we could let each client log the exception that triggered this, and possibly also the client who requested the shutdown. That would make it much easier to identify the problem and figure out which set of logs to look into for further information. -- This message was sent by Atlassian Jira (v8.3.4#803005)
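The behavior the ticket asks for can be sketched with plain Java (this is a hypothetical stand-in, not the Streams `StreamsUncaughtExceptionHandler` API): when a handler elects application shutdown, it records the triggering exception and the requesting client before the signal fans out, so any instance's log identifies the origin.

```java
// Hypothetical sketch: log the reason and source of an
// APPLICATION_SHUTDOWN decision before it propagates to the group.
public class ShutdownReasonSketch {
    enum Response { REPLACE_THREAD, SHUTDOWN_CLIENT, SHUTDOWN_APPLICATION }

    // The StringBuilder stands in for a logger so the sketch stays
    // self-contained; a real handler would use its logging framework.
    static Response handle(String clientId, Throwable error, StringBuilder log) {
        log.append("SHUTDOWN_APPLICATION requested by ").append(clientId)
           .append(": ").append(error.getMessage());
        return Response.SHUTDOWN_APPLICATION;
    }
}
```

With a reason and source in every client's log line, an operator grepping any one instance can immediately tell which client failed and why, instead of searching all instances for the original exception.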
[jira] [Assigned] (KAFKA-9772) Transactional offset commit fails with IllegalStateException
[ https://issues.apache.org/jira/browse/KAFKA-9772?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-9772: -- Assignee: Jason Gustafson > Transactional offset commit fails with IllegalStateException > > > Key: KAFKA-9772 > URL: https://issues.apache.org/jira/browse/KAFKA-9772 > Project: Kafka > Issue Type: Bug >Reporter: Dhruvil Shah >Assignee: Jason Gustafson >Priority: Major > > {code:java} > Trying to complete a transactional offset commit for producerId 7090 and > groupId application-id even though the offset commit record itself hasn't > been appended to the log.{code} > {code:java} > java.lang.IllegalStateException: Trying to complete a transactional offset > commit for producerId 7090 and groupId application-id even though the offset > commit record itself hasn't been appended to the log. at > kafka.coordinator.group.GroupMetadata.$anonfun$completePendingTxnOffsetCommit$2(GroupMetadata.scala:677) > at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) at > scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237) at > scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230) at > scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44) at > scala.collection.mutable.HashMap.foreach(HashMap.scala:149) at > kafka.coordinator.group.GroupMetadata.$anonfun$completePendingTxnOffsetCommit$1(GroupMetadata.scala:674) > at > kafka.coordinator.group.GroupMetadata.completePendingTxnOffsetCommit(GroupMetadata.scala:673) > at > kafka.coordinator.group.GroupMetadataManager.$anonfun$handleTxnCompletion$2(GroupMetadataManager.scala:874) > at kafka.coordinator.group.GroupMetadata.inLock(GroupMetadata.scala:228) at > kafka.coordinator.group.GroupMetadataManager.$anonfun$handleTxnCompletion$1(GroupMetadataManager.scala:873) > at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) at > 
kafka.coordinator.group.GroupMetadataManager.handleTxnCompletion(GroupMetadataManager.scala:870) > at > kafka.coordinator.group.GroupMetadataManager.$anonfun$scheduleHandleTxnCompletion$1(GroupMetadataManager.scala:865) > at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:114) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) > at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) at > java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) > at > java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) > at > java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) > at java.base/java.lang.Thread.run(Thread.java:834){code} -- This message was sent by Atlassian Jira (v8.3.4#803005)