[GitHub] [kafka] srishti-saraswat commented on a diff in pull request #12567: Migrate connect system tests to KRaft

2022-09-08 Thread GitBox


srishti-saraswat commented on code in PR #12567:
URL: https://github.com/apache/kafka/pull/12567#discussion_r965588572


##
tests/kafkatest/tests/connect/connect_rest_test.py:
##
@@ -34,9 +35,7 @@ class ConnectRestApiTest(KafkaTest):
 
     FILE_SOURCE_CONFIGS = {'name', 'connector.class', 'tasks.max', 'key.converter', 'value.converter',
                            'header.converter', 'batch.size', 'topic', 'file', 'transforms',
                            'config.action.reload', 'errors.retry.timeout', 'errors.retry.delay.max.ms',
-                           'errors.tolerance', 'errors.log.enable', 'errors.log.include.messages', 'predicates', 'topic.creation.groups',
-                           'offsets.storage.topic', 'transaction.boundary', 'transaction.boundary.interval.ms', 'config.action.reload',
-                           'exactly.once.support'}
+                           'errors.tolerance', 'errors.log.enable', 'errors.log.include.messages', 'predicates', 'topic.creation.groups'}

Review Comment:
   Reverted to original since it is taken care of in a separate PR - 
https://github.com/apache/kafka/pull/12575






[jira] [Updated] (KAFKA-13766) Use `max.poll.interval.ms` as the timeout during complete-rebalance phase

2022-09-08 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-13766:

Component/s: group-coordinator

> Use `max.poll.interval.ms` as the timeout during complete-rebalance phase
> -
>
> Key: KAFKA-13766
> URL: https://issues.apache.org/jira/browse/KAFKA-13766
> Project: Kafka
>  Issue Type: Bug
>  Components: core, group-coordinator
>Reporter: Guozhang Wang
>Assignee: David Jacot
>Priority: Major
>  Labels: new-rebalance-should-fix
>
> The lifetime of a consumer can be categorized into three phases:
> 1) During normal processing, the broker expects a heartbeat request from the 
> consumer periodically, and that is timed by `session.timeout.ms`.
> 2) During the prepare_rebalance phase, the broker expects a join-group request 
> to be received within the rebalance timeout, which is piggy-backed as 
> `max.poll.interval.ms`.
> 3) During the complete_rebalance phase, the broker expects a sync-group 
> request to be received, again within `session.timeout.ms`.
> So during different phases of the consumer's life, a different timeout is 
> used to bound the timer.
> Nowadays with the cooperative rebalance protocol, {{consumer.poll}} can still 
> return records that we process in the middle of a rebalance. In that case, for 
> phase 3) we should also use `max.poll.interval.ms` to bound the timer, since 
> it is in practice larger than `session.timeout.ms`.
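
For illustration, a minimal sketch of the two client-side timeouts involved; the broker address, group id, and timeout values are placeholder assumptions:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RebalanceTimeoutsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Phase 1 (and, today, phase 3) is bounded by the session timeout:
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000");
        // Phase 2 is bounded by the poll interval, which is typically much larger;
        // the ticket proposes bounding phase 3 by this value as well:
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe and poll as usual
        }
    }
}
```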





[GitHub] [kafka] dajac commented on pull request #12275: MINOR: Change groupInstanceId type from Optional to String in ConsumerGroupMetadata and GroupRebalanceConfig

2022-09-08 Thread GitBox


dajac commented on PR #12275:
URL: https://github.com/apache/kafka/pull/12275#issuecomment-1240403160

   Thanks @il-kyun for the patch. I looked into it and I don't think that the 
change is worth it. Using Optional here seems fine to me. I am going to close 
it for now. Feel free to reopen if needed.
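
For context, a small sketch of the API shape being kept; `printInstanceId` and its output are illustrative only:

```java
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupInstanceIdSketch {
    // groupInstanceId is only present for static group members, so Optional
    // models the "absent" case directly instead of a nullable String.
    static void printInstanceId(KafkaConsumer<?, ?> consumer) {
        ConsumerGroupMetadata metadata = consumer.groupMetadata();
        Optional<String> groupInstanceId = metadata.groupInstanceId();
        groupInstanceId.ifPresent(id -> System.out.println("static member: " + id));
    }
}
```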





[GitHub] [kafka] dajac closed pull request #12275: MINOR: Change groupInstanceId type from Optional to String in ConsumerGroupMetadata and GroupRebalanceConfig

2022-09-08 Thread GitBox


dajac closed pull request #12275: MINOR: Change groupInstanceId type from 
Optional to String in ConsumerGroupMetadata and GroupRebalanceConfig
URL: https://github.com/apache/kafka/pull/12275





[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests

2022-09-08 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-14133:
--
Description: 
{color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets, the tests are in the streams module.

A list of tests which still need to be moved from EasyMock to Mockito as of 
2nd of August 2022, and which have no Jira issue or open pull request that I 
am aware of:

{color:#ff8b00}In Review{color}
{color:#00875A}Merged{color}
 # -{color:#00875A}WorkerConnectorTest{color} (connect) (owner:- [~yash.mayya] 
-)-
 # -{color:#00875A}WorkerCoordinatorTest{color} (connect) (owner:- 
[~yash.mayya] -)-
 # -{color:#00875A}RootResourceTest{color} (connect) (owner:- [~yash.mayya] -)-
 # -{color:#00875A}ByteArrayProducerRecordEquals{color} (connect) (owner:- 
[~yash.mayya] -)-
 # {color:#00875A}KStreamFlatTransformTest{color} (owner: Christo)
 # {color:#00875A}KStreamFlatTransformValuesTest{color} (owner: Christo)
 # {color:#00875A}KStreamPrintTest{color} (owner: Christo)
 # {color:#00875A}KStreamRepartitionTest{color} (owner: Christo)
 # {color:#00875A}MaterializedInternalTest{color} (owner: Christo)
 # {color:#00875A}TransformerSupplierAdapterTest{color} (owner: Christo)
 # {color:#00875A}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
 # {color:#00875A}ClientUtilsTest{color} (owner: Christo)
 # {color:#00875A}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
Christo)
 # {color:#ff8b00}StreamsRebalanceListenerTest{color} (owner: Christo)
 # {color:#ff8b00}TimestampedKeyValueStoreMaterializerTest{color} (owner: 
Christo)
 # {color:#ff8b00}CachingInMemoryKeyValueStoreTest{color} (owner: Christo)
 # {color:#ff8b00}CachingInMemorySessionStoreTest{color} (owner: Christo)
 # {color:#ff8b00}CachingPersistentSessionStoreTest{color} (owner: Christo)
 # {color:#ff8b00}CachingPersistentWindowStoreTest{color} (owner: Christo)
 # {color:#ff8b00}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo)
 # {color:#ff8b00}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: 
Christo)
 # {color:#ff8b00}CompositeReadOnlyWindowStoreTest{color} (owner: Christo)
 # {color:#ff8b00}KeyValueStoreBuilderTest{color} (owner: Christo)
 # {color:#ff8b00}RocksDBStoreTest{color} (owner: Christo)
 # {color:#ff8b00}StreamThreadStateStoreProviderTest{color} (owner: Christo)
 # {color:#ff8b00}TopologyTest{color} (owner: Christo)
 # {color:#ff8b00}KTableSuppressProcessorTest{color} (owner: Christo)
 # {color:#ff8b00}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo)
 # {color:#ff8b00}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: 
Christo)
 # {color:#ff8b00}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo)
 # {color:#ff8b00}MeteredTimestampedWindowStoreTest{color} (owner: Christo)
 # InternalTopicManagerTest (owner: Christo)
 # ProcessorContextImplTest (owner: Christo)
 # WriteConsistencyVectorTest (owner: Christo)
 # StreamsAssignmentScaleTest (owner: Christo)
 # StreamsPartitionAssignorTest (owner: Christo)
 # TaskManagerTest (owner: Christo)
 # AssignmentTestUtils (owner: Christo)
 # ProcessorStateManagerTest ({*}WIP{*} owner: Matthew)
 # StandbyTaskTest ({*}WIP{*} owner: Matthew)
 # StoreChangelogReaderTest ({*}WIP{*} owner: Matthew)
 # StreamTaskTest ({*}WIP{*} owner: Matthew)
 # StreamThreadTest ({*}WIP{*} owner: Matthew)
 # StreamsMetricsImplTest
 # TimeOrderedCachingPersistentWindowStoreTest
 # TimeOrderedWindowStoreTest

*The coverage report for the above tests after the change should be >= what 
the coverage is now.*

  was:
{color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets the tests are in the streams module.

A list of tests which still require to be moved from EasyMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}In Review{color}
{color:#00875A}Merged{color}
 # -{color:#00875A}WorkerConnectorTest{color} (connect) (owner:- [~yash.mayya] 
-)-
 # -{color:#00875A}WorkerCoordinatorTest{color} (connect) (owner:- 
[~yash.mayya] -)-
 # -{color:#00875A}RootResourceTest{color} (connect) (owner:- [~yash.mayya] -)-
 # -{color:#00875A}ByteArrayProducerRecordEquals{color} (connect) (owner:- 
[~yash.mayya] -)-
 # {color:#ff8b00}KStreamFlatTransformTest{color} (owner: Christo)
 # {color:#ff8b00}KStreamFlatTransformValuesTest{color} (owner: Christo)
 # {color:#ff8b00}KStreamPrintTest{color} (owner: Christo)
 # {color:#ff8b00}KStreamRepartitionTest{color} (owner: Christo)
 # {color:#ff8b00}MaterializedInternalTest{color} (owner: Christo)
 # {colo

[GitHub] [kafka] mimaison merged pull request #12606: MINOR: Fix usage of @see in IncrementalCooperativeAssignor doc comments

2022-09-08 Thread GitBox


mimaison merged PR #12606:
URL: https://github.com/apache/kafka/pull/12606





[jira] [Created] (KAFKA-14208) KafkaConsumer#commitAsync throws unexpected WakeupException

2022-09-08 Thread Qingsheng Ren (Jira)
Qingsheng Ren created KAFKA-14208:
-

 Summary: KafkaConsumer#commitAsync throws unexpected 
WakeupException
 Key: KAFKA-14208
 URL: https://issues.apache.org/jira/browse/KAFKA-14208
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 3.2.1
Reporter: Qingsheng Ren


We recently encountered a bug in the Flink Kafka connector (FLINK-29153) after 
upgrading the Kafka client to 3.2.1. Here's the exception:
{code:java}
org.apache.kafka.common.errors.WakeupException
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:514)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:278)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:252)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.coordinatorUnknownAndUnready(ConsumerCoordinator.java:493)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1055)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1573)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:226)
{code}
As {{WakeupException}} is not listed in the JavaDoc of 
{{KafkaConsumer#commitAsync}}, the Flink Kafka connector doesn't catch 
exceptions thrown directly from {{KafkaConsumer#commitAsync}}; it handles all 
exceptions in the callback instead.

I checked the source code and suspect this is caused by KAFKA-13563. We also 
never saw this exception from {{commitAsync}} with Kafka clients 2.4.1 and 
2.8.1.

I'm wondering whether this breaks the public API, given that 
{{WakeupException}} is not listed in the JavaDoc. Perhaps it would be better 
to invoke the callback to handle the {{WakeupException}} instead of throwing 
it directly from the method itself.
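
For illustration, a sketch of the calling pattern described above, assuming a consumer constructed elsewhere; per the JavaDoc the callback is the documented error path, while the catch block shows the undocumented one:

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class CommitAsyncSketch {
    static void commitSafely(KafkaConsumer<String, String> consumer) {
        try {
            consumer.commitAsync((offsets, exception) -> {
                if (exception != null) {
                    // documented failure path: handle commit errors here
                }
            });
        } catch (WakeupException e) {
            // undocumented on commitAsync, but observed on 3.2.1 when the
            // coordinator lookup inside the call triggers a pending wakeup
        }
    }
}
```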





[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests

2022-09-08 Thread Yash Mayya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yash Mayya updated KAFKA-14132:
---
Description: 
{color:#de350b}Some of the tests below use EasyMock as well. For those, migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets, the tests are in the connect module.

A list of tests which still need to be moved from PowerMock to Mockito as of 
2nd of August 2022, and which have no Jira issue or open pull request that I 
am aware of:
 # ErrorHandlingTaskTest (owner: Divij)
 # SourceTaskOffsetCommiterTest (owner: Divij)
 # WorkerMetricsGroupTest (owner: Divij)
 # WorkerSinkTaskTest (owner: Divij)
 # WorkerSinkTaskThreadedTest (owner: Divij)
 # WorkerTaskTest (owner: [~yash.mayya])
 # ErrorReporterTest (owner: [~yash.mayya])
 # RetryWithToleranceOperatorTest (owner: [~yash.mayya])
 # WorkerErrantRecordReporterTest (owner: [~yash.mayya])
 # ConnectorsResourceTest
 # StandaloneHerderTest
 # KafkaConfigBackingStoreTest
 # KafkaOffsetBackingStoreTest (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # KafkaBasedLogTest
 # RetryUtilTest
 # RepartitionTopicTest (streams) (owner: Christo)
 # StateManagerUtilTest (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= what 
the coverage is now.*

  was:
{color:#DE350B}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated in brackets the tests are in the connect module.

A list of tests which still require to be moved from PowerMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

# ErrorHandlingTaskTest (owner: Divij)
# SourceTaskOffsetCommiterTest (owner: Divij)
# WorkerMetricsGroupTest (owner: Divij)
# WorkerSinkTaskTest (owner: Divij)
# WorkerSinkTaskThreadedTest (owner: Divij)
# WorkerTaskTest
# ErrorReporterTest
# RetryWithToleranceOperatorTest
# WorkerErrantRecordReporterTest
# ConnectorsResourceTest
# StandaloneHerderTest
# KafkaConfigBackingStoreTest
# KafkaOffsetBackingStoreTest (owner: Christo) 
(https://github.com/apache/kafka/pull/12418)
# KafkaBasedLogTest
# RetryUtilTest
# RepartitionTopicTest (streams) (owner: Christo)
# StateManagerUtilTest (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= to what 
the coverage is now.*



> Remaining PowerMock to Mockito tests
> 
>
> Key: KAFKA-14132
> URL: https://issues.apache.org/jira/browse/KAFKA-14132
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}Some of the tests below use EasyMock as well. For those, 
> migrate both PowerMock and EasyMock to Mockito.{color}
> Unless stated in brackets, the tests are in the connect module.
> A list of tests which still need to be moved from PowerMock to Mockito as of 
> 2nd of August 2022, and which have no Jira issue or open pull request that I 
> am aware of:
>  # ErrorHandlingTaskTest (owner: Divij)
>  # SourceTaskOffsetCommiterTest (owner: Divij)
>  # WorkerMetricsGroupTest (owner: Divij)
>  # WorkerSinkTaskTest (owner: Divij)
>  # WorkerSinkTaskThreadedTest (owner: Divij)
>  # WorkerTaskTest (owner: [~yash.mayya])
>  # ErrorReporterTest (owner: [~yash.mayya])
>  # RetryWithToleranceOperatorTest (owner: [~yash.mayya])
>  # WorkerErrantRecordReporterTest (owner: [~yash.mayya])
>  # ConnectorsResourceTest
>  # StandaloneHerderTest
>  # KafkaConfigBackingStoreTest
>  # KafkaOffsetBackingStoreTest (owner: Christo) 
> ([https://github.com/apache/kafka/pull/12418])
>  # KafkaBasedLogTest
>  # RetryUtilTest
>  # RepartitionTopicTest (streams) (owner: Christo)
>  # StateManagerUtilTest (streams) (owner: Christo)
> *The coverage report for the above tests after the change should be >= what 
> the coverage is now.*





[jira] [Commented] (KAFKA-12887) Do not trigger user-customized ExceptionalHandler for RTE

2022-09-08 Thread Ian Corne (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601779#comment-17601779
 ] 

Ian Corne commented on KAFKA-12887:
---

Why wouldn't you allow the user to handle this?

> Do not trigger user-customized ExceptionalHandler for RTE
> -
>
> Key: KAFKA-12887
> URL: https://issues.apache.org/jira/browse/KAFKA-12887
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Josep Prat
>Priority: Major
> Fix For: 3.1.0
>
>
> Today in StreamThread we have a try-catch block that captures all {{Throwable 
> e}} and then triggers {{this.streamsUncaughtExceptionHandler.accept(e)}}. 
> However, there are possible RTEs such as IllegalState/IllegalArgument 
> exceptions which are usually caused by bugs. In such cases we should not 
> let users decide what to do with these exceptions, but should let Streams 
> itself enforce the decision, e.g. for IllegalState/IllegalArgument we 
> should fail fast to surface the potential error.
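
For context, a minimal sketch of the user-facing handler in question, using the public API; the application id, bootstrap servers, and topic name are placeholders:

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public class UncaughtHandlerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");       // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input").foreach((k, v) -> { });                      // placeholder topology
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // The hook this ticket constrains: after the change, Streams fails fast on
        // IllegalState/IllegalArgument exceptions instead of consulting this handler.
        streams.setUncaughtExceptionHandler(exception ->
                StreamThreadExceptionResponse.REPLACE_THREAD);
    }
}
```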





[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

2022-09-08 Thread GitBox


cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r965876179


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -421,73 +421,120 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                                  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                                  final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                                  final Set<Task> tasksToCloseClean) {
+    private void classifyRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                  final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                  final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
 
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        task.resume();

Review Comment:
   Although I like the more explicit nature of your proposal, I do not think 
that it is correct. If a task is in `SUSPENDED`, it transitions to `RESTORING` 
only after the call to `task.resume()`. Reassigned revoked active tasks should 
be in `SUSPENDED`, not in `RESTORING`.






[jira] [Comment Edited] (KAFKA-12887) Do not trigger user-customized ExceptionalHandler for RTE

2022-09-08 Thread Ian Corne (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601779#comment-17601779
 ] 

Ian Corne edited comment on KAFKA-12887 at 9/8/22 12:13 PM:


Why wouldn't you allow the user to handle this?

This is currently in 3.1.1, which is the version in the latest Spring Boot 2.7, 
and it disables handling errors in business logic. IllegalArgumentException is 
not used only for fatal errors.


was (Author: icorne):
Why wouldn't you allow the user to handle this?

> Do not trigger user-customized ExceptionalHandler for RTE
> -
>
> Key: KAFKA-12887
> URL: https://issues.apache.org/jira/browse/KAFKA-12887
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Josep Prat
>Priority: Major
> Fix For: 3.1.0
>
>
> Today in StreamThread we have a try-catch block that captures all {{Throwable 
> e}} and then triggers {{this.streamsUncaughtExceptionHandler.accept(e)}}. 
> However, there are possible RTEs such as IllegalState/IllegalArgument 
> exceptions which are usually caused by bugs, etc. In such cases we should not 
> let users to decide what to do with these exceptions, but should let Streams 
> itself to enforce the decision, e.g. in the IllegalState/IllegalArgument we 
> should fail fast to notify the potential error.





[GitHub] [kafka] ashmeet13 commented on pull request #12414: KAFKA-14073 Logging the reason for Snapshot

2022-09-08 Thread GitBox


ashmeet13 commented on PR #12414:
URL: https://github.com/apache/kafka/pull/12414#issuecomment-1240694324

   Thank you @jsancio, I see that the build for JDK 17 and Scala 2.13 has timed 
out again. What can be done from my end to fix this?





[GitHub] [kafka] divijvaidya commented on pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer

2022-09-08 Thread GitBox


divijvaidya commented on PR #12590:
URL: https://github.com/apache/kafka/pull/12590#issuecomment-1240714341

   @dajac since you are working on the consumer client protocol, perhaps you 
may also be interested in taking a look at this PR?





[GitHub] [kafka] clolov opened a new pull request, #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

2022-09-08 Thread GitBox


clolov opened a new pull request, #12607:
URL: https://github.com/apache/kafka/pull/12607

   Batch 5 of the tests detailed in 
https://issues.apache.org/jira/browse/KAFKA-14133 which use EasyMock and need 
to be moved to Mockito.





[GitHub] [kafka] clolov commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

2022-09-08 Thread GitBox


clolov commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1240754283

   There were stubbings which were no longer on the call path, so I have removed 
them. The way I checked that I wasn't changing the test behaviour was to use 
EasyMock.verify on the mocks and confirm that the stubbings were indeed unused 
prior to my change. There are multiple possibilities for refactoring, but I 
chose to keep the changes as close to the EasyMock implementation as possible, 
since the PR is already big.
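
For illustration, a self-contained sketch of that verification trick; `Collaborator` is a hypothetical interface, not one of the classes under test:

```java
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;

public class UnusedStubbingSketch {
    interface Collaborator {
        int doWork();
    }

    public static void main(String[] args) {
        Collaborator mock = createMock(Collaborator.class);
        expect(mock.doWork()).andReturn(42); // the stubbing under suspicion
        replay(mock);
        // ... exercise code under test that never calls doWork() ...
        verify(mock); // throws: the recorded expectation was never exercised,
                      // confirming the stubbing is unused and safe to delete
    }
}
```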





[GitHub] [kafka] clolov commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

2022-09-08 Thread GitBox


clolov commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1240756214

   I am aware that there are merge conflicts and I will aim to address them 
over the coming days.





[GitHub] [kafka] clolov commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

2022-09-08 Thread GitBox


clolov commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1240757288

   @guozhangwang and @cadonna for visibility





[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests

2022-09-08 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-14133:
--
Description: 
{color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets, the tests are in the streams module.

A list of tests which still need to be moved from EasyMock to Mockito as of 
2nd of August 2022, and which have no Jira issue or open pull request that I 
am aware of:

{color:#ff8b00}In Review{color}
{color:#00875A}Merged{color}
 # -{color:#00875A}WorkerConnectorTest{color} (connect) (owner:- [~yash.mayya] 
-)-
 # -{color:#00875A}WorkerCoordinatorTest{color} (connect) (owner:- 
[~yash.mayya] -)-
 # -{color:#00875A}RootResourceTest{color} (connect) (owner:- [~yash.mayya] -)-
 # -{color:#00875A}ByteArrayProducerRecordEquals{color} (connect) (owner:- 
[~yash.mayya] -)-
 # {color:#00875A}KStreamFlatTransformTest{color} (owner: Christo)
 # {color:#00875A}KStreamFlatTransformValuesTest{color} (owner: Christo)
 # {color:#00875A}KStreamPrintTest{color} (owner: Christo)
 # {color:#00875A}KStreamRepartitionTest{color} (owner: Christo)
 # {color:#00875A}MaterializedInternalTest{color} (owner: Christo)
 # {color:#00875A}TransformerSupplierAdapterTest{color} (owner: Christo)
 # {color:#00875A}KTableSuppressProcessorMetricsTest{color} (owner: Christo)
 # {color:#00875A}ClientUtilsTest{color} (owner: Christo)
 # {color:#00875A}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: 
Christo)
 # {color:#ff8b00}StreamsRebalanceListenerTest{color} (owner: Christo)
 # {color:#ff8b00}TimestampedKeyValueStoreMaterializerTest{color} (owner: 
Christo)
 # {color:#ff8b00}CachingInMemoryKeyValueStoreTest{color} (owner: Christo)
 # {color:#ff8b00}CachingInMemorySessionStoreTest{color} (owner: Christo)
 # {color:#ff8b00}CachingPersistentSessionStoreTest{color} (owner: Christo)
 # {color:#ff8b00}CachingPersistentWindowStoreTest{color} (owner: Christo)
 # {color:#ff8b00}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo)
 # {color:#ff8b00}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: 
Christo)
 # {color:#ff8b00}CompositeReadOnlyWindowStoreTest{color} (owner: Christo)
 # {color:#ff8b00}KeyValueStoreBuilderTest{color} (owner: Christo)
 # {color:#ff8b00}RocksDBStoreTest{color} (owner: Christo)
 # {color:#ff8b00}StreamThreadStateStoreProviderTest{color} (owner: Christo)
 # {color:#ff8b00}TopologyTest{color} (owner: Christo)
 # {color:#ff8b00}KTableSuppressProcessorTest{color} (owner: Christo)
 # {color:#ff8b00}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo)
 # {color:#ff8b00}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: 
Christo)
 # {color:#ff8b00}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo)
 # {color:#ff8b00}MeteredTimestampedWindowStoreTest{color} (owner: Christo)
 # {color:#FF8B00}TaskManagerTest{color} (owner: Christo)
 # InternalTopicManagerTest (owner: Christo)
 # ProcessorContextImplTest (owner: Christo)
 # WriteConsistencyVectorTest (owner: Christo)
 # StreamsAssignmentScaleTest (owner: Christo)
 # StreamsPartitionAssignorTest (owner: Christo)
 # AssignmentTestUtils (owner: Christo)
 # ProcessorStateManagerTest ({*}WIP{*} owner: Matthew)
 # StandbyTaskTest ({*}WIP{*} owner: Matthew)
 # StoreChangelogReaderTest ({*}WIP{*} owner: Matthew)
 # StreamTaskTest ({*}WIP{*} owner: Matthew)
 # StreamThreadTest ({*}WIP{*} owner: Matthew)
 # StreamsMetricsImplTest
 # TimeOrderedCachingPersistentWindowStoreTest
 # TimeOrderedWindowStoreTest

*The coverage report for the above tests after the change should be >= what 
the coverage is now.*

  was:
{color:#de350b}There are tests which use both PowerMock and EasyMock. I have 
put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely 
solely on EasyMock.{color}

Unless stated in brackets the tests are in the streams module.

A list of tests which still require to be moved from EasyMock to Mockito as of 
2nd of August 2022 which do not have a Jira issue and do not have pull requests 
I am aware of which are opened:

{color:#ff8b00}In Review{color}
{color:#00875A}Merged{color}
 # -{color:#00875A}WorkerConnectorTest{color} (connect) (owner:- [~yash.mayya] 
-)-
 # -{color:#00875A}WorkerCoordinatorTest{color} (connect) (owner:- 
[~yash.mayya] -)-
 # -{color:#00875A}RootResourceTest{color} (connect) (owner:- [~yash.mayya] -)-
 # -{color:#00875A}ByteArrayProducerRecordEquals{color} (connect) (owner:- 
[~yash.mayya] -)-
 # {color:#00875A}KStreamFlatTransformTest{color} (owner: Christo)
 # {color:#00875A}KStreamFlatTransformValuesTest{color} (owner: Christo)
 # {color:#00875A}KStreamPrintTest{color} (owner: Christo)
 # {color:#00875A}KStreamRepartitionTest{color} (owner: Christo)
 # {color:#00875A}MaterializedInternalTest{color} (ow

[GitHub] [kafka] C0urante merged pull request #12478: KAFKA-13952 fix RetryWithToleranceOperator to respect infinite retries configuration

2022-09-08 Thread GitBox


C0urante merged PR #12478:
URL: https://github.com/apache/kafka/pull/12478





[jira] [Resolved] (KAFKA-13952) Infinite retry timeout is not working

2022-09-08 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-13952.
---
Fix Version/s: 3.4.0
   Resolution: Fixed

> Infinite retry timeout is not working
> -
>
> Key: KAFKA-13952
> URL: https://issues.apache.org/jira/browse/KAFKA-13952
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Jakub Malek
>Assignee: Yash Mayya
>Priority: Minor
> Fix For: 3.4.0
>
>
> The 
> [documentation|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L129]
>  for the {{errors.retry.timeout}} property says:
> {noformat}
> The maximum duration in milliseconds that a failed operation will be 
> reattempted. The default is 0, which means no retries will be attempted. Use 
> -1 for infinite retries.{noformat}
> But it seems that the value {{-1}} is not respected by the 
> [RetryWithToleranceOperator|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java],
>  which simply compares elapsed time until {{startTime + errorRetryTimeout}} is 
> exceeded.
> I was also not able to find any conversion of the raw config value before 
> {{RetryWithToleranceOperator}} is initialized.
> I ran a simple test with a connector using a mocked transformation plugin that 
> throws {{RetriableException}}, and it seems to prove my claim.
> I'm not sure if this is a documentation error or an implementation error, or 
> maybe I've missed something.
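
To make the failure mode concrete, a hypothetical reconstruction of the check described above; the names mirror the issue text, not the exact RetryWithToleranceOperator source:

```java
public class RetryTimeoutSketch {
    static boolean withinRetryTimeout(long now, long startTime, long errorRetryTimeout) {
        // With errorRetryTimeout == -1 ("infinite"), startTime - 1 is already in
        // the past, so this is false immediately and no retry is ever attempted.
        return now <= startTime + errorRetryTimeout;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        System.out.println(withinRetryTimeout(start + 10, start, -1L)); // false
    }
}
```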





[GitHub] [kafka] yashmayya commented on pull request #12478: KAFKA-13952 fix RetryWithToleranceOperator to respect infinite retries configuration

2022-09-08 Thread GitBox


yashmayya commented on PR #12478:
URL: https://github.com/apache/kafka/pull/12478#issuecomment-1240782194

   Thanks for the detailed reviews and for bearing with me through multiple 
rounds of review on this one Chris!





[GitHub] [kafka] mimaison commented on a diff in pull request #12544: KAFKA-14098: Add meaningful default client IDs for Connect workers

2022-09-08 Thread GitBox


mimaison commented on code in PR #12544:
URL: https://github.com/apache/kafka/pull/12544#discussion_r966049063


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -1138,5 +1142,13 @@ else if (value instanceof Long)
     private String className(Object o) {
         return o != null ? o.getClass().getName() : "null";
     }
-}
 
+    private static void maybeAddClientId(Map<String, Object> clientProps, String groupId) {
+        String clientId = "connect-cluster";
+        if (groupId != null) {

Review Comment:
   Can `groupId` actually be null here? This should only run in distributed mode.



##
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java:
##
@@ -106,6 +108,7 @@ public Connect startConnect(Map<String, String> workerProps) {
     // Create the admin client to be shared by all backing stores.
     Map<String, Object> adminProps = new HashMap<>(config.originals());
     ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId);
+    adminProps.putIfAbsent(CLIENT_ID_CONFIG, "connect-cluster-" + config.groupId());

Review Comment:
   With the default config, you get `connect-cluster-connect-cluster` for the 
client-id. I wonder if we need to add the `connect-cluster-` prefix at all?






[jira] [Commented] (KAFKA-14198) Release package contains snakeyaml 1.30

2022-09-08 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601858#comment-17601858
 ] 

Mickael Maison commented on KAFKA-14198:


[~jagsancio] I'm on PTO tomorrow for 2 weeks so unfortunately I'm not sure if 
I'll have the time to look into this.

> Release package contains snakeyaml 1.30
> ---
>
> Key: KAFKA-14198
> URL: https://issues.apache.org/jira/browse/KAFKA-14198
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.3.0
>Reporter: Mickael Maison
>Priority: Major
> Fix For: 3.3.0
>
>
> snakeyaml 1.30 is vulnerable to CVE-2022-25857: 
> https://security.snyk.io/vuln/SNYK-JAVA-ORGYAML-2806360
> It looks like we pull this dependency because of swagger. It's unclear how or 
> even if this can be exploited in Kafka but it's flagged by scanning tools. 
> I wonder if we could make the swagger dependencies compile time only and 
> avoid shipping them. 





[GitHub] [kafka] C0urante commented on pull request #12602: KAFKA-13985: Skip committing MirrorSourceTask records without metadata

2022-09-08 Thread GitBox


C0urante commented on PR #12602:
URL: https://github.com/apache/kafka/pull/12602#issuecomment-1240821236

   Thanks @rgroothuijsen! I agree with the `DEBUG` level for logging; it's 
tempting to make this silent, but I'd prefer to err on the side of giving users 
more information since we can't add logging to releases once they've gone out.
   
   RE testing, there are a few approaches I can think of:
   
   1. Simply remove the try/catch block in `MirrorSourceTask::commitRecord`, 
since it essentially duplicates [logic in the Connect 
framework](https://github.com/apache/kafka/blob/0c97be53fa7e1e0720f2086b5d9d80ffcc1db470/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L544-L548)
 with the small change of using the `WARN` level for logging instead of `ERROR`.
   2. If we'd prefer to keep the logging as-is, we can isolate all of the logic 
for the method except exception handling into a package-private 
`doCommitRecord` method, which is then wrapped by the public `commitRecord` 
method. I.e.:
   ```java
   @Override
   public void commitRecord(SourceRecord record, RecordMetadata metadata) {
       try {
           doCommitRecord(record, metadata);
       } catch (Throwable e) {
           log.warn("Failure committing record.", e);
       }
   }

   // Visible for testing
   void doCommitRecord(SourceRecord record, RecordMetadata metadata) {
       if (stopping) {
           return;
       }
       if (!metadata.hasOffset()) {
           log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
           return;
       }
       TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
       long latency = System.currentTimeMillis() - record.timestamp();
       metrics.countRecord(topicPartition);
       metrics.replicationLatency(topicPartition, latency);
       TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
       long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
       long downstreamOffset = metadata.offset();
       maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
   }
   ```





[GitHub] [kafka] cadonna commented on pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

2022-09-08 Thread GitBox


cadonna commented on PR #12600:
URL: https://github.com/apache/kafka/pull/12600#issuecomment-1240827367

   > we reduce removeRevokedTasksFromStateUpdater to only record in the pending 
tasks to suspend, but not try to remove from state updaters. And in 
handleAssignment we just update the tasks bookkeeping from suspended to closed 
in addition calling remove from the state updater. 
   
   I am not sure I can follow. Are you proposing to not recycle or resume 
revoked active tasks?





[GitHub] [kafka] jsancio commented on pull request #12601: MINOR: Retry on test failure for branch builds and increase max test retry to 10

2022-09-08 Thread GitBox


jsancio commented on PR #12601:
URL: https://github.com/apache/kafka/pull/12601#issuecomment-1240836278

   Merging changes and cherry-picking to 3.3. Errors seem unrelated.





[GitHub] [kafka] jsancio merged pull request #12601: MINOR: Retry on test failure for branch builds and increase max test retry to 10

2022-09-08 Thread GitBox


jsancio merged PR #12601:
URL: https://github.com/apache/kafka/pull/12601





[GitHub] [kafka] C0urante commented on a diff in pull request #12544: KAFKA-14098: Add meaningful default client IDs for Connect workers

2022-09-08 Thread GitBox


C0urante commented on code in PR #12544:
URL: https://github.com/apache/kafka/pull/12544#discussion_r966081693


##
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java:
##
@@ -106,6 +108,7 @@ public Connect startConnect(Map<String, String> workerProps) {
     // Create the admin client to be shared by all backing stores.
     Map<String, Object> adminProps = new HashMap<>(config.originals());
     ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId);
+    adminProps.putIfAbsent(CLIENT_ID_CONFIG, "connect-cluster-" + config.groupId());

Review Comment:
   Yeah, that's fair. I was thinking it might be useful broker-side to call 
this out as a Connect application but it's not a must-have and the stuttering 
that you've noted with `connect-cluster-connect-cluster` is not great. Will 
remove.






[GitHub] [kafka] Danny02 opened a new pull request, #12608: Enable KStream to be merged with itself

2022-09-08 Thread GitBox


Danny02 opened a new pull request, #12608:
URL: https://github.com/apache/kafka/pull/12608

   Why:
   It is an interesting question what the result should be when merging a 
KStream with itself: should the merge duplicate the messages, or should it be a 
no-op?
   I think the only reasonable solution is to duplicate the messages, because 
there are many different ways to disguise a KStream (e.g. adding a peek 
operation on it). It is therefore impossible to implement the no-op solution.
   
   How does it help with resolving the issue:
   This change makes the behavior of the merge operation consistent.
   
   Side effects:
   Using a list instead of a set for the parent nodes could have side effects. 
The test suite did not detect any.
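
A minimal sketch of the proposed behavior; topic names are placeholders:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

public class SelfMergeSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input");  // placeholder topic
        // With this change, merging a stream with itself duplicates records:
        // every record from "input" appears twice on "output".
        stream.merge(stream).to("output");                         // placeholder topic
        Topology topology = builder.build();
        System.out.println(topology.describe());
    }
}
```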
   





[GitHub] [kafka] C0urante commented on a diff in pull request #12544: KAFKA-14098: Add meaningful default client IDs for Connect workers

2022-09-08 Thread GitBox


C0urante commented on code in PR #12544:
URL: https://github.com/apache/kafka/pull/12544#discussion_r966084441


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -1138,5 +1142,13 @@ else if (value instanceof Long)
     private String className(Object o) {
         return o != null ? o.getClass().getName() : "null";
     }
-}
 
+    private static void maybeAddClientId(Map<String, Object> clientProps, String groupId) {
+        String clientId = "connect-cluster";
+        if (groupId != null) {

Review Comment:
   Right now we have that guarantee, but since the `*BackingStore` classes 
accept a `WorkerConfig` instead of a `DistributedConfig`, it's difficult to 
enforce that guarantee. Plus, if we ever want to use any of these backing 
stores with standalone mode (or a new mode that doesn't use Kafka's group 
membership API), it'll make that migration easier.






[GitHub] [kafka] C0urante commented on a diff in pull request #12544: KAFKA-14098: Add meaningful default client IDs for Connect workers

2022-09-08 Thread GitBox


C0urante commented on code in PR #12544:
URL: https://github.com/apache/kafka/pull/12544#discussion_r966088764


##
connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java:
##
@@ -106,6 +108,7 @@ public Connect startConnect(Map<String, String> workerProps) {
     // Create the admin client to be shared by all backing stores.
     Map<String, Object> adminProps = new HashMap<>(config.originals());
     ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId);
+    adminProps.putIfAbsent(CLIENT_ID_CONFIG, "connect-cluster-" + config.groupId());

Review Comment:
   One thing that I'm wondering a bit more about is if we should add some kind 
of unique identifier for each worker within the Connect cluster, since without 
one, it becomes harder to use broker logs to debug issues.
   
   It's tempting to use the worker's advertised URL to identify it since that 
comes from the config and is likely to be human-readable. It may be a little 
ugly in some edge cases, though. Other possibilities are to use a hash of the 
advertised URL, or a UUID. Thoughts?






[GitHub] [kafka] jsancio commented on pull request #11783: KAFKA-14143: Exactly-once source connector system tests (KIP-618)

2022-09-08 Thread GitBox


jsancio commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1240857861

   @showuon @C0urante @mimaison these are the test failures: 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/system-test-kafka-branch-builder--1662600230--C0urante--kafka-1-system-tests--2f62b7469/2022-09-07--001./2022-09-07--001./report.html





[GitHub] [kafka] C0urante commented on pull request #11783: KAFKA-14143: Exactly-once source connector system tests (KIP-618)

2022-09-08 Thread GitBox


C0urante commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1240862547

   @jsancio looks like all the Connect tests were green, and the failures were 
unrelated. LGTY?





[GitHub] [kafka] jsancio commented on pull request #11783: KAFKA-14143: Exactly-once source connector system tests (KIP-618)

2022-09-08 Thread GitBox


jsancio commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1240920282

   @C0urante sounds good. Do you want to merge it and cherry-pick it to 3.3? I 
can also merge it if you update the description with what you want me to write 
in the commit message.





[jira] [Assigned] (KAFKA-14203) KRaft broker should disable snapshot generation after error replaying the metadata log

2022-09-08 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio reassigned KAFKA-14203:
--

Assignee: David Arthur

> KRaft broker should disable snapshot generation after error replaying the 
> metadata log
> --
>
> Key: KAFKA-14203
> URL: https://issues.apache.org/jira/browse/KAFKA-14203
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.3.0
>Reporter: Jose Armando Garcia Sancio
>Assignee: David Arthur
>Priority: Major
> Fix For: 3.3.0
>
>
> The broker skips records for which there was an error when replaying the log. 
> This means that the MetadataImage has diverged from the state persistent in 
> the log. The broker should disable snapshot generation else the next time a 
> snapshot gets generated it will result in inconsistent data getting persisted.





[GitHub] [kafka] hachikuji commented on a diff in pull request #12598: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (server side)

2022-09-08 Thread GitBox


hachikuji commented on code in PR #12598:
URL: https://github.com/apache/kafka/pull/12598#discussion_r966196258


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##
@@ -982,6 +982,12 @@ class GroupCoordinator(val brokerId: Int,
   ): Option[Errors] = {
     if (group.is(Dead)) {
       Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    } else if (!isTransactional && generationId < 0 && group.is(Empty)) {

Review Comment:
   I am not sure I follow the reason for the transactional check. Can you 
clarify?






[GitHub] [kafka] dajac commented on a diff in pull request #12598: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (server side)

2022-09-08 Thread GitBox


dajac commented on code in PR #12598:
URL: https://github.com/apache/kafka/pull/12598#discussion_r966198873


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##
@@ -982,6 +982,12 @@ class GroupCoordinator(val brokerId: Int,
   ): Option[Errors] = {
     if (group.is(Dead)) {
       Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    } else if (!isTransactional && generationId < 0 && group.is(Empty)) {

Review Comment:
   I thought that transactional commits should never come with a generation < 
0, but I suppose that this is wrong: a producer could commit transactional 
offsets without using a consumer group. Is this right?
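
For context, a sketch of that path: a transactional producer committing offsets for a group with no consumer members; all names and values are placeholders:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;

public class StandaloneTxnCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "example-txn");     // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            // Commits offsets for a group that has no consumer members, i.e. no
            // member id and a generation < 0 on the coordinator side.
            producer.sendOffsetsToTransaction(
                    Collections.singletonMap(new TopicPartition("input", 0),
                                             new OffsetAndMetadata(42L)),
                    new ConsumerGroupMetadata("standalone-group"));
            producer.commitTransaction();
        }
    }
}
```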






[GitHub] [kafka] jsancio commented on a diff in pull request #12604: KAFKA-14188; Getting started for Kafka with KRaft

2022-09-08 Thread GitBox


jsancio commented on code in PR #12604:
URL: https://github.com/apache/kafka/pull/12604#discussion_r966204598


##
docs/quickstart.html:
##
@@ -46,6 +46,14 @@
 NOTE: Your local environment must have Java 8+ installed.
 
+
+Apache Kafka can be started using ZooKeeper or KRaft. To get started with either configuration follow one the sections below but not both.
+
+
+Kafka with ZooKeeper
+
 
 Run the following commands in order to start all services in the correct order:
 

Review Comment:
   Done.






[GitHub] [kafka] jsancio commented on a diff in pull request #12604: KAFKA-14188; Getting started for Kafka with KRaft

2022-09-08 Thread GitBox


jsancio commented on code in PR #12604:
URL: https://github.com/apache/kafka/pull/12604#discussion_r966205030


##
docs/quickstart.html:
##
@@ -64,6 +72,28 @@
 
 Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.
 
+
+Kafka with KRaft
+
+
+Generate a Cluster UUID
+
+
+$ KAFKA_CLUSTER_ID = $(bin/kafka-storege.sh random-uuid)

Review Comment:
   Done.






[GitHub] [kafka] jsancio commented on pull request #12604: KAFKA-14188; Getting started for Kafka with KRaft

2022-09-08 Thread GitBox


jsancio commented on PR #12604:
URL: https://github.com/apache/kafka/pull/12604#issuecomment-1240981827

   Thanks for the review @showuon. I addressed your comments. I also fixed the 
`rm` command at the end of the quickstart.





[GitHub] [kafka] hachikuji commented on a diff in pull request #12598: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (server side)

2022-09-08 Thread GitBox


hachikuji commented on code in PR #12598:
URL: https://github.com/apache/kafka/pull/12598#discussion_r966208033


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##
@@ -982,6 +982,12 @@ class GroupCoordinator(val brokerId: Int,
   ): Option[Errors] = {
     if (group.is(Dead)) {
       Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    } else if (!isTransactional && generationId < 0 && group.is(Empty)) {

Review Comment:
   It seems to be allowed at the moment, though that is definitely not a common 
case.






[GitHub] [kafka] dajac commented on a diff in pull request #12598: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (server side)

2022-09-08 Thread GitBox


dajac commented on code in PR #12598:
URL: https://github.com/apache/kafka/pull/12598#discussion_r966208886


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##
@@ -982,6 +982,12 @@ class GroupCoordinator(val brokerId: Int,
   ): Option[Errors] = {
 if (group.is(Dead)) {
   Some(Errors.COORDINATOR_NOT_AVAILABLE)
+} else if (!isTransactional && generationId < 0 && group.is(Empty)) {

Review Comment:
   I can remove that check to stay consistent.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12598: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (server side)

2022-09-08 Thread GitBox


dajac commented on code in PR #12598:
URL: https://github.com/apache/kafka/pull/12598#discussion_r966215071


##
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
##
@@ -982,6 +982,12 @@ class GroupCoordinator(val brokerId: Int,
   ): Option[Errors] = {
 if (group.is(Dead)) {
   Some(Errors.COORDINATOR_NOT_AVAILABLE)
+} else if (!isTransactional && generationId < 0 && group.is(Empty)) {

Review Comment:
   Removed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma opened a new pull request, #12609: KAFKA-14198: swagger-jaxrs2 dependency should be compileOnly

2022-09-08 Thread GitBox


ijuma opened a new pull request, #12609:
URL: https://github.com/apache/kafka/pull/12609

   Verified that the artifact generated by `releaseTarGz` no longer includes
   swagger-jaxrs2 or its dependencies (like snakeyaml).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #12609: KAFKA-14198: swagger-jaxrs2 dependency should be compileOnly

2022-09-08 Thread GitBox


jsancio commented on PR #12609:
URL: https://github.com/apache/kafka/pull/12609#issuecomment-1240998511

   Thanks for the PR @ijuma. What do you think @showuon @C0urante @mimaison? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio opened a new pull request, #12610: MINOR; Update documentation for printing dependencies

2022-09-08 Thread GitBox


jsancio opened a new pull request, #12610:
URL: https://github.com/apache/kafka/pull/12610

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #12610: MINOR; Update documentation for printing dependencies

2022-09-08 Thread GitBox


ijuma commented on code in PR #12610:
URL: https://github.com/apache/kafka/pull/12610#discussion_r966242324


##
README.md:
##
@@ -202,7 +202,7 @@ If needed, you can specify the Scala version with 
`-PscalaVersion=2.13`.
 ./gradlew testJar
 
 ### Determining how transitive dependencies are added ###

Review Comment:
   Maybe we should delete this and move "Dependency Analysis" above "common 
build options".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on a diff in pull request #12610: MINOR; Update documentation for printing dependencies

2022-09-08 Thread GitBox


jsancio commented on code in PR #12610:
URL: https://github.com/apache/kafka/pull/12610#discussion_r966244134


##
README.md:
##
@@ -202,7 +202,7 @@ If needed, you can specify the Scala version with 
`-PscalaVersion=2.13`.
 ./gradlew testJar
 
 ### Determining how transitive dependencies are added ###

Review Comment:
   I agree. I missed that section. Let me update it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-14198) Release package contains snakeyaml 1.30

2022-09-08 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio reassigned KAFKA-14198:
--

Assignee: Ismael Juma

> Release package contains snakeyaml 1.30
> ---
>
> Key: KAFKA-14198
> URL: https://issues.apache.org/jira/browse/KAFKA-14198
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.3.0
>Reporter: Mickael Maison
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 3.3.0
>
>
> snakeyaml 1.30 is vulnerable to CVE-2022-25857: 
> https://security.snyk.io/vuln/SNYK-JAVA-ORGYAML-2806360
> It looks like we pull this dependency because of swagger. It's unclear how or 
> even if this can be exploited in Kafka but it's flagged by scanning tools. 
> I wonder if we could make the swagger dependencies compile time only and 
> avoid shipping them. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] philipnee commented on a diff in pull request #12603: Pausing partition to prevent duplication

2022-09-08 Thread GitBox


philipnee commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r966254369


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int 
generation, String memberId) {
 joinPrepareTimer.update();
 }
 
+        final SortedSet<TopicPartition> partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId);

Review Comment:
   Do we need to recompute the partitions right before the revocation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13766) Use `max.poll.interval.ms` as the timeout during complete-rebalance phase

2022-09-08 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601949#comment-17601949
 ] 

Guozhang Wang commented on KAFKA-13766:
---

Inside onCompleteJoin, the block starting with

{{// trigger the awaiting join group response callback for all the members after rebalancing}}

indicates that once we are in the completing-rebalance phase, we have re-enabled 
the heartbeat with the session timeout. I.e. in that phase we effectively have two timers:

{{completeAndScheduleNextHeartbeatExpiration(group, member)}}
and
{{schedulePendingSync(group)}}

Whichever triggers first, we would fail the member and re-trigger the 
rebalance. And since in general session.timeout is smaller than the rebalance 
timeout, we would hit the former if there is a delay on assignment.
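
For reference, a minimal sketch of the two client-side configs behind these timers 
(the values shown are the current defaults, used purely for illustration):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties props = new Properties();
// Bounds phase 1 (steady-state heartbeating) and, today, phase 3 (completing rebalance).
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "45000");
// Bounds phase 2 (prepare rebalance); piggy-backed to the broker as the rebalance timeout.
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
{code}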

> Use `max.poll.interval.ms` as the timeout during complete-rebalance phase
> -
>
> Key: KAFKA-13766
> URL: https://issues.apache.org/jira/browse/KAFKA-13766
> Project: Kafka
>  Issue Type: Bug
>  Components: core, group-coordinator
>Reporter: Guozhang Wang
>Assignee: David Jacot
>Priority: Major
>  Labels: new-rebalance-should-fix
>
> The lifetime of a consumer can be categorized in three phases:
> 1) During normal processing, the broker expects a hb request periodically 
> from consumer, and that is timed by the `session.timeout.ms`.
> 2) During the prepare_rebalance, the broker would expect a join-group request 
> to be received within the rebalance.timeout, which is piggy-backed as the 
> `max.poll.interval.ms`.
> 3) During the complete_rebalance, the broker would expect a sync-group 
> request to be received again within the `session.timeout.ms`.
> So during different phases of the life of the consumer, different timeout 
> would be used to bound the timer.
> Nowadays with cooperative rebalance protocol, we can still return records and 
> process them in the middle of a rebalance from {{consumer.poll}}. In that 
> case, for phase 3) we should also use the `max.poll.interval.ms` to bound the 
> timer, which is in practice larger than `session.timeout.ms`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] guozhangwang commented on pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

2022-09-08 Thread GitBox


guozhangwang commented on PR #12600:
URL: https://github.com/apache/kafka/pull/12600#issuecomment-1241058075

   > I am not sure I can follow. Are you proposing to not recycle or resume 
revoked active tasks?
   
   What I'm proposing is that, for restoring active tasks, we can actually 
ignore them at the `handleRevocation` phase, but only handle them at the 
`handleAssignment` phase, where we would then know if the task is still owned, 
or should be closed, or should be recycled. At that time we add them to the 
corresponding pending tasks, and then call `stateUpdater.remove`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

2022-09-08 Thread GitBox


guozhangwang commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r966274214


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -421,73 +421,120 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                                  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                                  final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                                  final Set<Task> tasksToCloseClean) {
+    private void classifyRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                  final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                  final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
 
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        task.resume();

Review Comment:
   I'm not sure I follow, but in any case, after thinking about it again I think 
we do not need the pending-tasks-suspended set, as we could ignore those restoring 
active tasks at the `handleRevocation` phase.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14201) Consumer should not send group instance ID if committing with empty member ID

2022-09-08 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-14201:
---
Fix Version/s: 3.3.0

> Consumer should not send group instance ID if committing with empty member ID
> -
>
> Key: KAFKA-14201
> URL: https://issues.apache.org/jira/browse/KAFKA-14201
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: David Jacot
>Priority: Major
> Fix For: 3.3.0
>
>
> The consumer group instance ID is used to support a notion of "static" 
> consumer groups. The idea is to be able to identify the same group instance 
> across restarts so that a rebalance is not needed. However, if the user sets 
> `group.instance.id` in the consumer configuration, but uses "simple" 
> assignment with `assign()`, then the instance ID nevertheless is sent in the 
> OffsetCommit request to the coordinator. This may result in a surprising 
> UNKNOWN_MEMBER_ID error. The consumer should probably be smart enough to only 
> send the instance ID when committing as part of a consumer group.
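
For illustration, a minimal sketch of the configuration that hits this (the topic, 
group, and bootstrap values are assumed, not from the report):

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class StaticIdWithAssignSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "instance-1"); // static membership setting
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // "Simple" assignment: no group membership is established, so the member ID stays empty.
        consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
        consumer.poll(Duration.ofMillis(100));
        // The OffsetCommit nevertheless carries group.instance.id with an empty member ID,
        // which the coordinator may reject with UNKNOWN_MEMBER_ID.
        consumer.commitSync();
    }
}
{code}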



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] guozhangwang commented on a diff in pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests

2022-09-08 Thread GitBox


guozhangwang commented on code in PR #12607:
URL: https://github.com/apache/kafka/pull/12607#discussion_r966289656


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -1300,28 +1229,16 @@ public void 
shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception {
 
 // `handleAssignment`
 expectRestoreToBeCompleted(consumer, changeLogReader);
-expect(activeTaskCreator.createTasks(anyObject(), 
eq(taskId00Assignment))).andStubReturn(singletonList(task00));
-
expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01));
-topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), 
anyString());
-expectLastCall().anyTimes();
+when(activeTaskCreator.createTasks(any(), 
eq(taskId00Assignment))).thenReturn(singletonList(task00));
+
when(standbyTaskCreator.createTasks(eq(taskId01Assignment))).thenReturn(singletonList(task01));
 
+// The second attempt will return empty tasks.
 makeTaskFolders(taskId00.toString(), taskId01.toString());
 expectLockObtainedFor(taskId00, taskId01);
 
-// The second attempt will return empty tasks.
-makeTaskFolders();

Review Comment:
   Why can we remove those calls?



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -325,25 +318,21 @@ public void 
shouldHandleRemovedTasksToRecycleFromStateUpdater() {
 when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, 
task01));
 
 taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
-expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), 
eq(taskId01Partitions), eq(consumer)))
-.andStubReturn(task01Converted);
-activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject());
-expectLastCall().once();
-expect(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), 
eq(taskId00Partitions)))
-.andStubReturn(task00Converted);
-expect(consumer.assignment()).andReturn(emptySet()).anyTimes();
-consumer.resume(anyObject());
-expectLastCall().anyTimes();
-replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, 
consumer);
+when(activeTaskCreator.createActiveTaskFromStandby(eq(task01), 
eq(taskId01Partitions), eq(consumer)))
+.thenReturn(task01Converted);
+when(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), 
eq(taskId00Partitions)))
+.thenReturn(task00Converted);
+when(consumer.assignment()).thenReturn(emptySet());
 
 taskManager.tasks().addPendingTaskToRecycle(taskId00, 
taskId00Partitions);
 taskManager.tasks().addPendingTaskToRecycle(taskId01, 
taskId01Partitions);
 taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter 
-> { });
 
-Mockito.verify(task00Converted).initializeIfNeeded();
-Mockito.verify(task01Converted).initializeIfNeeded();
-Mockito.verify(stateUpdater).add(task00Converted);
-Mockito.verify(stateUpdater).add(task01Converted);
+verify(task00Converted).initializeIfNeeded();
+verify(task01Converted).initializeIfNeeded();
+verify(stateUpdater).add(task00Converted);
+verify(stateUpdater).add(task01Converted);
+verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any());

Review Comment:
   How do we specify that we expect this function call only once? Should we use 
`verify(activeTaskCreator, times(1)).func();` instead?
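
   (A note in passing: in Mockito, `verify(mock)` is shorthand for 
`verify(mock, times(1))`, so both forms assert exactly one invocation. A minimal 
sketch with a hypothetical mock, not from the PR:)

   ```java
   import static org.mockito.Mockito.*;

   import java.util.List;

   public class VerifyTimesSketch {
       @SuppressWarnings("unchecked")
       public static void main(String[] args) {
           List<String> mockList = mock(List.class);
           mockList.clear();
           verify(mockList).clear();           // defaults to times(1)
           verify(mockList, times(1)).clear(); // equivalent explicit form
       }
   }
   ```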



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -4091,21 +3815,20 @@ public void shouldListNotPausedTasks() {
 topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY);
 
 assertEquals(taskManager.notPausedTasks().size(), 0);
+
+verifyConsumerResumedWithAssignment(consumer);

Review Comment:
   Why add this additional verification?



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##
@@ -271,11 +265,13 @@ public void shouldClassifyExistingTasksWithStateUpdater() 
{
 
 taskManager.handleAssignment(standbyTasks, restoringActiveTasks);
 
-Mockito.verify(stateUpdater).getTasks();
-Mockito.verify(stateUpdater).remove(standbyTaskToRecycle.id());
-Mockito.verify(stateUpdater).remove(standbyTaskToClose.id());
-Mockito.verify(stateUpdater).remove(restoringActiveTaskToRecycle.id());
-Mockito.verify(stateUpdater).remove(restoringActiveTaskToClose.id());
+verify(stateUpdater).getTasks();
+verify(stateUpdater).remove(standbyTaskToRecycle.id());
+verify(stateUpdater).remove(standbyTaskToClose.id());
+verify(stateUpdater).remove(restoringActiveTaskToRecycle.id());
+verify(stateUpdater).remove(restoringActiveTaskToClose.id());

[jira] [Updated] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-08 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-14196:

Affects Version/s: (was: 3.2.1)

> Duplicated consumption during rebalance, causing OffsetValidationTest to act 
> flaky
> --
>
> Key: KAFKA-14196
> URL: https://issues.apache.org/jira/browse/KAFKA-14196
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.3.0
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: new-consumer-threading-should-fix
>
> Several flaky tests under OffsetValidationTest are indicating a potential 
> consumer duplication issue when autocommit is enabled.  I believe this is 
> affecting *3.2* and onward.  The failure message is shown below:
>  
> {code:java}
> Total consumed records 3366 did not match consumed position 3331 {code}
>  
> After investigating the log, I discovered that the data consumed between the 
> start of a rebalance event and the async commit was lost for those failing 
> tests.  In the example below, the rebalance event kicks in at around 
> 1662054846995 (first record), and the async commit of the offset 3739 is 
> completed at around 1662054847015 (right before partitions_revoked).
>  
> {code:java}
> {"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}
> {"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}
> {"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}
> {"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]}
>  {code}
> A few things to note here:
>  # Manually calling commitSync in the onPartitionsRevoke cb seems to 
> alleviate the issue
>  # Setting includeMetadataInTimeout to false also seems to alleviate the 
> issue.
> The above attempts seem to suggest that the contract between poll() and 
> asyncCommit() is broken.  AFAIK, we implicitly use poll() to ack the 
> previously fetched data, and the consumer would (try to) commit these offsets 
> in the current poll() loop.  However, it seems like, as the poll continues to 
> loop, the "acked" data isn't being committed.
>  
> I believe this could be introduced in  KAFKA-14024, which originated from 
> KAFKA-13310.
> More specifically (see the comments below), the ConsumerCoordinator will 
> always return before the async commit, due to the previous incomplete commit.  
> However, this is a bit contradictory here because:
>  # I think we want to commit asynchronously while the poll continues, and if 
> we do that, we are back to KAFKA-14024, that the consumer will get rebalance 
> timeout and get kicked out of the group.
>  # But we also need to commit all the "acked" offsets before revoking the 
> partition, and this has to be blocked.
> *Steps to Reproduce the Issue:*
>  # Check out AK 3.2
>  # Run this several times: (Recommend to only run runs with autocommit 
> enabled in consumer_test.py to save time)
> {code:java}
> _DUCKTAPE_OPTIONS="--debug" 
> TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_consumer_failure"
>  bash tests/docker/run_tests.sh {code}
>  
> *Steps to Diagnose the Issue:*
>  # Open the test results in *results/*
>  # Go to the consumer log.  It might look like this
>  
> {code:java}
> results/2022-09-03--005/OffsetValidationTest/test_consumer_failure/clean_shutdown=True.enable_autocommit=True.metadata_quorum=ZK/2/VerifiableConsumer-0-xx/dockerYY
>  {code}
> 3. Find the docker instance that has partition getting revoked and rejoined.  
> Observed the offset before and after.
> *Propose Fixes:*
>  TBD
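
As a rough sketch of workaround (1) above (the topic name and offset bookkeeping 
are assumed for illustration), committing synchronously from the rebalance 
listener flushes the offsets acked by poll() before ownership is given up:

{code:java}
import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

public class CommitOnRevokeSketch {
    public static void main(String[] args) {
        Map<TopicPartition, OffsetAndMetadata> acked = new HashMap<>();
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "offset-validation-demo");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test_topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Flush the offsets acked by earlier poll() calls before the partitions move.
                consumer.commitSync(acked);
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
        });
        while (true) {
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
                acked.put(new TopicPartition(record.topic(), record.partition()),
                    new OffsetAndMetadata(record.offset() + 1));
            }
        }
    }
}
{code}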



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-08 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-14196:

Fix Version/s: 3.3.0
   3.2.2

> Duplicated consumption during rebalance, causing OffsetValidationTest to act 
> flaky
> --
>
> Key: KAFKA-14196
> URL: https://issues.apache.org/jira/browse/KAFKA-14196
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.2.1
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: new-consumer-threading-should-fix
> Fix For: 3.3.0, 3.2.2
>
>
> Several flaky tests under OffsetValidationTest are indicating a potential 
> consumer duplication issue when autocommit is enabled.  I believe this is 
> affecting *3.2* and onward.  The failure message is shown below:
>  
> {code:java}
> Total consumed records 3366 did not match consumed position 3331 {code}
>  
> After investigating the log, I discovered that the data consumed between the 
> start of a rebalance event and the async commit was lost for those failing 
> tests.  In the example below, the rebalance event kicks in at around 
> 1662054846995 (first record), and the async commit of the offset 3739 is 
> completed at around 1662054847015 (right before partitions_revoked).
>  
> {code:java}
> {"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}
> {"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}
> {"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}
> {"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]}
>  {code}
> A few things to note here:
>  # Manually calling commitSync in the onPartitionsRevoke cb seems to 
> alleviate the issue
>  # Setting includeMetadataInTimeout to false also seems to alleviate the 
> issue.
> The above attempts seem to suggest that the contract between poll() and 
> asyncCommit() is broken.  AFAIK, we implicitly use poll() to ack the 
> previously fetched data, and the consumer would (try to) commit these offsets 
> in the current poll() loop.  However, it seems like, as the poll continues to 
> loop, the "acked" data isn't being committed.
>  
> I believe this could be introduced in  KAFKA-14024, which originated from 
> KAFKA-13310.
> More specifically (see the comments below), the ConsumerCoordinator will 
> always return before the async commit, due to the previous incomplete commit.  
> However, this is a bit contradictory here because:
>  # I think we want to commit asynchronously while the poll continues, and if 
> we do that, we are back to KAFKA-14024, that the consumer will get rebalance 
> timeout and get kicked out of the group.
>  # But we also need to commit all the "acked" offsets before revoking the 
> partition, and this has to be blocked.
> *Steps to Reproduce the Issue:*
>  # Check out AK 3.2
>  # Run this several times: (Recommend to only run runs with autocommit 
> enabled in consumer_test.py to save time)
> {code:java}
> _DUCKTAPE_OPTIONS="--debug" 
> TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_consumer_failure"
>  bash tests/docker/run_tests.sh {code}
>  
> *Steps to Diagnose the Issue:*
>  # Open the test results in *results/*
>  # Go to the consumer log.  It might look like this
>  
> {code:java}
> results/2022-09-03--005/OffsetValidationTest/test_consumer_failure/clean_shutdown=True.enable_autocommit=True.metadata_quorum=ZK/2/VerifiableConsumer-0-xx/dockerYY
>  {code}
> 3. Find the docker instance that has partition getting revoked and rejoined.  
> Observed the offset before and after.
> *Propose Fixes:*
>  TBD



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-08 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-14196:

Priority: Blocker  (was: Major)

> Duplicated consumption during rebalance, causing OffsetValidationTest to act 
> flaky
> --
>
> Key: KAFKA-14196
> URL: https://issues.apache.org/jira/browse/KAFKA-14196
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.2.1
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: new-consumer-threading-should-fix
> Fix For: 3.3.0, 3.2.2
>
>
> Several flaky tests under OffsetValidationTest are indicating a potential 
> consumer duplication issue when autocommit is enabled.  I believe this is 
> affecting *3.2* and onward.  The failure message is shown below:
>  
> {code:java}
> Total consumed records 3366 did not match consumed position 3331 {code}
>  
> After investigating the log, I discovered that the data consumed between the 
> start of a rebalance event and the async commit was lost for those failing 
> tests.  In the example below, the rebalance event kicks in at around 
> 1662054846995 (first record), and the async commit of the offset 3739 is 
> completed at around 1662054847015 (right before partitions_revoked).
>  
> {code:java}
> {"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}
> {"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}
> {"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}
> {"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]}
>  {code}
> A few things to note here:
>  # Manually calling commitSync in the onPartitionsRevoke cb seems to 
> alleviate the issue
>  # Setting includeMetadataInTimeout to false also seems to alleviate the 
> issue.
> The above attempts seem to suggest that the contract between poll() and 
> asyncCommit() is broken.  AFAIK, we implicitly use poll() to ack the 
> previously fetched data, and the consumer would (try to) commit these offsets 
> in the current poll() loop.  However, it seems like, as the poll continues to 
> loop, the "acked" data isn't being committed.
>  
> I believe this could be introduced in  KAFKA-14024, which originated from 
> KAFKA-13310.
> More specifically (see the comments below), the ConsumerCoordinator will 
> always return before the async commit, due to the previous incomplete commit.  
> However, this is a bit contradictory here because:
>  # I think we want to commit asynchronously while the poll continues, and if 
> we do that, we are back to KAFKA-14024, that the consumer will get rebalance 
> timeout and get kicked out of the group.
>  # But we also need to commit all the "acked" offsets before revoking the 
> partition, and this has to be blocked.
> *Steps to Reproduce the Issue:*
>  # Check out AK 3.2
>  # Run this several times: (Recommend to only run runs with autocommit 
> enabled in consumer_test.py to save time)
> {code:java}
> _DUCKTAPE_OPTIONS="--debug" 
> TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_consumer_failure"
>  bash tests/docker/run_tests.sh {code}
>  
> *Steps to Diagnose the Issue:*
>  # Open the test results in *results/*
>  # Go to the consumer log.  It might look like this
>  
> {code:java}
> results/2022-09-03--005/OffsetValidationTest/test_consumer_failure/clean_shutdown=True.enable_autocommit=True.metadata_quorum=ZK/2/VerifiableConsumer-0-xx/dockerYY
>  {code}
> 3. Find the docker instance that has partition getting revoked and rejoined.  
> Observed the offset before and after.
> *Propose Fixes:*
>  TBD



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-08 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-14196:

Affects Version/s: 3.2.1
   (was: 3.3.0)

> Duplicated consumption during rebalance, causing OffsetValidationTest to act 
> flaky
> --
>
> Key: KAFKA-14196
> URL: https://issues.apache.org/jira/browse/KAFKA-14196
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.2.1
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Major
>  Labels: new-consumer-threading-should-fix
>
> Several flaky tests under OffsetValidationTest are indicating a potential 
> consumer duplication issue when autocommit is enabled.  I believe this is 
> affecting *3.2* and onward.  The failure message is shown below:
>  
> {code:java}
> Total consumed records 3366 did not match consumed position 3331 {code}
>  
> After investigating the log, I discovered that the data consumed between the 
> start of a rebalance event and the async commit was lost for those failing 
> tests.  In the example below, the rebalance event kicks in at around 
> 1662054846995 (first record), and the async commit of the offset 3739 is 
> completed at around 1662054847015 (right before partitions_revoked).
>  
> {code:java}
> {"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}
> {"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}
> {"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}
> {"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]}
>  {code}
> A few things to note here:
>  # Manually calling commitSync in the onPartitionsRevoke cb seems to 
> alleviate the issue
>  # Setting includeMetadataInTimeout to false also seems to alleviate the 
> issue.
> The above attempts seem to suggest that the contract between poll() and 
> asyncCommit() is broken.  AFAIK, we implicitly use poll() to ack the 
> previously fetched data, and the consumer would (try to) commit these offsets 
> in the current poll() loop.  However, it seems like, as the poll continues to 
> loop, the "acked" data isn't being committed.
>  
> I believe this could be introduced in  KAFKA-14024, which originated from 
> KAFKA-13310.
> More specifically (see the comments below), the ConsumerCoordinator will 
> always return before the async commit, due to the previous incomplete commit.  
> However, this is a bit contradictory here because:
>  # I think we want to commit asynchronously while the poll continues, and if 
> we do that, we are back to KAFKA-14024, that the consumer will get rebalance 
> timeout and get kicked out of the group.
>  # But we also need to commit all the "acked" offsets before revoking the 
> partition, and this has to be blocked.
> *Steps to Reproduce the Issue:*
>  # Check out AK 3.2
>  # Run this several times: (Recommend to only run runs with autocommit 
> enabled in consumer_test.py to save time)
> {code:java}
> _DUCKTAPE_OPTIONS="--debug" 
> TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_consumer_failure"
>  bash tests/docker/run_tests.sh {code}
>  
> *Steps to Diagnose the Issue:*
>  # Open the test results in *results/*
>  # Go to the consumer log.  It might look like this
>  
> {code:java}
> results/2022-09-03--005/OffsetValidationTest/test_consumer_failure/clean_shutdown=True.enable_autocommit=True.metadata_quorum=ZK/2/VerifiableConsumer-0-xx/dockerYY
>  {code}
> 3. Find the docker instance that has partition getting revoked and rejoined.  
> Observed the offset before and after.
> *Propose Fixes:*
>  TBD



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14209) Optimize stream stream self join to use single state store

2022-09-08 Thread Vicky Papavasileiou (Jira)
Vicky Papavasileiou created KAFKA-14209:
---

 Summary: Optimize stream stream self join to use single state store
 Key: KAFKA-14209
 URL: https://issues.apache.org/jira/browse/KAFKA-14209
 Project: Kafka
  Issue Type: Improvement
Reporter: Vicky Papavasileiou


For stream-stream joins that join the same source, we can omit one of the two 
state stores since they contain the same data.
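
For example, a sketch of the kind of self-join this targets (the topic name and 
serde configuration are assumed); today the join below materializes two windowed 
stores even though both sides buffer identical records:

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> clicks = builder.stream("clicks");
// Self-join: the same stream appears on both sides.
KStream<String, String> pairs = clicks.join(
    clicks,
    (left, right) -> left + "," + right,
    JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofSeconds(10)));
{code}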



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] hachikuji commented on a diff in pull request #12593: KAFKA-14196: Prevent fetching during the rebalancing

2022-09-08 Thread GitBox


hachikuji commented on code in PR #12593:
URL: https://github.com/apache/kafka/pull/12593#discussion_r966321913


##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -1282,6 +1282,11 @@ private Fetch<K, V> pollForFetches(Timer timer) {
         long pollTimeout = coordinator == null ? timer.remainingMs() :
                 Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs());
 
+        if (coordinator.isCommittingOffsetAsync()) {

Review Comment:
   Hmm, do we want to do this in the general case? I think my expectation is 
that we would not continue fetching for partitions only when we have sent the 
offset commit and we are awaiting revocation as part of a rebalance.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee closed pull request #12593: KAFKA-14196: Prevent fetching during the rebalancing

2022-09-08 Thread GitBox


philipnee closed pull request #12593: KAFKA-14196: Prevent fetching during the 
rebalancing
URL: https://github.com/apache/kafka/pull/12593


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-08 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-14196:
---
Description: 
Several flaky tests under OffsetValidationTest are indicating a potential 
consumer duplication issue when autocommit is enabled.  I believe this is 
affecting *3.2* and onward.  The failure message is shown below:

 
{code:java}
Total consumed records 3366 did not match consumed position 3331 {code}
 

After investigating the log, I discovered that the data consumed between the 
start of a rebalance event and the async commit was lost for those failing 
tests.  In the example below, the rebalance event kicks in at around 
1662054846995 (first record), and the async commit of the offset 3739 is 
completed at around 1662054847015 (right before partitions_revoked).

 
{code:java}
{"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}
{"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}
{"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}
{"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]}
{"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]}
{"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]}
 {code}
A few things to note here:
 # Manually calling commitSync in the onPartitionsRevoke cb seems to alleviate 
the issue
 # Setting includeMetadataInTimeout to false also seems to alleviate the issue.

The above attempts seem to suggest that the contract between poll() and 
asyncCommit() is broken.  AFAIK, we implicitly use poll() to ack the previously 
fetched data, and the consumer would (try to) commit these offsets in the 
current poll() loop.  However, it seems like, as the poll continues to loop, the 
"acked" data isn't being committed.

 

I believe this could be introduced in  KAFKA-14024, which originated from 
KAFKA-13310.

More specifically (see the comments below), the ConsumerCoordinator will always 
return before the async commit, due to the previous incomplete commit.  However, 
this is a bit contradictory here because:
 # I think we want to commit asynchronously while the poll continues, and if we 
do that, we are back to KAFKA-14024, that the consumer will get rebalance 
timeout and get kicked out of the group.
 # But we also need to commit all the "acked" offsets before revoking the 
partition, and this has to be blocked.

*Steps to Reproduce the Issue:*
 # Check out AK 3.2
 # Run this several times: (Recommend to only run runs with autocommit enabled 
in consumer_test.py to save time)
{code:java}
_DUCKTAPE_OPTIONS="--debug" 
TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_consumer_failure"
 bash tests/docker/run_tests.sh {code}
 

*Steps to Diagnose the Issue:*
 # Open the test results in *results/*
 # Go to the consumer log.  It might look like this

 
{code:java}
results/2022-09-03--005/OffsetValidationTest/test_consumer_failure/clean_shutdown=True.enable_autocommit=True.metadata_quorum=ZK/2/VerifiableConsumer-0-xx/dockerYY
 {code}
3. Find the docker instance that has partition getting revoked and rejoined.  
Observed the offset before and after.

*Propose Fixes:*

 TBD

 

https://github.com/apache/kafka/pull/12603

  was:
Several flaky tests under OffsetValidationTest are indicating a potential 
consumer duplication issue when autocommit is enabled.  I believe this is 
affecting *3.2* and onward.  The failure message is shown below:

 
{code:java}
Total consumed records 3366 did not match consumed position 3331 {code}
 

After investigating the log, I discovered that the data consumed between the 
start of a rebalance event and the async commit was lost for those failing 
tests.  In the example below, the rebalance event kicks in at around 
1662054846995 (first record), and the async commit of the offset 3739 is 
completed at around 1662054847015 (right before partitions_revoked).

 
{code:java}
{"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}
{"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}
{"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}
{"timestamp":1662054847016,"name":"partitions_revoked","partitions

[GitHub] [kafka] C0urante commented on pull request #11783: KAFKA-14143: Exactly-once source connector system tests (KIP-618)

2022-09-08 Thread GitBox


C0urante commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1241120740

   I can handle the merge. Thanks @jsancio, @showuon, @mimaison, and 
@tombentley for the review!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante merged pull request #11783: KAFKA-14143: Exactly-once source connector system tests (KIP-618)

2022-09-08 Thread GitBox


C0urante merged PR #11783:
URL: https://github.com/apache/kafka/pull/11783


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14143) Exactly-once source system tests

2022-09-08 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-14143.
---
Resolution: Fixed

> Exactly-once source system tests
> 
>
> Key: KAFKA-14143
> URL: https://issues.apache.org/jira/browse/KAFKA-14143
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Affects Versions: 3.3.0
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.3.0
>
>
> System tests for the exactly-once source connector support introduced in 
> [KIP-618|https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors]
>  / KAFKA-1.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on pull request #12575: KAFKA-14193: Add EOS related source connector configs to expected config defs in ConnectRestApiTest

2022-09-08 Thread GitBox


C0urante commented on PR #12575:
URL: https://github.com/apache/kafka/pull/12575#issuecomment-1241138425

   Now that we've merged and backported 
https://github.com/apache/kafka/pull/11783, this should hopefully no longer be 
necessary. @yashmayya let me know if anything still fails; otherwise, we may 
want to close this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #12609: KAFKA-14198: swagger-jaxrs2 dependency should be compileOnly

2022-09-08 Thread GitBox


C0urante commented on PR #12609:
URL: https://github.com/apache/kafka/pull/12609#issuecomment-1241219303

   This appears to break the docs generation added in 
https://github.com/apache/kafka/pull/12067; running `./gradlew siteDocsTar` 
with this change causes the generated `docs/generated/connect_rest.yaml` file 
to lose all information on the REST API save for general metadata.
   
   Are we okay with spending some time on trying to fix the docs generation, or 
does this need to be merged ASAP to unblock the 3.3 release?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #12609: KAFKA-14198: swagger-jaxrs2 dependency should be compileOnly

2022-09-08 Thread GitBox


jsancio commented on PR #12609:
URL: https://github.com/apache/kafka/pull/12609#issuecomment-1241226713

   > Are we okay with spending some time on trying to fix the docs generation, 
or does this need to be merged ASAP to unblock the 3.3 release?
   
   @C0urante I am open to suggestions on how to fix this. We have a couple of 
days to fix the issue. Do you have time to look into this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on pull request #12609: KAFKA-14198: swagger-jaxrs2 dependency should be compileOnly

2022-09-08 Thread GitBox


C0urante commented on PR #12609:
URL: https://github.com/apache/kafka/pull/12609#issuecomment-1241229865

   Yeah, I can try to find something. Thanks José!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #12595: KAFKA-14204: QuorumController must correctly handle overly large batches

2022-09-08 Thread GitBox


cmccabe commented on code in PR #12595:
URL: https://github.com/apache/kafka/pull/12595#discussion_r966431199


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -844,6 +855,72 @@ public String toString() {
         }
     }
 
+    /**
+     * Append records to the Raft log. They will be written out asynchronously.
+     *
+     * @param log                   The log4j logger.
+     * @param result                The controller result we are writing out.
+     * @param maxRecordsPerBatch    The maximum number of records to allow in a batch.
+     * @param appender              The callback to invoke for each batch. The arguments are last
+     *                              write offset, record list, and the return result is the new
+     *                              last write offset.
+     * @return                      The final offset that was returned from the Raft layer.
+     */
+    static long appendRecords(
+        Logger log,
+        ControllerResult<?> result,
+        int maxRecordsPerBatch,
+        Function<List<ApiMessageAndVersion>, Long> appender
+    ) {
+        try {
+            List<ApiMessageAndVersion> records = result.records();
+            if (result.isAtomic()) {
+                // If the result must be written out atomically, check that it is not too large.
+                // In general, we create atomic batches when it is important to commit "all, or
+                // nothing". They are limited in size and must only be used when the batch size
+                // is bounded.
+                if (records.size() > maxRecordsPerBatch) {
+                    throw new IllegalStateException("Attempted to atomically commit " +
+                        records.size() + " records, but maxRecordsPerBatch is " +
+                        maxRecordsPerBatch);
+                }
+                long offset = appender.apply(records);
+                if (log.isTraceEnabled()) {
+                    log.trace("Atomically appended {} record(s) ending with offset {}.",
+                        records.size(), offset);
+                }
+                return offset;
+            } else {
+                // If the result is non-atomic, then split it into as many batches as needed.
+                // The appender callback will create an in-memory snapshot for each batch,
+                // since we might need to revert to any of them. We will only return the final
+                // offset of the last batch, however.
+                int i = 0, numBatches = 0;
+                while (true) {
+                    numBatches++;
+                    int j = i + maxRecordsPerBatch;
+                    if (j > records.size()) {
+                        long offset = appender.apply(records.subList(i, records.size()));

Review Comment:
   Yeah... In general LinkedList turns a lot of stuff into O(N) and that's why 
we mostly don't use it. It's only really useful if you want to delete things 
from the middle of a list in O(1), but you also don't need fast access to the 
middle of the list, which is a pretty rare situation.
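
   As a standalone sketch of the non-atomic splitting contract (simplified with 
`>=` so an exact multiple of the batch size does not produce an empty final 
batch; this is not the controller code itself): a result of 25 records with 
`maxRecordsPerBatch = 10` reaches the appender as batches of 10, 10, and 5, and 
only the offset returned for the final batch is surfaced.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Function;

   public class BatchSplitSketch {
       static long splitAndAppend(List<String> records, int maxRecordsPerBatch,
                                  Function<List<String>, Long> appender) {
           int i = 0;
           while (true) {
               int j = i + maxRecordsPerBatch;
               if (j >= records.size()) {
                   return appender.apply(records.subList(i, records.size())); // final batch
               }
               appender.apply(records.subList(i, j));
               i = j;
           }
       }

       public static void main(String[] args) {
           List<String> records = new ArrayList<>();
           for (int k = 0; k < 25; k++) records.add("r" + k);
           long last = splitAndAppend(records, 10, batch -> {
               System.out.println("batch of " + batch.size()); // prints 10, 10, 5
               return (long) batch.size();
           });
           System.out.println("last = " + last); // 5
       }
   }
   ```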



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #12595: KAFKA-14204: QuorumController must correctly handle overly large batches

2022-09-08 Thread GitBox


cmccabe commented on code in PR #12595:
URL: https://github.com/apache/kafka/pull/12595#discussion_r966431807


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -844,6 +855,72 @@ public String toString() {
         }
     }
 
+    /**
+     * Append records to the Raft log. They will be written out asynchronously.
+     *
+     * @param log                   The log4j logger.
+     * @param result                The controller result we are writing out.
+     * @param maxRecordsPerBatch    The maximum number of records to allow in a batch.
+     * @param appender              The callback to invoke for each batch. The arguments are last
+     *                              write offset, record list, and the return result is the new
+     *                              last write offset.
+     * @return                      The final offset that was returned from the Raft layer.
+     */
+    static long appendRecords(
+        Logger log,
+        ControllerResult<?> result,
+        int maxRecordsPerBatch,
+        Function<List<ApiMessageAndVersion>, Long> appender
+    ) {
+        try {
+            List<ApiMessageAndVersion> records = result.records();
+            if (result.isAtomic()) {
+                // If the result must be written out atomically, check that it is not too large.
+                // In general, we create atomic batches when it is important to commit "all, or
+                // nothing". They are limited in size and must only be used when the batch size
+                // is bounded.
+                if (records.size() > maxRecordsPerBatch) {
+                    throw new IllegalStateException("Attempted to atomically commit " +
+                        records.size() + " records, but maxRecordsPerBatch is " +
+                        maxRecordsPerBatch);
+                }
+                long offset = appender.apply(records);
+                if (log.isTraceEnabled()) {
+                    log.trace("Atomically appended {} record(s) ending with offset {}.",
+                        records.size(), offset);
+                }
+                return offset;
+            } else {
+                // If the result is non-atomic, then split it into as many batches as needed.
+                // The appender callback will create an in-memory snapshot for each batch,
+                // since we might need to revert to any of them. We will only return the final
+                // offset of the last batch, however.
+                int i = 0, numBatches = 0;
+                while (true) {
+                    numBatches++;
+                    int j = i + maxRecordsPerBatch;
+                    if (j > records.size()) {
+                        long offset = appender.apply(records.subList(i, records.size()));

Review Comment:
   btw thanks for thinking about the big-O here, even if it didn't end up being 
an issue in this particular case. We should definitely think about big-O



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #12595: KAFKA-14204: QuorumController must correctly handle overly large batches

2022-09-08 Thread GitBox


cmccabe merged PR #12595:
URL: https://github.com/apache/kafka/pull/12595





[GitHub] [kafka] C0urante commented on a diff in pull request #12609: KAFKA-14198: swagger-jaxrs2 dependency should be compileOnly

2022-09-08 Thread GitBox


C0urante commented on code in PR #12609:
URL: https://github.com/apache/kafka/pull/12609#discussion_r966438130


##
build.gradle:
##
@@ -2652,7 +2653,7 @@ project(':connect:runtime') {
   }
 
   task genConnectOpenAPIDocs(type: 
io.swagger.v3.plugins.gradle.tasks.ResolveTask, dependsOn: 
setVersionInOpenAPISpec) {
-classpath = sourceSets.main.runtimeClasspath
+classpath = sourceSets.main.compileClasspath

Review Comment:
   This feels like a terrible hack but by combining the runtime and compile 
classpaths, we can still generate the correct docs with `siteDocsTar` and omit 
the unnecessary Swagger dependencies from the release artifact generated with 
`releaseTarGz`:
   ```suggestion
   classpath = sourceSets.main.compileClasspath + sourceSets.main.runtimeClasspath
   ```






[GitHub] [kafka] hachikuji merged pull request #12598: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (server side)

2022-09-08 Thread GitBox


hachikuji merged PR #12598:
URL: https://github.com/apache/kafka/pull/12598





[jira] [Resolved] (KAFKA-14204) QuorumController must correctly handle overly large batches

2022-09-08 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio resolved KAFKA-14204.

Resolution: Fixed

> QuorumController must correctly handle overly large batches
> ---
>
> Key: KAFKA-14204
> URL: https://issues.apache.org/jira/browse/KAFKA-14204
> Project: Kafka
>  Issue Type: Bug
>  Components: controller, kraft
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Blocker
> Fix For: 3.3.0
>
>






[GitHub] [kafka] jsancio commented on pull request #12575: KAFKA-14193: Add EOS related source connector configs to expected config defs in ConnectRestApiTest

2022-09-08 Thread GitBox


jsancio commented on PR #12575:
URL: https://github.com/apache/kafka/pull/12575#issuecomment-1241267992

   I am going to close the PR. @yashmayya feel free to reopen it if you 
disagree.





[GitHub] [kafka] jsancio closed pull request #12575: KAFKA-14193: Add EOS related source connector configs to expected config defs in ConnectRestApiTest

2022-09-08 Thread GitBox


jsancio closed pull request #12575: KAFKA-14193: Add EOS related source 
connector configs to expected config defs in ConnectRestApiTest
URL: https://github.com/apache/kafka/pull/12575





[GitHub] [kafka] jsancio commented on a diff in pull request #12609: KAFKA-14198: swagger-jaxrs2 dependency should be compileOnly

2022-09-08 Thread GitBox


jsancio commented on code in PR #12609:
URL: https://github.com/apache/kafka/pull/12609#discussion_r966451504


##
build.gradle:
##
@@ -2652,7 +2653,7 @@ project(':connect:runtime') {
   }
 
   task genConnectOpenAPIDocs(type: 
io.swagger.v3.plugins.gradle.tasks.ResolveTask, dependsOn: 
setVersionInOpenAPISpec) {
-classpath = sourceSets.main.runtimeClasspath
+classpath = sourceSets.main.compileClasspath

Review Comment:
   I am okay with this fix. Feel free to create a Jira to track a better fix 
beyond 3.3.0.






[GitHub] [kafka] ijuma commented on a diff in pull request #12609: KAFKA-14198: swagger-jaxrs2 dependency should be compileOnly

2022-09-08 Thread GitBox


ijuma commented on code in PR #12609:
URL: https://github.com/apache/kafka/pull/12609#discussion_r966455049


##
build.gradle:
##
@@ -2652,7 +2653,7 @@ project(':connect:runtime') {
   }
 
   task genConnectOpenAPIDocs(type: 
io.swagger.v3.plugins.gradle.tasks.ResolveTask, dependsOn: 
setVersionInOpenAPISpec) {
-classpath = sourceSets.main.runtimeClasspath
+classpath = sourceSets.main.compileClasspath

Review Comment:
   The right fix may be to have a separate scope for swagger and that's 
basically runtimeClasspath + swagger deps.






[GitHub] [kafka] hachikuji commented on pull request #12599: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (client side)

2022-09-08 Thread GitBox


hachikuji commented on PR #12599:
URL: https://github.com/apache/kafka/pull/12599#issuecomment-1241281654

   Thanks, I was a little hesitant about the client-side fix since I thought 
there might be a debugging benefit of having group instance ID in request logs. 
But I guess this speculative benefit is probably small compared to the cost of 
seeing the regression in behavior because of the additional validation.





[GitHub] [kafka] hachikuji merged pull request #12599: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (client side)

2022-09-08 Thread GitBox


hachikuji merged PR #12599:
URL: https://github.com/apache/kafka/pull/12599





[GitHub] [kafka] jsancio commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

2022-09-08 Thread GitBox


jsancio commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r966459040


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -307,11 +322,18 @@ class BrokerMetadataListener(
 
   private def publish(publisher: MetadataPublisher): Unit = {
 val delta = _delta
-_image = _delta.apply()
+try {
+  _image = _delta.apply()
+} catch {
+  case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error 
applying metadata delta $delta", t)
+}
+
 _delta = new MetadataDelta(_image)
 if (isDebugEnabled) {
   debug(s"Publishing new metadata delta $delta at offset 
${_image.highestOffsetAndEpoch().offset}.")
 }
+
+// This publish call is done with its own try-catch and fault handler
 publisher.publish(delta, _image)

Review Comment:
   Is this correct? If there was an error in `_image = _delta.apply()`, 
`_image` will be the previous image that was published while `delta` is the new 
`_delta` that was not applied. Also, note that I am pretty sure that the code 
`publisher.publish` assumes that this layer doesn't send duplicate deltas and 
images.
   
   Is there a way we can write tests for this code and scenarios so that we can 
increase our confidence that this code behaves as we expect?






[GitHub] [kafka] jsancio commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

2022-09-08 Thread GitBox


jsancio commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r964300580


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -41,8 +43,19 @@ class BrokerMetadataListener(
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter],
   brokerMetrics: BrokerServerMetrics,
-  metadataLoadingFaultHandler: FaultHandler
+  _metadataLoadingFaultHandler: FaultHandler
 ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
+
+  private val metadataFaultOccurred = new AtomicBoolean(false)
+  private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() {
+override def handleFault(failureMessage: String, cause: Throwable): 
RuntimeException = {
+  if (metadataFaultOccurred.compareAndSet(false, true)) {
+error("Disabling metadata snapshots until this broker is restarted.")
+  }
+  _metadataLoadingFaultHandler.handleFault(failureMessage, cause)
+}

Review Comment:
   This abstraction feels strange. For example, how does the operator monitor 
that Kafka has an issue and is not generating snapshots? I assume that they need 
to monitor the metric `BrokerServerMetrics.metadataLoadErrorCount`, which is 
updated from `KafkaRaftServer`. The disabling of snapshotting happens in 
`BrokerMetadataListener`, which doesn't know about this metric.
   
   I think the solution should make this relation explicit and not have it 
hidden or implemented across multiple layers of abstraction.
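
   A rough sketch of what "explicit" could look like: one handler that owns both the metric bump and the snapshot kill switch, so neither layer has to know about the other. All names below are hypothetical; the shape only loosely mirrors the real `FaultHandler` contract:

   ```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Couples the operator-facing error counter with the snapshot-disabling flag
// in a single place, instead of spreading them across layers.
final class SnapshotDisablingFaultHandler {
    private final AtomicLong metadataLoadErrorCount; // backs the broker metric
    private final AtomicBoolean snapshotsDisabled = new AtomicBoolean(false);

    SnapshotDisablingFaultHandler(AtomicLong metadataLoadErrorCount) {
        this.metadataLoadErrorCount = metadataLoadErrorCount;
    }

    RuntimeException handleFault(String failureMessage, Throwable cause) {
        metadataLoadErrorCount.incrementAndGet();      // operator sees the fault
        snapshotsDisabled.compareAndSet(false, true);  // and snapshots stop
        return new RuntimeException(failureMessage, cause);
    }

    boolean snapshotsDisabled() {
        return snapshotsDisabled.get();
    }
}
   ```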






[GitHub] [kafka] jsancio commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

2022-09-08 Thread GitBox


jsancio commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r966471788


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -28,6 +28,8 @@ import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.snapshot.SnapshotReader
 
+import java.util.concurrent.atomic.AtomicBoolean
+

Review Comment:
   This comment applies to this code:
   ```scala
   try {
 delta.replay(highestMetadataOffset, epoch, 
messageAndVersion.message())
   } catch {
 case e: Throwable => snapshotName match {
   case None => metadataLoadingFaultHandler.handleFault(
 s"Error replaying metadata log record at offset 
${_highestOffset}", e)
   case Some(name) => metadataLoadingFaultHandler.handleFault(
 s"Error replaying record ${index} from snapshot ${name} at 
offset ${_highestOffset}", e)
 }
   }
   ```
   I think this code attempts to read and replay the entire committed log. I 
wonder whether it should be more conservative when it encounters an error 
replaying a record, and only read the current batch before updating the image.
   
   Note that this code is used for both snapshots and log segments. For 
snapshots, the entire snapshot needs to be in one `delta` update.



##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -307,11 +322,18 @@ class BrokerMetadataListener(
 
   private def publish(publisher: MetadataPublisher): Unit = {
 val delta = _delta
-_image = _delta.apply()
+try {
+  _image = _delta.apply()

Review Comment:
   Note that it is possible for `_delta` to include a lot of batches, maybe even 
the entire log. I wonder whether, if the broker encounters an error applying a 
delta, we should instead rewind, then generate and apply a delta per record batch. 
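
   In outline, the per-batch alternative would look something like this (a Java sketch over stand-in types; the real listener is Scala):

   ```java
import java.util.List;

// Stand-ins for the real metadata types, for illustration only.
interface MetadataRecord {}
interface RecordBatch { List<MetadataRecord> records(); }
interface Image { Delta newDelta(); }
interface Delta { void replay(MetadataRecord record); Image apply(); }

final class PerBatchReplay {
    // One delta per batch: a bad record poisons only its own batch, and every
    // prior batch has already been folded into the image.
    static Image replayAll(Image image, List<RecordBatch> batches) {
        for (RecordBatch batch : batches) {
            Delta delta = image.newDelta();
            for (MetadataRecord record : batch.records()) {
                delta.replay(record);
            }
            image = delta.apply();
        }
        return image;
    }
}
   ```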






[GitHub] [kafka] cmccabe commented on a diff in pull request #12604: KAFKA-14188; Getting started for Kafka with KRaft

2022-09-08 Thread GitBox


cmccabe commented on code in PR #12604:
URL: https://github.com/apache/kafka/pull/12604#discussion_r966476362


##
docs/streams/quickstart.html:
##
@@ -98,19 +97,46 @@ Step
 Step 2: Start the Kafka server
 
 
-Kafka uses <a href="https://zookeeper.apache.org/">ZooKeeper</a> so you need 
to first start a ZooKeeper server if you don't already have one. You can use 
the convenience script packaged with kafka to get a quick-and-dirty single-node 
ZooKeeper instance.
+  Apache Kafka can be started using ZooKeeper or KRaft. To get started with 
either configuration follow one the section below but not both.

Review Comment:
   should be "one of the sections below" not "one the section"







[GitHub] [kafka] mumrah commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

2022-09-08 Thread GitBox


mumrah commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r966480469


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -307,11 +322,18 @@ class BrokerMetadataListener(
 
   private def publish(publisher: MetadataPublisher): Unit = {
 val delta = _delta
-_image = _delta.apply()
+try {
+  _image = _delta.apply()
+} catch {
+  case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error 
applying metadata delta $delta", t)
+}
+
 _delta = new MetadataDelta(_image)
 if (isDebugEnabled) {
   debug(s"Publishing new metadata delta $delta at offset 
${_image.highestOffsetAndEpoch().offset}.")
 }
+
+// This publish call is done with its own try-catch and fault handler
 publisher.publish(delta, _image)

Review Comment:
   Thanks, good catch. I missed a `throw` in the catch block above. If we can't 
apply the delta we should not publish the image.
   
   I agree that more tests would be very useful as we harden this code path. 
I'll see what I can come up with for this PR and we can continue adding more 
tests after 3.3
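
   For illustration, the corrected control flow reads roughly like this (a Java sketch with stub types; the real method is Scala):

   ```java
// Stub types standing in for the real metadata classes.
interface MetadataImage {}
interface MetadataPublisher { void publish(MetadataDelta delta, MetadataImage image); }
interface FaultHandler { RuntimeException handleFault(String msg, Throwable cause); }
final class MetadataDelta {
    MetadataDelta(MetadataImage base) { }
    MetadataImage apply() { return new MetadataImage() { }; }
}

final class PublishSketch {
    private final FaultHandler faultHandler;
    private MetadataDelta delta;
    private MetadataImage image;

    PublishSketch(FaultHandler faultHandler, MetadataImage initial) {
        this.faultHandler = faultHandler;
        this.image = initial;
        this.delta = new MetadataDelta(initial);
    }

    void publish(MetadataPublisher publisher) {
        MetadataDelta current = delta;
        try {
            image = current.apply();
        } catch (Throwable t) {
            // Rethrow: never hand a publisher a delta paired with a stale image.
            throw faultHandler.handleFault("Error applying metadata delta", t);
        }
        delta = new MetadataDelta(image);
        publisher.publish(current, image);
    }
}
   ```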






[GitHub] [kafka] mumrah commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors

2022-09-08 Thread GitBox


mumrah commented on code in PR #12596:
URL: https://github.com/apache/kafka/pull/12596#discussion_r966484578


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -307,11 +322,18 @@ class BrokerMetadataListener(
 
   private def publish(publisher: MetadataPublisher): Unit = {
 val delta = _delta
-_image = _delta.apply()
+try {
+  _image = _delta.apply()

Review Comment:
   Rewinding and re-applying does sound useful for some kind of automatic error 
mitigation, but I think it will be quite a bit of work. As it stands, I believe 
the broker can only process metadata going forward. 
   
   I can think of a degenerate case we have today where `loadBatches` is able 
to process all but one record, but `delta.apply` cannot complete and so we 
can't publish any new metadata. Like you mention, I think the only way to 
mitigate a situation like this would be to produce smaller deltas to reduce the 
blast radius of a bad record.






[jira] [Commented] (KAFKA-12887) Do not trigger user-customized ExceptionalHandler for RTE

2022-09-08 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17602033#comment-17602033
 ] 

Guozhang Wang commented on KAFKA-12887:
---

Hello [~icorne], we have reverted this change due to reasons similar to what you 
described here. Do you still see it in 3.1.1? Could you try 3.2.1 instead?

> Do not trigger user-customized ExceptionalHandler for RTE
> -
>
> Key: KAFKA-12887
> URL: https://issues.apache.org/jira/browse/KAFKA-12887
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Josep Prat
>Priority: Major
> Fix For: 3.1.0
>
>
> Today in StreamThread we have a try-catch block that captures all {{Throwable 
> e}} and then triggers {{this.streamsUncaughtExceptionHandler.accept(e)}}. 
> However, there are possible RTEs such as IllegalState/IllegalArgument 
> exceptions which are usually caused by bugs, etc. In such cases we should not 
> let users to decide what to do with these exceptions, but should let Streams 
> itself to enforce the decision, e.g. in the IllegalState/IllegalArgument we 
> should fail fast to notify the potential error.





[GitHub] [kafka] jsancio commented on pull request #12604: KAFKA-14188; Getting started for Kafka with KRaft

2022-09-08 Thread GitBox


jsancio commented on PR #12604:
URL: https://github.com/apache/kafka/pull/12604#issuecomment-1241324010

   > We should move “Once all services have successfully launched, you will 
have a basic Kafka environment running and ready to use.” under both ZK and 
KRaft sections.
   
   @forlack Thanks for the feedback. Kept the sentence in the "Kafka with 
ZooKeeper" section and added a similar sentence to the "Kafka with KRaft" 
section. It is awkward formatting to have a paragraph for a parent section 
after starting a subsection.





[jira] [Commented] (KAFKA-14189) Improve connection limit and reuse of coordinator and leader in KafkaConsumer

2022-09-08 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17602037#comment-17602037
 ] 

Guozhang Wang commented on KAFKA-14189:
---

Hi [~aglicacha] [~vongosling]

The main motivation for using two connection sockets for the coordinator and the 
partition leader is to avoid blocking coordination-related requests such as 
join/sync behind fetch requests (which can long-poll, and during that time we 
cannot send other requests on the same socket). Reusing the connection may cause 
issues, e.g. a heartbeat request not being processed in time if a fetch request 
is already parked on the broker side.
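
As background for the connection-id point in the description below, the coordinator is addressed through a synthetic node id derived from the broker id, so the two sockets to the same broker can never collide. A hypothetical illustration of that convention (the offset matches the "node 2147483645" seen in the log excerpt in the description):

{code:java}
// The coordinator connection uses Integer.MAX_VALUE - brokerId as its node id,
// deliberately distinct from the fetch connection's node id (the broker id).
final class CoordinatorNodeId {
    static int coordinatorConnectionId(int brokerId) {
        return Integer.MAX_VALUE - brokerId;
    }

    public static void main(String[] args) {
        System.out.println(coordinatorConnectionId(2)); // 2147483645, as in the logs
    }
}
{code}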

> Improve connection limit and reuse of coordinator and leader in KafkaConsumer
> -
>
> Key: KAFKA-14189
> URL: https://issues.apache.org/jira/browse/KAFKA-14189
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 0.9.0.0
>Reporter: Junyang Liu
>Priority: Major
>
> The connection id of the connection with the coordinator in KafkaConsumer is 
> Integer.MAX_VALUE - coordinator id, which differs from the connection id of the 
> partition leader. So the connection cannot be reused when the coordinator and 
> leader are on the same broker, which means we need two separate connections 
> to the same broker. Suppose a consumer has connected to the 
> coordinator and finished Join and Sync, and wants to send FETCH to the leader on 
> the same broker. But the connection count has reached its limit, so the consumer 
> will be in the group but cannot consume messages.
> partial logs:
> {code:java}
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Added 
> READ_UNCOMMITTED fetch request for partition topic-test-4 at offset 9 to node 
> :9092 (id: 2 rack: 2) 
> (org.apache.kafka.clients.consumer.internals.Fetcher) 
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Built full fetch 
> (sessionId=INVALID, epoch=INITIAL) for node 2 with 1 partition(s). 
> (org.apache.kafka.clients.FetchSessionHandler) 
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Sending 
> READ_UNCOMMITTED FullFetchRequest(topic-test-4) to broker :9092 (id: 2 
> rack: 2) (org.apache.kafka.clients.consumer.internals.Fetcher)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Initiating 
> connection to node :9092 (id: 2 rack: 2) using address / 
> (org.apache.kafka.clients.NetworkClient) 
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Using older server 
> API v3 to send OFFSET_COMMIT 
> {group_id=group-test,generation_id=134,member_id=consumer-11-2e2b16eb-516c-496c-8aa4-c6e990b43598,retention_time=-1,topics=[{topic=topic-test,partitions=[{partition=3,offset=0,metadata=},{partition=4,offset=9,metadata=},{partition=5,offset=13,metadata=}]}]}
>  with correlation id 242 to node 2147483645 
> (org.apache.kafka.clients.NetworkClient)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Created socket with 
> SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2 
> (org.apache.kafka.common.network.Selector)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Completed 
> connection to node 2. Fetching API versions. 
> (org.apache.kafka.clients.NetworkClient)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Initiating API 
> versions fetch from node 2. (org.apache.kafka.clients.NetworkClient)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Subscribed to 
> topic(s): topic-test (org.apache.kafka.clients.consumer.KafkaConsumer)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Connection with 
> / disconnected (org.apache.kafka.common.network.Selector)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Node 2 
> disconnected. (org.apache.kafka.clients.NetworkClient) 
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Cancelled request 
> with header RequestHeader(apiKey=FETCH, apiVersion=10, clientId=consumer-11, 
> correlationId=241) due to node 2 being disconnected 
> (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> DEBUG [Consumer clientId=consumer-11, groupId=group-test] Error sending fetch 
> request (sessionId=INVALID, epoch=INITIAL) to node 2: 
> org.apache.kafka.common.errors.DisconnectException. 
> (org.apache.kafka.clients.FetchSessionHandler){code}
> The connection to the coordinator, the rebalance, and the fetching of offsets 
> have finished. When preparing the connection to the leader for fetching, the 
> connection limit has been reached, so after the TCP connection the broker 
> disconnects the client.  
>  
> The root cause of this issue is that the process of consuming is a 
> combination of multiple connections (connections with the coordinator and leader 
> on the same broker), not atomic, which may lead to being "half connected". I think we

[GitHub] [kafka] jsancio merged pull request #12604: KAFKA-14188; Getting started for Kafka with KRaft

2022-09-08 Thread GitBox


jsancio merged PR #12604:
URL: https://github.com/apache/kafka/pull/12604





[jira] [Commented] (KAFKA-14208) KafkaConsumer#commitAsync throws unexpected WakeupException

2022-09-08 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17602046#comment-17602046
 ] 

Guozhang Wang commented on KAFKA-14208:
---

Hello Qingsheng, thanks for reporting this issue. I looked at the source code and 
agree with you that it was introduced as part of KAFKA-13563. I will try to fix 
this with a follow-up PR.

> KafkaConsumer#commitAsync throws unexpected WakeupException
> ---
>
> Key: KAFKA-14208
> URL: https://issues.apache.org/jira/browse/KAFKA-14208
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.2.1
>Reporter: Qingsheng Ren
>Priority: Major
>
> We recently encountered a bug after upgrading Kafka client to 3.2.1 in Flink 
> Kafka connector (FLINK-29153). Here's the exception:
> {code:java}
> org.apache.kafka.common.errors.WakeupException
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:514)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:278)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:252)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.coordinatorUnknownAndUnready(ConsumerCoordinator.java:493)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1055)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1573)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:226)
>  {code}
> As {{WakeupException}} is not listed in the JavaDoc of 
> {{{}KafkaConsumer#commitAsync{}}}, Flink Kafka connector doesn't catch the 
> exception thrown directly from KafkaConsumer#commitAsync but handles all 
> exceptions in the callback.
> I checked the source code and suspect this is caused by KAFKA-13563. Also we 
> never had this exception in commitAsync when we used Kafka client 2.4.1 & 
> 2.8.1. 
> I'm wondering if this is kind of breaking the public API as the 
> WakeupException is not listed in JavaDoc, and maybe it's better to invoke the 
> callback to handle the {{WakeupException}} instead of throwing it directly 
> from the method itself. 





[jira] [Commented] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky

2022-09-08 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17602049#comment-17602049
 ] 

Guozhang Wang commented on KAFKA-14196:
---

Thanks Philip; regarding your two questions above, I agree with [~showuon]'s 
thoughts as well. Especially for 1), even if subscriptions changed in between 
consecutive onJoinPrepare calls, as long as they do not change the assigned 
partitions (i.e. as long as `assignFromSubscribed()` has not been called) I think 
we are fine, since the returned records depend on the assigned partitions.

> Duplicated consumption during rebalance, causing OffsetValidationTest to act 
> flaky
> --
>
> Key: KAFKA-14196
> URL: https://issues.apache.org/jira/browse/KAFKA-14196
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.2.1
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: new-consumer-threading-should-fix
> Fix For: 3.3.0, 3.2.2
>
>
> Several flaky tests under OffsetValidationTest are indicating potential 
> consumer duplication issue, when autocommit is enabled.  I believe this is 
> affecting *3.2* and onward.  Below shows the failure message:
>  
> {code:java}
> Total consumed records 3366 did not match consumed position 3331 {code}
>  
> After investigating the log, I discovered that the data consumed between the 
> start of a rebalance event and the async commit was lost for those failing 
> tests.  In the example below, the rebalance event kicks in at around 
> 1662054846995 (first record), and the async commit of the offset 3739 is 
> completed at around 1662054847015 (right before partitions_revoked).
>  
> {code:java}
> {"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]}
> {"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]}
> {"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]}
> {"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]}
> {"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]}
>  {code}
> A few things to note here:
>  # Manually calling commitSync in the onPartitionsRevoke cb seems to 
> alleviate the issue
>  # Setting includeMetadataInTimeout to false also seems to alleviate the 
> issue.
> The above attempts seem to suggest that the contract between poll() and 
> asyncCommit() is broken.  AFAIK, we implicitly use poll() to ack the 
> previously fetched data, and the consumer would (try to) commit these offsets 
> in the current poll() loop.  However, it seems like as the poll continues to 
> loop, the "acked" data isn't being committed.
>  
> I believe this could be introduced in  KAFKA-14024, which originated from 
> KAFKA-13310.
> More specifically (see the comments below), the ConsumerCoordinator will 
> always return before the async commit, due to the previous incomplete commit.  
> However, this is a bit contradictory here because:
>  # I think we want to commit asynchronously while the poll continues, and if 
> we do that, we are back to KAFKA-14024, that the consumer will get rebalance 
> timeout and get kicked out of the group.
>  # But we also need to commit all the "acked" offsets before revoking the 
> partition, and this has to be blocked.
> *Steps to Reproduce the Issue:*
>  # Check out AK 3.2
>  # Run this several times: (Recommend to only run runs with autocommit 
> enabled in consumer_test.py to save time)
> {code:java}
> _DUCKTAPE_OPTIONS="--debug" 
> TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_consumer_failure"
>  bash tests/docker/run_tests.sh {code}
>  
> *Steps to Diagnose the Issue:*
>  # Open the test results in *results/*
>  # Go to the consumer log.  It might look like this
>  
> {code:java}
> results/2022-09-03--005/OffsetValidationTest/test_consumer_failure/clean_shutdown=True.enable_autocommit=True.metadata_quorum=ZK/2/VerifiableConsumer-0-xx/dockerYY
>  {code}
> 3. Find the docker instance that has partition getting revoked and rejoined.  
> Observed the offset before and after.
> *Propose Fixes:*
>  TBD
>  
> https://github.com/apache/kafka/pull/12603





[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled

2022-09-08 Thread GitBox


hachikuji commented on code in PR #12603:
URL: https://github.com/apache/kafka/pull/12603#discussion_r966512403


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int 
generation, String memberId) {
 }
 }
 
+Optional<Exception> exception = revokePartitions(partitionsToRevoke, 
generation, memberId);
+
+isLeader = false;
+subscriptions.resetGroupSubscription();
+joinPrepareTimer = null;
+autoCommitOffsetRequestFuture = null;
+timer.update();
+
+if (exception.isPresent()) {
+throw new KafkaException("User rebalance callback throws an 
error", exception.get());
+}
+return true;
+}
+
+private SortedSet<TopicPartition> getPartitionsToRevoke(RebalanceProtocol 
protocol, int generation, String memberId) {
+SortedSet<TopicPartition> partitions = new TreeSet<>(COMPARATOR);
+if (generation == Generation.NO_GENERATION.generationId ||
+memberId.equals(Generation.NO_GENERATION.memberId)) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+// Revoke all partitions
+if (protocol == RebalanceProtocol.EAGER) {
+partitions.addAll(subscriptions.assignedPartitions());
+return partitions;
+}
+
+// only revoke those partitions that are not in the subscription any 
more.
+if (protocol == RebalanceProtocol.COOPERATIVE) {
+Set<TopicPartition> ownedPartitions = new 
HashSet<>(subscriptions.assignedPartitions());
+partitions.addAll(ownedPartitions.stream()
+.filter(tp -> 
!subscriptions.subscription().contains(tp.topic()))
+.collect(Collectors.toSet()));
+return partitions;
+}
+
+log.debug("Invalid protocol: {}. No partition will be revoked.", 
protocol);
+return partitions;
+}
+
+private void pausePartitions(Set<TopicPartition> partitions) {
+// See KAFKA-14196 for more detail: we pause the partition from 
consumption to prevent duplicated
+//  data returned by the consumer poll loop.  Without pausing the 
partitions, the consumer will move forward,
+//  returning the data w/o committing it, and the progress will be 
lost once the partition is revoked.
+//  This only applies to autocommits, as we expect the user to handle the 
offsets manually during partition
+//  revocation.
+
+log.debug("Pausing partitions {} before onJoinPrepare", partitions);
+partitions.forEach(tp -> subscriptionState().pause(tp));

Review Comment:
   I have mixed feelings about reusing the pause mechanism here. On the one 
hand, it does what we want. On the other hand, the pause state can be mutated 
by the user. What if the user calls `resume()` on a partition that we paused 
internally? Sounds crazy perhaps, but I think I'd rather have a mechanism that 
can only be accessed internally for stuff like this.
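
   A toy sketch of such an internal-only mechanism, kept separate from the user-facing pause flag (all names hypothetical):

   ```java
import java.util.HashSet;
import java.util.Set;

// Two independent gates: user pause/resume touches only the first set, so the
// rebalance machinery's blocks cannot be undone by a stray resume() call.
final class FetchGate<P> {
    private final Set<P> userPaused = new HashSet<>();
    private final Set<P> rebalanceBlocked = new HashSet<>();

    void pause(P tp) { userPaused.add(tp); }      // user-facing
    void resume(P tp) { userPaused.remove(tp); }  // user-facing; leaves blocks alone
    void blockForRebalance(P tp) { rebalanceBlocked.add(tp); }
    void unblockAfterRebalance(P tp) { rebalanceBlocked.remove(tp); }

    boolean isFetchable(P tp) {
        return !userPaused.contains(tp) && !rebalanceBlocked.contains(tp);
    }
}
   ```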



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int 
generation, String memberId) {
 joinPrepareTimer.update();
 }
 
+final SortedSet<TopicPartition> partitionsToRevoke = 
getPartitionsToRevoke(protocol, generation, memberId);
+
 // async commit offsets prior to rebalance if auto-commit enabled
 // and there is no in-flight offset commit request
-if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
-autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
+if (autoCommitEnabled) {
+pausePartitions(partitionsToRevoke);

Review Comment:
   It seems like this bug (and some of the complexity in this patch) is due to 
the fact that we do the auto-commit prior to revoking partitions. I wonder if 
that is really necessary. If we revoke first, then the partitions would be 
removed from `SubscriptionState` and we wouldn't have to worry about fetches 
for these partitions returning. Could that work as well?



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int 
generation, String memberId) {
 joinPrepareTimer.update();
 }
 
+final SortedSet<TopicPartition> partitionsToRevoke = 
getPartitionsToRevoke(protocol, generation, memberId);

Review Comment:
   Is your concern that the subscription could change in between the time that 
we  pause the partitions and the time that the revocation callback is triggered?




[GitHub] [kafka] guozhangwang opened a new pull request, #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery

2022-09-08 Thread GitBox


guozhangwang opened a new pull request, #12611:
URL: https://github.com/apache/kafka/pull/12611

   Today we may try to discover the coordinator in both a blocking way (e.g. in 
`poll`) and a non-blocking way (e.g. in `commitAsync`). For the latter we poll 
the underlying network client with timeout 0, and in that case we should not 
trigger wakeup, since these are non-blocking calls and hence should not throw 
wake-ups.
   
   In this PR I'm fixing it in the least intrusive way (a more general fix would, 
potentially, be to have two versions of `ensureCoordinatorReady`). After our 
threading refactoring, the `ensureCoordinatorReady` function will no longer be 
called by the calling thread, only triggered by the background thread, which 
gives us a much simpler way to ensure that non-blocking functions never throw 
wake-ups.
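   
   A toy sketch of the split described above, with the two call paths kept explicit (all names are hypothetical stand-ins for the real coordinator internals):
   
   ```java
import java.util.concurrent.atomic.AtomicBoolean;

// Blocking discovery (used from poll) honors wakeup(); non-blocking discovery
// (used from commitAsync) is a timeout-0 attempt that never checks the flag.
final class CoordinatorDiscovery {
    private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
    private volatile boolean coordinatorKnown = false;

    void wakeup() { wakeupRequested.set(true); }
    void markCoordinatorFound() { coordinatorKnown = true; }

    boolean ensureCoordinatorReadyBlocking() {
        if (wakeupRequested.compareAndSet(true, false)) {
            throw new IllegalStateException("wakeup"); // stands in for WakeupException
        }
        return coordinatorKnown;
    }

    boolean ensureCoordinatorReadyNonBlocking() {
        return coordinatorKnown; // best effort; never throws on wakeup()
    }
}
   ```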
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] guozhangwang commented on pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery

2022-09-08 Thread GitBox


guozhangwang commented on PR #12611:
URL: https://github.com/apache/kafka/pull/12611#issuecomment-1241358026

   Call @philipnee @showuon for reviews.





[jira] [Assigned] (KAFKA-14208) KafkaConsumer#commitAsync throws unexpected WakeupException

2022-09-08 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-14208:
-

Assignee: Guozhang Wang

> KafkaConsumer#commitAsync throws unexpected WakeupException
> ---
>
> Key: KAFKA-14208
> URL: https://issues.apache.org/jira/browse/KAFKA-14208
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.2.1
>Reporter: Qingsheng Ren
>Assignee: Guozhang Wang
>Priority: Major
>
> We recently encountered a bug after upgrading Kafka client to 3.2.1 in Flink 
> Kafka connector (FLINK-29153). Here's the exception:
> {code:java}
> org.apache.kafka.common.errors.WakeupException
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:514)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:278)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:252)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.coordinatorUnknownAndUnready(ConsumerCoordinator.java:493)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1055)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1573)
>   at 
> org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:226)
>  {code}
> As {{WakeupException}} is not listed in the JavaDoc of 
> {{{}KafkaConsumer#commitAsync{}}}, Flink Kafka connector doesn't catch the 
> exception thrown directly from KafkaConsumer#commitAsync but handles all 
> exceptions in the callback.
> I checked the source code and suspect this is caused by KAFKA-13563. Also we 
> never had this exception in commitAsync when we used Kafka client 2.4.1 & 
> 2.8.1. 
> I'm wondering if this is kind of breaking the public API as the 
> WakeupException is not listed in JavaDoc, and maybe it's better to invoke the 
> callback to handle the {{WakeupException}} instead of throwing it directly 
> from the method itself. 





[GitHub] [kafka] lihaosky commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation

2022-09-08 Thread GitBox


lihaosky commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r966520188


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -960,7 +1011,7 @@ private void removeRevokedTasksFromStateUpdater(final 
Set<TopicPartition> remain
 for (final Task restoringTask : stateUpdater.getTasks()) {
 if (restoringTask.isActive()) {
 if 
(remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
-tasks.addPendingTaskToCloseClean(restoringTask.id());
+
tasks.addPendingActiveTaskToSuspend(restoringTask.id());

Review Comment:
   noob question: Why do we suspend instead of close?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -421,73 +421,120 @@ private void classifyTasksWithoutStateUpdater(final 
Map<TaskId, Set<TopicPartition>> 
activeTasksToCreate,
-  final Map<TaskId, Set<TopicPartition>> 
standbyTasksToCreate,
-  final Map<Task, Set<TopicPartition>> 
tasksToRecycle,
-  final Set<Task> tasksToCloseClean) {
+private void classifyRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+  final Map<Task, Set<TopicPartition>> tasksToRecycle,
+  final Set<Task> 
tasksToCloseClean) {
 for (final Task task : tasks.allTasks()) {
+if (!task.isActive()) {
+throw new IllegalStateException("Standby tasks should only be 
managed by the state updater");
+}
 final TaskId taskId = task.id();
 if (activeTasksToCreate.containsKey(taskId)) {
-if (task.isActive()) {
-final Set<TopicPartition> topicPartitions = 
activeTasksToCreate.get(taskId);
-if (tasks.updateActiveTaskInputPartitions(task, 
topicPartitions)) {
-task.updateInputPartitions(topicPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
-}
-task.resume();
-} else {
-throw new IllegalStateException("Standby tasks should only 
be managed by the state updater");
-}
+handleReAssignedActiveTask(task, 
activeTasksToCreate.get(taskId));
 activeTasksToCreate.remove(taskId);
 } else if (standbyTasksToCreate.containsKey(taskId)) {
-if (!task.isActive()) {
-throw new IllegalStateException("Standby tasks should only 
be managed by the state updater");
-} else {
-tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-}
+tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
 standbyTasksToCreate.remove(taskId);
 } else {
 tasksToCloseClean.add(task);
 }
 }
 }
 
+private void handleReAssignedActiveTask(final Task task,
+final Set<TopicPartition> 
inputPartitions) {
+if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
+}
+task.resume();
+if (task.state() == State.RESTORING) {
+handleReAssignedRevokedActiveTask(task);
+}
+}
+
+private void handleReAssignedRevokedActiveTask(final Task task) {
+tasks.removeTask(task);
+stateUpdater.add(task);
+}
+
 private void classifyTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
final Map<Task, Set<TopicPartition>> tasksToRecycle,
final Set<Task> 
tasksToCloseClean) {
-classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, 
tasksToRecycle, tasksToCloseClean);
+classifyRunningAndSuspendedTasks(activeTasksToCreate, 
standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);

Review Comment:
   noob question: are taskIds in `activeTasksToCreate` and 
`standbyTasksToCreate` always mutually exclusive? I guess standby is always 
disabled if there's only 1 host/node? 



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -960,7 +1011,7 @@ private void removeRevokedTasksFromStateUpdater(final 
Set<TopicPartition> remain
 for (final Task restoringTask : stateUpdater.getTasks()) {
 if (restoringTask.isActive()) {
 if 
(remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) {
-tasks.addPendingTaskToCloseClean(restoringTask.id());
+   
