[jira] [Commented] (KAFKA-10038) ConsumerPerformance.scala supports the setting of client.id
[ https://issues.apache.org/jira/browse/KAFKA-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174125#comment-17174125 ] Luke Chen commented on KAFKA-10038: --- This might need a KIP. Working on that. > ConsumerPerformance.scala supports the setting of client.id > --- > > Key: KAFKA-10038 > URL: https://issues.apache.org/jira/browse/KAFKA-10038 > Project: Kafka > Issue Type: Improvement > Components: consumer, core > Affects Versions: 2.1.1 > Environment: Trunk branch > Reporter: tigertan > Assignee: Luke Chen > Priority: Minor > Labels: newbie, performance > Fix For: 2.7.0 > > > ConsumerPerformance.scala should support setting "client.id", which is a reasonable requirement and would unify the way the console consumer and console producer handle "client.id". "client.id" defaults to "perf-consumer-client". > We often use client.id in quotas; if kafka-consumer-perf-test.sh supported setting "client.id", we could do quota testing through scripts without writing our own consumer programs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
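[Editor's note] As context for the quota use case above, here is a minimal Java sketch of the consumer configuration the ticket wants the perf tool to expose; the bootstrap server, group id, and override value are illustrative, not taken from the ticket:
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class PerfConsumerClientId {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "perf-consumer-group");
        // Proposed default from the ticket; a quota test would override this
        // with the client.id that the quota is registered against.
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "perf-consumer-client");
        System.out.println(props.getProperty(ConsumerConfig.CLIENT_ID_CONFIG));
    }
}
{code}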
[jira] [Updated] (KAFKA-9839) IllegalStateException on metadata update when broker learns about its new epoch after the controller
[ https://issues.apache.org/jira/browse/KAFKA-9839?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Somogyi-Vass updated KAFKA-9839: --- Affects Version/s: 2.2.1 Detected this issue in a customer's environment; I've backported the fix to 2.2.1 and it seemed to resolve it for them. I'll publish my solution upstream soon. > IllegalStateException on metadata update when broker learns about its new epoch after the controller > > > Key: KAFKA-9839 > URL: https://issues.apache.org/jira/browse/KAFKA-9839 > Project: Kafka > Issue Type: Bug > Components: controller, core > Affects Versions: 2.2.1, 2.3.1, 2.5.0, 2.4.1 > Reporter: Anna Povzner > Assignee: Anna Povzner > Priority: Critical > Fix For: 2.5.1 > > > The broker throws "java.lang.IllegalStateException: Epoch XXX larger than current broker epoch YYY" on UPDATE_METADATA when the controller learns about the broker epoch and sends UPDATE_METADATA before KafkaZkClient.registerBroker completes (i.e. before the broker learns about its new epoch). > Here is the scenario we observed in more detail: > 1. ZK session expires on broker 1 > 2. Broker 1 establishes a new session to ZK and creates its znode > 3. Controller learns about broker 1 and assigns an epoch > 4. Broker 1 receives UPDATE_METADATA from the controller, but it does not know about its new epoch yet, so we get an exception: > ERROR [KafkaApi-3] Error when handling request: clientId=1, correlationId=0, api=UPDATE_METADATA, body={ > . > java.lang.IllegalStateException: Epoch XXX larger than current broker epoch YYY at kafka.server.KafkaApis.isBrokerEpochStale(KafkaApis.scala:2725) at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:320) at kafka.server.KafkaApis.handle(KafkaApis.scala:139) at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69) at java.lang.Thread.run(Thread.java:748) > 5. KafkaZkClient.registerBroker completes on broker 1: "INFO Stat of the created znode at /brokers/ids/1" > The result is that the broker has stale metadata for some time. > Possible solutions: > 1. Broker returns a more specific error and the controller retries UPDATE_METADATA > 2. Broker accepts UPDATE_METADATA with a larger broker epoch. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] viktorsomogyi opened a new pull request #9150: KAFKA-9839; Broker should accept control requests with newer broker epoch
viktorsomogyi opened a new pull request #9150: URL: https://github.com/apache/kafka/pull/9150 This is a backport of #8509. A broker throws IllegalStateException if the broker epoch in the LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest is larger than its current broker epoch. However, there is no guarantee that the broker will receive its latest broker epoch before the controller does: when the broker registers with ZK, there are a few more instructions to process before the broker "knows" about its epoch, while the controller may already have been notified and sent an UPDATE_METADATA request (as an example) with the new epoch. This results in clients getting stale metadata from this broker. With this PR, a broker accepts LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest if the broker epoch is newer than the current epoch. Reviewers: David Jacot, Jason Gustafson ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
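[Editor's note] A minimal Java sketch of the relaxed staleness rule this PR describes (the real check lives in KafkaApis.scala; names here are illustrative): only an epoch *older* than the broker's current epoch is rejected, while a newer epoch coming from the controller is accepted.
```java
public final class BrokerEpochCheck {
    private BrokerEpochCheck() { }

    // Before the fix, a request epoch larger than the broker's current epoch
    // triggered an IllegalStateException; after the fix, only a smaller
    // (i.e. genuinely stale) epoch is rejected.
    static boolean isBrokerEpochStale(long epochInRequest, long currentBrokerEpoch) {
        return epochInRequest < currentBrokerEpoch;
    }

    public static void main(String[] args) {
        long currentBrokerEpoch = 100L;
        System.out.println(isBrokerEpochStale(99L, currentBrokerEpoch));  // true: reject stale request
        System.out.println(isBrokerEpochStale(101L, currentBrokerEpoch)); // false: accept the newer epoch
    }
}
```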
[jira] [Commented] (KAFKA-10377) Delete Useless Code
[ https://issues.apache.org/jira/browse/KAFKA-10377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174157#comment-17174157 ] Tom Bentley commented on KAFKA-10377: - This is used to generate the [protocol documentation|https://kafka.apache.org/protocol], so it's not useless. > Delete Useless Code > --- > > Key: KAFKA-10377 > URL: https://issues.apache.org/jira/browse/KAFKA-10377 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.6.0 >Reporter: Bingkun.ji >Priority: Trivial > Attachments: image-2020-08-10-00-13-28-744.png > > > delete useless code for client > > !image-2020-08-10-00-13-28-744.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] viktorsomogyi commented on pull request #9150: KAFKA-9839; Broker should accept control requests with newer broker epoch
viktorsomogyi commented on pull request #9150: URL: https://github.com/apache/kafka/pull/9150#issuecomment-671213384 @apovzner @hachikuji would you please review this backport? And while I'm at it, I can backport these onto the 2.3 and 2.4 lines as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #9149: KAFKA-10340: improve the logging to help user know what is going on
showuon commented on a change in pull request #9149: URL: https://github.com/apache/kafka/pull/9149#discussion_r467755721 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ## @@ -430,7 +434,7 @@ private void maybeCreateTopic(String topic) { log.info("Created topic '{}' using creation group {}", newTopic, topicGroup); } else { log.warn("Request to create new topic '{}' failed", topic); -throw new ConnectException("Task failed to create new topic " + topic + ". Ensure " +throw new ConnectException("Task failed to create new topic " + newTopic + ". Ensure " Review comment: should be `newTopic` here for error message. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-8362) LogCleaner gets stuck after partition move between log directories
[ https://issues.apache.org/jira/browse/KAFKA-8362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-8362: Assignee: Luke Chen > LogCleaner gets stuck after partition move between log directories > -- > > Key: KAFKA-8362 > URL: https://issues.apache.org/jira/browse/KAFKA-8362 > Project: Kafka > Issue Type: Bug > Components: jbod, log cleaner > Reporter: Julio Ng > Assignee: Luke Chen > Priority: Major > > When a partition is moved from one directory to another, its checkpoint entry in the cleaner-offset-checkpoint file is not removed from the source directory. > As a consequence, when we read the last firstDirtyOffset, we might get a stale value from the old checkpoint file. > Basically, we need to clean up the entry from the checkpoint file in the source directory when the move is completed. > The current issue is that the code in LogCleanerManager: > {noformat} > /** > * @return the position processed for all logs. > */ > def allCleanerCheckpoints: Map[TopicPartition, Long] = { > inLock(lock) { > checkpoints.values.flatMap(checkpoint => { > try { > checkpoint.read() > } catch { > case e: KafkaStorageException => > error(s"Failed to access checkpoint file ${checkpoint.file.getName} > in dir ${checkpoint.file.getParentFile.getAbsolutePath}", e) > Map.empty[TopicPartition, Long] > } > }).toMap > } > }{noformat} > collapses the offsets when multiple entries exist for the topicPartition. -- This message was sent by Atlassian Jira (v8.3.4#803005)
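[Editor's note] A self-contained Java illustration (topic name and offsets are made up) of why collapsing per-directory checkpoint maps with a plain merge is dangerous when the source directory still holds a stale entry:
{code:java}
import java.util.HashMap;
import java.util.Map;

public class CheckpointCollapseExample {
    public static void main(String[] args) {
        // Fresh entry written in the destination directory after the move.
        Map<String, Long> destDirCheckpoint = new HashMap<>();
        destDirCheckpoint.put("topic-0", 500L);

        // Stale entry left behind in the source directory's checkpoint file.
        Map<String, Long> sourceDirCheckpoint = new HashMap<>();
        sourceDirCheckpoint.put("topic-0", 100L);

        // Merging both files into one map keeps whichever entry is merged last,
        // so the cleaner can end up reading firstDirtyOffset = 100 instead of 500.
        Map<String, Long> merged = new HashMap<>();
        merged.putAll(destDirCheckpoint);
        merged.putAll(sourceDirCheckpoint); // stale value silently wins
        System.out.println(merged.get("topic-0")); // prints 100
    }
}
{code}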
[jira] [Commented] (KAFKA-10377) Delete Useless Code
[ https://issues.apache.org/jira/browse/KAFKA-10377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174183#comment-17174183 ] Dongjin Lee commented on KAFKA-10377: - [~tombentley] is right. That code is not useless. [~Bingkun.ji] Please close this issue with 'Not a problem'. > Delete Useless Code > --- > > Key: KAFKA-10377 > URL: https://issues.apache.org/jira/browse/KAFKA-10377 > Project: Kafka > Issue Type: Improvement > Components: clients > Affects Versions: 2.6.0 > Reporter: Bingkun.ji > Priority: Trivial > Attachments: image-2020-08-10-00-13-28-744.png > > > delete useless code for client > > !image-2020-08-10-00-13-28-744.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10377) Delete Useless Code
[ https://issues.apache.org/jira/browse/KAFKA-10377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bingkun.ji resolved KAFKA-10377. Resolution: Not A Problem > Delete Useless Code > --- > > Key: KAFKA-10377 > URL: https://issues.apache.org/jira/browse/KAFKA-10377 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.6.0 >Reporter: Bingkun.ji >Priority: Trivial > Attachments: image-2020-08-10-00-13-28-744.png > > > delete useless code for client > > !image-2020-08-10-00-13-28-744.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10377) Delete Useless Code
[ https://issues.apache.org/jira/browse/KAFKA-10377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174202#comment-17174202 ] Bingkun.ji commented on KAFKA-10377: [~dongjin] [~tombentley] Thank you for your reply > Delete Useless Code > --- > > Key: KAFKA-10377 > URL: https://issues.apache.org/jira/browse/KAFKA-10377 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.6.0 >Reporter: Bingkun.ji >Priority: Trivial > Attachments: image-2020-08-10-00-13-28-744.png > > > delete useless code for client > > !image-2020-08-10-00-13-28-744.png! -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10223) ReplicaNotAvailableException must be retriable to handle reassignments
[ https://issues.apache.org/jira/browse/KAFKA-10223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174223#comment-17174223 ] Rajini Sivaram commented on KAFKA-10223: [~dongjoon] This is not a 2.6.0 issue; ReplicaNotAvailable has always been non-retriable. This turned out to be a bigger issue in non-Java consumers since fetch-from-follower was introduced in 2.4.0. Java consumers always handled this case correctly and were not affected. > ReplicaNotAvailableException must be retriable to handle reassignments > -- > > Key: KAFKA-10223 > URL: https://issues.apache.org/jira/browse/KAFKA-10223 > Project: Kafka > Issue Type: Bug > Components: clients > Reporter: Rajini Sivaram > Assignee: Rajini Sivaram > Priority: Critical > Fix For: 2.6.0 > > > ReplicaNotAvailableException should be a retriable `InvalidMetadataException` since consumers may throw this during reassignments. -- This message was sent by Atlassian Jira (v8.3.4#803005)
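[Editor's note] A sketch of the class-hierarchy change the ticket implies, assuming the public String constructor of InvalidMetadataException (which extends RetriableException in the clients package):
{code:java}
import org.apache.kafka.common.errors.InvalidMetadataException;

// Subclassing InvalidMetadataException marks the error as retriable, so a
// consumer fetching during a reassignment refreshes its metadata and retries
// instead of surfacing a fatal error.
public class ReplicaNotAvailableException extends InvalidMetadataException {
    private static final long serialVersionUID = 1L;

    public ReplicaNotAvailableException(String message) {
        super(message);
    }
}
{code}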
[GitHub] [kafka] viktorsomogyi opened a new pull request #9152: KAFKA-9839; Broker should accept control requests with newer broker epoch
viktorsomogyi opened a new pull request #9152: URL: https://github.com/apache/kafka/pull/9152 A broker throws IllegalStateException if the broker epoch in the LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest is larger than its current broker epoch. However, there is no guarantee that the broker will receive its latest broker epoch before the controller does: when the broker registers with ZK, there are a few more instructions to process before the broker "knows" about its epoch, while the controller may already have been notified and sent an UPDATE_METADATA request (as an example) with the new epoch. This results in clients getting stale metadata from this broker. With this PR, a broker accepts LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest if the broker epoch is newer than the current epoch. Reviewers: David Jacot, Jason Gustafson ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] viktorsomogyi opened a new pull request #9151: KAFKA-9839; Broker should accept control requests with newer broker epoch
viktorsomogyi opened a new pull request #9151: URL: https://github.com/apache/kafka/pull/9151 A broker throws IllegalStateException if the broker epoch in the LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest is larger than its current broker epoch. However, there is no guarantee that the broker will receive its latest broker epoch before the controller does: when the broker registers with ZK, there are a few more instructions to process before the broker "knows" about its epoch, while the controller may already have been notified and sent an UPDATE_METADATA request (as an example) with the new epoch. This results in clients getting stale metadata from this broker. With this PR, a broker accepts LeaderAndIsr/UpdateMetadataRequest/StopReplicaRequest if the broker epoch is newer than the current epoch. Reviewers: David Jacot, Jason Gustafson ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] viktorsomogyi commented on pull request #9150: KAFKA-9839; Broker should accept control requests with newer broker epoch
viktorsomogyi commented on pull request #9150: URL: https://github.com/apache/kafka/pull/9150#issuecomment-671323114 2.3: https://github.com/apache/kafka/pull/9151 2.4: https://github.com/apache/kafka/pull/9151 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] viktorsomogyi edited a comment on pull request #9150: KAFKA-9839; Broker should accept control requests with newer broker epoch
viktorsomogyi edited a comment on pull request #9150: URL: https://github.com/apache/kafka/pull/9150#issuecomment-671323114 2.3: https://github.com/apache/kafka/pull/9151 2.4: https://github.com/apache/kafka/pull/9152 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10357) Handle accidental deletion of repartition-topics as exceptional failure
[ https://issues.apache.org/jira/browse/KAFKA-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174279#comment-17174279 ] Bruno Cadonna commented on KAFKA-10357: --- I have a couple of questions regarding wild idea 1): 1) I guess you mean STARTING and not CREATED, don't you? There is no transition from CREATED to PARTITION_REVOKED or PARTITION_ASSIGNED. 2) I suppose this check on the states of the stream threads is done in the group leader. If a Streams client joined an existing group and a stream thread of this newly added Streams client were elected as the group leader, then we would have the situation where the stream thread is in STARTING but it would not be the first-ever rebalance. Is this correct? > Handle accidental deletion of repartition-topics as exceptional failure > --- > > Key: KAFKA-10357 > URL: https://issues.apache.org/jira/browse/KAFKA-10357 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Guozhang Wang > Assignee: Bruno Cadonna > Priority: Major > > Repartition topics are both written by Streams' producer and read by Streams' consumer, so when they are accidentally deleted, both clients may be notified. But in practice the consumer would react to it much more quickly than the producer, since the latter has a delivery timeout expiration period (see https://issues.apache.org/jira/browse/KAFKA-10356). When the consumer reacts to it, it will re-join the group since the metadata changed, and during the triggered rebalance it would silently auto-recreate the topic and continue, causing silent data loss. > One idea is to only create all repartition topics *once*, in the first rebalance, and not auto-create them any more in future rebalances; instead the situation would be treated similarly to the INCOMPLETE_SOURCE_TOPIC_METADATA error code (https://issues.apache.org/jira/browse/KAFKA-10355). > The challenging part is how to determine whether it is the first-ever rebalance. There are several wild ideas I'd like to throw out here: > 1) change the thread state transition diagram so that the CREATED state would not transition to PARTITION_REVOKED but only to PARTITION_ASSIGNED; then in the assign function we can check whether the state is still CREATED and not RUNNING. > 2) augment the subscriptionInfo to encode whether or not this is the first-ever rebalance. -- This message was sent by Atlassian Jira (v8.3.4#803005)
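[Editor's note] A hypothetical Java sketch of wild idea 2), encoding a first-ever-rebalance flag into the subscription userdata; the one-byte layout is purely illustrative and is not Streams' real SubscriptionInfo wire format:
{code:java}
import java.nio.ByteBuffer;

public class FirstRebalanceFlag {
    // Encode the flag as a single byte of subscription userdata.
    static ByteBuffer encode(boolean firstEverRebalance) {
        ByteBuffer buffer = ByteBuffer.allocate(1);
        buffer.put((byte) (firstEverRebalance ? 1 : 0));
        buffer.flip();
        return buffer;
    }

    // The group leader could decode this to distinguish a first-ever
    // rebalance (where repartition topics may be created) from a later one.
    static boolean decode(ByteBuffer buffer) {
        return buffer.get() == 1;
    }

    public static void main(String[] args) {
        System.out.println(decode(encode(true)));  // true
        System.out.println(decode(encode(false))); // false
    }
}
{code}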
[GitHub] [kafka] cadonna opened a new pull request #9153: MINOR: Fix state transition diagram for stream threads
cadonna opened a new pull request #9153: URL: https://github.com/apache/kafka/pull/9153 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #9153: MINOR: Fix state transition diagram for stream threads
cadonna commented on a change in pull request #9153: URL: https://github.com/apache/kafka/pull/9153#discussion_r467871730 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -93,7 +95,7 @@ * | +-+---+ | * +< | Partitions | | * | | Assigned (3)| <+ - * | +-+---+ | + * | +-+---+ ^ Review comment: Before this change a transition from `STARTING` to `RUNNING` was possible in the diagram. However, such a transition is not valid. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #9153: MINOR: Fix state transition diagram for stream threads
cadonna commented on a change in pull request #9153: URL: https://github.com/apache/kafka/pull/9153#discussion_r467872227 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -80,11 +80,13 @@ * | +-+---+ * +< | Starting (1)|->+ * | +-+---+ | - * || | - * || | - * |v | - * | +-+---+ | - * +< | Partitions | | + * | | + * | | + * |+<--+ | + * || | | + * |v | | + * | +-+---+ | | + * +< | Partitions | --+ | Review comment: This valid loop was only mentioned in the notes but was not shown in the diagram. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #9153: MINOR: Fix state transition diagram for stream threads
cadonna commented on pull request #9153: URL: https://github.com/apache/kafka/pull/9153#issuecomment-671328193 Call for review: @guozhangwang @ableegoldman This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #9102: KAFKA-10326 Both serializer and deserializer should be able to see th…
chia7712 commented on pull request #9102: URL: https://github.com/apache/kafka/pull/9102#issuecomment-671337544 @guozhangwang @abbccdda More comments? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] astubbs opened a new pull request #9154: Threadsafe mock producer
astubbs opened a new pull request #9154: URL: https://github.com/apache/kafka/pull/9154 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9073: MINOR: add task ':streams:testAll'
vvcephei commented on a change in pull request #9073: URL: https://github.com/apache/kafka/pull/9073#discussion_r467975526 ## File path: build.gradle ## @@ -1266,6 +1266,27 @@ project(':streams') { if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() } standardOutput = new File(generatedDocsDir, "streams_config.html").newOutputStream() } + + task testAll( +dependsOn: [ +':streams:test', +':streams:test-utils:test', +':streams:streams-scala:test', +':streams:upgrade-system-tests-0100:test', +':streams:upgrade-system-tests-0101:test', +':streams:upgrade-system-tests-0102:test', +':streams:upgrade-system-tests-0110:test', +':streams:upgrade-system-tests-10:test', +':streams:upgrade-system-tests-11:test', +':streams:upgrade-system-tests-20:test', +':streams:upgrade-system-tests-21:test', +':streams:upgrade-system-tests-22:test', +':streams:upgrade-system-tests-23:test', +':streams:upgrade-system-tests-24:test', +':streams:upgrade-system-tests-25:test', Review comment: Thanks for the comment anyway. I was on the fence about it, but I went with including these projects just to be sure that `testAll` really tests all of the components of `:streams`. Even though they don't have tests, the `:test` target is convenient because it runs all of the compilation targets, as well as spotbugs and checkstyle. The idea is just to flush out any/all possible sources of failure so that we can be pretty sure that if `:streams:testAll` passes, then so will Jenkins. I agree it's a pain. Maybe we can consider either contributing a script to keep this stuff updated or writing some gradle code to build the list automatically. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header
abbccdda commented on a change in pull request #9144: URL: https://github.com/apache/kafka/pull/9144#discussion_r467975394 ## File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java ## @@ -69,4 +69,11 @@ * Returns the correlation id from the request header. */ int correlationId(); + +/** + * Returns the initial principal name for a forwarded request. + */ +default String initialPrincipalName() { Review comment: Yea, we need to have it here for audit logging. We could of course have the meta comment suggest "do not use for authorization" This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header
abbccdda commented on a change in pull request #9144: URL: https://github.com/apache/kafka/pull/9144#discussion_r467975757 ## File path: clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java ## @@ -37,11 +37,22 @@ public RequestHeader(Struct struct, short headerVersion) { } public RequestHeader(ApiKeys requestApiKey, short requestVersion, String clientId, int correlationId) { -this(new RequestHeaderData(). -setRequestApiKey(requestApiKey.id). -setRequestApiVersion(requestVersion). -setClientId(clientId). -setCorrelationId(correlationId), +this(requestApiKey, requestVersion, clientId, correlationId, null, null); +} + +public RequestHeader(ApiKeys requestApiKey, Review comment: We do have a constructor which doesn't contain the two fields above. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on a change in pull request #9136: KAFKA-10211: Add DirectoryConfigProvider
tombentley commented on a change in pull request #9136: URL: https://github.com/apache/kafka/pull/9136#discussion_r467984879 ## File path: clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileFilter; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +/** + * An implementation of {@link ConfigProvider} based on a directory of files. + * Property keys correspond to the names of the regular (i.e. non-directory) + * files in a directory given by the path parameter. + * Property values are taken from the file contents corresponding to each key. + */ +public class DirectoryConfigProvider implements ConfigProvider { + +private final Logger log = LoggerFactory.getLogger(getClass()); + +@Override +public void configure(Map configs) { } + +@Override +public void close() throws IOException { } + +/** + * Retrieves the data contained in regular files in the directory given by {@code path}. + * Non-regular files (such as directories) in the given directory are silently ignored. + * @param path the directory where data files reside. + * @return the configuration data. + */ +@Override +public ConfigData get(String path) { +return get(path, File::isFile); +} + +/** + * Retrieves the data contained in the regular files named by {@code keys} in the directory given by {@code path}. + * Non-regular files (such as directories) in the given directory are silently ignored. + * @param path the directory where data files reside. + * @param keys the keys whose values will be retrieved. + * @return the configuration data. + */ +@Override +public ConfigData get(String path, Set keys) { +return get(path, pathname -> +pathname.isFile() +&& keys.contains(pathname.getName())); +} + +private ConfigData get(String path, FileFilter fileFilter) { +Map map = new HashMap<>(); +if (path != null && !path.isEmpty()) { +File dir = new File(path); +if (!dir.isDirectory()) { +log.warn("The path {} is not a directory", path); +} else { +for (File file : dir.listFiles(fileFilter)) { Review comment: That's a good point about null being used for the error case as well as the directory case. Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
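[Editor's note] A usage sketch for the provider shown above, assuming the class lands as quoted in the PR; the /mnt/secrets path and file names are illustrative:
```java
import java.util.Collections;

import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.DirectoryConfigProvider;

public class DirectoryConfigProviderExample {
    public static void main(String[] args) throws Exception {
        // ConfigProvider extends Closeable, so try-with-resources works.
        try (DirectoryConfigProvider provider = new DirectoryConfigProvider()) {
            // One key per regular file in the directory; subdirectories are ignored.
            ConfigData all = provider.get("/mnt/secrets");
            System.out.println(all.data().keySet());

            // Restrict the lookup to a single key.
            ConfigData one = provider.get("/mnt/secrets", Collections.singleton("password"));
            System.out.println(one.data());
        }
    }
}
```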
[GitHub] [kafka] tombentley commented on pull request #9136: KAFKA-10211: Add DirectoryConfigProvider
tombentley commented on pull request #9136: URL: https://github.com/apache/kafka/pull/9136#issuecomment-671424344 @mimaison done. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] stanislavkozlovski opened a new pull request #9155: MINOR: Ensure a single version of scala-library is used
stanislavkozlovski opened a new pull request #9155: URL: https://github.com/apache/kafka/pull/9155 This patch ensures we use a force resolution strategy for the scala-library dependency. I've tested this locally and saw a difference in the output: With the change (using 2.4 and the jackson library **2.10.5**): ``` ./core/build/dependant-libs-2.12.10/scala-java8-compat_2.12-0.9.0.jar ./core/build/dependant-libs-2.12.10/scala-collection-compat_2.12-2.1.2.jar ./core/build/dependant-libs-2.12.10/scala-reflect-2.12.10.jar ./core/build/dependant-libs-2.12.10/scala-logging_2.12-3.9.2.jar ./core/build/dependant-libs-2.12.10/scala-library-2.12.10.jar ``` Without (using 2.4 and the jackson library **2.10.0**): ``` find . -name 'scala*.jar' ./core/build/dependant-libs-2.12.10/scala-java8-compat_2.12-0.9.0.jar ./core/build/dependant-libs-2.12.10/scala-collection-compat_2.12-2.1.2.jar ./core/build/dependant-libs-2.12.10/scala-reflect-2.12.10.jar ./core/build/dependant-libs-2.12.10/scala-logging_2.12-3.9.2.jar ./core/build/dependant-libs-2.12.10/scala-library-2.12.12.jar ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] stanislavkozlovski commented on pull request #9155: MINOR: Ensure a single version of scala-library is used
stanislavkozlovski commented on pull request #9155: URL: https://github.com/apache/kafka/pull/9155#issuecomment-671429610 In 2.4 we use `def defaultScala212Version = '2.12.10'` yet it gets updated to `2.12.12` due to the jackson-module-scala library using the 2.12.12 version of scala-library in its 2.10.5 version: https://github.com/FasterXML/jackson-module-scala/blob/0cfeb8d27195a357887fa99f8915cfaa519aabc9/build.sbt#L8 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header
abbccdda commented on a change in pull request #9144: URL: https://github.com/apache/kafka/pull/9144#discussion_r468005080 ## File path: core/src/main/scala/kafka/network/SocketServer.scala ## @@ -88,15 +88,17 @@ class SocketServer(val config: KafkaConfig, private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup) memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName)) private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE - // data-plane - private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]() - private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]() - val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time) // control-plane private var controlPlaneProcessorOpt : Option[Processor] = None private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => -new RequestChannel(20, ControlPlaneMetricPrefix, time)) +new RequestChannel(20, ControlPlaneMetricPrefix, time, true)) + // data-plane + private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]() + private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]() + // If the control plane processor is not defined, just set the flag to true in data plane to bypass the check for whether a given + // request is from the control plane or not. Review comment: What if the user doesn't configure an inter-broker listener? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rajinisivaram commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r467848781 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/ScramMechanism.java ## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +/** + * Representation of a SASL/SCRAM Mechanism. + * + * @see https://cwiki.apache.org/confluence/display/KAFKA/KIP-554%3A+Add+Broker-side+SCRAM+Config+API";>KIP-554: Add Broker-side SCRAM Config API + */ +public enum ScramMechanism { +UNKNOWN((byte) 0), +SCRAM_SHA_256((byte) 1), +SCRAM_SHA_512((byte) 2); + +/** + * + * @param type the type indicator + * @return the instance corresponding to the given type indicator, otherwise {@link #UNKNOWN} + */ +public static ScramMechanism fromType(byte type) { +for (ScramMechanism scramMechanism : ScramMechanism.values()) { +if (scramMechanism.type == type) { +return scramMechanism; +} +} +return UNKNOWN; +} + +/** + * + * @param mechanismName the SASL SCRAM mechanism name + * @return the corresponding SASL SCRAM mechanism enum, otherwise {@link #UNKNOWN} + * @see https://tools.ietf.org/html/rfc5802#section-4> + * Salted Challenge Response Authentication Mechanism (SCRAM) SASL and GSS-API Mechanisms, Section 4 + */ +public static ScramMechanism fromMechanismName(String mechanismName) { +ScramMechanism retvalFoundMechanism = ScramMechanism.valueOf(mechanismName.replace('-', '_')); +return retvalFoundMechanism != null ? retvalFoundMechanism : UNKNOWN; +} + +/** + * + * @return the corresponding SASL SCRAM mechanism name + * @see https://tools.ietf.org/html/rfc5802#section-4> + * Salted Challenge Response Authentication Mechanism (SCRAM) SASL and GSS-API Mechanisms, Section 4 + */ +public String getMechanismName() { Review comment: We don't use `get` prefix elsewhere, just `mechanismName()`? ## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ## @@ -1214,6 +1215,64 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options); +/** + * Describe all SASL/SCRAM credentials. + * + * This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)} + * + * @return The DescribeUserScramCredentialsResult. + */ +default DescribeUserScramCredentialsResult describeUserScramCredentials() { +return describeUserScramCredentials(null, new DescribeUserScramCredentialsOptions()); +} + +/** + * Describe SASL/SCRAM credentials for the given users. 
+ * + * This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)} + * + * @param users the users for which credentials are to be described; all users' credentials are described if null + * or empty. A user explicitly specified here that does not have a SCRAM credential will not appear + * in the results. Review comment: Should we throw an exception for users which don't exist to be consistent with other APIs? ## File path: clients/src/main/java/org/apache/kafka/clients/admin/UserScramCredentialsDescription.java ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
[jira] [Updated] (KAFKA-10381) Add broker to a cluster not rebalancing partitions
[ https://issues.apache.org/jira/browse/KAFKA-10381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yogesh BG updated KAFKA-10381: -- Priority: Trivial (was: Major) > Add broker to a cluster not rebalancing partitions > -- > > Key: KAFKA-10381 > URL: https://issues.apache.org/jira/browse/KAFKA-10381 > Project: Kafka > Issue Type: Bug > Reporter: Yogesh BG > Priority: Trivial > > Hi, > I have a 3-node cluster with a topic that has one partition. When a node is deleted and another node is added, the topic goes into an unknown state and nothing can be written or read; the exception below is seen: > > {code:java} > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition C-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1002,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1002,1004 for partition B-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10381) Add broker to a cluster not rebalancing partitions
Yogesh BG created KAFKA-10381: - Summary: Add broker to a cluster not rebalancing partitions Key: KAFKA-10381 URL: https://issues.apache.org/jira/browse/KAFKA-10381 Project: Kafka Issue Type: Bug Reporter: Yogesh BG Hi, I have a 3-node cluster with a topic that has one partition. When a node is deleted and another node is added, the topic goes into an unknown state and nothing can be written or read; the exception below is seen: {code:java} [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition C-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1002,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1002,1004 for partition B-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] junrao commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
junrao commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r467279790 ## File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json ## @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 50, + "type": "request", + "name": "UpdateFeaturesRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+", + "about": "The list of updates to finalized features.", "fields": [ + {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true, +"about": "The name of the finalized feature to be updated."}, + {"name": "MaxVersionLevel", "type": "int16", "versions": "0+", +"about": "The new maximum version level for the finalized feature. A value >= 1 is valid. A value < 1, is special, and can be used to request the deletion of the finalized feature."}, + {"name": "AllowDowngrade", "type": "bool", "versions": "0+", Review comment: The KIP wiki has AllowDowngrade at the topic level. Could we update that? ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -266,6 +275,199 @@ class KafkaController(val config: KafkaConfig, } } + private def createFeatureZNode(newNode: FeatureZNode): Int = { +info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode") +zkClient.createFeatureZNode(newNode) +val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path) +newVersion + } + + private def updateFeatureZNode(updatedNode: FeatureZNode): Int = { +info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode") +zkClient.updateFeatureZNode(updatedNode) + } + + /** + * This method enables the feature versioning system (KIP-584). + * + * Development in Kafka (from a high level) is organized into features. Each feature is tracked by + * a name and a range of version numbers. A feature can be of two types: + * + * 1. Supported feature: + * A supported feature is represented by a name (String) and a range of versions (defined by a + * {@link SupportedVersionRange}). It refers to a feature that a particular broker advertises + * support for. Each broker advertises the version ranges of its own supported features in its + * own BrokerIdZNode. The contents of the advertisement are specific to the particular broker and + * do not represent any guarantee of a cluster-wide availability of the feature for any particular + * range of versions. + * + * 2. Finalized feature: + * A finalized feature is represented by a name (String) and a range of version levels (defined + * by a {@link FinalizedVersionRange}). 
Whenever the feature versioning system (KIP-584) is + * enabled, the finalized features are stored in the cluster-wide common FeatureZNode. + * In comparison to a supported feature, the key difference is that a finalized feature exists + * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a + * specified range of version levels. Also, the controller is the only entity modifying the + * information about finalized features. + * + * This method sets up the FeatureZNode with enabled status, which means that the finalized + * features stored in the FeatureZNode are active. The enabled status should be written by the + * controller to the FeatureZNode only when the broker IBP config is greater than or equal to + * KAFKA_2_7_IV0. + * + * There are multiple cases handled here: + * + * 1. New cluster bootstrap: + *A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config + *setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all + *the possible supported features finalized immediately. Assuming this is the case, the + *controller will start up and notice that the FeatureZNode is absent in the new cluster, + *it will then create a FeatureZNode (with enabled status) containing the enti
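[Editor's note] The supported-vs-finalized distinction described above boils down to a range-containment rule; a small Java sketch with illustrative class and field names (not the KIP-584 implementation):
```java
public final class VersionRangeExample {
    static final class VersionRange {
        final short min;
        final short max;

        VersionRange(short min, short max) {
            this.min = min;
            this.max = max;
        }

        // A finalized range is valid only if the supported range covers it.
        boolean contains(VersionRange other) {
            return other.min >= min && other.max <= max;
        }
    }

    public static void main(String[] args) {
        VersionRange supported = new VersionRange((short) 1, (short) 5);
        System.out.println(supported.contains(new VersionRange((short) 1, (short) 3))); // true: safe to finalize
        System.out.println(supported.contains(new VersionRange((short) 1, (short) 7))); // false: level 6-7 unsupported
    }
}
```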
[jira] [Updated] (KAFKA-10381) Add broker to a cluster not rebalancing partitions
[ https://issues.apache.org/jira/browse/KAFKA-10381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yogesh BG updated KAFKA-10381: -- Priority: Major (was: Trivial) > Add broker to a cluster not rebalancing partitions > -- > > Key: KAFKA-10381 > URL: https://issues.apache.org/jira/browse/KAFKA-10381 > Project: Kafka > Issue Type: Bug > Reporter: Yogesh BG > Priority: Major > > Hi, > I have a 3-node cluster with a topic that has one partition. When a node is deleted and another node is added, the topic goes into an unknown state and nothing can be written or read; the exception below is seen: > > {code:java} > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition C-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1002,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1002,1004 for partition B-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10381) Add broker to a cluster not rebalancing partitions
[ https://issues.apache.org/jira/browse/KAFKA-10381?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yogesh BG updated KAFKA-10381: -- Affects Version/s: 2.3.0 > Add broker to a cluster not rebalancing partitions > -- > > Key: KAFKA-10381 > URL: https://issues.apache.org/jira/browse/KAFKA-10381 > Project: Kafka > Issue Type: Bug > Affects Versions: 2.3.0 > Reporter: Yogesh BG > Priority: Major > > Hi, > I have a 3-node cluster with a topic that has one partition. When a node is deleted and another node is added, the topic goes into an unknown state and nothing can be written or read; the exception below is seen: > > {code:java} > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition C-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1002,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1002,1004 for partition B-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 failed to record follower 1005's position 0 since the replica is not recognized to be one of the assigned replicas 1003,1004 for partition A-0. Empty records will be returned for this partition. (kafka.server.ReplicaManager) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma commented on pull request #9155: MINOR: Ensure a single version of scala-library is used
ijuma commented on pull request #9155: URL: https://github.com/apache/kafka/pull/9155#issuecomment-671482140 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10370: --- Fix Version/s: (was: 2.6.0) 2.7.0 > WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext > -- > > Key: KAFKA-10370 > URL: https://issues.apache.org/jira/browse/KAFKA-10370 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect > Affects Versions: 2.5.0 > Reporter: Ning Zhang > Assignee: Ning Zhang > Priority: Major > Fix For: 2.7.0 > > > In [WorkerSinkTask.java|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java], when we want the consumer to consume from certain offsets rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTaskContext.java#L63-L66] provides a way to supply offsets externally (e.g. from an implementation of SinkTask) to rewind the consumer. > In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls [rewind()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L615-L633] to (1) read the offsets from WorkerSinkTaskContext and, if the offsets are not empty, (2) call consumer.seek(tp, offset) to rewind the consumer. > As part of [WorkerSinkTask initialization|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L290-L307], when the [SinkTask starts|https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java#L83-L88], we can supply specific offsets via +"context.offset(supplied_offsets);"+ in the start() method, so that when the consumer does its first poll, it should rewind to those offsets in the rewind() method. 
However in practice, we > saw the following IllegalStateException when running consumer.seek(tp, > offsets); > {code:java} > [2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} > Rewind test-1 to offset 3 > (org.apache.kafka.connect.runtime.WorkerSinkTask:648) > [2020-08-07 23:53:55,752] INFO [Consumer > clientId=connector-consumer-MirrorSinkConnector-0, > groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 > (org.apache.kafka.clients.consumer.KafkaConsumer:1592) > [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task > threw an uncaught and unrecoverable exception > (org.apache.kafka.connect.runtime.WorkerTask:187) > java.lang.IllegalStateException: No current assignment for partition test-1 > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368) > at > org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385) > at > org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229) > at > org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198) > at > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185) > at > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > [2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task > is being killed and will not recover until manually restarted > (org.apache.kafka.connect.runtime.WorkerTask:188) > {code} > As suggested in > https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, > the resolution (that has been initially verified) proposed in the attached > PR is to use *
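For reference, the assign-then-seek pattern from that Stack Overflow answer looks roughly like this (a minimal sketch, not the actual WorkerSinkTask change; note that a consumer that has already called subscribe() cannot also call assign()):
{code:java}
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public final class AssignThenSeek {
    // seek() throws IllegalStateException ("No current assignment for partition ...")
    // unless the partition is currently assigned, so assign first, then seek.
    public static void assignAndSeek(Consumer<?, ?> consumer, Map<TopicPartition, Long> offsets) {
        consumer.assign(offsets.keySet());
        offsets.forEach(consumer::seek);
    }
}
{code}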
[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10370: --- Reviewer: Randall Hauch > WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) > when (tp, offsets) are supplied by WorkerSinkTaskContext > (full issue description as quoted above)
[jira] [Updated] (KAFKA-10077) Filter downstream of state-store results in spurious tombstones
[ https://issues.apache.org/jira/browse/KAFKA-10077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andy Coates updated KAFKA-10077: Summary: Filter downstream of state-store results in spurious tombstones (was: Filter downstream of state-store results in suprious tombstones) > Filter downstream of state-store results in spurious tombstones > --- > > Key: KAFKA-10077 > URL: https://issues.apache.org/jira/browse/KAFKA-10077 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.5.0 >Reporter: Andy Coates >Assignee: Andy Coates >Priority: Major > > Adding a `filter` call downstream of anything that has a state store, e.g. a > table source, results in spurious tombstones being emitted from the topology > for any key where a new entry doesn't match the filter, _even when no > previous value existed for the row_. > To put this another way: a filter downstream of a state-store will output a > tombstone on an INSERT that doesn't match the filter, when it should only > output a tombstone on an UPDATE. > > This code shows the problem: > {code:java} > final StreamsBuilder builder = new StreamsBuilder(); > builder > .table("table", Materialized.with(Serdes.Long(), Serdes.Long())) > .filter((k, v) -> v % 2 == 0) > .toStream() > .to("bob"); > final Topology topology = builder.build(); > final Properties props = new Properties(); > props.put("application.id", "fred"); > props.put("bootstrap.servers", "who cares"); > final TopologyTestDriver driver = new TopologyTestDriver(topology, props); > final TestInputTopic<Long, Long> input = driver > .createInputTopic("table", Serdes.Long().serializer(), > Serdes.Long().serializer()); > input.pipeInput(1L, 2L); > input.pipeInput(1L, 1L); > input.pipeInput(2L, 1L); > final TestOutputTopic<Long, Long> output = driver > .createOutputTopic("bob", Serdes.Long().deserializer(), > Serdes.Long().deserializer()); > final List<KeyValue<Long, Long>> keyValues = output.readKeyValuesToList(); > // keyValues contains: > // 1 -> 2 > // 1 -> null <-- correct tombstone: deletes previous row. > // 2 -> null <-- spurious tombstone: no previous row. > {code} > > These spurious tombstones can cause a LOT of noise when, for example, the > filter is looking for a specific key. In such a situation, _every input > record that does not have that key results in a tombstone!_ meaning there are > many more tombstones than useful data. > I believe the fix is to turn on {{KTableImpl::enableSendingOldValues}} for > any filter that is downstream of a state store. -- This message was sent by Atlassian Jira (v8.3.4#803005)
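For readers following the fix: with old values enabled, the filter sees both the previous and the new row for a key, and can suppress the tombstone when the row never passed the filter. A simplified sketch of that logic (illustrative names, not the actual KTableFilter internals):
{code:java}
import java.util.function.BiConsumer;
import java.util.function.Predicate;

public final class FilterWithOldValuesSketch {
    // With sending-old-values enabled the filter receives (newValue, oldValue)
    // per key, instead of just newValue.
    static <K, V> void process(K key, V newValue, V oldValue,
                               Predicate<V> filter, BiConsumer<K, V> forward) {
        V newFiltered = newValue != null && filter.test(newValue) ? newValue : null;
        V oldFiltered = oldValue != null && filter.test(oldValue) ? oldValue : null;
        if (newFiltered == null && oldFiltered == null) {
            return; // the row was never in the filtered view: no spurious tombstone
        }
        forward.accept(key, newFiltered); // a forwarded null is now a genuine tombstone
    }
}
{code}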
[jira] [Issue Comment Deleted] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ning Zhang updated KAFKA-10370: --- Comment: was deleted (was: https://github.com/apache/kafka/pull/9145) > WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) > when (tp, offsets) are supplied by WorkerSinkTaskContext > (full issue description as quoted above)
[GitHub] [kafka] big-andy-coates opened a new pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
big-andy-coates opened a new pull request #9156: URL: https://github.com/apache/kafka/pull/9156 fixes: [KAFKA-10077](https://issues.apache.org/jira/browse/KAFKA-10077). Enable sending old values on `KTable.filter` call to avoid the filter forwarding tombstones for rows that do not exist in the output. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext
[ https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174955#comment-17174955 ] Ning Zhang commented on KAFKA-10370: Hi [~rhauch], when you have a chance, I would like to get your initial feedback / advice on this issue and the proposed solution. Thanks cc [~ryannedolan] > WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) > when (tp, offsets) are supplied by WorkerSinkTaskContext > (full issue description as quoted above)
[jira] [Created] (KAFKA-10382) MockProducer is not ThreadSafe, ideally it should be as the implementation it mocks is
Antony Stubbs created KAFKA-10382: - Summary: MockProducer is not ThreadSafe, ideally it should be as the implementation it mocks is Key: KAFKA-10382 URL: https://issues.apache.org/jira/browse/KAFKA-10382 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.6.0 Reporter: Antony Stubbs In testing my project, I discovered that the MockProducer is not thread-safe, contrary to what I had assumed. It doesn't use thread-safe collections for its underlying stores, and only _some_ of its methods are synchronised. As performance isn't a concern here, I would propose simply synchronising all public methods in the class, as some already are. In my project, send is synchronised and commitTransaction isn't. This was causing weird collection manipulation and messages going missing. My stopgap solution was simply to synchronise on the MockProducer instance before calling commit. See my workaround: https://github.com/astubbs/async-consumer/pull/13/files#diff-8e93aa2a2003be7436f94956cf809b2eR558 PR available: https://github.com/apache/kafka/pull/9154 -- This message was sent by Atlassian Jira (v8.3.4#803005)
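The workaround described above amounts to taking the mock's own monitor around any call made from another thread, since only some MockProducer methods are synchronized. A minimal sketch (topic and values are illustrative):
{code:java}
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class MockProducerWorkaround {
    public static void main(String[] args) {
        MockProducer<String, String> producer =
            new MockProducer<>(true, new StringSerializer(), new StringSerializer());
        producer.initTransactions();
        producer.beginTransaction();
        // Guard calls that may race with other threads by locking on the mock itself.
        synchronized (producer) {
            producer.send(new ProducerRecord<>("topic", "key", "value"));
            producer.commitTransaction();
        }
    }
}
{code}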
[GitHub] [kafka] skaundinya15 commented on pull request #9142: MINOR: Fix delete_topic for system tests
skaundinya15 commented on pull request #9142: URL: https://github.com/apache/kafka/pull/9142#issuecomment-671498810 Link to system test run from this branch: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-08-07--001.1596866417--skaundinya15--minor-fix-delete-topic-implementation--387bd9ed9/report.html. The system test that uses `delete_topic` is ``` Module: kafkatest.tests.core.replica_scale_test Class: ReplicaScaleTest Method: test_clean_bounce Arguments: { "partition_count": 34, "replication_factor": 3, "topic_count": 500 } ``` And from the link above this test passed. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #9112: KAFKA-10312 Fix error code returned by getPartitionMetadata
hachikuji commented on pull request #9112: URL: https://github.com/apache/kafka/pull/9112#issuecomment-671500806 ok to test This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] big-andy-coates commented on pull request #9156: KAFKA-10077: Filter downstream of state-store results in spurious tombstones
big-andy-coates commented on pull request #9156: URL: https://github.com/apache/kafka/pull/9156#issuecomment-671501927 @mjsax as discussed. Please review. I think this change makes the output from the table filter semantically correct, i.e. we no longer output tombstones for rows that didn't exist in the output to begin with. However, this comes at a cost! Now the source table is being materialized (as you can see from the changes needed to get some other tests to pass). The cost of better semantics could be very high, and the user has no way of avoiding this. Whereas previously the user could choose to 'fix' the bad semantics by manually calling `enableSendingOldValues` themselves, if they cared. I'm left feeling a little uneasy about _forcing_ users to pay the cost of materialization, even if they either don't care about the spurious tombstones, or their use case doesn't generate them. This leads me to the following questions: 1. Wouldn't this be a breaking change for existing users of the library? If we stick with this solution, how would we handle this? 2. Might it be better to only enable the sending of old values _if_ the source table is already materialized? This would mean the fix only pays the cost of an additional RocksDB read, which is still not zero, but much lower than forced materialization, and it would also mean this isn't a breaking change. Or maybe we choose to _not_ fix this, preferring the current semantically incorrect but better-performing behaviour, with a known workaround for users that require correct semantics? i.e. we could document the use of `enableSendingOldValues` in the `filter` method's java docs. Your thoughts my good man? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
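For question 2 above, "already materialized" from the user's side would look roughly like this (a sketch; the store name and serdes are illustrative):
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

public class ExplicitMaterialization {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder
            // Explicitly materializing the source table makes the storage cost
            // visible and opt-in, rather than being forced by the filter fix.
            .table("table", Materialized.<Long, Long, KeyValueStore<Bytes, byte[]>>as("source-store")
                .withKeySerde(Serdes.Long())
                .withValueSerde(Serdes.Long()))
            .filter((k, v) -> v % 2 == 0)
            .toStream()
            .to("bob");
        builder.build();
    }
}
```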
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r468084269 ## File path: clients/src/main/resources/common/message/UpdateFeaturesRequest.json ## @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 50, + "type": "request", + "name": "UpdateFeaturesRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "FeatureUpdates", "type": "[]FeatureUpdateKey", "versions": "0+", + "about": "The list of updates to finalized features.", "fields": [ + {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true, +"about": "The name of the finalized feature to be updated."}, + {"name": "MaxVersionLevel", "type": "int16", "versions": "0+", +"about": "The new maximum version level for the finalized feature. A value >= 1 is valid. A value < 1, is special, and can be used to request the deletion of the finalized feature."}, + {"name": "AllowDowngrade", "type": "bool", "versions": "0+", Review comment: I'm missing something. Which lines on the KIP-584 were you referring to? I didn't find any mention of the flag being at the topic level. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r468085443 ## File path: clients/src/main/resources/common/message/UpdateFeaturesResponse.json ## @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 50, + "type": "response", + "name": "UpdateFeaturesResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "Results", "type": "[]UpdatableFeatureResult", "versions": "0+", + "about": "Results for each feature update.", "fields": [ + {"name": "Feature", "type": "string", "versions": "0+", "mapKey": true, Review comment: Yes, we changed to have an error code per feature update. I'll update the KIP-584 write up. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
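For context, once the KIP-584 write path lands, finalizing a feature through this RPC is expected to go via the Admin client roughly as follows (a sketch assuming the API shape proposed in the KIP; the feature name is illustrative):
```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;

public class UpdateFeaturesSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Raise the finalized max version level of "example_feature" to 2.
            // allowDowngrade=false: the controller rejects version-level downgrades.
            FeatureUpdate update = new FeatureUpdate((short) 2, false);
            admin.updateFeatures(Collections.singletonMap("example_feature", update),
                    new UpdateFeaturesOptions()).all().get();
        }
    }
}
```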
[GitHub] [kafka] skaundinya15 commented on pull request #9143: MINOR: Fix the way total consumed is calculated for verifiable consumer
skaundinya15 commented on pull request #9143: URL: https://github.com/apache/kafka/pull/9143#issuecomment-671506739 Link for system test run for this branch: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-08-07--001.1596860596--skaundinya15--minor-fix-total-consumed-for-verifiable-consumer--d799e563e/report.html. The one test that might be related is `streams_broker_compatibility_test`, but it looked like it failed on something unrelated, so I am rerunning that test to see if it's okay. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r468089357 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -266,6 +275,199 @@ class KafkaController(val config: KafkaConfig, } } + private def createFeatureZNode(newNode: FeatureZNode): Int = { +info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode") +zkClient.createFeatureZNode(newNode) +val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path) +newVersion + } + + private def updateFeatureZNode(updatedNode: FeatureZNode): Int = { +info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode") +zkClient.updateFeatureZNode(updatedNode) + } + + /** + * This method enables the feature versioning system (KIP-584). + * + * Development in Kafka (from a high level) is organized into features. Each feature is tracked by + * a name and a range of version numbers. A feature can be of two types: + * + * 1. Supported feature: + * A supported feature is represented by a name (String) and a range of versions (defined by a + * {@link SupportedVersionRange}). It refers to a feature that a particular broker advertises + * support for. Each broker advertises the version ranges of its own supported features in its + * own BrokerIdZNode. The contents of the advertisement are specific to the particular broker and + * do not represent any guarantee of a cluster-wide availability of the feature for any particular + * range of versions. + * + * 2. Finalized feature: + * A finalized feature is represented by a name (String) and a range of version levels (defined + * by a {@link FinalizedVersionRange}). Whenever the feature versioning system (KIP-584) is + * enabled, the finalized features are stored in the cluster-wide common FeatureZNode. + * In comparison to a supported feature, the key difference is that a finalized feature exists + * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a + * specified range of version levels. Also, the controller is the only entity modifying the + * information about finalized features. + * + * This method sets up the FeatureZNode with enabled status, which means that the finalized + * features stored in the FeatureZNode are active. The enabled status should be written by the + * controller to the FeatureZNode only when the broker IBP config is greater than or equal to + * KAFKA_2_7_IV0. Review comment: To be sure we are on same page, is this because of a controller failover during an IBP bump? It seems to me that this can happen mainly when IBP is being bumped from a value less than KAFKA_2_7_IV0 to a value greater than or equal to KAFKA_2_7_IV0 (assuming subsequent IBP bumps will be from KAFKA_2_7_IV0 to a higher value, so the node status will remain enabled). In general, I'm not sure how to avoid this node status flip until IBP bump has been completed cluster-wide. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10357) Handle accidental deletion of repartition-topics as exceptional failure
[ https://issues.apache.org/jira/browse/KAFKA-10357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17174986#comment-17174986 ] Sophie Blee-Goldman commented on KAFKA-10357: - How are we going to handle restarts/upgrades/etc? The only way to distinguish between a "first-ever" rebalance and the rebalance following a restart is to persist that information, otherwise a member who gets bounced and rejoins will assume it's the very first rebalance. We could augment the subscription protocol but even that wouldn't be safe for a non-rolling upgrade. If every member is stopped and restarted, they'll all lose knowledge of their past lives and everyone will assume it's the first rebalance. Maybe that's not so bad and we can just warn people not to delete all their topics when they do a full restart. (Of course if warning people was sufficient then we wouldn't be having this conversation in the first place..) > Handle accidental deletion of repartition-topics as exceptional failure > --- > > Key: KAFKA-10357 > URL: https://issues.apache.org/jira/browse/KAFKA-10357 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Bruno Cadonna >Priority: Major > > Repartition topics are both written by Stream's producer and read by Stream's > consumer, so when they are accidentally deleted both clients may be notified. > But in practice the consumer would react to it much quicker than the producer > since the latter has a delivery timeout expiration period (see > https://issues.apache.org/jira/browse/KAFKA-10356). When the consumer reacts to > it, it will re-join the group since metadata changed and during the triggered > rebalance it would auto-recreate the topic silently and continue, causing > silent data loss. > One idea is to only create all repartition topics *once* in the first > rebalance and not auto-create them any more in future rebalances; instead it > would be treated similarly to the INCOMPLETE_SOURCE_TOPIC_METADATA error code > (https://issues.apache.org/jira/browse/KAFKA-10355). > The challenging part would be how to determine if it is the first-ever > rebalance, and there are several wild ideas I'd like to throw out here: > 1) change the thread state transition diagram so that the CREATED state would not > transition to PARTITION_REVOKED but only to PARTITION_ASSIGNED, then in the > assign function we can check if the state is still in CREATED and not RUNNING. > 2) augment the subscriptionInfo to encode whether or not this is the > first-ever rebalance. -- This message was sent by Atlassian Jira (v8.3.4#803005)
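Idea 2) above — augmenting the subscription — would amount to shipping one extra flag in the consumer protocol's userData. An illustrative sketch (not Streams' actual SubscriptionInfo encoding), which also shows why an old-format subscription is indistinguishable after a full restart:
{code:java}
import java.nio.ByteBuffer;

public final class SubscriptionFlagSketch {
    static ByteBuffer encode(boolean firstEverRebalance) {
        ByteBuffer buf = ByteBuffer.allocate(1);
        buf.put((byte) (firstEverRebalance ? 1 : 0));
        buf.flip();
        return buf;
    }

    static boolean decode(ByteBuffer buf) {
        // A subscription from an older member carries no flag at all, so it is
        // indistinguishable here -- the non-rolling-upgrade problem raised above.
        return buf.hasRemaining() && buf.get() == (byte) 1;
    }
}
{code}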
[GitHub] [kafka] dajac commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header
dajac commented on a change in pull request #9144: URL: https://github.com/apache/kafka/pull/9144#discussion_r468090360 ## File path: clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java ## @@ -74,7 +77,9 @@ public RequestAndSize parseRequest(ByteBuffer buffer) { ", apiVersion: " + header.apiVersion() + ", connectionId: " + connectionId + ", listenerName: " + listenerName + -", principal: " + principal, ex); +", principal: " + principal + +", initial principal: " + initialPrincipalName() + +", initial client id: " + header.initialClientId(), ex); Review comment: nit: Could we use camel case like the others? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #9136: KAFKA-10211: Add DirectoryConfigProvider
dajac commented on a change in pull request #9136: URL: https://github.com/apache/kafka/pull/9136#discussion_r468093788 ## File path: clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java ## @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import static java.util.Collections.emptyMap; + +/** + * An implementation of {@link ConfigProvider} based on a directory of files. + * Property keys correspond to the names of the regular (i.e. non-directory) + * files in a directory given by the path parameter. + * Property values are taken from the file contents corresponding to each key. + */ +public class DirectoryConfigProvider implements ConfigProvider { + +private static final Logger log = LoggerFactory.getLogger(DirectoryConfigProvider.class); + +@Override +public void configure(Map<String, ?> configs) { } + +@Override +public void close() throws IOException { } + +/** + * Retrieves the data contained in regular files in the directory given by {@code path}. + * Non-regular files (such as directories) in the given directory are silently ignored. + * @param path the directory where data files reside. + * @return the configuration data. + */ +@Override +public ConfigData get(String path) { +return get(path, Files::isRegularFile); +} + +/** + * Retrieves the data contained in the regular files named by {@code keys} in the directory given by {@code path}. + * Non-regular files (such as directories) in the given directory are silently ignored. + * @param path the directory where data files reside. + * @param keys the keys whose values will be retrieved. + * @return the configuration data.
+ */ +@Override +public ConfigData get(String path, Set<String> keys) { +return get(path, pathname -> +Files.isRegularFile(pathname) +&& keys.contains(pathname.getFileName().toString())); +} + +private static ConfigData get(String path, Predicate<Path> fileFilter) { +Map<String, String> map = emptyMap(); +if (path != null && !path.isEmpty()) { +Path dir = new File(path).toPath(); +if (!Files.isDirectory(dir)) { +log.warn("The path {} is not a directory", path); +} else { +try { +map = Files.list(dir) +.filter(fileFilter) +.collect(Collectors.toMap( +p -> p.getFileName().toString(), +p -> { +try { +return read(p); +} catch (IOException e) { +throw new ConfigException("Could not read file " + p + " for property " + p.getFileName(), e); +} Review comment: nit: Could we push the `try catch` block into the `read` method? That would streamline the code a little bit. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
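The suggested refactor would look something like this (a sketch of the reviewer's nit; the method shape is assumed, not taken from the PR):
```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.kafka.common.config.ConfigException;

final class ReadHelper {
    // Folding the exception handling into read() keeps the stream pipeline flat.
    static String read(Path path) {
        try {
            return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new ConfigException("Could not read file " + path + " for property " + path.getFileName(), e);
        }
    }
}
```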
[GitHub] [kafka] lct45 opened a new pull request #9157: Update for KIP-450 to handle early records
lct45 opened a new pull request #9157: URL: https://github.com/apache/kafka/pull/9157 Handles records with timestamps between 0 and the timeDifference, which would normally create windows with negative start times. A new record in this range is put in a window spanning [0, timeDifference], and the record's right windows are created as later records fall into them. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
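Concretely, the clamping described above amounts to the following (illustrative arithmetic, not the PR's actual code):
```java
public final class EarlyWindowSketch {
    public static void main(String[] args) {
        final long timeDifference = 10_000L; // sliding window size
        final long recordTs = 4_000L;        // an "early" record: ts < timeDifference
        // Normally the record's left window is [ts - timeDifference, ts];
        // clamping the start at 0 avoids a negative window for early records.
        final long start = Math.max(0L, recordTs - timeDifference);
        final long end = start + timeDifference; // yields [0, timeDifference]
        System.out.printf("early record at %d -> window [%d, %d]%n", recordTs, start, end);
    }
}
```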
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r468097360 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -266,6 +275,199 @@ class KafkaController(val config: KafkaConfig, } } + private def createFeatureZNode(newNode: FeatureZNode): Int = { +info(s"Creating FeatureZNode at path: ${FeatureZNode.path} with contents: $newNode") +zkClient.createFeatureZNode(newNode) +val (_, newVersion) = zkClient.getDataAndVersion(FeatureZNode.path) +newVersion + } + + private def updateFeatureZNode(updatedNode: FeatureZNode): Int = { +info(s"Updating FeatureZNode at path: ${FeatureZNode.path} with contents: $updatedNode") +zkClient.updateFeatureZNode(updatedNode) + } + + /** + * This method enables the feature versioning system (KIP-584). + * + * Development in Kafka (from a high level) is organized into features. Each feature is tracked by + * a name and a range of version numbers. A feature can be of two types: + * + * 1. Supported feature: + * A supported feature is represented by a name (String) and a range of versions (defined by a + * {@link SupportedVersionRange}). It refers to a feature that a particular broker advertises + * support for. Each broker advertises the version ranges of its own supported features in its + * own BrokerIdZNode. The contents of the advertisement are specific to the particular broker and + * do not represent any guarantee of a cluster-wide availability of the feature for any particular + * range of versions. + * + * 2. Finalized feature: + * A finalized feature is represented by a name (String) and a range of version levels (defined + * by a {@link FinalizedVersionRange}). Whenever the feature versioning system (KIP-584) is + * enabled, the finalized features are stored in the cluster-wide common FeatureZNode. + * In comparison to a supported feature, the key difference is that a finalized feature exists + * in ZK only when it is guaranteed to be supported by any random broker in the cluster for a + * specified range of version levels. Also, the controller is the only entity modifying the + * information about finalized features. + * + * This method sets up the FeatureZNode with enabled status, which means that the finalized + * features stored in the FeatureZNode are active. The enabled status should be written by the + * controller to the FeatureZNode only when the broker IBP config is greater than or equal to + * KAFKA_2_7_IV0. + * + * There are multiple cases handled here: + * + * 1. New cluster bootstrap: + *A new Kafka cluster (i.e. it is deployed first time) is almost always started with IBP config + *setting greater than or equal to KAFKA_2_7_IV0. We would like to start the cluster with all + *the possible supported features finalized immediately. Assuming this is the case, the + *controller will start up and notice that the FeatureZNode is absent in the new cluster, + *it will then create a FeatureZNode (with enabled status) containing the entire list of + *default supported features as its finalized features. + * + * 2. Broker binary upgraded, but IBP config set to lower than KAFKA_2_7_IV0: + *Imagine there is an existing Kafka cluster with IBP config less than KAFKA_2_7_IV0, and the + *broker binary has been upgraded to a newer version that supports the feature versioning + *system (KIP-584). This means the user is upgrading from an earlier version of the broker + *binary. 
In this case, we want to start with no finalized features and allow the user to + *finalize them whenever they are ready i.e. in the future whenever the user sets IBP config + *to be greater than or equal to KAFKA_2_7_IV0, then the user could start finalizing the + *features. This process ensures we do not enable all the possible features immediately after + *an upgrade, which could be harmful to Kafka. + *This is how we handle such a case: + * - Before the IBP config upgrade (i.e. IBP config set to less than KAFKA_2_7_IV0), the + *controller will start up and check if the FeatureZNode is absent. If absent, it will + *react by creating a FeatureZNode with disabled status and empty finalized features. + *Otherwise, if a node already exists in enabled status then the controller will just + *flip the status to disabled and clear the finalized features. + * - After the IBP config upgrade (i.e. IBP config set to greater than or equal to + *KAFKA_2_7_IV0), when the controller starts up it will check if the FeatureZNode exists + *and whether it is disabled. In such a case, it won’t upgrade all features immediately. + *Instead it will just switch the FeatureZNode status to enabled status. This lets
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r468101982 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -977,14 +1179,30 @@ class KafkaController(val config: KafkaConfig, /** * Send the leader information for selected partitions to selected brokers so that they can correctly respond to - * metadata requests + * metadata requests. Particularly, when feature versioning is enabled, we filter out brokers with incompatible + * features from receiving the metadata requests. This is because we do not want to activate incompatible brokers, + * as these may have harmful consequences to the cluster. Review comment: Good question. Yes, the broker will shut itself down. But still there is a possible race condition that needs to be handled to prevent an incompatible broker from causing damage to cluster. The race condition is described in the KIP-584 [in this section](https://cwiki.apache.org/confluence/display/KAFKA/KIP-584%3A+Versioning+scheme+for+features#KIP584:Versioningschemeforfeatures-Incompatiblebrokerlifetimeracecondition). Please let me know your thoughts. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
junrao commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-671520491 @chia7712 : I agree mostly with your assessment. For most delayed operations, the checking for the completeness of the operation and the calling of onComplete() don't have to be protected under the same lock. The only one that I am not quite sure about is DelayedJoin. Currently, DelayedJoin.tryCompleteJoin() checks if all members have joined and DelayedJoin.onComplete() modifies the state of the group. Both operations are done under the same group lock. If we relax the lock, it seems that the condition "all members have joined" may no longer be true when we get to DelayedJoin.onComplete() even though that condition was true during the DelayedJoin.tryCompleteJoin() check. It's not clear what we should do in that case. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
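A toy model of the hazard described above (illustrative Java; the real code is Scala's GroupCoordinator/DelayedJoin):
```java
import java.util.HashSet;
import java.util.Set;

public final class DelayedJoinSketch {
    private final Set<String> joined = new HashSet<>();
    private final Set<String> expected = Set.of("member-1", "member-2");

    synchronized void memberJoined(String memberId) { joined.add(memberId); }
    synchronized void memberLeft(String memberId) { joined.remove(memberId); }

    // cf. DelayedJoin.tryCompleteJoin: checks the completion condition under the lock.
    synchronized boolean tryCompleteJoin() {
        return joined.containsAll(expected);
    }

    // cf. DelayedJoin.onComplete: if the lock was released between the check and
    // this call, a memberLeft() may have run and the checked condition gone stale.
    synchronized void onComplete() {
        assert joined.containsAll(expected) : "condition went stale between check and act";
    }
}
```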
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r468103224 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -1647,6 +1865,192 @@ class KafkaController(val config: KafkaConfig, } } + /** + * Returns the new FinalizedVersionRange for the feature, if there are no feature + * incompatibilities seen with all known brokers for the provided feature update. + * Otherwise returns an ApiError object containing Errors#INVALID_REQUEST. + * + * @param update the feature update to be processed (this can not be meant to delete the feature) + * + * @return the new FinalizedVersionRange or error, as described above. + */ + private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = { +if (UpdateFeaturesRequest.isDeleteRequest(update)) { + throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update") +} + +val incompatibilityError = "Could not apply finalized feature update because" + + " brokers were found to have incompatible versions for the feature." + +if (brokerFeatures.supportedFeatures.get(update.feature()) == null) { + Right(new ApiError(Errors.INVALID_REQUEST, incompatibilityError)) +} else { + val defaultMinVersionLevel = brokerFeatures.defaultMinVersionLevel(update.feature) + val newVersionRange = new FinalizedVersionRange(defaultMinVersionLevel, update.maxVersionLevel) Review comment: Yes, excellent point. I'll fix this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-9659) Kafka Streams / Consumer configured for static membership fails on "fatal exception: group.instance.id gets fenced"
[ https://issues.apache.org/jira/browse/KAFKA-9659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-9659. -- Fix Version/s: 2.6.0 Resolution: Fixed > Kafka Streams / Consumer configured for static membership fails on "fatal > exception: group.instance.id gets fenced" > --- > > Key: KAFKA-9659 > URL: https://issues.apache.org/jira/browse/KAFKA-9659 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0 >Reporter: Rohan Desai >Assignee: Guozhang Wang >Priority: Major > Fix For: 2.6.0 > > Attachments: ksql-1.logs > > > I'm running a KSQL query, which underneath is built into a Kafka Streams > application. The application has been running without issue for a few days, > until today, when all the streams threads exited with: > > > {{[ERROR] 2020-03-05 00:57:58,776 > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - > [Consumer instanceId=ksql-1-2, > clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, > groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] > Received fatal exception: group.instance.id gets fenced}} > {{[ERROR] 2020-03-05 00:57:58,776 > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > org.apache.kafka.clients.consumer.internals.AbstractCoordinator onFailure - > [Consumer instanceId=ksql-1-2, > clientId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2-consumer, > groupId=_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5] > Caught fenced group.instance.id Optional[ksql-1-2] error in heartbeat thread}} > {{[ERROR] 2020-03-05 00:57:58,776 > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > org.apache.kafka.streams.processor.internals.StreamThread run - > stream-thread > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > Encountered the following unexpected Kafka exception during processing, this > usually indicate Streams internal errors:}} > \{{ org.apache.kafka.common.errors.FencedInstanceIdException: The broker > rejected this static consumer since another consumer with the same > group.instance.id has registered with a different member.id.}}{{[INFO] > 2020-03-05 00:57:58,776 > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > org.apache.kafka.streams.processor.internals.StreamThread setState - > stream-thread > [_confluent-ksql-pksqlc-xm6g1query_CSAS_RATINGS_WITH_USER_AVERAGE_5-39e8046a-b6e6-44fd-8d6d-37cff78649bf-StreamThread-2] > State transition from RUNNING to PENDING_SHUTDOWN}} > > I've attached the KSQL and Kafka Streams logs to this ticket. 
Here's a > summary for one of the streams threads (instance id `ksql-1-2`): > > Around 00:56:36 the coordinator fails over from b11 to b2: > > {{[INFO] 2020-03-05 00:56:36,258 > [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] > org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle - > [Consumer instanceId=ksql-1-2, > clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, > groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Attempt to > heartbeat failed since coordinator > b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: 2147483636 rack: > null) is either not started or not valid.}} > {{ [INFO] 2020-03-05 00:56:36,258 > [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] > org.apache.kafka.clients.consumer.internals.AbstractCoordinator > markCoordinatorUnknown - [Consumer instanceId=ksql-1-2, > clientId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2-consumer, > groupId=_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0] Group > coordinator b11-pkc-lzxjz.us-west-2.aws.devel.cpdev.cloud:9092 (id: > 2147483636 rack: null) is unavailable or invalid, will attempt rediscovery}} > {{ [INFO] 2020-03-05 00:56:36,270 > [_confluent-ksql-pksqlc-xm6g1query_CTAS_RATINGS_BY_USER_0-c1df9747-f353-47f1-82fd-30b97c20d038-StreamThread-2] > o
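For readers unfamiliar with static membership: the fencing error above means a second live consumer registered with the same group.instance.id. Configuration looks like this (a minimal sketch; the instance id must be unique per consumer within the group):
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class StaticMemberExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // Static membership: the id must be unique per instance within the group;
        // two live members sharing an id trigger FencedInstanceIdException.
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "ksql-1-2");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe / poll as usual
        }
    }
}
{code}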
[GitHub] [kafka] ableegoldman commented on a change in pull request #9153: MINOR: Fix state transition diagram for stream threads
ableegoldman commented on a change in pull request #9153: URL: https://github.com/apache/kafka/pull/9153#discussion_r468104424 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -93,7 +95,7 @@ * | +-+---+ | * +< | Partitions | | * | | Assigned (3)| <+ - * | +-+---+ | + * | +-+---+ ^ * || | * ||--+ Review comment: It seems like this here is the problem...AFAICT this line is just to demonstrate that RUNNING -> RUNNING is valid, but it's tied in to the arrows connecting every other state that actually can't transition to Running. Can we change this to a self-contained loop like you added for PARTITIONS_REVOKED? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -93,7 +95,7 @@ * | +-+---+ | * +< | Partitions | | * | | Assigned (3)| <+ - * | +-+---+ | + * | +-+---+ ^ Review comment: This seems a bit subtle. What if we instead break up the loop from Running --> Running below? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -80,11 +80,13 @@ * | +-+---+ * +< | Starting (1)|->+ * | +-+---+ | - * || | - * || | - * |v | - * | +-+---+ | - * +< | Partitions | | + * | | + * | | + * |+<--+ | + * || | | + * |v | | + * | +-+---+ | | + * +< | Partitions | --+ | Review comment: Nice catch This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r468109791 ## File path: core/src/main/scala/kafka/controller/KafkaController.scala ## @@ -1647,6 +1865,192 @@ class KafkaController(val config: KafkaConfig, } } + /** + * Returns the new FinalizedVersionRange for the feature, if there are no feature + * incompatibilities seen with all known brokers for the provided feature update. + * Otherwise returns an ApiError object containing Errors#INVALID_REQUEST. + * + * @param update the feature update to be processed (this can not be meant to delete the feature) + * + * @return the new FinalizedVersionRange or error, as described above. + */ + private def newFinalizedVersionRangeOrIncompatibilityError(update: UpdateFeaturesRequestData.FeatureUpdateKey): Either[FinalizedVersionRange, ApiError] = { +if (UpdateFeaturesRequest.isDeleteRequest(update)) { + throw new IllegalArgumentException(s"Provided feature update can not be meant to delete the feature: $update") +} + +val incompatibilityError = "Could not apply finalized feature update because" + + " brokers were found to have incompatible versions for the feature." + +if (brokerFeatures.supportedFeatures.get(update.feature()) == null) { Review comment: It's required because `defaultMinVersionLevel` does not exist for a feature that's not in the supported list. However, I'll change the code to make the check more obvious to the reader (currently it's not). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
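For orientation, the compatibility rule being enforced in this method is that a finalized version range must lie within the broker's supported range for the same feature; otherwise the update is rejected with INVALID_REQUEST. A simplified sketch of that containment check (class and field names are illustrative, not the actual KIP-584 classes):

```java
// Illustrative version of the "feature incompatibility" check: the finalized
// [min, max] range must fit inside the broker's supported [min, max] range.
// Not the actual kafka.server / org.apache.kafka.common.feature code.
final class VersionRange {
    final short min;
    final short max;

    VersionRange(short min, short max) {
        this.min = min;
        this.max = max;
    }

    /** True when a broker with this supported range cannot serve the finalized range. */
    boolean isIncompatibleWith(VersionRange finalized) {
        return finalized.min < min || finalized.max > max;
    }
}
```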
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r468111300 ## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ## @@ -82,18 +110,54 @@ object FinalizedFeatureCache extends Logging { " The existing cache contents are %s").format(latest, oldFeatureAndEpoch) throw new FeatureCacheUpdateException(errorMsg) } else { - val incompatibleFeatures = SupportedFeatures.incompatibleFeatures(latest.features) + val incompatibleFeatures = brokerFeatures.incompatibleFeatures(latest.features) if (!incompatibleFeatures.empty) { val errorMsg = ("FinalizedFeatureCache update failed since feature compatibility" + " checks failed! Supported %s has incompatibilities with the latest %s." - ).format(SupportedFeatures.get, latest) + ).format(brokerFeatures.supportedFeatures, latest) Review comment: Good question. The existing behavior is that it shuts itself down, as triggered by this LOC: https://github.com/apache/kafka/blob/89a3ba69e03acbe9635ee1039abb567bf0c6631b/core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala#L154-L156. The reason to do it is that an incompatible broker can potentially do harmful things to a cluster (because max version level upgrades are used for breaking changes). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r468111785 ## File path: core/src/main/scala/kafka/server/BrokerFeatures.scala ## @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import kafka.utils.Logging +import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange} +import org.apache.kafka.common.feature.Features._ + +import scala.jdk.CollectionConverters._ + +/** + * A class that encapsulates the following: + * + * 1. The latest features supported by the Broker. + * + * 2. The default minimum version levels for specific features. This map enables feature Review comment: Sure, I'll update the PR documenting it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #9153: MINOR: Fix state transition diagram for stream threads
cadonna commented on a change in pull request #9153: URL: https://github.com/apache/kafka/pull/9153#discussion_r468121273 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -93,7 +95,7 @@ * | +-+---+ | * +< | Partitions | | * | | Assigned (3)| <+ - * | +-+---+ | + * | +-+---+ ^ Review comment: I had the same feeling. I will change it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10383) KTable Join on Foreign key is opinionated
Marco Lotz created KAFKA-10383: -- Summary: KTable Join on Foreign key is opinionated Key: KAFKA-10383 URL: https://issues.apache.org/jira/browse/KAFKA-10383 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 2.4.1 Reporter: Marco Lotz *Status Quo:* The current implementation of [KIP-213|https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable] Foreign Key Join between two KTables is _opinionated_ in terms of storage layer. Regardless of the Materialized method provided in the method argument, it generates an intermediary RocksDB state store. Thus, even when the Materialized method provided is "in memory", it will use RocksDB under the hood for this state-store. *Related problems:* * IT Tests: Having an implicit materialization method for the state-store affects tests that use foreign key state-stores. [On Windows-based systems|https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application], which are affected by the RocksDB filesystem removal problem, a way to avoid the bug is to use in-memory state-stores (rather than exception swallowing). Because the RocksDB storage is forcibly created, any IT test must resort to the manual FS deletion with exception-swallowing hack. * Short-lived Streams: Sometimes KTables are short-lived in a way that neither persistent storage nor changelogs are desired. The current implementation prevents this. *Suggestion:* One possible solution is to use the same materialization method that is provided in the argument when creating the intermediary Foreign Key state-store. If the Materialized is in-memory and without a changelog, the same happens for the state-store. -- This message was sent by Atlassian Jira (v8.3.4#803005)
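To make the report concrete, here is a minimal sketch of the call site being described. Topic names, types, and the foreign-key extractor are made up; the comments state the reported behavior, which the example itself cannot demonstrate:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class ForeignKeyJoinExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, String> orders = builder.table("orders");       // placeholder topic
        KTable<String, String> customers = builder.table("customers"); // placeholder topic

        // The caller asks for an in-memory store for the join result...
        Materialized<String, String, KeyValueStore<Bytes, byte[]>> inMemory =
            Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(
                    Stores.inMemoryKeyValueStore("join-result"))
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String());

        // ...but, per the report, the internal subscription state store that
        // backs the foreign-key join is created as RocksDB regardless of
        // this Materialized argument.
        KTable<String, String> joined = orders.join(
            customers,
            orderValue -> orderValue,                     // foreign-key extractor (illustrative)
            (order, customer) -> order + "/" + customer,  // value joiner (illustrative)
            inMemory);
        builder.build();
    }
}
```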
[jira] [Updated] (KAFKA-10383) KTable Join on Foreign key is opinionated
[ https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marco Lotz updated KAFKA-10383: --- Description: *Status Quo:* The current implementation of [KIP-213 |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]] of Foreign Key Join between two KTables is _opinionated_ in terms of storage layer. Independently of the Materialization method provided in the method argument, it generates an intermediary RocksDB state store. Thus, even when the Materialization method provided is "in memory", it will use RocksDB under-the-hood for this internal state-store. *Related problems:* * **IT Test: Having an implicit materialization method for state-store affects tests using foreign key state-stores. [On windows based systems |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]], that have the RocksDB filesystem removal problem, a solution to avoid the bug is to use in-memory state-stores (rather than exception swallowing). Having the RocksDB storage being forcely created makes that any IT test necessarily use the manual FS deletion with exception swallow hack. * Short lived Streams: Sometimes, Ktables are short lived in a way that neither Persistance storage nor changelogs are desired. The current implementation prevents this. *Suggestion:* One possible solution is to use the same materialization method that is provided in the argument when creating the intermediary Foreign Key state-store. If the Materialization is in memory and without changelog, the same happens in the state-sore. was: *Status Quo:* The current implementation of [KIP-213 |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]] of Foreign Key Join between two KTables is _opinionated_ in terms of storage layer. Independently of the Materialization method provided in the method argument, it generates an intermediary RocksDB state store. Thus, even when the Materialization method provided is "in memory", it will use RocksDB under-the-hood for this state-store. *Related problems:* * **IT Test: Having an implicit materialization method for state-store affects tests using foreign key state-stores. [On windows based systems |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]], that have the RocksDB filesystem removal problem, a solution to avoid the bug is to use in-memory state-stores (rather than exception swallowing). Having the RocksDB storage being forcely created makes that any IT test necessarily use the manual FS deletion with exception swallow hack. * Short lived Streams: Sometimes, Ktables are short lived in a way that neither Persistance storage nor changelogs are desired. The current implementation prevents this. *Suggestion:* One possible solution is to use the same materialization method that is provided in the argument when creating the intermediary Foreign Key state-store. If the Materialization is in memory and without changelog, the same happens in the state-sore. 
> KTable Join on Foreign key is opinionated > -- > > Key: KAFKA-10383 > URL: https://issues.apache.org/jira/browse/KAFKA-10383 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.4.1 >Reporter: Marco Lotz >Priority: Major > > *Status Quo:* > The current implementation of [KIP-213 > |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]] > of Foreign Key Join between two KTables is _opinionated_ in terms of storage > layer. > Independently of the Materialization method provided in the method argument, > it generates an intermediary RocksDB state store. Thus, even when the > Materialization method provided is "in memory", it will use RocksDB > under-the-hood for this internal state-store. > > *Related problems:* > * **IT Test: Having an implicit materialization method for state-store > affects tests using foreign key state-stores. [On windows based systems > |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]], > that have the RocksDB filesystem removal problem, a solution to avoid the > bug is to use in-memory state-stores (rather than exception swallowing). > Having the RocksDB storage being forcely created makes that any IT test > necessarily use the manual FS deletion with exception swallow hack. > * Short lived Streams: Sometimes, Ktables are short lived in a way that > neither Persistance storage nor changelogs are desired. The current > implementation prevents this. > *Suggestion:* > One possible solution is to use
[jira] [Updated] (KAFKA-10383) KTable Join on Foreign key is opinionated
[ https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marco Lotz updated KAFKA-10383: --- Description: *Status Quo:* The current implementation of [KIP-213 |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]] of Foreign Key Join between two KTables is _opinionated_ in terms of storage layer. Independently of the Materialization method provided in the method argument, it generates an intermediary RocksDB state store. Thus, even when the Materialization method provided is "in memory", it will use RocksDB under-the-hood for this internal state-store. *Related problems:* * IT Tests: Having an implicit materialization method for state-store affects tests using foreign key state-stores. [On windows based systems |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]], that have the RocksDB filesystem removal problem, a solution to avoid the bug is to use in-memory state-stores (rather than exception swallowing). Having the RocksDB storage being forcely created makes that any IT test necessarily use the manual FS deletion with exception swallow hack. * Short lived Streams: Sometimes, Ktables are short lived in a way that neither Persistance storage nor changelogs are desired. The current implementation prevents this. *Suggestion:* One possible solution is to use the same materialization method that is provided in the argument when creating the intermediary Foreign Key state-store. If the Materialization is in memory and without changelog, the same happens in the state-sore. was: *Status Quo:* The current implementation of [KIP-213 |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]] of Foreign Key Join between two KTables is _opinionated_ in terms of storage layer. Independently of the Materialization method provided in the method argument, it generates an intermediary RocksDB state store. Thus, even when the Materialization method provided is "in memory", it will use RocksDB under-the-hood for this internal state-store. *Related problems:* * **IT Test: Having an implicit materialization method for state-store affects tests using foreign key state-stores. [On windows based systems |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]], that have the RocksDB filesystem removal problem, a solution to avoid the bug is to use in-memory state-stores (rather than exception swallowing). Having the RocksDB storage being forcely created makes that any IT test necessarily use the manual FS deletion with exception swallow hack. * Short lived Streams: Sometimes, Ktables are short lived in a way that neither Persistance storage nor changelogs are desired. The current implementation prevents this. *Suggestion:* One possible solution is to use the same materialization method that is provided in the argument when creating the intermediary Foreign Key state-store. If the Materialization is in memory and without changelog, the same happens in the state-sore. 
> KTable Join on Foreign key is opinionated > -- > > Key: KAFKA-10383 > URL: https://issues.apache.org/jira/browse/KAFKA-10383 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.4.1 >Reporter: Marco Lotz >Priority: Major > > *Status Quo:* > The current implementation of [KIP-213 > |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]] > of Foreign Key Join between two KTables is _opinionated_ in terms of storage > layer. > Independently of the Materialization method provided in the method argument, > it generates an intermediary RocksDB state store. Thus, even when the > Materialization method provided is "in memory", it will use RocksDB > under-the-hood for this internal state-store. > > *Related problems:* > * IT Tests: Having an implicit materialization method for state-store > affects tests using foreign key state-stores. [On windows based systems > |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]], > that have the RocksDB filesystem removal problem, a solution to avoid the > bug is to use in-memory state-stores (rather than exception swallowing). > Having the RocksDB storage being forcely created makes that any IT test > necessarily use the manual FS deletion with exception swallow hack. > * Short lived Streams: Sometimes, Ktables are short lived in a way that > neither Persistance storage nor changelogs are desired. The current > implementation prevents this. > *Suggestion:* > One possible solution i
[jira] [Updated] (KAFKA-10383) KTable Join on Foreign key is opinionated
[ https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Marco Lotz updated KAFKA-10383: --- Description: *Status Quo:* The current implementation of [KIP-213 |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]] of Foreign Key Join between two KTables is _opinionated_ in terms of storage layer. Independently of the Materialization method provided in the method argument, it generates an intermediary RocksDB state store. Thus, even when the Materialization method provided is "in memory", it will use RocksDB under-the-hood for this internal state-store. *Related problems:* * IT Tests: Having an implicit materialization method for state-store affects tests using foreign key state-stores. [On windows based systems |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]], that are affected by the RocksDB filesystem removal problem, an approach to avoid the bug is to use in-memory state-stores (rather than exception swallowing). Having the intermediate RocksDB storage being created disregarding materialization method forces any IT test to necessarily use the manual FS deletion with exception swallowing hack. * Short lived Streams: Ktables can be short lived in a way that neither persistent storage nor change-logs creation are desired. The current implementation prevents this. *Suggestion:* One possible solution is to use a similar materialization method (to the one provided in the argument) when creating the intermediary Foreign Key state-store. If the Materialization is in memory and without changelog, the same happens in the intermediate state-sore. was: *Status Quo:* The current implementation of [KIP-213 |[https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable]] of Foreign Key Join between two KTables is _opinionated_ in terms of storage layer. Independently of the Materialization method provided in the method argument, it generates an intermediary RocksDB state store. Thus, even when the Materialization method provided is "in memory", it will use RocksDB under-the-hood for this internal state-store. *Related problems:* * IT Tests: Having an implicit materialization method for state-store affects tests using foreign key state-stores. [On windows based systems |[https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application]], that have the RocksDB filesystem removal problem, a solution to avoid the bug is to use in-memory state-stores (rather than exception swallowing). Having the RocksDB storage being forcely created makes that any IT test necessarily use the manual FS deletion with exception swallow hack. * Short lived Streams: Sometimes, Ktables are short lived in a way that neither Persistance storage nor changelogs are desired. The current implementation prevents this. *Suggestion:* One possible solution is to use the same materialization method that is provided in the argument when creating the intermediary Foreign Key state-store. If the Materialization is in memory and without changelog, the same happens in the state-sore. 
> KTable Join on Foreign key is opinionated > -- > > Key: KAFKA-10383 > URL: https://issues.apache.org/jira/browse/KAFKA-10383 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.4.1 >Reporter: Marco Lotz >Priority: Major > > *Status Quo:* > The current implementation of [KIP-213|https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable] > of Foreign Key Join between two KTables is _opinionated_ in terms of storage > layer. > Independently of the Materialization method provided in the method argument, > it generates an intermediary RocksDB state store. Thus, even when the > Materialization method provided is "in memory", it will use RocksDB > under-the-hood for this internal state-store. > > *Related problems:* > * IT Tests: Having an implicit materialization method for state-store > affects tests using foreign key state-stores. [On Windows-based systems|https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application], > that are affected by the RocksDB filesystem removal problem, an approach to > avoid the bug is to use in-memory state-stores (rather than exception > swallowing). Having the intermediate RocksDB storage being created > disregarding materialization method forces any IT test to necessarily use the > manual FS deletion with exception swallowing hack. > * Short lived Streams: KTables can be short lived in a way that neither > persistent storage nor change-logs creation are desired. The current > implementation prevents this. > *Suggestion:* > One possible solution is to use a similar materialization method (to the one > provided in the argument) when creating the intermediary Foreign Key > state-store. If the Materialization is in memory and without changelog, the > same happens in the intermediate state-store.
[jira] [Created] (KAFKA-10384) Separate converters from generated messages
Colin McCabe created KAFKA-10384: Summary: Separate converters from generated messages Key: KAFKA-10384 URL: https://issues.apache.org/jira/browse/KAFKA-10384 Project: Kafka Issue Type: Bug Reporter: Colin McCabe Assignee: Colin McCabe Separate the JSON converter classes from the message classes, so that the clients module can be used without Jackson on the CLASSPATH. -- This message was sent by Atlassian Jira (v8.3.4#803005)
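The decoupling pattern named here, in miniature: the generated message class carries no Jackson imports, while a sibling converter class (which can live in a separate artifact) is the only code that touches Jackson. A hypothetical sketch; the real generated-code layout may differ:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// The message class itself has no Jackson dependency...
final class ExampleMessageData {
    short version;
    String name;
}

// ...while the converter, generated as a separate class (and potentially
// shipped in a separate artifact), is the only place that imports Jackson.
final class ExampleMessageDataJsonConverter {
    static ObjectNode write(ExampleMessageData message, ObjectMapper mapper) {
        ObjectNode node = mapper.createObjectNode();
        node.put("version", message.version);
        node.put("name", message.name);
        return node;
    }
}
```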
[GitHub] [kafka] ijuma commented on pull request #9155: MINOR: Ensure a single version of scala-library is used
ijuma commented on pull request #9155: URL: https://github.com/apache/kafka/pull/9155#issuecomment-671562430 2 builds passed, 1 flaky test failure for Java 8. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma merged pull request #9155: MINOR: Ensure a single version of scala-library is used
ijuma merged pull request #9155: URL: https://github.com/apache/kafka/pull/9155 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] apovzner commented on a change in pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…
apovzner commented on a change in pull request #8768: URL: https://github.com/apache/kafka/pull/8768#discussion_r468175154 ## File path: core/src/test/scala/unit/kafka/network/ConnectionQuotasTest.scala ## @@ -70,37 +80,38 @@ class ConnectionQuotasTest { blockedPercentMeters.put(name, KafkaMetricsGroup.newMeter( s"${name}BlockedPercent", "blocked time", TimeUnit.NANOSECONDS, Map(ListenerMetricTag -> name))) } +// use system time, because ConnectionQuota causes the current thread to wait with timeout, which waits based on +// system time; so using mock time will likely result in test flakiness due to a mixed use of mock and system time +metrics = new Metrics(new MetricConfig(), Collections.emptyList(), Time.SYSTEM) Review comment: This is required for all tests. Even for tests that are not supposed to trigger throttling due to the connection rate quota, we want this: if the code incorrectly throttles to limit the rate (calls wait() with a timeout), the existing tests may start failing in a way that is hard to debug (timing out too early or too late, or not where we expect). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
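To spell out the constraint: Object.wait(timeoutMs) always measures wall-clock time, so the Metrics instance observing the quota code must be driven by the same clock; a MockTime that never advances would silently disagree with the real wait. The constructor from the diff, shown standalone:

```java
import java.util.Collections;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;

public class SystemTimeMetricsExample {
    public static void main(String[] args) {
        // Metrics driven by the system clock, matching the clock that
        // Object.wait(timeoutMs) uses; mixing MockTime here with real waits
        // is the flakiness source the review comment warns about.
        Metrics metrics = new Metrics(new MetricConfig(), Collections.emptyList(), Time.SYSTEM);
        metrics.close();
    }
}
```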
[GitHub] [kafka] guozhangwang merged pull request #9153: MINOR: Fix state transition diagram for stream threads
guozhangwang merged pull request #9153: URL: https://github.com/apache/kafka/pull/9153 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10383) KTable Join on Foreign key is opinionated
[ https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10383: - Component/s: (was: core) streams > KTable Join on Foreign key is opinionated > -- > > Key: KAFKA-10383 > URL: https://issues.apache.org/jira/browse/KAFKA-10383 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.4.1 >Reporter: Marco Lotz >Priority: Major > > *Status Quo:* > The current implementation of [KIP-213|https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable] > of Foreign Key Join between two KTables is _opinionated_ in terms of storage > layer. > Independently of the Materialization method provided in the method argument, > it generates an intermediary RocksDB state store. Thus, even when the > Materialization method provided is "in memory", it will use RocksDB > under-the-hood for this internal state-store. > > *Related problems:* > * IT Tests: Having an implicit materialization method for state-store > affects tests using foreign key state-stores. [On Windows-based systems|https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application], > that are affected by the RocksDB filesystem removal problem, an approach to > avoid the bug is to use in-memory state-stores (rather than exception > swallowing). Having the intermediate RocksDB storage being created > disregarding materialization method forces any IT test to necessarily use the > manual FS deletion with exception swallowing hack. > * Short lived Streams: KTables can be short lived in a way that neither > persistent storage nor change-logs creation are desired. The current > implementation prevents this. > *Suggestion:* > One possible solution is to use a similar materialization method (to the one > provided in the argument) when creating the intermediary Foreign Key > state-store. If the Materialization is in memory and without changelog, the > same happens in the intermediate state-store. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] skaundinya15 commented on pull request #9143: MINOR: Fix the way total consumed is calculated for verifiable consumer
skaundinya15 commented on pull request #9143: URL: https://github.com/apache/kafka/pull/9143#issuecomment-671599091 I re-ran the system tests for `streams_broker_compatibility_test` here: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-08-10--001.1597088498--skaundinya15--minor-fix-total-consumed-for-verifiable-consumer--d799e563e/report.html and got a green build. Seems like anything dependent on this method/variable checks out ok for this. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10383) KTable Join on Foreign key is opinionated
[ https://issues.apache.org/jira/browse/KAFKA-10383?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17175074#comment-17175074 ] John Roesler commented on KAFKA-10383: -- Thanks for the report, [~marcolotz]. This seems like a design oversight. It does seem desirable to plug in different stores as the subscription store. I'm not sure if I'd piggy-back on the existing Materialized argument, as the subscription state would have a completely different shape and dynamic from the join result (which is what Materialized configures). Plus, you may want to (e.g.) set the subscription state to in-memory without materializing the join result. If we piggy-back, there would be no way to express this. At a glance, it seems like we should have a separate argument to the join, which would be a new object that allows configuring the things that make sense for a subscription store: * KeyValueBytesStoreSupplier: the kind of store to use * withLoggingEnabled(final Map<String, String> config) / withLoggingDisabled(): the changelog configs * withCachingEnabled() / withCachingDisabled(): the caching configs This would require a KIP, of course. Are you open to contributing this feature? I think a lot of people would find it helpful as the feature becomes more popular. I'd be happy to help you with the process if you're willing. Thanks, -John > KTable Join on Foreign key is opinionated > -- > > Key: KAFKA-10383 > URL: https://issues.apache.org/jira/browse/KAFKA-10383 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.4.1 >Reporter: Marco Lotz >Priority: Major > > *Status Quo:* > The current implementation of [KIP-213|https://cwiki.apache.org/confluence/display/KAFKA/KIP-213+Support+non-key+joining+in+KTable] > of Foreign Key Join between two KTables is _opinionated_ in terms of storage > layer. > Independently of the Materialization method provided in the method argument, > it generates an intermediary RocksDB state store. Thus, even when the > Materialization method provided is "in memory", it will use RocksDB > under-the-hood for this internal state-store. > > *Related problems:* > * IT Tests: Having an implicit materialization method for state-store > affects tests using foreign key state-stores. [On Windows-based systems|https://stackoverflow.com/questions/50602512/failed-to-delete-the-state-directory-in-ide-for-kafka-stream-application], > that are affected by the RocksDB filesystem removal problem, an approach to > avoid the bug is to use in-memory state-stores (rather than exception > swallowing). Having the intermediate RocksDB storage being created > disregarding materialization method forces any IT test to necessarily use the > manual FS deletion with exception swallowing hack. > * Short lived Streams: KTables can be short lived in a way that neither > persistent storage nor change-logs creation are desired. The current > implementation prevents this. > *Suggestion:* > One possible solution is to use a similar materialization method (to the one > provided in the argument) when creating the intermediary Foreign Key > state-store. If the Materialization is in memory and without changelog, the > same happens in the intermediate state-store. -- This message was sent by Atlassian Jira (v8.3.4#803005)
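A rough sketch of the shape such a separate join argument could take. The naming is entirely hypothetical, since no KIP exists yet; it simply mirrors the three knobs listed in the comment above:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;

// Hypothetical config object for the foreign-key subscription store. Not a
// real Kafka Streams class at the time of this thread; it only illustrates
// the store supplier, logging, and caching knobs proposed above.
public final class SubscriptionStoreConfig {
    private final KeyValueBytesStoreSupplier storeSupplier; // the kind of store to use
    private boolean loggingEnabled = true;
    private final Map<String, String> logConfig = new HashMap<>();
    private boolean cachingEnabled = true;

    private SubscriptionStoreConfig(KeyValueBytesStoreSupplier storeSupplier) {
        this.storeSupplier = storeSupplier;
    }

    public static SubscriptionStoreConfig as(KeyValueBytesStoreSupplier supplier) {
        return new SubscriptionStoreConfig(supplier);
    }

    public SubscriptionStoreConfig withLoggingEnabled(Map<String, String> config) {
        loggingEnabled = true;
        logConfig.putAll(config);
        return this;
    }

    public SubscriptionStoreConfig withLoggingDisabled() {
        loggingEnabled = false;
        logConfig.clear();
        return this;
    }

    public SubscriptionStoreConfig withCachingEnabled() {
        cachingEnabled = true;
        return this;
    }

    public SubscriptionStoreConfig withCachingDisabled() {
        cachingEnabled = false;
        return this;
    }
}
```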
[GitHub] [kafka] vvcephei commented on a change in pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder
vvcephei commented on a change in pull request #9141: URL: https://github.com/apache/kafka/pull/9141#discussion_r468206463 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java ## @@ -85,68 +160,38 @@ groupedStreams.remove(kGrouped); kGrouped.ensureCopartitionWith(groupedStreams); -final Collection processors = new ArrayList<>(); -boolean stateCreated = false; -int counter = 0; -for (final Entry, Aggregator> kGroupedStream : groupPatterns.entrySet()) { -final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode( -kGroupedStream.getValue(), -initializer, -named.suffixWithOrElseGet( -"-cogroup-agg-" + counter++, -builder, -CogroupedKStreamImpl.AGGREGATE_NAME), -stateCreated, -storeBuilder, -windows, -sessionWindows, -sessionMerger); -stateCreated = true; -processors.add(statefulProcessorNode); -builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode); -} +} + + KTable createTable(final Collection processors, + final NamedInternal named, + final Serde keySerde, + final Serde valueSerde, + final String queryableName) { final String mergeProcessorName = named.suffixWithOrElseGet( -"-cogroup-merge", -builder, -CogroupedKStreamImpl.MERGE_NAME); +"-cogroup-merge", +builder, +CogroupedKStreamImpl.MERGE_NAME); Review comment: It seems like your indentation is set to 8 spaces instead of 4. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder
vvcephei commented on pull request #9141: URL: https://github.com/apache/kafka/pull/9141#issuecomment-671609419 All the tests were green except for the flaky ``` org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false] org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true] ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
vvcephei commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r468214782 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindows.java ## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.internals.ApiUtils; +import org.apache.kafka.streams.processor.TimestampExtractor; +import java.time.Duration; +import java.util.Objects; +import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix; + +/** + /** + * A sliding window used for aggregating events. + * + * Sliding Windows are defined based on a record's timestamp, window size based on the given maximum time difference (inclusive) between + * records in the same window and given window grace period. Review comment: Haha, my specialty! The distillation of this sentence is "Windows are defined based on a record's timestamp, window size, and window grace period." I think the meaning is pretty clear, so no need to change anything. Just to point it out, there's structural ambiguity about whether the sentence is saying "a record's (timestamp, window size, window grace period)" (I.e., three properties of the record), or whether there are three top-level things that define the window. The latter was intended. I think actually inserting "the" before "window" both times would clear it up: Windows are defined based on a record's timestamp, the window size, and the window grace period." Another note is that because the second item in the list is so long, the structure of the list gets a little lost. It would be better in this case to use the Oxford comma to clearly delineate the boundary between the second and third items. So, although I think this is fine as-is, if you want me to break out the red pen, I'd say: ``` * Sliding Windows are defined based on a record's timestamp, the window size based on the given maximum time difference (inclusive) between * records in the same window, and the given window grace period. ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #9155: MINOR: Ensure a single version of scala-library is used
ijuma commented on pull request #9155: URL: https://github.com/apache/kafka/pull/9155#issuecomment-671617848 Merged to master and cherry-picked to 2.6, 2.5 and 2.4 branches. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei merged pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder
vvcephei merged pull request #9141: URL: https://github.com/apache/kafka/pull/9141 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder
vvcephei commented on pull request #9141: URL: https://github.com/apache/kafka/pull/9141#issuecomment-671622789 Verified the last commit only changed whitespace before merging. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API
rondagostino commented on a change in pull request #9032: URL: https://github.com/apache/kafka/pull/9032#discussion_r468149236 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/Admin.java ## @@ -1214,6 +1215,64 @@ default AlterClientQuotasResult alterClientQuotas(Collection entries, AlterClientQuotasOptions options); +/** + * Describe all SASL/SCRAM credentials. + * + * This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)} + * + * @return The DescribeUserScramCredentialsResult. + */ +default DescribeUserScramCredentialsResult describeUserScramCredentials() { +return describeUserScramCredentials(null, new DescribeUserScramCredentialsOptions()); +} + +/** + * Describe SASL/SCRAM credentials for the given users. + * + * This is a convenience method for {@link #describeUserScramCredentials(List, DescribeUserScramCredentialsOptions)} + * + * @param users the users for which credentials are to be described; all users' credentials are described if null + * or empty. A user explicitly specified here that does not have a SCRAM credential will not appear + * in the results. Review comment: Hmm, good question. The KIP doesn't state what to do here. @cmccabe thoughts? ## File path: core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala ## @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + + +import java.nio.charset.StandardCharsets +import java.util +import java.util.Properties + +import kafka.network.SocketServer +import kafka.security.authorizer.AclAuthorizer +import org.apache.kafka.clients.admin.ScramMechanism +import org.apache.kafka.common.acl.AclOperation +import org.apache.kafka.common.message.{AlterUserScramCredentialsRequestData, DescribeUserScramCredentialsRequestData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse, DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse} +import org.apache.kafka.common.resource.ResourceType +import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder} +import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult} +import org.junit.Assert._ +import org.junit.{Before, Test} + +import scala.jdk.CollectionConverters._ + +/** + * Test AlterUserScramCredentialsRequest/Response API for the cases where either no credentials are altered + * or failure is expected due to lack of authorization, sending the request to a non-controller broker, or some other issue. + * Also tests the Alter and Describe APIs for the case where credentials are successfully altered/described. 
+ */ +class AlterUserScramCredentialsRequestTest extends BaseRequestTest { + override def brokerPropertyOverrides(properties: Properties): Unit = { +properties.put(KafkaConfig.ControlledShutdownEnableProp, "false") +properties.put(KafkaConfig.AuthorizerClassNameProp, classOf[AlterCredentialsTest.TestAuthorizer].getName) +properties.put(KafkaConfig.PrincipalBuilderClassProp, classOf[AlterCredentialsTest.TestPrincipalBuilder].getName) + } + + @Before + override def setUp(): Unit = { +AlterCredentialsTest.principal = KafkaPrincipal.ANONYMOUS // default is to be authorized +super.setUp() + } + + @Test + def testAlterNothing(): Unit = { +val request = new AlterUserScramCredentialsRequest.Builder( + new AlterUserScramCredentialsRequestData() +.setDeletions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialDeletion]) +.setUpsertions(new util.ArrayList[AlterUserScramCredentialsRequestData.ScramCredentialUpsertion])).build() +val response = sendAlterUserScramCredentialsRequest(request) + +val results = response.data.results +assertEquals(0, results.size) + } + + @Test + def testAlterNothingNotAuthorized(): Unit = { +AlterCredentialsTest.principal = AlterCredential
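From the client side, the API under review is used roughly as below. Class and method names follow KIP-554; the exact signatures were still being discussed in this PR, so treat this as a sketch (broker address, user, and password are placeholders):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ScramCredentialInfo;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.clients.admin.UserScramCredentialUpsertion;

public class ScramAdminExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // Upsert a SCRAM-SHA-256 credential for user "alice" (4096 iterations).
            admin.alterUserScramCredentials(Collections.singletonList(
                new UserScramCredentialUpsertion(
                    "alice",
                    new ScramCredentialInfo(ScramMechanism.SCRAM_SHA_256, 4096),
                    "alice-secret")))
                .all().get();

            // Describe alice's credentials; mechanism and iteration count come
            // back, but the password itself is never returned.
            System.out.println(admin.describeUserScramCredentials(
                Collections.singletonList("alice")).all().get());
        }
    }
}
```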
[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #9001: URL: https://github.com/apache/kafka/pull/9001#discussion_r468111300 ## File path: core/src/main/scala/kafka/server/FinalizedFeatureCache.scala ## @@ -82,18 +110,54 @@ object FinalizedFeatureCache extends Logging { " The existing cache contents are %s").format(latest, oldFeatureAndEpoch) throw new FeatureCacheUpdateException(errorMsg) } else { - val incompatibleFeatures = SupportedFeatures.incompatibleFeatures(latest.features) + val incompatibleFeatures = brokerFeatures.incompatibleFeatures(latest.features) if (!incompatibleFeatures.empty) { val errorMsg = ("FinalizedFeatureCache update failed since feature compatibility" + " checks failed! Supported %s has incompatibilities with the latest %s." - ).format(SupportedFeatures.get, latest) + ).format(brokerFeatures.supportedFeatures, latest) Review comment: Good question. The existing behavior is that it shuts itself down, as triggered by this LOC. The reason to do it is that an incompatible broker can potentially do harmful things to a cluster (because max version level upgrades are used for breaking changes): https://github.com/apache/kafka/blob/89a3ba69e03acbe9635ee1039abb567bf0c6631b/core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala#L154-L156. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10223) ReplicaNotAvailableException must be retriable to handle reassignments
[ https://issues.apache.org/jira/browse/KAFKA-10223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17175094#comment-17175094 ] Dongjoon Hyun commented on KAFKA-10223: --- Thank you for the confirmation, [~rsivaram]. > ReplicaNotAvailableException must be retriable to handle reassignments > -- > > Key: KAFKA-10223 > URL: https://issues.apache.org/jira/browse/KAFKA-10223 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Critical > Fix For: 2.6.0 > > > ReplicaNotAvailableException should be a retriable `InvalidMetadataException` > since consumers may throw this during reassignments. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on a change in pull request #9112: KAFKA-10312 Fix error code returned by getPartitionMetadata
hachikuji commented on a change in pull request #9112: URL: https://github.com/apache/kafka/pull/9112#discussion_r468270027 ## File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ## @@ -229,7 +205,7 @@ class MetadataCacheTest { errorUnavailableListeners: Boolean): Unit = { val topic = "topic" -val cache = new MetadataCache(1) +val cache = new MetadataCache(9) Review comment: nit: maybe it would be clearer if we passed `brokerId` as an argument? Then we could use broker 0 in the test case, for example, so that the UpdateMetadata request makes more sense. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10381) Add broker to a cluster not rebalancing partitions
[ https://issues.apache.org/jira/browse/KAFKA-10381?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17175147#comment-17175147 ] Yogesh BG commented on KAFKA-10381: --- One more observation is that when I restart the leader node for that partition, it picks up and the issue is resolved. But we cannot restart nodes in a real scenario - we would incur data loss during restarts > Add broker to a cluster not rebalancing partitions > -- > > Key: KAFKA-10381 > URL: https://issues.apache.org/jira/browse/KAFKA-10381 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.3.0 >Reporter: Yogesh BG >Priority: Major > > Hi > I have a 3 node cluster and a topic with one partition. When a node is deleted and > another node is added, the topic goes into an unknown state and it is not possible to > write/read anything; the exception below is seen > > {code:java} > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 > failed to record follower 1005's position 0 since the replica is not > recognized to be one of the assigned replicas 1003,1004 for partition A-0. > Empty records will be returned for this partition. > (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 > failed to record follower 1005's position 0 since the replica is not > recognized to be one of the assigned replicas 1003,1004 for partition C-0. > Empty records will be returned for this partition. > (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 > failed to record follower 1005's position 0 since the replica is not > recognized to be one of the assigned replicas 1002,1004 for partition A-0. > Empty records will be returned for this partition. > (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 > failed to record follower 1005's position 0 since the replica is not > recognized to be one of the assigned replicas 1002,1004 for partition B-0. > Empty records will be returned for this partition. > (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 > failed to record follower 1005's position 0 since the replica is not > recognized to be one of the assigned replicas 1003,1004 for partition A-0. > Empty records will be returned for this partition. > (kafka.server.ReplicaManager) > [2020-08-10 00:00:00,108] WARN [ReplicaManager broker=1004] Leader 1004 > failed to record follower 1005's position 0 since the replica is not > recognized to be one of the assigned replicas 1003,1004 for partition A-0. > Empty records will be returned for this partition. > (kafka.server.ReplicaManager) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
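Worth noting for anyone hitting this: Kafka does not move existing replicas onto a newly added broker by itself; a partition reassignment has to be issued explicitly, either with the kafka-reassign-partitions.sh tool or, with a 2.4+ client, via the Admin API. A hedged sketch of the latter (broker address and the target replica list are illustrative, loosely based on the broker ids in the logs above):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitionReassignment;
import org.apache.kafka.common.TopicPartition;

public class MovePartitionExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            // Explicitly move partition A-0 onto brokers 1004 and 1005; adding
            // broker 1005 to the cluster does not trigger this on its own.
            admin.alterPartitionReassignments(Collections.singletonMap(
                new TopicPartition("A", 0),
                Optional.of(new NewPartitionReassignment(Arrays.asList(1004, 1005)))))
                .all().get();
        }
    }
}
```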
[GitHub] [kafka] apovzner commented on pull request #8768: KAFKA-10023: Enforce broker-wide and per-listener connection creation…
apovzner commented on pull request #8768: URL: https://github.com/apache/kafka/pull/8768#issuecomment-671684203 @rajinisivaram Thanks for your review. I updated this PR to expose 'connection-accept-rate' metrics, addressed the remaining comments, and the test should also be fixed. It seems like it would also make sense to add and expose throttle-time metrics. What do you think? If so, I will open a separate PR to add throttle-time metrics and to use the Token Bucket quota calculation that David added as part of KIP-599, which works better with bursty workloads. I will update the KIP accordingly and notify the voting thread. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
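For reference, the Token Bucket idea mentioned above, in miniature. This is a generic sketch, not Kafka's KIP-599 implementation: a budget of tokens refills continuously at a fixed rate up to a burst cap, and an action is admitted only if a token is available, which tolerates short bursts better than a plain windowed rate:

```java
// Generic token-bucket admission control: `rate` tokens/sec refill up to
// `burst` stored tokens; each admitted action costs one token.
final class TokenBucket {
    private final double rate;   // tokens per second
    private final double burst;  // maximum stored tokens
    private double tokens;
    private long lastNanos;

    TokenBucket(double rate, double burst) {
        this.rate = rate;
        this.burst = burst;
        this.tokens = burst;
        this.lastNanos = System.nanoTime();
    }

    synchronized boolean tryAcquire() {
        long now = System.nanoTime();
        // Refill proportionally to elapsed wall-clock time, capped at burst.
        tokens = Math.min(burst, tokens + rate * (now - lastNanos) / 1e9);
        lastNanos = now;
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }
}
```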
[GitHub] [kafka] ableegoldman commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)
ableegoldman commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r468218540

## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImpl.java
## @@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.kstream.TimeWindowedCogroupedKStream;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+public class SlidingWindowedCogroupedKStreamImpl<K, V> extends AbstractStream<K, V> implements TimeWindowedCogroupedKStream<K, V> {
+    private final SlidingWindows windows;
+    private final CogroupedStreamAggregateBuilder<K, V> aggregateBuilder;
+    private final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, V>> groupPatterns;
+
+    SlidingWindowedCogroupedKStreamImpl(final SlidingWindows windows,
+                                        final InternalStreamsBuilder builder,
+                                        final Set<String> subTopologySourceNodes,
+                                        final String name,
+                                        final CogroupedStreamAggregateBuilder<K, V> aggregateBuilder,
+                                        final StreamsGraphNode streamsGraphNode,
+                                        final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, V>> groupPatterns) {
+        super(name, null, null, subTopologySourceNodes, streamsGraphNode, builder);
+        // keySerde and valueSerde are null because there are many different groupStreams that they could be from
+        this.windows = windows;
+        this.aggregateBuilder = aggregateBuilder;
+        this.groupPatterns = groupPatterns;
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer) {
+        return aggregate(initializer, Materialized.with(null, null));
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                            final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        return aggregate(initializer, NamedInternal.empty(), materialized);
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                            final Named named) {
+        return aggregate(initializer, named, Materialized.with(null, null));
+    }
+
+    @Override
+    public KTable<Windowed<K>, V> aggregate(final Initializer<V> initializer,
+                                            final Named named,
+                                            final Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(named, "named can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> materializedInternal = new MaterializedInternal<>(
+            materialized,
+            builder,
+            CogroupedKStreamImpl.AGGREGATE_NAME);
+        return aggregateBuilder.build(
+            groupPatterns,
+            initializer,
+            new NamedInternal(named),
+            materialize(materializedInternal),
+            materializedInternal.keySerde() != null ?
+                new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs())
+                : null,
+            materializedInternal.value
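For orientation, the class in the diff above is the internal implementation behind windowing a cogrouped stream with `SlidingWindows` (KIP-450). Below is a minimal usage sketch of the API it implements, assuming the `SlidingWindows.withTimeDifferenceAndGrace(...)` factory from the KIP; the topic names, serdes, and durations are illustrative assumptions, not taken from the PR:

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class SlidingCogroupSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Hypothetical keyed input topic.
        final KGroupedStream<String, String> grouped = builder
            .stream("clicks", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey();

        // Cogroup with a single aggregator, then window with SlidingWindows:
        // each record falls into every window of the given time difference
        // that contains its timestamp.
        final KTable<Windowed<String>, Long> counts = grouped
            .cogroup((key, value, aggregate) -> aggregate + 1L)
            .windowedBy(SlidingWindows.withTimeDifferenceAndGrace(
                Duration.ofMinutes(5), Duration.ofMinutes(1)))
            .aggregate(() -> 0L, Materialized.with(Serdes.String(), Serdes.Long()));

        // counts now holds a per-key count for each sliding window;
        // the topology would be passed to KafkaStreams to run.
        builder.build();
    }
}
```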
[GitHub] [kafka] showuon opened a new pull request #9158: MINOR: Update the quickstart link in readme
showuon opened a new pull request #9158:
URL: https://github.com/apache/kafka/pull/9158

After the new website launched, the quickstart link also changed. Update the quickstart link in readme.md.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #9158: MINOR: Update the quickstart link in readme
showuon commented on pull request #9158: URL: https://github.com/apache/kafka/pull/9158#issuecomment-671702480 @rhauch , could you review this small PR to update the quickstart link? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] stanislavkozlovski commented on pull request #9155: MINOR: Ensure a single version of scala-library is used
stanislavkozlovski commented on pull request #9155: URL: https://github.com/apache/kafka/pull/9155#issuecomment-671714884 I just updated the PR description to mention that both tests were using Jackson `2.10.5`. Apologies for the misleading description; I must have gotten confused by the numerous tests I ran before finding the issue. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #8549: KAFKA-9911: Add new PRODUCER_FENCED error code
guozhangwang commented on a change in pull request #8549:
URL: https://github.com/apache/kafka/pull/8549#discussion_r468327920

## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
## @@ -1366,6 +1366,10 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
                     error == Errors.CLUSTER_AUTHORIZATION_FAILED) {
                 fatalError(error.exception());
+            } else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {
+                // We could still receive INVALID_PRODUCER_EPOCH from transaction coordinator,

Review comment: Good catch.

## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
## @@ -1417,8 +1421,10 @@ public void handleResponse(AbstractResponse response) {
             } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                 reenqueue();
                 return;
-            } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
-                fatalError(error.exception());
+            } else if (error == Errors.INVALID_PRODUCER_EPOCH || error == Errors.PRODUCER_FENCED) {

Review comment: nit: ".. old versioned transaction coordinator", ditto below.

## File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
## @@ -1571,6 +1637,54 @@ public void testProducerFencedException() throws InterruptedException {
             Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
     }

+    @Test
+    public void testInvalidProducerEpochConvertToProducerFencedInEndTxn() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        TransactionalRequestResult commitResult = transactionManager.beginCommit();
+
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        prepareProduceResponse(Errors.NONE, producerId, epoch);
+        prepareEndTxnResponse(Errors.INVALID_PRODUCER_EPOCH, TransactionResult.COMMIT, producerId, epoch);
+
+        runUntil(commitResult::isCompleted);
+        runUntil(responseFuture::isDone);
+
+        // make sure the exception was thrown directly from the follow-up calls.
+        assertThrows(KafkaException.class, () -> transactionManager.beginTransaction());
+        assertThrows(KafkaException.class, () -> transactionManager.beginCommit());
+        assertThrows(KafkaException.class, () -> transactionManager.beginAbort());
+        assertThrows(KafkaException.class, () -> transactionManager.sendOffsetsToTransaction(
+            Collections.emptyMap(), new ConsumerGroupMetadata("dummyId")));
+    }
+
+    @Test
+    public void testInvalidProducerEpochFromProduce() throws InterruptedException {
+        doInitTransactions();
+
+        transactionManager.beginTransaction();
+        transactionManager.failIfNotReadyForSend();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
+
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
+        prepareProduceResponse(Errors.INVALID_PRODUCER_EPOCH, producerId, epoch);
+        prepareProduceResponse(Errors.NONE, producerId, epoch);
+
+        sender.runOnce();
+
+        runUntil(responseFuture::isDone);
+        assertFalse(transactionManager.hasError());

Review comment: Thanks for adding the coverage!

## File path: clients/src/main/java/org/apache/kafka/common/internals/InvalidProducerEpochException.java
## @@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more

Review comment: Why not put this internal exception in `org.apache.kafka.clients.producer.internals`?

## File path: core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
## @@ -539,7 +539,7 @@ class TransactionCoordinator(brokerId: Int,
                 s"${txnIdAndPidEpoch.transactionalId} due to timeout")

             case error@(Errors.INVALID_PRODUCER_ID_MAPPING |
-                        Errors.INVALID_PRODUCER_EPOCH |

Review comment: In the new broker version, when do we still return `INVALID_PRODUCER_EPOCH` then? Or would we never return it any more?

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
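A minimal, hypothetical sketch of the compatibility pattern discussed in this review, assuming the new `PRODUCER_FENCED` code from this PR is available; `EpochErrorMapping` is an invented name and this is not the actual `TransactionManager` logic. The idea: an old-versioned transaction coordinator may still answer `INVALID_PRODUCER_EPOCH` where a new one would answer `PRODUCER_FENCED`, so the client normalizes the former onto the latter before treating the error as fatal:

```java
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.protocol.Errors;

// Invented helper class for illustration only.
final class EpochErrorMapping {

    // Responses from the transaction coordinator are normalized first, so an
    // old coordinator's INVALID_PRODUCER_EPOCH is handled like PRODUCER_FENCED.
    static Errors normalizeCoordinatorError(final Errors error) {
        return error == Errors.INVALID_PRODUCER_EPOCH ? Errors.PRODUCER_FENCED : error;
    }

    public static void main(final String[] args) {
        final Errors normalized = normalizeCoordinatorError(Errors.INVALID_PRODUCER_EPOCH);
        if (normalized == Errors.PRODUCER_FENCED) {
            // Being fenced by a newer producer instance is fatal for the
            // transactional producer; an INVALID_PRODUCER_EPOCH on the produce
            // path, by contrast, stays abortable per the tests above.
            throw new KafkaException(normalized.exception());
        }
    }
}
```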
[jira] [Assigned] (KAFKA-10378) issue when create producer using java
[ https://issues.apache.org/jira/browse/KAFKA-10378?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen reassigned KAFKA-10378: - Assignee: Luke Chen
> issue when create producer using java
> -
>
> Key: KAFKA-10378
> URL: https://issues.apache.org/jira/browse/KAFKA-10378
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 2.6.0
> Environment: mac os
> java version "1.8.0_231"
> intellij
> Reporter: Mohammad Abdelqader
> Assignee: Luke Chen
> Priority: Blocker
> Fix For: 2.7.0, 2.6.1
>
> I created a simple producer in Java using IntelliJ. When I run the project, it fails with the following uncaught exception in the producer network thread:
> [kafka-producer-network-thread | producer-1] ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread 'kafka-producer-network-thread | producer-1':
> java.lang.NoClassDefFoundError: com/fasterxml/jackson/databind/JsonNode
>   at org.apache.kafka.common.requests.ApiVersionsRequest$Builder.<init>(ApiVersionsRequest.java:36)
>   at org.apache.kafka.clients.NetworkClient.handleConnections(NetworkClient.java:910)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:555)
>   at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:325)
>   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:240)
>   at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.databind.JsonNode
>   at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
>   at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
>   at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
>   ... 6 more
-- This message was sent by Atlassian Jira (v8.3.4#803005)
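For reference, the failure is reproducible with a plain quickstart-style producer, assuming kafka-clients 2.6.0 on the classpath without jackson-databind; the bootstrap address and topic name below are placeholders:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerRepro {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // With kafka-clients 2.6.0 and no jackson-databind on the classpath,
        // the Sender thread dies with NoClassDefFoundError while building
        // ApiVersionsRequest, before any record is delivered.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test-topic", "key", "value"));
        }
    }
}
```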
[GitHub] [kafka] showuon opened a new pull request #9159: KAFKA-10378: change jacksonDatabind as compile dependency
showuon opened a new pull request #9159:
URL: https://github.com/apache/kafka/pull/9159

Change `jacksonDatabind` to a `compile` dependency for the clients project.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #9159: KAFKA-10378: change jacksonDatabind as compile dependency for clients project
showuon commented on pull request #9159: URL: https://github.com/apache/kafka/pull/9159#issuecomment-671748891 @ijuma , could you review this PR? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-671749755

> Both operations are done under the same group lock. If we relax the lock, it seems that the condition "all members have joined" may no longer be true when we get to DelayedJoin.onComplete() even though that condition was true during the DelayedJoin.tryCompleteJoin() check. It's not clear what we should do in that case.

Your feedback always makes sense. 👍 It seems to me the approach has to address the following issues:

1. avoid conflicting (multiple) locks
2. keep the change small (don't introduce a complicated mechanism)
3. keep behavior compatible (`hasAllMembersJoined` and `onCompleteJoin` should be executed under the same lock)

I'd like to add a new method (with an empty default implementation) to `DelayedOperation`. The new method is almost the same as `onComplete`, except that it is executed without holding the lock. Currently, only `DelayedJoin` has to use it.

```scala
// runs without holding the operation lock; default is a no-op
def cleanup(): Unit = {}

private[server] def maybeTryComplete(): Boolean = {
  lock.lock()
  // tryComplete() runs under the lock; the lock is released before cleanup()
  if (try tryComplete() finally lock.unlock()) {
    cleanup()
    true
  } else false
}

override def run(): Unit = {
  // on expiration, force completion and then run the unlocked hook
  if (forceComplete()) {
    cleanup()
    onExpiration()
  }
}
```

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
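For readers more comfortable in Java, a minimal generic sketch of the same locking idea follows; the class and method names are placeholders, not Kafka's actual `DelayedOperation`. Completion is decided while holding the operation lock, but the follow-up hook runs only after the lock is released, so it cannot entangle that lock with others such as the group lock:

```java
import java.util.concurrent.locks.ReentrantLock;

// Placeholder names for illustration; not Kafka's DelayedOperation.
abstract class DelayedOp {
    private final ReentrantLock lock = new ReentrantLock();

    // Must only be called while holding the lock.
    abstract boolean tryComplete();

    // Runs without the lock held; default is a no-op, mirroring cleanup() above.
    void cleanup() {}

    boolean maybeTryComplete() {
        final boolean completed;
        lock.lock();
        try {
            completed = tryComplete();
        } finally {
            lock.unlock(); // release before any callback work
        }
        if (completed) {
            cleanup();     // safe: no operation lock held here
        }
        return completed;
    }
}
```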