[GitHub] [kafka] ableegoldman commented on a change in pull request #11584: MINOR: improve logging
ableegoldman commented on a change in pull request #11584: URL: https://github.com/apache/kafka/pull/11584#discussion_r765559208

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
## @@ -288,7 +288,7 @@ private Exception invokeOnAssignment(final ConsumerPartitionAssignor assignor, f }
 private Exception invokePartitionsAssigned(final Set assignedPartitions) {
-log.info("Adding newly assigned partitions: {}", Utils.join(assignedPartitions, ", "));
+log.info("Adding newly assigned partitions: {}", Utils.join(assignedPartitions.stream().sorted().toArray(), ", "));

Review comment: FYI `TopicPartition` doesn't implement `Comparable` so you can't use `.stream().sorted()`

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
## @@ -215,7 +215,7 @@ public String protocolType() {
 @Override
 protected JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
-log.debug("Joining group with current subscription: {}", subscriptions.subscription());
+log.debug("Joining group with current subscription: {}", Utils.join(subscriptions.subscription().stream().sorted().toArray(), ", "));

Review comment: Could we just keep the subscription sorted to begin with? (ie store in a sorted data structure)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
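Because `TopicPartition` has no natural ordering, a deterministic log line needs an explicit comparator rather than `.stream().sorted()`. A minimal sketch of that idea, assuming a free-standing helper (the class and method names are illustrative, not part of the PR):

```java
import java.util.Comparator;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.common.TopicPartition;

public class SortedPartitionLogging {
    // TopicPartition does not implement Comparable, so supply an explicit
    // comparator: order by topic name first, then by partition number.
    private static final Comparator<TopicPartition> BY_TOPIC_THEN_PARTITION =
        Comparator.comparing(TopicPartition::topic)
                  .thenComparingInt(TopicPartition::partition);

    // Returns a stable, human-friendly rendering such as "bar-2, foo-0, foo-1".
    public static String sorted(Set<TopicPartition> partitions) {
        return partitions.stream()
                .sorted(BY_TOPIC_THEN_PARTITION)
                .map(TopicPartition::toString)
                .collect(Collectors.joining(", "));
    }
}
```

The second suggestion (keeping the subscription in a sorted data structure) works for the subscription itself because it is a set of topic-name strings, which already sort naturally.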
[GitHub] [kafka] mimaison commented on pull request #11575: KAFKA-13511: Update TimestampConverter support unix epoch as millis, micros, and seconds
mimaison commented on pull request #11575: URL: https://github.com/apache/kafka/pull/11575#issuecomment-989705328 Hi @twobeeb, thanks for the PR, this looks like a useful addition. However as this is adding a new configuration, this change requires a KIP before we can accept it. Take a look at https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals. This page details the KIP process. Let me know if you have any questions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #11584: MINOR: improve logging
showuon commented on a change in pull request #11584: URL: https://github.com/apache/kafka/pull/11584#discussion_r765638758

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
## @@ -288,7 +288,7 @@ private Exception invokeOnAssignment(final ConsumerPartitionAssignor assignor, f }
 private Exception invokePartitionsAssigned(final Set assignedPartitions) {
-log.info("Adding newly assigned partitions: {}", Utils.join(assignedPartitions, ", "));
+log.info("Adding newly assigned partitions: {}", Utils.join(assignedPartitions.stream().sorted().toArray(), ", "));

Review comment: Good point!

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
## @@ -397,10 +397,10 @@ protected void onJoinComplete(int generation,
 "\tCurrent owned partitions: {}\n" +
 "\tAdded partitions (assigned - owned): {}\n" +
 "\tRevoked partitions (owned - assigned): {}\n",
-assignedPartitions,
-ownedPartitions,
-addedPartitions,
-revokedPartitions
+Utils.join(assignedPartitions.stream().sorted().toArray(), ", "),
+Utils.join(ownedPartitions.stream().sorted().toArray(), ", "),
+Utils.join(addedPartitions.stream().sorted().toArray(), ", "),
+Utils.join(revokedPartitions.stream().sorted().toArray(), ", ")

Review comment: Actually, the `assignedPartitions`, `addedPartitions`, and `revokedPartitions`, as well as the subsequent `invokePartitionsRevoked` and `invokePartitionsAssigned` calls, all derive from `assignment.partitions()` (i.e. the `assignedPartitions`). We can sort that once, and the rest of the partition collections will then come out sorted; only the `ownedPartitions` might still need to be sorted separately.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
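A sketch of that suggestion: sort the incoming assignment once up front and build the added/revoked collections from the sorted view so they inherit the order, leaving only the previously owned partitions to be sorted on their own. Class and method names are illustrative, not the actual `ConsumerCoordinator` code:

```java
import java.util.Comparator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.kafka.common.TopicPartition;

public class AssignmentDiff {
    private static final Comparator<TopicPartition> CMP =
        Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition);

    public static void logDiff(Set<TopicPartition> assigned, Set<TopicPartition> owned) {
        // Sort the assignment once ...
        SortedSet<TopicPartition> sortedAssigned = new TreeSet<>(CMP);
        sortedAssigned.addAll(assigned);

        // ... collections derived by iterating the sorted assignment keep that order.
        Set<TopicPartition> added = new LinkedHashSet<>(sortedAssigned);
        added.removeAll(owned);

        // ownedPartitions does not derive from the assignment, so it needs its own sort.
        SortedSet<TopicPartition> sortedOwned = new TreeSet<>(CMP);
        sortedOwned.addAll(owned);
        Set<TopicPartition> revoked = new LinkedHashSet<>(sortedOwned);
        revoked.removeAll(assigned);

        System.out.printf("assigned=%s added=%s revoked=%s owned=%s%n",
            sortedAssigned, added, revoked, sortedOwned);
    }
}
```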
[GitHub] [kafka] vpapavas opened a new pull request #11585: chore: cleanup minor leftovers
vpapavas opened a new pull request #11585: URL: https://github.com/apache/kafka/pull/11585 Clean up some minor things that were left over from PR https://github.com/apache/kafka/pull/11513 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10503) MockProducer doesn't throw ClassCastException when no partition for topic
[ https://issues.apache.org/jira/browse/KAFKA-10503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456308#comment-17456308 ] Gonzalo Muñoz Fernández commented on KAFKA-10503: - [~dhunziker] yes, if the serializers are null, there's a NPE. I agree with that, thanks for pointing out! > MockProducer doesn't throw ClassCastException when no partition for topic > - > > Key: KAFKA-10503 > URL: https://issues.apache.org/jira/browse/KAFKA-10503 > Project: Kafka > Issue Type: Improvement > Components: clients, producer >Affects Versions: 2.6.0 >Reporter: Gonzalo Muñoz Fernández >Assignee: Gonzalo Muñoz Fernández >Priority: Minor > Labels: mock, producer > Fix For: 2.7.0 > > Original Estimate: 1h > Remaining Estimate: 1h > > Though {{MockProducer}} admits serializers in its constructors, it doesn't > check during {{send}} method that those serializers are the proper ones to > serialize key/value included into the {{ProducerRecord}}. > [This > check|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java#L499-L500] > is only done if there is a partition assigned for that topic. > It would be an enhancement if these serialize methods were also invoked in > simple scenarios, where no partition is assigned to a topic. > eg: > {code:java} > @Test > public void shouldThrowClassCastException() { > MockProducer producer = new MockProducer<>(true, new > IntegerSerializer(), new StringSerializer()); > ProducerRecord record = new ProducerRecord(TOPIC, "key1", "value1"); > try { > producer.send(record); > fail("Should have thrown ClassCastException because record cannot > be casted with serializers"); > } catch (ClassCastException e) {} > } > {code} > Currently, for obtaining the ClassCastException is needed to define the topic > into a partition: > {code:java} > PartitionInfo partitionInfo = new PartitionInfo(TOPIC, 0, null, null, null); > Cluster cluster = new Cluster(null, emptyList(), asList(partitionInfo), > emptySet(), emptySet()); > producer = new MockProducer(cluster, > true, > new DefaultPartitioner(), > new IntegerSerializer(), > new StringSerializer()); > {code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
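The enhancement requested above boils down to invoking the configured serializers during `send` even when no partition metadata exists for the topic, so an incompatible record fails fast with a `ClassCastException`. A hedged sketch of that idea (not the actual `MockProducer` code; the helper is illustrative and includes the null checks discussed in the comment above to avoid an NPE when no serializers were configured):

```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;

final class SerializerCheck {
    // Serialize key and value eagerly; with mismatched runtime types the serializer's
    // bridge method raises ClassCastException, which is the behaviour the ticket asks for.
    static <K, V> void verify(Serializer<K> keySerializer,
                              Serializer<V> valueSerializer,
                              ProducerRecord<K, V> record) {
        if (keySerializer != null) {
            keySerializer.serialize(record.topic(), record.key());
        }
        if (valueSerializer != null) {
            valueSerializer.serialize(record.topic(), record.value());
        }
    }
}
```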
[jira] [Created] (KAFKA-13527) Add top-level error code field to DescribeLogDirsResponse
Mickael Maison created KAFKA-13527: -- Summary: Add top-level error code field to DescribeLogDirsResponse Key: KAFKA-13527 URL: https://issues.apache.org/jira/browse/KAFKA-13527 Project: Kafka Issue Type: Bug Reporter: Mickael Maison Assignee: Mickael Maison Ticket for KIP-784: https://cwiki.apache.org/confluence/display/KAFKA/KIP-784%3A+Add+top-level+error+code+field+to+DescribeLogDirsResponse -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] ableegoldman merged pull request #11562: KAFKA-12648: extend IQ APIs to work with named topologies
ableegoldman merged pull request #11562: URL: https://github.com/apache/kafka/pull/11562 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on a change in pull request #11393: MINOR: Refactor RequestResponseTest
tombentley commented on a change in pull request #11393: URL: https://github.com/apache/kafka/pull/11393#discussion_r765722507 ## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -1269,24 +909,656 @@ public void testDeletableTopicResultErrorMessageIsNullByDefault() { assertNull(result.errorMessage()); } -private ResponseHeader createResponseHeader(short headerVersion) { -return new ResponseHeader(10, headerVersion); +/** + * Check that all error codes in the response get included in {@link AbstractResponse#errorCounts()}. + */ +@Test +public void testErrorCountsIncludesNone() { +assertEquals(1, createAddOffsetsToTxnResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createAddPartitionsToTxnResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createAlterClientQuotasResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createAlterConfigsResponse().errorCounts().get(Errors.NONE)); +assertEquals(2, createAlterPartitionReassignmentsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createAlterReplicaLogDirsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createApiVersionResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createBrokerHeartbeatResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createBrokerRegistrationResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createControlledShutdownResponse().errorCounts().get(Errors.NONE)); +assertEquals(2, createCreateAclsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createCreatePartitionsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createCreateTokenResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createCreateTopicResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createDeleteAclsResponse(DELETE_ACLS.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createDeleteGroupsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createDeleteTopicsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createDescribeAclsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createDescribeClientQuotasResponse().errorCounts().get(Errors.NONE)); +assertEquals(2, createDescribeConfigsResponse(DESCRIBE_CONFIGS.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createDescribeGroupResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createDescribeTokenResponse().errorCounts().get(Errors.NONE)); +assertEquals(2, createElectLeadersResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createEndTxnResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createExpireTokenResponse().errorCounts().get(Errors.NONE)); +assertEquals(3, createFetchResponse(123).errorCounts().get(Errors.NONE)); +assertEquals(1, createFindCoordinatorResponse(FIND_COORDINATOR.oldestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createFindCoordinatorResponse(FIND_COORDINATOR.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createHeartBeatResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createIncrementalAlterConfigsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createJoinGroupResponse(JOIN_GROUP.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(2, createLeaderAndIsrResponse(4).errorCounts().get(Errors.NONE)); +assertEquals(2, createLeaderAndIsrResponse(5).errorCounts().get(Errors.NONE)); +assertEquals(3, createLeaderEpochResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, 
createLeaveGroupResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createListGroupsResponse(LIST_GROUPS.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createListOffsetResponse(LIST_OFFSETS.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createListPartitionReassignmentsResponse().errorCounts().get(Errors.NONE)); +assertEquals(3, createMetadataResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createOffsetCommitResponse().errorCounts().get(Errors.NONE)); +assertEquals(2, createOffsetDeleteResponse().errorCounts().get(Errors.NONE)); +assertEquals(3, createOffsetFetchResponse(OFFSET_FETCH.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createProduceResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createRenewTokenResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createSaslAuthenticateResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createSaslHandsh
[GitHub] [kafka] tombentley commented on a change in pull request #11393: MINOR: Refactor RequestResponseTest
tombentley commented on a change in pull request #11393: URL: https://github.com/apache/kafka/pull/11393#discussion_r765726907 ## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -1269,24 +909,656 @@ public void testDeletableTopicResultErrorMessageIsNullByDefault() { assertNull(result.errorMessage()); } -private ResponseHeader createResponseHeader(short headerVersion) { -return new ResponseHeader(10, headerVersion); +/** + * Check that all error codes in the response get included in {@link AbstractResponse#errorCounts()}. + */ +@Test +public void testErrorCountsIncludesNone() { +assertEquals(1, createAddOffsetsToTxnResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createAddPartitionsToTxnResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createAlterClientQuotasResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createAlterConfigsResponse().errorCounts().get(Errors.NONE)); +assertEquals(2, createAlterPartitionReassignmentsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createAlterReplicaLogDirsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createApiVersionResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createBrokerHeartbeatResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createBrokerRegistrationResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createControlledShutdownResponse().errorCounts().get(Errors.NONE)); +assertEquals(2, createCreateAclsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createCreatePartitionsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createCreateTokenResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createCreateTopicResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createDeleteAclsResponse(DELETE_ACLS.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createDeleteGroupsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createDeleteTopicsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createDescribeAclsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createDescribeClientQuotasResponse().errorCounts().get(Errors.NONE)); +assertEquals(2, createDescribeConfigsResponse(DESCRIBE_CONFIGS.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createDescribeGroupResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createDescribeTokenResponse().errorCounts().get(Errors.NONE)); +assertEquals(2, createElectLeadersResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createEndTxnResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createExpireTokenResponse().errorCounts().get(Errors.NONE)); +assertEquals(3, createFetchResponse(123).errorCounts().get(Errors.NONE)); +assertEquals(1, createFindCoordinatorResponse(FIND_COORDINATOR.oldestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createFindCoordinatorResponse(FIND_COORDINATOR.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createHeartBeatResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createIncrementalAlterConfigsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createJoinGroupResponse(JOIN_GROUP.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(2, createLeaderAndIsrResponse(4).errorCounts().get(Errors.NONE)); +assertEquals(2, createLeaderAndIsrResponse(5).errorCounts().get(Errors.NONE)); +assertEquals(3, createLeaderEpochResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, 
createLeaveGroupResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createListGroupsResponse(LIST_GROUPS.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createListOffsetResponse(LIST_OFFSETS.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createListPartitionReassignmentsResponse().errorCounts().get(Errors.NONE)); +assertEquals(3, createMetadataResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createOffsetCommitResponse().errorCounts().get(Errors.NONE)); +assertEquals(2, createOffsetDeleteResponse().errorCounts().get(Errors.NONE)); +assertEquals(3, createOffsetFetchResponse(OFFSET_FETCH.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createProduceResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createRenewTokenResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createSaslAuthenticateResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createSaslHandsh
[GitHub] [kafka] tombentley commented on a change in pull request #11393: MINOR: Refactor RequestResponseTest
tombentley commented on a change in pull request #11393: URL: https://github.com/apache/kafka/pull/11393#discussion_r765733569 ## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -1098,7 +741,7 @@ public void testJoinGroupRequestVersion0RebalanceTimeout() { @Test public void testOffsetFetchRequestBuilderToStringV0ToV7() { Review comment: We have `testJoinGroupRequestVersion0RebalanceTimeout` but `testOffsetFetchRequestBuilderToStringV0ToV7`, so rename `testJoinGroupRequestVersion0RebalanceTimeout` → `testJoinGroupRequestV0RebalanceTimeout` for consistency ## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -247,403 +280,123 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +// This class performs tests requests and responses for all API keys public class RequestResponseTest { // Exception includes a message that we verify is not included in error responses private final UnknownServerException unknownServerException = new UnknownServerException("secret"); + +@Test +public void testSerialization() { +Map> toSkip = new HashMap<>(); +// It's not possible to create a MetadataRequest v0 via the builder +toSkip.put(METADATA, singletonList((short) 0)); +// DescribeLogDirsResponse does not have a top level error field +toSkip.put(DESCRIBE_LOG_DIRS, DESCRIBE_LOG_DIRS.allVersions()); +// ElectLeaders v0 does not have a top level error field, when accessing it, it defaults to NONE +toSkip.put(ELECT_LEADERS, singletonList((short) 0)); + +for (ApiKeys apikey : ApiKeys.values()) { +for (short version : apikey.allVersions()) { +if (toSkip.containsKey(apikey) && toSkip.get(apikey).contains(version)) continue; +AbstractRequest request = getRequest(apikey, version); +checkRequest(request); +checkErrorResponse(request, unknownServerException); +checkResponse(getResponse(apikey, version), version); +} +} +} + +// This test validates special cases that are not checked in testSerialization @Test -public void testSerialization() throws Exception { -checkRequest(createControlledShutdownRequest(), true); -checkResponse(createControlledShutdownResponse(), 1, true); -checkErrorResponse(createControlledShutdownRequest(), unknownServerException, true); -checkErrorResponse(createControlledShutdownRequest(0), unknownServerException, true); -checkRequest(createFetchRequest(4), true); -checkResponse(createFetchResponse(true), 4, true); +public void testSerializationSpecialCases() { Review comment: `testSerialization` iterates over all keys and call getRequest, such that it's impossible for someone adding a a new RPC to forget to write a serialization test. I wonder if we should do a similar thing for special cases, adding a `testSerializationSpecialCase(ApiKeys key)` which switches and calls per-key methods to do the test. Maybe it's overkill, because many RPCs don't _appear_ to have special cases, but maybe it's really lack of coverage? 
## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -771,11 +424,11 @@ public void testPartitionSize() { CompressionType.NONE, new SimpleRecord("woot".getBytes()), new SimpleRecord("woot".getBytes())); ProduceRequest request = ProduceRequest.forMagic(RecordBatch.MAGIC_VALUE_V2, new ProduceRequestData() -.setTopicData(new ProduceRequestData.TopicProduceDataCollection(Arrays.asList( Review comment: A comment for line 418: Should we rename this `testProduceRequestPartitionSize` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
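The per-key `testSerializationSpecialCase(ApiKeys key)` idea floated above could look roughly like the JUnit 5 sketch below: a test parameterized over `ApiKeys` that switches into key-specific checks, so adding a new RPC forces the author to at least consciously decide it has no special cases. The helper calls in the comments are placeholders, not existing methods:

```java
import org.apache.kafka.common.protocol.ApiKeys;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

class SerializationSpecialCaseSketch {

    @ParameterizedTest
    @EnumSource(ApiKeys.class)
    void testSerializationSpecialCase(ApiKeys key) {
        switch (key) {
            case FETCH:
                // e.g. checkResponse(createFetchResponse(true), 4) for the sessionless case
                break;
            case CONTROLLED_SHUTDOWN:
                // e.g. checkErrorResponse(createControlledShutdownRequest(0), exception)
                break;
            default:
                // Most RPCs have no special cases beyond the generic testSerialization loop.
                break;
        }
    }
}
```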
[GitHub] [kafka] showuon commented on pull request #11408: KAFKA-13374: update doc to allow read from leader/followers
showuon commented on pull request #11408: URL: https://github.com/apache/kafka/pull/11408#issuecomment-989817483 @hachikuji @mimaison , could you help check this 1 line change PR. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #11561: MINOR: Bump version of grgit to 4.1.1
cadonna commented on pull request #11561: URL: https://github.com/apache/kafka/pull/11561#issuecomment-989818400 Cherry-picked to 3.1 \cc @dajac -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #11561: MINOR: Bump version of grgit to 4.1.1
cadonna commented on pull request #11561: URL: https://github.com/apache/kafka/pull/11561#issuecomment-989822040 Cherry-picked to 3.0 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dongjinleekr opened a new pull request #11586: KAFKA-13516: Connection level metrics are not closed
dongjinleekr opened a new pull request #11586: URL: https://github.com/apache/kafka/pull/11586 Here is the fix. The core of this approach is using a `ConcurrentHashMap` to manage connection ids and their corresponding sensors. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
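A rough illustration of the approach described in the PR text — tracking per-connection sensors in a `ConcurrentHashMap` and removing them when the connection closes so the metrics registry does not leak. This is a sketch under that assumption, not the actual Selector change:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;

final class ConnectionSensors {
    private final Metrics metrics;
    private final ConcurrentMap<String, Sensor> sensorsByConnection = new ConcurrentHashMap<>();

    ConnectionSensors(Metrics metrics) {
        this.metrics = metrics;
    }

    Sensor sensorFor(String connectionId) {
        // Create the connection-level sensor lazily, at most once per connection id.
        return sensorsByConnection.computeIfAbsent(connectionId,
            id -> metrics.sensor("connection-" + id));
    }

    void connectionClosed(String connectionId) {
        Sensor sensor = sensorsByConnection.remove(connectionId);
        if (sensor != null) {
            // Removing the sensor from the registry also removes the metrics it registered.
            metrics.removeSensor(sensor.name());
        }
    }
}
```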
[jira] [Assigned] (KAFKA-13516) Connection level metrics are not closed
[ https://issues.apache.org/jira/browse/KAFKA-13516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dongjin Lee reassigned KAFKA-13516: --- Assignee: Dongjin Lee > Connection level metrics are not closed > --- > > Key: KAFKA-13516 > URL: https://issues.apache.org/jira/browse/KAFKA-13516 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.0.0 >Reporter: Aman Agarwal >Assignee: Dongjin Lee >Priority: Major > Original Estimate: 24h > Remaining Estimate: 24h > > Connection level metrics are not closed by the Selector on connection close, > hence leaking the sensors. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-9837) New RPC for notifying controller of failed replica
[ https://issues.apache.org/jira/browse/KAFKA-9837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-9837: --- Fix Version/s: 3.2.0 (was: 2.9) > New RPC for notifying controller of failed replica > -- > > Key: KAFKA-9837 > URL: https://issues.apache.org/jira/browse/KAFKA-9837 > Project: Kafka > Issue Type: New Feature > Components: controller, core >Reporter: David Arthur >Assignee: dengziming >Priority: Major > Labels: kip-500 > Fix For: 3.2.0 > > > This is the tracking ticket for > [KIP-589|https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller]. > For the bridge release, brokers should no longer use ZooKeeper to notify the > controller that a log dir has failed. It should instead use an RPC mechanism. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] rondagostino commented on a change in pull request #11503: KAFKA-13456: Tighten KRaft config checks/constraints
rondagostino commented on a change in pull request #11503: URL: https://github.com/apache/kafka/pull/11503#discussion_r765806468

## File path: core/src/main/scala/kafka/server/KafkaConfig.scala
## @@ -1959,10 +1969,28 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami } }
- def listenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
-getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp))
+ def effectiveListenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {

Review comment: > One more thing to check is that the validations in DynamicListenerConfig make sense in the context of these changes

I just pushed a commit fixing all of the minor issues; I'll look into this possible issue later today.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #11573: KAFKA-13507: GlobalProcessor ignores user specified names
bbejeck commented on pull request #11573: URL: https://github.com/apache/kafka/pull/11573#issuecomment-989875505 Java 8 and Java 11 passed. Java 17 failure unrelated - `org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] patrickstuedi commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2
patrickstuedi commented on a change in pull request #11581: URL: https://github.com/apache/kafka/pull/11581#discussion_r765750361 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -62,4 +71,32 @@ public static void updatePosition( position.withComponent(meta.topic(), meta.partition(), meta.offset()); } } + +public static boolean isPermitted( +final Position position, +final PositionBound positionBound, +final int partition +) { +if (positionBound.isUnbounded()) { Review comment: I know we've discussed this, but just for the record :-), I believe the type PositionBound could be saved, unbounded is basically an empty Position. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -62,4 +71,32 @@ public static void updatePosition( position.withComponent(meta.topic(), meta.partition(), meta.offset()); } } + +public static boolean isPermitted( +final Position position, +final PositionBound positionBound, +final int partition +) { +if (positionBound.isUnbounded()) { Review comment: I still think this entire check should be method on Position, as in, Position::dominates(PositionBound) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
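The `Position::dominates(PositionBound)` idea from the second comment could look roughly as follows, assuming `Position` exposes `getTopics()` and `getPartitionPositions()` as in the IQv2 design; the real `StoreQueryUtils.isPermitted` additionally scopes the check to the single partition being queried, which this sketch omits:

```java
import java.util.Map;

import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;

final class PositionCheck {
    // The store's position "dominates" the bound if, for every topic-partition the
    // bound mentions, the store has caught up to at least the bound's offset.
    static boolean dominates(Position position, PositionBound bound) {
        if (bound.isUnbounded()) {
            return true; // an unbounded query accepts any position
        }
        Position required = bound.position();
        for (String topic : required.getTopics()) {
            Map<Integer, Long> requiredOffsets = required.getPartitionPositions(topic);
            Map<Integer, Long> actualOffsets = position.getPartitionPositions(topic);
            for (Map.Entry<Integer, Long> entry : requiredOffsets.entrySet()) {
                Long actual = actualOffsets == null ? null : actualOffsets.get(entry.getKey());
                if (actual == null || actual < entry.getValue()) {
                    return false;
                }
            }
        }
        return true;
    }
}
```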
[GitHub] [kafka] patrickstuedi commented on pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2
patrickstuedi commented on pull request #11581: URL: https://github.com/apache/kafka/pull/11581#issuecomment-989876770 Looks good, thanks John! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison merged pull request #11450: KAFKA-13414: Replace Powermock/EasyMock by Mockito in connect.storage
mimaison merged pull request #11450: URL: https://github.com/apache/kafka/pull/11450 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on a change in pull request #11393: MINOR: Refactor RequestResponseTest
mimaison commented on a change in pull request #11393: URL: https://github.com/apache/kafka/pull/11393#discussion_r765828093 ## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -1269,24 +909,656 @@ public void testDeletableTopicResultErrorMessageIsNullByDefault() { assertNull(result.errorMessage()); } -private ResponseHeader createResponseHeader(short headerVersion) { -return new ResponseHeader(10, headerVersion); +/** + * Check that all error codes in the response get included in {@link AbstractResponse#errorCounts()}. + */ +@Test +public void testErrorCountsIncludesNone() { +assertEquals(1, createAddOffsetsToTxnResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createAddPartitionsToTxnResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createAlterClientQuotasResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createAlterConfigsResponse().errorCounts().get(Errors.NONE)); +assertEquals(2, createAlterPartitionReassignmentsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createAlterReplicaLogDirsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createApiVersionResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createBrokerHeartbeatResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createBrokerRegistrationResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createControlledShutdownResponse().errorCounts().get(Errors.NONE)); +assertEquals(2, createCreateAclsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createCreatePartitionsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createCreateTokenResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createCreateTopicResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createDeleteAclsResponse(DELETE_ACLS.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createDeleteGroupsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createDeleteTopicsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createDescribeAclsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createDescribeClientQuotasResponse().errorCounts().get(Errors.NONE)); +assertEquals(2, createDescribeConfigsResponse(DESCRIBE_CONFIGS.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createDescribeGroupResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createDescribeTokenResponse().errorCounts().get(Errors.NONE)); +assertEquals(2, createElectLeadersResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createEndTxnResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createExpireTokenResponse().errorCounts().get(Errors.NONE)); +assertEquals(3, createFetchResponse(123).errorCounts().get(Errors.NONE)); +assertEquals(1, createFindCoordinatorResponse(FIND_COORDINATOR.oldestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createFindCoordinatorResponse(FIND_COORDINATOR.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createHeartBeatResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createIncrementalAlterConfigsResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createJoinGroupResponse(JOIN_GROUP.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(2, createLeaderAndIsrResponse(4).errorCounts().get(Errors.NONE)); +assertEquals(2, createLeaderAndIsrResponse(5).errorCounts().get(Errors.NONE)); +assertEquals(3, createLeaderEpochResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, 
createLeaveGroupResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createListGroupsResponse(LIST_GROUPS.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createListOffsetResponse(LIST_OFFSETS.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createListPartitionReassignmentsResponse().errorCounts().get(Errors.NONE)); +assertEquals(3, createMetadataResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createOffsetCommitResponse().errorCounts().get(Errors.NONE)); +assertEquals(2, createOffsetDeleteResponse().errorCounts().get(Errors.NONE)); +assertEquals(3, createOffsetFetchResponse(OFFSET_FETCH.latestVersion()).errorCounts().get(Errors.NONE)); +assertEquals(1, createProduceResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createRenewTokenResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createSaslAuthenticateResponse().errorCounts().get(Errors.NONE)); +assertEquals(1, createSaslHandshak
[GitHub] [kafka] mimaison commented on a change in pull request #11393: MINOR: Refactor RequestResponseTest
mimaison commented on a change in pull request #11393: URL: https://github.com/apache/kafka/pull/11393#discussion_r765833040 ## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ## @@ -247,403 +280,123 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +// This class performs tests requests and responses for all API keys public class RequestResponseTest { // Exception includes a message that we verify is not included in error responses private final UnknownServerException unknownServerException = new UnknownServerException("secret"); + +@Test +public void testSerialization() { +Map> toSkip = new HashMap<>(); +// It's not possible to create a MetadataRequest v0 via the builder +toSkip.put(METADATA, singletonList((short) 0)); +// DescribeLogDirsResponse does not have a top level error field +toSkip.put(DESCRIBE_LOG_DIRS, DESCRIBE_LOG_DIRS.allVersions()); +// ElectLeaders v0 does not have a top level error field, when accessing it, it defaults to NONE +toSkip.put(ELECT_LEADERS, singletonList((short) 0)); + +for (ApiKeys apikey : ApiKeys.values()) { +for (short version : apikey.allVersions()) { +if (toSkip.containsKey(apikey) && toSkip.get(apikey).contains(version)) continue; +AbstractRequest request = getRequest(apikey, version); +checkRequest(request); +checkErrorResponse(request, unknownServerException); +checkResponse(getResponse(apikey, version), version); +} +} +} + +// This test validates special cases that are not checked in testSerialization @Test -public void testSerialization() throws Exception { -checkRequest(createControlledShutdownRequest(), true); -checkResponse(createControlledShutdownResponse(), 1, true); -checkErrorResponse(createControlledShutdownRequest(), unknownServerException, true); -checkErrorResponse(createControlledShutdownRequest(0), unknownServerException, true); -checkRequest(createFetchRequest(4), true); -checkResponse(createFetchResponse(true), 4, true); +public void testSerializationSpecialCases() { Review comment: It's much harder to cover all special cases. Are we missing coverage for some special cases? Possibly! But at least for the common case, we should be covered now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna opened a new pull request #11587: HOTFIX: Bump version of grgit from 4.1.0 to 4.1.1
cadonna opened a new pull request #11587: URL: https://github.com/apache/kafka/pull/11587

grgit 4.1.0 caused the following error with gradle:

> Could not resolve all artifacts for configuration ':classpath'.
> Could not resolve org.eclipse.jgit:org.eclipse.jgit:latest.release. Required by: project : > org.ajoberstar.grgit:grgit-core:4.1.0
> ...

making it impossible to build the branch. grgit 4.1.1 fixed this issue.

### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #11587: HOTFIX: Bump version of grgit from 4.1.0 to 4.1.1
cadonna commented on pull request #11587: URL: https://github.com/apache/kafka/pull/11587#issuecomment-989908970 Call for review: @ijuma @vvcephei -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #11587: HOTFIX: Bump version of grgit from 4.1.0 to 4.1.1
cadonna commented on pull request #11587: URL: https://github.com/apache/kafka/pull/11587#issuecomment-989909719 I opened this PR instead of cherry-picking https://github.com/apache/kafka/pull/11561 since the issue is a bit different on 2.8. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mimaison commented on pull request #11393: MINOR: Refactor RequestResponseTest
mimaison commented on pull request #11393: URL: https://github.com/apache/kafka/pull/11393#issuecomment-989911761 > A lot (may be all) of the `create*()` methods could be static. Perhaps that would allow factoring some of the RPC-specific tests into their own classes which would make this class less massive. However, it's not really clear to me that that would result in increased coverage, or make it easier to reason about missing coverage, so I guess I don't really see a benefit to that, but wondered what you thought. Yeah this class is massive and splitting it may be helpful to allow taking a closer look at parts. Considering this PR is already almost +/- 2000 lines, I propose to not do it in this PR though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-10199) Separate state restoration into separate threads
[ https://issues.apache.org/jira/browse/KAFKA-10199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna reassigned KAFKA-10199: - Assignee: Bruno Cadonna (was: Guozhang Wang) > Separate state restoration into separate threads > > > Key: KAFKA-10199 > URL: https://issues.apache.org/jira/browse/KAFKA-10199 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Bruno Cadonna >Priority: Major > > As part of the restoration optimization effort, we would like to move the > restoration process to separate threads such that: > 1. Stream threads would not be restricted by the main consumer `poll` > frequency to keep as part of the group. > 2. We can allow larger batches of data to be written into the restoration. > Besides this, we'd also like to fix the known issues that for piggy-backed > source topics as changelog topics, the serde exception / extra processing > logic would be skipped. > We would also cleanup the global update tasks as part of this effort to > consolidate to the separate restoration threads, and would also gear them up > with corresponding monitoring metrics (KIPs in progress). -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13500) Consider adding a dedicated standby consumer
[ https://issues.apache.org/jira/browse/KAFKA-13500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456480#comment-17456480 ] Bruno Cadonna commented on KAFKA-13500: --- I am planning to also maintain the standby tasks in the restoration thread. I even called the interface {{StateUpdater}} because of that. > Consider adding a dedicated standby consumer > > > Key: KAFKA-13500 > URL: https://issues.apache.org/jira/browse/KAFKA-13500 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > > We currently use the restore consumer to recover state for active tasks and > to maintain standby tasks during regular processing. This setup has a few > disadvantages: > # During state recovery, we might want to apply different consumer configs > compared to standby maintenance during regular processing. > # It makes monitoring confusing: because we never commit offsets for > changelog topics, users can only monitor the client's "lag metric" to > observe restore progress (without the need to register a restore listener). > However, if they are interested in a restore metric, during regular > processing it would report the standby lag, which can be rather confusing. > Because the restore consumer does not use consumer group management, it seems > to be low overhead to actually use a third consumer, because there won't be > any heartbeat thread. -- This message was sent by Atlassian Jira (v8.20.1#820001)
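Purely as an illustration of the abstraction being described (the actual interface in the restoration-thread work may look quite different), a state updater that owns restoration for both active and standby tasks could be shaped roughly like this:

```java
import java.time.Duration;
import java.util.Set;

import org.apache.kafka.streams.processor.internals.Task;

// Hypothetical sketch of a StateUpdater-style interface: tasks are handed over for
// restoration/updating, taken back on revocation, and restored active tasks are drained
// back to the stream thread. Method names and signatures are illustrative only.
interface StateUpdaterSketch {
    void add(Task task);
    void remove(Task task);
    Set<Task> drainRestoredActiveTasks(Duration timeout);
}
```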
[GitHub] [kafka] bbejeck merged pull request #11573: KAFKA-13507: GlobalProcessor ignores user specified names
bbejeck merged pull request #11573: URL: https://github.com/apache/kafka/pull/11573 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #11573: KAFKA-13507: GlobalProcessor ignores user specified names
bbejeck commented on pull request #11573: URL: https://github.com/apache/kafka/pull/11573#issuecomment-989915792 merged #11573 into trunk -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #11573: KAFKA-13507: GlobalProcessor ignores user specified names
bbejeck commented on pull request #11573: URL: https://github.com/apache/kafka/pull/11573#issuecomment-989917167 Thanks @tamara-skokova for the contribution! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-10333) Provide an API to retrieve Kafka Connect task configurations
[ https://issues.apache.org/jira/browse/KAFKA-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Mickael Maison resolved KAFKA-10333. Fix Version/s: 2.8.0 Resolution: Duplicate DUP of KAFKA-10833 > Provide an API to retrieve Kafka Connect task configurations > > > Key: KAFKA-10333 > URL: https://issues.apache.org/jira/browse/KAFKA-10333 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > Fix For: 2.8.0 > > > Kafka Connect exposes an API to retrieve configurations from connectors. > Connectors are responsible for creating tasks. When doing so, they have to > build configurations for individual tasks. In some cases, the configuration > can be passed as is to tasks but in some others, the configuration is mutated > to make tasks do specific work. > For example with MirrorSourceConnector, the connector configuration has a > field "topics" which is a list of topics and patterns to mirror. When the > connector builds task configurations, it resolves the list of topic names and > patterns to exact partitions and spreads the partitions over all the tasks. > It would be useful to identify the exact configuration each task is given. > For MM2, it would allow identifying the partitions that matched the topics > field. It would also help in understanding the impact when a task fails. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13507) GlobalProcessor ignores user specified names
[ https://issues.apache.org/jira/browse/KAFKA-13507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-13507: Fix Version/s: 3.2.0 > GlobalProcessor ignores user specified names > > > Key: KAFKA-13507 > URL: https://issues.apache.org/jira/browse/KAFKA-13507 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Matthias J. Sax >Assignee: Tamara Skokova >Priority: Minor > Labels: beginner, newbie > Fix For: 3.2.0 > > > Using `StreamsBuilder.addGlobalStore` users can specify a name via `Consumed` > parameter. However, the specified name is ignored and the created source and > global processor get generated names assigned. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13507) GlobalProcessor ignores user specified names
[ https://issues.apache.org/jira/browse/KAFKA-13507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck updated KAFKA-13507: Affects Version/s: 2.8.0 2.7.0 2.6.0 2.5.0 > GlobalProcessor ignores user specified names > > > Key: KAFKA-13507 > URL: https://issues.apache.org/jira/browse/KAFKA-13507 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.5.0, 2.6.0, 2.7.0, 2.8.0 >Reporter: Matthias J. Sax >Assignee: Tamara Skokova >Priority: Minor > Labels: beginner, newbie > Fix For: 3.2.0 > > > Using `StreamsBuilder.addGlobalStore` users can specify a name via `Consumed` > parameter. However, the specified name is ignored and the created source and > global processor get generated names assigned. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] mimaison commented on a change in pull request #11456: KAFKA-13351: Add possibility to write kafka headers in Kafka Console Producer
mimaison commented on a change in pull request #11456: URL: https://github.com/apache/kafka/pull/11456#discussion_r765876199 ## File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala ## @@ -206,11 +210,25 @@ object ConsoleProducer { .describedAs("size") .ofType(classOf[java.lang.Integer]) .defaultsTo(1024*100) -val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " + - "This allows custom configuration for a user-defined message reader. Default properties include:\n" + - "\tparse.key=true|false\n" + - "\tkey.separator=\n" + - "\tignore.error=true|false") +val propertyOpt = parser.accepts("property", + """A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader. +|Default properties include: +| parse.key=false +| parse.headers=false +| ignore.error=false +| key.separator=\t +| headers.delimiter=\t +| headers.separator=, +| headers.key.separator=: +|Default parsing pattern when: +| parse.headers=true & parse.key=true: Review comment: Maybe it's time to use more than 80 columns! But that's a discussion for another KIP. As all these `parse.*` default to `false`, what about only listing the configs set to `true` in the example? For example: `parse.headers=false & parse.key=true` -> `parse.key=true` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-13077) Replication failing after unclean shutdown of ZK and all brokers
[ https://issues.apache.org/jira/browse/KAFKA-13077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456515#comment-17456515 ] Shivakumar edited comment on KAFKA-13077 at 12/9/21, 3:44 PM:
--
Hi [~junrao], here is a summary of our issue; hope you can help us here.
Kafka (2.8.1) and ZooKeeper (3.6.3) on EKS, Kubernetes 1.19
Kafka cluster size = 3, ZooKeeper cluster size = 3
1) After a rolling restart of ZooKeeper, sometimes all partitions of a topic go out of sync, with broker 2 left as leader and the only in-sync replica (Leader=2, Isr=2) for almost every partition, while the other brokers drop out of the ISR:
Topic: __consumer_offsets PartitionCount: 50 ReplicationFactor: 3 Configs: compression.type=producer,cleanup.policy=compact,segment.bytes=104857600
Topic: __consumer_offsets Partition: 0 Leader: 2 Replicas: 0,1,2 Isr: 2
Topic: __consumer_offsets Partition: 1 Leader: 2 Replicas: 1,2,0 Isr: 2
Topic: __consumer_offsets Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,1,0
Topic: __consumer_offsets Partition: 3 Leader: 2 Replicas: 0,2,1 Isr: 2
Topic: __consumer_offsets Partition: 4 Leader: 2 Replicas: 1,0,2 Isr: 2
Topic: __consumer_offsets Partition: 5 Leader: 2 Replicas: 2,1,0 Isr: 2
Topic: __consumer_offsets Partition: 6 Leader: 2 Replicas: 0,1,2 Isr: 2
Topic: __consumer_offsets Partition: 7 Leader: 2 Replicas: 1,2,0 Isr: 2
Topic: __consumer_offsets Partition: 8 Leader: 2 Replicas: 2,0,1 Isr: 2
Topic: __consumer_offsets Partition: 9 Leader: 2 Replicas: 0,2,1 Isr: 2
Topic: __consumer_offsets Partition: 10 Leader: 2 Replicas: 1,0,2 Isr: 2
Topic: __consumer_offsets Partition: 11 Leader: 2 Replicas: 2,1,0 Isr: 2
Topic: __consumer_offsets Partition: 12 Leader: 2 Replicas: 0,1,2 Isr: 2
Topic: __consumer_offsets Partition: 13 Leader: 2 Replicas: 1,2,0 Isr: 2
Topic: __consumer_offsets Partition: 14 Leader: 2 Replicas: 2,0,1 Isr: 2
Topic: __consumer_offsets Partition: 15 Leader: 2 Replicas: 0,2,1 Isr: 2
Topic: __consumer_offsets Partition: 16 Leader: 2 Replicas: 1,0,2 Isr: 2
Topic: __consumer_offsets Partition: 17 Leader: 2 Replicas: 2,1,0 Isr: 2
Topic: __consumer_offsets Partition: 18 Leader: 2 Replicas: 0,1,2 Isr: 2
Topic: __consumer_offsets Partition: 19 Leader: 2 Replicas: 1,2,0 Isr: 2
Topic: __consumer_offsets Partition: 20 Leader: 2 Replicas: 2,0,1 Isr: 2
Topic: __consumer_offsets Partition: 21 Leader: 2 Replicas: 0,2,1 Isr: 2
Topic: __consumer_offsets Partition: 22 Leader: 2 Replicas: 1,0,2 Isr: 2
Topic: __consumer_offsets Partition: 23 Leader: 2 Replicas: 2,1,0 Isr: 2
Topic: __consumer_offsets Partition: 24 Leader: 2 Replicas: 0,1,2 Isr: 2
Topic: __consumer_offsets Partition: 25 Leader: 2 Replicas: 1,2,0 Isr: 2
Topic: __consumer_offsets Partition: 26 Leader: 2 Replicas: 2,0,1 Isr: 2
Topic: __consumer_offsets Partition: 27 Leader: 2 Replicas: 0,2,1 Isr: 2
Topic: __consumer_offsets Partition: 28 Leader: 2 Replicas: 1,0,2 Isr: 2
Topic: __consumer_offsets Partition: 29 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: __consumer_offsets Partition: 30 Leader: 2 Replicas: 0,1,2 Isr: 2
Topic: __consumer_offsets Partition: 31 Leader: 2 Replicas: 1,2,0 Isr: 2
Topic: __consumer_offsets Partition: 32 Leader: 2 Replicas: 2,0,1 Isr: 2
Topic: __consumer_offsets Partition: 33 Leader: 2 Replicas: 0,2,1 Isr: 2
Topic: __consumer_offsets Partition: 34 Leader: 2 Replicas: 1,0,2 Isr: 2
Topic: __consumer_offsets Partition: 35 Leader: 2 Replicas: 2,1,0 Isr: 2
Topic: __consumer_offsets Partition: 36 Leader: 2 Replicas: 0,1,2 Isr: 2
Topic: __consumer_offsets Partition: 37 Leader: 2 Replicas: 1,2,0 Isr: 2
Topic: __consumer_offsets Partition: 38 Leader: 2 Replicas: 2,0,1 Isr: 2
Topic: __consumer_offsets Partition: 39 Leader: 2 Replicas: 0,2,1 Isr: 2
Topic: __consumer_offsets Partition: 40 Leader: 2 Replicas: 1,0,2 Isr: 2
Topic: __consumer_offsets Partition: 41 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: __consumer_offsets Partition: 42 Leader: 2 Replicas: 0,1,2 Isr: 2
Topic: __consumer_offsets Partition: 43 Leader: 2 Replicas: 1,2,0 Isr: 2
Topic: __consumer_offsets Partition: 44 Leader: 2 Replicas: 2,0,1 Isr: 2
Topic: __consumer_offsets Partition: 45 Leader: 2 Replicas: 0,2,1 Isr: 2
Topic: __consumer_offsets Partition: 46
[GitHub] [kafka] rondagostino commented on a change in pull request #11503: KAFKA-13456: Tighten KRaft config checks/constraints
rondagostino commented on a change in pull request #11503: URL: https://github.com/apache/kafka/pull/11503#discussion_r765940228 ## File path: core/src/main/scala/kafka/server/KafkaConfig.scala ## @@ -1959,10 +1969,28 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami } } - def listenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = { -getMap(KafkaConfig.ListenerSecurityProtocolMapProp, getString(KafkaConfig.ListenerSecurityProtocolMapProp)) + def effectiveListenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = { Review comment: > validations in DynamicListenerConfig make sense in the context of these changes? We do have this code, which means we currently don't have to worry about this for KRaft: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L909-L911 However, we will support this eventually, so let's put that aside for the moment. We do have these checks, which continue to make sense: ``` if (immutableListenerConfigs(newConfig, listenerName.configPrefix) != immutableListenerConfigs(oldConfig, listenerName.configPrefix)) throw new ConfigException(s"Configs cannot be updated dynamically for existing listener $listenerName, " + "restart broker or create a new listener for update") if (oldConfig.effectiveListenerSecurityProtocolMap(listenerName) != newConfig.effectiveListenerSecurityProtocolMap(listenerName)) throw new ConfigException(s"Security protocol cannot be updated for existing listener $listenerName") ``` One thing that could occur is that we add a new SSL listener that is the first one that is non-PLAINTEXT and then all of a sudden we have removed any PLAINTEXT default values in the listener security protocol map, which could cause the new config to fail. But that's fine. And this would never happen in a non-toy cluster since those generally have at least one secured/non-PLAINTEXT listener anyway. Another possibility is that we might impact existing controller listeners, but that impact would have to be checked for when we implement this dynamic reconfiguration path for KRaft. My take on this is that the existing changes are okay in this context. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a change in pull request #11345: Allow empty last segment to have missing offset index during recovery
jsancio commented on a change in pull request #11345: URL: https://github.com/apache/kafka/pull/11345#discussion_r765978037 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -443,7 +443,7 @@ private void onBecomeLeader(long currentTimeMs) { private void flushLeaderLog(LeaderState state, long currentTimeMs) { // We update the end offset before flushing so that parked fetches can return sooner. updateLeaderEndOffsetAndTimestamp(state, currentTimeMs); -log.flush(); +log.flush(false); Review comment: Yes. Looking at the code, we don't need segments to determine the log end offset. When loading the `KafkaMetadataLog`, KRaft calls `truncateFullyAndStartAt` using the latest snapshot if the log's end offset (`0`) is less than the latest snapshot's end offset. As @hachikuji mentioned, we only delete inactive segments if there is a snapshot that covers the largest offset in the segment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
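As an aside for readers of this thread, the recovery rule described above can be sketched schematically. This is illustrative only: the `MetadataLog` interface and method names below are stand-ins, not the actual KRaft types.

```java
// Illustrative only; the real logic lives in KafkaMetadataLog / KafkaRaftClient and uses KRaft's own types.
final class SnapshotRecoverySketch {

    interface MetadataLog {
        long endOffset();                       // 0 when no segments survived recovery
        long latestSnapshotEndOffset();         // end offset of the newest on-disk snapshot
        void truncateFullyAndStartAt(long offset);
    }

    static void maybeTruncateToLatestSnapshot(final MetadataLog log) {
        // If the snapshot already covers everything the log would contain (e.g. the log end
        // offset is 0 because the segments are gone), discard the log and restart it at the
        // snapshot's end offset instead of relying on the segments.
        if (log.endOffset() < log.latestSnapshotEndOffset()) {
            log.truncateFullyAndStartAt(log.latestSnapshotEndOffset());
        }
    }
}
```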
[GitHub] [kafka] blcksrx opened a new pull request #11588: KAFKA-13485: Restart connectors after RetriableException raised from Task::start()
blcksrx opened a new pull request #11588: URL: https://github.com/apache/kafka/pull/11588 If a `RetriableException` is raised from `Task::start()`, this doesn't trigger an attempt to start that connector again; i.e., the restart functionality is currently only implemented for exceptions raised from `poll()`/`put()`. Triggering restarts upon failures during `start()` would also be desirable, for instance when a connector establishes a database connection during `start()`, so that temporary failure conditions like a network hiccup no longer require a manual restart of the affected tasks. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
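To make the scenario concrete, here is a minimal, hypothetical sketch (not part of this PR) of a source task whose `start()` opens a database connection and reports a transient failure as a `RetriableException`. The class name and the `connection.url` property are invented for illustration; only the exception handling is the point.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class ExampleJdbcSourceTask extends SourceTask {

    private Connection connection;

    @Override
    public String version() {
        return "1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        try {
            // A transient network hiccup or a database restart can make this fail temporarily.
            connection = DriverManager.getConnection(props.get("connection.url"));
        } catch (SQLException e) {
            // Raising RetriableException signals that the failure may be temporary; the PR's goal
            // is for the framework to retry start() in this case instead of requiring a manual restart.
            throw new RetriableException("Temporary failure while opening the database connection", e);
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        return null; // no data in this sketch; a real task would read from `connection` here
    }

    @Override
    public void stop() {
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException e) {
            throw new ConnectException("Failed to close the database connection", e);
        }
    }
}
```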
[jira] [Commented] (KAFKA-13488) Producer fails to recover if topic gets deleted (and gets auto-created)
[ https://issues.apache.org/jira/browse/KAFKA-13488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456673#comment-17456673 ] Prateek Agarwal commented on KAFKA-13488: - Hi [~hachikuji] [~jolshan] I have incorporated the review suggestions to the PR: [https://github.com/apache/kafka/pull/11552]. PTAL. > Producer fails to recover if topic gets deleted (and gets auto-created) > --- > > Key: KAFKA-13488 > URL: https://issues.apache.org/jira/browse/KAFKA-13488 > Project: Kafka > Issue Type: Bug > Components: producer >Affects Versions: 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.3, 2.7.2, 2.8.1 >Reporter: Prateek Agarwal >Assignee: Prateek Agarwal >Priority: Major > Attachments: KAFKA-13488.patch > > > Producer currently fails to produce messages to a topic if the topic is > deleted and gets auto-created OR is created manually during the lifetime of > the producer (and certain other conditions are met - leaderEpochs of deleted > topic > 0). > > To reproduce, these are the steps which can be carried out: > 0) A cluster with 2 brokers 0 and 1 with auto.topic.create=true. > 1) Create a topic T with 2 partitions P0-> (0,1), P1-> (0,1) > 2) Reassign the partitions such that P0-> (1,0), P1-> (1,0). > 2) Create a producer P and send few messages which land on all the TPs of > topic T. > 3) Delete the topic T > 4) Immediately, send a new message from producer P, this message will be > failed to send and eventually timed out. > A test-case which fails with the above steps is added at the end as well as a > patch file. > > This happens after leaderEpoch (KIP-320) was introduced in the > MetadataResponse KAFKA-7738. There is a solution attempted to fix this issue > in KAFKA-12257, but the solution has a bug due to which the above use-case > still fails. > > *Issue in the solution of KAFKA-12257:* > {code:java} > // org.apache.kafka.clients.Metadata.handleMetadataResponse(): >... > Map topicIds = new HashMap<>(); > Map oldTopicIds = cache.topicIds(); > for (MetadataResponse.TopicMetadata metadata : > metadataResponse.topicMetadata()) { > String topicName = metadata.topic(); > Uuid topicId = metadata.topicId(); > topics.add(topicName); > // We can only reason about topic ID changes when both IDs are > valid, so keep oldId null unless the new metadata contains a topic ID > Uuid oldTopicId = null; > if (!Uuid.ZERO_UUID.equals(topicId)) { > topicIds.put(topicName, topicId); > oldTopicId = oldTopicIds.get(topicName); > } else { > topicId = null; > } > ... > } {code} > With every new call to {{{}handleMetadataResponse{}}}(), {{cache.topicIds()}} > gets created afresh. When a topic is deleted and created immediately soon > afterwards (because of auto.create being true), producer's call to > {{MetadataRequest}} for the deleted topic T will result in a > {{UNKNOWN_TOPIC_OR_PARTITION}} or {{LEADER_NOT_AVAILABLE}} error > {{MetadataResponse}} depending on which point of topic recreation metadata is > being asked at. In the case of errors, TopicId returned back in the response > is {{{}Uuid.ZERO_UUID{}}}. As seen in the above logic, if the topicId > received is ZERO, the method removes the earlier topicId entry from the cache. > Now, when a non-Error Metadata Response does come back for the newly created > topic T, it will have a non-ZERO topicId now but the leaderEpoch for the > partitions will mostly be ZERO. This situation will lead to rejection of the > new MetadataResponse if the older LeaderEpoch was >0 (for more details, refer > to KAFKA-12257). 
Because of the rejection of the metadata, producer will > never get to know the new Leader of the TPs of the newly created topic. > > {{*}} 1. Solution / Fix (Preferred){*}: > Client's metadata should keep on remembering the old topicId till: > 1) response for the TP has ERRORs > 2) topicId entry was already present in the cache earlier > 3) retain time is not expired > {code:java} > --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java > +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java > @@ -336,6 +336,10 @@ public class Metadata implements Closeable { > topicIds.put(topicName, topicId); > oldTopicId = oldTopicIds.get(topicName); > } else { > +// Retain the old topicId for comparison with newer TopicId > created later. This is only needed till retainMs > +if (metadata.error() != Errors.NONE && > oldTopicIds.get(topicName) != null && retainTopic(topicName, false, nowMs)) > +topicIds.put(topi
[GitHub] [kafka] vvcephei commented on pull request #11585: MINOR: Cleanup for #11513
vvcephei commented on pull request #11585: URL: https://github.com/apache/kafka/pull/11585#issuecomment-990152230 Failures were unrelated: ``` Build / JDK 11 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testElectionResultOutput, Security=PLAINTEXT Build / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testPathToJsonFile, Security=PLAINTEXT Build / JDK 17 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testPathToJsonFile, Security=PLAINTEXT Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei merged pull request #11585: MINOR: Cleanup for #11513
vvcephei merged pull request #11585: URL: https://github.com/apache/kafka/pull/11585 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor
cadonna commented on a change in pull request #10851: URL: https://github.com/apache/kafka/pull/10851#discussion_r765909088 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.assignment; + +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; + +import static org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys; + +/** + * Distributes standby tasks over different tag dimensions. + * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} are taken into account. + * Standby task distribution is on a best-effort basis. For example, if there are not enough clients available + * on different tag dimensions compared to an active and corresponding standby task, + * in that case, the algorithm will fall back to distributing tasks on least-loaded clients. + */ +class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor { +private static final Logger log = LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class); + +@Override +public boolean assign(final Map clients, + final Set allTaskIds, + final Set statefulTaskIds, + final AssignorConfiguration.AssignmentConfigs configs) { +final int numStandbyReplicas = configs.numStandbyReplicas; +final Set rackAwareAssignmentTags = new HashSet<>(configs.rackAwareAssignmentTags); + +final Map tasksToRemainingStandbys = computeTasksToRemainingStandbys( +numStandbyReplicas, +allTaskIds +); + +final Map> tagKeyToValues = new HashMap<>(); +final Map> tagEntryToClients = new HashMap<>(); + +fillClientsTagStatistics(clients, tagEntryToClients, tagKeyToValues); + +for (final TaskId statefulTaskId : statefulTaskIds) { +for (final Map.Entry entry : clients.entrySet()) { +final UUID clientId = entry.getKey(); +final ClientState clientState = entry.getValue(); + +if (clientState.activeTasks().contains(statefulTaskId)) { +assignStandbyTasksForActiveTask( +numStandbyReplicas, +statefulTaskId, +clientId, +rackAwareAssignmentTags, +clients, +tasksToRemainingStandbys, +tagKeyToValues, +tagEntryToClients +); +} +} +} + +// returning false, because standby task assignment will never require a follow-up probing rebalance. 
+return false; +} + +@Override +public boolean isAllowedTaskMovement(final ClientState source, final ClientState destination) { +final Map sourceClientTags = source.clientTags(); +final Map destinationClientTags = destination.clientTags(); + +for (final Entry sourceClientTagEntry : sourceClientTags.entrySet()) { +if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey( { +return false; +} +} + +return true; +} + +private static void fillClientsTagStatistics(final Map clientStates, + final Map> tagEntryToClients, + final Map> tagKeyToValues) { +for (final Entry clientStateEntry : clientStates.entrySet()) { +final UUID clientId = clientStateEntry.getKey(); +final ClientState
[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2
vvcephei commented on a change in pull request #11581: URL: https://github.com/apache/kafka/pull/11581#discussion_r766152319 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -62,4 +71,32 @@ public static void updatePosition( position.withComponent(meta.topic(), meta.partition(), meta.offset()); } } + +public static boolean isPermitted( +final Position position, +final PositionBound positionBound, +final int partition +) { +if (positionBound.isUnbounded()) { Review comment: Hmm, I think I finally see what you are talking about. Let me give it a shot... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2
guozhangwang commented on a change in pull request #11581: URL: https://github.com/apache/kafka/pull/11581#discussion_r766163659 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -62,4 +71,32 @@ public static void updatePosition( position.withComponent(meta.topic(), meta.partition(), meta.offset()); } } + +public static boolean isPermitted( +final Position position, +final PositionBound positionBound, +final int partition +) { +if (positionBound.isUnbounded()) { +return true; +} else { +final Position bound = positionBound.position(); +for (final String topic : bound.getTopics()) { +final Map partitionBounds = bound.getBound(topic); +final Map seenPartitionBounds = position.getBound(topic); Review comment: qq: why we name this function of position `getBound` while it seems just retrieving the current positions of the topic, not really a bound? Also for this local variables, similarly, why name it `seenPartitionBounds` if the returned value semantics are not really "bounds"? Should that just be `currentOffsets` or something? ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java ## @@ -107,10 +107,10 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, // we can skip flushing to downstream as well as writing to underlying store if (rawNewValue != null || rawOldValue != null) { // we need to get the old values if needed, and then put to store, and then flush -wrapped().put(entry.key(), entry.newValue()); - final ProcessorRecordContext current = context.recordContext(); context.setRecordContext(entry.entry().context()); +wrapped().put(entry.key(), entry.newValue()); Review comment: Why reorder the steps here? ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1764,20 +1764,18 @@ protected void processStreamThread(final Consumer consumer) { ); } final StateQueryResult result = new StateQueryResult<>(); +final Set handledPartitions = new HashSet<>(); Review comment: Why moving it out of the else block scope? ## File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ## @@ -125,22 +128,81 @@ final long deadline = start + DEFAULT_TIMEOUT; do { +if (Thread.currentThread().isInterrupted()) { +fail("Test was interrupted."); +} final StateQueryResult result = kafkaStreams.query(request); -if (result.getPartitionResults().keySet().containsAll(partitions) -|| result.getGlobalResult() != null) { +if (result.getPartitionResults().keySet().containsAll(partitions)) { return result; } else { -try { -Thread.sleep(100L); -} catch (final InterruptedException e) { -throw new RuntimeException(e); -} +sleep(100L); } } while (System.currentTimeMillis() < deadline); throw new TimeoutException("The query never returned the desired partitions"); } +/** + * Repeatedly runs the query until the response is valid and then return the response. + * + * Validity in this case means that the response position is up to the specified bound. + * + * Once position bounding is generally supported, we should migrate tests to wait on the + * expected response position. 
+ */ +public static StateQueryResult iqv2WaitForResult( +final KafkaStreams kafkaStreams, +final StateQueryRequest request) { + +final long start = System.currentTimeMillis(); +final long deadline = start + DEFAULT_TIMEOUT; + +StateQueryResult result; +do { +if (Thread.currentThread().isInterrupted()) { +fail("Test was interrupted."); +} + +result = kafkaStreams.query(request); +final LinkedList> allResults = getAllResults(result); + +if (allResults.isEmpty()) { Review comment: Just curious when would the result be empty actually? I thought even if the tasks were not initialized, we would still return the `NOT_PRESENT` error instead of returning empty. ## File path: streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java ## @@ -97,11 +97,15 @@ public void addResult(final int partition, final QueryResult r) { * prior observations. */ public Position getPosition() { -Posi
[jira] [Resolved] (KAFKA-13512) topicIdsToNames and topicNamesToIds allocate unnecessary maps
[ https://issues.apache.org/jira/browse/KAFKA-13512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin McCabe resolved KAFKA-13512. -- Resolution: Fixed > topicIdsToNames and topicNamesToIds allocate unnecessary maps > - > > Key: KAFKA-13512 > URL: https://issues.apache.org/jira/browse/KAFKA-13512 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.1.0 >Reporter: Justine Olshan >Assignee: Justine Olshan >Priority: Blocker > Fix For: 3.1.0 > > > Currently we write the methods as follows: > {{def topicNamesToIds(): util.Map[String, Uuid] = {}} > {{ new util.HashMap(metadataSnapshot.topicIds.asJava)}} > {{}}} > We do not need to allocate a new map however, we can simply use > {{Collections.unmodifiableMap(metadataSnapshot.topicIds.asJava)}} > We can do something similar for the topicIdsToNames implementation. -- This message was sent by Atlassian Jira (v8.20.1#820001)
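For illustration, the difference between the two approaches in plain Java, using `Long` values in place of `Uuid` so the snippet stays self-contained:

{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class UnmodifiableVsCopy {
    public static void main(String[] args) {
        Map<String, Long> snapshot = new HashMap<>();
        snapshot.put("topic-a", 1L);

        // Before: allocates and populates a brand-new map on every call.
        Map<String, Long> copy = new HashMap<>(snapshot);

        // After: wraps the existing map in a read-only view; no per-call copy of the entries.
        Map<String, Long> view = Collections.unmodifiableMap(snapshot);

        System.out.println(copy.get("topic-a")); // 1
        System.out.println(view.get("topic-a")); // 1
        // view.put("topic-b", 2L);              // would throw UnsupportedOperationException
    }
}
{code}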
[jira] [Commented] (KAFKA-13500) Consider adding a dedicated standby consumer
[ https://issues.apache.org/jira/browse/KAFKA-13500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456736#comment-17456736 ] Guozhang Wang commented on KAFKA-13500: --- [~mjsax] What I was wondering is: if we moved the restoration / standby work to a separate thread, would we still need to separate restoration and standbys with separate consumers? My thinking is that: 1) the primary issue was that the same thread was doing standby maintenance and regular processing, and if that's no longer the case, then keeping state recovery and standby maintenance on the same configs does not seem to cause big issues; 2) on separate threads, the monitoring would be clearer as well. > Consider adding a dedicated standby consumer > > > Key: KAFKA-13500 > URL: https://issues.apache.org/jira/browse/KAFKA-13500 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Minor > > We currently use the restore consumer to recover state for active tasks and > to maintain standby tasks during regular processing. This setup has a few > disadvantages: > # During state recovery, we might want to apply different consumer configs > compared to standby maintenance during regular processing. > # It makes monitoring confusing: because we never commit offsets for > changelog topics, users can only monitor the client's "lag metric" to > observe restore progress (without the need to register a restore listener). > However, if they are interested in a restore metric, during regular > processing it would report the standby lag, which can be rather confusing. > Because the restore consumer does not use consumer group management, it seems > to be low overhead to actually use a third consumer, because there won't be > any heartbeat thread. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] guozhangwang commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
guozhangwang commented on a change in pull request #11424: URL: https://github.com/apache/kafka/pull/11424#discussion_r766183746 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java ## @@ -128,6 +131,22 @@ public static Sensor activeBufferedRecordsSensor(final String threadId, return sensor; } +public static Sensor totalBytesSensor(final String threadId, Review comment: @ableegoldman ping :) @vamossagar12 I'm just back from a long vacation and am reloading all my review memory here :) Is this ready for another review now, aside from this naming comment? If yes, could you rebase and ping me? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 opened a new pull request #11589: MINOR: update log and method name
wcarlson5 opened a new pull request #11589: URL: https://github.com/apache/kafka/pull/11589 update an unclear log message and method name ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies have a blocking option
wcarlson5 commented on a change in pull request #11479: URL: https://github.com/apache/kafka/pull/11479#discussion_r766186102 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java ## @@ -108,6 +124,44 @@ private void unlock() { version.topologyLock.unlock(); } +public Collection sourceTopicsForTopology(final String name) { +return builders.get(name).sourceTopicCollection(); +} + +public boolean needsUpdate(final String threadName) { +return threadVersions.get(threadName) < topologyVersion(); +} + +public void registerThread(final String threadName) { +threadVersions.put(threadName, 0L); +} + +public void unregisterThread(final String threadName) { +threadVersions.remove(threadName); +} + +public void maybeNotifyTopologyVersionWaiters(final String threadName) { +try { +lock(); +final Iterator iterator = version.activeTopologyWaiters.listIterator(); +TopologyVersionWaiters topologyVersionWaiters; +threadVersions.put(threadName, topologyVersion()); +while (iterator.hasNext()) { +topologyVersionWaiters = iterator.next(); +final long topologyVersionWaitersVersion = topologyVersionWaiters.topologyVersion; +if (topologyVersionWaitersVersion <= threadVersions.get(threadName)) { +if (threadVersions.values().stream().allMatch(t -> t >= topologyVersionWaitersVersion)) { +topologyVersionWaiters.future.complete(null); +iterator.remove(); +log.info("Thread {} is now on topology version {}", threadName, topologyVersionWaiters.topologyVersion); Review comment: Those are good points. I updated the log and method name in a follow up. I thought about extracting the version update to the caller but I would rather have the coupled so I don't have to worry about them getting out of sync. https://github.com/apache/kafka/pull/11589 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
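For readers skimming the flattened diff above, here is a stripped-down schematic of the same pattern: complete a waiter's future once every registered thread has caught up to the topology version it waits on. It is illustrative only (simplified types, no locking), not the actual `TopologyMetadata` implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

final class TopologyVersionWaiterSketch {

    static final class Waiter {
        final long topologyVersion;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        Waiter(final long topologyVersion) {
            this.topologyVersion = topologyVersion;
        }
    }

    // Topology version that each stream thread has most recently picked up.
    private final Map<String, Long> threadVersions = new ConcurrentHashMap<>();
    // Callers blocking until all threads reach a given topology version.
    private final List<Waiter> waiters = new CopyOnWriteArrayList<>();

    CompletableFuture<Void> awaitVersion(final long topologyVersion) {
        final Waiter waiter = new Waiter(topologyVersion);
        waiters.add(waiter);
        return waiter.future;
    }

    // Called by a stream thread once it has rebuilt its tasks for topology version `newVersion`.
    void maybeNotifyWaiters(final String threadName, final long newVersion) {
        threadVersions.put(threadName, newVersion);
        waiters.removeIf(waiter -> {
            // Complete and drop a waiter only once every registered thread has caught up
            // to the topology version it is waiting on.
            if (threadVersions.values().stream().allMatch(v -> v >= waiter.topologyVersion)) {
                waiter.future.complete(null);
                return true;
            }
            return false;
        });
    }
}
```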
[GitHub] [kafka] wcarlson5 commented on pull request #11589: MINOR: update log and method name
wcarlson5 commented on pull request #11589: URL: https://github.com/apache/kafka/pull/11589#issuecomment-990318661 @guozhangwang Here are some changes you requested, in a follow-up PR -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #11242: [WIP] MINOR: POC for KIP-591: Add config to set default store impl class
guozhangwang commented on pull request #11242: URL: https://github.com/apache/kafka/pull/11242#issuecomment-990344744 > Actually, the reason why we don't choose `enum` is because that only works for our built-in state stores, not for **custom state stores**. So, with the `StoreImplementation` interface, users can implement their state stores there, and set them as default stores if they want. I understand that, but I still have some reservations about this approach. Let me share them on the DISCUSS thread so that we can engage a larger group. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13528) KRaft RegisterBroker should validate that the cluster ID matches
Colin McCabe created KAFKA-13528: Summary: KRaft RegisterBroker should validate that the cluster ID matches Key: KAFKA-13528 URL: https://issues.apache.org/jira/browse/KAFKA-13528 Project: Kafka Issue Type: Bug Reporter: Colin McCabe -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2
vvcephei commented on a change in pull request #11581: URL: https://github.com/apache/kafka/pull/11581#discussion_r766220034 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -62,4 +71,32 @@ public static void updatePosition( position.withComponent(meta.topic(), meta.partition(), meta.offset()); } } + +public static boolean isPermitted( +final Position position, +final PositionBound positionBound, +final int partition +) { +if (positionBound.isUnbounded()) { Review comment: Ok, so the situation is a little different now than when I originally added a separate PositionBound class. At that point, we also needed a bound representing "latest" (i.e., that the query is executed on an active running task), for which there's no sensible Position representation. My mind was still half in that world last time we spoke about this point. Now, I can see that the only special position bound is the "unbounded" one, which is the same thing semantically as an empty position. I just tried out a change to completely get rid of the PositionBound class, but I think it makes the API more confusing. As a user, it seems more clear to create an "unbounded" PositionBound than an "empty" Position. I think it makes sense if you think about in terms of comparing vectors (an empty vector is "less than" all other vectors, so when it's used as a lower bound, it permits everything). But I don't want people to have to think that hard about it. Another option I considered is to add a `Position.unbounded()` factory, but that doesn't completely make sense either, since a Position is just a point in vector space. It doesn't bound anything by itself, though it can be used as a bound. Plus, I think query handling implementation, both for Streams and for custom user stores, is easier to keep track of if you have two types. You simply can't mix up which Position was supposed to be the bound. On balance, it still seems better to keep the separate PositionBound class. However, I did realize that the logic of PositionBound and the internal logic in Streams can be simplified with this observation that an unbounded position is equivalent to an empty position. I just added that commit to this PR. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
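To make the distinction concrete, here is a short usage sketch of the two types as discussed in this thread. The topic name and offsets are invented, and the `PositionBound.at` / `PositionBound.unbounded` factories are assumed from the API under review.

```java
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;

public class PositionBoundExample {
    public static void main(String[] args) {
        // A Position is just a vector of (topic, partition, offset) components.
        final Position seen = Position.emptyPosition()
            .withComponent("input-topic", 0, 7L);

        // A PositionBound wraps a Position to act as a lower bound for a query...
        final PositionBound atLeast = PositionBound.at(
            Position.emptyPosition().withComponent("input-topic", 0, 5L));

        // ...or places no constraint at all, which (as noted above) is semantically
        // the same as bounding by an empty position.
        final PositionBound anything = PositionBound.unbounded();

        // A store that has seen offset 7 satisfies a bound at offset 5 component-wise,
        // and trivially satisfies the unbounded case.
        System.out.println(atLeast.isUnbounded());  // false
        System.out.println(anything.isUnbounded()); // true
    }
}
```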
[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2
vvcephei commented on a change in pull request #11581: URL: https://github.com/apache/kafka/pull/11581#discussion_r766221104 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1764,20 +1764,18 @@ protected void processStreamThread(final Consumer consumer) { ); } final StateQueryResult result = new StateQueryResult<>(); +final Set handledPartitions = new HashSet<>(); Review comment: Oh, sorry about that. That was an artifact from a recent refactor. But now that I'm looking at it, I realize we don't need a separate set for tracking this, since we can use the result's partition set itself. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2
vvcephei commented on a change in pull request #11581: URL: https://github.com/apache/kafka/pull/11581#discussion_r766221800 ## File path: streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java ## @@ -97,11 +97,15 @@ public void addResult(final int partition, final QueryResult r) { * prior observations. */ public Position getPosition() { -Position position = Position.emptyPosition(); -for (final QueryResult r : partitionResults.values()) { -position = position.merge(r.getPosition()); +if (globalResult != null) { Review comment: That's correct, but since the old version of this method was incorrect, I figured I'd go ahead and fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2
vvcephei commented on a change in pull request #11581: URL: https://github.com/apache/kafka/pull/11581#discussion_r766223374 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java ## @@ -107,10 +107,10 @@ private void putAndMaybeForward(final ThreadCache.DirtyEntry entry, // we can skip flushing to downstream as well as writing to underlying store if (rawNewValue != null || rawOldValue != null) { // we need to get the old values if needed, and then put to store, and then flush -wrapped().put(entry.key(), entry.newValue()); - final ProcessorRecordContext current = context.recordContext(); context.setRecordContext(entry.entry().context()); +wrapped().put(entry.key(), entry.newValue()); Review comment: This is actually important. We were not previously setting the record context before passing the cache-evicted record down to the lower store layers. Previously, the context was incorrectly not set during that operation, and if stores relied on the record context (via the old ProcessorContext), they were getting the wrong metadata. Apparently, this work is the first time we added a feature in Streams that actually relied on that metadata. What is happening now is that we use that metadata to set the Position in the lower store, and if it's not set, then we get an error. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2
vvcephei commented on a change in pull request #11581: URL: https://github.com/apache/kafka/pull/11581#discussion_r766226822 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java ## @@ -125,22 +128,81 @@ final long deadline = start + DEFAULT_TIMEOUT; do { +if (Thread.currentThread().isInterrupted()) { +fail("Test was interrupted."); +} final StateQueryResult result = kafkaStreams.query(request); -if (result.getPartitionResults().keySet().containsAll(partitions) -|| result.getGlobalResult() != null) { +if (result.getPartitionResults().keySet().containsAll(partitions)) { return result; } else { -try { -Thread.sleep(100L); -} catch (final InterruptedException e) { -throw new RuntimeException(e); -} +sleep(100L); } } while (System.currentTimeMillis() < deadline); throw new TimeoutException("The query never returned the desired partitions"); } +/** + * Repeatedly runs the query until the response is valid and then return the response. + * + * Validity in this case means that the response position is up to the specified bound. + * + * Once position bounding is generally supported, we should migrate tests to wait on the + * expected response position. + */ +public static StateQueryResult iqv2WaitForResult( +final KafkaStreams kafkaStreams, +final StateQueryRequest request) { + +final long start = System.currentTimeMillis(); +final long deadline = start + DEFAULT_TIMEOUT; + +StateQueryResult result; +do { +if (Thread.currentThread().isInterrupted()) { +fail("Test was interrupted."); +} + +result = kafkaStreams.query(request); +final LinkedList> allResults = getAllResults(result); + +if (allResults.isEmpty()) { Review comment: You'll only get a `NOT_PRESENT` response if you specifically request a partition. The default is to just get all locally present partitions. This check is actually just an assumption that in the context of an integration test, if you call this method, you're probably expecting at least one result. It is good to note, though, that if a test is looking for results for a specific set of partitions, it should include that in the query. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java ## @@ -62,4 +71,32 @@ public static void updatePosition( position.withComponent(meta.topic(), meta.partition(), meta.offset()); } } + +public static boolean isPermitted( +final Position position, +final PositionBound positionBound, +final int partition +) { +if (positionBound.isUnbounded()) { +return true; +} else { +final Position bound = positionBound.position(); +for (final String topic : bound.getTopics()) { +final Map partitionBounds = bound.getBound(topic); +final Map seenPartitionBounds = position.getBound(topic); Review comment: Woah, you're absolutely right. I'll fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman opened a new pull request #11590: HOTFIX: fix failing StreamsMetadataStateTest tests
ableegoldman opened a new pull request #11590: URL: https://github.com/apache/kafka/pull/11590 Follow-up to [#11562](https://github.com/apache/kafka/commit/e20f102298cc3e056e079cbd1cb33913a9ade0ce) to fix the tests -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman merged pull request #11590: HOTFIX: fix failing StreamsMetadataStateTest tests
ableegoldman merged pull request #11590: URL: https://github.com/apache/kafka/pull/11590 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13388) Kafka Producer nodes stuck in CHECKING_API_VERSIONS
[ https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456854#comment-17456854 ] David Mao commented on KAFKA-13388: --- [~dhofftgt] Looking at where the NetworkClient enters the CHECKING_API_VERSIONS state, we see: {code:java} if (discoverBrokerVersions) { this.connectionStates.checkingApiVersions(node); nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder()); {code} which is a separate queue for nodes needing to send the api versions request. Then in {code:java} private void handleInitiateApiVersionRequests(long now) { Iterator> iter = nodesNeedingApiVersionsFetch.entrySet().iterator(); while (iter.hasNext()) { Map.Entry entry = iter.next(); String node = entry.getKey(); if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) { log.debug("Initiating API versions fetch from node {}.", node); ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue(); ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true); doSend(clientRequest, true, now); iter.remove(); }{code} we only send out the api versions request if the channel is ready (TLS handshake complete, SASL handshake complete). This is actually a pretty insidious bug because I think we end up in a state where we do not apply any request timeout to the channel, since the inflight requests are empty. > Kafka Producer nodes stuck in CHECKING_API_VERSIONS > --- > > Key: KAFKA-13388 > URL: https://issues.apache.org/jira/browse/KAFKA-13388 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: David Hoffman >Priority: Minor > Attachments: Screen Shot 2021-10-25 at 10.28.48 AM.png, > image-2021-10-21-13-42-06-528.png > > > I have been seeing expired batch errors in my app. > {code:java} > org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for > xxx-17:120002 ms has passed since batch creation > {code} > I would have assumed a request timout or connection timeout should have also > been logged. I could not find any other associated errors. > I added some instrumenting to my app and have traced this down to broker > connections hanging in CHECKING_API_VERSIONS state. -It appears there is no > effective timeout for Kafka Producer broker connections in > CHECKING_API_VERSIONS state.- > In the code see the after the NetworkClient connects to a broker node it > makes a request to check api versions, when it receives the response it marks > the node as ready. -I am seeing that sometimes a reply is not received for > the check api versions request the connection just hangs in > CHECKING_API_VERSIONS state until it is disposed I assume after the idle > connection timeout.- > Update: not actually sure what causes the connection to get stuck in > CHECKING_API_VERSIONS. > -I am guessing the connection setup timeout should be still in play for this, > but it is not.- > -There is a connectingNodes set that is consulted when checking timeouts and > the node is removed- > -when ClusterConnectionStates.checkingApiVersions(String id) is called to > transition the node into CHECKING_API_VERSIONS- -- This message was sent by Atlassian Jira (v8.20.1#820001)
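For reference, a minimal producer configuration sketch (arbitrary values) naming the timeouts involved in this analysis: per the comment above, `request.timeout.ms` and the connection setup timeout are effectively bypassed while a connection sits in CHECKING_API_VERSIONS, and only `delivery.timeout.ms` eventually fires, surfacing as expired batches.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerTimeoutConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");

        // Per-request timeout; not applied while no request is in flight on the connection.
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30_000);
        // Total time before batches expire; this is the 120s limit seen in the error above.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
        // Connection setup timeout (KIP-601); per the analysis, it stops applying once the
        // node transitions into CHECKING_API_VERSIONS.
        props.put("socket.connection.setup.timeout.ms", 10_000);

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ... send records ...
        }
    }
}
```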
[jira] [Comment Edited] (KAFKA-13388) Kafka Producer nodes stuck in CHECKING_API_VERSIONS
[ https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456854#comment-17456854 ] David Mao edited comment on KAFKA-13388 at 12/10/21, 2:10 AM: -- [~dhofftgt] Looking at where the NetworkClient enters the CHECKING_API_VERSIONS state, we see: {code:java} if (discoverBrokerVersions) { this.connectionStates.checkingApiVersions(node); nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder()); {code} which is a separate queue for nodes needing to send the api versions request. Then in {code:java} private void handleInitiateApiVersionRequests(long now) { Iterator> iter = nodesNeedingApiVersionsFetch.entrySet().iterator(); while (iter.hasNext()) { Map.Entry entry = iter.next(); String node = entry.getKey(); if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) { log.debug("Initiating API versions fetch from node {}.", node); ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue(); ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true); doSend(clientRequest, true, now); iter.remove(); }{code} we only send out the api versions request if the channel is ready (TLS handshake complete, SASL handshake complete). This is actually a pretty insidious bug because I think we end up in a state where we do not apply any request timeout to the channel if there is some delay in completing any of the handshaking/authentication steps, since the inflight requests are empty. was (Author: david.mao): [~dhofftgt] Looking at where the NetworkClient enters the CHECKING_API_VERSIONS state, we see: {code:java} if (discoverBrokerVersions) { this.connectionStates.checkingApiVersions(node); nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder()); {code} which is a separate queue for nodes needing to send the api versions request. Then in {code:java} private void handleInitiateApiVersionRequests(long now) { Iterator> iter = nodesNeedingApiVersionsFetch.entrySet().iterator(); while (iter.hasNext()) { Map.Entry entry = iter.next(); String node = entry.getKey(); if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) { log.debug("Initiating API versions fetch from node {}.", node); ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue(); ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true); doSend(clientRequest, true, now); iter.remove(); }{code} we only send out the api versions request if the channel is ready (TLS handshake complete, SASL handshake complete). This is actually a pretty insidious bug because I think we end up in a state where we do not apply any request timeout to the channel if there is some problem completing any of the handshaking/authentication steps, since the inflight requests are empty. > Kafka Producer nodes stuck in CHECKING_API_VERSIONS > --- > > Key: KAFKA-13388 > URL: https://issues.apache.org/jira/browse/KAFKA-13388 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: David Hoffman >Priority: Minor > Attachments: Screen Shot 2021-10-25 at 10.28.48 AM.png, > image-2021-10-21-13-42-06-528.png > > > I have been seeing expired batch errors in my app. > {code:java} > org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for > xxx-17:120002 ms has passed since batch creation > {code} > I would have assumed a request timout or connection timeout should have also > been logged. I could not find any other associated errors. 
> I added some instrumenting to my app and have traced this down to broker > connections hanging in CHECKING_API_VERSIONS state. -It appears there is no > effective timeout for Kafka Producer broker connections in > CHECKING_API_VERSIONS state.- > In the code see the after the NetworkClient connects to a broker node it > makes a request to check api versions, when it receives the response it marks > the node as ready. -I am seeing that sometimes a reply is not received for > the check api versions request the connection just hangs in > CHECKING_API_VERSIONS state until it is disposed I assume after the idle > connection timeout.- > Update: not actually sure what causes the connection to get stuck in > CHECKING_API_VERSIONS. > -I am guessing the connection setup timeout should be still in play for this, > but it is not.- > -There is a connectingNodes set that is consulted when checking timeouts and > the node is removed- > -when ClusterConnectionStates
[jira] [Created] (KAFKA-13529) kafka mm2 consumer configuration invalid
chengang created KAFKA-13529: Summary: kafka mm2 consumer configuration invalid Key: KAFKA-13529 URL: https://issues.apache.org/jira/browse/KAFKA-13529 Project: Kafka Issue Type: Bug Affects Versions: 3.0.0, 2.8.0 Reporter: chengang We set auto.offset.reset = latest, but MirrorMaker2 seeks the source topic to offset 0. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13529) kafka mm2 consumer configuration invalid
[ https://issues.apache.org/jira/browse/KAFKA-13529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] chengang updated KAFKA-13529: - Labels: mirror-maker (was: ) > kafka mm2 consumer configuration invalid > - > > Key: KAFKA-13529 > URL: https://issues.apache.org/jira/browse/KAFKA-13529 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0, 3.0.0 >Reporter: chengang >Priority: Major > Labels: mirror-maker > > We set auto.offset.reset = latest > but MirrorMaker2 seeks the source topic to offset 0 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13388) Kafka Producer nodes stuck in CHECKING_API_VERSIONS
[ https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456888#comment-17456888 ] Lucas Bradstreet commented on KAFKA-13388: -- I'm pretty sure [~david.mao] is correct. If we ever get into CHECKING_API_VERSIONS state we won't apply the connection timeouts or request timeouts. We really need to be applying a timeout to this state. I have a reproducer where we drop API_VERSIONS at random and can see that our normal timeouts don't apply. I think this is why the client gets all the way to expiring the batches without a corresponding disconnect. > Kafka Producer nodes stuck in CHECKING_API_VERSIONS > --- > > Key: KAFKA-13388 > URL: https://issues.apache.org/jira/browse/KAFKA-13388 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: David Hoffman >Priority: Minor > Attachments: Screen Shot 2021-10-25 at 10.28.48 AM.png, > image-2021-10-21-13-42-06-528.png > > > I have been seeing expired batch errors in my app. > {code:java} > org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for > xxx-17:120002 ms has passed since batch creation > {code} > I would have assumed a request timout or connection timeout should have also > been logged. I could not find any other associated errors. > I added some instrumenting to my app and have traced this down to broker > connections hanging in CHECKING_API_VERSIONS state. -It appears there is no > effective timeout for Kafka Producer broker connections in > CHECKING_API_VERSIONS state.- > In the code see the after the NetworkClient connects to a broker node it > makes a request to check api versions, when it receives the response it marks > the node as ready. -I am seeing that sometimes a reply is not received for > the check api versions request the connection just hangs in > CHECKING_API_VERSIONS state until it is disposed I assume after the idle > connection timeout.- > Update: not actually sure what causes the connection to get stuck in > CHECKING_API_VERSIONS. > -I am guessing the connection setup timeout should be still in play for this, > but it is not.- > -There is a connectingNodes set that is consulted when checking timeouts and > the node is removed- > -when ClusterConnectionStates.checkingApiVersions(String id) is called to > transition the node into CHECKING_API_VERSIONS- -- This message was sent by Atlassian Jira (v8.20.1#820001)