[GitHub] [kafka] showuon commented on pull request #11493: KAFKA-12959: Prioritise assigning standby tasks to threads without any active tasks

2022-02-06 Thread GitBox


showuon commented on pull request #11493:
URL: https://github.com/apache/kafka/pull/11493#issuecomment-1030773151


   @tim-patterson , thanks for the PR. I'll review it next week. Thanks for the improvement.






[GitHub] [kafka] showuon commented on pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2022-02-06 Thread GitBox


showuon commented on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-1030773435


   @lkokhreidze , thanks for the PR. I'll take a look next week. Thanks.






[GitHub] [kafka] showuon commented on pull request #11631: KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode

2022-02-06 Thread GitBox


showuon commented on pull request #11631:
URL: https://github.com/apache/kafka/pull/11631#issuecomment-1030826032


   @guozhangwang , this PR is good for review now. I've fixed broken tests and 
added tests. Thank you.






[GitHub] [kafka] showuon commented on pull request #11681: KAFKA-8785: fix request timeout by waiting for brokers up at the begining

2022-02-06 Thread GitBox


showuon commented on pull request #11681:
URL: https://github.com/apache/kafka/pull/11681#issuecomment-1030829409


   @dajac , it doesn't work to create the admin client after
`cluster.waitForReadyBrokers()`, because `waitForReadyBrokers` only waits for
all brokers to be registered and unfenced (ref:
[here](https://github.com/apache/kafka/blob/trunk/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java#L375)),
but the brokers' metadata caches might not have been updated with the cluster
metadata yet. I think we need to explicitly wait for all brokers to be up via
`describeCluster` and then create the adminClient for testing.
   
   WDYT?
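
   For illustration, here's a rough sketch of the kind of wait loop I have in mind
(the class and method names are made up for this example, not the actual test code):

   ```java
   import java.util.Properties;
   import java.util.concurrent.TimeUnit;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.AdminClientConfig;

   public class WaitForBrokersExample {
       // Poll describeCluster() until the expected number of brokers shows up in the
       // cluster metadata, then hand back an Admin client that is safe to use in the test.
       static Admin waitForBrokersUp(String bootstrapServers, int expectedBrokers, long timeoutMs) throws InterruptedException {
           Properties props = new Properties();
           props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
           Admin admin = Admin.create(props);
           long deadline = System.currentTimeMillis() + timeoutMs;
           while (System.currentTimeMillis() < deadline) {
               try {
                   if (admin.describeCluster().nodes().get(5, TimeUnit.SECONDS).size() >= expectedBrokers)
                       return admin;
               } catch (Exception e) {
                   // metadata not propagated yet, retry below
               }
               Thread.sleep(100);
           }
           admin.close();
           throw new IllegalStateException("Brokers did not become available within " + timeoutMs + " ms");
       }
   }
   ```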
   






[GitHub] [kafka] splett2 commented on a change in pull request #11586: KAFKA-13516: Connection level metrics are not closed

2022-02-06 Thread GitBox


splett2 commented on a change in pull request #11586:
URL: https://github.com/apache/kafka/pull/11586#discussion_r800194330



##
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##
@@ -1144,7 +1149,11 @@ public void close() {
         public SelectorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
             this.metrics = metrics;
             this.metricTags = metricTags;
-            this.metricsPerConnection = metricsPerConnection;
+            if (metricsPerConnection) {
+                this.connectionMetrics = new ConcurrentHashMap<>();

Review comment:
   Agreed. The Selector is advertised not to be thread safe.








[GitHub] [kafka] lbradstreet commented on a change in pull request #11376: KAFKA-13342: LeaderAndIsrRequest should not be sent for topic queued for deletion

2022-02-06 Thread GitBox


lbradstreet commented on a change in pull request #11376:
URL: https://github.com/apache/kafka/pull/11376#discussion_r800202592



##
File path: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
##
@@ -204,13 +204,17 @@ class ZkReplicaStateMachine(config: KafkaConfig,
 
                 controllerContext.updatePartitionFullReplicaAssignment(partition, newAssignment)
               }
             case _ =>
-              controllerContext.partitionLeadershipInfo(partition) match {
-                case Some(leaderIsrAndControllerEpoch) =>
-                  controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
-                    replica.topicPartition,
-                    leaderIsrAndControllerEpoch,
-                    controllerContext.partitionFullReplicaAssignment(partition), isNew = false)
-                case None =>
+              if (!controllerContext.isTopicQueuedUpForDeletion(partition.topic)) {
+                controllerContext.partitionLeadershipInfo(partition) match {
+                  case Some(leaderIsrAndControllerEpoch) =>
+                    controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
+                      replica.topicPartition,
+                      leaderIsrAndControllerEpoch,
+                      controllerContext.partitionFullReplicaAssignment(partition), isNew = false)
+                  case None =>
+                }
+              } else {
+                info(s"OnlineReplica is in deleting state $partition. Not adding it to the LeaderAndIsrRequest")

Review comment:
   Good point.








[GitHub] [kafka] lbradstreet commented on a change in pull request #11376: KAFKA-13342: LeaderAndIsrRequest should not be sent for topic queued for deletion

2022-02-06 Thread GitBox


lbradstreet commented on a change in pull request #11376:
URL: https://github.com/apache/kafka/pull/11376#discussion_r800202601



##
File path: core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
##
@@ -394,6 +394,23 @@ class ReplicaStateMachineTest {
   }
 
   @Test
+  def testInvalidOnlineReplicaTransitionWithQueuedDeletion(): Unit = {
+    controllerContext.putReplicaState(replica, ReplicaDeletionIneligible)
+    controllerContext.updatePartitionFullReplicaAssignment(partition, ReplicaAssignment(Seq(brokerId)))
+    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(LeaderAndIsr(brokerId, List(brokerId)), controllerEpoch)
+    controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
+    EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
+    EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
+    EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
+    controllerContext.queueTopicDeletion(Set(replica.topic))
+    controllerContext.putReplicaState(replica, OnlineReplica)
+    // we don't expect addLeaderAndIsrRequestForBrokers to be called for this partition
+    replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
+    EasyMock.verify(mockZkClient, mockControllerBrokerRequestBatch)
+    assertEquals(OnlineReplica, replicaState(replica))

Review comment:
   Makes sense, well noticed.








[GitHub] [kafka] lbradstreet commented on pull request #11376: KAFKA-13342: LeaderAndIsrRequest should not be sent for topic queued for deletion

2022-02-06 Thread GitBox


lbradstreet commented on pull request #11376:
URL: https://github.com/apache/kafka/pull/11376#issuecomment-1030866772


   > Are you planning to address the comments @lbradstreet ?
   
   @ijuma thanks for the reminder.






[jira] [Commented] (KAFKA-13641) Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters

2022-02-06 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487750#comment-17487750
 ] 

Matthias J. Sax commented on KAFKA-13641:
-

{quote}Is the plan to have type driven semantics?
{quote}
What is this?
{quote}with better type semantics [...] line between deterministic behavior and 
non-deterministic behavior
{quote}
What is the particular issue you see with types? And how do types relate to
determinism (these seem to be two independent concerns)?

> Kafka Streams Scala: Add `Option` to `ValueJoiner` parameters
> -
>
> Key: KAFKA-13641
> URL: https://issues.apache.org/jira/browse/KAFKA-13641
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Mohammad Yousuf Minhaj Zia
>Priority: Minor
>
> Since the `ValueJoiner` right parameter in `leftJoins` and `outerJoins` can be
> nullable, I am wondering if we can wrap them in a Scala `Option`.
> However, there is also the concern that the left-hand side value can be null
> in the case of tombstone messages, in which case the `Option` semantics can be
> misleading. I still feel this could be a useful feature for reducing the
> number of `NullPointerExceptions`.
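
For a concrete picture of the status quo in the Java DSL, a small illustrative sketch (not code from the ticket; the joiner shown is just an example):

{code:java}
import org.apache.kafka.streams.kstream.ValueJoiner;

public class NullSafeJoinerExample {
    public static void main(String[] args) {
        // In a left/outer join the right-hand value can be null, so a plain joiner
        // must null-check it explicitly; the ticket asks the Scala DSL to expose
        // this as an Option instead.
        ValueJoiner<String, String, String> joiner =
            (left, right) -> left + "/" + (right == null ? "none" : right);

        System.out.println(joiner.apply("left", null));    // left/none
        System.out.println(joiner.apply("left", "right")); // left/right
    }
}
{code}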





[GitHub] [kafka] mjsax commented on pull request #10897: MINOR: Reduced severity for "skipping records" falling out of time windows

2022-02-06 Thread GitBox


mjsax commented on pull request #10897:
URL: https://github.com/apache/kafka/pull/10897#issuecomment-1030907208


   @xdgrulez -- if you are still interested in working on this, it would be
great. The PR would need to be rebased/updated though (or should we just close
this one and open a new PR?)






[GitHub] [kafka] ijuma opened a new pull request #11736: MINOR: Remove Crc32.java

2022-02-06 Thread GitBox


ijuma opened a new pull request #11736:
URL: https://github.com/apache/kafka/pull/11736


   We only use it in the legacy record formats (V0 and V1) and the CRC32
   implementation in the standard library has received various performance
   improvements over the years
   (https://bugs.openjdk.java.net/browse/JDK-8245512 is a recent example).
   
   Also worth noting that record formats V0 and V1 have been deprecated
   since Apache Kafka 3.0.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] mjsax commented on pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

2022-02-06 Thread GitBox


mjsax commented on pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#issuecomment-1030907820


   @showuon -- should we close this PR for now?
   
   We have actually collected a few tickets that are not easy to fix without a 
DSL re-design. I would propose to add a new Jira label to tag all relevant 
tickets and we hold off working on them for now. When we do a DSL re-design we 
can cycle back?






[GitHub] [kafka] ijuma commented on pull request #11721: KAFKA-13629: Use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-06 Thread GitBox


ijuma commented on pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#issuecomment-1030919300


   I updated the PR description to include the JMH results I got with JDK 
17.0.2 and compiler blackhole mode.






[GitHub] [kafka] ijuma merged pull request #11721: KAFKA-13629: Use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-06 Thread GitBox


ijuma merged pull request #11721:
URL: https://github.com/apache/kafka/pull/11721


   






[GitHub] [kafka] guozhangwang commented on a change in pull request #11631: KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode

2022-02-06 Thread GitBox


guozhangwang commented on a change in pull request #11631:
URL: https://github.com/apache/kafka/pull/11631#discussion_r800245635



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -488,7 +492,7 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
         // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
         // group proactively due to application inactivity even if (say) the coordinator cannot be found.
         pollHeartbeat(timer.currentTimeMs());
-        if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
+        if (coordinatorUnknownAndUnready(timer)) {

Review comment:
   Makes sense.

##
File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
##
@@ -2641,14 +2565,16 @@ private FetchResponse fetchResponse(TopicPartition partition, long fetchOffset,
         ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time,
                 retryBackoffMs, requestTimeoutMs, heartbeatIntervalMs);
 
-        GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,
+        ConsumerCoordinator consumerCoordinator = null;
+        if (groupId != null) {
+            GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs,

Review comment:
   Nice cleanup!








[GitHub] [kafka] guozhangwang merged pull request #11631: KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode

2022-02-06 Thread GitBox


guozhangwang merged pull request #11631:
URL: https://github.com/apache/kafka/pull/11631


   






[GitHub] [kafka] guozhangwang commented on pull request #11631: KAFKA-13563: clear FindCoordinatorFuture for non consumer group mode

2022-02-06 Thread GitBox


guozhangwang commented on pull request #11631:
URL: https://github.com/apache/kafka/pull/11631#issuecomment-1030935640


   Merged to trunk, thanks @showuon ! Also cherry-picked to 3.1.






[jira] [Resolved] (KAFKA-13563) FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)

2022-02-06 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13563.
---
Fix Version/s: 3.2.0
   3.1.1
   Resolution: Fixed

> FindCoordinatorFuture never get cleared in non-group mode( consumer#assign)
> ---
>
> Key: KAFKA-13563
> URL: https://issues.apache.org/jira/browse/KAFKA-13563
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.7.1, 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.2.0, 3.1.1
>
> Attachments: kafka.zip
>
>
> In KAFKA-10793, we fixed the race condition when looking up the coordinator by
> clearing the _findCoordinatorFuture_ when handling the result, rather than in the
> listener callbacks. It works well in consumer group mode (i.e.
> Consumer#subscribe), but we found that when the user is using non consumer group
> mode (i.e. Consumer#assign) with a group id provided (for offset commits, so that
> a consumerCoordinator gets created), the _findCoordinatorFuture_ will never be
> cleared in some situations, causing offset commits to keep getting
> NOT_COORDINATOR errors.
>  
> After KAFKA-10793, we clear the _findCoordinatorFuture_ in 2 places:
>  # heartbeat thread
>  # AbstractCoordinator#ensureCoordinatorReady
> But in non consumer group mode with a group id provided, there is no
> (1) heartbeat thread, and (2) AbstractCoordinator#ensureCoordinatorReady is only
> called the first time the consumer wants to fetch the committed offset position.
> That is, after the 2nd lookupCoordinator call, we have no chance to clear the
> _findCoordinatorFuture_.
>  
> To avoid the race condition that KAFKA-10793 mentioned, it's not safe to clear
> the _findCoordinatorFuture_ in the future listener. So, I think we can fix
> this issue by calling AbstractCoordinator#ensureCoordinatorReady when the
> coordinator is unknown in the non consumer group case, on each Consumer#poll.
>  
> Reproduce steps:
>  
> 1. Start a 3 Broker cluster with a Topic having Replicas=3.
> 2. Start a Client with Producer and Consumer (with Consumer#assign(), not 
> subscribe, and provide a group id) communicating over the Topic.
> 3. Stop the Broker that is acting as the Group Coordinator.
> 4. Observe successful Rediscovery of new Group Coordinator.
> 5. Restart the stopped Broker.
> 6. Stop the Broker that became the new Group Coordinator at step 4.
> 7. Observe "Rediscovery will be attempted" message but no "Discovered group 
> coordinator" message.
>  
>  
>  
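
For reference, a minimal sketch of the standalone-assignment setup the ticket describes (broker address and topic name are placeholders): assign() plus a group.id means commits still go through the group coordinator, but there is no heartbeat thread to re-discover it.

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AssignModeExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "assign-mode-group");         // group id => coordinator used for commits
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // assign() instead of subscribe(): no rebalances and no heartbeat thread,
            // so the coordinator is only (re)discovered from the poll/commit path.
            consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(r -> System.out.println(r.offset() + ": " + r.value()));
                consumer.commitSync(); // still needs the group coordinator in assign mode
            }
        }
    }
}
{code}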





[GitHub] [kafka] showuon closed pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

2022-02-06 Thread GitBox


showuon closed pull request #11124:
URL: https://github.com/apache/kafka/pull/11124


   






[GitHub] [kafka] showuon commented on pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

2022-02-06 Thread GitBox


showuon commented on pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#issuecomment-1030946835


   Makes sense to me. Let's close this PR and cycle back after the DSL redesign. Thanks.






[GitHub] [kafka] jsancio commented on a change in pull request #11238: MINOR: Fix force kill of KRaft colocated controllers in system tests

2022-02-06 Thread GitBox


jsancio commented on a change in pull request #11238:
URL: https://github.com/apache/kafka/pull/11238#discussion_r800260558



##
File path: tests/kafkatest/services/kafka/kafka.py
##
@@ -858,12 +859,27 @@ def signal_leader(self, topic, partition=0, sig=signal.SIGTERM):
         leader = self.leader(topic, partition)
         self.signal_node(leader, sig)
 
+    def controllers_required_for_quorum(self):
+        """
+        Assume N = the total number of controller nodes in the cluster, and positive
+        For N=1, we need 1 controller to be running to have a quorum
+        For N=2, we need 2 controllers
+        For N=3, we need 2 controllers
+        For N=4, we need 3 controllers
+        For N=5, we need 3 controllers
+
+        :return: the number of controller nodes that must be started for there to be a quorum
+        """
+        # Note that we use math.ceil() to avoid floating point rounding issues
+        # associated with round() (e.g. round(5/2) yields 2 instead of 3)

Review comment:
   I would remove this comment. The code after this PR doesn't use `round`.

##
File path: tests/kafkatest/services/kafka/kafka.py
##
@@ -858,12 +859,27 @@ def signal_leader(self, topic, partition=0, sig=signal.SIGTERM):
         leader = self.leader(topic, partition)
         self.signal_node(leader, sig)
 
+    def controllers_required_for_quorum(self):
+        """
+        Assume N = the total number of controller nodes in the cluster, and positive
+        For N=1, we need 1 controller to be running to have a quorum
+        For N=2, we need 2 controllers
+        For N=3, we need 2 controllers
+        For N=4, we need 3 controllers
+        For N=5, we need 3 controllers
+
+        :return: the number of controller nodes that must be started for there to be a quorum
+        """
+        # Note that we use math.ceil() to avoid floating point rounding issues
+        # associated with round() (e.g. round(5/2) yields 2 instead of 3)
+        return math.ceil((1 + self.num_nodes_controller_role) / 2)
+
     def stop_node(self, node, clean_shutdown=True, timeout_sec=60):

Review comment:
   It is unfortunate that we need to add code that ignores and overrides
the `clean_shutdown` flag. Do you know why the system tests are attempting a
clean shutdown when there is no controller quorum? For example, should the test
be fixed to instead be configured with enough controllers to allow a clean
shutdown?








[GitHub] [kafka] guozhangwang commented on pull request #10525: KAFKA-7572: Producer should not send requests with negative partition id

2022-02-06 Thread GitBox


guozhangwang commented on pull request #10525:
URL: https://github.com/apache/kafka/pull/10525#issuecomment-1030974945


   Re-trigger jenkins.






[GitHub] [kafka] guozhangwang commented on a change in pull request #10525: KAFKA-7572: Producer should not send requests with negative partition id

2022-02-06 Thread GitBox


guozhangwang commented on a change in pull request #10525:
URL: https://github.com/apache/kafka/pull/10525#discussion_r800263685



##
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -1305,10 +1305,16 @@ private ClusterResourceListeners configureClusterResourceListeners(Serializer
      */
     private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
         Integer partition = record.partition();
-        return partition != null ?
-                partition :
-                partitioner.partition(
-                        record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
+        if (partition != null) {
+            return partition;
+        }
+
+        int customPartition = partitioner.partition(
+                record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
+        if (customPartition < 0) {
+            throw new IllegalArgumentException(String.format("Invalid partition: %d. Partition number should always be non-negative.", customPartition));

Review comment:
   Could you make the error message more specific? E.g. "Partitioner generated
invalid partition: %d ..." to make it clear that a partitioner was used.
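
   For context, a hypothetical custom partitioner showing the kind of bug this new check catches (illustrative only, not code from this PR):

   ```java
   import java.util.Map;
   import org.apache.kafka.clients.producer.Partitioner;
   import org.apache.kafka.common.Cluster;

   public class BuggyHashPartitioner implements Partitioner {
       @Override
       public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
           int numPartitions = cluster.partitionsForTopic(topic).size();
           // hashCode() can be negative, so this can return a negative partition id;
           // with the new check the producer now fails fast instead of sending a bad request.
           return key.hashCode() % numPartitions;
       }

       @Override
       public void close() {}

       @Override
       public void configure(Map<String, ?> configs) {}
   }
   ```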








[GitHub] [kafka] jsancio commented on pull request #11159: MINOR: Fix logging in ClusterControlManager

2022-02-06 Thread GitBox


jsancio commented on pull request #11159:
URL: https://github.com/apache/kafka/pull/11159#issuecomment-1030979984


   > @jsancio are you going to merge this?
   
   Yes. I can merge it. @dielhennr can you resolve the conflicts?






[GitHub] [kafka] jsancio commented on pull request #11159: MINOR: Fix logging in ClusterControlManager

2022-02-06 Thread GitBox


jsancio commented on pull request #11159:
URL: https://github.com/apache/kafka/pull/11159#issuecomment-1030981141


   This was already fixed in trunk. Closing the PR.






[GitHub] [kafka] jsancio closed pull request #11159: MINOR: Fix logging in ClusterControlManager

2022-02-06 Thread GitBox


jsancio closed pull request #11159:
URL: https://github.com/apache/kafka/pull/11159


   






[GitHub] [kafka] jsancio commented on a change in pull request #11209: KAFKA-12465: Logic about inconsistent cluster id

2022-02-06 Thread GitBox


jsancio commented on a change in pull request #11209:
URL: https://github.com/apache/kafka/pull/11209#discussion_r800268813



##
File path: raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
##
@@ -1535,6 +1535,46 @@ public void testFetchSnapshotRequestClusterIdValidation() throws Exception {
 
         context.assertSentFetchSnapshotResponse(Errors.INCONSISTENT_CLUSTER_ID);
     }
 
+    @Test
+    public void testInconsistentClusterIdInFetchSnapshotResponse() throws Exception {
+        int localId = 0;
+        int leaderId = localId + 1;
+        Set<Integer> voters = Utils.mkSet(localId, leaderId);
+        int epoch = 2;
+        OffsetAndEpoch snapshotId = new OffsetAndEpoch(100L, 1);
+
+        RaftClientTestContext context = new RaftClientTestContext.Builder(localId, voters)
+            .withElectedLeader(epoch, leaderId)
+            .build();
+
+        // Send a request
+        context.pollUntilRequest();
+        RaftRequest.Outbound fetchRequest = context.assertSentFetchRequest();
+
+        // Firstly receive a response with a valid cluster id
+        context.deliverResponse(
+            fetchRequest.correlationId,
+            fetchRequest.destinationId(),
+            snapshotFetchResponse(context.metadataPartition, context.metadataTopicId, epoch, leaderId, snapshotId, 200L)
+        );
+
+        // Send fetch snapshot request
+        context.pollUntilRequest();
+        RaftRequest.Outbound snapshotRequest = context.assertSentFetchSnapshotRequest();
+
+        // Secondly receive a response with an inconsistent cluster id
+        context.deliverResponse(
+            snapshotRequest.correlationId,
+            snapshotRequest.destinationId(),
+            new FetchSnapshotResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code())
+        );
+
+        // Inconsistent cluster id are not fatal if a previous response contained a valid cluster id
+        assertDoesNotThrow(context.client::poll);
+
+        // It's impossible to receive a be begin quorum response before any other request so we don't test

Review comment:
   I am trying to understand this comment. Can you please explain why this 
is true? And why do you think that this comment is important in this test?
   
   This comment applies to a few places.

##
File path: raft/src/main/java/org/apache/kafka/raft/RaftUtil.java
##
@@ -159,4 +161,18 @@ static boolean hasValidTopicPartition(DescribeQuorumRequestData data, TopicParti
             data.topics().get(0).partitions().size() == 1 &&
             data.topics().get(0).partitions().get(0).partitionIndex() == topicPartition.partition();
     }
+
+    static boolean hasValidTopicPartition(FetchSnapshotRequestData data, TopicPartition topicPartition) {

Review comment:
   How about changing this to return an `Errors`?
   1. `INVALID_REQUEST` if there is more than one topic partition
   2. `UNKNOWN_TOPIC_OR_PARTITION` if the topic partition doesn't match the 
log's name and partition
   3. `NONE` otherwise








[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2022-02-06 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487820#comment-17487820
 ] 

Eugen Dück commented on KAFKA-13289:


[~mjsax] Our Streams app has now been running in production for 2 straight 
weeks without any issues, with daily throughput figures peaking at around 
150k/sec on one topic.

So thanks a lot for your help in clearing up these issues for us!

This also means of course that our issue was not a Kafka problem, hence 
irrelevant to the issue that Matthew talked about.

> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`
> ---
>
> Key: KAFKA-13289
> URL: https://issues.apache.org/jira/browse/KAFKA-13289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Matthew Sheppard
>Priority: Minor
>
> When pushing bulk data through a kafka-steams app, I see it log the following 
> message many times...
> {noformat}
> WARN 
> org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - 
> Skipping record for expired segment.
> {noformat}
> ...and data which I expect to have been joined through a leftJoin step 
> appears to be lost.
> I've seen this in practice either when my application has been shut down for 
> a while and then is brought back up, or when I've used something like the 
> [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
>  in an attempt to have the application reprocess past data.
> I was able to reproduce this behaviour in isolation by generating 1000 
> messages to two topics spaced an hour apart (with the original timestamps in 
> order), then having kafka streams select a key for them and try to leftJoin 
> the two rekeyed streams.
> Self contained source code for that reproduction is available at 
> https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
> The actual kafka-streams topology in there looks like this.
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<String, String> leftStream = builder.stream(leftTopic);
> final KStream<String, String> rightStream = builder.stream(rightTopic);
> final KStream<String, String> rekeyedLeftStream = leftStream
>         .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> final KStream<String, String> rekeyedRightStream = rightStream
>         .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
> final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
>         rekeyedRightStream,
>         (left, right) -> left + "/" + right,
>         joinWindow
> );
> {code}
> ...and the eventual output I produce looks like this...
> {code}
> ...
> 523 [523,left/null]
> 524 [524,left/null, 524,left/524,right]
> 525 [525,left/525,right]
> 526 [526,left/null]
> 527 [527,left/null]
> 528 [528,left/528,right]
> 529 [529,left/null]
> 530 [530,left/null]
> 531 [531,left/null, 531,left/531,right]
> 532 [532,left/null]
> 533 [533,left/null]
> 534 [534,left/null, 534,left/534,right]
> 535 [535,left/null]
> 536 [536,left/null]
> 537 [537,left/null, 537,left/537,right]
> 538 [538,left/null]
> 539 [539,left/null]
> 540 [540,left/null]
> 541 [541,left/null]
> 542 [542,left/null]
> 543 [543,left/null]
> ...
> {code}
> ...where as, given the input data, I expect to see every row end with the two 
> values joined, rather than the right value being null.
> Note that I understand it's expected that we initially get the left/null 
> values for many values since that's the expected semantics of kafka-streams 
> left join, at least until 
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious
> I've noticed that if I set a very large grace value on the join window the
> problem is solved, but since the input I provide is not out of order I did
> not expect to need to do that, and I'm wary of the resource requirements of
> doing so in practice on an application with a lot of volume.
> My suspicion is that something is happening such that when one partition is 
> processed it causes the stream time to be pushed forward to the newest 
> message in that partition, meaning when the next partition is then examined 
> it is found to contain many records which are 'too old' compared to the 
> stream time. 
> I ran across this discussion thread which seems to cover the same issue 
> http://mail-archives.apache.or
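
For reference, the grace-period workaround mentioned above looks roughly like this against the 2.8 API (the duration shown is only an example; the right value is application specific):

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;

public class GraceWorkaroundExample {
    public static void main(String[] args) {
        // Same 5s join window as the reproduction above, but with an explicitly
        // large grace period so records from partitions lagging behind stream time
        // are not dropped as "expired segment" records.
        JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5))
                .grace(Duration.ofHours(24));
        System.out.println(joinWindow.gracePeriodMs());
    }
}
{code}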

[GitHub] [kafka] guozhangwang commented on pull request #11340: KAFKA-13310 : KafkaConsumer cannot jump out of the poll method, and the…

2022-02-06 Thread GitBox


guozhangwang commented on pull request #11340:
URL: https://github.com/apache/kafka/pull/11340#issuecomment-1031034049


   @RivenSun2 Thanks for the final commits, and thanks to @showuon for making
more passes as well. I just did another pass and it looks good to me too! I
tried to resolve the conflicts myself and merge, but it seems a bit
complicated. @RivenSun2 could you rebase again (sorry!!) and then I will merge.






[jira] [Commented] (KAFKA-13615) Kafka Streams does not transition state on LeaveGroup due to poll interval being exceeded

2022-02-06 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17487852#comment-17487852
 ] 

Guozhang Wang commented on KAFKA-13615:
---

[~timcosta] Thanks for reporting this issue. What you described seems to be
related to a known issue in the older versions --- I tried to dig it out but
could not by just searching keywords, sigh.. maybe I need to refresh my memory
and try again some time later --- anyways, could you deterministically reproduce
this issue, and if yes, could you try a newer version of Kafka and see if the
reproducing procedure still triggers it?

> Kafka Streams does not transition state on LeaveGroup due to poll interval 
> being exceeded
> -
>
> Key: KAFKA-13615
> URL: https://issues.apache.org/jira/browse/KAFKA-13615
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.1
>Reporter: Tim Costa
>Priority: Major
>
> We are running a KafkaStreams application with largely default settings. 
> Occasionally one of our consumers in the group takes too long between polls, 
> and streams leaves the consumer group but the state of the application 
> remains `RUNNING`. We are using the default `max.poll.interval.ms` of 5000.
> The process stays alive with no exception that bubbles to our code, so when 
> this occurs our app just kinda sits there idle until a manual restart is 
> performed.
> Here are the logs from around the time of the problem:
> {code:java}
> {"timestamp":"2022-01-24 
> 19:56:44.404","level":"INFO","thread":"kubepodname-StreamThread-1","logger":"org.apache.kafka.streams.processor.internals.StreamThread","message":"stream-thread
>  [kubepodname-StreamThread-1] Processed 65296 total records, ran 0 
> punctuators, and committed 400 total tasks since the last 
> update","context":"default"} {"timestamp":"2022-01-24 
> 19:58:44.478","level":"INFO","thread":"kubepodname-StreamThread-1","logger":"org.apache.kafka.streams.processor.internals.StreamThread","message":"stream-thread
>  [kubepodname-StreamThread-1] Processed 65284 total records, ran 0 
> punctuators, and committed 400 total tasks since the last 
> update","context":"default"}
> {"timestamp":"2022-01-24 
> 20:03:50.383","level":"INFO","thread":"kafka-coordinator-heartbeat-thread | 
> stage-us-1-fanout-logs-2c99","logger":"org.apache.kafka.clients.consumer.internals.AbstractCoordinator","message":"[Consumer
>  clientId=kubepodname-StreamThread-1-consumer, 
> groupId=stage-us-1-fanout-logs-2c99] Member 
> kubepodname-StreamThread-1-consumer-283f0e0d-defa-4edf-88b2-39703f845db5 
> sending LeaveGroup request to coordinator 
> b-2.***.kafka.us-east-1.amazonaws.com:9096 (id: 2147483645 rack: null) due to 
> consumer poll timeout has expired. This means the time between subsequent 
> calls to poll() was longer than the configured max.poll.interval.ms, which 
> typically implies that the poll loop is spending too much time processing 
> messages. You can address this either by increasing max.poll.interval.ms or 
> by reducing the maximum size of batches returned in poll() with 
> max.poll.records.","context":"default"} {code}
> At this point the application entirely stops processing data. We initiated a 
> shutdown by deleting the kubernetes pod, and the line printed immediately by 
> kafka after the sprint boot shutdown initiation logs is the following:
> {code:java}
> {"timestamp":"2022-01-24 
> 20:05:27.368","level":"INFO","thread":"kafka-streams-close-thread","logger":"org.apache.kafka.streams.processor.internals.StreamThread","message":"stream-thread
>  [kubepodname-StreamThread-1] State transition from RUNNING to 
> PENDING_SHUTDOWN","context":"default"}
>  {code}
> For a period of over a minute the application was in a state of hiatus where 
> it had left the group, however it was still marked as being in a `RUNNING` 
> state so we had no way to detect that the application had entered a bad state 
> to kill it in an automated fashion. While the above logs are from an app that 
> we shut down within a minute or two manually, we have seen this stay in a bad 
> state for up to an hour before.
> It feels like a bug to me that the streams consumer can leave the consumer 
> group but not exit the `RUNNING` state. I tried searching for other bugs like 
> this, but couldn't find any. Any ideas on how to detect this, or thoughts on 
> whether this is actually a bug?





[GitHub] [kafka] showuon commented on pull request #11721: KAFKA-13629: Use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-06 Thread GitBox


showuon commented on pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#issuecomment-1031042300


   @jasonk000 , thanks for the good improvement! I learned a lot from the PR! 
Thank you.






[GitHub] [kafka] showuon edited a comment on pull request #11721: KAFKA-13629: Use faster algorithm for ByteUtils sizeOfXxx algorithm

2022-02-06 Thread GitBox


showuon edited a comment on pull request #11721:
URL: https://github.com/apache/kafka/pull/11721#issuecomment-1031042300


   @jasonk000 , thanks for the good improvement! I learned a lot from the PR! 
Also updated the JIRA status. Thank you.






[jira] [Resolved] (KAFKA-13629) Client producer use faster ByteUtils sizeOfXxx algorithm

2022-02-06 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-13629.
---
Fix Version/s: 3.2.0
 Reviewer: Ismael Juma
   Resolution: Fixed

> Client producer use faster ByteUtils sizeOfXxx algorithm
> 
>
> Key: KAFKA-13629
> URL: https://issues.apache.org/jira/browse/KAFKA-13629
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, producer 
>Reporter: Jason Koch
>Priority: Minor
> Fix For: 3.2.0
>
>
> The Kafka Java client producer path uses `tryAppend()`, which requires an estimated 
> size. The sizeOf algorithm currently uses an [iterative while 
> loop|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java#L394-L401]
>  which can be improved.
> On a real system this reduces the producer path CPU by 4%. JMH benchmarking 
> shows the algorithm is approx 3x faster (90 op/ms -> 290 op/ms).
> Specifically, we can use a prepared table with the following lookup, which 
> relies on a JVM intrinsic for fast performance and also avoids any branching:
> --
> {{int leadingZeros = Integer.numberOfLeadingZeros(value);}}
> {{return LEADING_ZEROS_TO_U_VARINT_SIZE[leadingZeros];}}
> --
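
A runnable sketch of the lookup-table idea described above (an illustration of the approach, not necessarily the exact code that was merged), shown next to the old-style iterative loop for comparison:

{code:java}
public class VarintSizeExample {
    // Maps Integer.numberOfLeadingZeros(value) (0..32) to the number of bytes the
    // unsigned varint encoding of the value needs (1..5).
    private static final byte[] LEADING_ZEROS_TO_U_VARINT_SIZE = {
        5, 5, 5, 5,             // 32..29 significant bits
        4, 4, 4, 4, 4, 4, 4,    // 28..22
        3, 3, 3, 3, 3, 3, 3,    // 21..15
        2, 2, 2, 2, 2, 2, 2,    // 14..8
        1, 1, 1, 1, 1, 1, 1, 1  // 7..1 bits, plus value == 0
    };

    static int sizeOfUnsignedVarint(int value) {
        return LEADING_ZEROS_TO_U_VARINT_SIZE[Integer.numberOfLeadingZeros(value)];
    }

    // Shape of the previous iterative implementation, for comparison.
    static int sizeOfUnsignedVarintLoop(int value) {
        int bytes = 1;
        while ((value & 0xffffff80) != 0) {
            bytes += 1;
            value >>>= 7;
        }
        return bytes;
    }

    public static void main(String[] args) {
        int[] samples = {0, 1, 127, 128, 16383, 16384, 2097151, 2097152, 268435455, 268435456, -1};
        for (int v : samples) {
            System.out.printf("%d -> table=%d loop=%d%n", v, sizeOfUnsignedVarint(v), sizeOfUnsignedVarintLoop(v));
        }
    }
}
{code}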





[GitHub] [kafka] showuon commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock

2022-02-06 Thread GitBox


showuon commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r800322496



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##
@@ -578,41 +594,46 @@ private boolean 
shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar
 } else {
 if (shouldStopDrainBatchesForPartition(first, tp))
 break;
+}
 
-boolean isTransactional = transactionManager != null && 
transactionManager.isTransactional();
-ProducerIdAndEpoch producerIdAndEpoch =
-transactionManager != null ? 
transactionManager.producerIdAndEpoch() : null;
-ProducerBatch batch = deque.pollFirst();
-if (producerIdAndEpoch != null && !batch.hasSequence()) {
-// If the producer id/epoch of the partition do not 
match the latest one
-// of the producer, we update it and reset the 
sequence. This should be
-// only done when all its in-flight batches have 
completed. This is guarantee
-// in `shouldStopDrainBatchesForPartition`.
-
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
-
-// If the batch already has an assigned sequence, then 
we should not change the producer id and
-// sequence number, since this may introduce 
duplicates. In particular, the previous attempt
-// may actually have been accepted, and if we change 
the producer id and sequence here, this
-// attempt will also be accepted, causing a duplicate.
-//
-// Additionally, we update the next sequence number 
bound for the partition, and also have
-// the transaction manager track the batch so as to 
ensure that sequence ordering is maintained
-// even if we receive out of order responses.
-batch.setProducerState(producerIdAndEpoch, 
transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
-
transactionManager.incrementSequenceNumber(batch.topicPartition, 
batch.recordCount);
-log.debug("Assigned producerId {} and producerEpoch {} 
to batch with base sequence " +
-"{} being sent to partition {}", 
producerIdAndEpoch.producerId,
-producerIdAndEpoch.epoch, batch.baseSequence(), 
tp);
-
-transactionManager.addInFlightBatch(batch);
-}
-batch.close();
-size += batch.records().sizeInBytes();
-ready.add(batch);
+batch = deque.pollFirst();
+
+boolean isTransactional = transactionManager != null && 
transactionManager.isTransactional();
+ProducerIdAndEpoch producerIdAndEpoch =
+transactionManager != null ? 
transactionManager.producerIdAndEpoch() : null;
+if (producerIdAndEpoch != null && !batch.hasSequence()) {
+// If the producer id/epoch of the partition do not match 
the latest one
+// of the producer, we update it and reset the sequence. 
This should be
+// only done when all its in-flight batches have 
completed. This is guarantee
+// in `shouldStopDrainBatchesForPartition`.
+
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
+
+// If the batch already has an assigned sequence, then we 
should not change the producer id and
+// sequence number, since this may introduce duplicates. 
In particular, the previous attempt
+// may actually have been accepted, and if we change the 
producer id and sequence here, this
+// attempt will also be accepted, causing a duplicate.
+//
+// Additionally, we update the next sequence number bound 
for the partition, and also have
+// the transaction manager track the batch so as to ensure 
that sequence ordering is maintained
+// even if we receive out of order responses.
+batch.setProducerState(producerIdAndEpoch, 
transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
+
transactionManager.incrementSequenceNumber(batch.topicPartition, 
batch.recordCount);
+log.debug("Assigned producerId {} and producerEpoch {} to 
batch with base sequence " +
+"{} being sent to partition {}", 
producerIdAndEpoch.producerId,
+   

[GitHub] [kafka] showuon commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock

2022-02-06 Thread GitBox


showuon commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r800338273



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##
@@ -578,41 +594,46 @@ private boolean 
shouldStopDrainBatchesForPartition(ProducerBatch first, TopicPar
 } else {
 if (shouldStopDrainBatchesForPartition(first, tp))
 break;
+}
 
-boolean isTransactional = transactionManager != null && 
transactionManager.isTransactional();
-ProducerIdAndEpoch producerIdAndEpoch =
-transactionManager != null ? 
transactionManager.producerIdAndEpoch() : null;
-ProducerBatch batch = deque.pollFirst();
-if (producerIdAndEpoch != null && !batch.hasSequence()) {
-// If the producer id/epoch of the partition do not 
match the latest one
-// of the producer, we update it and reset the 
sequence. This should be
-// only done when all its in-flight batches have 
completed. This is guarantee
-// in `shouldStopDrainBatchesForPartition`.
-
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
-
-// If the batch already has an assigned sequence, then 
we should not change the producer id and
-// sequence number, since this may introduce 
duplicates. In particular, the previous attempt
-// may actually have been accepted, and if we change 
the producer id and sequence here, this
-// attempt will also be accepted, causing a duplicate.
-//
-// Additionally, we update the next sequence number 
bound for the partition, and also have
-// the transaction manager track the batch so as to 
ensure that sequence ordering is maintained
-// even if we receive out of order responses.
-batch.setProducerState(producerIdAndEpoch, 
transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
-
transactionManager.incrementSequenceNumber(batch.topicPartition, 
batch.recordCount);
-log.debug("Assigned producerId {} and producerEpoch {} 
to batch with base sequence " +
-"{} being sent to partition {}", 
producerIdAndEpoch.producerId,
-producerIdAndEpoch.epoch, batch.baseSequence(), 
tp);
-
-transactionManager.addInFlightBatch(batch);
-}
-batch.close();
-size += batch.records().sizeInBytes();
-ready.add(batch);
+batch = deque.pollFirst();
+
+boolean isTransactional = transactionManager != null && 
transactionManager.isTransactional();
+ProducerIdAndEpoch producerIdAndEpoch =
+transactionManager != null ? 
transactionManager.producerIdAndEpoch() : null;
+if (producerIdAndEpoch != null && !batch.hasSequence()) {
+// If the producer id/epoch of the partition do not match 
the latest one
+// of the producer, we update it and reset the sequence. 
This should be
+// only done when all its in-flight batches have 
completed. This is guarantee
+// in `shouldStopDrainBatchesForPartition`.
+
transactionManager.maybeUpdateProducerIdAndEpoch(batch.topicPartition);
+
+// If the batch already has an assigned sequence, then we 
should not change the producer id and
+// sequence number, since this may introduce duplicates. 
In particular, the previous attempt
+// may actually have been accepted, and if we change the 
producer id and sequence here, this
+// attempt will also be accepted, causing a duplicate.
+//
+// Additionally, we update the next sequence number bound 
for the partition, and also have
+// the transaction manager track the batch so as to ensure 
that sequence ordering is maintained
+// even if we receive out of order responses.
+batch.setProducerState(producerIdAndEpoch, 
transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
+
transactionManager.incrementSequenceNumber(batch.topicPartition, 
batch.recordCount);
+log.debug("Assigned producerId {} and producerEpoch {} to 
batch with base sequence " +
+"{} being sent to partition {}", 
producerIdAndEpoch.producerId,
+   

[GitHub] [kafka] xdgrulez commented on pull request #10897: MINOR: Reduced severity for "skipping records" falling out of time windows

2022-02-06 Thread GitBox


xdgrulez commented on pull request #10897:
URL: https://github.com/apache/kafka/pull/10897#issuecomment-1031141718


   Hi Matthias,
   
   I don't really have time but I'd like to get this done anyway ;)
   
   I think opening a new PR would be more sensible... the change is too old.
   
   Best,
   Ralph
   
   
   On Sun., Feb. 6, 2022 at 21:24, Matthias J. Sax <
   ***@***.***> wrote:
   
   > @xdgrulez  -- if you are still interested in
   > working on this, it would be great. The PR would needs to be
   > rebased/updated though (or just close this one an open a new PR?)
   >
   >
   






[GitHub] [kafka] dengziming closed pull request #11659: KAFKA-13503: Validate broker configs for KRaft

2022-02-06 Thread GitBox


dengziming closed pull request #11659:
URL: https://github.com/apache/kafka/pull/11659


   






[GitHub] [kafka] dengziming commented on pull request #11659: KAFKA-13503: Validate broker configs for KRaft

2022-02-06 Thread GitBox


dengziming commented on pull request #11659:
URL: https://github.com/apache/kafka/pull/11659#issuecomment-1031161651


   @cmccabe Thank you, I close this PR.






[GitHub] [kafka] dengziming edited a comment on pull request #11659: KAFKA-13503: Validate broker configs for KRaft

2022-02-06 Thread GitBox


dengziming edited a comment on pull request #11659:
URL: https://github.com/apache/kafka/pull/11659#issuecomment-1031161651


   @cmccabe Thank you, I closed this PR.

