[GitHub] [kafka] ableegoldman commented on a change in pull request #11584: MINOR: improve logging

2021-12-09 Thread GitBox


ableegoldman commented on a change in pull request #11584:
URL: https://github.com/apache/kafka/pull/11584#discussion_r765559208



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -288,7 +288,7 @@ private Exception invokeOnAssignment(final 
ConsumerPartitionAssignor assignor, f
 }
 
 private Exception invokePartitionsAssigned(final Set<TopicPartition> 
assignedPartitions) {
-log.info("Adding newly assigned partitions: {}", 
Utils.join(assignedPartitions, ", "));
+log.info("Adding newly assigned partitions: {}", 
Utils.join(assignedPartitions.stream().sorted().toArray(), ", "));

Review comment:
   FYI `TopicPartition` doesn't implement `Comparable` so you can't use 
`.stream().sorted()`
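
   To illustrate the point above: a comparator-less `sorted()` compiles for any element type but can throw `ClassCastException` at runtime when the elements are not `Comparable`. The sketch below assumes a hypothetical stand-in class `Tp` (not the real `TopicPartition`) and a hypothetical helper `SortedLogging`; passing an explicit `Comparator` is one possible way around the problem, not necessarily what the PR ended up doing.

```java
import java.util.Comparator;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for TopicPartition: like the real class, it does not implement Comparable.
final class Tp {
    private final String topic;
    private final int partition;

    Tp(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    String topic() { return topic; }
    int partition() { return partition; }

    @Override
    public String toString() { return topic + "-" + partition; }
}

// Hypothetical helper, for illustration only.
final class SortedLogging {
    // Explicit ordering: by topic name, then by partition number.
    static final Comparator<Tp> BY_TOPIC_THEN_PARTITION =
        Comparator.comparing(Tp::topic).thenComparingInt(Tp::partition);

    // Renders the partitions in a deterministic order for log output.
    static String render(Set<Tp> partitions) {
        return partitions.stream()
            .sorted(BY_TOPIC_THEN_PARTITION)
            .map(Tp::toString)
            .collect(Collectors.joining(", "));
    }

    // Returns true if sorting without a comparator fails at runtime.
    static boolean naturalSortFails(Set<Tp> partitions) {
        try {
            partitions.stream().sorted().toArray();
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }
}
```

   With two or more elements, `naturalSortFails` reports the runtime failure, while `render` produces a stable, sorted string suitable for a log line.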

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -215,7 +215,7 @@ public String protocolType() {
 
 @Override
 protected JoinGroupRequestData.JoinGroupRequestProtocolCollection 
metadata() {
-log.debug("Joining group with current subscription: {}", 
subscriptions.subscription());
+log.debug("Joining group with current subscription: {}", 
Utils.join(subscriptions.subscription().stream().sorted().toArray(), ", "));

Review comment:
   Could we just keep the subscription sorted to begin with (i.e., store it in 
a sorted data structure)?
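
   A minimal sketch of that idea, assuming a hypothetical `SortedSubscription` holder rather than the real `SubscriptionState`: storing topic names in a `TreeSet` keeps them ordered at all times, so log statements need no per-call sort.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical, simplified holder for a consumer's topic subscription.
final class SortedSubscription {
    // TreeSet keeps the topic names in natural (lexicographic) order at all times.
    private final SortedSet<String> topics = new TreeSet<>();

    // Replaces the current subscription with the given topics.
    void subscribe(Collection<String> newTopics) {
        topics.clear();
        topics.addAll(newTopics);
    }

    // Read-only view; iteration order is already sorted, so callers need no extra sort.
    SortedSet<String> subscription() {
        return Collections.unmodifiableSortedSet(topics);
    }

    // Comma-separated rendering for log output.
    String describe() {
        return String.join(", ", topics);
    }
}
```

   The trade-off is that insertions cost O(log n) instead of the amortized O(1) of a hash set, which is unlikely to matter for typical subscription sizes.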




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #11575: KAFKA-13511: Update TimestampConverter support unix epoch as millis, micros, and seconds

2021-12-09 Thread GitBox


mimaison commented on pull request #11575:
URL: https://github.com/apache/kafka/pull/11575#issuecomment-989705328


   Hi @twobeeb, thanks for the PR, this looks like a useful addition. However, 
as this adds a new configuration, the change requires a KIP before we can 
accept it.
   Take a look at 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals. 
This page details the KIP process. Let me know if you have any questions.






[GitHub] [kafka] showuon commented on a change in pull request #11584: MINOR: improve logging

2021-12-09 Thread GitBox


showuon commented on a change in pull request #11584:
URL: https://github.com/apache/kafka/pull/11584#discussion_r765638758



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -288,7 +288,7 @@ private Exception invokeOnAssignment(final 
ConsumerPartitionAssignor assignor, f
 }
 
 private Exception invokePartitionsAssigned(final Set<TopicPartition> 
assignedPartitions) {
-log.info("Adding newly assigned partitions: {}", 
Utils.join(assignedPartitions, ", "));
+log.info("Adding newly assigned partitions: {}", 
Utils.join(assignedPartitions.stream().sorted().toArray(), ", "));

Review comment:
   Good point!

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -397,10 +397,10 @@ protected void onJoinComplete(int generation,
 "\tCurrent owned partitions:  {}\n" +
 "\tAdded partitions (assigned - owned):   {}\n" +
 "\tRevoked partitions (owned - assigned): {}\n",
-assignedPartitions,
-ownedPartitions,
-addedPartitions,
-revokedPartitions
+Utils.join(assignedPartitions.stream().sorted().toArray(), ", 
"),
+Utils.join(ownedPartitions.stream().sorted().toArray(), ", "),
+Utils.join(addedPartitions.stream().sorted().toArray(), ", "),
+Utils.join(revokedPartitions.stream().sorted().toArray(), ", ")

Review comment:
   Actually, `assignedPartitions`, `addedPartitions`, and 
`revokedPartitions`, as well as the subsequent `invokePartitionsRevoked` and 
`invokePartitionsAssigned` calls, all come from `assignment.partitions()` (or 
`assignedPartitions`). If we sort that first, the rest of the partitions will 
be sorted too.
   
   Only `ownedPartitions` might still need to be sorted separately.
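
   A rough sketch of the "sort once, derive the rest" idea. It assumes a hypothetical `AssignmentDiff` helper and uses plain strings as stand-ins for partitions; the set differences mirror the "assigned - owned" and "owned - assigned" labels in the log message above.

```java
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical helper; partition names are plain strings for illustration.
final class AssignmentDiff {
    // Copies the assignment into a sorted set once, up front.
    static SortedSet<String> sortedCopy(Set<String> partitions) {
        return new TreeSet<>(partitions);
    }

    // added = assigned - owned; the result keeps the sorted order of `assigned`.
    static SortedSet<String> added(SortedSet<String> assigned, Set<String> owned) {
        SortedSet<String> result = new TreeSet<>(assigned);
        result.removeAll(owned);
        return result;
    }

    // revoked = owned - assigned; `owned` is sorted here because it may arrive unsorted.
    static SortedSet<String> revoked(Set<String> owned, SortedSet<String> assigned) {
        SortedSet<String> result = new TreeSet<>(owned);
        result.removeAll(assigned);
        return result;
    }
}
```

   Every set derived this way iterates in sorted order, so the log statements can print the sets directly.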








[GitHub] [kafka] vpapavas opened a new pull request #11585: chore: cleanup minor leftovers

2021-12-09 Thread GitBox


vpapavas opened a new pull request #11585:
URL: https://github.com/apache/kafka/pull/11585


   Clean up some minor things that were left over from PR 
https://github.com/apache/kafka/pull/11513
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Commented] (KAFKA-10503) MockProducer doesn't throw ClassCastException when no partition for topic

2021-12-09 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-10503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456308#comment-17456308
 ] 

Gonzalo Muñoz Fernández commented on KAFKA-10503:
-

[~dhunziker] yes, if the serializers are null, there's an NPE. I agree with 
that, thanks for pointing it out!

> MockProducer doesn't throw ClassCastException when no partition for topic
> -
>
> Key: KAFKA-10503
> URL: https://issues.apache.org/jira/browse/KAFKA-10503
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, producer 
>Affects Versions: 2.6.0
>Reporter: Gonzalo Muñoz Fernández
>Assignee: Gonzalo Muñoz Fernández
>Priority: Minor
>  Labels: mock, producer
> Fix For: 2.7.0
>
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> Though {{MockProducer}} admits serializers in its constructors, it doesn't 
> check during {{send}} method that those serializers are the proper ones to 
> serialize key/value included into the {{ProducerRecord}}.
> [This 
> check|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java#L499-L500]
>  is only done if there is a partition assigned for that topic.
> It would be an enhancement if these serialize methods were also invoked in 
> simple scenarios, where no partition is assigned to a topic.
> eg:
> {code:java}
> @Test
> public void shouldThrowClassCastException() {
> MockProducer producer = new MockProducer<>(true, new 
> IntegerSerializer(), new StringSerializer());
> ProducerRecord record = new ProducerRecord(TOPIC, "key1", "value1");
> try {
> producer.send(record);
> fail("Should have thrown ClassCastException because record cannot 
> be casted with serializers");
> } catch (ClassCastException e) {}
> }
> {code}
> Currently, obtaining the ClassCastException requires defining a partition 
> for the topic:
> {code:java}
> PartitionInfo partitionInfo = new PartitionInfo(TOPIC, 0, null, null, null);
> Cluster cluster = new Cluster(null, emptyList(), asList(partitionInfo),
>   emptySet(), emptySet());
>  producer = new MockProducer(cluster, 
> true, 
> new DefaultPartitioner(), 
> new IntegerSerializer(), 
> new StringSerializer());
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13527) Add top-level error code field to DescribeLogDirsResponse

2021-12-09 Thread Mickael Maison (Jira)
Mickael Maison created KAFKA-13527:
--

 Summary: Add top-level error code field to DescribeLogDirsResponse
 Key: KAFKA-13527
 URL: https://issues.apache.org/jira/browse/KAFKA-13527
 Project: Kafka
  Issue Type: Bug
Reporter: Mickael Maison
Assignee: Mickael Maison


Ticket for KIP-784: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-784%3A+Add+top-level+error+code+field+to+DescribeLogDirsResponse





[GitHub] [kafka] ableegoldman merged pull request #11562: KAFKA-12648: extend IQ APIs to work with named topologies

2021-12-09 Thread GitBox


ableegoldman merged pull request #11562:
URL: https://github.com/apache/kafka/pull/11562


   






[GitHub] [kafka] tombentley commented on a change in pull request #11393: MINOR: Refactor RequestResponseTest

2021-12-09 Thread GitBox


tombentley commented on a change in pull request #11393:
URL: https://github.com/apache/kafka/pull/11393#discussion_r765722507



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -1269,24 +909,656 @@ public void 
testDeletableTopicResultErrorMessageIsNullByDefault() {
 assertNull(result.errorMessage());
 }
 
-private ResponseHeader createResponseHeader(short headerVersion) {
-return new ResponseHeader(10, headerVersion);
+/**
+ * Check that all error codes in the response get included in {@link 
AbstractResponse#errorCounts()}.
+ */
+@Test
+public void testErrorCountsIncludesNone() {
+assertEquals(1, 
createAddOffsetsToTxnResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createAddPartitionsToTxnResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createAlterClientQuotasResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createAlterConfigsResponse().errorCounts().get(Errors.NONE));
+assertEquals(2, 
createAlterPartitionReassignmentsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createAlterReplicaLogDirsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createApiVersionResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createBrokerHeartbeatResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createBrokerRegistrationResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createControlledShutdownResponse().errorCounts().get(Errors.NONE));
+assertEquals(2, 
createCreateAclsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createCreatePartitionsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createCreateTokenResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createCreateTopicResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDeleteAclsResponse(DELETE_ACLS.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDeleteGroupsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDeleteTopicsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDescribeAclsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDescribeClientQuotasResponse().errorCounts().get(Errors.NONE));
+assertEquals(2, 
createDescribeConfigsResponse(DESCRIBE_CONFIGS.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDescribeGroupResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDescribeTokenResponse().errorCounts().get(Errors.NONE));
+assertEquals(2, 
createElectLeadersResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, createEndTxnResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createExpireTokenResponse().errorCounts().get(Errors.NONE));
+assertEquals(3, 
createFetchResponse(123).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createFindCoordinatorResponse(FIND_COORDINATOR.oldestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createFindCoordinatorResponse(FIND_COORDINATOR.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createHeartBeatResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createIncrementalAlterConfigsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createJoinGroupResponse(JOIN_GROUP.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(2, 
createLeaderAndIsrResponse(4).errorCounts().get(Errors.NONE));
+assertEquals(2, 
createLeaderAndIsrResponse(5).errorCounts().get(Errors.NONE));
+assertEquals(3, 
createLeaderEpochResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createLeaveGroupResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createListGroupsResponse(LIST_GROUPS.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createListOffsetResponse(LIST_OFFSETS.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createListPartitionReassignmentsResponse().errorCounts().get(Errors.NONE));
+assertEquals(3, 
createMetadataResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createOffsetCommitResponse().errorCounts().get(Errors.NONE));
+assertEquals(2, 
createOffsetDeleteResponse().errorCounts().get(Errors.NONE));
+assertEquals(3, 
createOffsetFetchResponse(OFFSET_FETCH.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createProduceResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createRenewTokenResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createSaslAuthenticateResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createSaslHandsh

[GitHub] [kafka] tombentley commented on a change in pull request #11393: MINOR: Refactor RequestResponseTest

2021-12-09 Thread GitBox


tombentley commented on a change in pull request #11393:
URL: https://github.com/apache/kafka/pull/11393#discussion_r765726907



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -1269,24 +909,656 @@ public void 
testDeletableTopicResultErrorMessageIsNullByDefault() {
 assertNull(result.errorMessage());
 }
 
-private ResponseHeader createResponseHeader(short headerVersion) {
-return new ResponseHeader(10, headerVersion);
+/**
+ * Check that all error codes in the response get included in {@link 
AbstractResponse#errorCounts()}.
+ */
+@Test
+public void testErrorCountsIncludesNone() {
+assertEquals(1, 
createAddOffsetsToTxnResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createAddPartitionsToTxnResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createAlterClientQuotasResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createAlterConfigsResponse().errorCounts().get(Errors.NONE));
+assertEquals(2, 
createAlterPartitionReassignmentsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createAlterReplicaLogDirsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createApiVersionResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createBrokerHeartbeatResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createBrokerRegistrationResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createControlledShutdownResponse().errorCounts().get(Errors.NONE));
+assertEquals(2, 
createCreateAclsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createCreatePartitionsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createCreateTokenResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createCreateTopicResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDeleteAclsResponse(DELETE_ACLS.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDeleteGroupsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDeleteTopicsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDescribeAclsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDescribeClientQuotasResponse().errorCounts().get(Errors.NONE));
+assertEquals(2, 
createDescribeConfigsResponse(DESCRIBE_CONFIGS.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDescribeGroupResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDescribeTokenResponse().errorCounts().get(Errors.NONE));
+assertEquals(2, 
createElectLeadersResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, createEndTxnResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createExpireTokenResponse().errorCounts().get(Errors.NONE));
+assertEquals(3, 
createFetchResponse(123).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createFindCoordinatorResponse(FIND_COORDINATOR.oldestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createFindCoordinatorResponse(FIND_COORDINATOR.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createHeartBeatResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createIncrementalAlterConfigsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createJoinGroupResponse(JOIN_GROUP.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(2, 
createLeaderAndIsrResponse(4).errorCounts().get(Errors.NONE));
+assertEquals(2, 
createLeaderAndIsrResponse(5).errorCounts().get(Errors.NONE));
+assertEquals(3, 
createLeaderEpochResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createLeaveGroupResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createListGroupsResponse(LIST_GROUPS.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createListOffsetResponse(LIST_OFFSETS.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createListPartitionReassignmentsResponse().errorCounts().get(Errors.NONE));
+assertEquals(3, 
createMetadataResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createOffsetCommitResponse().errorCounts().get(Errors.NONE));
+assertEquals(2, 
createOffsetDeleteResponse().errorCounts().get(Errors.NONE));
+assertEquals(3, 
createOffsetFetchResponse(OFFSET_FETCH.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createProduceResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createRenewTokenResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createSaslAuthenticateResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createSaslHandsh

[GitHub] [kafka] tombentley commented on a change in pull request #11393: MINOR: Refactor RequestResponseTest

2021-12-09 Thread GitBox


tombentley commented on a change in pull request #11393:
URL: https://github.com/apache/kafka/pull/11393#discussion_r765733569



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -1098,7 +741,7 @@ public void testJoinGroupRequestVersion0RebalanceTimeout() 
{
 
 @Test
 public void testOffsetFetchRequestBuilderToStringV0ToV7() {

Review comment:
   We have `testJoinGroupRequestVersion0RebalanceTimeout` but 
`testOffsetFetchRequestBuilderToStringV0ToV7`, so rename 
`testJoinGroupRequestVersion0RebalanceTimeout` → 
`testJoinGroupRequestV0RebalanceTimeout` for consistency

##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -247,403 +280,123 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
+// This class performs tests requests and responses for all API keys
 public class RequestResponseTest {
 
 // Exception includes a message that we verify is not included in error 
responses
 private final UnknownServerException unknownServerException = new 
UnknownServerException("secret");
 
+
+@Test
+public void testSerialization() {
+Map<ApiKeys, List<Short>> toSkip = new HashMap<>();
+// It's not possible to create a MetadataRequest v0 via the builder
+toSkip.put(METADATA, singletonList((short) 0));
+// DescribeLogDirsResponse does not have a top level error field
+toSkip.put(DESCRIBE_LOG_DIRS, DESCRIBE_LOG_DIRS.allVersions());
+// ElectLeaders v0 does not have a top level error field, when 
accessing it, it defaults to NONE
+toSkip.put(ELECT_LEADERS, singletonList((short) 0));
+
+for (ApiKeys apikey : ApiKeys.values()) {
+for (short version : apikey.allVersions()) {
+if (toSkip.containsKey(apikey) && 
toSkip.get(apikey).contains(version)) continue;
+AbstractRequest request = getRequest(apikey, version);
+checkRequest(request);
+checkErrorResponse(request, unknownServerException);
+checkResponse(getResponse(apikey, version), version);
+}
+}
+}
+
+// This test validates special cases that are not checked in 
testSerialization
 @Test
-public void testSerialization() throws Exception {
-checkRequest(createControlledShutdownRequest(), true);
-checkResponse(createControlledShutdownResponse(), 1, true);
-checkErrorResponse(createControlledShutdownRequest(), 
unknownServerException, true);
-checkErrorResponse(createControlledShutdownRequest(0), 
unknownServerException, true);
-checkRequest(createFetchRequest(4), true);
-checkResponse(createFetchResponse(true), 4, true);
+public void testSerializationSpecialCases() {

Review comment:
   `testSerialization` iterates over all keys and calls `getRequest`, so 
it's impossible for someone adding a new RPC to forget to write a 
serialization test. I wonder if we should do a similar thing for special cases, 
adding a `testSerializationSpecialCase(ApiKeys key)` which switches on the key 
and calls per-key methods to do the test. Maybe it's overkill, because many RPCs 
don't _appear_ to have special cases, but maybe that's really a lack of coverage?
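
   One way the suggestion could look, as a sketch only: the `Key` enum below is a hypothetical, reduced stand-in for `ApiKeys`, and `checkSpecialCases` is an illustrative name. The `default` branch makes a newly added key fail until someone records an explicit decision for it.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch; not the structure used in RequestResponseTest.
final class SpecialCaseCoverage {
    // Reduced stand-in for the real ApiKeys enum.
    enum Key { PRODUCE, FETCH, METADATA }

    // Dispatches to a per-key check and returns true if the key has special cases.
    static boolean checkSpecialCases(Key key) {
        switch (key) {
            case PRODUCE:
                // Per-key special-case assertions would go here.
                return true;
            case FETCH:
                return true;
            case METADATA:
                // Explicitly recorded as having no special cases.
                return false;
            default:
                // A key added later lands here until a decision is recorded for it.
                throw new IllegalStateException("No special-case decision recorded for " + key);
        }
    }

    // Runs the check for every key, mirroring the loop in testSerialization.
    static Set<Key> keysWithSpecialCases() {
        Set<Key> result = EnumSet.noneOf(Key.class);
        for (Key key : Key.values()) {
            if (checkSpecialCases(key)) {
                result.add(key);
            }
        }
        return result;
    }
}
```

   The cost is one `case` per key even when there is nothing to test, which is the "overkill" concern raised above.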

##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -771,11 +424,11 @@ public void testPartitionSize() {
 CompressionType.NONE, new SimpleRecord("woot".getBytes()), new 
SimpleRecord("woot".getBytes()));
 ProduceRequest request = 
ProduceRequest.forMagic(RecordBatch.MAGIC_VALUE_V2,
 new ProduceRequestData()
-.setTopicData(new 
ProduceRequestData.TopicProduceDataCollection(Arrays.asList(

Review comment:
   A comment for line 418: Should we rename this to 
`testProduceRequestPartitionSize`?








[GitHub] [kafka] showuon commented on pull request #11408: KAFKA-13374: update doc to allow read from leader/followers

2021-12-09 Thread GitBox


showuon commented on pull request #11408:
URL: https://github.com/apache/kafka/pull/11408#issuecomment-989817483


   @hachikuji @mimaison, could you help review this one-line PR? Thanks.






[GitHub] [kafka] cadonna commented on pull request #11561: MINOR: Bump version of grgit to 4.1.1

2021-12-09 Thread GitBox


cadonna commented on pull request #11561:
URL: https://github.com/apache/kafka/pull/11561#issuecomment-989818400


   Cherry-picked to 3.1 \cc @dajac 






[GitHub] [kafka] cadonna commented on pull request #11561: MINOR: Bump version of grgit to 4.1.1

2021-12-09 Thread GitBox


cadonna commented on pull request #11561:
URL: https://github.com/apache/kafka/pull/11561#issuecomment-989822040


   Cherry-picked to 3.0






[GitHub] [kafka] dongjinleekr opened a new pull request #11586: KAFKA-13516: Connection level metrics are not closed

2021-12-09 Thread GitBox


dongjinleekr opened a new pull request #11586:
URL: https://github.com/apache/kafka/pull/11586


   Here is the fix. The core of this approach is using a `ConcurrentHashMap` to 
manage connection ids and their corresponding sensors.
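
   A minimal sketch of the general technique, not the code in this PR: the `ConnectionSensorRegistry` class and its method names are hypothetical, and sensors are represented by their names only. The point is that removing the map entry on connection close hands back everything that has to be cleaned up.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical registry; names are illustrative and not taken from the PR.
final class ConnectionSensorRegistry {
    private final ConcurrentMap<String, List<String>> sensorsByConnection = new ConcurrentHashMap<>();

    // Records a sensor name under its connection id.
    void register(String connectionId, String sensorName) {
        sensorsByConnection
            .computeIfAbsent(connectionId, id -> new CopyOnWriteArrayList<>())
            .add(sensorName);
    }

    // Called on connection close: removes and returns the sensors so the caller can unregister them.
    List<String> close(String connectionId) {
        List<String> removed = sensorsByConnection.remove(connectionId);
        return removed == null ? Collections.emptyList() : removed;
    }

    // Number of connections that still have sensors registered.
    int trackedConnections() {
        return sensorsByConnection.size();
    }
}
```

   Closing the same connection twice is harmless here, because the second `close` finds no entry and returns an empty list.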
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Assigned] (KAFKA-13516) Connection level metrics are not closed

2021-12-09 Thread Dongjin Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjin Lee reassigned KAFKA-13516:
---

Assignee: Dongjin Lee

> Connection level metrics are not closed
> ---
>
> Key: KAFKA-13516
> URL: https://issues.apache.org/jira/browse/KAFKA-13516
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Aman Agarwal
>Assignee: Dongjin Lee
>Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Connection level metrics are not closed by the Selector on connection close, 
> hence leaking the sensors.





[jira] [Updated] (KAFKA-9837) New RPC for notifying controller of failed replica

2021-12-09 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez updated KAFKA-9837:
---
Fix Version/s: 3.2.0
   (was: 2.9)

> New RPC for notifying controller of failed replica
> --
>
> Key: KAFKA-9837
> URL: https://issues.apache.org/jira/browse/KAFKA-9837
> Project: Kafka
>  Issue Type: New Feature
>  Components: controller, core
>Reporter: David Arthur
>Assignee: dengziming
>Priority: Major
>  Labels: kip-500
> Fix For: 3.2.0
>
>
> This is the tracking ticket for 
> [KIP-589|https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller].
>  For the bridge release, brokers should no longer use ZooKeeper to notify the 
> controller that a log dir has failed. They should instead use an RPC mechanism.





[GitHub] [kafka] rondagostino commented on a change in pull request #11503: KAFKA-13456: Tighten KRaft config checks/constraints

2021-12-09 Thread GitBox


rondagostino commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r765806468



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1959,10 +1969,28 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
 }
   }
 
-  def listenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
-getMap(KafkaConfig.ListenerSecurityProtocolMapProp, 
getString(KafkaConfig.ListenerSecurityProtocolMapProp))
+  def effectiveListenerSecurityProtocolMap: Map[ListenerName, 
SecurityProtocol] = {

Review comment:
   > One more thing to check is that the validations in 
DynamicListenerConfig make sense in the context of these changes 
   
   I just pushed a commit fixing all of the minor issues; I'll look into this 
possible issue later today.








[GitHub] [kafka] bbejeck commented on pull request #11573: KAFKA-13507: GlobalProcessor ignores user specified names

2021-12-09 Thread GitBox


bbejeck commented on pull request #11573:
URL: https://github.com/apache/kafka/pull/11573#issuecomment-989875505


   Java 8 and Java 11 passed
   
   Java 17 failure unrelated - 
`org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets`






[GitHub] [kafka] patrickstuedi commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

2021-12-09 Thread GitBox


patrickstuedi commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r765750361



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -62,4 +71,32 @@ public static void updatePosition(
 position.withComponent(meta.topic(), meta.partition(), 
meta.offset());
 }
 }
+
+public static boolean isPermitted(
+final Position position,
+final PositionBound positionBound,
+final int partition
+) {
+if (positionBound.isUnbounded()) {

Review comment:
   I know we've discussed this, but just for the record :-), I believe we 
could do without the `PositionBound` type: unbounded is basically an empty 
`Position`.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -62,4 +71,32 @@ public static void updatePosition(
 position.withComponent(meta.topic(), meta.partition(), 
meta.offset());
 }
 }
+
+public static boolean isPermitted(
+final Position position,
+final PositionBound positionBound,
+final int partition
+) {
+if (positionBound.isUnbounded()) {

Review comment:
   I still think this entire check should be a method on `Position`, as in 
`Position::dominates(PositionBound)`.
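
   A sketch of what such a method might look like, under loud assumptions: `SimplePosition` is a hypothetical, simplified class (topic to partition to offset), not the Kafka Streams `Position`, and it ignores the per-partition argument that `isPermitted` takes. It also shows how an empty position can play the role of "unbounded".

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified position: topic -> partition -> offset. Not the Kafka Streams class.
final class SimplePosition {
    private final Map<String, Map<Integer, Long>> offsets = new HashMap<>();

    // Records an offset, keeping the largest one seen per topic-partition.
    SimplePosition with(String topic, int partition, long offset) {
        offsets.computeIfAbsent(topic, t -> new HashMap<>()).merge(partition, offset, Long::max);
        return this;
    }

    // True if this position is at or past every offset in the bound.
    // An empty bound has nothing to check, so it behaves as "unbounded".
    boolean dominates(SimplePosition bound) {
        for (Map.Entry<String, Map<Integer, Long>> topicEntry : bound.offsets.entrySet()) {
            Map<Integer, Long> seen = offsets.getOrDefault(topicEntry.getKey(), Collections.emptyMap());
            for (Map.Entry<Integer, Long> partitionEntry : topicEntry.getValue().entrySet()) {
                Long current = seen.get(partitionEntry.getKey());
                if (current == null || current < partitionEntry.getValue()) {
                    return false;
                }
            }
        }
        return true;
    }
}
```

   With this shape, a caller would write `position.dominates(bound)` instead of passing both objects to a static utility.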








[GitHub] [kafka] patrickstuedi commented on pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

2021-12-09 Thread GitBox


patrickstuedi commented on pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#issuecomment-989876770


   Looks good, thanks John!






[GitHub] [kafka] mimaison merged pull request #11450: KAFKA-13414: Replace Powermock/EasyMock by Mockito in connect.storage

2021-12-09 Thread GitBox


mimaison merged pull request #11450:
URL: https://github.com/apache/kafka/pull/11450


   






[GitHub] [kafka] mimaison commented on a change in pull request #11393: MINOR: Refactor RequestResponseTest

2021-12-09 Thread GitBox


mimaison commented on a change in pull request #11393:
URL: https://github.com/apache/kafka/pull/11393#discussion_r765828093



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -1269,24 +909,656 @@ public void 
testDeletableTopicResultErrorMessageIsNullByDefault() {
 assertNull(result.errorMessage());
 }
 
-private ResponseHeader createResponseHeader(short headerVersion) {
-return new ResponseHeader(10, headerVersion);
+/**
+ * Check that all error codes in the response get included in {@link 
AbstractResponse#errorCounts()}.
+ */
+@Test
+public void testErrorCountsIncludesNone() {
+assertEquals(1, 
createAddOffsetsToTxnResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createAddPartitionsToTxnResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createAlterClientQuotasResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createAlterConfigsResponse().errorCounts().get(Errors.NONE));
+assertEquals(2, 
createAlterPartitionReassignmentsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createAlterReplicaLogDirsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createApiVersionResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createBrokerHeartbeatResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createBrokerRegistrationResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createControlledShutdownResponse().errorCounts().get(Errors.NONE));
+assertEquals(2, 
createCreateAclsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createCreatePartitionsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createCreateTokenResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createCreateTopicResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDeleteAclsResponse(DELETE_ACLS.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDeleteGroupsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDeleteTopicsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDescribeAclsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDescribeClientQuotasResponse().errorCounts().get(Errors.NONE));
+assertEquals(2, 
createDescribeConfigsResponse(DESCRIBE_CONFIGS.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDescribeGroupResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createDescribeTokenResponse().errorCounts().get(Errors.NONE));
+assertEquals(2, 
createElectLeadersResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, createEndTxnResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createExpireTokenResponse().errorCounts().get(Errors.NONE));
+assertEquals(3, 
createFetchResponse(123).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createFindCoordinatorResponse(FIND_COORDINATOR.oldestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createFindCoordinatorResponse(FIND_COORDINATOR.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createHeartBeatResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createIncrementalAlterConfigsResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createJoinGroupResponse(JOIN_GROUP.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(2, 
createLeaderAndIsrResponse(4).errorCounts().get(Errors.NONE));
+assertEquals(2, 
createLeaderAndIsrResponse(5).errorCounts().get(Errors.NONE));
+assertEquals(3, 
createLeaderEpochResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createLeaveGroupResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createListGroupsResponse(LIST_GROUPS.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createListOffsetResponse(LIST_OFFSETS.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createListPartitionReassignmentsResponse().errorCounts().get(Errors.NONE));
+assertEquals(3, 
createMetadataResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createOffsetCommitResponse().errorCounts().get(Errors.NONE));
+assertEquals(2, 
createOffsetDeleteResponse().errorCounts().get(Errors.NONE));
+assertEquals(3, 
createOffsetFetchResponse(OFFSET_FETCH.latestVersion()).errorCounts().get(Errors.NONE));
+assertEquals(1, 
createProduceResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createRenewTokenResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createSaslAuthenticateResponse().errorCounts().get(Errors.NONE));
+assertEquals(1, 
createSaslHandshak

[GitHub] [kafka] mimaison commented on a change in pull request #11393: MINOR: Refactor RequestResponseTest

2021-12-09 Thread GitBox


mimaison commented on a change in pull request #11393:
URL: https://github.com/apache/kafka/pull/11393#discussion_r765833040



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -247,403 +280,123 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 
+// This class performs tests requests and responses for all API keys
 public class RequestResponseTest {
 
 // Exception includes a message that we verify is not included in error 
responses
 private final UnknownServerException unknownServerException = new 
UnknownServerException("secret");
 
+
+@Test
+public void testSerialization() {
+Map> toSkip = new HashMap<>();
+// It's not possible to create a MetadataRequest v0 via the builder
+toSkip.put(METADATA, singletonList((short) 0));
+// DescribeLogDirsResponse does not have a top level error field
+toSkip.put(DESCRIBE_LOG_DIRS, DESCRIBE_LOG_DIRS.allVersions());
+// ElectLeaders v0 does not have a top level error field, when 
accessing it, it defaults to NONE
+toSkip.put(ELECT_LEADERS, singletonList((short) 0));
+
+for (ApiKeys apikey : ApiKeys.values()) {
+for (short version : apikey.allVersions()) {
+if (toSkip.containsKey(apikey) && 
toSkip.get(apikey).contains(version)) continue;
+AbstractRequest request = getRequest(apikey, version);
+checkRequest(request);
+checkErrorResponse(request, unknownServerException);
+checkResponse(getResponse(apikey, version), version);
+}
+}
+}
+
+// This test validates special cases that are not checked in 
testSerialization
 @Test
-public void testSerialization() throws Exception {
-checkRequest(createControlledShutdownRequest(), true);
-checkResponse(createControlledShutdownResponse(), 1, true);
-checkErrorResponse(createControlledShutdownRequest(), 
unknownServerException, true);
-checkErrorResponse(createControlledShutdownRequest(0), 
unknownServerException, true);
-checkRequest(createFetchRequest(4), true);
-checkResponse(createFetchResponse(true), 4, true);
+public void testSerializationSpecialCases() {

Review comment:
   It's much harder to cover all special cases. 
   Are we missing coverage for some special cases? Possibly! But at least for 
the common case, we should be covered now.








[GitHub] [kafka] cadonna opened a new pull request #11587: HOTFIX: Bump version of grgit from 4.1.0 to 4.1.1

2021-12-09 Thread GitBox


cadonna opened a new pull request #11587:
URL: https://github.com/apache/kafka/pull/11587


   grgit 4.1.0 caused the following error with gradle
   
   > Could not resolve all artifacts for configuration ':classpath'.
  > Could not resolve org.eclipse.jgit:org.eclipse.jgit:latest.release.
Required by:
project : > org.ajoberstar.grgit:grgit-core:4.1.0
   ...
   
   making it impossible to build the branch. 
   
   grgit 4.1.1 fixed this issue.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] cadonna commented on pull request #11587: HOTFIX: Bump version of grgit from 4.1.0 to 4.1.1

2021-12-09 Thread GitBox


cadonna commented on pull request #11587:
URL: https://github.com/apache/kafka/pull/11587#issuecomment-989908970


   Call for review: @ijuma @vvcephei 






[GitHub] [kafka] cadonna commented on pull request #11587: HOTFIX: Bump version of grgit from 4.1.0 to 4.1.1

2021-12-09 Thread GitBox


cadonna commented on pull request #11587:
URL: https://github.com/apache/kafka/pull/11587#issuecomment-989909719


   I opened this PR instead of cherry-picking 
https://github.com/apache/kafka/pull/11561 since the issue is a bit different 
on 2.8.






[GitHub] [kafka] mimaison commented on pull request #11393: MINOR: Refactor RequestResponseTest

2021-12-09 Thread GitBox


mimaison commented on pull request #11393:
URL: https://github.com/apache/kafka/pull/11393#issuecomment-989911761


   > A lot (maybe all) of the `create*()` methods could be static. Perhaps 
that would allow factoring some of the RPC-specific tests into their own 
classes which would make this class less massive. However, it's not really 
clear to me that that would result in increased coverage, or make it easier to 
reason about missing coverage, so I guess I don't really see a benefit to that, 
but wondered what you thought.
   
   Yeah this class is massive and splitting it may be helpful to allow taking a 
closer look at parts. Considering this PR is already almost +/- 2000 lines, I 
propose to not do it in this PR though.
   






[jira] [Assigned] (KAFKA-10199) Separate state restoration into separate threads

2021-12-09 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-10199:
-

Assignee: Bruno Cadonna  (was: Guozhang Wang)

> Separate state restoration into separate threads
> 
>
> Key: KAFKA-10199
> URL: https://issues.apache.org/jira/browse/KAFKA-10199
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Bruno Cadonna
>Priority: Major
>
> As part of the restoration optimization effort, we would like to move the 
> restoration process to separate threads such that:
> 1. Stream threads would not be restricted by the main consumer `poll` 
> frequency to stay part of the group.
> 2. We can allow larger batches of data to be written into the restoration.
> Besides this, we'd also like to fix the known issue that, for source topics 
> piggy-backed as changelog topics, the serde exception handling and extra 
> processing logic are skipped.
> We would also clean up the global update tasks as part of this effort to 
> consolidate them into the separate restoration threads, and would also equip 
> them with corresponding monitoring metrics (KIPs in progress).



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13500) Consider adding a dedicated standby consumer

2021-12-09 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456480#comment-17456480
 ] 

Bruno Cadonna commented on KAFKA-13500:
---

I am also planning to maintain the standby tasks in the restoration thread. I 
even called the interface {{StateUpdater}} because of that.

> Consider adding a dedicated standby consumer
> 
>
> Key: KAFKA-13500
> URL: https://issues.apache.org/jira/browse/KAFKA-13500
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>
> We currently use the restore consumer to recover state for active tasks and 
> to maintain standby tasks during regular processing. This setup has a few 
> disadvantages:
>  # During state recovery, we might want to apply different consumer configs 
> compared to standby maintenance during regular processing.
>  # It makes monitoring confusing: because we never commit offsets for 
> changelog topics, users can only monitor the client's "lag metric" to 
> observe restore progress (without the need to register a restore listener). 
> However, if they are interested in a restore metric, during regular 
> processing it would report the standby lag, which can be rather confusing.
> Because the restore consumer does not use consumer group management, it seems 
> to be low overhead to actually use a third consumer, because there won't be 
> any heartbeat thread.





[GitHub] [kafka] bbejeck merged pull request #11573: KAFKA-13507: GlobalProcessor ignores user specified names

2021-12-09 Thread GitBox


bbejeck merged pull request #11573:
URL: https://github.com/apache/kafka/pull/11573


   






[GitHub] [kafka] bbejeck commented on pull request #11573: KAFKA-13507: GlobalProcessor ignores user specified names

2021-12-09 Thread GitBox


bbejeck commented on pull request #11573:
URL: https://github.com/apache/kafka/pull/11573#issuecomment-989915792


   merged #11573 into trunk






[GitHub] [kafka] bbejeck commented on pull request #11573: KAFKA-13507: GlobalProcessor ignores user specified names

2021-12-09 Thread GitBox


bbejeck commented on pull request #11573:
URL: https://github.com/apache/kafka/pull/11573#issuecomment-989917167


   Thanks @tamara-skokova for the contribution!






[jira] [Resolved] (KAFKA-10333) Provide an API to retrieve Kafka Connect task configurations

2021-12-09 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-10333.

Fix Version/s: 2.8.0
   Resolution: Duplicate

DUP of KAFKA-10833

> Provide an API to retrieve Kafka Connect task configurations
> 
>
> Key: KAFKA-10333
> URL: https://issues.apache.org/jira/browse/KAFKA-10333
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 2.8.0
>
>
> Kafka Connect exposes an API to retrieve configurations from connectors.
> Connectors are responsible for creating tasks. When doing so, they have to 
> build configurations for individual tasks. In some cases, the configuration 
> can be passed as is to tasks but in others, the configuration is mutated 
> to make tasks do specific work.
> For example with MirrorSourceConnector, the connector configuration has a 
> field "topics" which is a list of topics and patterns to mirror. When the 
> connector builds task configurations, it resolves the list of topic names and 
> patterns to exact partitions and spreads the partitions over all the tasks.
> It would be useful to identify the exact configuration each task is given.
> For MM2, it would allow identifying the partitions that matched the topics 
> field. It would also help in understanding the impact when a task fails.
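
To illustrate why a task's configuration can differ from the connector's, here 
is a minimal sketch of spreading already-resolved partitions over tasks in 
round-robin order. The class `TaskConfigSketch`, the method `assignRoundRobin` 
and the config key used below are hypothetical and only for illustration; this 
is not MirrorSourceConnector's actual code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TaskConfigSketch {
    // Builds one config map per task: each task gets a copy of the connector
    // config plus its own share of the resolved partitions.
    static List<Map<String, String>> assignRoundRobin(
            final Map<String, String> connectorConfig,
            final List<String> resolvedPartitions,
            final int maxTasks) {
        // Never create more tasks than partitions, and always at least one.
        final int numTasks = Math.max(1, Math.min(maxTasks, resolvedPartitions.size()));
        final List<List<String>> shares = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            shares.add(new ArrayList<>());
        }
        for (int i = 0; i < resolvedPartitions.size(); i++) {
            shares.get(i % numTasks).add(resolvedPartitions.get(i));
        }
        final List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (final List<String> share : shares) {
            final Map<String, String> taskConfig = new HashMap<>(connectorConfig);
            // Hypothetical key, used here only for illustration.
            taskConfig.put("task.assigned.partitions", String.join(",", share));
            taskConfigs.add(taskConfig);
        }
        return taskConfigs;
    }
}
```

The connector-level "topics" value is the same in every task config, but the 
per-task partition list is only visible in the task configurations, which is 
what the requested API would expose.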





[jira] [Updated] (KAFKA-13507) GlobalProcessor ignores user specified names

2021-12-09 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-13507:

Fix Version/s: 3.2.0

> GlobalProcessor ignores user specified names
> 
>
> Key: KAFKA-13507
> URL: https://issues.apache.org/jira/browse/KAFKA-13507
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Tamara Skokova
>Priority: Minor
>  Labels: beginner, newbie
> Fix For: 3.2.0
>
>
> Using `StreamsBuilder.addGlobalStore`, users can specify a name via the 
> `Consumed` parameter. However, the specified name is ignored and the created 
> source and global processor are assigned generated names.





[jira] [Updated] (KAFKA-13507) GlobalProcessor ignores user specified names

2021-12-09 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-13507:

Affects Version/s: 2.8.0
   2.7.0
   2.6.0
   2.5.0

> GlobalProcessor ignores user specified names
> 
>
> Key: KAFKA-13507
> URL: https://issues.apache.org/jira/browse/KAFKA-13507
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0, 2.6.0, 2.7.0, 2.8.0
>Reporter: Matthias J. Sax
>Assignee: Tamara Skokova
>Priority: Minor
>  Labels: beginner, newbie
> Fix For: 3.2.0
>
>
> Using `StreamsBuilder.addGlobalStore`, users can specify a name via the 
> `Consumed` parameter. However, the specified name is ignored and the created 
> source and global processor are assigned generated names.





[GitHub] [kafka] mimaison commented on a change in pull request #11456: KAFKA-13351: Add possibility to write kafka headers in Kafka Console Producer

2021-12-09 Thread GitBox


mimaison commented on a change in pull request #11456:
URL: https://github.com/apache/kafka/pull/11456#discussion_r765876199



##
File path: core/src/main/scala/kafka/tools/ConsoleProducer.scala
##
@@ -206,11 +210,25 @@ object ConsoleProducer {
   .describedAs("size")
   .ofType(classOf[java.lang.Integer])
   .defaultsTo(1024*100)
-val propertyOpt = parser.accepts("property", "A mechanism to pass 
user-defined properties in the form key=value to the message reader. " +
-  "This allows custom configuration for a user-defined message reader. 
Default properties include:\n" +
-  "\tparse.key=true|false\n" +
-  "\tkey.separator=\n" +
-  "\tignore.error=true|false")
+val propertyOpt = parser.accepts("property",
+  """A mechanism to pass user-defined properties in the form key=value to 
the message reader. This allows custom configuration for a user-defined message 
reader.
+|Default properties include:
+| parse.key=false
+| parse.headers=false
+| ignore.error=false
+| key.separator=\t
+| headers.delimiter=\t
+| headers.separator=,
+| headers.key.separator=:
+|Default parsing pattern when:
+| parse.headers=true & parse.key=true:

Review comment:
   Maybe it's time to use more than 80 columns! But that's a discussion for 
another KIP.
   
   As all these `parse.*` default to `false`, what about only listing the 
configs set to `true` in the example?
   For example:
   `parse.headers=false & parse.key=true` -> `parse.key=true`
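
   For reference, a minimal sketch of how a line could be split when both 
`parse.headers=true` and `parse.key=true`, using the default separators listed 
in the diff. It assumes the layout is headers, then key, then value; that 
layout, the class `HeaderLineSketch` and its `parse` method are assumptions 
for illustration and not the ConsoleProducer implementation:

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class HeaderLineSketch {
    final Map<String, String> headers = new LinkedHashMap<>();
    String key;
    String value;

    // Assumed layout: headers <TAB> key <TAB> value, with headers written as
    // "k1:v1,k2:v2". The separators mirror the defaults listed in the diff:
    // headers.delimiter=\t, headers.separator=",", headers.key.separator=":",
    // key.separator=\t.
    static HeaderLineSketch parse(final String line) {
        // Limit of 3 so that any further tabs stay inside the value.
        final String[] parts = line.split("\t", 3);
        if (parts.length != 3) {
            throw new IllegalArgumentException("Expected headers, key and value: " + line);
        }
        final HeaderLineSketch parsed = new HeaderLineSketch();
        for (final String pair : parts[0].split(",")) {
            final int colon = pair.indexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("No header key separator in: " + pair);
            }
            parsed.headers.put(pair.substring(0, colon), pair.substring(colon + 1));
        }
        parsed.key = parts[1];
        parsed.value = parts[2];
        return parsed;
    }
}
```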








[jira] [Commented] (KAFKA-13077) Replication failing after unclean shutdown of ZK and all brokers

2021-12-09 Thread Shivakumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456515#comment-17456515
 ] 

Shivakumar commented on KAFKA-13077:


Hi [~junrao] 

Here is a summary of our issue; we hope you can help us.

Kafka (2.8.1) and ZooKeeper (3.6.3) on EKS, Kubernetes 1.19
Kafka cluster size = 3
ZK cluster size = 3

1) After a rolling restart of ZK, sometimes the partitions of a topic fall out 
of sync: for most partitions broker 2 is the leader and the only ISR member 
(Leader=2, Isr=2), and the other brokers are out of the ISR.
Topic: __consumer_offsets    PartitionCount: 50    ReplicationFactor: 3    Configs: compression.type=producer,cleanup.policy=compact,segment.bytes=104857600
    Topic: __consumer_offsets    Partition: 0    Leader: 2    Replicas: 0,1,2    Isr: 2
    Topic: __consumer_offsets    Partition: 1    Leader: 2    Replicas: 1,2,0    Isr: 2
    Topic: __consumer_offsets    Partition: 2    Leader: 2    Replicas: 2,0,1    Isr: 2,1,0
    Topic: __consumer_offsets    Partition: 3    Leader: 2    Replicas: 0,2,1    Isr: 2
    Topic: __consumer_offsets    Partition: 4    Leader: 2    Replicas: 1,0,2    Isr: 2
    Topic: __consumer_offsets    Partition: 5    Leader: 2    Replicas: 2,1,0    Isr: 2
    Topic: __consumer_offsets    Partition: 6    Leader: 2    Replicas: 0,1,2    Isr: 2
    Topic: __consumer_offsets    Partition: 7    Leader: 2    Replicas: 1,2,0    Isr: 2
    Topic: __consumer_offsets    Partition: 8    Leader: 2    Replicas: 2,0,1    Isr: 2
    Topic: __consumer_offsets    Partition: 9    Leader: 2    Replicas: 0,2,1    Isr: 2
    Topic: __consumer_offsets    Partition: 10    Leader: 2    Replicas: 1,0,2    Isr: 2
    Topic: __consumer_offsets    Partition: 11    Leader: 2    Replicas: 2,1,0    Isr: 2
    Topic: __consumer_offsets    Partition: 12    Leader: 2    Replicas: 0,1,2    Isr: 2
    Topic: __consumer_offsets    Partition: 13    Leader: 2    Replicas: 1,2,0    Isr: 2
    Topic: __consumer_offsets    Partition: 14    Leader: 2    Replicas: 2,0,1    Isr: 2
    Topic: __consumer_offsets    Partition: 15    Leader: 2    Replicas: 0,2,1    Isr: 2
    Topic: __consumer_offsets    Partition: 16    Leader: 2    Replicas: 1,0,2    Isr: 2
    Topic: __consumer_offsets    Partition: 17    Leader: 2    Replicas: 2,1,0    Isr: 2
    Topic: __consumer_offsets    Partition: 18    Leader: 2    Replicas: 0,1,2    Isr: 2
    Topic: __consumer_offsets    Partition: 19    Leader: 2    Replicas: 1,2,0    Isr: 2
    Topic: __consumer_offsets    Partition: 20    Leader: 2    Replicas: 2,0,1    Isr: 2
    Topic: __consumer_offsets    Partition: 21    Leader: 2    Replicas: 0,2,1    Isr: 2
    Topic: __consumer_offsets    Partition: 22    Leader: 2    Replicas: 1,0,2    Isr: 2
    Topic: __consumer_offsets    Partition: 23    Leader: 2    Replicas: 2,1,0    Isr: 2
    Topic: __consumer_offsets    Partition: 24    Leader: 2    Replicas: 0,1,2    Isr: 2
    Topic: __consumer_offsets    Partition: 25    Leader: 2    Replicas: 1,2,0    Isr: 2
    Topic: __consumer_offsets    Partition: 26    Leader: 2    Replicas: 2,0,1    Isr: 2
    Topic: __consumer_offsets    Partition: 27    Leader: 2    Replicas: 0,2,1    Isr: 2
    Topic: __consumer_offsets    Partition: 28    Leader: 2    Replicas: 1,0,2    Isr: 2
    Topic: __consumer_offsets    Partition: 29    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: __consumer_offsets    Partition: 30    Leader: 2    Replicas: 0,1,2    Isr: 2
    Topic: __consumer_offsets    Partition: 31    Leader: 2    Replicas: 1,2,0    Isr: 2
    Topic: __consumer_offsets    Partition: 32    Leader: 2    Replicas: 2,0,1    Isr: 2
    Topic: __consumer_offsets    Partition: 33    Leader: 2    Replicas: 0,2,1    Isr: 2
    Topic: __consumer_offsets    Partition: 34    Leader: 2    Replicas: 1,0,2    Isr: 2
    Topic: __consumer_offsets    Partition: 35    Leader: 2    Replicas: 2,1,0    Isr: 2
    Topic: __consumer_offsets    Partition: 36    Leader: 2    Replicas: 0,1,2    Isr: 2
    Topic: __consumer_offsets    Partition: 37    Leader: 2    Replicas: 1,2,0    Isr: 2
    Topic: __consumer_offsets    Partition: 38    Leader: 2    Replicas: 2,0,1    Isr: 2
    Topic: __consumer_offsets    Partition: 39    Leader: 2    Replicas: 0,2,1    Isr: 2
    Topic: __consumer_offsets    Partition: 40    Leader: 2    Replicas: 1,0,2    Isr: 2
    Topic: __consumer_offsets    Partition: 41    Leader: 2    Replicas: 2,1,0    Isr: 2,1,0
    Topic: __consumer_offsets    Partition: 42    Leader: 2    Replicas: 0,1,2    Isr: 2
    Topic: __consumer_offsets    Partition: 43    Leader: 2    Replicas: 1,2,0    Isr: 2
    Topic: __consumer_offsets    Partition: 44    Leader: 2    Replicas: 2,0,1    Isr: 2
    Topic: __consumer_offsets    Partition: 45    Leader: 2    Replicas: 0,2,1    Isr: 2
    Topic: __consumer_offsets    Partition: 46    Leader: 2    Replicas: 1,0,2    Isr: 2
    Top

[jira] [Comment Edited] (KAFKA-13077) Replication failing after unclean shutdown of ZK and all brokers

2021-12-09 Thread Shivakumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456515#comment-17456515
 ] 

Shivakumar edited comment on KAFKA-13077 at 12/9/21, 3:44 PM:
--


[GitHub] [kafka] rondagostino commented on a change in pull request #11503: KAFKA-13456: Tighten KRaft config checks/constraints

2021-12-09 Thread GitBox


rondagostino commented on a change in pull request #11503:
URL: https://github.com/apache/kafka/pull/11503#discussion_r765940228



##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -1959,10 +1969,28 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
 }
   }
 
-  def listenerSecurityProtocolMap: Map[ListenerName, SecurityProtocol] = {
-getMap(KafkaConfig.ListenerSecurityProtocolMapProp, 
getString(KafkaConfig.ListenerSecurityProtocolMapProp))
+  def effectiveListenerSecurityProtocolMap: Map[ListenerName, 
SecurityProtocol] = {

Review comment:
   > validations in DynamicListenerConfig make sense in the context of 
these changes?
   
   We do have this code, which means we currently don't have to worry about 
this for KRaft:
   
   
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala#L909-L911
   
   However, we will support this eventually, so let's put that aside for the 
moment.
   
   We do have these checks, which continue to make sense:
   
   ```
 if (immutableListenerConfigs(newConfig, listenerName.configPrefix) != 
immutableListenerConfigs(oldConfig, listenerName.configPrefix))
   throw new ConfigException(s"Configs cannot be updated dynamically 
for existing listener $listenerName, " +
 "restart broker or create a new listener for update")
 if (oldConfig.effectiveListenerSecurityProtocolMap(listenerName) != 
newConfig.effectiveListenerSecurityProtocolMap(listenerName))
   throw new ConfigException(s"Security protocol cannot be updated for 
existing listener $listenerName")
   ```
   
   One thing that could occur is that we add a new SSL listener that is the 
first one that is non-PLAINTEXT and then all of a sudden we have removed any 
PLAINTEXT default values in the listener security protocol map, which could 
cause the new config to fail.  But that's fine.  And this would never happen in 
a non-toy cluster since those generally have at least one secured/non-PLAINTEXT 
listener anyway.
   
   Another possibility is that we might impact existing controller listeners, 
but that impact would have to be checked for when we implement this dynamic 
reconfiguration path for KRaft.
   
   My take on this is that the existing changes are okay in this context.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #11345: Allow empty last segment to have missing offset index during recovery

2021-12-09 Thread GitBox


jsancio commented on a change in pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#discussion_r765978037



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -443,7 +443,7 @@ private void onBecomeLeader(long currentTimeMs) {
 private void flushLeaderLog(LeaderState state, long currentTimeMs) {
 // We update the end offset before flushing so that parked fetches can 
return sooner.
 updateLeaderEndOffsetAndTimestamp(state, currentTimeMs);
-log.flush();
+log.flush(false);

Review comment:
   Yes. Looking at the code, we don't need to have segments to determine 
the log end offset. When loading the `KafkaMetadataLog`, KRaft calls 
`truncateFullyAndStartAt` using the latest snapshot if the log's end offset 
(`0`) is less than the latest snapshot's end offset. As @hachikuji mentioned, 
we only delete inactive segments if there is a snapshot that covers the largest 
offset in the segment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] blcksrx opened a new pull request #11588: KAFKA-13485: Restart connectors after RetriableException raised from Task::start()

2021-12-09 Thread GitBox


blcksrx opened a new pull request #11588:
URL: https://github.com/apache/kafka/pull/11588


   If a `RetriableException` is raised from `Task::start()`, no attempt is 
made to start that connector again; the restart functionality is currently 
implemented only for exceptions raised from `poll()`/`put()`. Triggering 
restarts upon failures during `start()` as well would be desirable. For 
instance, if a connector establishes a database connection during `start()`, a 
temporary failure such as a network hiccup currently requires a manual restart 
of the affected tasks.
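
A minimal sketch of the retry-on-start behaviour described above, assuming a bounded number of attempts with a fixed backoff. `RetriableStartException`, `StartRetrySketch` and `startWithRetry` are hypothetical names used for illustration only; they are not Connect classes, and the PR itself may implement the restart differently.

```java
// Hypothetical stand-in for Connect's RetriableException; not the real class.
class RetriableStartException extends RuntimeException {
    RetriableStartException(String message) {
        super(message);
    }
}

class StartRetrySketch {
    /**
     * Runs startAction, retrying while it throws RetriableStartException.
     * Returns the number of attempts made; rethrows the last exception once
     * maxAttempts is exhausted.
     */
    static int startWithRetry(Runnable startAction, int maxAttempts, long backoffMs) {
        for (int attempt = 1; ; attempt++) {
            try {
                startAction.run();
                return attempt;
            } catch (RetriableStartException e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException interrupted) {
                    // Preserve the interrupt flag and stop retrying.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting to retry", interrupted);
                }
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulates a start() that fails twice (e.g. database unreachable) and then succeeds.
        int attempts = startWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new RetriableStartException("database not reachable yet");
            }
        }, 5, 10L);
        System.out.println("started after " + attempts + " attempts");
    }
}
```

Non-retriable exceptions propagate immediately in this sketch, which mirrors the distinction the description draws between temporary and permanent failures.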
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13488) Producer fails to recover if topic gets deleted (and gets auto-created)

2021-12-09 Thread Prateek Agarwal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456673#comment-17456673
 ] 

Prateek Agarwal commented on KAFKA-13488:
-

Hi [~hachikuji] [~jolshan] I have incorporated the review suggestions to the 
PR: [https://github.com/apache/kafka/pull/11552]. PTAL.

> Producer fails to recover if topic gets deleted (and gets auto-created)
> ---
>
> Key: KAFKA-13488
> URL: https://issues.apache.org/jira/browse/KAFKA-13488
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.2.2, 2.3.1, 2.4.1, 2.5.1, 2.6.3, 2.7.2, 2.8.1
>Reporter: Prateek Agarwal
>Assignee: Prateek Agarwal
>Priority: Major
> Attachments: KAFKA-13488.patch
>
>
> Producer currently fails to produce messages to a topic if the topic is 
> deleted and gets auto-created OR is created manually during the lifetime of 
> the producer (and certain other conditions are met - leaderEpochs of deleted 
> topic > 0).
>  
> To reproduce, these are the steps which can be carried out:
> 0) A cluster with 2 brokers 0 and 1 with auto.create.topics.enable=true.
> 1) Create a topic T with 2 partitions P0-> (0,1), P1-> (0,1)
> 2) Reassign the partitions such that P0-> (1,0), P1-> (1,0).
> 3) Create a producer P and send a few messages which land on all the TPs of 
> topic T.
> 4) Delete the topic T
> 5) Immediately send a new message from producer P; this message will fail 
> to send and eventually time out.
> A test-case which fails with the above steps is added at the end as well as a 
> patch file.
>  
> This happens after leaderEpoch (KIP-320) was introduced in the 
> MetadataResponse KAFKA-7738. There is a solution attempted to fix this issue 
> in KAFKA-12257, but the solution has a bug due to which the above use-case 
> still fails.
>  
> *Issue in the solution of KAFKA-12257:*
> {code:java}
> // org.apache.kafka.clients.Metadata.handleMetadataResponse():
>...
>         Map topicIds = new HashMap<>();
>         Map oldTopicIds = cache.topicIds();
>         for (MetadataResponse.TopicMetadata metadata : 
> metadataResponse.topicMetadata()) {
>             String topicName = metadata.topic();
>             Uuid topicId = metadata.topicId();
>             topics.add(topicName);
>             // We can only reason about topic ID changes when both IDs are 
> valid, so keep oldId null unless the new metadata contains a topic ID
>             Uuid oldTopicId = null;
>             if (!Uuid.ZERO_UUID.equals(topicId)) {
>                 topicIds.put(topicName, topicId);
>                 oldTopicId = oldTopicIds.get(topicName);
>             } else {
>                  topicId = null;
>             }
> ...
> } {code}
> With every new call to {{handleMetadataResponse()}}, the topic-ID map behind 
> {{cache.topicIds()}} is rebuilt from scratch. When a topic is deleted and 
> recreated soon afterwards (because auto-creation is enabled), the producer's 
> {{MetadataRequest}} for the deleted topic T results in a {{MetadataResponse}} 
> carrying an {{UNKNOWN_TOPIC_OR_PARTITION}} or {{LEADER_NOT_AVAILABLE}} error, 
> depending on the point during topic recreation at which the metadata is 
> requested. In the error case, the topicId returned in the response is 
> {{Uuid.ZERO_UUID}}. As the logic above shows, if the received topicId is 
> ZERO, the method drops the earlier topicId entry from the cache.
> Now, when a non-Error Metadata Response does come back for the newly created 
> topic T, it will have a non-ZERO topicId now but the leaderEpoch for the 
> partitions will mostly be ZERO. This situation will lead to rejection of the 
> new MetadataResponse if the older LeaderEpoch was >0 (for more details, refer 
> to KAFKA-12257). Because of the rejection of the metadata, producer will 
> never get to know the new Leader of the TPs of the newly created topic.
>  
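
The failure mode described above can be sketched in isolation. This is a simplified model, not the client's `Metadata` class: topic IDs are plain strings, `ZERO_ID` stands in for `Uuid.ZERO_UUID`, and `rebuild`, `detectsRecreation` and the `retainOnError` flag are hypothetical names. The real fix also checks the response error and the retain time, which this sketch leaves out.

```java
import java.util.HashMap;
import java.util.Map;

class TopicIdCacheSketch {
    // Stand-in for Uuid.ZERO_UUID (assumption: IDs modelled as strings).
    static final String ZERO_ID = "ZERO";

    /**
     * Rebuilds the topic-ID map from one metadata response. Without
     * retainOnError, an entry survives only if the response carries a
     * non-zero ID. With it, a zero ID keeps the previously cached ID.
     */
    static Map<String, String> rebuild(Map<String, String> old,
                                       Map<String, String> response,
                                       boolean retainOnError) {
        Map<String, String> next = new HashMap<>();
        for (Map.Entry<String, String> entry : response.entrySet()) {
            String topic = entry.getKey();
            String id = entry.getValue();
            if (!ZERO_ID.equals(id)) {
                next.put(topic, id);
            } else if (retainOnError && old.containsKey(topic)) {
                next.put(topic, old.get(topic));
            }
        }
        return next;
    }

    /** A recreation is only detectable if an old ID is still cached and differs. */
    static boolean detectsRecreation(Map<String, String> cache, String topic, String newId) {
        String oldId = cache.get(topic);
        return oldId != null && !oldId.equals(newId);
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        cache.put("T", "id-1");

        // Error response received while T is being recreated: the ID is zero.
        Map<String, String> errorResponse = new HashMap<>();
        errorResponse.put("T", ZERO_ID);

        Map<String, String> withoutFix = rebuild(cache, errorResponse, false);
        Map<String, String> withFix = rebuild(cache, errorResponse, true);

        System.out.println("without fix: " + detectsRecreation(withoutFix, "T", "id-2"));
        System.out.println("with fix:    " + detectsRecreation(withFix, "T", "id-2"));
    }
}
```

Once the old ID is lost, the client has nothing to compare the new ID against, so it falls back to the leader-epoch comparison and rejects the new metadata.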
> *1. Solution / Fix (Preferred):*
> Client's metadata should keep remembering the old topicId as long as:
> 1) response for the TP has ERRORs
> 2) topicId entry was already present in the cache earlier
> 3) retain time is not expired
> {code:java}
> --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
> +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
> @@ -336,6 +336,10 @@ public class Metadata implements Closeable {
>  topicIds.put(topicName, topicId);
>  oldTopicId = oldTopicIds.get(topicName);
>  } else {
> +// Retain the old topicId for comparison with newer TopicId 
> created later. This is only needed till retainMs
> +if (metadata.error() != Errors.NONE && 
> oldTopicIds.get(topicName) != null && retainTopic(topicName, false, nowMs))
> +topicIds.put(topi

[GitHub] [kafka] vvcephei commented on pull request #11585: MINOR: Cleanup for #11513

2021-12-09 Thread GitBox


vvcephei commented on pull request #11585:
URL: https://github.com/apache/kafka/pull/11585#issuecomment-990152230


   Failures were unrelated:
   ```
   Build / JDK 11 and Scala 2.13 / 
kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, 
Name=testElectionResultOutput, Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testPathToJsonFile, 
Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testPathToJsonFile, 
Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldRemoveNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #11585: MINOR: Cleanup for #11513

2021-12-09 Thread GitBox


vvcephei merged pull request #11585:
URL: https://github.com/apache/kafka/pull/11585


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-12-09 Thread GitBox


cadonna commented on a change in pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#discussion_r765909088



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientTagAwareStandbyTaskAssignor.java
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import 
org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+
+import static 
org.apache.kafka.streams.processor.internals.assignment.StandbyTaskAssignmentUtils.computeTasksToRemainingStandbys;
+
+/**
+ * Distributes standby tasks over different tag dimensions.
+ * Only tags specified via {@link AssignmentConfigs#rackAwareAssignmentTags} 
are taken into account.
+ * Standby task distribution is on a best-effort basis. For example, if there 
are not enough clients available
+ * on different tag dimensions compared to an active and corresponding standby 
task,
+ * in that case, the algorithm will fall back to distributing tasks on 
least-loaded clients.
+ */
+class ClientTagAwareStandbyTaskAssignor implements StandbyTaskAssignor {
+private static final Logger log = 
LoggerFactory.getLogger(ClientTagAwareStandbyTaskAssignor.class);
+
+@Override
+public boolean assign(final Map clients,
+  final Set allTaskIds,
+  final Set statefulTaskIds,
+  final AssignorConfiguration.AssignmentConfigs 
configs) {
+final int numStandbyReplicas = configs.numStandbyReplicas;
+final Set rackAwareAssignmentTags = new 
HashSet<>(configs.rackAwareAssignmentTags);
+
+final Map tasksToRemainingStandbys = 
computeTasksToRemainingStandbys(
+numStandbyReplicas,
+allTaskIds
+);
+
+final Map> tagKeyToValues = new HashMap<>();
+final Map> tagEntryToClients = new HashMap<>();
+
+fillClientsTagStatistics(clients, tagEntryToClients, tagKeyToValues);
+
+for (final TaskId statefulTaskId : statefulTaskIds) {
+for (final Map.Entry entry : 
clients.entrySet()) {
+final UUID clientId = entry.getKey();
+final ClientState clientState = entry.getValue();
+
+if (clientState.activeTasks().contains(statefulTaskId)) {
+assignStandbyTasksForActiveTask(
+numStandbyReplicas,
+statefulTaskId,
+clientId,
+rackAwareAssignmentTags,
+clients,
+tasksToRemainingStandbys,
+tagKeyToValues,
+tagEntryToClients
+);
+}
+}
+}
+
+// returning false, because standby task assignment will never require 
a follow-up probing rebalance.
+return false;
+}
+
+@Override
+public boolean isAllowedTaskMovement(final ClientState source, final 
ClientState destination) {
+final Map sourceClientTags = source.clientTags();
+final Map destinationClientTags = 
destination.clientTags();
+
+for (final Entry sourceClientTagEntry : 
sourceClientTags.entrySet()) {
+if (!sourceClientTagEntry.getValue().equals(destinationClientTags.get(sourceClientTagEntry.getKey()))) {
+return false;
+}
+}
+
+return true;
+}
+
+private static void fillClientsTagStatistics(final Map 
clientStates,
+ final Map> tagEntryToClients,
+ final Map> tagKeyToValues) {
+for (final Entry clientStateEntry : 
clientStates.entrySet()) {
+final UUID clientId = clientStateEntry.getKey();
+final ClientState 

[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

2021-12-09 Thread GitBox


vvcephei commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766152319



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -62,4 +71,32 @@ public static void updatePosition(
 position.withComponent(meta.topic(), meta.partition(), 
meta.offset());
 }
 }
+
+public static boolean isPermitted(
+final Position position,
+final PositionBound positionBound,
+final int partition
+) {
+if (positionBound.isUnbounded()) {

Review comment:
   Hmm, I think I finally see what you are talking about. Let me give it a 
shot...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

2021-12-09 Thread GitBox


guozhangwang commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766163659



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -62,4 +71,32 @@ public static void updatePosition(
 position.withComponent(meta.topic(), meta.partition(), 
meta.offset());
 }
 }
+
+public static boolean isPermitted(
+final Position position,
+final PositionBound positionBound,
+final int partition
+) {
+if (positionBound.isUnbounded()) {
+return true;
+} else {
+final Position bound = positionBound.position();
+for (final String topic : bound.getTopics()) {
+final Map partitionBounds = 
bound.getBound(topic);
+final Map seenPartitionBounds = 
position.getBound(topic);

Review comment:
   qq: why do we name this function of position `getBound` when it seems to 
just retrieve the current positions of the topic, not really a bound?
   
   Similarly, for this local variable, why name it `seenPartitionBounds` if 
the returned value's semantics are not really "bounds"? Should it just be 
`currentOffsets` or something?
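
For readers following the naming question, here is a rough sketch of the kind of check `isPermitted` appears to perform, written against plain maps rather than the real `Position` and `PositionBound` classes. The diff above is cut off, so the loop body is an assumption, and `PositionBoundSketch` is a hypothetical name. It uses `currentOffsets` for the observed side to show how the suggested naming reads.

```java
import java.util.Map;

class PositionBoundSketch {
    /**
     * Returns true if, for the given partition, every topic offset required by
     * the bound has already been reached in currentOffsets.
     * Both maps are keyed topic -> (partition -> offset).
     */
    static boolean isPermitted(Map<String, Map<Integer, Long>> currentOffsets,
                               Map<String, Map<Integer, Long>> bound,
                               int partition) {
        for (Map.Entry<String, Map<Integer, Long>> topicBound : bound.entrySet()) {
            Long required = topicBound.getValue().get(partition);
            if (required == null) {
                continue; // the bound says nothing about this partition
            }
            Map<Integer, Long> seen = currentOffsets.get(topicBound.getKey());
            Long current = seen == null ? null : seen.get(partition);
            if (current == null || current < required) {
                return false; // the store has not caught up to the bound yet
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Map<Integer, Long>> bound = Map.of("input", Map.of(0, 10L));
        Map<String, Map<Integer, Long>> behind = Map.of("input", Map.of(0, 7L));
        Map<String, Map<Integer, Long>> caughtUp = Map.of("input", Map.of(0, 12L));

        System.out.println(isPermitted(behind, bound, 0));
        System.out.println(isPermitted(caughtUp, bound, 0));
    }
}
```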

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##
@@ -107,10 +107,10 @@ private void putAndMaybeForward(final 
ThreadCache.DirtyEntry entry,
 // we can skip flushing to downstream as well as writing to 
underlying store
 if (rawNewValue != null || rawOldValue != null) {
 // we need to get the old values if needed, and then put to 
store, and then flush
-wrapped().put(entry.key(), entry.newValue());
-
 final ProcessorRecordContext current = context.recordContext();
 context.setRecordContext(entry.entry().context());
+wrapped().put(entry.key(), entry.newValue());

Review comment:
   Why reorder the steps here?

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1764,20 +1764,18 @@ protected void processStreamThread(final 
Consumer consumer) {
 );
 }
 final StateQueryResult result = new StateQueryResult<>();
+final Set handledPartitions = new HashSet<>();

Review comment:
   Why move it out of the else block's scope?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -125,22 +128,81 @@
 final long deadline = start + DEFAULT_TIMEOUT;
 
 do {
+if (Thread.currentThread().isInterrupted()) {
+fail("Test was interrupted.");
+}
 final StateQueryResult result = kafkaStreams.query(request);
-if (result.getPartitionResults().keySet().containsAll(partitions)
-|| result.getGlobalResult() != null) {
+if (result.getPartitionResults().keySet().containsAll(partitions)) 
{
 return result;
 } else {
-try {
-Thread.sleep(100L);
-} catch (final InterruptedException e) {
-throw new RuntimeException(e);
-}
+sleep(100L);
 }
 } while (System.currentTimeMillis() < deadline);
 
 throw new TimeoutException("The query never returned the desired 
partitions");
 }
 
+/**
+ * Repeatedly runs the query until the response is valid and then return 
the response.
+ * 
+ * Validity in this case means that the response position is up to the 
specified bound.
+ * 
+ * Once position bounding is generally supported, we should migrate tests 
to wait on the
+ * expected response position.
+ */
+public static  StateQueryResult iqv2WaitForResult(
+final KafkaStreams kafkaStreams,
+final StateQueryRequest request) {
+
+final long start = System.currentTimeMillis();
+final long deadline = start + DEFAULT_TIMEOUT;
+
+StateQueryResult result;
+do {
+if (Thread.currentThread().isInterrupted()) {
+fail("Test was interrupted.");
+}
+
+result = kafkaStreams.query(request);
+final LinkedList> allResults = 
getAllResults(result);
+
+if (allResults.isEmpty()) {

Review comment:
   Just curious: when would the result actually be empty? I thought that even 
if the tasks were not initialized, we would still return the `NOT_PRESENT` 
error instead of returning empty.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
##
@@ -97,11 +97,15 @@ public void addResult(final int partition, final 
QueryResult r) {
  * prior observations.
  */
 public Position getPosition() {
-Posi

[jira] [Resolved] (KAFKA-13512) topicIdsToNames and topicNamesToIds allocate unnecessary maps

2021-12-09 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-13512.
--
Resolution: Fixed

> topicIdsToNames and topicNamesToIds allocate unnecessary maps
> -
>
> Key: KAFKA-13512
> URL: https://issues.apache.org/jira/browse/KAFKA-13512
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.1.0
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
> Fix For: 3.1.0
>
>
> Currently we write the methods as follows:
> {{def topicNamesToIds(): util.Map[String, Uuid] = {}}
> {{    new util.HashMap(metadataSnapshot.topicIds.asJava)}}
> {{}}}
> We do not need to allocate a new map however, we can simply use
> {{Collections.unmodifiableMap(metadataSnapshot.topicIds.asJava)}}
> We can do something similar for the topicIdsToNames implementation.
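
A small Java illustration of the difference the description relies on: `new HashMap<>(src)` copies every entry, while `Collections.unmodifiableMap(src)` only wraps the existing map in a read-only view. The class and method names here are hypothetical, and the sketch assumes the wrapped snapshot map is never mutated after it is published, which is what makes handing out the view safe.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class UnmodifiableViewSketch {
    /** Allocates a new map and copies all entries: O(n) time and memory per call. */
    static Map<String, String> copy(Map<String, String> src) {
        return new HashMap<>(src);
    }

    /** Wraps the existing map in a read-only view: no entries are copied. */
    static Map<String, String> view(Map<String, String> src) {
        return Collections.unmodifiableMap(src);
    }

    public static void main(String[] args) {
        Map<String, String> topicIds = new HashMap<>();
        topicIds.put("topic-a", "id-a");

        Map<String, String> copied = copy(topicIds);
        Map<String, String> wrapped = view(topicIds);

        // Mutating the backing map here only to show that the view shares storage.
        topicIds.put("topic-b", "id-b");
        System.out.println("copy size: " + copied.size());
        System.out.println("view size: " + wrapped.size());

        try {
            wrapped.put("topic-c", "id-c");
        } catch (UnsupportedOperationException e) {
            System.out.println("view rejects writes");
        }
    }
}
```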



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13500) Consider adding a dedicated standby consumer

2021-12-09 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456736#comment-17456736
 ] 

Guozhang Wang commented on KAFKA-13500:
---

[~mjsax] What I was wondering is whether, if we moved restoration / standby 
maintenance to a separate thread, we would still need to split restoration and 
standbys across separate consumers. My thinking is: 1) the primary issue was 
that the same thread was doing standby maintenance and regular processing; if 
that's no longer the case, then keeping state recovery and standby maintenance 
on the same configs does not seem to cause big issues; 2) on separate threads, 
the monitoring would be clearer as well.

> Consider adding a dedicated standby consumer
> 
>
> Key: KAFKA-13500
> URL: https://issues.apache.org/jira/browse/KAFKA-13500
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>
> We currently use the restore consumer to recover state for active tasks and 
> to maintain standby tasks during regular processing. This setup has a few 
> disadvantages:
>  # During state recovery, we might want to apply different consumer configs 
> compared to standby maintenance during regular processing.
>  # It makes monitoring confusing: because we never commit offsets for 
> changelog topics, users can only monitor the client's "lag metric" to 
> observe restore progress (without the need to register a restore listener). 
> However, if they are interested in a restore metric, during regular 
> processing it would report the standby lag, which can be rather confusing.
> Because the restore consumer does not use consumer group management, it seems 
> to be low overhead to actually use a third consumer, because there won't be 
> any heartbeat thread.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] guozhangwang commented on a change in pull request #11424: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2021-12-09 Thread GitBox


guozhangwang commented on a change in pull request #11424:
URL: https://github.com/apache/kafka/pull/11424#discussion_r766183746



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##
@@ -128,6 +131,22 @@ public static Sensor activeBufferedRecordsSensor(final 
String threadId,
 return sensor;
 }
 
+public static Sensor totalBytesSensor(final String threadId,

Review comment:
   @ableegoldman ping :)
   
   @vamossagar12 I'm just back from a long vacation and am reloading all my 
review memory here :) Is this ready for another review now, apart from this 
naming comment? If yes, could you rebase and ping me?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 opened a new pull request #11589: MINOR: update log and method name

2021-12-09 Thread GitBox


wcarlson5 opened a new pull request #11589:
URL: https://github.com/apache/kafka/pull/11589


   update an unclear log message and method name
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies have a blocking option

2021-12-09 Thread GitBox


wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r766186102



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##
@@ -108,6 +124,44 @@ private void unlock() {
 version.topologyLock.unlock();
 }
 
+public Collection sourceTopicsForTopology(final String name) {
+return builders.get(name).sourceTopicCollection();
+}
+
+public boolean needsUpdate(final String threadName) {
+return threadVersions.get(threadName) < topologyVersion();
+}
+
+public void registerThread(final String threadName) {
+threadVersions.put(threadName, 0L);
+}
+
+public void unregisterThread(final String threadName) {
+threadVersions.remove(threadName);
+}
+
+public void maybeNotifyTopologyVersionWaiters(final String threadName) {
+try {
+lock();
+final Iterator iterator = 
version.activeTopologyWaiters.listIterator();
+TopologyVersionWaiters topologyVersionWaiters;
+threadVersions.put(threadName, topologyVersion());
+while (iterator.hasNext()) {
+topologyVersionWaiters = iterator.next();
+final long topologyVersionWaitersVersion = 
topologyVersionWaiters.topologyVersion;
+if (topologyVersionWaitersVersion <= 
threadVersions.get(threadName)) {
+if (threadVersions.values().stream().allMatch(t -> t >= 
topologyVersionWaitersVersion)) {
+topologyVersionWaiters.future.complete(null);
+iterator.remove();
+log.info("Thread {} is now on topology version {}", 
threadName, topologyVersionWaiters.topologyVersion);

Review comment:
   Those are good points. I updated the log and method name in a follow-up. 
I thought about extracting the version update to the caller, but I would rather 
have them coupled so I don't have to worry about them getting out of sync.
   
   https://github.com/apache/kafka/pull/11589
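
To illustrate why keeping the version update and the waiter notification in one method avoids the out-of-sync concern, here is a self-contained sketch of the pattern. It is not the `TopologyMetadata` implementation: `VersionWaitersSketch`, `bumpVersion` and `threadCaughtUp` are hypothetical names, and a single intrinsic lock stands in for the topology lock.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class VersionWaitersSketch {
    /** A caller waiting for every registered thread to reach a given version. */
    static final class Waiter {
        final long version;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        Waiter(long version) {
            this.version = version;
        }
    }

    private final Map<String, Long> threadVersions = new HashMap<>();
    private final List<Waiter> waiters = new LinkedList<>();
    private long topologyVersion = 0L;

    synchronized void registerThread(String threadName) {
        threadVersions.put(threadName, 0L);
    }

    /** Bumps the version and returns a future that completes once all threads reach it. */
    synchronized CompletableFuture<Void> bumpVersion() {
        topologyVersion++;
        Waiter waiter = new Waiter(topologyVersion);
        waiters.add(waiter);
        return waiter.future;
    }

    /**
     * Records that the thread is on the latest version and completes any waiter
     * whose version every thread has now reached. Both steps run under the same
     * lock, so the recorded versions and the notifications cannot drift apart.
     */
    synchronized void threadCaughtUp(String threadName) {
        threadVersions.put(threadName, topologyVersion);
        Iterator<Waiter> iterator = waiters.iterator();
        while (iterator.hasNext()) {
            Waiter waiter = iterator.next();
            if (threadVersions.values().stream().allMatch(v -> v >= waiter.version)) {
                waiter.future.complete(null);
                iterator.remove();
            }
        }
    }

    public static void main(String[] args) {
        VersionWaitersSketch sketch = new VersionWaitersSketch();
        sketch.registerThread("thread-1");
        sketch.registerThread("thread-2");

        CompletableFuture<Void> updated = sketch.bumpVersion();
        sketch.threadCaughtUp("thread-1");
        System.out.println("after thread-1: done=" + updated.isDone());
        sketch.threadCaughtUp("thread-2");
        System.out.println("after thread-2: done=" + updated.isDone());
    }
}
```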




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on pull request #11589: MINOR: update log and method name

2021-12-09 Thread GitBox


wcarlson5 commented on pull request #11589:
URL: https://github.com/apache/kafka/pull/11589#issuecomment-990318661


   @guozhangwang Some changes you requested in a follow up PR






[GitHub] [kafka] guozhangwang commented on pull request #11242: [WIP] MINOR: POC for KIP-591: Add config to set default store impl class

2021-12-09 Thread GitBox


guozhangwang commented on pull request #11242:
URL: https://github.com/apache/kafka/pull/11242#issuecomment-990344744


   > Actually, the reason why we don't choose `enum` is because that only works 
for our built-in state stores, not for **custom state stores**. So, with the 
`StoreImplementation` interface, users can implement their own state stores 
and set them as the default stores if they want.
   
   I understand that, but I still have some reservations about this 
approach. Let me share them on the DISCUSS thread so that we can engage a 
larger group.






[jira] [Created] (KAFKA-13528) KRaft RegisterBroker should validate that the cluster ID matches

2021-12-09 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-13528:


 Summary: KRaft RegisterBroker should validate that the cluster ID 
matches
 Key: KAFKA-13528
 URL: https://issues.apache.org/jira/browse/KAFKA-13528
 Project: Kafka
  Issue Type: Bug
Reporter: Colin McCabe






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

2021-12-09 Thread GitBox


vvcephei commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766220034



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -62,4 +71,32 @@ public static void updatePosition(
 position.withComponent(meta.topic(), meta.partition(), 
meta.offset());
 }
 }
+
+public static boolean isPermitted(
+final Position position,
+final PositionBound positionBound,
+final int partition
+) {
+if (positionBound.isUnbounded()) {

Review comment:
   Ok, so the situation is a little different now than when I originally 
added a separate PositionBound class. At that point, we also needed a bound 
representing "latest" (i.e., that the query is executed on an active running 
task), for which there's no sensible Position representation.
   
   My mind was still half in that world last time we spoke about this point. 
Now, I can see that the only special position bound is the "unbounded" one, 
which is the same thing semantically as an empty position.
   
   I just tried out a change to completely get rid of the PositionBound class, 
but I think it makes the API more confusing. As a user, it seems clearer to 
create an "unbounded" PositionBound than an "empty" Position. I think it makes 
sense if you think about it in terms of comparing vectors (an empty vector is 
"less than" all other vectors, so when it's used as a lower bound, it permits 
everything). But I don't want people to have to think that hard about it.
   
   Another option I considered is to add a `Position.unbounded()` factory, but 
that doesn't completely make sense either, since a Position is just a point in 
vector space. It doesn't bound anything by itself, though it can be used as a 
bound.
   
   Plus, I think query handling implementation, both for Streams and for custom 
user stores, is easier to keep track of if you have two types. You simply can't 
mix up which Position was supposed to be the bound.
   
   On balance, it still seems better to keep the separate PositionBound class. 
However, I did realize that the logic of PositionBound and the internal logic 
in Streams can be simplified with this observation that an unbounded position 
is equivalent to an empty position. I just added that commit to this PR.
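
   The "empty position as a lower bound permits everything" observation can be illustrated with a small sketch. It models a position as a plain nested map (topic to partition to offset), which is an assumption made for this example; `PositionBoundSketch` and its `isPermitted` signature are hypothetical and do not match the method in `StoreQueryUtils`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplification: a position is modeled as topic -> (partition -> offset).
class PositionBoundSketch {

    // Returns true if the observed position has reached every component of the bound.
    static boolean isPermitted(
        final Map<String, Map<Integer, Long>> position,
        final Map<String, Map<Integer, Long>> bound
    ) {
        for (final Map.Entry<String, Map<Integer, Long>> topicBound : bound.entrySet()) {
            final Map<Integer, Long> seen =
                position.getOrDefault(topicBound.getKey(), Collections.emptyMap());
            for (final Map.Entry<Integer, Long> partitionBound : topicBound.getValue().entrySet()) {
                final Long seenOffset = seen.get(partitionBound.getKey());
                if (seenOffset == null || seenOffset < partitionBound.getValue()) {
                    return false;
                }
            }
        }
        // An empty bound has no components to violate, so it permits everything.
        return true;
    }
}
```

   With this comparison, an empty map behaves exactly like an "unbounded" bound, which is why the two can share one code path internally while the API keeps separate types.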
   
   Thanks!








[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

2021-12-09 Thread GitBox


vvcephei commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766221104



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1764,20 +1764,18 @@ protected void processStreamThread(final 
Consumer consumer) {
 );
 }
 final StateQueryResult result = new StateQueryResult<>();
+final Set handledPartitions = new HashSet<>();

Review comment:
   Oh, sorry about that. That was an artifact from a recent refactor. But 
now that I'm looking at it, I realize we don't need a separate set for tracking 
this, since we can use the result's partition set itself.








[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

2021-12-09 Thread GitBox


vvcephei commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766221800



##
File path: 
streams/src/main/java/org/apache/kafka/streams/query/StateQueryResult.java
##
@@ -97,11 +97,15 @@ public void addResult(final int partition, final 
QueryResult r) {
  * prior observations.
  */
 public Position getPosition() {
-Position position = Position.emptyPosition();
-for (final QueryResult r : partitionResults.values()) {
-position = position.merge(r.getPosition());
+if (globalResult != null) {

Review comment:
   That's correct, but since the old version of this method was incorrect, 
I figured I'd go ahead and fix it.








[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

2021-12-09 Thread GitBox


vvcephei commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766223374



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
##
@@ -107,10 +107,10 @@ private void putAndMaybeForward(final 
ThreadCache.DirtyEntry entry,
 // we can skip flushing to downstream as well as writing to 
underlying store
 if (rawNewValue != null || rawOldValue != null) {
 // we need to get the old values if needed, and then put to 
store, and then flush
-wrapped().put(entry.key(), entry.newValue());
-
 final ProcessorRecordContext current = context.recordContext();
 context.setRecordContext(entry.entry().context());
+wrapped().put(entry.key(), entry.newValue());

Review comment:
   This is actually important. Previously, we did not set the record 
context before passing the cache-evicted record down to the lower store layers, 
so any store that relied on the record context (via the old ProcessorContext) 
was getting the wrong metadata.
   
   Apparently, this work is the first time we added a feature in Streams that 
actually relied on that metadata. What is happening now is that we use that 
metadata to set the Position in the lower store, and if it's not set, then we 
get an error.
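
   The ordering issue can be shown with a small stand-alone sketch. All names here (`ContextOrderingSketch`, `Context`, `LowerStore`, `flushEvicted`) are hypothetical stand-ins, and the record context is reduced to a string; the point is only that the context must be switched before the write and restored afterwards.

```java
import java.util.HashMap;
import java.util.Map;

class ContextOrderingSketch {
    // Hypothetical stand-in for a processor context holding "current record" metadata.
    static final class Context {
        String recordContext = "context-of-record-being-processed";
    }

    // Hypothetical lower store that captures the metadata visible at write time.
    static final class LowerStore {
        final Map<String, String> metadataSeenAtPut = new HashMap<>();

        void put(final Context context, final String key) {
            metadataSeenAtPut.put(key, context.recordContext);
        }
    }

    // Writes a cache-evicted entry with its own context in place, then restores
    // whatever context was active before the flush.
    static void flushEvicted(
        final Context context,
        final LowerStore store,
        final String key,
        final String evictedRecordContext
    ) {
        final String current = context.recordContext;
        context.recordContext = evictedRecordContext; // set BEFORE writing to the store
        try {
            store.put(context, key);
        } finally {
            context.recordContext = current;
        }
    }
}
```

   If the `put` ran before the context switch, the lower store would capture the metadata of whichever record happened to be in flight when the eviction was triggered.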








[GitHub] [kafka] vvcephei commented on a change in pull request #11581: KAFKA-13522: add position tracking and bounding to IQv2

2021-12-09 Thread GitBox


vvcephei commented on a change in pull request #11581:
URL: https://github.com/apache/kafka/pull/11581#discussion_r766226822



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
##
@@ -125,22 +128,81 @@
 final long deadline = start + DEFAULT_TIMEOUT;
 
 do {
+if (Thread.currentThread().isInterrupted()) {
+fail("Test was interrupted.");
+}
 final StateQueryResult result = kafkaStreams.query(request);
-if (result.getPartitionResults().keySet().containsAll(partitions)
-|| result.getGlobalResult() != null) {
+if (result.getPartitionResults().keySet().containsAll(partitions)) 
{
 return result;
 } else {
-try {
-Thread.sleep(100L);
-} catch (final InterruptedException e) {
-throw new RuntimeException(e);
-}
+sleep(100L);
 }
 } while (System.currentTimeMillis() < deadline);
 
 throw new TimeoutException("The query never returned the desired 
partitions");
 }
 
+/**
+ * Repeatedly runs the query until the response is valid and then return 
the response.
+ * 
+ * Validity in this case means that the response position is up to the 
specified bound.
+ * 
+ * Once position bounding is generally supported, we should migrate tests 
to wait on the
+ * expected response position.
+ */
+public static  StateQueryResult iqv2WaitForResult(
+final KafkaStreams kafkaStreams,
+final StateQueryRequest request) {
+
+final long start = System.currentTimeMillis();
+final long deadline = start + DEFAULT_TIMEOUT;
+
+StateQueryResult result;
+do {
+if (Thread.currentThread().isInterrupted()) {
+fail("Test was interrupted.");
+}
+
+result = kafkaStreams.query(request);
+final LinkedList> allResults = 
getAllResults(result);
+
+if (allResults.isEmpty()) {

Review comment:
   You'll only get a `NOT_PRESENT` response if you specifically request a 
partition. The default is to just get all locally present partitions. This 
check is actually just an assumption that in the context of an integration 
test, if you call this method, you're probably expecting at least one result.
   
   It is good to note, though, that if a test is looking for results for a 
specific set of partitions, it should include that in the query.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StoreQueryUtils.java
##
@@ -62,4 +71,32 @@ public static void updatePosition(
 position.withComponent(meta.topic(), meta.partition(), 
meta.offset());
 }
 }
+
+public static boolean isPermitted(
+final Position position,
+final PositionBound positionBound,
+final int partition
+) {
+if (positionBound.isUnbounded()) {
+return true;
+} else {
+final Position bound = positionBound.position();
+for (final String topic : bound.getTopics()) {
+final Map partitionBounds = 
bound.getBound(topic);
+final Map seenPartitionBounds = 
position.getBound(topic);

Review comment:
   Woah, you're absolutely right. I'll fix it.








[GitHub] [kafka] ableegoldman opened a new pull request #11590: HOTFIX: fix failing StreamsMetadataStateTest tests

2021-12-09 Thread GitBox


ableegoldman opened a new pull request #11590:
URL: https://github.com/apache/kafka/pull/11590


   Followup to 
[#11562](https://github.com/apache/kafka/commit/e20f102298cc3e056e079cbd1cb33913a9ade0ce#) 
to fix tests






[GitHub] [kafka] ableegoldman merged pull request #11590: HOTFIX: fix failing StreamsMetadataStateTest tests

2021-12-09 Thread GitBox


ableegoldman merged pull request #11590:
URL: https://github.com/apache/kafka/pull/11590


   






[jira] [Commented] (KAFKA-13388) Kafka Producer nodes stuck in CHECKING_API_VERSIONS

2021-12-09 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456854#comment-17456854
 ] 

David Mao commented on KAFKA-13388:
---

[~dhofftgt] 

Looking at where the NetworkClient enters the CHECKING_API_VERSIONS state, we 
see:

 
{code:java}
if (discoverBrokerVersions) {
 this.connectionStates.checkingApiVersions(node); 
 nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
 {code}
which is a separate queue for nodes needing to send the api versions request.

Then in 

 
{code:java}
private void handleInitiateApiVersionRequests(long now) {
    Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> iter =
        nodesNeedingApiVersionsFetch.entrySet().iterator();
    while (iter.hasNext()) {
        Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();
        String node = entry.getKey();
        if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
            log.debug("Initiating API versions fetch from node {}.", node);
            ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
            ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);
            doSend(clientRequest, true, now);
            iter.remove();
        }
    }
}
{code}
we only send out the api versions request if the channel is ready (TLS 
handshake complete, SASL handshake complete).

This is actually a pretty insidious bug because I think we end up in a state 
where we do not apply any request timeout to the channel, since the inflight 
requests are empty.
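
The gap described above can be reproduced in a toy model. This is only a sketch under simplifying assumptions: `RequestTimeoutGapSketch` and its methods are hypothetical and are not part of NetworkClient. It assumes, as the comment suggests, that the request timeout is evaluated only for nodes that have an in-flight request.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, heavily simplified model; none of these names are Kafka's actual classes.
class RequestTimeoutGapSketch {
    // Node id -> send time of the oldest in-flight request.
    private final Map<String, Long> oldestInFlightSendMs = new HashMap<>();
    // Nodes that are in CHECKING_API_VERSIONS but have not sent anything yet.
    private final Set<String> nodesNeedingApiVersionsFetch = new LinkedHashSet<>();

    void enterCheckingApiVersions(final String node) {
        nodesNeedingApiVersionsFetch.add(node);
    }

    // Mirrors the guard in the excerpt: nothing is sent until the channel is ready.
    void maybeSendApiVersions(final String node, final boolean channelReady, final long nowMs) {
        if (channelReady && nodesNeedingApiVersionsFetch.remove(node)) {
            oldestInFlightSendMs.putIfAbsent(node, nowMs);
        }
    }

    // The request timeout only considers nodes that have an in-flight request.
    List<String> nodesWithTimedOutRequests(final long nowMs, final long requestTimeoutMs) {
        final List<String> timedOut = new ArrayList<>();
        for (final Map.Entry<String, Long> entry : oldestInFlightSendMs.entrySet()) {
            if (nowMs - entry.getValue() > requestTimeoutMs) {
                timedOut.add(entry.getKey());
            }
        }
        return timedOut;
    }
}
```

In this model a node whose channel never becomes ready is never reported as timed out, because it never acquires an in-flight request for the timeout check to inspect.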

> Kafka Producer nodes stuck in CHECKING_API_VERSIONS
> ---
>
> Key: KAFKA-13388
> URL: https://issues.apache.org/jira/browse/KAFKA-13388
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: David Hoffman
>Priority: Minor
> Attachments: Screen Shot 2021-10-25 at 10.28.48 AM.png, 
> image-2021-10-21-13-42-06-528.png
>
>
> I have been seeing expired batch errors in my app.
> {code:java}
> org.apache.kafka.common.errors.TimeoutException: Expiring 51 record(s) for 
> xxx-17:120002 ms has passed since batch creation
> {code}
>  I would have assumed a request timeout or connection timeout would also have 
> been logged, but I could not find any other associated errors. 
> I added some instrumentation to my app and have traced this down to broker 
> connections hanging in CHECKING_API_VERSIONS state. -It appears there is no 
> effective timeout for Kafka Producer broker connections in 
> CHECKING_API_VERSIONS state.-
> In the code I see that after the NetworkClient connects to a broker node it 
> makes a request to check api versions; when it receives the response it marks 
> the node as ready. -I am seeing that sometimes a reply is not received for 
> the check api versions request and the connection just hangs in 
> CHECKING_API_VERSIONS state until it is disposed, I assume after the idle 
> connection timeout.-
> Update: not actually sure what causes the connection to get stuck in 
> CHECKING_API_VERSIONS.
> -I am guessing the connection setup timeout should be still in play for this, 
> but it is not.- 
>  -There is a connectingNodes set that is consulted when checking timeouts and 
> the node is removed- 
>  -when ClusterConnectionStates.checkingApiVersions(String id) is called to 
> transition the node into CHECKING_API_VERSIONS-





[jira] [Comment Edited] (KAFKA-13388) Kafka Producer nodes stuck in CHECKING_API_VERSIONS

2021-12-09 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456854#comment-17456854
 ] 

David Mao edited comment on KAFKA-13388 at 12/10/21, 1:56 AM:
--

[~dhofftgt] 

Looking at where the NetworkClient enters the CHECKING_API_VERSIONS state, we 
see:

 
{code:java}
if (discoverBrokerVersions) {
 this.connectionStates.checkingApiVersions(node); 
 nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
 {code}
which is a separate queue for nodes needing to send the api versions request.

Then in 

 
{code:java}
private void handleInitiateApiVersionRequests(long now) {
    Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> iter =
        nodesNeedingApiVersionsFetch.entrySet().iterator();
    while (iter.hasNext()) {
        Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();
        String node = entry.getKey();
        if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
            log.debug("Initiating API versions fetch from node {}.", node);
            ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
            ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);
            doSend(clientRequest, true, now);
            iter.remove();
        }
    }
}
{code}
we only send out the api versions request if the channel is ready (TLS 
handshake complete, SASL handshake complete).

This is actually a pretty insidious bug because I think we end up in a state 
where we do not apply any request timeout to the channel if there is some 
problem completing any of the handshaking/authentication steps, since the 
inflight requests are empty.



[jira] [Comment Edited] (KAFKA-13388) Kafka Producer nodes stuck in CHECKING_API_VERSIONS

2021-12-09 Thread David Mao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456854#comment-17456854
 ] 

David Mao edited comment on KAFKA-13388 at 12/10/21, 2:10 AM:
--

[~dhofftgt] 

Looking at where the NetworkClient enters the CHECKING_API_VERSIONS state, we 
see:

 
{code:java}
if (discoverBrokerVersions) {
 this.connectionStates.checkingApiVersions(node); 
 nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder());
 {code}
which is a separate queue for nodes needing to send the api versions request.

Then in 

 
{code:java}
private void handleInitiateApiVersionRequests(long now) {
    Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> iter =
        nodesNeedingApiVersionsFetch.entrySet().iterator();
    while (iter.hasNext()) {
        Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next();
        String node = entry.getKey();
        if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) {
            log.debug("Initiating API versions fetch from node {}.", node);
            ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue();
            ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true);
            doSend(clientRequest, true, now);
            iter.remove();
        }
    }
}
{code}
we only send out the api versions request if the channel is ready (TLS 
handshake complete, SASL handshake complete).

This is actually a pretty insidious bug because I think we end up in a state 
where we do not apply any request timeout to the channel if there is some delay 
in completing any of the handshaking/authentication steps, since the inflight 
requests are empty.



[jira] [Created] (KAFKA-13529) kafka mm2 consumer configurattion invalid

2021-12-09 Thread chengang (Jira)
chengang created KAFKA-13529:


 Summary: kafka mm2 consumer configurattion invalid
 Key: KAFKA-13529
 URL: https://issues.apache.org/jira/browse/KAFKA-13529
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.0.0, 2.8.0
Reporter: chengang


With auto.offset.reset = latest set in the consumer configuration, MirrorMaker2 
still seeks the source topic to offset 0.





[jira] [Updated] (KAFKA-13529) kafka mm2 consumer configurattion invalid

2021-12-09 Thread chengang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

chengang updated KAFKA-13529:
-
Labels: mirror-maker  (was: )

> kafka mm2 consumer configurattion invalid
> -
>
> Key: KAFKA-13529
> URL: https://issues.apache.org/jira/browse/KAFKA-13529
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0, 3.0.0
>Reporter: chengang
>Priority: Major
>  Labels: mirror-maker
>
> set  auto.offset.reset = latest
> but  MirrorMaker2 seeks source topic to offset 0





[jira] [Commented] (KAFKA-13388) Kafka Producer nodes stuck in CHECKING_API_VERSIONS

2021-12-09 Thread Lucas Bradstreet (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456888#comment-17456888
 ] 

Lucas Bradstreet commented on KAFKA-13388:
--

I'm pretty sure [~david.mao] is correct. If we ever get into 
CHECKING_API_VERSIONS state we won't apply the connection timeouts or request 
timeouts. We really need to be applying a timeout to this state. I have a 
reproducer where we drop API_VERSIONS at random and can see that our normal 
timeouts don't apply. I think this is why the client gets all the way to 
expiring the batches without a corresponding disconnect.
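
One way to picture "applying a timeout to this state" is to record when a node enters CHECKING_API_VERSIONS and disconnect it if it stays there too long. The sketch below is purely illustrative and is not a proposed patch: `CheckingApiVersionsTimeoutSketch`, its methods, and the `stateTimeoutMs` parameter are all hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of a per-state timeout; not Kafka's actual implementation.
class CheckingApiVersionsTimeoutSketch {
    // Node id -> time at which the node entered CHECKING_API_VERSIONS.
    private final Map<String, Long> enteredStateAtMs = new HashMap<>();
    private final long stateTimeoutMs;

    CheckingApiVersionsTimeoutSketch(final long stateTimeoutMs) {
        this.stateTimeoutMs = stateTimeoutMs;
    }

    void enterCheckingApiVersions(final String node, final long nowMs) {
        enteredStateAtMs.put(node, nowMs);
    }

    // Called once the ApiVersions response has been handled and the node is ready.
    void markReady(final String node) {
        enteredStateAtMs.remove(node);
    }

    // Nodes that have been in the state longer than the timeout, regardless of
    // whether any request was ever sent on the channel.
    List<String> nodesToDisconnect(final long nowMs) {
        final List<String> expired = new ArrayList<>();
        for (final Map.Entry<String, Long> entry : enteredStateAtMs.entrySet()) {
            if (nowMs - entry.getValue() > stateTimeoutMs) {
                expired.add(entry.getKey());
            }
        }
        return expired;
    }
}
```

Because the check is keyed on time spent in the state rather than on in-flight requests, it would also cover the case where the ApiVersions request is never sent or its response is dropped.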
