[GitHub] [kafka] mimaison commented on pull request #8921: KAFKA-10160: Kafka MM2 consumer configuration
mimaison commented on pull request #8921: URL: https://github.com/apache/kafka/pull/8921#issuecomment-659272353 Test failure is unrelated: - org.apache.kafka.streams.integration.BranchedMultiLevelRepartitionConnectedTopologyTest.testTopologyBuild This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison merged pull request #8921: KAFKA-10160: Kafka MM2 consumer configuration
mimaison merged pull request #8921: URL: https://github.com/apache/kafka/pull/8921 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-10160) Kafka MM2 consumer configuration
[ https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-10160. Fix Version/s: 2.7.0 Resolution: Fixed > Kafka MM2 consumer configuration > > > Key: KAFKA-10160 > URL: https://issues.apache.org/jira/browse/KAFKA-10160 > Project: Kafka > Issue Type: Bug > Affects Versions: 2.5.0, 2.4.1 > Reporter: Pavol Ipoth > Assignee: sats > Priority: Major > Labels: configuration, kafka, mirror-maker > Fix For: 2.7.0 > > > [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51] > according to this, producer/consumer-level properties should be configured as > e.g. somesource->sometarget.consumer.client.id. I tried to set > somesource->sometarget.consumer.auto.offset.reset=latest, but without > success; the consumer always tries to fetch the earliest offset. Not sure if > this is a bug or my misconfiguration, but at least some update to the > documentation would be useful. -- This message was sent by Atlassian Jira (v8.3.4#803005)
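For readers unfamiliar with MirrorMaker 2 configuration, the pattern the reporter describes looks roughly like the mm2.properties sketch below. This is an illustrative fragment, not taken from the fix: the cluster aliases, bootstrap servers and topic filter are placeholders; only the last line shows the per-flow consumer override that KAFKA-10160 makes effective.
{code}
# placeholder cluster aliases and connection details
clusters = somesource, sometarget
somesource.bootstrap.servers = source-kafka:9092
sometarget.bootstrap.servers = target-kafka:9092

# enable the somesource -> sometarget replication flow
somesource->sometarget.enabled = true
somesource->sometarget.topics = .*

# per-flow consumer-level override discussed in the issue
somesource->sometarget.consumer.auto.offset.reset = latest
{code}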
[GitHub] [kafka] rajinisivaram commented on a change in pull request #8979: KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers
rajinisivaram commented on a change in pull request #8979: URL: https://github.com/apache/kafka/pull/8979#discussion_r455644987 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -712,10 +708,13 @@ class ReplicaManager(val config: KafkaConfig, } catch { case e@(_: InvalidTopicException | _: LogDirNotFoundException | - _: ReplicaNotAvailableException | _: KafkaStorageException) => -warn("Unable to alter log dirs for %s".format(topicPartition), e) +warn(s"Unable to alter log dirs for $topicPartition", e) (topicPartition, Errors.forException(e)) + case e: NotLeaderOrFollowerException => +warn(s"Unable to alter log dirs for $topicPartition", e) +// Retaining REPLICA_NOT_AVAILABLE exception for ALTER_REPLICA_LOG_DIRS for older versions for compatibility +(topicPartition, if (config.interBrokerProtocolVersion >= KAFKA_2_7_IV0) Errors.NOT_LEADER_OR_FOLLOWER else Errors.REPLICA_NOT_AVAILABLE) Review comment: Reverted. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a change in pull request #8979: KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers
rajinisivaram commented on a change in pull request #8979: URL: https://github.com/apache/kafka/pull/8979#discussion_r455645402 ## File path: clients/src/main/java/org/apache/kafka/common/errors/NotLeaderOrFollowerException.java ## @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +/** + * This server is not the leader or follower for the given partition. + * This could a transient exception during reassignments. + */ +@SuppressWarnings("deprecation") +public class NotLeaderOrFollowerException extends NotLeaderForPartitionException { Review comment: Updated javadoc. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on pull request #8979: KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers
rajinisivaram commented on pull request #8979: URL: https://github.com/apache/kafka/pull/8979#issuecomment-659288156 @hachikuji @ijuma Thanks for the reviews, have addressed the comments so far. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9973) __consumer_offsets record is invalid lead to log clean failed and __consumer_offsets grows too big
[ https://issues.apache.org/jira/browse/KAFKA-9973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159059#comment-17159059 ] zhangzhisheng commented on KAFKA-9973: -- __consumer_offsets can not clean kafka version 2.12-2.4.1 jdk version jdk1.8.0_231 {code:java} // 21G ./data/kafka-logs/topic-2018066952-7 27G ./data/kafka-logs/__consumer_offsets-25 30G ./data/kafka-logs/topic-2018066951-0 30G ./data/kafka-logs/topic-2018066951-1 30G ./data/kafka-logs/topic-2018066951-3 30G ./data/kafka-logs/topic-2018066951-5 30G ./data/kafka-logs/topic-2018066951-6 30G ./data/kafka-logs/topic-2018066951-7 32G ./data/kafka-logs/__consumer_offsets-30 32G ./data/kafka-logs/topic-201911251114-0 32G ./data/kafka-logs/topic-201911251114-2 {code} > __consumer_offsets record is invalid lead to log clean failed and > __consumer_offsets grows too big > -- > > Key: KAFKA-9973 > URL: https://issues.apache.org/jira/browse/KAFKA-9973 > Project: Kafka > Issue Type: Bug > Components: log cleaner >Affects Versions: 2.1.1 >Reporter: leibo >Priority: Major > Attachments: dump.png, log-cleaner.log.1, screenshot-1.png > > > __consumer_offsets-34 grows to 13GB, and can't be compact successf, the error > log as bellow: > {code:java} > //代码占位符 > [2019-12-30 19:21:41,047] INFO [kafka-log-cleaner-thread-6]: Starting > (kafka.log.LogCleaner) > [2019-12-30 19:21:41,079] INFO [kafka-log-cleaner-thread-7]: Starting > (kafka.log.LogCleaner) > [2019-12-30 19:21:42,825] WARN [kafka-log-cleaner-thread-0]: Unexpected > exception thrown when cleaning log > Log(/home/kafka_2.12-2.1.1/data/kafka-logs/__consumer_offsets-34). Marking > its partition (__consumer_offsets-34) as uncleanable (kafka.log.LogCleaner) > org.apache.kafka.common.record.InvalidRecordException: Found invalid number > of record headers -47 > -- > [2019-12-30 20:23:51,340] INFO [kafka-log-cleaner-thread-5]: Starting > (kafka.log.LogCleaner) > [2019-12-30 20:23:51,361] INFO Cleaner 0: Building offset map for log > __consumer_offsets-34 for 52 segments in offset range [26842714, 73423730). > (kafka.log.LogCleaner) > [2019-12-30 20:23:51,398] INFO [kafka-log-cleaner-thread-6]: Starting > (kafka.log.LogCleaner) > [2019-12-30 20:23:51,493] INFO [kafka-log-cleaner-thread-7]: Starting > (kafka.log.LogCleaner) > [2019-12-30 20:23:59,596] WARN [kafka-log-cleaner-thread-0]: Unexpected > exception thrown when cleaning log > Log(/home/kafka_2.12-2.1.1/data/kafka-logs/__consumer_offsets-34). Marking > its partition (__consumer_offsets-34) as uncleanable (kafka.log.LogCleaner) > org.apache.kafka.common.record.InvalidRecordException: Found invalid number > of record headers -47 > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r455669663 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -892,136 +895,162 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion -val mergedResponseMap = if (version == 0) +val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) -sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) +sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] -val partitionTimestamps = offsetRequest.partitionTimestamps.asScala -val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - -val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) -} - -val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { -val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) -(topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { -// NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages -// are typically transient and there is no value in logging the entire stack trace for the same -case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderForPartitionException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( -correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) -case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } -} -responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { -val correlationId = request.header.correlationId -val clientId = request.header.clientId -val offsetRequest = request.body[ListOffsetRequest] - -val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) - -val unauthorizedResponseStatus = 
unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, -ListOffsetResponse.UNKNOWN_TIMESTAMP, -ListOffsetResponse.UNKNOWN_OFFSET, -Optional.empty()) -} - -val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { -debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + -s"failed because the partition is duplicated in the request.") -(topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { - -def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( -e, -ListOffsetResponse.UNKNOWN_TIMESTAMP, -
[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r455671140 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -994,30 +1023,29 @@ public void onSuccess(ClientResponse response, RequestFuture f * value of each partition may be null only for v0. In v1 and later the ListOffset API would not * return a null timestamp (-1 is returned instead when necessary). */ -private void handleListOffsetResponse(Map timestampsToSearch, +private void handleListOffsetResponse(Map timestampsToSearch, ListOffsetResponse listOffsetResponse, RequestFuture future) { Map fetchedOffsets = new HashMap<>(); Set partitionsToRetry = new HashSet<>(); Set unauthorizedTopics = new HashSet<>(); -for (Map.Entry entry : timestampsToSearch.entrySet()) { +Map partitionsData = byTopicPartitions(listOffsetResponse.responseData()); Review comment: I actually switched logic to loop on the response as you initially suggested This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon opened a new pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
showuon opened a new pull request #9029: URL: https://github.com/apache/kafka/pull/9029 In the original test, we sleep for a static 5 seconds to ensure the automated group offset sync is complete. The sync is sometimes fast (less than 1 second) and sometimes slow (~20 seconds). I rewrote the sleep to wait for two specific conditions: 1. `consumer.endOffsets` to make sure the topic partition metadata is synced 2. `backupClient.listConsumerGroupOffsets` to make sure the consumer group offsets are also synced I've tested this in my local environment many times, and it makes the test more stable. Thanks. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r455671951 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1077,7 +1077,7 @@ class ReplicaManager(val config: KafkaConfig, // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition val readInfo: LogReadInfo = partition.readRecords( fetchOffset = fetchInfo.fetchOffset, -currentLeaderEpoch = fetchInfo.currentLeaderEpoch, +currentLeaderEpoch = toScalaOption(fetchInfo.currentLeaderEpoch), Review comment: I think it's ok to keep the Scala Option here. https://github.com/apache/kafka/pull/9008 can just update the field if it changes name. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
showuon commented on a change in pull request #9029: URL: https://github.com/apache/kafka/pull/9029#discussion_r455672187 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -315,9 +319,34 @@ public void testReplication() throws InterruptedException { backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count()); } +private void waitForConsumerGroupOffsetSync(Consumer consumer, List topics) +throws InterruptedException { +Admin backupClient = backup.kafka().createAdminClient(); +List tps = new ArrayList<>(NUM_PARTITIONS * topics.size()); +IntStream.range(0, NUM_PARTITIONS).forEach( +partitionInd -> { +for (String topic: topics) { +tps.add(new TopicPartition(topic, partitionInd)); +} +} +); +long expectedTotalOffsets = NUM_RECORDS_PRODUCED * topics.size(); + +waitForCondition(() -> { +Map consumerGroupOffsets = + backupClient.listConsumerGroupOffsets("consumer-group-1").partitionsToOffsetAndMetadata().get(); +long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream().mapToLong(metadata -> metadata.offset()).sum(); + +Map offsets = consumer.endOffsets(tps, Duration.ofMillis(500)); +long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum(); + +// make sure the consumer group offsets are synced to expected number +return totalOffsets == expectedTotalOffsets && consumerGroupOffsetTotal > 0; +}, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time"); +} @Test -public void testOneWayReplicationWithAutorOffsetSync1() throws InterruptedException { +public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException { Review comment: rename the typo test name This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
showuon commented on pull request #9029: URL: https://github.com/apache/kafka/pull/9029#issuecomment-659310965 @ning2008wisc @mimaison , could you help review this PR to fix the flaky test? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #9027: KAFKA-9161: add docs for KIP-441 and KIP-613 and other configs that need fixing
cadonna commented on a change in pull request #9027: URL: https://github.com/apache/kafka/pull/9027#discussion_r455681362 ## File path: docs/streams/developer-guide/config-streams.html ## @@ -181,6 +193,7 @@ bootstrap.serverscache.max.bytes.buffering Medium Maximum number of memory bytes to be used for record caches across all threads. +Maximum number of memory bytes to be used for record caches across all threads. Review comment: This line seems a duplicate. ## File path: docs/streams/developer-guide/config-streams.html ## @@ -270,21 +296,26 @@ bootstrap.serversThe amount of time in milliseconds, before a request is retried. This applies if the retries parameter is configured to be greater than 0. 100 - rocksdb.config.setter + rocksdb.config.setter Medium The RocksDB configuration. - state.cleanup.delay.ms + state.cleanup.delay.ms Low The amount of time in milliseconds to wait before deleting state when a partition has migrated. 60 milliseconds - state.dir + state.dir High Directory location for state stores. /tmp/kafka-streams + topology.optimization Review comment: I guess it tells the renderer which CSS to use to have alternating row colours. Apparently, CSS3 already offers ways to accomplish this without these classes (see https://www.textfixer.com/tutorials/css-table-alternating-rows.php). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10278) kafka-configs does not show the current properties of running kafka broker upon describe.
kaushik srinivas created KAFKA-10278: Summary: kafka-configs does not show the current properties of running kafka broker upon describe. Key: KAFKA-10278 URL: https://issues.apache.org/jira/browse/KAFKA-10278 Project: Kafka Issue Type: Bug Affects Versions: 2.4.1 Reporter: kaushik srinivas kafka-configs.sh does not list the properties (read-only/per-broker/cluster-wide) with which the Kafka broker is currently running. The command returns nothing. Only those properties added or updated via kafka-configs.sh are listed by the describe command. bash-4.2$ env -i bin/kafka-configs.sh --bootstrap-server kf-test-0.kf-test-headless.test.svc.cluster.local:9092 --entity-type brokers --entity-default --describe Default config for brokers in the cluster are: log.cleaner.threads=2 sensitive=false synonyms=\{DYNAMIC_DEFAULT_BROKER_CONFIG:log.cleaner.threads=2} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10278) kafka-configs does not show the current properties of running kafka broker upon describe.
[ https://issues.apache.org/jira/browse/KAFKA-10278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159129#comment-17159129 ] kaushik srinivas commented on KAFKA-10278: -- Hi [~rajinisiva...@gmail.com] [~rsivaram], [~ijuma] Is this a known issue already ? > kafka-configs does not show the current properties of running kafka broker > upon describe. > - > > Key: KAFKA-10278 > URL: https://issues.apache.org/jira/browse/KAFKA-10278 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.4.1 >Reporter: kaushik srinivas >Priority: Critical > Labels: kafka-configs.sh > > kafka-configs.sh does not list the properties > (read-only/per-broker/cluster-wide) with which the kafka broker is currently > running. > The command returns nothing. > Only those properties added or updated via kafka-configs.sh is listed by the > describe command. > bash-4.2$ env -i bin/kafka-configs.sh --bootstrap-server > kf-test-0.kf-test-headless.test.svc.cluster.local:9092 --entity-type brokers > --entity-default --describe Default config for brokers in the cluster are: > log.cleaner.threads=2 sensitive=false > synonyms=\{DYNAMIC_DEFAULT_BROKER_CONFIG:log.cleaner.threads=2} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on pull request #8295: URL: https://github.com/apache/kafka/pull/8295#issuecomment-659347339 Thanks @dajac for your comments, I've pushed an update. @abbccdda Can you take a look? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram opened a new pull request #9030: MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server
rajinisivaram opened a new pull request #9030: URL: https://github.com/apache/kafka/pull/9030 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index
[ https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159164#comment-17159164 ] Ivan Ponomarev commented on KAFKA-5488: --- Hi everyone, {quote}I think there are some unnecessary interfaces I don't think the return type needs to be a Map, {quote} [~high.lee] , concerning your comment about the API: the current API is a result of the extensive discussion (you can find the link to the discussion in the KIP itself). The first versions of this KIP didn't have Map return type and Function as a parameter, but there was a concern that all the branches will be in separate variable scopes, which is inconvenient in many cases. There was a really hard discussion with a number of ideas proposed and rejected, what we have now seems to be the best choice from many points of view. {quote}Are you willing to continue working? {quote} Sure, since I proposed this KIP, I'm going to implement it. I've been quite busy recently, but I really hope that I'll be able to post a PR from me in one or maximum two weeks. > KStream.branch should not return a Array of streams we have to access by > known index > > > Key: KAFKA-5488 > URL: https://issues.apache.org/jira/browse/KAFKA-5488 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Marcel "childNo͡.de" Trautwein >Assignee: highluck >Priority: Major > Labels: kip > > KIP-418: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream] > long story short: it's a mess to get a {{KStream<>[]}} out from > {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces > bad code which is not that good to maintain since you have to know the right > index for an unnamed branching stream. > Example > {code:java} > import org.apache.kafka.streams.kstream.KStreamBuilder; > import org.apache.kafka.streams.kstream.KStream; > public class StreamAppWithBranches { > public static void main(String... args) { > KStream[] branchedStreams= new KStreamBuilder() > .stream("eventTopic") > .branch( > (k, v) -> EventType::validData > (k, v) -> true > ); > > branchedStreams[0] > .to("topicValidData"); > > branchedStreams[1] > .to("topicInvalidData"); > } > } > {code} > Quick idea, s.th. like {{void branch(final BranchDefinition, > Consumer>>... branchPredicatesAndHandlers);}} where you can write > branches/streams code nested where it belongs to > so it would be possible to write code like > {code:java} > new KStreamBuilder() > .stream("eventTopic") > .branch( > Branch.create( > (k, v) -> EventType::validData, > stream -> stream.to("topicValidData") > ), > Branch.create( > (k, v) -> true, > stream -> stream.to("topicInvalidData") > ) > ); > {code} > I'll go forward to evaluate some ideas: > [https://gitlab.com/childno.de/apache_kafka/snippets/1665655] -- This message was sent by Atlassian Jira (v8.3.4#803005)
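To make the design point above concrete without guessing at the final DSL, the following self-contained Java sketch (plain collections, not Kafka Streams code; all names are illustrative) shows why returning named branches in a Map keeps every branch in one scope instead of relying on positional array indices:
{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class NamedBranchSketch {
    // Routes each element to the first matching predicate; branches are keyed by name,
    // so callers never have to remember a positional index into an array of branches.
    static <V> Map<String, List<V>> branch(List<V> input, Map<String, Predicate<V>> branches) {
        Map<String, List<V>> out = new LinkedHashMap<>();
        branches.keySet().forEach(name -> out.put(name, new ArrayList<>()));
        for (V value : input) {
            for (Map.Entry<String, Predicate<V>> b : branches.entrySet()) {
                if (b.getValue().test(value)) {
                    out.get(b.getKey()).add(value);
                    break;
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, Predicate<String>> predicates = new LinkedHashMap<>();
        predicates.put("valid", s -> !s.isEmpty());
        predicates.put("invalid", s -> true);
        Map<String, List<String>> result = branch(List.of("a", "", "b"), predicates);
        System.out.println(result); // {valid=[a, b], invalid=[]}
    }
}
{code}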
[jira] [Assigned] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index
[ https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Ponomarev reassigned KAFKA-5488: - Assignee: Ivan Ponomarev (was: highluck) > KStream.branch should not return a Array of streams we have to access by > known index > > > Key: KAFKA-5488 > URL: https://issues.apache.org/jira/browse/KAFKA-5488 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Marcel "childNo͡.de" Trautwein >Assignee: Ivan Ponomarev >Priority: Major > Labels: kip > > KIP-418: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream] > long story short: it's a mess to get a {{KStream<>[]}} out from > {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces > bad code which is not that good to maintain since you have to know the right > index for an unnamed branching stream. > Example > {code:java} > import org.apache.kafka.streams.kstream.KStreamBuilder; > import org.apache.kafka.streams.kstream.KStream; > public class StreamAppWithBranches { > public static void main(String... args) { > KStream[] branchedStreams= new KStreamBuilder() > .stream("eventTopic") > .branch( > (k, v) -> EventType::validData > (k, v) -> true > ); > > branchedStreams[0] > .to("topicValidData"); > > branchedStreams[1] > .to("topicInvalidData"); > } > } > {code} > Quick idea, s.th. like {{void branch(final BranchDefinition, > Consumer>>... branchPredicatesAndHandlers);}} where you can write > branches/streams code nested where it belongs to > so it would be possible to write code like > {code:java} > new KStreamBuilder() > .stream("eventTopic") > .branch( > Branch.create( > (k, v) -> EventType::validData, > stream -> stream.to("topicValidData") > ), > Branch.create( > (k, v) -> true, > stream -> stream.to("topicInvalidData") > ) > ); > {code} > I'll go forward to evaluate some ideas: > [https://gitlab.com/childno.de/apache_kafka/snippets/1665655] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-8296) Kafka Streams branch method raises type warnings
[ https://issues.apache.org/jira/browse/KAFKA-8296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ivan Ponomarev reassigned KAFKA-8296: - Assignee: Ivan Ponomarev > Kafka Streams branch method raises type warnings > > > Key: KAFKA-8296 > URL: https://issues.apache.org/jira/browse/KAFKA-8296 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Michael Drogalis >Assignee: Ivan Ponomarev >Priority: Minor > > Because the branch method in the DSL takes vargargs, using it as follows > raises an unchecked type warning: > {code:java} > KStream[] branches = builder. User>stream(inputTopic) > .branch((key, user) -> "united > states".equals(user.getCountry()), > (key, user) -> "germany".equals(user.getCountry()), > (key, user) -> "mexico".equals(user.getCountry()), > (key, user) -> true); > {code} > The compiler warns with: > {code:java} > Warning:(39, 24) java: unchecked generic array creation for varargs parameter > of type org.apache.kafka.streams.kstream.Predicate super io.confluent.developer.avro.User>[] > {code} > This is unfortunate because of the way Java's type system + generics work. We > could possibly fix this by adding the @SafeVarargs annotation to the branch > method signatures. -- This message was sent by Atlassian Jira (v8.3.4#803005)
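As background on the proposed fix, here is a minimal, self-contained Java illustration (not the Kafka code itself; the class and method names are made up) of how @SafeVarargs on a static generic varargs method suppresses the unchecked-generic-array-creation warning at call sites:
{code:java}
import java.util.function.Predicate;

public class SafeVarargsExample {
    // Without @SafeVarargs, callers of this method get
    // "unchecked generic array creation for varargs parameter"
    // because the generic varargs are backed by a raw Predicate[] array.
    @SafeVarargs
    static <T> int firstMatch(T value, Predicate<T>... predicates) {
        for (int i = 0; i < predicates.length; i++) {
            if (predicates[i].test(value)) {
                return i;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int index = firstMatch("mexico",
            country -> "united states".equals(country),
            country -> "germany".equals(country),
            country -> "mexico".equals(country));
        System.out.println(index); // prints 2
    }
}
{code}
Note that @SafeVarargs is only permitted on methods that cannot be overridden (static, final, private, or constructors), which is one reason applying it to an interface method such as KStream.branch needs care.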
[jira] [Comment Edited] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index
[ https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159164#comment-17159164 ] Ivan Ponomarev edited comment on KAFKA-5488 at 7/16/20, 12:38 PM: -- Hi everyone, {quote}I think there are some unnecessary interfaces I don't think the return type needs to be a Map, {quote} [~high.lee] , concerning your comment about the API: the current API is a result of the extensive discussion (you can find the link to the discussion in the KIP itself). The first versions of this KIP didn't have Map return type and Function as a parameter, but there was a concern that all the branches will be in separate variable scopes, which is inconvenient in many cases. There was a really hard discussion with a number of ideas proposed and rejected, what we have now seems to be the best choice from many points of view. {quote}Are you willing to continue working? {quote} Sure, since I proposed this KIP, I'm going to implement it. I've been quite busy recently, but I really hope that I'll be able to post a PR in one or maximum two weeks. was (Author: iponomarev): Hi everyone, {quote}I think there are some unnecessary interfaces I don't think the return type needs to be a Map, {quote} [~high.lee] , concerning your comment about the API: the current API is a result of the extensive discussion (you can find the link to the discussion in the KIP itself). The first versions of this KIP didn't have Map return type and Function as a parameter, but there was a concern that all the branches will be in separate variable scopes, which is inconvenient in many cases. There was a really hard discussion with a number of ideas proposed and rejected, what we have now seems to be the best choice from many points of view. {quote}Are you willing to continue working? {quote} Sure, since I proposed this KIP, I'm going to implement it. I've been quite busy recently, but I really hope that I'll be able to post a PR from me in one or maximum two weeks. > KStream.branch should not return a Array of streams we have to access by > known index > > > Key: KAFKA-5488 > URL: https://issues.apache.org/jira/browse/KAFKA-5488 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Marcel "childNo͡.de" Trautwein >Assignee: Ivan Ponomarev >Priority: Major > Labels: kip > > KIP-418: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream] > long story short: it's a mess to get a {{KStream<>[]}} out from > {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces > bad code which is not that good to maintain since you have to know the right > index for an unnamed branching stream. > Example > {code:java} > import org.apache.kafka.streams.kstream.KStreamBuilder; > import org.apache.kafka.streams.kstream.KStream; > public class StreamAppWithBranches { > public static void main(String... args) { > KStream[] branchedStreams= new KStreamBuilder() > .stream("eventTopic") > .branch( > (k, v) -> EventType::validData > (k, v) -> true > ); > > branchedStreams[0] > .to("topicValidData"); > > branchedStreams[1] > .to("topicInvalidData"); > } > } > {code} > Quick idea, s.th. like {{void branch(final BranchDefinition, > Consumer>>... 
branchPredicatesAndHandlers);}} where you can write > branches/streams code nested where it belongs to > so it would be possible to write code like > {code:java} > new KStreamBuilder() > .stream("eventTopic") > .branch( > Branch.create( > (k, v) -> EventType::validData, > stream -> stream.to("topicValidData") > ), > Branch.create( > (k, v) -> true, > stream -> stream.to("topicInvalidData") > ) > ); > {code} > I'll go forward to evaluate some ideas: > [https://gitlab.com/childno.de/apache_kafka/snippets/1665655] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] dajac commented on a change in pull request #9030: MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server
dajac commented on a change in pull request #9030: URL: https://github.com/apache/kafka/pull/9030#discussion_r455745104 ## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -197,18 +197,20 @@ public class RequestResponseTest { +private UnknownServerException unknownServerException = new UnknownServerException("secret"); Review comment: nit: I would add a small comment to explain why we do this here. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
dajac commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r455763010 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -892,136 +895,162 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion -val mergedResponseMap = if (version == 0) +val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) -sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) +sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] -val partitionTimestamps = offsetRequest.partitionTimestamps.asScala -val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - -val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) -} - -val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { -val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) -(topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { -// NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages -// are typically transient and there is no value in logging the entire stack trace for the same -case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderForPartitionException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( -correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) -case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } -} -responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { -val correlationId = request.header.correlationId -val clientId = request.header.clientId -val offsetRequest = request.body[ListOffsetRequest] - -val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) - -val unauthorizedResponseStatus = 
unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, -ListOffsetResponse.UNKNOWN_TIMESTAMP, -ListOffsetResponse.UNKNOWN_OFFSET, -Optional.empty()) -} - -val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { -debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + -s"failed because the partition is duplicated in the request.") -(topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { - -def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( -e, -ListOffsetResponse.UNKNOWN_TIMESTAMP, -
[GitHub] [kafka] dajac commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
dajac commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r455765374 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1077,7 +1077,7 @@ class ReplicaManager(val config: KafkaConfig, // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition val readInfo: LogReadInfo = partition.readRecords( fetchOffset = fetchInfo.fetchOffset, -currentLeaderEpoch = fetchInfo.currentLeaderEpoch, +currentLeaderEpoch = toScalaOption(fetchInfo.currentLeaderEpoch), Review comment: Btw, you can use `.asScala` on a Java Option to convert it directly. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r455782710 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1077,7 +1077,7 @@ class ReplicaManager(val config: KafkaConfig, // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition val readInfo: LogReadInfo = partition.readRecords( fetchOffset = fetchInfo.fetchOffset, -currentLeaderEpoch = fetchInfo.currentLeaderEpoch, +currentLeaderEpoch = toScalaOption(fetchInfo.currentLeaderEpoch), Review comment: Yes, but it gives us an `Option[java.lang.Integer]` object while we want `Option[Int]` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol
mimaison commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r455791789 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -892,136 +895,162 @@ class KafkaApis(val requestChannel: RequestChannel, def handleListOffsetRequest(request: RequestChannel.Request): Unit = { val version = request.header.apiVersion -val mergedResponseMap = if (version == 0) +val topics = if (version == 0) handleListOffsetRequestV0(request) else handleListOffsetRequestV1AndAbove(request) -sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava)) +sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setTopics(topics.asJava))) } - private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = { + private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = { val correlationId = request.header.correlationId val clientId = request.header.clientId val offsetRequest = request.body[ListOffsetRequest] -val partitionTimestamps = offsetRequest.partitionTimestamps.asScala -val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, partitionTimestamps)(_.topic) - -val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava) -} - -val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - try { -val offsets = replicaManager.legacyFetchOffsetsForTimestamp( - topicPartition = topicPartition, - timestamp = partitionData.timestamp, - maxNumOffsets = partitionData.maxNumOffsets, - isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID, - fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID) -(topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava)) - } catch { -// NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages -// are typically transient and there is no value in logging the entire stack trace for the same -case e @ (_ : UnknownTopicOrPartitionException | - _ : NotLeaderForPartitionException | - _ : KafkaStorageException) => - debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( -correlationId, clientId, topicPartition, e.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) -case e: Throwable => - error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava)) - } -} -responseMap ++ unauthorizedResponseStatus - } - - private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = { -val correlationId = request.header.correlationId -val clientId = request.header.clientId -val offsetRequest = request.body[ListOffsetRequest] - -val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context, - DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic) - -val unauthorizedResponseStatus = 
unauthorizedRequestInfo.map { case (k, _) => - k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, -ListOffsetResponse.UNKNOWN_TIMESTAMP, -ListOffsetResponse.UNKNOWN_OFFSET, -Optional.empty()) -} - -val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) => - if (offsetRequest.duplicatePartitions.contains(topicPartition)) { -debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " + -s"failed because the partition is duplicated in the request.") -(topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST, - ListOffsetResponse.UNKNOWN_TIMESTAMP, - ListOffsetResponse.UNKNOWN_OFFSET, - Optional.empty())) - } else { - -def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = { - (topicPartition, new ListOffsetResponse.PartitionData( -e, -ListOffsetResponse.UNKNOWN_TIMESTAMP, -
[GitHub] [kafka] rondagostino commented on a change in pull request #9030: MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server
rondagostino commented on a change in pull request #9030: URL: https://github.com/apache/kafka/pull/9030#discussion_r455792482 ## File path: clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java ## @@ -66,10 +66,11 @@ public DescribeClientQuotasResponse(Map> } public DescribeClientQuotasResponse(int throttleTimeMs, Throwable e) { +ApiError apiError = ApiError.fromThrowable(e); Review comment: Should the same thing happen in `public AlterClientQuotasResponse(Collection entities, int throttleTimeMs, Throwable e)` for consistency? And also maybe in `IncrementalAlterConfigsResponse` (`public static IncrementalAlterConfigsResponseData toResponseData`)? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
lbradstreet commented on a change in pull request #9008: URL: https://github.com/apache/kafka/pull/9008#discussion_r455828890 ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ## @@ -273,6 +99,28 @@ public boolean equals(Object o) { } } +private Map toPartitionDataMap(List fetchableTopics) { +Map result = new LinkedHashMap<>(); +fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> { +Optional leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch()) +.filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH); +result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()), +new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(), +fetchPartition.partitionMaxBytes(), leaderEpoch)); +})); +return Collections.unmodifiableMap(result); +} + +private List toForgottonTopicList(List forgottenTopics) { Review comment: Typo "Forgotton" This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
lbradstreet commented on a change in pull request #9008: URL: https://github.com/apache/kafka/pull/9008#discussion_r455835380 ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ## @@ -273,6 +99,28 @@ public boolean equals(Object o) { } } +private Map toPartitionDataMap(List fetchableTopics) { + Map result = new LinkedHashMap<>(); +fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> { +Optional leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch()) +.filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH); +result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()), +new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(), +fetchPartition.partitionMaxBytes(), leaderEpoch)); Review comment: Let's open a jira for getting rid of the toPartitionDataMap if we don't address it in this PR. It's a pretty large part of the cost here and there are only a few places we would have to deal with it. I think we should fix it sooner rather than later too. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index
[ https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159285#comment-17159285 ] highluck commented on KAFKA-5488: - @Ivan Ponomarev thanks for comment! Call me whenever you need help :) > KStream.branch should not return a Array of streams we have to access by > known index > > > Key: KAFKA-5488 > URL: https://issues.apache.org/jira/browse/KAFKA-5488 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Marcel "childNo͡.de" Trautwein >Assignee: Ivan Ponomarev >Priority: Major > Labels: kip > > KIP-418: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream] > long story short: it's a mess to get a {{KStream<>[]}} out from > {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces > bad code which is not that good to maintain since you have to know the right > index for an unnamed branching stream. > Example > {code:java} > import org.apache.kafka.streams.kstream.KStreamBuilder; > import org.apache.kafka.streams.kstream.KStream; > public class StreamAppWithBranches { > public static void main(String... args) { > KStream[] branchedStreams= new KStreamBuilder() > .stream("eventTopic") > .branch( > (k, v) -> EventType::validData > (k, v) -> true > ); > > branchedStreams[0] > .to("topicValidData"); > > branchedStreams[1] > .to("topicInvalidData"); > } > } > {code} > Quick idea, s.th. like {{void branch(final BranchDefinition, > Consumer>>... branchPredicatesAndHandlers);}} where you can write > branches/streams code nested where it belongs to > so it would be possible to write code like > {code:java} > new KStreamBuilder() > .stream("eventTopic") > .branch( > Branch.create( > (k, v) -> EventType::validData, > stream -> stream.to("topicValidData") > ), > Branch.create( > (k, v) -> true, > stream -> stream.to("topicInvalidData") > ) > ); > {code} > I'll go forward to evaluate some ideas: > [https://gitlab.com/childno.de/apache_kafka/snippets/1665655] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] rajinisivaram commented on a change in pull request #8933: KAFKA-10163; Throttle Create Topic, Create Partition and Delete Topic Operations (KIP-599, Part I, Broker Changes)
rajinisivaram commented on a change in pull request #8933: URL: https://github.com/apache/kafka/pull/8933#discussion_r455832706 ## File path: core/src/main/scala/kafka/server/AdminManager.scala ## @@ -295,34 +312,44 @@ class AdminManager(val config: KafkaConfig, throw new InvalidPartitionsException(s"Topic already has $oldNumPartitions partitions.") } -val newPartitionsAssignment = Option(newPartition.assignments) - .map { assignmentMap => -val assignments = assignmentMap.asScala.map { - createPartitionAssignment => createPartitionAssignment.brokerIds.asScala.map(_.toInt) -} -val unknownBrokers = assignments.flatten.toSet -- allBrokerIds -if (unknownBrokers.nonEmpty) - throw new InvalidReplicaAssignmentException( -s"Unknown broker(s) in replica assignment: ${unknownBrokers.mkString(", ")}.") - -if (assignments.size != numPartitionsIncrement) - throw new InvalidReplicaAssignmentException( -s"Increasing the number of partitions by $numPartitionsIncrement " + - s"but ${assignments.size} assignments provided.") - -assignments.zipWithIndex.map { case (replicas, index) => - existingAssignment.size + index -> replicas -}.toMap +val newPartitionsAssignment = Option(newPartition.assignments).map { assignmentMap => + val assignments = assignmentMap.asScala.map { +createPartitionAssignment => createPartitionAssignment.brokerIds.asScala.map(_.toInt) + } + val unknownBrokers = assignments.flatten.toSet -- allBrokerIds + if (unknownBrokers.nonEmpty) +throw new InvalidReplicaAssignmentException( + s"Unknown broker(s) in replica assignment: ${unknownBrokers.mkString(", ")}.") + + if (assignments.size != numPartitionsIncrement) +throw new InvalidReplicaAssignmentException( + s"Increasing the number of partitions by $numPartitionsIncrement " + +s"but ${assignments.size} assignments provided.") + + assignments.zipWithIndex.map { case (replicas, index) => +existingAssignment.size + index -> replicas + }.toMap } -val updatedReplicaAssignment = adminZkClient.addPartitions(topic, existingAssignment, allBrokers, - newPartition.count, newPartitionsAssignment, validateOnly = validateOnly) -CreatePartitionsMetadata(topic, updatedReplicaAssignment.keySet, ApiError.NONE) +val assignmentForNewPartitions = adminZkClient.createNewPartitionsAssignment( + topic, existingAssignment, allBrokers, newPartition.count, newPartitionsAssignment) + +if (validateOnly) { + CreatePartitionsMetadata(topic, (existingAssignment ++ assignmentForNewPartitions).keySet) Review comment: Shouldn't validateOnly tell you know much you would have been throttled? 
## File path: core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala ## @@ -20,21 +20,32 @@ import java.util.concurrent.TimeUnit import kafka.network.RequestChannel import org.apache.kafka.common.MetricName +import org.apache.kafka.common.metrics.Sensor.QuotaEnforcementType import org.apache.kafka.common.metrics._ import org.apache.kafka.common.utils.Time import org.apache.kafka.server.quota.ClientQuotaCallback import scala.jdk.CollectionConverters._ +object ClientRequestQuotaManager { + val QuotaRequestPercentDefault = Int.MaxValue.toDouble + val NanosToPercentagePerSecond = 100.0 / TimeUnit.SECONDS.toNanos(1) +} class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig, private val metrics: Metrics, private val time: Time, -threadNamePrefix: String, -quotaCallback: Option[ClientQuotaCallback]) -extends ClientQuotaManager(config, metrics, QuotaType.Request, time, threadNamePrefix, quotaCallback) { - val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds) - def exemptSensor = getOrCreateSensor(exemptSensorName, exemptMetricName) +private val threadNamePrefix: String, +private val quotaCallback: Option[ClientQuotaCallback]) +extends ClientQuotaManager(config, metrics, QuotaType.Request, QuotaEnforcementType.PERMISSIVE, + time, threadNamePrefix, quotaCallback) { + + private val maxThrottleTimeMs = TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds) + private val exemptMetricName = metrics.metricName("exempt-request-time", +QuotaType.Request.toString, "Tracking exempt-request-time utilization percentage") + private val exemptSensorName = "exempt-" + QuotaType.Request Revie
[GitHub] [kafka] vvcephei commented on a change in pull request #9027: KAFKA-9161: add docs for KIP-441 and KIP-613 and other configs that need fixing
vvcephei commented on a change in pull request #9027: URL: https://github.com/apache/kafka/pull/9027#discussion_r455299303 ## File path: docs/streams/developer-guide/config-streams.html ## @@ -181,6 +193,7 @@ bootstrap.serverscache.max.bytes.buffering Medium Maximum number of memory bytes to be used for record caches across all threads. +Maximum number of memory bytes to be used for record caches across all threads. Review comment: duplicate? ## File path: docs/streams/developer-guide/config-streams.html ## @@ -270,43 +319,47 @@ bootstrap.serversThe amount of time in milliseconds, before a request is retried. This applies if the retries parameter is configured to be greater than 0. 100 - rocksdb.config.setter + rocksdb.config.setter Medium The RocksDB configuration. - state.cleanup.delay.ms + state.cleanup.delay.ms Low The amount of time in milliseconds to wait before deleting state when a partition has migrated. 60 milliseconds - state.dir + state.dir High Directory location for state stores. /tmp/kafka-streams - timestamp.extractor + topology.optimization Medium -Timestamp extractor class that implements the TimestampExtractor interface. -See Timestamp Extractor +A configuration telling Kafka Streams if it should optimize the topology +none - upgrade.from + upgrade.from Medium The version you are upgrading from during a rolling upgrade. See Upgrade From - value.serde -Medium -Default serializer/deserializer class for record values, implements the Serde interface (see also key.serde). -Serdes.ByteArray().getClass().getName() - windowstore.changelog.additional.retention.ms Low Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. 8640 milliseconds = 1 day + + acceptable.recovery.lag + + + The maximum acceptable lag (total number of offsets to catch up from the changelog) for an instance to be considered caught-up and able to receive an active task. Streams will only assign + stateful active tasks to instances whose state stores are within the acceptable recovery lag, if any exist, and assign warmup replicas to restore state in the background for instances + that are not yet caught up. Should correspond to a recovery time of well under a minute for a given workload. Must be at least 0. Review comment: Maybe we can also mention that if you set it to Long.MAX_VALUE, it effectively disables warmups and HA task migration, allowing Streams to produce a balanced assignment in one shot. ## File path: docs/streams/developer-guide/running-app.html ## @@ -110,6 +110,18 @@ Removing capacity from your applicationIf a local state store exists, the changelog is replayed from the previously checkpointed offset. The changes are applied and the state is restored to the most recent snapshot. This method takes less time because it is applying a smaller portion of the changelog. For more information, see Standby Replicas. + + As of version 2.6, Streams will now do most of a task's restoration in the background through warmup replicas. These will be assigned to instances that need to restore a lot of state for a task. 
+ A stateful active task will only be assigned to an instance once it's state is within the configured Review comment: ```suggestion A stateful active task will only be assigned to an instance once its state is within the configured ``` ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ## @@ -639,16 +643,16 @@ Serdes.ByteArraySerde.class.getName(), Importance.MEDIUM, DEFAULT_VALUE_SERDE_CLASS_DOC) -.define(NUM_STANDBY_REPLICAS_CONFIG, -Type.INT, -0, +.define(DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS, +Type.CLASS, +null, Importance.MEDIUM, -NUM_STANDBY_REPLICAS_DOC) -.define(NUM_STREAM_THREADS_CONFIG, -Type.INT, -1, +DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS_DOC) +
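Editor's note: for readers following the config discussion above, a minimal sketch of how the KIP-441 settings mentioned in the review (acceptable recovery lag, warmup replicas) might be set, assuming the `StreamsConfig` constants that shipped with 2.6; the values are illustrative, not recommendations.
```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class Kip441ConfigSketch {

    public static Properties highAvailabilityProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Instances whose changelog lag is within this bound are eligible for stateful active tasks.
        props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10_000L);

        // Extra replicas used only to warm up state before an active task moves.
        props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);

        // As noted in the review, setting the lag to Long.MAX_VALUE effectively disables
        // warmups and HA task migration, producing a balanced assignment in one shot:
        // props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, Long.MAX_VALUE);
        return props;
    }
}
```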
[GitHub] [kafka] cmccabe commented on pull request #8948: KAFKA-10174: Prefer --bootstrap-server for configs command in ducker tests
cmccabe commented on pull request #8948: URL: https://github.com/apache/kafka/pull/8948#issuecomment-659508365 ok, that works for me. LGTM This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #8948: KAFKA-10174: Prefer --bootstrap-server for configs command in ducker tests
cmccabe merged pull request #8948: URL: https://github.com/apache/kafka/pull/8948 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-10174) Prefer --bootstrap-server ducktape tests using kafka_configs.sh
[ https://issues.apache.org/jira/browse/KAFKA-10174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-10174. -- Fix Version/s: 2.7 Reviewer: Colin McCabe Resolution: Fixed > Prefer --bootstrap-server ducktape tests using kafka_configs.sh > --- > > Key: KAFKA-10174 > URL: https://issues.apache.org/jira/browse/KAFKA-10174 > Project: Kafka > Issue Type: Sub-task >Reporter: Vinoth Chandar >Assignee: Vinoth Chandar >Priority: Major > Fix For: 2.7 > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ning2008wisc commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
ning2008wisc commented on a change in pull request #9029: URL: https://github.com/apache/kafka/pull/9029#discussion_r455902217 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -315,9 +319,34 @@ public void testReplication() throws InterruptedException { backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count()); } +private void waitForConsumerGroupOffsetSync(Consumer consumer, List topics) +throws InterruptedException { +Admin backupClient = backup.kafka().createAdminClient(); +List tps = new ArrayList<>(NUM_PARTITIONS * topics.size()); +IntStream.range(0, NUM_PARTITIONS).forEach( +partitionInd -> { Review comment: could we have a more intuitive variable name for `partitionInd`? e.g. `partitionId` or `partitionIndex`? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ning2008wisc commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
ning2008wisc commented on a change in pull request #9029: URL: https://github.com/apache/kafka/pull/9029#discussion_r455903116 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -315,9 +319,34 @@ public void testReplication() throws InterruptedException { backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count()); } +private void waitForConsumerGroupOffsetSync(Consumer consumer, List topics) +throws InterruptedException { +Admin backupClient = backup.kafka().createAdminClient(); +List tps = new ArrayList<>(NUM_PARTITIONS * topics.size()); +IntStream.range(0, NUM_PARTITIONS).forEach( +partitionInd -> { +for (String topic: topics) { +tps.add(new TopicPartition(topic, partitionInd)); +} +} +); +long expectedTotalOffsets = NUM_RECORDS_PRODUCED * topics.size(); + +waitForCondition(() -> { +Map consumerGroupOffsets = + backupClient.listConsumerGroupOffsets("consumer-group-1").partitionsToOffsetAndMetadata().get(); Review comment: could we consider to pass in the consumer group name as a input variable of `waitForConsumerGroupOffsetSync`, so that `waitForConsumerGroupOffsetSync` looks more generic? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
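Editor's note: putting the two review suggestions together, the helper might end up looking roughly like the sketch below: the consumer group id is a parameter and `partitionId` replaces `partitionInd`. This is not the code from the PR; the error handling inside the condition is simplified, and `TestUtils.waitForCondition` is the same test utility used in the diff above.
```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.test.TestUtils;

final class OffsetSyncTestUtil {

    private OffsetSyncTestUtil() { }

    // Waits until the given consumer group's committed offsets on the target cluster
    // add up to the expected total across all partitions of the given topics.
    static void waitForConsumerGroupOffsetSync(final Admin adminClient,
                                               final List<String> topics,
                                               final String consumerGroupId,
                                               final int numPartitions,
                                               final long expectedTotalOffsets,
                                               final long timeoutMs) throws InterruptedException {
        final List<TopicPartition> partitions = new ArrayList<>();
        for (final String topic : topics) {
            for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
                partitions.add(new TopicPartition(topic, partitionId));
            }
        }
        TestUtils.waitForCondition(() -> {
            final Map<TopicPartition, OffsetAndMetadata> groupOffsets;
            try {
                groupOffsets = adminClient.listConsumerGroupOffsets(consumerGroupId)
                        .partitionsToOffsetAndMetadata().get();
            } catch (final InterruptedException | ExecutionException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status, keep condition false
                return false;
            }
            final long totalOffsets = partitions.stream()
                    .filter(groupOffsets::containsKey)
                    .mapToLong(tp -> groupOffsets.get(tp).offset())
                    .sum();
            return totalOffsets == expectedTotalOffsets;
        }, timeoutMs, "Consumer group offsets were not synced in time");
    }
}
```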
[GitHub] [kafka] vvcephei commented on a change in pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition
vvcephei commented on a change in pull request #9020: URL: https://github.com/apache/kafka/pull/9020#discussion_r455904752 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java ## @@ -20,62 +20,49 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.TimestampedWindowStore; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.Objects; +import java.util.stream.Collectors; public class StreamThreadStateStoreProvider { private final StreamThread streamThread; -private final InternalTopologyBuilder internalTopologyBuilder; -public StreamThreadStateStoreProvider(final StreamThread streamThread, - final InternalTopologyBuilder internalTopologyBuilder) { +public StreamThreadStateStoreProvider(final StreamThread streamThread) { this.streamThread = streamThread; -this.internalTopologyBuilder = internalTopologyBuilder; } @SuppressWarnings("unchecked") public List stores(final StoreQueryParameters storeQueryParams) { final String storeName = storeQueryParams.storeName(); final QueryableStoreType queryableStoreType = storeQueryParams.queryableStoreType(); -final TaskId keyTaskId = createKeyTaskId(storeName, storeQueryParams.partition()); if (streamThread.state() == StreamThread.State.DEAD) { return Collections.emptyList(); } final StreamThread.State state = streamThread.state(); if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) { final Map tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap(); -final List stores = new ArrayList<>(); -if (keyTaskId != null) { -final Task task = tasks.get(keyTaskId); -if (task == null) { +if (storeQueryParams.partition() != null) { +final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition()); +if (streamTask == null) { return Collections.emptyList(); } -final T store = validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, keyTaskId); -if (store != null) { -return Collections.singletonList(store); -} -} else { -for (final Task streamTask : tasks.values()) { -final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); -if (store != null) { -stores.add(store); -} -} +final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); +return store != null ? Collections.singletonList(store) : Collections.emptyList(); Review comment: The nested early-return pattern is pretty hard to follow. Do you mind rewriting it to use if/else blocks? I know it was previously doing some early returns; it'd be better to migrate to a more maintainable style when we update the code, though. 
## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java ## @@ -46,11 +46,22 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet public List stores(final String storeName, final QueryableStoreType queryableStoreType) { final List allStores = new ArrayList<>(); -for (final StreamThreadStateStoreProvider provider : storeProviders) { -final List stores = provider.stores(storeQueryParameters); -allStores.addAll(stores); +for (final StreamThreadStateStoreProvider storeProvider : storeProviders) { +final List stores = storeProvider.stores(storeQueryParameters); +if (!stores.isEmpty()) { +allStores.addAll(stores); +if (storeQueryParameters.partition() != null) { +break; +} +}
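Editor's note: for context, the partition-specific path being optimized in this PR is the one exercised by interactive queries such as the sketch below, assuming a key-value store registered under `storeName`; the store name, key and value types are placeholders.
```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class SinglePartitionStoreQuery {

    // Look up a key in one specific partition of a store, so only the matching
    // task's store needs to be located rather than every partition of the store.
    public static Long countFor(final KafkaStreams streams,
                                final String storeName,
                                final String key,
                                final int partition) {
        final ReadOnlyKeyValueStore<String, Long> store = streams.store(
            StoreQueryParameters
                .fromNameAndType(storeName, QueryableStoreTypes.<String, Long>keyValueStore())
                .withPartition(partition));
        return store.get(key);
    }
}
```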
[GitHub] [kafka] ning2008wisc edited a comment on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
ning2008wisc edited a comment on pull request #9029: URL: https://github.com/apache/kafka/pull/9029#issuecomment-659516303 Hi @showuon, thanks for the fix, it looks like a good start. Another minor, non-blocking comment: if it is a small fix, a single commit in the PR would look neater. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ning2008wisc commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test
ning2008wisc commented on pull request #9029: URL: https://github.com/apache/kafka/pull/9029#issuecomment-659516303 Hi @showuon, thanks for the fix, it looks like a good start. Another minor, non-blocking comment: if it is a small fix, a single commit in the PR would look neater. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10205) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159332#comment-17159332 ] Sophie Blee-Goldman commented on KAFKA-10205: - Given how many people have hit this, maybe we can add a null check here and throw a more descriptive exception or log a more helpful error (eg "...was null, this may be the result of a non-deterministic topology ordering") > NullPointerException in StreamTask > -- > > Key: KAFKA-10205 > URL: https://issues.apache.org/jira/browse/KAFKA-10205 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Brian Forkan >Assignee: John Roesler >Priority: Minor > > In our Kafka Streams application we have been experiencing a > NullPointerException when deploying a new version of our application. This > does not happen during a normal rolling restart. > The exception is: > {code:java} > Error caught during partition assignment, will abort the current process and > re-throw at the end of > rebalance","stack_trace":"java.lang.NullPointerException: nullError caught > during partition assignment, will abort the current process and re-throw at > the end of rebalance","stack_trace":"java.lang.NullPointerException: null at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:186) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:115) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295) > at > org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160) > at > org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173) > at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:86) at > brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:80) at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) > {code} > And the relevant lines of code - > 
[https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L184-L196] > I suspect "topology.source(partition.topic());" is returning null. > Has anyone experienced this issue before? I suspect there is a problem with > our topology but I can't replicate this on my machine so I can't tell. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
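Editor's note: a sketch of the kind of guard suggested in the comment above, shown as a standalone helper rather than the actual change to `StreamTask`; the exception type and wording are illustrative only.
```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.errors.TopologyException;

public final class SourceNodeChecks {

    private SourceNodeChecks() { }

    // Fail with a descriptive message instead of a bare NullPointerException when the
    // topology has no source node registered for the partition's topic. The generic
    // parameter T stands in for the internal SourceNode type.
    public static <T> T requireSourceNode(final T sourceNode, final TopicPartition partition) {
        if (sourceNode == null) {
            throw new TopologyException(
                "Source node for topic " + partition.topic() + " was null. This may be the " +
                "result of a non-deterministic topology ordering; make sure the topology is " +
                "built identically on every application instance.");
        }
        return sourceNode;
    }
}
```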
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9030: MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server
rajinisivaram commented on a change in pull request #9030: URL: https://github.com/apache/kafka/pull/9030#discussion_r455916021 ## File path: clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java ## @@ -66,10 +66,11 @@ public DescribeClientQuotasResponse(Map> } public DescribeClientQuotasResponse(int throttleTimeMs, Throwable e) { +ApiError apiError = ApiError.fromThrowable(e); Review comment: @rondagostino Thanks for the review. Updated `AlterClientQuotasResponse`. I think IncrementalAlterConfigsResponse is ok since it uses ApiError, we create the error with the right message when converting from Throwable. Can you just verify that the method with ApiError was the one you meant? Thanks, This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9030: MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server
rajinisivaram commented on a change in pull request #9030: URL: https://github.com/apache/kafka/pull/9030#discussion_r455916169 ## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -197,18 +197,20 @@ public class RequestResponseTest { +private UnknownServerException unknownServerException = new UnknownServerException("secret"); Review comment: @dajac Thanks for the review, added comment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei opened a new pull request #9031: POC: replace abstract Windows with a proper interface
vvcephei opened a new pull request #9031: URL: https://github.com/apache/kafka/pull/9031 Just a POC for illustrative purposes. No need to review. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vinothchandar commented on pull request #8948: KAFKA-10174: Prefer --bootstrap-server for configs command in ducker tests
vinothchandar commented on pull request #8948: URL: https://github.com/apache/kafka/pull/8948#issuecomment-659531282 Thanks @cmccabe This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9028: KAFKA-10035: Safer conversion of consumer timeout parameters
ableegoldman commented on a change in pull request #9028: URL: https://github.com/apache/kafka/pull/9028#discussion_r455916364 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java ## @@ -151,7 +151,7 @@ private void prepareConfigs() { streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); -streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT); Review comment: Hah, this was pretty janky. Good catch ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java ## @@ -265,7 +265,7 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() throws Exception { Review comment: Why are none of these tests...actually tests? Can you also fix this, ie add `@Test` annotations to all the tests here? I think you can then simplify the two tests that extend this abstract test class (`ResetIntegrationTest` and `ResetIntegrationWithSslTest`) and just remove all the tests that just call `super.testXXX` -- they should automatically run all of the tests in this class This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
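Editor's note: a sketch of the structure suggested in the second comment, assuming JUnit 4, which runs `@Test` methods inherited from an abstract superclass once per concrete subclass; class and method names follow the tests mentioned above and the test body is elided.
```java
import org.junit.Test;

abstract class AbstractResetIntegrationTest {

    // Annotated here so every concrete subclass inherits and runs it.
    @Test
    public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() throws Exception {
        // shared test body
    }
}

// The concrete classes only provide configuration; no super.testXXX() wrappers are needed.
class ResetIntegrationTest extends AbstractResetIntegrationTest {
}
```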
[GitHub] [kafka] dima5rr commented on a change in pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition
dima5rr commented on a change in pull request #9020: URL: https://github.com/apache/kafka/pull/9020#discussion_r455934580 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java ## @@ -20,62 +20,49 @@ import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.TimestampedWindowStore; -import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.Objects; +import java.util.stream.Collectors; public class StreamThreadStateStoreProvider { private final StreamThread streamThread; -private final InternalTopologyBuilder internalTopologyBuilder; -public StreamThreadStateStoreProvider(final StreamThread streamThread, - final InternalTopologyBuilder internalTopologyBuilder) { +public StreamThreadStateStoreProvider(final StreamThread streamThread) { this.streamThread = streamThread; -this.internalTopologyBuilder = internalTopologyBuilder; } @SuppressWarnings("unchecked") public List stores(final StoreQueryParameters storeQueryParams) { final String storeName = storeQueryParams.storeName(); final QueryableStoreType queryableStoreType = storeQueryParams.queryableStoreType(); -final TaskId keyTaskId = createKeyTaskId(storeName, storeQueryParams.partition()); if (streamThread.state() == StreamThread.State.DEAD) { return Collections.emptyList(); } final StreamThread.State state = streamThread.state(); if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) { final Map tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap(); -final List stores = new ArrayList<>(); -if (keyTaskId != null) { -final Task task = tasks.get(keyTaskId); -if (task == null) { +if (storeQueryParams.partition() != null) { +final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition()); +if (streamTask == null) { return Collections.emptyList(); } -final T store = validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, keyTaskId); -if (store != null) { -return Collections.singletonList(store); -} -} else { -for (final Task streamTask : tasks.values()) { -final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); -if (store != null) { -stores.add(store); -} -} +final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); +return store != null ? Collections.singletonList(store) : Collections.emptyList(); Review comment: sure This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dima5rr commented on a change in pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition
dima5rr commented on a change in pull request #9020: URL: https://github.com/apache/kafka/pull/9020#discussion_r455939491 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java ## @@ -46,11 +46,22 @@ public void setStoreQueryParameters(final StoreQueryParameters storeQueryParamet public List stores(final String storeName, final QueryableStoreType queryableStoreType) { final List allStores = new ArrayList<>(); -for (final StreamThreadStateStoreProvider provider : storeProviders) { -final List stores = provider.stores(storeQueryParameters); -allStores.addAll(stores); +for (final StreamThreadStateStoreProvider storeProvider : storeProviders) { +final List stores = storeProvider.stores(storeQueryParameters); +if (!stores.isEmpty()) { +allStores.addAll(stores); +if (storeQueryParameters.partition() != null) { +break; +} +} } if (allStores.isEmpty()) { +if (storeQueryParameters.partition() != null) { +throw new InvalidStateStoreException( +String.format("The specified partition %d for store %s does not exist.", Review comment: L65 catches on rebalancing, while L60 is parameter validation for incorrect partition case. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition
vvcephei commented on a change in pull request #9020: URL: https://github.com/apache/kafka/pull/9020#discussion_r455948704 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java ## @@ -51,18 +51,22 @@ public StreamThreadStateStoreProvider(final StreamThread streamThread) { final StreamThread.State state = streamThread.state(); if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) { final Map tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap(); +final List stores = new ArrayList<>(); if (storeQueryParams.partition() != null) { final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition()); -if (streamTask == null) { -return Collections.emptyList(); +if (streamTask != null) { +final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); +if (store != null) { +stores.add(store); +} } -final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); -return store != null ? Collections.singletonList(store) : Collections.emptyList(); +} else { +tasks.values().stream(). +map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). +filter(Objects::nonNull). +forEach(stores::add); } -return tasks.values().stream(). -map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). -filter(Objects::nonNull). -collect(Collectors.toList()); +return Collections.unmodifiableList(stores); Review comment: Ah, sorry, I can see that my prior comment was ambiguous. This is what I meant: ```java if (storeQueryParams.partition() == null) { return tasks.values().stream(). map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). filter(Objects::nonNull). collect(Collectors.toList()); } else { final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition()); if (streamTask == null) { return Collections.emptyList(); } else { final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); return store == null ? Collections.emptyList() : Collections.singletonList(store); } } ``` The reason this is better for maintenence is that you only have to trace a path through the nested conditionals into a single inner block to understand what gets returned. I.e., code comprehension complexity is only the depth of the conditional tree. In contrast, if we do early returns, you have to fully read all the conditional blocks that lead up to the one you're interested (depth-first traversal), so code comprehension is linear instead of logarithmic. If we mutate the collection, you actually have to read _all_ the conditionals to understand what is going to happen, so code comprehension is also linear instead of logarithmic. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10205) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159365#comment-17159365 ] Brian Forkan commented on KAFKA-10205: -- [~mjsax] this fix appears to do the trick. I will keep monitoring of course and will let you know if we see it again. Thanks for your help everyone. > NullPointerException in StreamTask > -- > > Key: KAFKA-10205 > URL: https://issues.apache.org/jira/browse/KAFKA-10205 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Brian Forkan >Assignee: John Roesler >Priority: Minor > > In our Kafka Streams application we have been experiencing a > NullPointerException when deploying a new version of our application. This > does not happen during a normal rolling restart. > The exception is: > {code:java} > Error caught during partition assignment, will abort the current process and > re-throw at the end of > rebalance","stack_trace":"java.lang.NullPointerException: nullError caught > during partition assignment, will abort the current process and re-throw at > the end of rebalance","stack_trace":"java.lang.NullPointerException: null at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:186) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:115) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295) > at > org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160) > at > org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173) > at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:86) at > brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:80) at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) > {code} > And the relevant lines of code - > 
[https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L184-L196] > I suspect "topology.source(partition.topic());" is returning null. > Has anyone experienced this issue before? I suspect there is a problem with > our topology but I can't replicate this on my machine so I can't tell. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on pull request #9011: KAFKA-10134: Still depends on existence of any fetchable partitions to block on join
hachikuji commented on pull request #9011: URL: https://github.com/apache/kafka/pull/9011#issuecomment-659560017 @guozhangwang There is one detail I think we're missing. If `updateAssignmentMetadataIfNeeded` does not block, then execution will fall through to `pollForFetches`. I would like to understand why `pollForFetches` is not blocking. As far as I can tell, the only thing that would cause that is if `Heartbeat.timeToNextHeartbeat` is returning 0. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dima5rr commented on a change in pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition
dima5rr commented on a change in pull request #9020: URL: https://github.com/apache/kafka/pull/9020#discussion_r455974476 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java ## @@ -51,18 +51,22 @@ public StreamThreadStateStoreProvider(final StreamThread streamThread) { final StreamThread.State state = streamThread.state(); if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == StreamThread.State.RUNNING) { final Map tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap(); +final List stores = new ArrayList<>(); if (storeQueryParams.partition() != null) { final Task streamTask = findStreamTask(tasks, storeName, storeQueryParams.partition()); -if (streamTask == null) { -return Collections.emptyList(); +if (streamTask != null) { +final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); +if (store != null) { +stores.add(store); +} } -final T store = validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id()); -return store != null ? Collections.singletonList(store) : Collections.emptyList(); +} else { +tasks.values().stream(). +map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). +filter(Objects::nonNull). +forEach(stores::add); } -return tasks.values().stream(). -map(streamTask -> validateAndListStores(streamTask.getStore(storeName), queryableStoreType, storeName, streamTask.id())). -filter(Objects::nonNull). -collect(Collectors.toList()); +return Collections.unmodifiableList(stores); Review comment: Will concise it into functional way. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10035) Improve the AbstractResetIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-10035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159387#comment-17159387 ] Sergey commented on KAFKA-10035: Thanks! > Improve the AbstractResetIntegrationTest > > > Key: KAFKA-10035 > URL: https://issues.apache.org/jira/browse/KAFKA-10035 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: feyman >Assignee: Sergei >Priority: Minor > > In the test: AbstractResetIntegrationTest, there are several places like > below: > > {code:java} > streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + > STREAMS_CONSUMER_TIMEOUT * 100); > {code} > which leverage `Long` to `String` conversion as a workaround. > > {code:java} > streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, > STREAMS_CONSUMER_TIMEOUT * 100); > {code} > or exception will be thrown if it is like: > {code:java} > {{org.apache.kafka.common.config.ConfigException: Invalid value 20 for > configuration session.timeout.ms: Expected value to be a 32-bit integer, but > it was a java.lang.Long > at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:672) > at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474) > at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467) > at > org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:108) > at > org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:129) > at > org.apache.kafka.clients.consumer.ConsumerConfig.(ConsumerConfig.java:606) > at > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:630) > at > org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56) > at > org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:313) > at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:766) > at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:652) > at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:562) > at > org.apache.kafka.streams.integration.AbstractResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(AbstractResetIntegrationTest.java:270) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(ResetIntegrationTest.java:77)}}{code} > This may not seem very intuitive and need enhancement. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Issue Comment Deleted] (KAFKA-10035) Improve the AbstractResetIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-10035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey updated KAFKA-10035: --- Comment: was deleted (was: Thanks!) > Improve the AbstractResetIntegrationTest > > > Key: KAFKA-10035 > URL: https://issues.apache.org/jira/browse/KAFKA-10035 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: feyman >Assignee: Sergei >Priority: Minor > > In the test: AbstractResetIntegrationTest, there are several places like > below: > > {code:java} > streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + > STREAMS_CONSUMER_TIMEOUT * 100); > {code} > which leverage `Long` to `String` conversion as a workaround. > > {code:java} > streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, > STREAMS_CONSUMER_TIMEOUT * 100); > {code} > or exception will be thrown if it is like: > {code:java} > {{org.apache.kafka.common.config.ConfigException: Invalid value 20 for > configuration session.timeout.ms: Expected value to be a 32-bit integer, but > it was a java.lang.Long > at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:672) > at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474) > at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467) > at > org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:108) > at > org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:129) > at > org.apache.kafka.clients.consumer.ConsumerConfig.(ConsumerConfig.java:606) > at > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:630) > at > org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56) > at > org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:313) > at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:766) > at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:652) > at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:562) > at > org.apache.kafka.streams.integration.AbstractResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(AbstractResetIntegrationTest.java:270) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(ResetIntegrationTest.java:77)}}{code} > This may not seem very intuitive and need enhancement. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10035) Improve the AbstractResetIntegrationTest
[ https://issues.apache.org/jira/browse/KAFKA-10035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159388#comment-17159388 ] Sergei commented on KAFKA-10035: Thanks! > Improve the AbstractResetIntegrationTest > > > Key: KAFKA-10035 > URL: https://issues.apache.org/jira/browse/KAFKA-10035 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: feyman >Assignee: Sergei >Priority: Minor > > In the test: AbstractResetIntegrationTest, there are several places like > below: > > {code:java} > streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + > STREAMS_CONSUMER_TIMEOUT * 100); > {code} > which leverage `Long` to `String` conversion as a workaround. > > {code:java} > streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, > STREAMS_CONSUMER_TIMEOUT * 100); > {code} > or exception will be thrown if it is like: > {code:java} > {{org.apache.kafka.common.config.ConfigException: Invalid value 20 for > configuration session.timeout.ms: Expected value to be a 32-bit integer, but > it was a java.lang.Long > at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:672) > at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474) > at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467) > at > org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:108) > at > org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:129) > at > org.apache.kafka.clients.consumer.ConsumerConfig.(ConsumerConfig.java:606) > at > org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:630) > at > org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56) > at > org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:313) > at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:766) > at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:652) > at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:562) > at > org.apache.kafka.streams.integration.AbstractResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(AbstractResetIntegrationTest.java:270) > at > org.apache.kafka.streams.integration.ResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(ResetIntegrationTest.java:77)}}{code} > This may not seem very intuitive and need enhancement. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
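Editor's note: the improvement discussed in KAFKA-10035 amounts to handing `session.timeout.ms` an Integer instead of a stringified Long. One way to do the conversion safely, shown as a sketch; the constant and the multiplier mirror the snippet in the ticket but are otherwise illustrative.
```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConsumerTimeoutConfigSketch {

    private static final long STREAMS_CONSUMER_TIMEOUT = 2_000L; // illustrative value

    public static Properties streamsConsumerOverrides() {
        final Properties config = new Properties();
        // session.timeout.ms is defined as a 32-bit int config, so narrow the long explicitly;
        // Math.toIntExact fails fast on overflow instead of silently truncating.
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,
                   Math.toIntExact(STREAMS_CONSUMER_TIMEOUT * 100));
        return config;
    }
}
```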
[jira] [Commented] (KAFKA-4620) Connection exceptions in JMXTool do not make it to the top level
[ https://issues.apache.org/jira/browse/KAFKA-4620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159392#comment-17159392 ] Kowshik Prakasam commented on KAFKA-4620: - My observation is that it seems this issue is resolved now, as I see the following code: [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/JmxTool.scala#L131-L154] . Perhaps this was fixed as early as in this PR: [https://github.com/apache/kafka/pull/3547 .|https://github.com/apache/kafka/pull/3547] > Connection exceptions in JMXTool do not make it to the top level > > > Key: KAFKA-4620 > URL: https://issues.apache.org/jira/browse/KAFKA-4620 > Project: Kafka > Issue Type: Bug >Reporter: Apurva Mehta >Assignee: Apurva Mehta >Priority: Major > > If you run JMXTool before the target process is initialized, the JMX > connection is refused and the tool quits. > Adding the following retry : > {code:java} > while (retries < maxNumRetries && !connected) { > try { > System.err.println("Trying to connect to JMX url: %s".format(url)) > jmxc = JMXConnectorFactory.connect(url, null) > mbsc = jmxc.getMBeanServerConnection() > connected = true > } catch { > case e : Exception => { > System.err.println("Could not connect to JMX url: %s. Exception > %s".format(url, e.getMessage)) > retries += 1 > Thread.sleep(500) > } > } > } > {code} > does not work because the exceptions do not make it to the top level. Running > the above code results in the following output on stderr > {noformat} > Trying to connect to JMX url: > service:jmx:rmi:///jndi/rmi://127.0.0.1:9192/jmxrmi > Jan 11, 2017 8:20:33 PM ClientCommunicatorAdmin restart > WARNING: Failed to restart: java.io.IOException: Failed to get a RMI stub: > javax.naming.ServiceUnavailableException [Root exception is > java.rmi.ConnectException: Connection refused to host: 127.0.0.1; nested > exception is: > java.net.ConnectException: Connection refused] > Jan 11, 2017 8:20:33 PM RMIConnector RMIClientCommunicatorAdmin-doStop > WARNING: Failed to call the method close():java.rmi.ConnectException: > Connection refused to host: 172.31.15.109; nested exception is: > java.net.ConnectException: Connection refused > Jan 11, 2017 8:20:33 PM ClientCommunicatorAdmin Checker-run > WARNING: Failed to check connection: java.net.ConnectException: Connection > refused > Jan 11, 2017 8:20:33 PM ClientCommunicatorAdmin Checker-run > WARNING: stopping > {noformat} > We need to add working retry logic to JMXTool so that it can start correctly > even if the target process is not ready initially. -- This message was sent by Atlassian Jira (v8.3.4#803005)
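Editor's note: for reference, a Java rendering of the kind of retry loop the ticket asks for (and that the JmxTool change linked in the comment implements in Scala); the URL handling and retry parameters are illustrative, and the point is that the connection failure is rethrown to the caller rather than only logged.
```java
import java.io.IOException;

import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxConnectWithRetry {

    public static MBeanServerConnection connect(final String url,
                                                final int maxNumRetries,
                                                final long retryBackoffMs) throws Exception {
        final JMXServiceURL jmxUrl = new JMXServiceURL(url);
        IOException lastError = null;
        for (int attempt = 1; attempt <= maxNumRetries; attempt++) {
            try {
                final JMXConnector connector = JMXConnectorFactory.connect(jmxUrl, null);
                return connector.getMBeanServerConnection();
            } catch (final IOException e) {
                lastError = e;
                System.err.printf("Could not connect to JMX url %s (attempt %d/%d): %s%n",
                                  url, attempt, maxNumRetries, e.getMessage());
                Thread.sleep(retryBackoffMs);
            }
        }
        if (lastError != null) {
            throw lastError; // surface the failure to the top level instead of swallowing it
        }
        throw new IllegalArgumentException("maxNumRetries must be at least 1");
    }
}
```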
[GitHub] [kafka] bdbyrne commented on a change in pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...
bdbyrne commented on a change in pull request #9022: URL: https://github.com/apache/kafka/pull/9022#discussion_r455998374 ## File path: core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala ## @@ -56,12 +56,17 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin zkConnect = zkConnect, rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 -> "rack3", 5 -> "rack3"), numPartitions = numPartitions, -defaultReplicationFactor = defaultReplicationFactor +defaultReplicationFactor = defaultReplicationFactor, +replicaFetchMaxBytes = replicaFetchMaxBytes(), ).map(KafkaConfig.fromProps) private val numPartitions = 1 private val defaultReplicationFactor = 1.toShort + private def replicaFetchMaxBytes() = +if (testName.getMethodName == "testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress") Some(1) Review comment: Agreed, the `KafkaServerTestHarness` makes it more difficult. I've updated the test to set max fetch bytes to 1 for all tests, which is fine given none of the other tests produce data. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #8979: KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers
hachikuji commented on a change in pull request #8979: URL: https://github.com/apache/kafka/pull/8979#discussion_r456008354 ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ## @@ -58,8 +58,8 @@ * * - {@link Errors#OFFSET_OUT_OF_RANGE} If the fetch offset is out of range for a requested partition * - {@link Errors#TOPIC_AUTHORIZATION_FAILED} If the user does not have READ access to a requested topic - * - {@link Errors#REPLICA_NOT_AVAILABLE} If the request is received by a broker which is not a replica - * - {@link Errors#NOT_LEADER_FOR_PARTITION} If the broker is not a leader and either the provided leader epoch + * - {@link Errors#REPLICA_NOT_AVAILABLE} If the request is received by a broker with version 2.4 to 2.6 which is not a replica Review comment: Is the note about the range from 2.4 to 2.6 correct? I think this error has always been possible in the case of a reassignment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10279) Allow dynamic update of certificates with additional SubjectAltNames
Rajini Sivaram created KAFKA-10279: -- Summary: Allow dynamic update of certificates with additional SubjectAltNames Key: KAFKA-10279 URL: https://issues.apache.org/jira/browse/KAFKA-10279 Project: Kafka Issue Type: Improvement Components: security Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 2.7.0 At the moment, we don't allow dynamic keystore update in brokers if DN and SubjectAltNames don't match exactly. This is to ensure that existing clients and inter-broker communication don't break. Since addition of new entries to SubjectAltNames will not break any authentication, we should allow that and just verify that new SubjectAltNames is a superset of the old one. -- This message was sent by Atlassian Jira (v8.3.4#803005)
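Editor's note: the proposed check could look roughly like the sketch below, where the new certificate's SubjectAltNames must contain every entry of the old one. This is an illustration only, not the broker's actual validation code.
```java
import java.security.cert.CertificateParsingException;
import java.security.cert.X509Certificate;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public final class SubjectAltNameCheck {

    private SubjectAltNameCheck() { }

    // A dynamic keystore update is acceptable if the new certificate's SubjectAltNames
    // are a superset of the old certificate's, so existing clients keep authenticating.
    public static boolean sansAreSuperset(final X509Certificate oldCert,
                                          final X509Certificate newCert) throws CertificateParsingException {
        final Set<List<?>> oldSans = toSet(oldCert.getSubjectAlternativeNames());
        final Set<List<?>> newSans = toSet(newCert.getSubjectAlternativeNames());
        return newSans.containsAll(oldSans);
    }

    private static Set<List<?>> toSet(final Collection<List<?>> sans) {
        return sans == null ? new HashSet<>() : new HashSet<>(sans);
    }
}
```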
[GitHub] [kafka] hachikuji commented on pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...
hachikuji commented on pull request #9022: URL: https://github.com/apache/kafka/pull/9022#issuecomment-659626413 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10279) Allow dynamic update of certificates with additional SubjectAltNames
[ https://issues.apache.org/jira/browse/KAFKA-10279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159436#comment-17159436 ] Maulin Vasavada commented on KAFKA-10279: - +1 Rajini. > Allow dynamic update of certificates with additional SubjectAltNames > > > Key: KAFKA-10279 > URL: https://issues.apache.org/jira/browse/KAFKA-10279 > Project: Kafka > Issue Type: Improvement > Components: security >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 2.7.0 > > > At the moment, we don't allow dynamic keystore update in brokers if DN and > SubjectAltNames don't match exactly. This is to ensure that existing clients > and inter-broker communication don't break. Since addition of new entries to > SubjectAltNames will not break any authentication, we should allow that and > just verify that new SubjectAltNames is a superset of the old one. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10280) Message filtering support based on keys format/Headers
Raj created KAFKA-10280: --- Summary: Message filtering support based on keys format/Headers Key: KAFKA-10280 URL: https://issues.apache.org/jira/browse/KAFKA-10280 Project: Kafka Issue Type: Improvement Components: consumer, core Reporter: Raj There are many scenarios where consumers need to subscribe to messages based on a pattern, e.g. key format or headers. Typically, this scenario gets solved using external components that implement the filtering logic. In a deployment with a large number of consumers, this becomes a significant performance (network/IO) overhead if most messages are discarded by the consumer based on the filter. If there are large numbers of consumers subscribing to the topic partitions, there is unnecessary IO, which could be avoided if the broker could apply the filter per subscription. I wanted to hear insights from the Kafka community on how they are solving this problem and gauge interest in formally submitting a KIP around filtering as part of core capability. Thanks, Raj -- This message was sent by Atlassian Jira (v8.3.4#803005)
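Editor's note: for concreteness, the external/client-side filtering the ticket describes typically looks like the sketch below: every record still crosses the network and is deserialized before the filter decides whether to process it, which is the IO overhead a broker-side filter would avoid. The topic, group id, header key, and header value are made-up examples.
```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HeaderFilteringConsumer {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "header-filtering-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events"));
            while (true) { // in practice you would add a shutdown condition
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (final ConsumerRecord<String, String> record : records) {
                    // Every record was already fetched over the network; the filter only
                    // decides which ones the application actually processes.
                    final Header type = record.headers().lastHeader("event-type");
                    if (type != null
                            && "ORDER_CREATED".equals(new String(type.value(), StandardCharsets.UTF_8))) {
                        process(record);
                    }
                }
            }
        }
    }

    private static void process(final ConsumerRecord<String, String> record) {
        System.out.printf("key=%s value=%s%n", record.key(), record.value());
    }
}
```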
[GitHub] [kafka] jeffkbkim commented on a change in pull request #8935: KAFKA-10189: reset event queue time histogram when queue is empty
jeffkbkim commented on a change in pull request #8935: URL: https://github.com/apache/kafka/pull/8935#discussion_r456064275 ## File path: core/src/main/scala/kafka/controller/ControllerEventManager.scala ## @@ -139,4 +140,19 @@ class ControllerEventManager(controllerId: Int, } } + private def pollFromEventQueue(): QueuedEvent = { Review comment: unfortunately, the metrics version kafka uses (v2.2.0) is no longer supported by yammer https://github.com/dropwizard/metrics/issues/1618 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
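For readers who have not seen the PR, the idea named in its title — clearing the queue-time histogram once the event queue is observed to be empty — can be sketched roughly as below. This is an illustration only, not the actual ControllerEventManager change, and the Histogram here is a stand-in for the yammer histogram used by the controller:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class EventQueuePollSketch {

    // Minimal stand-in exposing only the calls the sketch needs.
    static class Histogram {
        void update(long value) { /* record a queue-time sample */ }
        void clear() { /* drop all recorded samples */ }
    }

    private final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Histogram queueTimeHist = new Histogram();

    Runnable pollFromEventQueue() throws InterruptedException {
        // Bounded poll first; if nothing arrives, the queue was empty for the whole
        // interval, so reset the histogram before blocking indefinitely. This keeps
        // the reported queue time from being dominated by stale samples while idle.
        Runnable event = queue.poll(300, TimeUnit.MILLISECONDS);
        if (event != null) {
            return event;
        }
        queueTimeHist.clear();
        return queue.take();
    }
}
```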
[GitHub] [kafka] serjchebotarev commented on a change in pull request #9028: KAFKA-10035: Safer conversion of consumer timeout parameters
serjchebotarev commented on a change in pull request #9028: URL: https://github.com/apache/kafka/pull/9028#discussion_r456067480 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java ## @@ -265,7 +265,7 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() throws Exception { Review comment: Tried on this, partially it went good, but several tests fail under `ResetIntegrationWithSslTest`, namely: - `shouldNotAllowToResetWhileStreamsIsRunning` - `shouldNotAllowToResetWhenInputTopicAbsent` - `shouldNotAllowToResetWhenIntermediateTopicAbsent` From output of `shouldNotAllowToResetWhileStreamsIsRunning` there are some log lines that mention SSL handshake failure: ``` ... [2020-07-16 23:00:20,764] INFO stream-thread [reset-with-ssl-integration-test-not-reset-during-runtime-6971acc7-e471-4973-a652-5a09b7dce10d-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING (org.apache.kafka.streams.processor.internals.StreamThread:220) [2020-07-16 23:00:20,766] INFO stream-client [reset-with-ssl-integration-test-not-reset-during-runtime-6971acc7-e471-4973-a652-5a09b7dce10d] State transition from REBALANCING to RUNNING (org.apache.kafka.streams.KafkaStreams:283) [2020-07-16 23:00:20,771] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (SSL handshake failed) (org.apache.kafka.common.network.Selector:616) [2020-07-16 23:00:21,067] INFO [Consumer clientId=reset-with-ssl-integration-test-not-reset-during-runtime-6971acc7-e471-4973-a652-5a09b7dce10d-StreamThread-1-consumer, groupId=reset-with-ssl-integration-test-not-reset-during-runtime] Found no committed offset for partition inputTopic-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1349) [2020-07-16 23:00:21,093] INFO [Consumer clientId=reset-with-ssl-integration-test-not-reset-during-runtime-6971acc7-e471-4973-a652-5a09b7dce10d-StreamThread-1-consumer, groupId=reset-with-ssl-integration-test-not-reset-during-runtime] Resetting offset for partition inputTopic-0 to offset 0. 
(org.apache.kafka.clients.consumer.internals.SubscriptionState:397) [2020-07-16 23:00:21,174] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (SSL handshake failed) (org.apache.kafka.common.network.Selector:616) [2020-07-16 23:00:21,548] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (SSL handshake failed) (org.apache.kafka.common.network.Selector:616) [2020-07-16 23:00:21,798] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (SSL handshake failed) (org.apache.kafka.common.network.Selector:616) [2020-07-16 23:00:22,028] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (SSL handshake failed) (org.apache.kafka.common.network.Selector:616) [2020-07-16 23:00:22,329] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (SSL handshake failed) (org.apache.kafka.common.network.Selector:616) [2020-07-16 23:00:22,556] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (SSL handshake failed) (org.apache.kafka.common.network.Selector:616) [2020-07-16 23:00:22,793] INFO [SocketServer brokerId=0] Failed authentication with /127.0.0.1 (SSL handshake failed) (org.apache.kafka.common.network.Selector:616) org.apache.kafka.streams.integration.ResetIntegrationWithSslTest > shouldNotAllowToResetWhileStreamsIsRunning SKIPPED > Task :streams:test FAILED :streams:test (Thread[Execution worker for ':',5,main]) completed. Took 14.586 secs. FAILURE: Build failed with an exception. ``` Was not able to dig deeper into what could be the reason for this. For now just left these three tests in the base class without `@Test` annotation and calling them as before in `ResetIntegrationTest` only. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] serjchebotarev commented on a change in pull request #9028: KAFKA-10035: Safer conversion of consumer timeout parameters
serjchebotarev commented on a change in pull request #9028: URL: https://github.com/apache/kafka/pull/9028#discussion_r456068727 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java ## @@ -151,7 +151,7 @@ private void prepareConfigs() { streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100); streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100); streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); -streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT); Review comment: thanks! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
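One plausible reading of the PR title, for readers skimming the diff context above, is that numeric timeout values are handed to the config directly rather than being concatenated into a String. The sketch below is only an illustration (the constant and its value are invented; this is not the actual patch):

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class TimeoutConfigSketch {
    private static final long STREAMS_CONSUMER_TIMEOUT = 2_000L;

    public static void main(String[] args) {
        Properties streamsConfig = new Properties();
        // Before: the timeout is stringified by concatenation, losing the numeric type.
        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
        // After: the numeric value is passed as-is and the config definition validates the type.
        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, STREAMS_CONSUMER_TIMEOUT);
        System.out.println(streamsConfig.get(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG));
    }
}
```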
[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
mumrah commented on pull request #9008: URL: https://github.com/apache/kafka/pull/9008#issuecomment-659665576 Updated the benchmarks with @lbradstreet's suggestions. Here are the results for 3 partitions, 10 topics. GC profiles included. On this branch: ``` Benchmark (partitionCount) (topicCount) Mode Cnt ScoreError Units FetchRequestBenchmark.testFetchRequestForConsumer 310 avgt 15 2110.741 ± 27.935 ns/op FetchRequestBenchmark.testFetchRequestForReplica 310 avgt 15 2021.114 ± 7.816 ns/op FetchRequestBenchmark.testSerializeFetchRequestForConsumer 310 avgt 15 3452.799 ± 16.013 ns/op FetchRequestBenchmark.testSerializeFetchRequestForReplica 310 avgt 15 3691.157 ± 60.260 ns/op GC Profile (partitionCount) (topicCount) Mode Cnt ScoreError Units FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate 310 avgt 15 4295.532 ± 56.061 MB/sec FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate.norm 310 avgt 15 9984.000 ± 0.001 B/op FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space 310 avgt 15 4292.525 ± 56.341 MB/sec FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm 310 avgt 15 9977.037 ± 28.311 B/op FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space 310 avgt 15 0.187 ± 0.027 MB/sec FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm 310 avgt 15 0.435 ± 0.060B/op FetchRequestBenchmark.testFetchRequestForConsumer:·gc.count 310 avgt 15 2335.000 counts FetchRequestBenchmark.testFetchRequestForConsumer:·gc.time 310 avgt 15 1375.000 ms FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate 310 avgt 15 4416.855 ± 16.429 MB/sec FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate.norm 310 avgt 15 9832.000 ± 0.001 B/op FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space 310 avgt 15 4417.032 ± 24.858 MB/sec FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm 310 avgt 15 9832.358 ± 28.932 B/op FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space 310 avgt 15 0.186 ± 0.015 MB/sec FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm 310 avgt 15 0.415 ± 0.033 B/op FetchRequestBenchmark.testFetchRequestForReplica:·gc.count 310 avgt 15 2280.000 counts FetchRequestBenchmark.testFetchRequestForReplica:·gc.time 310 avgt 15 1376.000 ms FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate 310 avgt 15 3256.172 ± 15.524 MB/sec FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate.norm 310 avgt 15 12384.000 ± 0.001 B/op FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space 310 avgt 15 3255.019 ± 21.484 MB/sec FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm 310 avgt 15 12379.587 ± 49.161B/op FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space 310 avgt 15 0.122 ± 0.022 MB/sec FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm 310 avgt 15 0.462 ± 0.084B/op Fetch
[GitHub] [kafka] ijuma commented on a change in pull request #8979: KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers
ijuma commented on a change in pull request #8979: URL: https://github.com/apache/kafka/pull/8979#discussion_r456074355 ## File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ## @@ -139,13 +139,15 @@ InvalidFetchSizeException::new), LEADER_NOT_AVAILABLE(5, "There is no leader for this topic-partition as we are in the middle of a leadership election.", LeaderNotAvailableException::new), -NOT_LEADER_FOR_PARTITION(6, "This server is not the leader for that topic-partition.", -NotLeaderForPartitionException::new), +NOT_LEADER_OR_FOLLOWER(6, "For requests intended only for the leader, this error indicates that the broker is not the current leader. " + +"For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.", +NotLeaderOrFollowerException::new), REQUEST_TIMED_OUT(7, "The request timed out.", TimeoutException::new), BROKER_NOT_AVAILABLE(8, "The broker is not available.", BrokerNotAvailableException::new), -REPLICA_NOT_AVAILABLE(9, "The replica is not available for the requested topic-partition.", +REPLICA_NOT_AVAILABLE(9, "The replica is not available for the requested topic-partition. This is used for backward compatibility for " + Review comment: @hachikuji @rajinisivaram Since this is still used outside of produce/fetch, maybe the backwards compatibility message needs to be qualified? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #8979: KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers
ijuma commented on pull request #8979: URL: https://github.com/apache/kafka/pull/8979#issuecomment-659670683 @rajinisivaram With regards to MetadataResponse, it's currently documented as: ``` /** * Possible topic-level error codes: * UnknownTopic (3) * LeaderNotAvailable (5) * InvalidTopic (17) * TopicAuthorizationFailed (29) * Possible partition-level error codes: * LeaderNotAvailable (5) * ReplicaNotAvailable (9) */ ``` I don't think we should change it, but it does raise the question of whether we should make `ReplicaNotAvailable` retriable. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lbradstreet commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
lbradstreet commented on pull request #9008: URL: https://github.com/apache/kafka/pull/9008#issuecomment-659673793 > Updated the benchmarks with @lbradstreet's suggestions. Here are the results for 3 partitions, 10 topics. GC profiles included. > > On this branch: > > ``` > Benchmark (partitionCount) (topicCount) Mode Cnt ScoreError Units > FetchRequestBenchmark.testFetchRequestForConsumer 310 avgt 15 2110.741 ± 27.935 ns/op > FetchRequestBenchmark.testFetchRequestForReplica 310 avgt 15 2021.114 ± 7.816 ns/op > FetchRequestBenchmark.testSerializeFetchRequestForConsumer 310 avgt 15 3452.799 ± 16.013 ns/op > FetchRequestBenchmark.testSerializeFetchRequestForReplica 310 avgt 15 3691.157 ± 60.260 ns/op > > GC Profile (partitionCount) (topicCount) Mode Cnt Score Error Units > FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate 310 avgt 15 4295.532 ± 56.061 MB/sec > FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate.norm 310 avgt 15 9984.000 ± 0.001 B/op > FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space 310 avgt 15 4292.525 ± 56.341 MB/sec > FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm 310 avgt 15 9977.037 ± 28.311 B/op > FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space 310 avgt 15 0.187 ± 0.027 MB/sec > FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm 310 avgt 15 0.435 ± 0.060B/op > FetchRequestBenchmark.testFetchRequestForConsumer:·gc.count 310 avgt 15 2335.000 counts > FetchRequestBenchmark.testFetchRequestForConsumer:·gc.time 310 avgt 15 1375.000 ms > FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate 310 avgt 15 4416.855 ± 16.429 MB/sec > FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate.norm 310 avgt 15 9832.000 ± 0.001 B/op > FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space 310 avgt 15 4417.032 ± 24.858 MB/sec > FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm 310 avgt 15 9832.358 ± 28.932 B/op > FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space 310 avgt 15 0.186 ± 0.015 MB/sec > FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm 310 avgt 15 0.415 ± 0.033 B/op > FetchRequestBenchmark.testFetchRequestForReplica:·gc.count 310 avgt 15 2280.000 counts > FetchRequestBenchmark.testFetchRequestForReplica:·gc.time 310 avgt 15 1376.000 ms > FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate 310 avgt 15 3256.172 ± 15.524 MB/sec > FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate.norm 310 avgt 15 12384.000 ± 0.001 B/op > FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space 310 avgt 15 3255.019 ± 21.484 MB/sec > FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm 310 avgt 15 12379.587 ± 49.161B/op > FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space 310 avgt 15 0.122 ± 0.022 MB/sec > FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm
[GitHub] [kafka] hachikuji commented on pull request #8979: KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers
hachikuji commented on pull request #8979: URL: https://github.com/apache/kafka/pull/8979#issuecomment-659676133 I think it would make sense to change `ReplicaNotAvailableException` to extend `InvalidMetadataException`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
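A minimal sketch of the suggested change — making the exception an InvalidMetadataException so that clients treat it as retriable and refresh metadata — might look like the following. The constructors mirror the common pattern in org.apache.kafka.common.errors, but this is not the actual patch:

```java
package org.apache.kafka.common.errors;

public class ReplicaNotAvailableException extends InvalidMetadataException {

    private static final long serialVersionUID = 1L;

    public ReplicaNotAvailableException(String message) {
        super(message);
    }

    public ReplicaNotAvailableException(String message, Throwable cause) {
        super(message, cause);
    }
}
```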
[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
hachikuji commented on a change in pull request #9008: URL: https://github.com/apache/kafka/pull/9008#discussion_r456076927 ## File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java ## @@ -16,5 +16,16 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.protocol.ApiMessage; + public interface AbstractRequestResponse { +/** + * Return the auto-generated `Message` instance if this request/response relies on one for + * serialization/deserialization. If this class has not yet been updated to rely on the auto-generated protocol + * classes, return `null`. + * @return + */ +default ApiMessage data() { Review comment: Is there an advantage to pulling this up? Seems like we still need to update a bunch more classes. Until we have all the protocols converted, it might be safer to find another approach. ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ## @@ -1249,26 +1249,26 @@ private CompletedFetch initializeCompletedFetch(CompletedFetch nextCompletedFetc } } -if (partition.highWatermark >= 0) { -log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark); -subscriptions.updateHighWatermark(tp, partition.highWatermark); +if (partition.highWatermark() >= 0) { +log.trace("Updating high watermark for partition {} to {}", tp, partition.highWatermark()); +subscriptions.updateHighWatermark(tp, partition.highWatermark()); } -if (partition.logStartOffset >= 0) { -log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset); -subscriptions.updateLogStartOffset(tp, partition.logStartOffset); +if (partition.logStartOffset() >= 0) { +log.trace("Updating log start offset for partition {} to {}", tp, partition.logStartOffset()); +subscriptions.updateLogStartOffset(tp, partition.logStartOffset()); } -if (partition.lastStableOffset >= 0) { -log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset); -subscriptions.updateLastStableOffset(tp, partition.lastStableOffset); +if (partition.lastStableOffset() >= 0) { +log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset()); +subscriptions.updateLastStableOffset(tp, partition.lastStableOffset()); } -if (partition.preferredReadReplica.isPresent()) { - subscriptions.updatePreferredReadReplica(completedFetch.partition, partition.preferredReadReplica.get(), () -> { +if (partition.preferredReadReplica().isPresent()) { Review comment: nit: could probably change this to use `ifPresent` ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ## @@ -492,74 +327,51 @@ public int maxBytes() { } public boolean isFromFollower() { -return replicaId >= 0; +return replicaId() >= 0; } public IsolationLevel isolationLevel() { -return isolationLevel; +return IsolationLevel.forId(data.isolationLevel()); } public FetchMetadata metadata() { return metadata; } public String rackId() { -return rackId; +return data.rackId(); } public static FetchRequest parse(ByteBuffer buffer, short version) { -return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version); +ByteBufferAccessor accessor = new ByteBufferAccessor(buffer); +FetchRequestData message = new FetchRequestData(); +message.read(accessor, version); +return new FetchRequest(message, version); +} + +@Override +public ByteBuffer serialize(RequestHeader header) { Review comment: Are we overriding this so that we save the conversion to `Struct`? 
As far as I can tell, there's nothing specific to `FetchRequest` below. I wonder if we can move this implementation to `AbstractRequest.serialize` so that we save the conversion to Struct for all APIs that have been converted? ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ## @@ -492,74 +327,51 @@ public int maxBytes() { } public boolean isFromFollower() { -return replicaId >= 0; +return replicaId() >= 0; } public IsolationLevel isolationLevel() { -return isolationLevel; +return IsolationLevel.forId(data.isolationLevel()); } pu
[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
ijuma commented on a change in pull request #9008: URL: https://github.com/apache/kafka/pull/9008#discussion_r456091241 ## File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java ## @@ -16,5 +16,16 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.protocol.ApiMessage; + public interface AbstractRequestResponse { +/** + * Return the auto-generated `Message` instance if this request/response relies on one for + * serialization/deserialization. If this class has not yet been updated to rely on the auto-generated protocol + * classes, return `null`. + * @return + */ +default ApiMessage data() { Review comment: I have a PR that does this. I really need to get that over the line. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
hachikuji commented on a change in pull request #9008: URL: https://github.com/apache/kafka/pull/9008#discussion_r456092037 ## File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java ## @@ -16,5 +16,16 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.protocol.ApiMessage; + public interface AbstractRequestResponse { +/** + * Return the auto-generated `Message` instance if this request/response relies on one for + * serialization/deserialization. If this class has not yet been updated to rely on the auto-generated protocol + * classes, return `null`. + * @return + */ +default ApiMessage data() { Review comment: Perhaps instead we could add this to a mixin type. Then if we find cases where getting access to the `ApiMessage` generally would be useful, we could just use `instanceof` checks. These would ultimately go away after the conversions are finished. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
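A rough sketch of the mixin idea (the interface name is invented, and this is not actual Kafka code): only converted request/response classes would implement it, and callers that want the generated data use an instanceof check until all protocols are converted.

```java
import org.apache.kafka.common.protocol.ApiMessage;

// Hypothetical mixin: implemented only by request/response classes that already
// carry an auto-generated message.
interface ProvidesApiMessage {
    ApiMessage data();
}

class GeneratedDataCaller {
    static void maybeUseGeneratedData(Object requestOrResponse) {
        if (requestOrResponse instanceof ProvidesApiMessage) {
            ApiMessage data = ((ProvidesApiMessage) requestOrResponse).data();
            // Work directly with the auto-generated message.
            System.out.println("api key: " + data.apiKey());
        } else {
            // Fall back to the legacy Struct-based path for not-yet-converted classes.
        }
    }
}
```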
[GitHub] [kafka] ableegoldman commented on a change in pull request #9028: KAFKA-10035: Safer conversion of consumer timeout parameters
ableegoldman commented on a change in pull request #9028: URL: https://github.com/apache/kafka/pull/9028#discussion_r456101096 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java ## @@ -265,7 +265,7 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() throws Exception { Review comment: Thanks for looking into this! It definitely doesn't sound good that some of these tests don't pass with SSL. Maybe there's just some additional setup that's needed for these tests? 🤔 Anyways, we don't need to solve all that in this PR. We can revisit the issue once this is merged This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9028: KAFKA-10035: Safer conversion of consumer timeout parameters
vvcephei commented on pull request #9028: URL: https://github.com/apache/kafka/pull/9028#issuecomment-659701606 Test this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9028: KAFKA-10035: Safer conversion of consumer timeout parameters
vvcephei commented on pull request #9028: URL: https://github.com/apache/kafka/pull/9028#issuecomment-659701794 Ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino opened a new pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino opened a new pull request #9032: URL: https://github.com/apache/kafka/pull/9032 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10281) KIP-640: Add log compression analysis tool
Chris Beard created KAFKA-10281: --- Summary: KIP-640: Add log compression analysis tool Key: KAFKA-10281 URL: https://issues.apache.org/jira/browse/KAFKA-10281 Project: Kafka Issue Type: Improvement Components: tools Reporter: Chris Beard Assignee: Chris Beard Link to KIP: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-640%3A+Add+log+compression+analysis+tool] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on pull request #9024: [DO NOT MERGE (yet)] MINOR: bump Streams integration test log level to DEBUG
ableegoldman commented on pull request #9024: URL: https://github.com/apache/kafka/pull/9024#issuecomment-659747346 Well, it looks like we still get too many logs from the brokers even at INFO level. I think we should just demote all broker logs to WARN and bump them back up to INFO only if really necessary. On the other hand, it seems like turning DEBUG logs on for Streams caused over 100 tests to fail... not sure what that's about. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
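For illustration only (not the PR's change), demoting broker logging while keeping Streams verbose inside a test JVM could be done along these lines; the logger names are just the usual package prefixes:

```java
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class TestLogLevels {
    // Call from a test's setup to silence broker chatter while debugging Streams.
    public static void quietBrokers() {
        Logger.getLogger("kafka").setLevel(Level.WARN);                     // broker/core loggers
        Logger.getLogger("org.apache.kafka.streams").setLevel(Level.DEBUG); // the code under test
    }
}
```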
[jira] [Commented] (KAFKA-9841) Connector and Task duplicated when a worker join with old generation assignment
[ https://issues.apache.org/jira/browse/KAFKA-9841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159570#comment-17159570 ] Viktor Utkin commented on KAFKA-9841: - Hi guys, will fix be back-ported to older versions of kafka? > Connector and Task duplicated when a worker join with old generation > assignment > --- > > Key: KAFKA-9841 > URL: https://issues.apache.org/jira/browse/KAFKA-9841 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0, 2.3.1, 2.4.1 >Reporter: Yu Wang >Assignee: Yu Wang >Priority: Major > Labels: pull-request-available > Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1 > > > When using IncrementalCooperativeAssignor.class to assign connectors and > tasks. > Suppose there is a worker 'W' got some connection issue with the coordinator. > During the connection issue, the connectors/tasks on 'W' are assigned to the > others worker > When the connection issue disappear, 'W' will join the group with an old > generation assignment. Then the group leader will get duplicated > connectors/tasks in the metadata sent by the workers. But the duplicated > connectors/tasks will not be revoked. > > Generation 3: > Worker1: > [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], > taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > Worker2: > [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], > taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > Worker3: > [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], > taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 (org.apache.kafka.connect.runtime.dist 1480 > ributed.DistributedHerder) > Worker4: > [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[misc], > taskIds=[misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > Worker5: > [2020-03-17 04:31:23,482] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], > taskIds=[misc-5, misc-2], revokedConnectorIds=[], 
revokedTaskIds=[], delay=0} > with rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > > Generation 4: > Worker1: > [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 4 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', > leaderUrl='http://xx-4:8083/', offset=515, connectorIds=[], > taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > Worker2: > [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 4 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', > leaderUrl='http://xx-4:8083/', offset=515, connectorIds=[], > taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.r
[GitHub] [kafka] vutkin commented on pull request #8453: KAFKA-9841: Revoke duplicate connectors and tasks when zombie workers return with an outdated assignment
vutkin commented on pull request #8453: URL: https://github.com/apache/kafka/pull/8453#issuecomment-659747738 Hi guys, will fix be back-ported to older versions of kafka? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #9027: KAFKA-9161: add docs for KIP-441 and KIP-613 and other configs that need fixing
ableegoldman commented on a change in pull request #9027: URL: https://github.com/apache/kafka/pull/9027#discussion_r456147419 ## File path: docs/streams/developer-guide/config-streams.html ## @@ -181,6 +193,7 @@ bootstrap.serverscache.max.bytes.buffering Medium Maximum number of memory bytes to be used for record caches across all threads. +Maximum number of memory bytes to be used for record caches across all threads. Review comment: How did that happen 🤔 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10172) [Kafka connect] connectors, tasks metrics doubled
[ https://issues.apache.org/jira/browse/KAFKA-10172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Utkin updated KAFKA-10172: - Affects Version/s: 2.4.0 > [Kafka connect] connectors, tasks metrics doubled > - > > Key: KAFKA-10172 > URL: https://issues.apache.org/jira/browse/KAFKA-10172 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0, 2.3.1 >Reporter: Viktor Utkin >Priority: Critical > Attachments: image-2020-06-16-12-42-47-753.png > > > After re-balance of connect cluster (4 nodes in total) we noticed that metrics > * kafka_connect_connect_worker_metrics_task_count > * kafka_connect_connect_worker_metrics_connector_count > are doubled (2x,3x,4x,etc.), so connect show wrong values. > > !image-2020-06-16-12-42-47-753.png|width=481,height=240! > > But when we request number of connector via RESP API it shows only 36: > {code:java} > › curl -s 127.0.0.1:8083/connectors | jq '.[]' | sort | wc -l > 36{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10172) [Kafka connect] connectors, tasks metrics doubled
[ https://issues.apache.org/jira/browse/KAFKA-10172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159573#comment-17159573 ] Viktor Utkin commented on KAFKA-10172: -- Looks like there is a bugfix: https://issues.apache.org/jira/browse/KAFKA-9841 > [Kafka connect] connectors, tasks metrics doubled > - > > Key: KAFKA-10172 > URL: https://issues.apache.org/jira/browse/KAFKA-10172 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0, 2.3.1 >Reporter: Viktor Utkin >Priority: Critical > Attachments: image-2020-06-16-12-42-47-753.png > > > After re-balance of connect cluster (4 nodes in total) we noticed that metrics > * kafka_connect_connect_worker_metrics_task_count > * kafka_connect_connect_worker_metrics_connector_count > are doubled (2x,3x,4x,etc.), so connect show wrong values. > > !image-2020-06-16-12-42-47-753.png|width=481,height=240! > > But when we request number of connector via RESP API it shows only 36: > {code:java} > › curl -s 127.0.0.1:8083/connectors | jq '.[]' | sort | wc -l > 36{code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #9027: KAFKA-9161: add docs for KIP-441 and KIP-613 and other configs that need fixing
ableegoldman commented on a change in pull request #9027: URL: https://github.com/apache/kafka/pull/9027#discussion_r456148936 ## File path: docs/streams/developer-guide/running-app.html ## @@ -110,6 +110,18 @@ Removing capacity from your applicationIf a local state store exists, the changelog is replayed from the previously checkpointed offset. The changes are applied and the state is restored to the most recent snapshot. This method takes less time because it is applying a smaller portion of the changelog. For more information, see Standby Replicas. + + As of version 2.6, Streams will now do most of a task's restoration in the background through warmup replicas. These will be assigned to instances that need to restore a lot of state for a task. + A stateful active task will only be assigned to an instance once it's state is within the configured Review comment: Thanks This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
abbccdda commented on pull request #9001: URL: https://github.com/apache/kafka/pull/9001#issuecomment-659752993 retest this This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10282) Log metrics are removed if a log is deleted and re-created quickly enough
Bob Barrett created KAFKA-10282: --- Summary: Log metrics are removed if a log is deleted and re-created quickly enough Key: KAFKA-10282 URL: https://issues.apache.org/jira/browse/KAFKA-10282 Project: Kafka Issue Type: Bug Components: log Affects Versions: 2.6.0 Reporter: Bob Barrett Assignee: Bob Barrett Fix For: 2.7.0, 2.6.1 When we delete a local log, we mark it for asynchronous deletion by renaming it with a `.delete` extension, and then wait `LogConfig.FileDeleteDelayMs` milliseconds before actually deleting the files on disk. We don't remove the Log metrics from the metrics registry until the actual deletion takes place. If we recreate a log of the same topic partition (for example, if we reassign the partition away from the broker and quickly reassign it back), the metrics are registered when the new log is created, but then unregistered when the async deletion of the original log takes place. This leaves us with a partition that is not reporting any Log metrics (size, offsets, number of segments, etc). To fix this, the LogManager should check when creating new logs to see if a log for the same topic partition is marked for deletion, and if so, signal to that log not to unregister its metrics when it is deleted. -- This message was sent by Atlassian Jira (v8.3.4#803005)
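Conceptually, the fix described above amounts to a small hand-off between log creation and delayed deletion. The Java sketch below is only an illustration (the real LogManager is Scala and all names here are invented):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LogManagerSketch {

    static class Log {
        private volatile boolean keepMetricsOnDelete = false;

        void skipMetricRemovalOnDelete() { keepMetricsOnDelete = true; }

        void completeDelayedDeletion() {
            // delete segment files ...
            if (!keepMetricsOnDelete) {
                // remove the per-partition metrics from the registry
            }
            // otherwise leave them alone: the re-created log owns them now
        }
    }

    private final Map<String, Log> logsPendingDeletion = new ConcurrentHashMap<>();

    Log createLog(String topicPartition) {
        Log pending = logsPendingDeletion.get(topicPartition);
        if (pending != null) {
            // The new log registers metrics under the same names, so the old log must not
            // unregister them when its delayed deletion finally runs.
            pending.skipMetricRemovalOnDelete();
        }
        return new Log();
    }
}
```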
[jira] [Created] (KAFKA-10283) Consolidate client-level and consumer-level assignment within ClientState
Guozhang Wang created KAFKA-10283: - Summary: Consolidate client-level and consumer-level assignment within ClientState Key: KAFKA-10283 URL: https://issues.apache.org/jira/browse/KAFKA-10283 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang In StreamsPartitionAssignor, we do a two-level assignment: one on the client level, and then after that assignment is done we further decide within the client how to distribute tasks among consumers if there is more than one. The {{ClientState}} class is used for book-keeping the assigned tasks; however, it is only used for the first level, while the second level is done outside of the class and we only keep track of the results in a few maps for logging purposes. This leaves us with a bunch of hierarchical maps, e.g. some on the client level and some on the consumer level. We would like to consolidate some of these maps into a single data structure to better keep track of the assignment information, and also to reduce the risk of bugs that leave the assignment information inconsistent. -- This message was sent by Atlassian Jira (v8.3.4#803005)
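As an illustration of the consolidation idea (all names are invented; this is not the actual ClientState code), a single structure could keep the per-client view and the nested per-consumer split together instead of spreading them across parallel maps:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ClientAssignmentState {

    static class ConsumerAssignment {
        final Set<String> activeTasks = new HashSet<>();
        final Set<String> standbyTasks = new HashSet<>();
    }

    // consumerId -> that consumer's share of the client's tasks
    private final Map<String, ConsumerAssignment> byConsumer = new HashMap<>();

    void assignActive(String consumerId, String taskId) {
        byConsumer.computeIfAbsent(consumerId, id -> new ConsumerAssignment()).activeTasks.add(taskId);
    }

    // Client-level view derived from the same structure, so the two levels cannot diverge.
    Set<String> allActiveTasks() {
        Set<String> all = new HashSet<>();
        byConsumer.values().forEach(c -> all.addAll(c.activeTasks));
        return all;
    }
}
```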
[GitHub] [kafka] guozhangwang commented on pull request #9024: [DO NOT MERGE (yet)] MINOR: bump Streams integration test log level to DEBUG
guozhangwang commented on pull request #9024: URL: https://github.com/apache/kafka/pull/9024#issuecomment-659774428 @ableegoldman probably because the excessive logging causes some of the operations to not complete within the timeout; let's demote brokers to WARN and I will trigger Jenkins again. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #8997: MINOR: Improve log4j for per-consumer assignment
guozhangwang commented on pull request #8997: URL: https://github.com/apache/kafka/pull/8997#issuecomment-659775048 @abbccdda updated per your comments. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
Boyang Chen created KAFKA-10284: --- Summary: Group membership update due to static member rejoin should be persisted Key: KAFKA-10284 URL: https://issues.apache.org/jira/browse/KAFKA-10284 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 2.5.0, 2.4.0, 2.3.0, 2.6.0 Reporter: Boyang Chen Assignee: Boyang Chen Fix For: 2.6.1 When a known static member rejoins, we update its corresponding member.id without triggering a new rebalance. This serves the purpose of avoiding unnecessary rebalances for static membership, as well as fencing in case something still uses the old member.id. The bug is that we don't actually persist the membership update, so if no subsequent rebalance gets triggered, the new member.id information is lost during group coordinator immigration, thus bringing back the zombie member identity. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] abbccdda commented on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
abbccdda commented on pull request #9001: URL: https://github.com/apache/kafka/pull/9001#issuecomment-659793027 retest this This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10205) NullPointerException in StreamTask
[ https://issues.apache.org/jira/browse/KAFKA-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159615#comment-17159615 ] Matthias J. Sax commented on KAFKA-10205: - [~ableegoldman] Good idea. Not everybody reads the docs in detail; we should update the docs in addition anyway, too. [~vvcephei] This ticket is assigned to you atm. Do you want to do the PR? Or should we re-assign? > NullPointerException in StreamTask > -- > > Key: KAFKA-10205 > URL: https://issues.apache.org/jira/browse/KAFKA-10205 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Brian Forkan >Assignee: John Roesler >Priority: Minor > > In our Kafka Streams application we have been experiencing a > NullPointerException when deploying a new version of our application. This > does not happen during a normal rolling restart. > The exception is: > {code:java} > Error caught during partition assignment, will abort the current process and > re-throw at the end of > rebalance","stack_trace":"java.lang.NullPointerException: nullError caught > during partition assignment, will abort the current process and re-throw at > the end of rebalance","stack_trace":"java.lang.NullPointerException: null at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:186) > at > org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:115) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352) > at > org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310) > at > org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295) > at > org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160) > at > org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120) > at > org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490) > at > org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173) > at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:86) at > brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:80) at > org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) > {code} > And the relevant lines of code - > 
[https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L184-L196] > I suspect "topology.source(partition.topic());" is returning null. > Has anyone experienced this issue before? I suspect there is a problem with > our topology but I can't replicate this on my machine so I can't tell. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-8296) Kafka Streams branch method raises type warnings
[ https://issues.apache.org/jira/browse/KAFKA-8296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-8296. Resolution: Duplicate Closing this ticket as duplicate. > Kafka Streams branch method raises type warnings > > > Key: KAFKA-8296 > URL: https://issues.apache.org/jira/browse/KAFKA-8296 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Michael Drogalis >Assignee: Ivan Ponomarev >Priority: Minor > > Because the branch method in the DSL takes varargs, using it as follows > raises an unchecked type warning: > {code:java} > KStream[] branches = builder.<String, User>stream(inputTopic) > .branch((key, user) -> "united > states".equals(user.getCountry()), > (key, user) -> "germany".equals(user.getCountry()), > (key, user) -> "mexico".equals(user.getCountry()), > (key, user) -> true); > {code} > The compiler warns with: > {code:java} > Warning:(39, 24) java: unchecked generic array creation for varargs parameter > of type org.apache.kafka.streams.kstream.Predicate<? super String, ? super io.confluent.developer.avro.User>[] > {code} > This is unfortunate because of the way Java's type system + generics work. We > could possibly fix this by adding the @SafeVarargs annotation to the branch > method signatures. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] abbccdda commented on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
abbccdda commented on pull request #9001: URL: https://github.com/apache/kafka/pull/9001#issuecomment-659793614 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted
[ https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159616#comment-17159616 ] Boyang Chen commented on KAFKA-10284: - [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1042] > Group membership update due to static member rejoin should be persisted > --- > > Key: KAFKA-10284 > URL: https://issues.apache.org/jira/browse/KAFKA-10284 > Project: Kafka > Issue Type: Bug > Components: consumer >Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0 >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.6.1 > > > For known static members rejoin, we would update its corresponding member.id > without triggering a new rebalance. This serves the purpose for avoiding > unnecessary rebalance for static membership, as well as fencing purpose if > some still uses the old member.id. > The bug is that we don't actually persist the membership update, so if no > upcoming rebalance gets triggered, this new member.id information will get > lost during group coordinator immigration, thus bringing up the zombie member > identity. -- This message was sent by Atlassian Jira (v8.3.4#803005)