[GitHub] [kafka] omkreddy commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-15 Thread via GitHub


omkreddy commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1106751131


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3296,17 +3296,23 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleDescribeUserScramCredentialsRequest(request: 
RequestChannel.Request): Unit = {

Review Comment:
   As I mentioned above, ControllerAPI should also support the 
DescribeUserScramCredentialsRequest API.



##
core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala:
##
@@ -260,11 +271,13 @@ class AlterUserScramCredentialsRequestTest extends 
BaseRequestTest {
 .setSalt(saltBytes)
 .setSaltedPassword(saltedPasswordBytes),
 ))).build()
-val results1 = sendAlterUserScramCredentialsRequest(request1).data.results
-assertEquals(2, results1.size)
-checkNoErrorsAlteringCredentials(results1)
-checkUserAppearsInAlterResults(results1, user1)
-checkUserAppearsInAlterResults(results1, user2)
+val results1_1 = 
sendAlterUserScramCredentialsRequest(request1_1).data.results
+assertEquals(2, results1_1.size)
+checkNoErrorsAlteringCredentials(results1_1)
+checkUserAppearsInAlterResults(results1_1, user1)
+checkUserAppearsInAlterResults(results1_1, user2)
+
+Thread.sleep(1)

Review Comment:
   We generally use `TestUtils.waitForCondition` in tests.



##
metadata/src/main/java/org/apache/kafka/image/ScramImage.java:
##
@@ -50,6 +58,63 @@ public void write(ImageWriter writer, ImageWriterOptions 
options) {
 }
 }
 
+    public DescribeUserScramCredentialsResponseData describe(DescribeUserScramCredentialsRequestData request) {
+
+        List<UserName> users = request.users();
+        Map<String, Boolean> uniqueUsers = new HashMap<String, Boolean>();
+
+        if ((users == null) || (users.size() == 0)) {
+            System.out.println("Describe : get all the users");
+            // If there are no users listed then get all the users
+            for (Map<String, ScramCredentialData> scramCredentialDataSet : mechanisms.values()) {
+                for (String user : scramCredentialDataSet.keySet()) {
+                    uniqueUsers.put(user, false);
+                }
+            }
+        } else {
+            // Filter out duplicates
+            for (UserName user : users) {
+                if (uniqueUsers.containsKey(user.name())) {
+                    uniqueUsers.put(user.name(), true);
+                } else {
+                    uniqueUsers.put(user.name(), false);
+                }
+            }
+        }
+
+        DescribeUserScramCredentialsResponseData retval = new DescribeUserScramCredentialsResponseData();
+
+        for (Map.Entry<String, Boolean> user : uniqueUsers.entrySet()) {
+            DescribeUserScramCredentialsResult result =
+                new DescribeUserScramCredentialsResult().setUser(user.getKey());
+
+            if (user.getValue() == false) {
+                List<CredentialInfo> credentialInfos = new ArrayList<CredentialInfo>();
+
+                boolean datafound = false;
+                for (Map.Entry<ScramMechanism, Map<String, ScramCredentialData>> mechanismsEntry : mechanisms.entrySet()) {
+                    Map<String, ScramCredentialData> credentialDataSet = mechanismsEntry.getValue();
+                    if (credentialDataSet.containsKey(user.getKey())) {
+                        credentialInfos.add(new CredentialInfo().setMechanism(mechanismsEntry.getKey().type())
+                            .setIterations(credentialDataSet.get(user.getKey()).iterations()));
+                        datafound = true;
+                    }
+                }
+                if (datafound) {
+                    result.setCredentialInfos(credentialInfos);
+                } else {
+                    result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code())
+                        .setErrorMessage("attemptToDescribeUserThatDoesNotExist: " + user.getKey());

Review Comment:
   Can we use the same error message as in 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ZkAdminManager.scala#L802
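
   For readers following the diff, the "Filter out duplicates" loop can be 
sketched in isolation as below. This is only an illustration: 
`DuplicateUserSketch` and `markDuplicates` are hypothetical names, plain 
strings stand in for the `UserName` objects, and a `LinkedHashMap` is used 
here (the diff uses `HashMap`) purely to make the printed order predictable.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper mirroring the "Filter out duplicates" loop in the diff. */
final class DuplicateUserSketch {
    private DuplicateUserSketch() { }

    /** Maps each requested name to true if it was listed more than once. */
    static Map<String, Boolean> markDuplicates(List<String> requestedNames) {
        Map<String, Boolean> seen = new LinkedHashMap<>();
        for (String name : requestedNames) {
            // containsKey is evaluated before put, so the first occurrence stores false
            seen.put(name, seen.containsKey(name));
        }
        return seen;
    }

    public static void main(String[] args) {
        // prints {alice=true, bob=false}
        System.out.println(markDuplicates(Arrays.asList("alice", "bob", "alice")));
    }
}
```

   Names flagged `true` are the ones the diff skips when it looks up 
credentials.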



##
metadata/src/main/java/org/apache/kafka/image/ScramImage.java:
##
@@ -50,6 +58,63 @@ public void write(ImageWriter writer, ImageWriterOptions 
options) {
 }
 }
 
+    public DescribeUserScramCredentialsResponseData describe(DescribeUserScramCredentialsRequestData request) {
+
+        List<UserName> users = request.users();
+        Map<String, Boolean> uniqueUsers = new HashMap<String, Boolean>();
+
+        if ((users == null) || (users.size() == 0)) {
+            System.out.println("Describe : get all the users");
+            // If there are no users listed then get all the users
+            for (Map<String, ScramCredentialData> scramCredentialDataSet : mechanisms.values()) {
+                for (String user : scramCredentialDataSet.keySet()) {
+                    uniqueUsers.put(user, false);
+                }
+            }
+        } else {
+            // Filter out duplicates
+            for (UserName user : users) {
+  

[GitHub] [kafka] omkreddy commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-15 Thread via GitHub


omkreddy commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1106755866


##
core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala:
##
@@ -260,11 +271,13 @@ class AlterUserScramCredentialsRequestTest extends 
BaseRequestTest {
 .setSalt(saltBytes)
 .setSaltedPassword(saltedPasswordBytes),
 ))).build()
-val results1 = sendAlterUserScramCredentialsRequest(request1).data.results
-assertEquals(2, results1.size)
-checkNoErrorsAlteringCredentials(results1)
-checkUserAppearsInAlterResults(results1, user1)
-checkUserAppearsInAlterResults(results1, user2)
+val results1_1 = 
sendAlterUserScramCredentialsRequest(request1_1).data.results
+assertEquals(2, results1_1.size)
+checkNoErrorsAlteringCredentials(results1_1)
+checkUserAppearsInAlterResults(results1_1, user1)
+checkUserAppearsInAlterResults(results1_1, user2)
+
+Thread.sleep(1)

Review Comment:
   We generally use `TestUtils.waitForCondition` in tests to wait for a condition.
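
   A minimal sketch of the polling pattern this comment refers to, as a 
standalone Java helper. `WaitUtil.waitForCondition` is a hypothetical 
stand-in, not Kafka's `TestUtils.waitForCondition` (whose exact signature is 
not shown in this thread), and the 10 ms poll interval is an assumed value.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical poll-until-true helper; not Kafka's TestUtils. */
final class WaitUtil {
    private WaitUtil() { }

    /**
     * Polls the condition until it returns true or maxWaitMs has elapsed.
     * Throws AssertionError with the given message on timeout or interruption.
     */
    static void waitForCondition(BooleanSupplier condition, long maxWaitMs, String message) {
        final long pollIntervalMs = 10L; // assumed poll interval
        final long deadline = System.nanoTime() + maxWaitMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return;
            }
            if (System.nanoTime() - deadline >= 0) {
                throw new AssertionError(message);
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and fail the wait
                Thread.currentThread().interrupt();
                throw new AssertionError(message);
            }
        }
    }
}
```

   Unlike a fixed `Thread.sleep(1)`, the wait ends as soon as the condition 
holds and fails with a clear message if it never does.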



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-15 Thread Tamas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17688945#comment-17688945
 ] 

Tamas commented on KAFKA-14713:
---

Hi [~mjsax], it looks similar, but not exactly the same. They see this issue 
with the exactly_once_beta processing guarantee, while we have it with the 
default at_least_once. For us the important part is that this issue is fixed 
as soon as possible, because right now I am between a rock and a hard place 
because of it.

> Kafka Streams global table startup takes too long
> -
>
> Key: KAFKA-14713
> URL: https://issues.apache.org/jira/browse/KAFKA-14713
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Tamas
>Priority: Critical
>
> *Some context first*
> We have a spring based kafka streams application. This application is 
> listening to two topics. Let's call them apartment and visitor. The 
> apartments are stored in a global table, while the visitors are in the stream 
> we are processing, and at one point we are joining the visitor stream 
> together with the apartment table. In our test environment, both topics 
> contain 10 partitions.
> *Issue*
> At first deployment, everything goes fine, the global table is built and all 
> entries in the stream are processed.
> After everything is finished, we shut down the application, restart it and 
> send out a new set of visitors. The application seemingly does not respond.
> After some more debugging it turned out that it simply takes 5 minutes to 
> start up, because the global table takes 30 seconds (the default value for 
> the global request timeout) to accept that there are no messages in the 
> apartment topic, for each and every partition (10 partitions x 30 seconds). 
> If we send out the list of apartments as new messages, the application 
> starts up immediately.
> To make matters worse, we have clients with 96 partitions, where the startup 
> time would be 48 minutes. Not having messages in the topics between 
> application shutdown and restart is a valid use case, so this is quite a big 
> problem.
> *Possible workarounds*
> We could reduce the request timeout, but since this value is not specific for 
> the global table initialization, but a global request timeout for a lot of 
> things, we do not know what else it will affect, so we are not very keen on 
> doing that. Even then, it would mean a 1.5 minute delay for this particular 
> client (more if we will have other use cases in the future where we will need 
> to use more global tables), which is far too much, considering that the 
> application would be able to otherwise start in about 20 seconds.
> *Potential solutions we see*
>  # Introduce a specific global table initialization timeout in 
> GlobalStateManagerImpl. Then we would be able to safely modify that value 
> without fear of making some other part of kafka unstable.
>  # Parallelize the initialization of the global table partitions in 
> GlobalStateManagerImpl: knowing that the delay at startup is constant instead 
> of linear with the number of partitions would be a huge help.
>  # As long as we receive a response, accept the empty map in the 
> KafkaConsumer, and continue instead of going into a busy-waiting loop.
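
The figures quoted in the report follow from simple arithmetic, sketched 
below. This is a back-of-the-envelope model only: `GlobalTableStartupDelay` 
and its methods are hypothetical names, and the `parallel` method encodes the 
assumption behind proposal 2 that all partitions would time out concurrently.

```java
import java.time.Duration;

/** Back-of-the-envelope model of the startup delay described in the report. */
final class GlobalTableStartupDelay {
    private GlobalTableStartupDelay() { }

    /** Empty partitions are waited on one after another, so the timeouts add up. */
    static Duration sequential(int emptyPartitions, Duration timeoutPerPartition) {
        return timeoutPerPartition.multipliedBy(emptyPartitions);
    }

    /** Assumed model for proposal 2: all partitions time out concurrently. */
    static Duration parallel(int emptyPartitions, Duration timeoutPerPartition) {
        return emptyPartitions == 0 ? Duration.ZERO : timeoutPerPartition;
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofSeconds(30); // timeout value quoted in the report
        System.out.println(sequential(10, timeout).toMinutes()); // 5
        System.out.println(sequential(96, timeout).toMinutes()); // 48
        System.out.println(parallel(96, timeout).getSeconds());  // 30
    }
}
```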



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14713) Kafka Streams global table startup takes too long

2023-02-15 Thread Tamas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17688945#comment-17688945
 ] 

Tamas edited comment on KAFKA-14713 at 2/15/23 8:52 AM:


Hi [~mjsax], it looks similar, but not exactly the same. They see this issue 
with the exactly_once_beta processing guarantee, while we have it with the 
default at_least_once. For us the important part is that this issue is fixed 
(or at least a safe workaround is provided) as soon as possible, because right 
now I am between a rock and a hard place because of it.


was (Author: JIRAUSER298942):
Hi [~mjsax] looks similar, but not exactly. They see this issue with 
exactly_once_beta processing guarantee, while we have it with the default 
at_least_once. For us the important part is that this issue is fixed as soon as 
possible, because right now I am between a rock and a hard place because of it.



[jira] [Updated] (KAFKA-14704) Follower should truncate before incrementing high watermark

2023-02-15 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-14704:

Fix Version/s: 2.8.3
   3.2.4
   3.1.3
   3.0.3

> Follower should truncate before incrementing high watermark
> ---
>
> Key: KAFKA-14704
> URL: https://issues.apache.org/jira/browse/KAFKA-14704
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 3.3.3, 2.8.3
>
>
> When a leader becomes a follower, it is likely that it has uncommitted 
> records in its log. When it reaches out to the leader, the leader will detect 
> that they have diverged and it will return the diverging epoch and offset. 
> The follower truncates its log based on this.
> There is a small caveat in this process. When the leader returns the diverging 
> epoch and offset, it also includes its high watermark, low watermark, start 
> offset and end offset. The current code in the `AbstractFetcherThread` works 
> as follows. First it processes the partition data and then it checks whether 
> there is a diverging epoch/offset. The former may accidentally expose 
> uncommitted records as this step updates the local watermark to whatever is 
> received from the leader. As the follower, or the former leader, may have 
> uncommitted records, it will be able to update the high watermark to a 
> larger offset if the leader has a higher watermark than the current local 
> one. This results in exposing uncommitted records until the log is finally 
> truncated. The time window is short but a fetch request arriving at the 
> follower at the right time could read those records. This is especially true 
> for clients out there which use recent versions of the fetch request but 
> without implementing KIP-320.
> When this happens, the follower logs the following message: `Non-monotonic 
> update of high watermark from (offset=21437 segment=[20998:98390]) to 
> (offset=21434 segment=[20998:97843])`.
> This patch proposes to mitigate the issue by first checking whether 
> a diverging epoch/offset is provided by the leader and skipping the 
> processing of the partition data if it is. This basically means that the 
> first fetch request will result in truncating the log and a subsequent fetch 
> request will update the log/high watermarks.
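
The ordering proposed above can be illustrated with a heavily simplified 
model. Everything in this sketch is hypothetical: `FollowerSketch` is not 
Kafka's `AbstractFetcherThread`, a fetch response is reduced to an optional 
diverging end offset plus the leader's high watermark, and capping the high 
watermark at the local log end offset is an assumption of the model.

```java
import java.util.OptionalLong;

/** Hypothetical, heavily simplified follower state; not Kafka's AbstractFetcherThread. */
final class FollowerSketch {
    long logEndOffset;
    long highWatermark;

    FollowerSketch(long logEndOffset, long highWatermark) {
        this.logEndOffset = logEndOffset;
        this.highWatermark = highWatermark;
    }

    /**
     * Handles one fetch response. divergingEndOffset is present only when the
     * leader reports that the logs have diverged.
     */
    void onFetchResponse(OptionalLong divergingEndOffset, long leaderHighWatermark) {
        if (divergingEndOffset.isPresent()) {
            // Check divergence first: truncate and skip the partition data,
            // so the leader's high watermark is not applied to stale records.
            logEndOffset = Math.min(logEndOffset, divergingEndOffset.getAsLong());
            highWatermark = Math.min(highWatermark, logEndOffset);
            return;
        }
        // No divergence: adopt the leader's high watermark, capped at the local log end.
        highWatermark = Math.min(leaderHighWatermark, logEndOffset);
    }
}
```

With a local log end offset of 100, a local high watermark of 90, a reported 
divergence at offset 92 and a leader high watermark of 95, the first response 
only truncates to 92 and leaves the high watermark at 90. Applying the 
leader's value before truncating would have raised it to 95 while offsets 92 
to 94 still held divergent records.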





[GitHub] [kafka] satishd opened a new pull request, #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.

2023-02-15 Thread via GitHub


satishd opened a new pull request, #13255:
URL: https://github.com/apache/kafka/pull/13255

   KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to 
storage module.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] vamossagar12 commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils

2023-02-15 Thread via GitHub


vamossagar12 commented on PR #13158:
URL: https://github.com/apache/kafka/pull/13158#issuecomment-1430997152

   > Hi @vamossagar12 this looks good, but I still think we should move 
`GetOffsetShellParsingTest` to `TopicPartitionFilterTest` removing any 
reference to `GetOffsetShell` (there is already a test for that) and create a 
new `PartitionFilterTest`, which should be similar to `TopicFilterTest`.
   
   Thanks @fvaleri . I tried doing this today. A problem that I see is that 
`GetOffsetShellParsingTest` makes use of 2 methods from `GetOffsetShell` namely 
`createTopicPartitionFilterWithTopicAndPartitionPattern` and 
`createTopicPartitionFilterWithPatternList`. Those 2 methods are also 
eventually used in `GetOffsetShell` when creating `topicPartitionFilter`. 
Ideally these 2 methods can reside in `ToolsUtils` and have both tests and 
`GetOffsetShell` reference it. 
   I see there's also [Move GetOffsetShell to 
tools](https://issues.apache.org/jira/browse/KAFKA-14581). Can we do the move 
as part of that effort or here? WDYS?





[GitHub] [kafka] showuon commented on pull request #13238: KAFKA-14708: Use Java thread instead of kafka library for example purpose

2023-02-15 Thread via GitHub


showuon commented on PR #13238:
URL: https://github.com/apache/kafka/pull/13238#issuecomment-1431011866

   @philipnee, there's a SpotBugs error, could you help fix it? 





[GitHub] [kafka] benru89 commented on pull request #11442: KAFKA-7883 add schema.namespace support to SetSchemaMetadata SMT in Kafka Connect

2023-02-15 Thread via GitHub


benru89 commented on PR #11442:
URL: https://github.com/apache/kafka/pull/11442#issuecomment-1431013263

   What's the status of this?





[jira] [Resolved] (KAFKA-14716) Connect schema does not allow struct default values

2023-02-15 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban resolved KAFKA-14716.
--
Resolution: Duplicate

> Connect schema does not allow struct default values
> ---
>
> Key: KAFKA-14716
> URL: https://issues.apache.org/jira/browse/KAFKA-14716
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> The ConnectSchema API should allow specifying a composite (struct) default 
> value for a field, but with the current API, it is impossible to do so.
>  # There is a circular dependency between creating a struct as a default 
> value and creating the schema which holds it as the default value. The Struct 
> constructor expects a Schema object, and the default value setter of 
> SchemaBuilder checks schema conformity by using the 
> ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This 
> can only be bypassed if the struct references a SchemaBuilder instance, and 
> defaultValue is called on that builder instance, but this goes against the 
> Struct docs stating that "Struct objects must specify a complete \{@link 
> Schema} up front".
>  # ConnectSchema.equals is not prepared to be used with other Schema 
> implementations, so equals checks between ConnectSchema and SchemaBuilder 
> instances will always fail. This is only causing an issue if equals has to be 
> used for schema conformity checks.
> Code examples:
> Working code (mind that the schema referenced by the Struct is a 
> SchemaBuilder, and it is mutated after the Struct is constructed):
> {code:java}
> @Test
> public void testCompositeDefault() {
>     SchemaBuilder nestedSchema = SchemaBuilder.struct()
>             .field("bar", Schema.STRING_SCHEMA);
>     Struct nestedDefault = new Struct(nestedSchema);
>     nestedDefault.put("bar", "default_value");
>     Schema schema = SchemaBuilder.struct()
>             .field("foo",
>                     nestedSchema
>                             .defaultValue(nestedDefault)
>                             .build()
>             )
>             .build();
> }
> {code}
> Not working code (but better aligned with the current API and docs - 2 
> separate Schema instances used by the Struct and the field, only diff is the 
> default value between the 2):
> {code:java}
> @Test
> public void testCompositeDefault() {
>     Schema nestedSchema = SchemaBuilder.struct()
>             .field("bar", Schema.STRING_SCHEMA)
>             .build();
>     Struct nestedDefault = new Struct(nestedSchema);
>     nestedDefault.put("bar", "default_value");
>     Schema schema = SchemaBuilder.struct()
>             .field("foo",
>                     SchemaBuilder
>                             .struct()
>                             .field("bar", Schema.STRING_SCHEMA)
>                             .defaultValue(nestedDefault)
>                             .build()
>             )
>             .build();
> }
> {code}





[jira] [Commented] (KAFKA-14716) Connect schema does not allow struct default values

2023-02-15 Thread Daniel Urban (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17688979#comment-17688979
 ] 

Daniel Urban commented on KAFKA-14716:
--

[~ChrisEgerton] Indeed, it is, thanks for pointing that out - will try to 
comment on the PR of KAFKA-12694



[GitHub] [kafka] urbandan commented on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values

2023-02-15 Thread via GitHub


urbandan commented on PR #10566:
URL: https://github.com/apache/kafka/pull/10566#issuecomment-1431030129

   I would propose a 4th option as well: changing the 
ConnectSchema#validateValue method to use different logic for Struct default 
values:
   1. Use the Schema methods
   2. Ignore optional and default value
   
   The benefit would be that it wouldn't change the existing equals logic, 
existing code utilizing SchemaBuilder would keep working, and it would fix the 
ambiguity around optionality and default being part of the schema check (while 
in reality, they should belong to the field, and not the schema).
   
   I have a working change with this 4th option; please let me know if this 
logic is acceptable, and I'll submit a PR.
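
   To make the idea concrete, here is a rough sketch of a conformity check 
that ignores optionality and default value. It is not the working change 
mentioned above and it deliberately avoids the Connect API: `SimpleSchema` 
and `conforms` are hypothetical names, and ignoring the two attributes at 
every nesting level is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a schema; deliberately not the Kafka Connect API. */
final class SimpleSchema {
    final String type;                 // e.g. "STRUCT" or "STRING"
    final boolean optional;
    final Object defaultValue;
    final Map<String, SimpleSchema> fields = new LinkedHashMap<>();

    SimpleSchema(String type, boolean optional, Object defaultValue) {
        this.type = type;
        this.optional = optional;
        this.defaultValue = defaultValue;
    }

    SimpleSchema field(String name, SimpleSchema schema) {
        fields.put(name, schema);
        return this;
    }

    /** Compares type and field structure only; optional and defaultValue are ignored. */
    static boolean conforms(SimpleSchema a, SimpleSchema b) {
        if (!a.type.equals(b.type) || !a.fields.keySet().equals(b.fields.keySet())) {
            return false;
        }
        for (Map.Entry<String, SimpleSchema> entry : a.fields.entrySet()) {
            if (!conforms(entry.getValue(), b.fields.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }
}
```

   Under this check, two struct schemas that differ only in their default 
value or optionality are treated as conforming, which is the case the "not 
working" example in KAFKA-14716 runs into.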





[jira] [Updated] (KAFKA-14590) Move DelegationTokenCommand to tools

2023-02-15 Thread Gantigmaa Selenge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gantigmaa Selenge updated KAFKA-14590:
--
Fix Version/s: 3.5.0
Affects Version/s: (was: 3.5.0)

> Move DelegationTokenCommand to tools
> 
>
> Key: KAFKA-14590
> URL: https://issues.apache.org/jira/browse/KAFKA-14590
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Major
> Fix For: 3.5.0
>
>






[jira] [Commented] (KAFKA-14592) Move FeatureCommand to tools

2023-02-15 Thread Federico Valeri (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17689019#comment-17689019
 ] 

Federico Valeri commented on KAFKA-14592:
-

Here we should also add the missing BAT wrapper script.

> Move FeatureCommand to tools
> 
>
> Key: KAFKA-14592
> URL: https://issues.apache.org/jira/browse/KAFKA-14592
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Gantigmaa Selenge
>Priority: Major
>






[GitHub] [kafka] mimaison commented on pull request #11442: KAFKA-7883 add schema.namespace support to SetSchemaMetadata SMT in Kafka Connect

2023-02-15 Thread via GitHub


mimaison commented on PR #11442:
URL: https://github.com/apache/kafka/pull/11442#issuecomment-1431141433

   To be able to merge this, the associated 
[KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-855%3A+Add+schema.namespace+parameter+to+SetSchemaMetadata+SMT+in+Kafka+Connect)
 must be voted on.
   In the [discussion 
thread](https://lists.apache.org/thread/3hkd9lljobf9rl56ogjpcbo4ldoxcz5n) for 
the KIP, a few questions have not been answered yet by the author. Until these 
are answered, it's unlikely there will be any votes, so this will stay stuck.





[GitHub] [kafka] dejan2609 commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8

2023-02-15 Thread via GitHub


dejan2609 commented on PR #13205:
URL: https://github.com/apache/kafka/pull/13205#issuecomment-1431231802

   @ijuma some findings: the build fails due to issues with Gradle wrapper 
bootstrapping. 
   
   I will post more details today. 





[GitHub] [kafka] chia7712 commented on a diff in pull request #13248: KAFKA-14717 KafkaStreams can' get running if the rebalance happens be…

2023-02-15 Thread via GitHub


chia7712 commented on code in PR #13248:
URL: https://github.com/apache/kafka/pull/13248#discussion_r1107014705


##
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java:
##
@@ -257,6 +258,23 @@ private Thread adjustCountHelperThread(final KafkaStreams 
kafkaStreams, final in
 });
 }
 
+@Test
+public void testRebalanceHappensBeforeStreamThreadGetDown() throws 
Exception {
+final Properties prop = new Properties();
+prop.putAll(properties);
+// make rebalance happen quickly
+prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200);

Review Comment:
   > It seems this test would sometimes pass even without the fix, is 
that right?
   
   You are right. I have updated the test to make sure it always fails 
without the fix.






[jira] [Updated] (KAFKA-12473) Make the "cooperative-sticky, range" as the default assignor

2023-02-15 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-12473:
--
Fix Version/s: (was: 3.5.0)

> Make the "cooperative-sticky, range" as the default assignor
> 
>
> Key: KAFKA-12473
> URL: https://issues.apache.org/jira/browse/KAFKA-12473
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Critical
>  Labels: kip
>
> Now that 3.0 is coming up, we can change the default 
> ConsumerPartitionAssignor to something better than the RangeAssignor. The 
> original plan was to switch over to the StickyAssignor, but now that we have 
> incremental cooperative rebalancing we should consider using the new 
> CooperativeStickyAssignor instead: this will enable the consumer group to 
> follow the COOPERATIVE protocol, improving the rebalancing experience OOTB.
> KIP: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248
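
Until the default changes, a consumer can opt in explicitly. The sketch below 
only builds the relevant property; `AssignorConfigSketch` is a hypothetical 
name, and the other settings a real consumer needs (bootstrap servers, 
deserializers, group id) are omitted. The configuration key and the two 
assignor class names are the ones used by the Kafka consumer.

```java
import java.util.Properties;

/** Hypothetical holder for the assignor setting discussed in this ticket. */
final class AssignorConfigSketch {
    private AssignorConfigSketch() { }

    /** Returns properties listing cooperative-sticky first, then range. */
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("partition.assignment.strategy",
            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor,"
                + "org.apache.kafka.clients.consumer.RangeAssignor");
        return props;
    }
}
```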





[jira] [Commented] (KAFKA-12473) Make the "cooperative-sticky, range" as the default assignor

2023-02-15 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17689070#comment-17689070
 ] 

Luke Chen commented on KAFKA-12473:
---

Removed the Fix Version value until we have some progress. Thanks.






[jira] [Created] (KAFKA-14720) Tools migration guidelines

2023-02-15 Thread Federico Valeri (Jira)
Federico Valeri created KAFKA-14720:
---

 Summary: Tools migration guidelines
 Key: KAFKA-14720
 URL: https://issues.apache.org/jira/browse/KAFKA-14720
 Project: Kafka
  Issue Type: Improvement
Reporter: Federico Valeri


The tools migration effort is ongoing and being tracked in KAFKA-14525. This is 
part of a bigger initiative to split the core module into multiple modules 
(e.g. storage, network, security, tools), which is being tracked in KAFKA-14524.

The plan is to migrate tools and related classes in a fully compatible way from 
kafka.tools and kafka.admin packages (core module) to org.apache.kafka.tools 
package (tools module).

While kicking off this activity, we identified a number of potential 
compatibility issues:

* Missing wrapper: some tools do not have a wrapper script. There are system 
tests that directly refer to the tool's fully qualified class name (FQCN) and 
expect the old package name when running on old Kafka releases. They are often 
used for troubleshooting or automation through the “kafka-run-class.sh” script 
which takes the FQCN as input parameter.
* SPI argument: some tools have arguments for setting a custom SPI 
implementation to be used in place of the default implementation. Any custom 
SPI implementation depends on the old package name.
* Broken tool: some tools do not work on supported releases.
* Core dependency: some tools require access to non-trivial core classes that 
should be migrated first.

See KIP-906 for more information.





[jira] [Updated] (KAFKA-14720) Tools migration guidelines

2023-02-15 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri updated KAFKA-14720:

Description: 
The tools migration effort is ongoing and being tracked in KAFKA-14525. This is 
part of a bigger initiative to split the core module into multiple modules 
(e.g. storage, network, security, tools), which is being tracked in KAFKA-14524.

The plan is to migrate tools and related classes in a fully compatible way from 
kafka.tools and kafka.admin packages (core module) to org.apache.kafka.tools 
package (tools module).

While kicking off this activity, we identified a number of potential 
compatibility issues:

* Missing wrapper: some tools do not have a wrapper script. There are system 
tests that directly refer to the tool's fully qualified class name (FQCN) and 
expect the old package name when running on old Kafka releases. They are often 
used for troubleshooting or automation through the kafka-run-class.sh script 
which takes the FQCN as input parameter.
* SPI argument: some tools have arguments for setting a custom SPI 
implementation to be used in place of the default implementation. Any custom 
SPI implementation depends on the old package name.
* Broken tool: some tools do not work on supported releases.
* Core dependency: some tools require access to non-trivial core classes that 
should be migrated first.

See KIP-906 for more information.

  was:
The tools migration effort is ongoing and being tracked in KAFKA-14525. This is 
part of a bigger initiative to split the core module into multiple modules 
(e.g. storage, network, security, tools), which is being tracked in KAFKA-14524.

The plan is to migrate tools and related classes in a fully compatible way from 
kafka.tools and kafka.admin packages (core module) to org.apache.kafka.tools 
package (tools module).

While kicking off this activity, we identified a number of potential 
compatibility issues:

* Missing wrapper: some tools do not have a wrapper script. There are system 
tests that directly refer to the tool's fully qualified class name (FQCN) and 
expect the old package name when running on old Kafka releases. They are often 
used for troubleshooting or automation through the “kafka-run-class.sh” script 
which takes the FQCN as input parameter.
* SPI argument: some tools have arguments for setting a custom SPI 
implementation to be used in place of the default implementation. Any custom 
SPI implementation depends on the old package name.
* Broken tool: some tools do not work on supported releases.
* Core dependency: some tools require access to non-trivial core classes that 
should be migrated first.

See KIP-906 for more information.







[jira] [Assigned] (KAFKA-14720) Tools migration guidelines

2023-02-15 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri reassigned KAFKA-14720:
---

Assignee: Federico Valeri






[GitHub] [kafka] dejan2609 commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8

2023-02-15 Thread via GitHub


dejan2609 commented on PR #13205:
URL: https://github.com/apache/kafka/pull/13205#issuecomment-1431370263

   Update: gradle wrapper bootstrapping is ok now, but spotless Scala checks 
are failing... 
   Searching for a solution.





[GitHub] [kafka] ijuma commented on pull request #13247: KAFKA-14595 Move value objects of ReassignPartitionsCommand to java

2023-02-15 Thread via GitHub


ijuma commented on PR #13247:
URL: https://github.com/apache/kafka/pull/13247#issuecomment-1431422276

   If you want to do it in small steps, one way is that you introduce the new 
classes, but you do not update the command to use them. That way you can put 
them in the right destination from the start. In any case, I'll leave it to 
@mimaison to say how he'd prefer it.





[jira] [Updated] (KAFKA-14720) Tools migration guidelines

2023-02-15 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri updated KAFKA-14720:

Description: 
The tools migration effort is ongoing and being tracked in KAFKA-14525. This is 
part of a bigger initiative to split the core module into multiple modules 
(e.g. storage, network, security, tools), which is being tracked in KAFKA-14524.

The plan is to migrate tools and related classes in a fully compatible way from 
kafka.tools and kafka.admin packages (core module) to org.apache.kafka.tools 
package (tools module).

While kicking off this activity, we identified a number of potential 
compatibility issues:

* Missing wrapper: some tools do not have a wrapper script. There are system 
tests that directly refer to the tool's fully qualified class name (FQCN) and 
expect the old package name when running on old Kafka releases. They are often 
used for troubleshooting or automation through the kafka-run-class.sh script 
which takes the FQCN as input parameter.
* SPI argument: some tools have arguments for setting a custom SPI 
implementation to be used in place of the default implementation. Any custom 
SPI implementation depends on the old package name.
* Broken tool: some tools do not work on supported releases.
* Core dependency: some tools require access to non-trivial core classes that 
should be migrated first.

See KIP-906 for more information.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines


  was:
The tools migration effort is ongoing and being tracked in KAFKA-14525. This is 
part of a bigger initiative to split the core module into multiple modules 
(e.g. storage, network, security, tools), which is being tracked in KAFKA-14524.

The plan is to migrate tools and related classes in a fully compatible way from 
kafka.tools and kafka.admin packages (core module) to org.apache.kafka.tools 
package (tools module).

While kicking off this activity, we identified a number of potential 
compatibility issues:

* Missing wrapper: some tools do not have a wrapper script. There are system 
tests that directly refer to the tool's fully qualified class name (FQCN) and 
expect the old package name when running on old Kafka releases. They are often 
used for troubleshooting or automation through the kafka-run-class.sh script 
which takes the FQCN as input parameter.
* SPI argument: some tools have arguments for setting a custom SPI 
implementation to be used in place of the default implementation. Any custom 
SPI implementation depends on the old package name.
* Broken tool: some tools do not work on supported releases.
* Core dependency: some tools require access to non-trivial core classes that 
should be migrated first.

See KIP-906 for more information.


> Tools migration guidelines
> --
>
> Key: KAFKA-14720
> URL: https://issues.apache.org/jira/browse/KAFKA-14720
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Federico Valeri
>Assignee: Federico Valeri
>Priority: Major
>
> The tools migration effort is ongoing and being tracked in KAFKA-14525. This 
> is part of a bigger initiative to split the core module into multiple modules 
> (e.g. storage, network, security, tools), which is being tracked in 
> KAFKA-14524.
> The plan is to migrate tools and related classes in a fully compatible way 
> from kafka.tools and kafka.admin packages (core module) to 
> org.apache.kafka.tools package (tools module).
> While kicking off this activity, we identified a number of potential 
> compatibility issues:
> * Missing wrapper: some tools do not have a wrapper script. There are system 
> tests that directly refer to the tool's fully qualified class name (FQCN) and 
> expect the old package name when running on old Kafka releases. They are 
> often used for troubleshooting or automation through the kafka-run-class.sh 
> script which takes the FQCN as input parameter.
> * SPI argument: some tools have arguments for setting a custom SPI 
> implementation to be used in place of the default implementation. Any custom 
> SPI implementation depends on the old package name.
> * Broken tool: some tools do not work on supported releases.
> * Core dependency: some tools require access to non-trivial core classes that 
> should be migrated first.
> See KIP-906 for more information.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines





[GitHub] [kafka] nizhikov commented on pull request #13247: KAFKA-14595 Move value objects of ReassignPartitionsCommand to java

2023-02-15 Thread via GitHub


nizhikov commented on PR #13247:
URL: https://github.com/apache/kafka/pull/13247#issuecomment-1431428999

   Hello @ijuma 
   
   > If you want to do it in small steps
   
   It's more about simplifying the review than my personal preferences :)
   
   For now, I have introduced Java classes from `ReassignPartitionsCommand` in the core 
module.
   It seems we can review and merge the current changes and then move on to the actual 
Scala -> Java conversion of the command.





[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-15 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1107184550


##
metadata/src/main/java/org/apache/kafka/image/ScramImage.java:
##
@@ -50,6 +58,63 @@ public void write(ImageWriter writer, ImageWriterOptions 
options) {
 }
 }
 
+public DescribeUserScramCredentialsResponseData 
describe(DescribeUserScramCredentialsRequestData request) {
+
+List users = request.users();
+Map uniqueUsers = new HashMap();
+
+if ((users == null) || (users.size() == 0)) {
+System.out.println("Describe : get all the users");
+// If there are no users listed then get all the users
+for (Map scramCredentialDataSet : 
mechanisms.values()) {
+for (String user : scramCredentialDataSet.keySet()) {
+uniqueUsers.put(user, false);
+}
+}
+} else {
+// Filter out duplicates
+for (UserName user : users) {
+if (uniqueUsers.containsKey(user.name())) {
+uniqueUsers.put(user.name(), true);
+} else {
+uniqueUsers.put(user.name(), false);
+}
+}
+}
+
+DescribeUserScramCredentialsResponseData retval = new 
DescribeUserScramCredentialsResponseData();
+
+for (Map.Entry user : uniqueUsers.entrySet()) {
+DescribeUserScramCredentialsResult result = 
+  new DescribeUserScramCredentialsResult().setUser(user.getKey());
+
+if (user.getValue() == false) {
+List credentialInfos = new 
ArrayList();
+
+boolean datafound = false;
+for (Map.Entry> mechanismsEntry : mechanisms.entrySet()) {

Review Comment:
   No, in our implementation we will never create a `Map` for ScramMechanism.UNKNOWN because we never populate any 
credentials for an unknown mechanism. Thus I can iterate over the maps knowing 
that they are all valid.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-15 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1107193412


##
metadata/src/main/java/org/apache/kafka/image/ScramImage.java:
##
@@ -50,6 +58,63 @@ public void write(ImageWriter writer, ImageWriterOptions 
options) {
 }
 }
 
+public DescribeUserScramCredentialsResponseData 
describe(DescribeUserScramCredentialsRequestData request) {
+
+List users = request.users();
+Map uniqueUsers = new HashMap();
+
+if ((users == null) || (users.size() == 0)) {
+System.out.println("Describe : get all the users");
+// If there are no users listed then get all the users
+for (Map scramCredentialDataSet : 
mechanisms.values()) {
+for (String user : scramCredentialDataSet.keySet()) {
+uniqueUsers.put(user, false);
+}
+}
+} else {
+// Filter out duplicates
+for (UserName user : users) {
+if (uniqueUsers.containsKey(user.name())) {
+uniqueUsers.put(user.name(), true);
+} else {
+uniqueUsers.put(user.name(), false);
+}
+}
+}
+
+DescribeUserScramCredentialsResponseData retval = new 
DescribeUserScramCredentialsResponseData();
+
+for (Map.Entry user : uniqueUsers.entrySet()) {
+DescribeUserScramCredentialsResult result = 
+  new DescribeUserScramCredentialsResult().setUser(user.getKey());
+
+if (user.getValue() == false) {
+List credentialInfos = new 
ArrayList();
+
+boolean datafound = false;
+for (Map.Entry> mechanismsEntry : mechanisms.entrySet()) {
+Map credentialDataSet = 
mechanismsEntry.getValue();
+if (credentialDataSet.containsKey(user.getKey())) {
+credentialInfos.add(new 
CredentialInfo().setMechanism(mechanismsEntry.getKey().type())
+
.setIterations(credentialDataSet.get(user.getKey()).iterations()));
+datafound = true;
+}
+}
+if (datafound) {
+result.setCredentialInfos(credentialInfos);
+} else {
+result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code())

Review Comment:
   The request contains a list of users. If the list is empty then the request 
is to describe all the users. I personally think this is a security issue but 
I'm just implementing what was there for Zk. 
   
   If there are no users with SCRAM credentials and the request is to describe 
all the users, then an empty response is returned. It is not an error. This is 
tested in the unit test.
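The behaviour described in this comment can be sketched roughly as follows. This is a simplified, self-contained illustration and not the code from the PR: the class `DescribeSketch`, the string-based results, and the `mechanism -> (user -> iterations)` map are hypothetical stand-ins for the Kafka message classes in the diff, and the handling of duplicate user names is an assumption, because that branch is not visible in the quoted hunk.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the "describe" semantics discussed above.
public class DescribeSketch {
    // Assumed error markers; the real code uses Kafka error codes instead.
    static final String DUPLICATE = "ERROR: duplicate user in request";
    static final String NOT_FOUND = "ERROR: user has no SCRAM credentials";

    // mechanisms: mechanism name -> (user name -> iteration count)
    static Map<String, String> describe(List<String> requested,
                                        Map<String, Map<String, Integer>> mechanisms) {
        // user -> true if the name was listed more than once in the request
        Map<String, Boolean> uniqueUsers = new TreeMap<>();
        if (requested == null || requested.isEmpty()) {
            // An empty request means "describe all users"
            for (Map<String, Integer> credentials : mechanisms.values()) {
                for (String user : credentials.keySet()) {
                    uniqueUsers.put(user, false);
                }
            }
        } else {
            for (String user : requested) {
                uniqueUsers.put(user, uniqueUsers.containsKey(user));
            }
        }

        Map<String, String> results = new LinkedHashMap<>();
        for (Map.Entry<String, Boolean> entry : uniqueUsers.entrySet()) {
            String user = entry.getKey();
            if (entry.getValue()) {
                results.put(user, DUPLICATE); // assumption, see lead-in
                continue;
            }
            List<String> infos = new ArrayList<>();
            for (Map.Entry<String, Map<String, Integer>> m : mechanisms.entrySet()) {
                Integer iterations = m.getValue().get(user);
                if (iterations != null) {
                    infos.add(m.getKey() + ":" + iterations);
                }
            }
            // A user that was asked for by name but has no credentials is an error
            results.put(user, infos.isEmpty() ? NOT_FOUND : String.join(",", infos));
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Integer>> mechanisms = new TreeMap<>();
        // Describe-all with no credentials stored: empty result, not an error
        System.out.println(describe(new ArrayList<>(), mechanisms));
    }
}
```

Note that in the describe-all case the result can only be empty or contain users that do have credentials, so the not-found error is reachable only when users are named explicitly.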






[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-15 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1107200727


##
metadata/src/main/java/org/apache/kafka/image/ScramImage.java:
##
@@ -50,6 +58,63 @@ public void write(ImageWriter writer, ImageWriterOptions 
options) {
 }
 }
 
+public DescribeUserScramCredentialsResponseData 
describe(DescribeUserScramCredentialsRequestData request) {
+
+List users = request.users();
+Map uniqueUsers = new HashMap();
+
+if ((users == null) || (users.size() == 0)) {
+System.out.println("Describe : get all the users");
+// If there are no users listed then get all the users
+for (Map scramCredentialDataSet : 
mechanisms.values()) {
+for (String user : scramCredentialDataSet.keySet()) {
+uniqueUsers.put(user, false);
+}
+}
+} else {
+// Filter out duplicates
+for (UserName user : users) {
+if (uniqueUsers.containsKey(user.name())) {
+uniqueUsers.put(user.name(), true);
+} else {
+uniqueUsers.put(user.name(), false);
+}
+}
+}
+
+DescribeUserScramCredentialsResponseData retval = new 
DescribeUserScramCredentialsResponseData();
+
+for (Map.Entry user : uniqueUsers.entrySet()) {
+DescribeUserScramCredentialsResult result = 
+  new DescribeUserScramCredentialsResult().setUser(user.getKey());
+
+if (user.getValue() == false) {
+List credentialInfos = new 
ArrayList();
+
+boolean datafound = false;
+for (Map.Entry> mechanismsEntry : mechanisms.entrySet()) {
+Map credentialDataSet = 
mechanismsEntry.getValue();
+if (credentialDataSet.containsKey(user.getKey())) {
+credentialInfos.add(new 
CredentialInfo().setMechanism(mechanismsEntry.getKey().type())
+
.setIterations(credentialDataSet.get(user.getKey()).iterations()));
+datafound = true;
+}
+}
+if (datafound) {
+result.setCredentialInfos(credentialInfos);
+} else {
+result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code())

Review Comment:
   I'm not sure why there is an illegalUsers error in the Zk case as it 
explicitly tests for users.get.isEmpty for the describe all case.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-15 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1107200727


##
metadata/src/main/java/org/apache/kafka/image/ScramImage.java:
##
@@ -50,6 +58,63 @@ public void write(ImageWriter writer, ImageWriterOptions 
options) {
 }
 }
 
+public DescribeUserScramCredentialsResponseData 
describe(DescribeUserScramCredentialsRequestData request) {
+
+List users = request.users();
+Map uniqueUsers = new HashMap();
+
+if ((users == null) || (users.size() == 0)) {
+System.out.println("Describe : get all the users");
+// If there are no users listed then get all the users
+for (Map scramCredentialDataSet : 
mechanisms.values()) {
+for (String user : scramCredentialDataSet.keySet()) {
+uniqueUsers.put(user, false);
+}
+}
+} else {
+// Filter out duplicates
+for (UserName user : users) {
+if (uniqueUsers.containsKey(user.name())) {
+uniqueUsers.put(user.name(), true);
+} else {
+uniqueUsers.put(user.name(), false);
+}
+}
+}
+
+DescribeUserScramCredentialsResponseData retval = new 
DescribeUserScramCredentialsResponseData();
+
+for (Map.Entry user : uniqueUsers.entrySet()) {
+DescribeUserScramCredentialsResult result = 
+  new DescribeUserScramCredentialsResult().setUser(user.getKey());
+
+if (user.getValue() == false) {
+List credentialInfos = new 
ArrayList();
+
+boolean datafound = false;
+for (Map.Entry> mechanismsEntry : mechanisms.entrySet()) {
+Map credentialDataSet = 
mechanismsEntry.getValue();
+if (credentialDataSet.containsKey(user.getKey())) {
+credentialInfos.add(new 
CredentialInfo().setMechanism(mechanismsEntry.getKey().type())
+
.setIterations(credentialDataSet.get(user.getKey()).iterations()));
+datafound = true;
+}
+}
+if (datafound) {
+result.setCredentialInfos(credentialInfos);
+} else {
+result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code())

Review Comment:
   I'm not sure why there is an illegalUsers error in the Zk case as it 
explicitly tests for users.get.isEmpty for the describe all case. 
https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/core/src/main/scala/kafka/server/ZkAdminManager.scala#L805






[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-15 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1107211109


##
metadata/src/main/java/org/apache/kafka/image/ScramImage.java:
##
@@ -50,6 +58,63 @@ public void write(ImageWriter writer, ImageWriterOptions 
options) {
 }
 }
 
+public DescribeUserScramCredentialsResponseData 
describe(DescribeUserScramCredentialsRequestData request) {
+
+List users = request.users();
+Map uniqueUsers = new HashMap();
+
+if ((users == null) || (users.size() == 0)) {
+System.out.println("Describe : get all the users");
+// If there are no users listed then get all the users
+for (Map scramCredentialDataSet : 
mechanisms.values()) {
+for (String user : scramCredentialDataSet.keySet()) {
+uniqueUsers.put(user, false);
+}
+}
+} else {
+// Filter out duplicates
+for (UserName user : users) {
+if (uniqueUsers.containsKey(user.name())) {
+uniqueUsers.put(user.name(), true);
+} else {
+uniqueUsers.put(user.name(), false);
+}
+}
+}
+
+DescribeUserScramCredentialsResponseData retval = new 
DescribeUserScramCredentialsResponseData();
+
+for (Map.Entry user : uniqueUsers.entrySet()) {
+DescribeUserScramCredentialsResult result = 
+  new DescribeUserScramCredentialsResult().setUser(user.getKey());
+
+if (user.getValue() == false) {
+List credentialInfos = new 
ArrayList();
+
+boolean datafound = false;
+for (Map.Entry> mechanismsEntry : mechanisms.entrySet()) {
+Map credentialDataSet = 
mechanismsEntry.getValue();
+if (credentialDataSet.containsKey(user.getKey())) {
+credentialInfos.add(new 
CredentialInfo().setMechanism(mechanismsEntry.getKey().type())
+
.setIterations(credentialDataSet.get(user.getKey()).iterations()));
+datafound = true;
+}
+}
+if (datafound) {
+result.setCredentialInfos(credentialInfos);
+} else {
+result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code())
+  
.setErrorMessage("attemptToDescribeUserThatDoesNotExist: " + user.getKey());

Review Comment:
   Yes!






[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-15 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1107227490


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -3611,12 +3617,4 @@ object KafkaApis {
   private def unsupported(text: String): Exception = {
 new UnsupportedVersionException(s"Unsupported when using a Raft-based 
metadata quorum: $text")
   }
-
-  private def notYetSupported(request: RequestChannel.Request): Exception = {

Review Comment:
   The compiler throws an error if we try to keep them. 






[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-15 Thread via GitHub


C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107273995


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##
@@ -16,40 +16,88 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
-import java.util.Map;
-import java.util.HashMap;
 import java.util.Collections;
-import java.time.Duration;
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset 
translation. */
 class OffsetSyncStore implements AutoCloseable {
-private final KafkaConsumer consumer;
-private final Map offsetSyncs = new 
HashMap<>();
-private final TopicPartition offsetSyncTopicPartition;
+private final KafkaBasedLog backingStore;
+private final Map offsetSyncs = new 
ConcurrentHashMap<>();
+private final TopicAdmin admin;
+private volatile boolean readToEnd = false;
 
 OffsetSyncStore(MirrorCheckpointConfig config) {
-consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-new ByteArrayDeserializer(), new ByteArrayDeserializer());
-offsetSyncTopicPartition = new 
TopicPartition(config.offsetSyncsTopic(), 0);
-consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+Consumer consumer = null;
+TopicAdmin admin = null;
+KafkaBasedLog store;
+try {
+Consumer finalConsumer = consumer = 
MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());

Review Comment:
   Ah yeah, much cleaner. Thanks! 👍






[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-15 Thread via GitHub


C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107275404


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java:
##
@@ -134,9 +138,9 @@ public String version() {
 @Override
 public List poll() throws InterruptedException {
 try {
-long deadline = System.currentTimeMillis() + interval.toMillis();
-while (!stopping && System.currentTimeMillis() < deadline) {
-offsetSyncStore.update(pollTimeout);
+if (stopping.await(pollTimeout.toMillis(), TimeUnit.MILLISECONDS)) 
{

Review Comment:
   Ah yeah, totally right, the condition was correct. Sorry about that!
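
For readers following the thread: `CountDownLatch.await(timeout, unit)` returns `true` when the count reached zero before the timeout and `false` when the timeout elapsed first. The sketch below assumes `stopping` in the hunk above is a `CountDownLatch` with a count of 1; the class and helper names are illustrative and do not come from the PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class StoppingLatchSketch {
    // Returns true if stop was signalled (latch count reached zero) before the
    // timeout elapsed, false if the timeout elapsed first.
    // Assumption for this sketch: an interrupt is treated like a stop request.
    static boolean stopRequested(CountDownLatch stopping, long timeoutMs) {
        try {
            return stopping.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return true;
        }
    }

    public static void main(String[] args) {
        CountDownLatch stopping = new CountDownLatch(1);
        // Nobody has called countDown() yet, so this wait times out.
        System.out.println("stop requested: " + stopRequested(stopping, 10));
        stopping.countDown();
        // The count is now zero, so await returns without waiting.
        System.out.println("stop requested: " + stopRequested(stopping, 10));
    }
}
```

Under that assumption, a `true` result from `await` means the task should stop, and a `false` result means the poll interval elapsed normally.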






[GitHub] [kafka] dejan2609 commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8

2023-02-15 Thread via GitHub


dejan2609 commented on PR #13205:
URL: https://github.com/apache/kafka/pull/13205#issuecomment-1431575644

   It seems that the Spotless Gradle plugin needs to be aligned with Gradle 8.0 (I 
filed a ticket here: https://github.com/diffplug/spotless/issues/1572)
   
   The thing is that they dropped support for direct Java 8 builds: 
https://github.com/diffplug/spotless/blob/main/plugin-gradle/CHANGES.md#6140---2023-01-26
 (their suggestion for other teams is to use Java cross compilation: 
https://docs.gradle.org/8.0/userguide/building_java_projects.html#sec:java_cross_compilation).
   
   Kafka obviously still needs to build artifacts against Java 8, so maybe it 
would be a good idea to follow the Spotless team's suggestion.
   
   All in all, here is a plan for the Gradle 7 -->> 8 upgrade:
   - the Spotless team will release a Gradle 8.0 compatible version (most probably 
they will not backport the fix into the Spotless Gradle plugin 6.13.x line)
   - in parallel I can try to drop JDK 8 usage (note: Java 8 compatible 
artifacts will still be generated)
   - once we make sure that Java cross compilation works, we can come back to 
this PR and bump Gradle (and the Spotless plugin version to 6.15+).
   
   @ijuma If it is ok with you I can start working towards this solution.





[GitHub] [kafka] clolov commented on pull request #13219: MINOR: Simplify JUnit assertions; remove accidental unnecessary code

2023-02-15 Thread via GitHub


clolov commented on PR #13219:
URL: https://github.com/apache/kafka/pull/13219#issuecomment-1431608450

   @divijvaidya, as suggested I have left only test changes in this pull 
request. Furthermore, I believe I have fixed all `assertEquals` which had 
arguments the wrong way around 😊
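
As background on why the argument order matters: JUnit's `assertEquals` takes the expected value first and the actual value second. Swapping them does not change whether the assertion passes, but it makes the failure message misleading. The sketch below uses a hypothetical stand-in helper, `failureMessage`, that roughly mimics the JUnit 5 message format so it can run without JUnit on the classpath; it is not the real implementation.

```java
class AssertOrderSketch {
    // Hypothetical stand-in that roughly mimics the failure text of
    // JUnit's assertEquals(expected, actual); not the real JUnit code.
    static String failureMessage(Object expected, Object actual) {
        return "expected: <" + expected + "> but was: <" + actual + ">";
    }

    public static void main(String[] args) {
        int actualSize = 3; // value produced by the code under test
        // Correct order: the message names 2 as the expectation.
        System.out.println(failureMessage(2, actualSize));
        // Swapped order: the message wrongly reports 3 as the expectation.
        System.out.println(failureMessage(actualSize, 2));
    }
}
```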





[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-15 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1107329539


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -99,6 +99,7 @@ class ControllerApis(val requestChannel: RequestChannel,
 case ApiKeys.INCREMENTAL_ALTER_CONFIGS => 
handleIncrementalAlterConfigs(request)
 case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => 
handleAlterPartitionReassignments(request)
 case ApiKeys.LIST_PARTITION_REASSIGNMENTS => 
handleListPartitionReassignments(request)
+case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => 
handleAlterUserScramCredentials(request)

Review Comment:
   We cannot test it until we update ControllerApis to handle METADATA 
requests, so I'm going to wait on adding it.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-02-15 Thread via GitHub


pprovenzano commented on code in PR #13114:
URL: https://github.com/apache/kafka/pull/13114#discussion_r1107329945


##
metadata/src/main/java/org/apache/kafka/image/ScramImage.java:
##
@@ -50,6 +58,63 @@ public void write(ImageWriter writer, ImageWriterOptions 
options) {
 }
 }
 
+public DescribeUserScramCredentialsResponseData 
describe(DescribeUserScramCredentialsRequestData request) {
+
+List users = request.users();
+Map uniqueUsers = new HashMap();
+
+if ((users == null) || (users.size() == 0)) {
+System.out.println("Describe : get all the users");
+// If there are no users listed then get all the users
+for (Map scramCredentialDataSet : 
mechanisms.values()) {
+for (String user : scramCredentialDataSet.keySet()) {
+uniqueUsers.put(user, false);
+}
+}
+} else {
+// Filter out duplicates
+for (UserName user : users) {
+if (uniqueUsers.containsKey(user.name())) {
+uniqueUsers.put(user.name(), true);
+} else {
+uniqueUsers.put(user.name(), false);
+}
+}
+}
+
+DescribeUserScramCredentialsResponseData retval = new 
DescribeUserScramCredentialsResponseData();
+
+for (Map.Entry user : uniqueUsers.entrySet()) {
+DescribeUserScramCredentialsResult result = 
+  new DescribeUserScramCredentialsResult().setUser(user.getKey());
+
+if (user.getValue() == false) {
+List credentialInfos = new 
ArrayList();
+
+boolean datafound = false;
+for (Map.Entry> mechanismsEntry : mechanisms.entrySet()) {
+Map credentialDataSet = 
mechanismsEntry.getValue();
+if (credentialDataSet.containsKey(user.getKey())) {
+credentialInfos.add(new 
CredentialInfo().setMechanism(mechanismsEntry.getKey().type())
+
.setIterations(credentialDataSet.get(user.getKey()).iterations()));
+datafound = true;
+}
+}
+if (datafound) {
+result.setCredentialInfos(credentialInfos);
+} else {
+result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code())
+  
.setErrorMessage("attemptToDescribeUserThatDoesNotExist: " + user.getKey());

Review Comment:
   Fixed!
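
The duplicate handling in the hunk above can be sketched in isolation. This is a minimal illustration, assuming user names are plain strings and that the map value means "this name was listed more than once"; the class and method names are hypothetical and not part of the PR.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class DuplicateUserSketch {
    // Maps each requested user name to true if it appeared more than once.
    static Map<String, Boolean> markDuplicates(List<String> users) {
        Map<String, Boolean> seen = new LinkedHashMap<>();
        for (String user : users) {
            // containsKey is evaluated before put: the first sighting stores
            // false, and any later sighting overwrites it with true.
            seen.put(user, seen.containsKey(user));
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(markDuplicates(Arrays.asList("alice", "bob", "alice")));
    }
}
```

Entries flagged `true` would be reported as duplicates, while entries flagged `false` would go on to the credential lookup.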






[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-15 Thread via GitHub


C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107345427


##
connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java:
##
@@ -392,6 +400,17 @@ protected Consumer createConsumer() {
 return new KafkaConsumer<>(consumerConfigs);
 }
 
+/**
+ * Test whether a topic partition should be read by this log.
+ * Overridden by subclasses when only a subset of the assigned 
partitions should be read into memory.
+ * By default, this will read all partitions.

Review Comment:
   Some nits:
   
   ```suggestion
* Signals whether a topic partition should be read by this log. Invoked 
on {@link #start() startup} once
* for every partition found in the log's backing topic.
* This method can be overridden by subclasses when only a subset of 
the assigned partitions
* should be read into memory. By default, all partitions are read.
   ```
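
A minimal sketch of the override pattern the Javadoc describes, assuming a hook that is consulted once per partition at startup. The hook name `shouldRead`, the nested class names, and the use of bare partition numbers are all hypothetical, since the method signature is not visible in the hunk.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PartitionFilterSketch {
    static class Log {
        // Hypothetical hook: by default, all partitions are read.
        protected boolean shouldRead(int partition) {
            return true;
        }

        // Invoked once at startup with every partition found in the backing topic.
        List<Integer> partitionsToRead(List<Integer> found) {
            List<Integer> selected = new ArrayList<>();
            for (int partition : found) {
                if (shouldRead(partition)) {
                    selected.add(partition);
                }
            }
            return selected;
        }
    }

    // Subclass that only reads a subset of the partitions into memory.
    static class SinglePartitionLog extends Log {
        @Override
        protected boolean shouldRead(int partition) {
            return partition == 0;
        }
    }

    public static void main(String[] args) {
        List<Integer> found = Arrays.asList(0, 1, 2);
        System.out.println(new Log().partitionsToRead(found));
        System.out.println(new SinglePartitionLog().partitionsToRead(found));
    }
}
```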



##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -701,43 +745,151 @@ protected void produceMessages(EmbeddedConnectCluster 
cluster, String topicName,
 int cnt = 0;
 for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
 for (int p = 0; p < numPartitions; p++)
-cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+produce(cluster.kafka(), topicName, p, "key", "value-" + 
cnt++);
 }
-
+
+/**
+ * Produce a test record to a Kafka cluster.
+ * This method allows subclasses to configure and use their own Kafka 
Producer instead of using the built-in default.
+ * @param cluster   Kafka cluster that should receive the record
+ * @param topic Topic to send the record to, non-null
+ * @param partition Partition to send the record to, maybe null.
+ * @param key   Kafka key for the record
+ * @param value Kafka value for the record
+ */
+protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer 
partition, String key, String value) {
+cluster.produce(topic, partition, key, value);
+}
+
+protected static Map 
waitForCheckpointOnAllPartitions(
+MirrorClient client, String consumerGroupName, String 
remoteClusterAlias, String topicName
+) throws InterruptedException {
+AtomicReference> ret = new 
AtomicReference<>();
+waitForCondition(
+() -> {
+Map offsets = 
client.remoteConsumerOffsets(
+consumerGroupName, remoteClusterAlias, 
Duration.ofMillis(3000));
+for (int i = 0; i < NUM_PARTITIONS; i++) {
+if (!offsets.containsKey(new TopicPartition(topicName, 
i))) {
+log.info("Checkpoint is missing for {}: {}-{}", 
consumerGroupName, topicName, i);
+return false;
+}
+}
+ret.set(offsets);
+return true;
+},
+CHECKPOINT_DURATION_MS,
+String.format(
+"Offsets for consumer group %s not translated from %s 
for topic %s",
+consumerGroupName,
+remoteClusterAlias,
+topicName
+)
+);
+return ret.get();
+}
+
 /*
  * given consumer group, topics and expected number of records, make sure 
the consumer group
  * offsets are eventually synced to the expected offset numbers
  */
-protected static  void 
waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
-Consumer consumer, List topics, String 
consumerGroupId, int numRecords, boolean exactOffsetTranslation)
-throws InterruptedException {
+protected static  void waitForConsumerGroupFullSync(
+EmbeddedConnectCluster connect, List topics, String 
consumerGroupId, int numRecords, boolean exactOffsetTranslation
+) throws InterruptedException {
 try (Admin adminClient = connect.kafka().createAdminClient()) {
-List tps = new ArrayList<>(NUM_PARTITIONS * 
topics.size());
+Map tps = new HashMap<>(NUM_PARTITIONS 
* topics.size());
 for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; 
partitionIndex++) {
 for (String topic : topics) {
-tps.add(new TopicPartition(topic, partitionIndex));
+tps.put(new TopicPartition(topic, partitionIndex), 
OffsetSpec.latest());
 }
 }
 long expectedTotalOffsets = numRecords * topics.size();
 
 waitForCondition(() -> {
 Map consumerGroupOffsets =
 
adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata()

[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-15 Thread via GitHub


C0urante commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107352681


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -701,43 +745,151 @@ protected void produceMessages(EmbeddedConnectCluster 
cluster, String topicName,
 int cnt = 0;
 for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
 for (int p = 0; p < numPartitions; p++)
-cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+produce(cluster.kafka(), topicName, p, "key", "value-" + 
cnt++);
 }
-
+
+/**
+ * Produce a test record to a Kafka cluster.
+ * This method allows subclasses to configure and use their own Kafka 
Producer instead of using the built-in default.
+ * @param cluster   Kafka cluster that should receive the record
+ * @param topic Topic to send the record to, non-null
+ * @param partition Partition to send the record to, maybe null.
+ * @param key   Kafka key for the record
+ * @param value Kafka value for the record
+ */
+protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer 
partition, String key, String value) {
+cluster.produce(topic, partition, key, value);
+}
+
+protected static Map 
waitForCheckpointOnAllPartitions(
+MirrorClient client, String consumerGroupName, String 
remoteClusterAlias, String topicName
+) throws InterruptedException {
+AtomicReference> ret = new 
AtomicReference<>();
+waitForCondition(
+() -> {
+Map offsets = 
client.remoteConsumerOffsets(
+consumerGroupName, remoteClusterAlias, 
Duration.ofMillis(3000));
+for (int i = 0; i < NUM_PARTITIONS; i++) {
+if (!offsets.containsKey(new TopicPartition(topicName, 
i))) {
+log.info("Checkpoint is missing for {}: {}-{}", 
consumerGroupName, topicName, i);
+return false;
+}
+}
+ret.set(offsets);
+return true;
+},
+CHECKPOINT_DURATION_MS,
+String.format(
+"Offsets for consumer group %s not translated from %s 
for topic %s",
+consumerGroupName,
+remoteClusterAlias,
+topicName
+)
+);
+return ret.get();
+}
+
 /*
  * given consumer group, topics and expected number of records, make sure 
the consumer group
  * offsets are eventually synced to the expected offset numbers
  */
-protected static  void 
waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
-Consumer consumer, List topics, String 
consumerGroupId, int numRecords, boolean exactOffsetTranslation)
-throws InterruptedException {
+protected static  void waitForConsumerGroupFullSync(
+EmbeddedConnectCluster connect, List topics, String 
consumerGroupId, int numRecords, boolean exactOffsetTranslation
+) throws InterruptedException {
 try (Admin adminClient = connect.kafka().createAdminClient()) {
-List tps = new ArrayList<>(NUM_PARTITIONS * 
topics.size());
+Map tps = new HashMap<>(NUM_PARTITIONS 
* topics.size());
 for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; 
partitionIndex++) {
 for (String topic : topics) {
-tps.add(new TopicPartition(topic, partitionIndex));
+tps.put(new TopicPartition(topic, partitionIndex), 
OffsetSpec.latest());
 }
 }
 long expectedTotalOffsets = numRecords * topics.size();
 
 waitForCondition(() -> {
 Map consumerGroupOffsets =
 
adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
-long consumerGroupOffsetTotal = 
consumerGroupOffsets.values().stream()
+long totalConsumerGroupOffsets = 
consumerGroupOffsets.values().stream()
 .mapToLong(OffsetAndMetadata::offset).sum();
 
-Map offsets = consumer.endOffsets(tps, 
CONSUMER_POLL_TIMEOUT_MS);
-long totalOffsets = offsets.values().stream().mapToLong(l -> 
l).sum();
-
+Map 
endOffsets =
+adminClient.listOffsets(tps).all().get();
+long totalEndOffsets = endOffsets.values().stream()
+
.mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset).sum();
+
+for (TopicPartition tp : endOffsets.keySet()) {
+if (consumerGroupOffsets.containsKey(tp)) {
+assertTrue(consumerGroup

[GitHub] [kafka] mumrah opened a new pull request, #13257: MINOR: Add ZK migration docs to the packaged docs

2023-02-15 Thread via GitHub


mumrah opened a new pull request, #13257:
URL: https://github.com/apache/kafka/pull/13257

   This patch brings in the ZK migration docs that were added for the 3.4 
release.





[GitHub] [kafka] ijuma commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8

2023-02-15 Thread via GitHub


ijuma commented on PR #13205:
URL: https://github.com/apache/kafka/pull/13205#issuecomment-1431655901

   An alternative would be to drop spotless until we drop support for Java 8 
(Apache Kafka 4.0). What actually uses spotless today?





[GitHub] [kafka] C0urante commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

2023-02-15 Thread via GitHub


C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1106236517


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##
@@ -135,38 +135,40 @@ public void testFlushFailureReplacesOffsets() throws 
Exception {
 // First time the write fails
 expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, true, null);
 writer.offset(OFFSET_KEY, OFFSET_VALUE);
-assertTrue(writer.beginFlush());
+assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
 writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
 verify(callback).onCompletion(eq(EXCEPTION), isNull());
 
 // Second time it succeeds
 expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, false, null);
-assertTrue(writer.beginFlush());
+assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
 writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
 verify(callback).onCompletion(isNull(), isNull());
 
 // Third time it has no data to flush so we won't get past beginFlush()
-assertFalse(writer.beginFlush());
+assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
 }
 
 @Test
-public void testAlreadyFlushing() {
+public void testAlreadyFlushing() throws InterruptedException, 
TimeoutException {
 @SuppressWarnings("unchecked")
 final Callback callback = mock(Callback.class);
 // Trigger the send, but don't invoke the callback so we'll still be 
mid-flush
 CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
 expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
 
 writer.offset(OFFSET_KEY, OFFSET_VALUE);
-assertTrue(writer.beginFlush());
+assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));

Review Comment:
   Worth it to add a check to ensure that `beginFlush` times out if it has been 
invoked previously, before `doFlush` has also been called?
   ```suggestion
   assertThrows(TimeoutException.class, () -> writer.beginFlush(1000L, 
TimeUnit.MILLISECONDS));
   assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
   ```
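
The timeout behaviour under discussion can be sketched independently of Connect. This assumes the in-progress flush is guarded by a single-permit `Semaphore`, which is a guess based on the truncated `flushInProgress` call in the PR. `FlushGateSketch`, `finishFlush`, and `tryBegin` are hypothetical names used only for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

class FlushGateSketch {
    // One permit: at most one flush may be in progress at a time.
    private final Semaphore flushInProgress = new Semaphore(1);
    private final AtomicBoolean hasData = new AtomicBoolean(false);

    void offset() {
        hasData.set(true);
    }

    // Waits up to the timeout for any previous flush to finish.
    boolean beginFlush(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        if (!flushInProgress.tryAcquire(timeout, unit)) {
            throw new TimeoutException("Timed out waiting for previous flush to finish");
        }
        if (!hasData.getAndSet(false)) {
            // Nothing to flush, so give the permit back immediately.
            flushInProgress.release();
            return false;
        }
        return true;
    }

    // Stand-in for the completion of the actual write to storage.
    void finishFlush() {
        flushInProgress.release();
    }

    // Convenience wrapper that turns the outcomes into strings.
    static String tryBegin(FlushGateSketch writer, long timeoutMs) {
        try {
            return writer.beginFlush(timeoutMs, TimeUnit.MILLISECONDS) ? "started" : "empty";
        } catch (TimeoutException e) {
            return "timeout";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

In this sketch a second `beginFlush` issued before `finishFlush` times out, which is the case the suggested `assertThrows` line is meant to cover.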






[GitHub] [kafka] C0urante commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

2023-02-15 Thread via GitHub


C0urante commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1107391193


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java:
##
@@ -365,6 +365,10 @@ public void execute() {
 } catch (InterruptedException e) {
 // Ignore and allow to exit.
 } catch (RuntimeException e) {
+if (isCancelled()) {
+log.debug("Skipping final offset commit as task has been 
cancelled");
+throw e;
+}

Review Comment:
   Honestly not sure why we didn't put this here to begin with. Nice 👍



##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##
@@ -135,38 +135,40 @@ public void testFlushFailureReplacesOffsets() throws 
Exception {
 // First time the write fails
 expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, true, null);
 writer.offset(OFFSET_KEY, OFFSET_VALUE);
-assertTrue(writer.beginFlush());
+assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
 writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
 verify(callback).onCompletion(eq(EXCEPTION), isNull());
 
 // Second time it succeeds
 expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, false, null);
-assertTrue(writer.beginFlush());
+assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
 writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
 verify(callback).onCompletion(isNull(), isNull());
 
 // Third time it has no data to flush so we won't get past beginFlush()
-assertFalse(writer.beginFlush());
+assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
 }
 
 @Test
-public void testAlreadyFlushing() {
+public void testAlreadyFlushing() throws InterruptedException, 
TimeoutException {
 @SuppressWarnings("unchecked")
 final Callback callback = mock(Callback.class);
 // Trigger the send, but don't invoke the callback so we'll still be 
mid-flush
 CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
 expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
 
 writer.offset(OFFSET_KEY, OFFSET_VALUE);
-assertTrue(writer.beginFlush());
+assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));

Review Comment:
   Still think this may apply here



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##
@@ -100,23 +104,45 @@ private boolean flushing() {
 
 /**
  * Performs the first step of a flush operation, snapshotting the current 
state. This does not
- * actually initiate the flush with the underlying storage.
+ * actually initiate the flush with the underlying storage. Ensures that 
any previous flush operations
+ * have finished before beginning a new flush.
  *
  * @return true if a flush was initiated, false if no data was available
+ * @throws ConnectException if the previous flush is not complete before 
this method is called
  */
-public synchronized boolean beginFlush() {
-if (flushing()) {
-log.error("Invalid call to OffsetStorageWriter flush() while 
already flushing, the "
+public boolean beginFlush() {
+try {
+return beginFlush(0, TimeUnit.NANOSECONDS);
+} catch (InterruptedException | TimeoutException e) {
+log.error("Invalid call to OffsetStorageWriter beginFlush() while 
already flushing, the "
 + "framework should not allow this");
 throw new ConnectException("OffsetStorageWriter is already 
flushing");
 }
+}
 
-if (data.isEmpty())
-return false;
-
-toFlush = data;
-data = new HashMap<>();
-return true;
+/**
+ * Performs the first step of a flush operation, snapshotting the current 
state. This does not
+ * actually initiate the flush with the underlying storage. Ensures that 
any previous flush operations
+ * have finished before beginning a new flush.
+ *
+ * @param timeout A maximum duration to wait for previous flushes to 
finish before giving up on waiting
+ * @param timeUnit Units of the timeout argument
+ * @return true if a flush was initiated, false if no data was available
+ * @throws InterruptedException if this thread was interrupted while 
waiting for the previous flush to complete
+ * @throws TimeoutException if the {@code timeout} elapses before previous 
flushes are complete.
+ */
+public boolean beginFlush(long timeout, TimeUnit timeUnit) throws 
InterruptedException, TimeoutException {
+if (flushInProgress.tryAcqui

[jira] [Created] (KAFKA-14721) Kafka listener uses wrong login class

2023-02-15 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14721:


 Summary: Kafka listener uses wrong login class
 Key: KAFKA-14721
 URL: https://issues.apache.org/jira/browse/KAFKA-14721
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.1.2
Reporter: Daniel Urban


When trying to configure a single SASL_SSL listener with GSSAPI, Scram and 
OAuth support, we encounter an error at startup:
{code:java}
2023-02-15 13:26:04,250 ERROR kafka.server.KafkaServer: [main]: [KafkaServer 
id=104] Fatal error during KafkaServer startup. Prepare to shutdown
org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No 
serviceName defined in either JAAS or Kafka config
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at 
org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:107)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at kafka.network.Processor.<init>(SocketServer.scala:861) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at kafka.network.SocketServer.newProcessor(SocketServer.scala:442) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at 
kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:299)
 ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) 
~[scala-library-2.13.10.jar:?]
        at 
kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:297) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:262)
 ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:259)
 ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575) 
~[scala-library-2.13.10.jar:?]
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573) 
~[scala-library-2.13.10.jar:?]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:933) 
~[scala-library-2.13.10.jar:?]
        at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:259)
 ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at kafka.network.SocketServer.startup(SocketServer.scala:131) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at kafka.server.KafkaServer.startup(KafkaServer.scala:310) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at kafka.Kafka$.main(Kafka.scala:109) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at com.cloudera.kafka.wrap.Kafka$.$anonfun$main$1(Kafka.scala:107) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at 
com.cloudera.kafka.wrap.Kafka$.$anonfun$main$1$adapted(Kafka.scala:107) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at com.cloudera.kafka.wrap.Kafka$.runMain(Kafka.scala:118) 
[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at com.cloudera.kafka.wrap.Kafka$.main(Kafka.scala:110) 
[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at com.cloudera.kafka.wrap.Kafka.main(Kafka.scala) 
[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either 
JAAS or Kafka config
        at 
org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:309)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at 
org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at 
org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:61)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at 
org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        ... 21 more{code}
Using the following configs in a Kafka broker:

jaas configuration file:
{code:java}
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required doNotPrompt=true 
useKeyTab=true storeKey=true serviceName="kafka" 
keyTab="/var/KAFKA_BROKER/kafka.keytab" principal="kafka/hgiovr@SITE";
org.apache.kafka.common.security.scram.ScramLoginModule required;
};{code}
and the following properties:
{code:java}
listener.name.sasl_ssl.sasl.enabled.mechanisms=GSSAPI,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
 required;
listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuth

[GitHub] [kafka] dejan2609 commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8

2023-02-15 Thread via GitHub


dejan2609 commented on PR #13205:
URL: https://github.com/apache/kafka/pull/13205#issuecomment-1431760212

   At the moment spotless is used via Jenkins CI server :arrow_right: 
Jenkinsfile 'spotlessScalaCheck' task execution: 
https://github.com/apache/kafka/blob/3.4.0/Jenkinsfile#L23
   
   My suggestion is to:
   - remove 'spotlessScalaCheck' from the Jenkinsfile 
   - put some comments into build.gradle that would prevent others from 
changing anything related to spotless until Kafka 4.0 comes into play (i.e. 
after Java 8 is dropped):
  - plugin definition: 
https://github.com/apache/kafka/blob/3.4.0/build.gradle#L33
  - plugin configuration: 
https://github.com/apache/kafka/blob/3.4.0/build.gradle#L47
   
   Note: this PR already removes task 
[dependency](https://github.com/apache/kafka/blob/3.4.0/build.gradle#L2084).





[GitHub] [kafka] lucasbru commented on a diff in pull request #13025: KAFKA-14299: Fix pause and resume with state updater

2023-02-15 Thread via GitHub


lucasbru commented on code in PR #13025:
URL: https://github.com/apache/kafka/pull/13025#discussion_r1107498812


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:
##
@@ -190,7 +190,7 @@ public void clearTaskTimeout() {
 
 @Override
 public boolean commitNeeded() {
-throw new UnsupportedOperationException("This task is read-only");
+return task.commitNeeded();

Review Comment:
   ```
   1) introduce a TaskManager.allProcessingTasks, 2) depending on the updater 
enabled flag, let maybeCommit call either this one or allTasks
   ```






[GitHub] [kafka] lucasbru commented on a diff in pull request #13025: KAFKA-14299: Fix pause and resume with state updater

2023-02-15 Thread via GitHub


lucasbru commented on code in PR #13025:
URL: https://github.com/apache/kafka/pull/13025#discussion_r1107498812


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:
##
@@ -190,7 +190,7 @@ public void clearTaskTimeout() {
 
 @Override
 public boolean commitNeeded() {
-throw new UnsupportedOperationException("This task is read-only");
+return task.commitNeeded();

Review Comment:
   > 1) introduce a TaskManager.allProcessingTasks, 2) depending on the updater 
enabled flag, let maybeCommit call either this one or allTasks
   
   Let's do this.
   I will update the PR first thing tomorrow.
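The two-step proposal quoted above could look roughly like the following. This is only a minimal sketch under stated assumptions, not the real `TaskManager`: `TaskManagerSketch`, the string task names, the split into processing and restoring tasks, and the integer return value of `maybeCommit` are all hypothetical and only illustrate how the state-updater flag would select the task set.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for TaskManager; task names replace real Task objects.
class TaskManagerSketch {
    private final boolean stateUpdaterEnabled;
    private final List<String> processingTasks = new ArrayList<>();
    private final List<String> restoringTasks = new ArrayList<>();

    TaskManagerSketch(boolean stateUpdaterEnabled) {
        this.stateUpdaterEnabled = stateUpdaterEnabled;
    }

    void addProcessingTask(String name) {
        processingTasks.add(name);
    }

    void addRestoringTask(String name) {
        restoringTasks.add(name);
    }

    // Every task known to the manager, processing or not.
    List<String> allTasks() {
        List<String> all = new ArrayList<>(processingTasks);
        all.addAll(restoringTasks);
        return all;
    }

    // Step 1 of the proposal: only the tasks that are actively processing.
    List<String> allProcessingTasks() {
        return new ArrayList<>(processingTasks);
    }

    // Step 2 of the proposal: pick the task set based on the flag.
    // Returns the number of tasks that would be checked for a commit.
    int maybeCommit() {
        List<String> candidates = stateUpdaterEnabled ? allProcessingTasks() : allTasks();
        return candidates.size();
    }
}
```

With the flag enabled, tasks that are not processing are never asked whether they need a commit, so a read-only wrapper would not have to answer `commitNeeded()` at all.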






[jira] [Created] (KAFKA-14722) Make BooleanSerde public

2023-02-15 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-14722:
---

 Summary: Make BooleanSerde public
 Key: KAFKA-14722
 URL: https://issues.apache.org/jira/browse/KAFKA-14722
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


We introduced a "BooleanSerde" via [https://github.com/apache/kafka/pull/13249] 
as an internal class. We could make it public.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax commented on a diff in pull request #13249: KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value

2023-02-15 Thread via GitHub


mjsax commented on code in PR #13249:
URL: https://github.com/apache/kafka/pull/13249#discussion_r1107522774


##
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java:
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static java.util.Objects.requireNonNull;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+
+/**
+ * Similar to {@link ValueAndTimestampSerde} but this serde additionally 
supports (de)serializing
+ * {@link ValueAndTimestamp} instances for which the {@code value} is {@code 
null}.
+ * 
+ * The serialized format is:
+ * 
+ *  +  + 
+ * 
+ * where the boolean is needed in order to distinguish between null and empty 
values (i.e., between
+ * tombstones and {@code byte[0]} values).
+ */
+public class NullableValueAndTimestampSerde extends 
WrappingNullableSerde, Void, V> {
+public NullableValueAndTimestampSerde(final Serde valueSerde) {
+super(
+new 
NullableValueAndTimestampSerializer<>(requireNonNull(valueSerde).serializer()),
+new 
NullableValueAndTimestampDeserializer<>(requireNonNull(valueSerde).deserializer())
+);
+}
+
+static final class BooleanSerde {
+private static final byte TRUE = 0x01;
+private static final byte FALSE = 0x00;
+
+static class BooleanSerializer implements Serializer {
+@Override
+public byte[] serialize(final String topic, final Boolean data) {
+if (data == null) {
+// actually want to return null here but spotbugs won't 
allow deserialization so

Review Comment:
   Not sure I follow. We should return `null` and just make sure SpotBugs 
does not mess with us. What does it complain about?
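For reference, a null-preserving boolean encoding along the lines suggested above might look like this. It is a minimal sketch that deliberately avoids the Kafka `Serializer`/`Deserializer` interfaces so that it compiles on its own; `BooleanBytes` and its method names are hypothetical. SpotBugs does have a check for methods with a `Boolean` return type that explicitly return `null` (`NP_BOOLEAN_RETURN_NULL`), but the thread does not say whether that is the warning in question.

```java
// Hypothetical helper, not the BooleanSerde from the PR.
final class BooleanBytes {
    private static final byte TRUE = 0x01;
    private static final byte FALSE = 0x00;

    private BooleanBytes() { }

    // A null input maps to a null byte array, like a tombstone.
    static byte[] serialize(Boolean data) {
        if (data == null) {
            return null;
        }
        return new byte[] {data ? TRUE : FALSE};
    }

    // A null byte array maps back to null; anything other than a
    // single 0x00 or 0x01 byte is rejected.
    static Boolean deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length == 1 && data[0] == TRUE) {
            return Boolean.TRUE;
        }
        if (data.length == 1 && data[0] == FALSE) {
            return Boolean.FALSE;
        }
        throw new IllegalArgumentException("Unexpected boolean encoding");
    }
}
```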



##
streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java:
##
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static 
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import 
org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanDeserializer;
+
+/**
+ * See {@link NullableValueAndTimestampSerde}.
+ */
+public class NullableValueAndTimestampDeserializer implements 
WrappingNullableDeserializer, Void, V> {
+public final Deserializer valueDeserializer;
+private final Deserializer timestampDeserializer;
+private final Deserializer booleanDeserializer;
+
+NullableValueAndTimestampDeserializer(final Deserializer 
valueDeserializer) {
+this.valueDeserializer = Objects.

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

2023-02-15 Thread via GitHub


vcrfxia commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107567122


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final 
StateStore root) {
 
 // VisibleForTesting
 void restoreBatch(final Collection> 
records) {
-// advance stream time to the max timestamp in the batch
+// copy the observed stream time, for use in deciding whether to drop 
records during restore,
+// when records have exceeded the store's grace period.
+long streamTimeForRestore = observedStreamTime;

Review Comment:
   > I guess the question is, what is the value of `observedStreamTime` when we 
start the restore? Are you saying it's `-1` and we basically "replay" 
`observedStreamTime` during restore?
   
   Yes, that's exactly right. `observedStreamTime` is tracked locally per 
store. It is initialized to `-1` and only updated on `put()` or during restore. 
(This is the same as the existing behavior for window stores today.)
   
   > Maybe best to update some variable names?
   
   Are you proposing that `doPut()` takes stream time as a parameter, so that 
during normal `put()` operation we pass `observedStreamTime` and during restore 
we pass `endOfBatchStreamTime`, which means we can rename 
`streamTimeForRestore` to be `observedStreamTime` instead? This SGTM, just want 
to check whether that's also what you have in mind, since we removed a number 
of parameters from `doPut()` in a previous PR revision in order to keep the 
parameter list small.
   
   > I guess follow up work (independent for this KIP) might be, to actually 
make use of KS runtime streamTime instead of tracking inside the store, and 
thus won't need `observedStreamTime` any longer, as we could look ahead to the 
"end-of-restore stream-time" (not just "end-of batch").
   
   What's the scope of the "streamTime" which is tracked by the KS runtime? Is 
it per-task? Per-processor? Global? I'm wondering how this would work in 
situations with multiple partitions, or with multiple processors where some 
processors are expected to see new data earlier than other (downstream) 
processors.
   
   I guess we'd also need to implement the change from your other comment about 
not writing records which are expired (based on grace period) into the 
changelog topic first before we can make this change, otherwise we would not 
have a way to determine during restore whether records are expired or not.
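To make the naming question concrete, here is one way the "`doPut()` takes stream time as a parameter" idea could be laid out. This is a simplified sketch, not the `RocksDBVersionedStore` code: `VersionedStoreSketch`, the bare-timestamp records, the grace-period check, and the `writes` list are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified store; records are reduced to timestamps.
class VersionedStoreSketch {
    private final long gracePeriodMs;
    private long observedStreamTime = -1L; // initialized to -1, as described above

    // Each entry is {record timestamp, stream time passed to doPut}.
    final List<long[]> writes = new ArrayList<>();

    VersionedStoreSketch(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    // Normal operation: advance stream time, then pass it to doPut.
    void put(long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        if (timestamp < observedStreamTime - gracePeriodMs) {
            return; // outside the grace period, dropped
        }
        doPut(observedStreamTime, timestamp);
    }

    // Restore: observedStreamTime is replayed record by record to decide
    // which records are expired, while doPut sees the end-of-batch time.
    void restoreBatch(List<Long> timestamps) {
        long endOfBatchStreamTime = observedStreamTime;
        for (long ts : timestamps) {
            endOfBatchStreamTime = Math.max(endOfBatchStreamTime, ts);
        }
        for (long ts : timestamps) {
            observedStreamTime = Math.max(observedStreamTime, ts);
            if (ts < observedStreamTime - gracePeriodMs) {
                continue; // expired relative to the replayed stream time
            }
            doPut(endOfBatchStreamTime, ts);
        }
    }

    private void doPut(long streamTime, long timestamp) {
        writes.add(new long[] {timestamp, streamTime});
    }
}
```

In this layout no separate `streamTimeForRestore` variable is needed, at the cost of one extra parameter on `doPut()`.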






[GitHub] [kafka] guozhangwang commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4

2023-02-15 Thread via GitHub


guozhangwang commented on code in PR #13192:
URL: https://github.com/apache/kafka/pull/13192#discussion_r1107569600


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataFetcher.java:
##
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NodeApiVersions;
+import org.apache.kafka.clients.StaleMetadataException;
+import org.apache.kafka.clients.consumer.LogTruncationException;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import 
org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult;
+import 
org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicAuthorizationException;
+import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
+import 
org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
+import 
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ListOffsetsRequest;
+import org.apache.kafka.common.requests.ListOffsetsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.slf4j.Logger;
+
+public class MetadataFetcher {
+
+private final Logger log;
+private final ConsumerMetadata metadata;
+private final SubscriptionState subscriptions;
+private final ConsumerNetworkClient client;
+private final Time time;
+private final long retryBackoffMs;
+private final long requestTimeoutMs;
+private final IsolationLevel isolationLevel;
+private final AtomicReference cachedListOffsetsException 
= new AtomicReference<>();
+private final AtomicReference 
cachedOffsetForLeaderException = new AtomicReference<>();
+private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
+private final ApiVersions apiVersions;
+private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
+
+public MetadataFetcher(LogContext logContext,
+   ConsumerNetworkClient client,
+   ConsumerMetadata metadata,
+   SubscriptionState subscriptions,
+   Time time,
+   long retryBackoffMs,
+   long requestTimeoutMs,
+   IsolationLevel isolationLevel,
+   ApiVersions apiVersions) {
+this.log = logContext.logger(getClass());
+this.time = time;
+this.cli

[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-15 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107587497


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -101,15 +130,61 @@ public String toString() {
 public AddPartitionsToTxnRequest(final AddPartitionsToTxnRequestData data, 
short version) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN, version);
 this.data = data;
+this.version = version;
 }
-
+
+// Only used for versions < 4
 public List partitions() {
 if (cachedPartitions != null) {
 return cachedPartitions;
 }
 cachedPartitions = Builder.getPartitions(data);
 return cachedPartitions;
 }
+
+private List partitionsForTransaction(String transaction) {
+if (cachedPartitionsByTransaction == null) {
+cachedPartitionsByTransaction = new HashMap<>();
+}
+
+return cachedPartitionsByTransaction.computeIfAbsent(transaction, txn 
-> {
+List partitions = new ArrayList<>();
+for (AddPartitionsToTxnTopic topicCollection : 
data.transactions().find(txn).topics()) {
+for (Integer partition : topicCollection.partitions()) {
+partitions.add(new TopicPartition(topicCollection.name(), 
partition));
+}
+}
+return partitions;
+});
+}
+
+public Map> partitionsByTransaction() {
+if (cachedPartitionsByTransaction != null && 
cachedPartitionsByTransaction.size() == data.transactions().size()) {

Review Comment:
   I'm not sure how that would happen






[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-15 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107589437


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -352,7 +353,12 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
   // this is an optimization: if the partitions are already in the 
metadata reply OK immediately
   Left(Errors.NONE)
 } else {
-  Right(coordinatorEpoch, 
txnMetadata.prepareAddPartitions(partitions.toSet, time.milliseconds()))
+  // If verifyOnly, we should have returned in the step above. If 
we didn't the partitions are not present in the transaction.

Review Comment:
   yes






[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-15 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107589884


##
clients/src/test/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponseTest.java:
##
@@ -84,16 +88,59 @@ public void testParse() {
 
 topicCollection.add(topicResult);
 
-AddPartitionsToTxnResponseData data = new 
AddPartitionsToTxnResponseData()
-  .setResults(topicCollection)
-  
.setThrottleTimeMs(throttleTimeMs);
-AddPartitionsToTxnResponse response = new 
AddPartitionsToTxnResponse(data);
-
 for (short version : ApiKeys.ADD_PARTITIONS_TO_TXN.allVersions()) {

Review Comment:
   I thought maybe we didn't want to redo the top steps every time, but sure, I 
can change it.






[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-15 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107590295


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2383,68 +2384,101 @@ class KafkaApis(val requestChannel: RequestChannel,
 if (config.interBrokerProtocolVersion.isLessThan(version))
   throw new UnsupportedVersionException(s"inter.broker.protocol.version: 
${config.interBrokerProtocolVersion.version} is less than the required version: 
${version.version}")
   }
-
-  def handleAddPartitionToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
+  def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
 ensureInterBrokerVersion(IBP_0_11_0_IV0)
-val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
-val transactionalId = addPartitionsToTxnRequest.data.transactionalId
-val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, 
transactionalId))
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
-else {
-  val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val authorizedPartitions = mutable.Set[TopicPartition]()
-
-  val authorizedTopics = authHelper.filterByAuthorized(request.context, 
WRITE, TOPIC,
-partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
-  for (topicPartition <- partitionsToAdd) {
-if (!authorizedTopics.contains(topicPartition.topic))
-  unauthorizedTopicErrors += topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED
-else if (!metadataCache.contains(topicPartition))
-  nonExistingTopicErrors += topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION
-else
-  authorizedPartitions.add(topicPartition)
+val lock = new Object
+val addPartitionsToTxnRequest = if (request.context.apiVersion() < 4) 
request.body[AddPartitionsToTxnRequest].normalizeRequest() else 
request.body[AddPartitionsToTxnRequest]
+val version = addPartitionsToTxnRequest.version
+val responses = new AddPartitionsToTxnResultCollection()
+val partitionsByTransaction = 
addPartitionsToTxnRequest.partitionsByTransaction()
+
+// Newer versions of the request should only come from other brokers.
+if (version >= 4) authHelper.authorizeClusterOperation(request, 
CLUSTER_ACTION)
+
+// V4 requests introduced batches of transactions. We need all 
transactions to be handled before sending the 
+// response so there are a few differences in handling errors and sending 
responses.
+def createResponse(requestThrottleMs: Int): AbstractResponse = {
+  if (version < 4) {
+// There will only be one response in data. Add it to the response 
data object.
+val data = new AddPartitionsToTxnResponseData()
+responses.forEach(result => {
+  data.setResults(result.topicResults())
+  data.setThrottleTimeMs(requestThrottleMs)
+})
+new AddPartitionsToTxnResponse(data)
+  } else {
+new AddPartitionsToTxnResponse(new 
AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses))
   }
+}
 
-  if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) 
{
-// Any failed partition check causes the entire request to fail. We 
send the appropriate error codes for the
-// partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error 
code for the partitions which succeeded
-// the authorization check to indicate that they were not added to the 
transaction.
-val partitionErrors = unauthorizedTopicErrors ++ 
nonExistingTopicErrors ++
-  authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED)
-requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-  new AddPartitionsToTxnResponse(requestThrottleMs, 
partitionErrors.asJava))
+val txns = addPartitionsToTxnRequest.data.transactions
+def maybeSendResponse(): Unit = {
+  lock synchronized {
+if (responses.size() == txns.size()) {
+  requestHelper.sendResponseMaybeThrottle(request, createResponse)

Review Comment:
   no






[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-15 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107590667


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2383,68 +2384,101 @@ class KafkaApis(val requestChannel: RequestChannel,
 if (config.interBrokerProtocolVersion.isLessThan(version))
   throw new UnsupportedVersionException(s"inter.broker.protocol.version: 
${config.interBrokerProtocolVersion.version} is less than the required version: 
${version.version}")
   }
-
-  def handleAddPartitionToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
+  def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, 
requestLocal: RequestLocal): Unit = {
 ensureInterBrokerVersion(IBP_0_11_0_IV0)
-val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest]
-val transactionalId = addPartitionsToTxnRequest.data.transactionalId
-val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala
-if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, 
transactionalId))
-  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
-addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception))
-else {
-  val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]()
-  val authorizedPartitions = mutable.Set[TopicPartition]()
-
-  val authorizedTopics = authHelper.filterByAuthorized(request.context, 
WRITE, TOPIC,
-partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic)
-  for (topicPartition <- partitionsToAdd) {
-if (!authorizedTopics.contains(topicPartition.topic))
-  unauthorizedTopicErrors += topicPartition -> 
Errors.TOPIC_AUTHORIZATION_FAILED
-else if (!metadataCache.contains(topicPartition))
-  nonExistingTopicErrors += topicPartition -> 
Errors.UNKNOWN_TOPIC_OR_PARTITION
-else
-  authorizedPartitions.add(topicPartition)
+val lock = new Object

Review Comment:
   I can do that.






[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-15 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107592032


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -101,15 +130,61 @@ public String toString() {
 public AddPartitionsToTxnRequest(final AddPartitionsToTxnRequestData data, 
short version) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN, version);
 this.data = data;
+this.version = version;
 }
-
+
+// Only used for versions < 4
 public List partitions() {
 if (cachedPartitions != null) {
 return cachedPartitions;
 }
 cachedPartitions = Builder.getPartitions(data);
 return cachedPartitions;
 }
+
+private List partitionsForTransaction(String transaction) {

Review Comment:
   I'm not sure I understand the question.






[jira] [Created] (KAFKA-14723) Do not write expired store records to changelog

2023-02-15 Thread Victoria Xia (Jira)
Victoria Xia created KAFKA-14723:


 Summary: Do not write expired store records to changelog
 Key: KAFKA-14723
 URL: https://issues.apache.org/jira/browse/KAFKA-14723
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Victoria Xia


Window stores and versioned stores both have concepts of "retention" and 
"expiration." Records which are expired are not written to the store, e.g., 
[this 
example|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java#L265-L266]
 for segmented stores. However, these expired records are still written to the 
changelog topic, in the case of persistent stores. This does not cause any 
problems because the records are once again omitted from the store during 
restore, but it is inefficient. It'd be good to avoid writing expired records 
to the changelog topic in the first place. Another benefit is that doing so 
would allow us to simplify the restoration code for versioned stores (see 
[relevant 
discussion|https://github.com/apache/kafka/pull/13243#discussion_r1106568364]). 

The reason expired records are still written to the changelog topic is that 
whether records are expired or not is only tracked at the innermost store 
layer, and not any of the outer store layers such as the changelogging layer. 
The innermost store layer keeps its own `observedStreamTime` which is advanced 
on calls to put() and during restoration, and uses this variable to determine 
when a record is expired. Because the return type from put() is void, the 
changelogging layer has no way to tell whether the inner store's put() actually 
put the record or dropped it as expired, and always writes to the changelog 
topic regardless.

In order to avoid this, we could:
 * update the put() interface to return a boolean indicating whether the record 
was actually put or not, or
 * move the logic for determining when a record is expired into an outer store 
layer, or
 * reorder/restructure the wrapped store layers.
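
As a rough illustration of the first option above, here is a minimal Java sketch. It does not use the actual Kafka Streams store interfaces: `ExpiringStore`, `InnerStore`, and `ChangeLoggingStore` are hypothetical names, the expiry check is a simplified assumption, and the changelog topic is modeled as an in-memory list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical interface: put() reports whether the record was stored.
interface ExpiringStore {
    boolean put(String key, String value, long timestamp);
}

// Hypothetical innermost layer that tracks its own observed stream time.
class InnerStore implements ExpiringStore {
    private final long retentionMs;
    private long observedStreamTime = -1L;
    final Map<String, String> data = new HashMap<>();

    InnerStore(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    @Override
    public boolean put(String key, String value, long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        if (timestamp < observedStreamTime - retentionMs) {
            return false; // expired: dropped, and the caller is told so
        }
        data.put(key, value);
        return true;
    }
}

// Hypothetical changelogging layer: logs only what the inner store kept.
class ChangeLoggingStore implements ExpiringStore {
    private final ExpiringStore inner;
    final List<String> changelog = new ArrayList<>();

    ChangeLoggingStore(ExpiringStore inner) {
        this.inner = inner;
    }

    @Override
    public boolean put(String key, String value, long timestamp) {
        boolean stored = inner.put(key, value, timestamp);
        if (stored) {
            changelog.add(key + "=" + value + "@" + timestamp);
        }
        return stored;
    }
}
```

The outer layer never needs to know the retention rules; it only reacts to the boolean, which is the appeal of this option compared with moving the expiry logic outward.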





[jira] [Updated] (KAFKA-14723) Do not write expired store records to changelog

2023-02-15 Thread Victoria Xia (Jira)


 [ https://issues.apache.org/jira/browse/KAFKA-14723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Victoria Xia updated KAFKA-14723:
-
Description: 
Window stores and versioned stores both have concepts of "retention" and 
"expiration." Records which are expired are not written to the store, e.g., 
[this 
example|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java#L265-L266]
 for segmented stores. However, these expired records are still written to the 
changelog topic, in the case of persistent stores. This does not cause any 
problems because the records are once again omitted from the store during 
restore, but it is inefficient. It'd be good to avoid writing expired records 
to the changelog topic in the first place. Another benefit is that doing so 
would allow us to simplify the restoration code for versioned stores (see 
[relevant 
discussion|https://github.com/apache/kafka/pull/13243#discussion_r1106568364]). 

The reason expired records are still written to the changelog topic is that 
whether the records are expired or not is only tracked at the innermost store 
layer, and not any of the outer store layers such as the changelogging layer. 
The innermost store layer keeps its own `observedStreamTime` which is advanced 
on calls to put() and during restoration, and uses this variable to determine 
when a record is expired. Because the return type from put() is void, the 
changelogging layer has no way to tell whether the inner store's put() actually 
put the record or dropped it as expired, and always writes to the changelog 
topic regardless.

In order to avoid this, we could:
 * update the put() interface to return a boolean indicating whether the record 
was actually put or not, or
 * move the logic for determining when a record is expired into an outer store 
layer, or
 * reorder/restructure the wrapped store layers.

  was:
Window stores and versioned stores both have concepts of "retention" and 
"expiration." Records which are expired are not written to the store, e.g., 
[this 
example|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java#L265-L266]
 for segments stores. However, these expired records are still written to the 
changelog topic, in the case of persistent stores. This does not cause any 
problems because the records are once again omitted from the store during 
restore, but it is inefficient. It'd be good to avoid writing expired records 
to the changelog topic in the first place. Another benefit is that doing so 
would allow us to simplify the restoration code for versioned stores (see 
[relevant 
discussion|https://github.com/apache/kafka/pull/13243#discussion_r1106568364]). 

The reason expired records are still written to the changelog topic is because 
the whether records are expired or not is only tracked at the innermost store 
layer, and not any of the outer store layers such as the changelogging layer. 
The innermost store layer keeps its own `observedStreamTime` which is advanced 
on calls to put() and during restoration, and uses this variable to determine 
when a record is expired. Because the return type from put() is void, the 
changelogging layer has no way to tell whether the inner store's put() actually 
put the record or dropped it as expired, and always writes to the changelog 
topic regardless.

In order to avoid this, we could:
 * update the put() interface to return a boolean indicating whether the record 
was actually put or not, or
 * move the logic for determining when a record is expired into an outer store 
layer, or
 * reorder/restructure the wrapped store layers.
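
As a rough illustration of the first option listed above (a put() that reports whether the record was stored), here is a minimal sketch. The class names `InnerStoreSketch` and `ChangeLoggingStoreSketch` are hypothetical stand-ins and not Kafka Streams classes, and the in-memory list only simulates a changelog topic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical innermost store: tracks observed stream time and drops expired records.
class InnerStoreSketch {
    private final long gracePeriod;
    private long observedStreamTime = -1L;
    final Map<String, String> data = new HashMap<>();

    InnerStoreSketch(final long gracePeriod) {
        this.gracePeriod = gracePeriod;
    }

    // Returns true if the record was stored, false if it was dropped as expired.
    boolean put(final String key, final String value, final long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        if (timestamp < observedStreamTime - gracePeriod) {
            return false;
        }
        data.put(key, value);
        return true;
    }
}

// Hypothetical outer layer: only logs records that the inner store accepted.
class ChangeLoggingStoreSketch {
    private final InnerStoreSketch inner;
    final List<String> changelog = new ArrayList<>(); // simulated changelog topic

    ChangeLoggingStoreSketch(final InnerStoreSketch inner) {
        this.inner = inner;
    }

    void put(final String key, final String value, final long timestamp) {
        if (inner.put(key, value, timestamp)) {
            changelog.add(key + "=" + value + "@" + timestamp);
        }
    }
}
```

With a boolean return value the outer layer needs no knowledge of stream time or grace period; the trade-off is that the put() interface changes for every store that implements it.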


> Do not write expired store records to changelog
> ---
>
> Key: KAFKA-14723
> URL: https://issues.apache.org/jira/browse/KAFKA-14723
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Victoria Xia
>Priority: Major
>
> Window stores and versioned stores both have concepts of "retention" and 
> "expiration." Records which are expired are not written to the store, e.g., 
> [this 
> example|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java#L265-L266]
>  for segments stores. However, these expired records are still written to the 
> changelog topic, in the case of persistent stores. This does not cause any 
> problems because the records are once again omitted from the store during 
> restore, but it is inefficient. It'd be good to avoid writing expired records 
> to the changelog topic in the 

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

2023-02-15 Thread via GitHub


vcrfxia commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107595402


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -297,6 +312,12 @@ void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
         // records into memory. how high this memory amplification will be is very much dependent
         // on the specific workload and the value of the "segment interval" parameter.
         for (final ConsumerRecord<byte[], byte[]> record : records) {
+            if (record.timestamp() < streamTimeForRestore - gracePeriod) {
+                // record is older than grace period and was therefore never written to the store

Review Comment:
   Sounds good. Here's the ticket: 
https://issues.apache.org/jira/browse/KAFKA-14723
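
For readers following the hunk above, the grace-period check during restore can be sketched in isolation. This is only an illustration under assumptions: `RestoreExpirySketch` and `restorable` are hypothetical names, records are reduced to their timestamps, and stream time is assumed to advance record by record within the batch (the hunk does not show where `streamTimeForRestore` is updated).

```java
import java.util.ArrayList;
import java.util.List;

class RestoreExpirySketch {
    // Returns the timestamps that would be restored, applying the same comparison
    // as the diff: a record is skipped if timestamp < streamTime - gracePeriod.
    static List<Long> restorable(final List<Long> timestamps, final long gracePeriod) {
        long streamTimeForRestore = -1L;
        final List<Long> kept = new ArrayList<>();
        for (final long timestamp : timestamps) {
            // Assumption: stream time advances per record, not per batch.
            streamTimeForRestore = Math.max(streamTimeForRestore, timestamp);
            if (timestamp < streamTimeForRestore - gracePeriod) {
                continue; // older than grace period, never written to the store
            }
            kept.add(timestamp);
        }
        return kept;
    }
}
```

Because stream time is advanced per record, a record that is within the grace period when it is encountered is kept even if a later record in the same batch pushes stream time far enough to make it look expired.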



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14274) Fetcher refactor—split Fetcher into Fetcher and MetadataFetcher

2023-02-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14274:
--
Summary: Fetcher refactor—split Fetcher into Fetcher and MetadataFetcher  
(was: Implement fetching logic)

> Fetcher refactor—split Fetcher into Fetcher and MetadataFetcher
> ---
>
> Key: KAFKA-14274
> URL: https://issues.apache.org/jira/browse/KAFKA-14274
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>
> The fetch request and fetch processing should happen asynchronously.  More 
> specifically, we have the background thread to send fetch requests 
> autonomously and relay the response back to the polling thread.  The polling 
> thread collects these fetch requests and returns the ConsumerRecord.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14274) Split Fetcher into Fetcher and MetadataFetcher

2023-02-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14274:
--
Summary: Split Fetcher into Fetcher and MetadataFetcher  (was: Fetcher 
refactor—split Fetcher into Fetcher and MetadataFetcher)

> Split Fetcher into Fetcher and MetadataFetcher
> --
>
> Key: KAFKA-14274
> URL: https://issues.apache.org/jira/browse/KAFKA-14274
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>
> The fetch request and fetch processing should happen asynchronously.  More 
> specifically, we have the background thread to send fetch requests 
> autonomously and relay the response back to the polling thread.  The polling 
> thread collects these fetch requests and returns the ConsumerRecord.





[jira] [Updated] (KAFKA-14365) Extract common logic from Fetcher into FetcherUtils

2023-02-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14365:
--
Summary: Extract common logic from Fetcher into FetcherUtils  (was: 
Refactor Fetcher to allow different implementations)

> Extract common logic from Fetcher into FetcherUtils
> ---
>
> Key: KAFKA-14365
> URL: https://issues.apache.org/jira/browse/KAFKA-14365
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>
> The `Fetcher` API is used internally by the `KafkaConsumer` to fetch records 
> from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored `Fetcher`. In order to keep the 
> existing `KafkaConsumer` as untouched as possible, this Jira proposes to 
> refactor the `Fetcher` so as to allow other implementations to use the unit 
> tests and `KafkaConsumer`.
> Here are the proposed steps:
>  # Extract out the common APIs used by the `KafkaConsumer` and related unit 
> tests into a new Java interface named `Fetcher`
>  # Rename the existing `Fetcher` as `KafkaFetcher` (or similar)
>  # Refactor the `KafkaConsumer`, `FetcherTest`, and other call sites to 
> primarily use the new `Fetcher` interface
> A future pull request will add the new `Fetcher` implementation and tie it in 
> to the existing `FetcherTest` tests.





[GitHub] [kafka] vcrfxia commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

2023-02-15 Thread via GitHub


vcrfxia commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107613538


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+        verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD);
+        verifyGetNullFromStore("k2");
+    }
+
+    @Test
+    public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD)); // this record will be older than grace period by the end of the batch, but should still be restored
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD);
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+    }
+
+    @Test
+    public void shouldAllowZeroHistoryRetention() {
+        // recreate store with zero history retention
+        store.close();
+        store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 0L, SEGMENT_INTERVAL);
+        store.init((StateStoreContext) context, store);
+
+        // put, get, and delete
+        putToStore("k", "v", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v", BASE_TIMESTAMP); // query in "future" is allowed
+
+        // update existing record at same timestamp
+        putToStore("k", "updated", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "updated", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", BASE_TIMESTAMP);
+
+        // put new record version
+        putToStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 2);
+
+        // query in past (history retention expired) returns null
+        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1);
+
+        // put in past (grace period expired) does not update the store

Review Comment:
   Hm, just realized it's not possible to add this case in a meaningful way. 
Suppose observed stream time is `t` and we put-in-past for an existing key at 
time `t-1`. We cannot query for the value of the key at time `t-1` because that 
is outside history retention. And if we query for the latest value of the key, 
then we'll get the record at time `t` regardless of whether the put at time 
`t-1` was properly rejected or not.
   
   We'd have to query the inner store in order to perform this check, which 
feels like overkill. WDYT?
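
To make the argument above concrete, here is a small model of a store with zero history retention. It is a sketch under stated assumptions, not the RocksDBVersionedStore implementation: `ZeroRetentionStoreSketch` is a hypothetical class, the grace period is assumed to equal the (zero) history retention, and the `rejectExpiredPuts` flag exists only to compare a correct store with one that wrongly accepts a put in the past.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class ZeroRetentionStoreSketch {
    private final boolean rejectExpiredPuts;
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();
    private long observedStreamTime = -1L;

    ZeroRetentionStoreSketch(final boolean rejectExpiredPuts) {
        this.rejectExpiredPuts = rejectExpiredPuts;
    }

    void put(final String key, final String value, final long timestamp) {
        // Grace period is zero, so anything older than stream time is expired.
        if (rejectExpiredPuts && timestamp < observedStreamTime) {
            return;
        }
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }

    // Latest value for the key, or null if the key is unknown.
    String get(final String key) {
        final TreeMap<Long, String> keyVersions = versions.get(key);
        return keyVersions == null ? null : keyVersions.lastEntry().getValue();
    }

    // Value as of the given timestamp; null if outside (zero) history retention.
    String get(final String key, final long asOfTimestamp) {
        if (asOfTimestamp < observedStreamTime) {
            return null;
        }
        final TreeMap<Long, String> keyVersions = versions.get(key);
        if (keyVersions == null) {
            return null;
        }
        final Map.Entry<Long, String> entry = keyVersions.floorEntry(asOfTimestamp);
        return entry == null ? null : entry.getValue();
    }
}
```

After a put at time `t` followed by a put at `t-1`, both variants return the same answers from the public query methods, which is why the check would need access to the inner store.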






[jira] [Updated] (KAFKA-14675) Extract metadata-related tasks from Fetcher into MetadataFetcher

2023-02-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14675:
--
Parent: (was: KAFKA-14365)
Issue Type: Improvement  (was: Sub-task)

> Extract metadata-related tasks from Fetcher into MetadataFetcher
> 
>
> Key: KAFKA-14675
> URL: https://issues.apache.org/jira/browse/KAFKA-14675
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> Extract from {{Fetcher}} the APIs that are related to metadata operations 
> into a new class named {{{}MetadataFetcher{}}}. This will allow the 
> refactoring of {{Fetcher}} and {{MetadataFetcher}} for the new consumer. 





[jira] [Updated] (KAFKA-14365) Extract common logic from Fetcher into FetcherUtils

2023-02-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14365:
--
Priority: Major  (was: Minor)

> Extract common logic from Fetcher into FetcherUtils
> ---
>
> Key: KAFKA-14365
> URL: https://issues.apache.org/jira/browse/KAFKA-14365
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> The {{Fetcher}} class is used internally by the `KafkaConsumer` to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{{}Fetcher{}}}. In order to 
> keep the existing users of {{Fetcher}} as untouched as possible, this Jira 
> proposes to refactor the {{Fetcher}} by extracting out some common logic into 
> {{FetcherUtils}} to allow forthcoming implementations to use that common 
> logic.





[jira] [Updated] (KAFKA-14365) Extract common logic from Fetcher into FetcherUtils

2023-02-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14365:
--
Description: The {{Fetcher}} class is used internally by the 
`KafkaConsumer` to fetch records from the brokers. There is ongoing work to 
create a new consumer implementation with a significantly refactored threading 
model. The threading refactor work requires a similarly refactored 
{{{}Fetcher{}}}. In order to keep the existing users of {{Fetcher}} as 
untouched as possible, this Jira proposes to refactor the {{Fetcher}} by 
extracting out some common logic into {{FetcherUtils}} to allow forthcoming 
implementations to use that common logic.  (was: The `Fetcher` API is used 
internally by the `KafkaConsumer` to fetch records from the brokers. There is 
ongoing work to create a new consumer implementation with a significantly 
refactored threading model. The threading refactor work requires a similarly 
refactored `Fetcher`. In order to keep the existing `KafkaConsumer` as 
untouched as possible, this Jira proposes to refactor the `Fetcher` so as to 
allow other implementations to use the unit tests and `KafkaConsumer`.

Here are the proposed steps:
 # Extract out the common APIs used by the `KafkaConsumer` and related unit 
tests into a new Java interface named `Fetcher`
 # Rename the existing `Fetcher` as `KafkaFetcher` (or similar)
 # Refactor the `KafkaConsumer`, `FetcherTest`, and other call sites to 
primarily use the new `Fetcher` interface

A future pull request will add the new `Fetcher` implementation and tie it in 
to the existing `FetcherTest` tests.)

> Extract common logic from Fetcher into FetcherUtils
> ---
>
> Key: KAFKA-14365
> URL: https://issues.apache.org/jira/browse/KAFKA-14365
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Minor
>
> The {{Fetcher}} class is used internally by the `KafkaConsumer` to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{{}Fetcher{}}}. In order to 
> keep the existing users of {{Fetcher}} as untouched as possible, this Jira 
> proposes to refactor the {{Fetcher}} by extracting out some common logic into 
> {{FetcherUtils}} to allow forthcoming implementations to use that common 
> logic.





[jira] [Updated] (KAFKA-14365) Extract common logic from Fetcher into FetcherUtils

2023-02-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14365:
--
Description: The {{Fetcher}} class is used internally by the 
{{KafkaConsumer}} to fetch records from the brokers. There is ongoing work to 
create a new consumer implementation with a significantly refactored threading 
model. The threading refactor work requires a similarly refactored 
{{{}Fetcher{}}}. In order to keep the existing users of {{Fetcher}} as 
untouched as possible, this Jira proposes to refactor the {{Fetcher}} by 
extracting out some common logic into {{FetcherUtils}} to allow forthcoming 
implementations to use that common logic.  (was: The {{Fetcher}} class is used 
internally by the `KafkaConsumer` to fetch records from the brokers. There is 
ongoing work to create a new consumer implementation with a significantly 
refactored threading model. The threading refactor work requires a similarly 
refactored {{{}Fetcher{}}}. In order to keep the existing users of {{Fetcher}} 
as untouched as possible, this Jira proposes to refactor the {{Fetcher}} by 
extracting out some common logic into {{FetcherUtils}} to allow forthcoming 
implementations to use that common logic.)

> Extract common logic from Fetcher into FetcherUtils
> ---
>
> Key: KAFKA-14365
> URL: https://issues.apache.org/jira/browse/KAFKA-14365
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{{}Fetcher{}}}. In order to 
> keep the existing users of {{Fetcher}} as untouched as possible, this Jira 
> proposes to refactor the {{Fetcher}} by extracting out some common logic into 
> {{FetcherUtils}} to allow forthcoming implementations to use that common 
> logic.





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-15 Thread via GitHub


jeffkbkim commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107618273


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -101,15 +130,61 @@ public String toString() {
     public AddPartitionsToTxnRequest(final AddPartitionsToTxnRequestData data, short version) {
         super(ApiKeys.ADD_PARTITIONS_TO_TXN, version);
         this.data = data;
+        this.version = version;
     }
-
+
+    // Only used for versions < 4
     public List<TopicPartition> partitions() {
         if (cachedPartitions != null) {
             return cachedPartitions;
         }
         cachedPartitions = Builder.getPartitions(data);
         return cachedPartitions;
     }
+
+    private List<TopicPartition> partitionsForTransaction(String transaction) {

Review Comment:
   sorry, i meant the return value `List`, it doesn't seem to 
be used






[jira] [Updated] (KAFKA-14365) Extract common logic from Fetcher into FetcherUtils

2023-02-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14365:
--
Description: 
The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
records from the brokers. There is ongoing work to create a new consumer 
implementation with a significantly refactored threading model. The threading 
refactor work requires a similarly refactored {{{}Fetcher{}}}.

This Jira proposes to refactor the {{Fetcher}} by extracting out some common 
logic into {{FetcherUtils}} to allow forthcoming implementations of fetcher to 
leverage that common logic.

  was:The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to 
fetch records from the brokers. There is ongoing work to create a new consumer 
implementation with a significantly refactored threading model. The threading 
refactor work requires a similarly refactored {{{}Fetcher{}}}. In order to keep 
the existing users of {{Fetcher}} as untouched as possible, this Jira proposes 
to refactor the {{Fetcher}} by extracting out some common logic into 
{{FetcherUtils}} to allow forthcoming implementations to use that common logic.


> Extract common logic from Fetcher into FetcherUtils
> ---
>
> Key: KAFKA-14365
> URL: https://issues.apache.org/jira/browse/KAFKA-14365
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{{}Fetcher{}}}.
> This Jira proposes to refactor the {{Fetcher}} by extracting out some common 
> logic into {{FetcherUtils}} to allow forthcoming implementations of fetcher 
> to leverage that common logic.





[jira] [Updated] (KAFKA-14675) Extract metadata-related tasks from Fetcher into MetadataFetcher

2023-02-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14675:
--
Description: 
The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
records from the brokers. There is ongoing work to create a new consumer 
implementation with a significantly refactored threading model. The threading 
refactor work requires a similarly refactored {{{}Fetcher{}}}.

This task covers the work to extract from {{Fetcher}} the APIs that are related 
to metadata operations into a new class named {{{}MetadataFetcher{}}}. This 
will allow the refactoring of {{Fetcher}} and {{MetadataFetcher}} for the new 
consumer.

  was:Extract from {{Fetcher}} the APIs that are related to metadata operations 
into a new class named {{{}MetadataFetcher{}}}. This will allow the refactoring 
of {{Fetcher}} and {{MetadataFetcher}} for the new consumer. 


> Extract metadata-related tasks from Fetcher into MetadataFetcher
> 
>
> Key: KAFKA-14675
> URL: https://issues.apache.org/jira/browse/KAFKA-14675
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{{}Fetcher{}}}.
> This task covers the work to extract from {{Fetcher}} the APIs that are 
> related to metadata operations into a new class named 
> {{{}MetadataFetcher{}}}. This will allow the refactoring of {{Fetcher}} and 
> {{MetadataFetcher}} for the new consumer.





[jira] [Updated] (KAFKA-14365) Extract common logic from Fetcher into FetcherUtils

2023-02-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14365:
--
Description: 
The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
records from the brokers. There is ongoing work to create a new consumer 
implementation with a significantly refactored threading model. The threading 
refactor work requires a similarly refactored {{{}Fetcher{}}}.

This task includes refactoring {{Fetcher}} by extracting out some common logic 
into {{FetcherUtils}} to allow forthcoming implementations to leverage that 
common logic.

  was:
The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
records from the brokers. There is ongoing work to create a new consumer 
implementation with a significantly refactored threading model. The threading 
refactor work requires a similarly refactored {{{}Fetcher{}}}.

This Jira proposes to refactor the {{Fetcher}} by extracting out some common 
logic into {{FetcherUtils}} to allow forthcoming implementations of fetcher to 
leverage that common logic.


> Extract common logic from Fetcher into FetcherUtils
> ---
>
> Key: KAFKA-14365
> URL: https://issues.apache.org/jira/browse/KAFKA-14365
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{{}Fetcher{}}}.
> This task includes refactoring {{Fetcher}} by extracting out some common 
> logic into {{FetcherUtils}} to allow forthcoming implementations to leverage 
> that common logic.





[jira] [Updated] (KAFKA-14274) Split Fetcher into Fetcher and MetadataFetcher

2023-02-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14274:
--
Description: 
The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
records from the brokers. There is ongoing work to create a new consumer 
implementation with a significantly refactored threading model. The threading 
refactor work requires a similarly refactored {{{}Fetcher{}}}.

 

The fetch request and fetch processing should happen asynchronously.  More 
specifically, we have the background thread to send fetch requests autonomously 
and relay the response back to the polling thread.  The polling thread collects 
these fetch requests and returns the ConsumerRecord.

  was:The fetch request and fetch processing should happen asynchronously.  
More specifically, we have the background thread to send fetch requests 
autonomously and relay the response back to the polling thread.  The polling 
thread collects these fetch requests and returns the ConsumerRecord.


> Split Fetcher into Fetcher and MetadataFetcher
> --
>
> Key: KAFKA-14274
> URL: https://issues.apache.org/jira/browse/KAFKA-14274
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{{}Fetcher{}}}.
>  
> The fetch request and fetch processing should happen asynchronously.  More 
> specifically, we have the background thread to send fetch requests 
> autonomously and relay the response back to the polling thread.  The polling 
> thread collects these fetch requests and returns the ConsumerRecord.





[jira] [Updated] (KAFKA-14274) Split Fetcher into Fetcher and MetadataFetcher

2023-02-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14274:
--
Component/s: clients

> Split Fetcher into Fetcher and MetadataFetcher
> --
>
> Key: KAFKA-14274
> URL: https://issues.apache.org/jira/browse/KAFKA-14274
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{{}Fetcher{}}}.
>  
> The fetch request and fetch processing should happen asynchronously.  More 
> specifically, we have the background thread to send fetch requests 
> autonomously and relay the response back to the polling thread.  The polling 
> thread collects these fetch requests and returns the ConsumerRecord.





[jira] [Updated] (KAFKA-14274) Split Fetcher into Fetcher and MetadataFetcher

2023-02-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14274:
--
Description: 
The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
records from the brokers. There is ongoing work to create a new consumer 
implementation with a significantly refactored threading model. The threading 
refactor work requires a similarly refactored {{{}Fetcher{}}}.

The fetch request and fetch processing should happen asynchronously. More 
specifically, we have the background thread to send fetch requests autonomously 
and relay the response back to the foreground thread. The foreground thread 
collects these fetch responses in the form of {{{}CompletedFetch{}}}, 
decompresses the data, deserializes the data, and converts each {{Record}} into 
a {{ConsumerRecord}} for returning to the user.

  was:
The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
records from the brokers. There is ongoing work to create a new consumer 
implementation with a significantly refactored threading model. The threading 
refactor work requires a similarly refactored {{{}Fetcher{}}}.

 

The fetch request and fetch processing should happen asynchronously.  More 
specifically, we have the background thread to send fetch requests autonomously 
and relay the response back to the polling thread.  The polling thread collects 
these fetch requests and returns the ConsumerRecord.


> Split Fetcher into Fetcher and MetadataFetcher
> --
>
> Key: KAFKA-14274
> URL: https://issues.apache.org/jira/browse/KAFKA-14274
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{{}Fetcher{}}}.
> The fetch request and fetch processing should happen asynchronously. More 
> specifically, we have the background thread to send fetch requests 
> autonomously and relay the response back to the foreground thread. The 
> foreground thread collects these fetch responses in the form of 
> {{{}CompletedFetch{}}}, decompresses the data, deserializes the data, and 
> converts each {{Record}} into a {{ConsumerRecord}} for returning to the user.
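
The threading split described in this issue can be sketched roughly as follows. Everything here is an assumption made for illustration: `AsyncFetchSketch` and `CompletedFetchSketch` are hypothetical names rather than the consumer's internal classes, fetch responses are simulated byte arrays, and decompression is omitted.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class AsyncFetchSketch {
    // Hypothetical stand-in for a completed fetch: raw, not yet deserialized values.
    static final class CompletedFetchSketch {
        final List<byte[]> rawValues;

        CompletedFetchSketch(final List<byte[]> rawValues) {
            this.rawValues = rawValues;
        }
    }

    // Hand-off point between the background and foreground threads.
    private final BlockingQueue<CompletedFetchSketch> completedFetches = new LinkedBlockingQueue<>();

    // Background thread: "sends" fetches on its own and relays each response.
    Thread startBackgroundThread(final List<List<byte[]>> simulatedResponses) {
        final Thread thread = new Thread(() -> {
            for (final List<byte[]> response : simulatedResponses) {
                completedFetches.add(new CompletedFetchSketch(response));
            }
        }, "background-fetch-sketch");
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    // Foreground thread: collects one completed fetch and deserializes its records.
    List<String> poll(final long timeoutMs) {
        final List<String> records = new ArrayList<>();
        try {
            final CompletedFetchSketch fetch = completedFetches.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (fetch != null) {
                for (final byte[] raw : fetch.rawValues) {
                    records.add(new String(raw, StandardCharsets.UTF_8));
                }
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return records;
    }
}
```

Keeping deserialization on the foreground thread, as the description states, means the background thread only moves raw responses and never runs user-supplied deserializers.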





[jira] [Updated] (KAFKA-14274) Introduce FetchRequestManager

2023-02-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14274:
--
Summary: Introduce FetchRequestManager  (was: Split Fetcher into Fetcher 
and MetadataFetcher)

> Introduce FetchRequestManager
> -
>
> Key: KAFKA-14274
> URL: https://issues.apache.org/jira/browse/KAFKA-14274
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{Fetcher}}.
> The fetch request and fetch processing should happen asynchronously. More 
> specifically, the background thread sends fetch requests autonomously and 
> relays the responses back to the foreground thread. The foreground thread 
> collects these fetch responses in the form of {{CompletedFetch}}, 
> decompresses the data, deserializes the data, and converts each {{Record}} 
> into a {{ConsumerRecord}} for returning to the user.





[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-15 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107626067


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -101,15 +130,61 @@ public String toString() {
 public AddPartitionsToTxnRequest(final AddPartitionsToTxnRequestData data, 
short version) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN, version);
 this.data = data;
+this.version = version;
 }
-
+
+// Only used for versions < 4
 public List partitions() {
 if (cachedPartitions != null) {
 return cachedPartitions;
 }
 cachedPartitions = Builder.getPartitions(data);
 return cachedPartitions;
 }
+
+private List partitionsForTransaction(String transaction) {

Review Comment:
   ?? are you saying the method isn't used? It's used on line 168



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-15 Thread via GitHub


gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107628693


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -701,43 +745,151 @@ protected void produceMessages(EmbeddedConnectCluster 
cluster, String topicName,
 int cnt = 0;
 for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
 for (int p = 0; p < numPartitions; p++)
-cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+produce(cluster.kafka(), topicName, p, "key", "value-" + 
cnt++);
 }
-
+
+/**
+ * Produce a test record to a Kafka cluster.
+ * This method allows subclasses to configure and use their own Kafka 
Producer instead of using the built-in default.
+ * @param cluster   Kafka cluster that should receive the record
+ * @param topic Topic to send the record to, non-null
+ * @param partition Partition to send the record to, maybe null.
+ * @param key   Kafka key for the record
+ * @param value Kafka value for the record
+ */
+protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer 
partition, String key, String value) {
+cluster.produce(topic, partition, key, value);
+}
+
+protected static Map 
waitForCheckpointOnAllPartitions(
+MirrorClient client, String consumerGroupName, String 
remoteClusterAlias, String topicName
+) throws InterruptedException {
+AtomicReference> ret = new 
AtomicReference<>();
+waitForCondition(
+() -> {
+Map offsets = 
client.remoteConsumerOffsets(
+consumerGroupName, remoteClusterAlias, 
Duration.ofMillis(3000));
+for (int i = 0; i < NUM_PARTITIONS; i++) {
+if (!offsets.containsKey(new TopicPartition(topicName, 
i))) {
+log.info("Checkpoint is missing for {}: {}-{}", 
consumerGroupName, topicName, i);
+return false;
+}
+}
+ret.set(offsets);
+return true;
+},
+CHECKPOINT_DURATION_MS,
+String.format(
+"Offsets for consumer group %s not translated from %s 
for topic %s",
+consumerGroupName,
+remoteClusterAlias,
+topicName
+)
+);
+return ret.get();
+}
+
 /*
  * given consumer group, topics and expected number of records, make sure 
the consumer group
  * offsets are eventually synced to the expected offset numbers
  */
-protected static  void 
waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
-Consumer consumer, List topics, String 
consumerGroupId, int numRecords, boolean exactOffsetTranslation)
-throws InterruptedException {
+protected static  void waitForConsumerGroupFullSync(
+EmbeddedConnectCluster connect, List topics, String 
consumerGroupId, int numRecords, boolean exactOffsetTranslation
+) throws InterruptedException {
 try (Admin adminClient = connect.kafka().createAdminClient()) {
-List tps = new ArrayList<>(NUM_PARTITIONS * 
topics.size());
+Map tps = new HashMap<>(NUM_PARTITIONS 
* topics.size());
 for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; 
partitionIndex++) {
 for (String topic : topics) {
-tps.add(new TopicPartition(topic, partitionIndex));
+tps.put(new TopicPartition(topic, partitionIndex), 
OffsetSpec.latest());
 }
 }
 long expectedTotalOffsets = numRecords * topics.size();
 
 waitForCondition(() -> {
 Map consumerGroupOffsets =
 
adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
-long consumerGroupOffsetTotal = 
consumerGroupOffsets.values().stream()
+long totalConsumerGroupOffsets = 
consumerGroupOffsets.values().stream()
 .mapToLong(OffsetAndMetadata::offset).sum();
 
-Map offsets = consumer.endOffsets(tps, 
CONSUMER_POLL_TIMEOUT_MS);
-long totalOffsets = offsets.values().stream().mapToLong(l -> 
l).sum();
-
+Map 
endOffsets =
+adminClient.listOffsets(tps).all().get();
+long totalEndOffsets = endOffsets.values().stream()
+
.mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset).sum();
+
+for (TopicPartition tp : endOffsets.keySet()) {
+if (consumerGroupOffsets.containsKey(tp)) {
+assertTrue(consumerGr

[jira] [Updated] (KAFKA-14274) Introduce FetchRequestManager

2023-02-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14274:
--
Description: 
The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
records from the brokers. There is ongoing work to create a new consumer 
implementation with a significantly refactored threading model. The threading 
refactor work requires a similarly refactored {{Fetcher}}.

This task is to introduce a new class named {{FetchRequestManager}} that will 
be responsible for:
 # Formatting fetch requests to the background thread
 # Configuring the callback on fetch responses for the background thread

The response handler will collect the fetch responses from the broker and 
create {{CompletedFetch}} instances, much as is done in {{Fetcher}}. 
The newly introduced {{FetchUtils}} will be used by both 
{{FetchRequestManager}} and {{Fetcher}} to keep the logic as reusable as 
possible.

The foreground logic will decompress the data into a {{Record}}, which will 
then be deserialized into a {{ConsumerRecord}} for returning to the user.

  was:
The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
records from the brokers. There is ongoing work to create a new consumer 
implementation with a significantly refactored threading model. The threading 
refactor work requires a similarly refactored {{Fetcher}}.

The fetch request and fetch processing should happen asynchronously. More 
specifically, the background thread sends fetch requests autonomously and 
relays the responses back to the foreground thread. The foreground thread 
collects these fetch responses in the form of {{CompletedFetch}}, 
decompresses the data, deserializes the data, and converts each {{Record}} into 
a {{ConsumerRecord}} for returning to the user.


> Introduce FetchRequestManager
> -
>
> Key: KAFKA-14274
> URL: https://issues.apache.org/jira/browse/KAFKA-14274
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{Fetcher}}.
> This task is to introduce a new class named {{FetchRequestManager}} that will 
> be responsible for:
>  # Formatting fetch requests to the background thread
>  # Configuring the callback on fetch responses for the background thread
> The response handler will collect the fetch responses from the broker and 
> create {{CompletedFetch}} instances, much as is done in {{Fetcher}}. 
> The newly introduced {{FetchUtils}} will be used by both 
> {{FetchRequestManager}} and {{Fetcher}} to keep the logic as reusable as 
> possible.
> The foreground logic will decompress the data into a {{Record}}, which 
> will then be deserialized into a {{ConsumerRecord}} for returning to the user.
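
The handoff described in this issue can be pictured with a minimal sketch, assuming one background thread and a blocking queue between the two threads. The names `FetchHandoffSketch` and `CompletedFetchSketch` are hypothetical and are not taken from the Kafka code base, and the string conversion only stands in for the real decompression and deserialization steps.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; none of these types exist in Kafka.
class FetchHandoffSketch {

    // Stand-in for CompletedFetch: the raw bytes as relayed by the background thread.
    static final class CompletedFetchSketch {
        final String topicPartition;
        final byte[] rawPayload;

        CompletedFetchSketch(String topicPartition, byte[] rawPayload) {
            this.topicPartition = topicPartition;
            this.rawPayload = rawPayload;
        }
    }

    // Queue through which the background thread relays fetch responses.
    private final BlockingQueue<CompletedFetchSketch> completedFetches = new LinkedBlockingQueue<>();

    // Background side: the response callback only enqueues the raw data.
    void onFetchResponse(String topicPartition, byte[] rawPayload) {
        completedFetches.add(new CompletedFetchSketch(topicPartition, rawPayload));
    }

    // Foreground side: wait up to the timeout for one fetch, drain anything else
    // already buffered, then convert the payloads on the calling thread.
    List<String> poll(long timeoutMs) throws InterruptedException {
        List<String> records = new ArrayList<>();
        CompletedFetchSketch first = completedFetches.poll(timeoutMs, TimeUnit.MILLISECONDS);
        if (first == null) {
            return records;
        }
        List<CompletedFetchSketch> batch = new ArrayList<>();
        batch.add(first);
        completedFetches.drainTo(batch);
        for (CompletedFetchSketch fetch : batch) {
            String value = new String(fetch.rawPayload, StandardCharsets.UTF_8);
            records.add(fetch.topicPartition + ":" + value);
        }
        return records;
    }

    public static void main(String[] args) throws Exception {
        FetchHandoffSketch sketch = new FetchHandoffSketch();
        Thread background = new Thread(() -> {
            sketch.onFetchResponse("topic-0", "a".getBytes(StandardCharsets.UTF_8));
            sketch.onFetchResponse("topic-1", "b".getBytes(StandardCharsets.UTF_8));
        });
        background.start();
        background.join();
        System.out.println(sketch.poll(100));
    }
}
```

Keeping the conversion on the polling side, as the description proposes, means the background thread never blocks on user-supplied deserializers.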





[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-15 Thread via GitHub


gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107634292


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##
@@ -30,7 +30,7 @@ public class OffsetSyncStoreTest {
 static class FakeOffsetSyncStore extends OffsetSyncStore {
 
 FakeOffsetSyncStore() {
-super(null, null);
+super();

Review Comment:
   1. I've added a null-check to the real start() method which allows us to 
call the real start method on the fake offset store.
   2. Added
   3. I removed the assertion messages as they were just visual noise, the 
comments seemed easy enough to interpret.
   4. Done






[jira] [Updated] (KAFKA-14274) Introduce FetchRequestManager to integrate fetch into new consumer threading refactor

2023-02-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14274:
--
Summary: Introduce FetchRequestManager to integrate fetch into new consumer 
threading refactor  (was: Introduce FetchRequestManager)

> Introduce FetchRequestManager to integrate fetch into new consumer threading 
> refactor
> -
>
> Key: KAFKA-14274
> URL: https://issues.apache.org/jira/browse/KAFKA-14274
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{Fetcher}}.
> This task is to introduce a new class named {{FetchRequestManager}} that will 
> be responsible for:
>  # Formatting fetch requests to the background thread
>  # Configuring the callback on fetch responses for the background thread
> The response handler will collect the fetch responses from the broker and 
> create {{CompletedFetch}} instances, much as is done in {{Fetcher}}. 
> The newly introduced {{FetchUtils}} will be used by both 
> {{FetchRequestManager}} and {{Fetcher}} to keep the logic as reusable as 
> possible.
> The foreground logic will decompress the data into a {{Record}}, which 
> will then be deserialized into a {{ConsumerRecord}} for returning to the user.





[jira] [Updated] (KAFKA-14274) Introduce FetchRequestManager to integrate fetch into new consumer threading refactor

2023-02-15 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-14274:
--
Parent: (was: KAFKA-14246)
Issue Type: Improvement  (was: Sub-task)

> Introduce FetchRequestManager to integrate fetch into new consumer threading 
> refactor
> -
>
> Key: KAFKA-14274
> URL: https://issues.apache.org/jira/browse/KAFKA-14274
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Reporter: Philip Nee
>Assignee: Kirk True
>Priority: Major
>
> The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
> records from the brokers. There is ongoing work to create a new consumer 
> implementation with a significantly refactored threading model. The threading 
> refactor work requires a similarly refactored {{Fetcher}}.
> This task is to introduce a new class named {{FetchRequestManager}} that will 
> be responsible for:
>  # Formatting fetch requests to the background thread
>  # Configuring the callback on fetch responses for the background thread
> The response handler will collect the fetch responses from the broker and 
> create {{CompletedFetch}} instances, much as is done in {{Fetcher}}. 
> The newly introduced {{FetchUtils}} will be used by both 
> {{FetchRequestManager}} and {{Fetcher}} to keep the logic as reusable as 
> possible.
> The foreground logic will decompress the data into a {{Record}}, which 
> will then be deserialized into a {{ConsumerRecord}} for returning to the user.





[jira] [Created] (KAFKA-14724) Port tests in FetcherTest to FetchRequestManagerTest

2023-02-15 Thread Kirk True (Jira)
Kirk True created KAFKA-14724:
-

 Summary: Port tests in FetcherTest to FetchRequestManagerTest
 Key: KAFKA-14724
 URL: https://issues.apache.org/jira/browse/KAFKA-14724
 Project: Kafka
  Issue Type: Improvement
Reporter: Kirk True
Assignee: Kirk True


The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch 
records from the brokers. There is ongoing work to create a new consumer 
implementation with a significantly refactored threading model. The threading 
refactor work requires a similarly refactored {{Fetcher}}.

This task involves copying the relevant tests from {{FetcherTest}} and 
modifying them to fit a new unit test named {{FetchRequestManagerTest}}.





[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-02-15 Thread via GitHub


gharris1727 commented on code in PR #13178:
URL: https://github.com/apache/kafka/pull/13178#discussion_r1107647929


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##
@@ -578,7 +551,7 @@ public void testNoCheckpointsIfNoRecordsAreMirrored() 
throws InterruptedExceptio
 Map translatedOffsets = 
backupClient.remoteConsumerOffsets(
 consumerGroupName, PRIMARY_CLUSTER_ALIAS, 
Duration.ofSeconds(30L));
 return translatedOffsets.containsKey(remoteTopicPartition(tp1, 
PRIMARY_CLUSTER_ALIAS)) &&
-   translatedOffsets.containsKey(remoteTopicPartition(tp2, 
PRIMARY_CLUSTER_ALIAS));
+   !translatedOffsets.containsKey(remoteTopicPartition(tp2, 
PRIMARY_CLUSTER_ALIAS));

Review Comment:
   I added a read-to-end-and-commit-offsets step that makes this topic eligible 
for checkpointing, and reverted this change to the wait condition.






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-15 Thread via GitHub


jeffkbkim commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107649386


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -101,15 +130,61 @@ public String toString() {
 public AddPartitionsToTxnRequest(final AddPartitionsToTxnRequestData data, 
short version) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN, version);
 this.data = data;
+this.version = version;
 }
-
+
+// Only used for versions < 4
 public List partitions() {
 if (cachedPartitions != null) {
 return cachedPartitions;
 }
 cachedPartitions = Builder.getPartitions(data);
 return cachedPartitions;
 }
+
+private List partitionsForTransaction(String transaction) {

Review Comment:
   I'm saying the method can be of type void, no?






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-15 Thread via GitHub


jeffkbkim commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107649386


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -101,15 +130,61 @@ public String toString() {
 public AddPartitionsToTxnRequest(final AddPartitionsToTxnRequestData data, 
short version) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN, version);
 this.data = data;
+this.version = version;
 }
-
+
+// Only used for versions < 4
 public List partitions() {
 if (cachedPartitions != null) {
 return cachedPartitions;
 }
 cachedPartitions = Builder.getPartitions(data);
 return cachedPartitions;
 }
+
+private List partitionsForTransaction(String transaction) {

Review Comment:
   the method can be of type void, no?






[GitHub] [kafka] hachikuji commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-15 Thread via GitHub


hachikuji commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107606857


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -35,21 +44,43 @@ public class AddPartitionsToTxnRequest extends 
AbstractRequest {
 private final AddPartitionsToTxnRequestData data;
 
 private List cachedPartitions = null;
+
+private Map> cachedPartitionsByTransaction = 
null;
+
+private final short version;
 
 public static class Builder extends 
AbstractRequest.Builder {
 public final AddPartitionsToTxnRequestData data;
+public final boolean isClientRequest;
 
-public Builder(final AddPartitionsToTxnRequestData data) {
+// Only used for versions < 4
+public Builder(String transactionalId,
+   long producerId,
+   short producerEpoch,
+   List partitions) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN);
-this.data = data;
+this.isClientRequest = true;
+
+AddPartitionsToTxnTopicCollection topics = 
compileTopics(partitions);
+
+this.data = new AddPartitionsToTxnRequestData()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(producerEpoch)
+.setTopics(topics);
 }
 
-public Builder(final String transactionalId,
-   final long producerId,
-   final short producerEpoch,
-   final List partitions) {
+public Builder(AddPartitionsToTxnTransactionCollection transactions,
+   boolean verifyOnly) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN);
+this.isClientRequest = false;
 
+this.data = new AddPartitionsToTxnRequestData()
+.setTransactions(transactions)
+.setVerifyOnly(verifyOnly);
+}
+
+private AddPartitionsToTxnTopicCollection compileTopics(final 
List partitions) {

Review Comment:
   nit: how about `buildTxnTopicCollection`?



##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -35,21 +44,43 @@ public class AddPartitionsToTxnRequest extends 
AbstractRequest {
 private final AddPartitionsToTxnRequestData data;
 
 private List cachedPartitions = null;
+
+private Map> cachedPartitionsByTransaction = 
null;
+
+private final short version;
 
 public static class Builder extends 
AbstractRequest.Builder {
 public final AddPartitionsToTxnRequestData data;
+public final boolean isClientRequest;
 
-public Builder(final AddPartitionsToTxnRequestData data) {
+// Only used for versions < 4
+public Builder(String transactionalId,
+   long producerId,
+   short producerEpoch,
+   List partitions) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN);
-this.data = data;
+this.isClientRequest = true;
+
+AddPartitionsToTxnTopicCollection topics = 
compileTopics(partitions);
+
+this.data = new AddPartitionsToTxnRequestData()
+.setTransactionalId(transactionalId)

Review Comment:
   Does it make sense to set verifyOnly to false explicitly here?



##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -66,24 +97,22 @@ public Builder(final String transactionalId,
 AddPartitionsToTxnTopicCollection topics = new 
AddPartitionsToTxnTopicCollection();
 for (Map.Entry> partitionEntry : 
partitionMap.entrySet()) {
 topics.add(new AddPartitionsToTxnTopic()
-   .setName(partitionEntry.getKey())
-   .setPartitions(partitionEntry.getValue()));
+.setName(partitionEntry.getKey())
+.setPartitions(partitionEntry.getValue()));
 }
-
-this.data = new AddPartitionsToTxnRequestData()
-.setTransactionalId(transactionalId)
-.setProducerId(producerId)
-.setProducerEpoch(producerEpoch)
-.setTopics(topics);
+return topics;
 }
 
 @Override
 public AddPartitionsToTxnRequest build(short version) {
-return new AddPartitionsToTxnRequest(data, version);
+short clampedVersion = (isClientRequest && version > 3) ? 3 : 
version;

Review Comment:
   It's a little strange to ignore the version. I think another way to do this 
is to set the `latestAllowedVersion` to 3 in the client builder. That will 
ensure that the client does not try to use a higher version even if the broker 
su

[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-15 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107662160


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -101,15 +130,61 @@ public String toString() {
 public AddPartitionsToTxnRequest(final AddPartitionsToTxnRequestData data, 
short version) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN, version);
 this.data = data;
+this.version = version;
 }
-
+
+// Only used for versions < 4
 public List partitions() {
 if (cachedPartitions != null) {
 return cachedPartitions;
 }
 cachedPartitions = Builder.getPartitions(data);
 return cachedPartitions;
 }
+
+private List partitionsForTransaction(String transaction) {

Review Comment:
   Not as it is written. We return on line 150. I originally thought this could 
be useful if we just wanted the list of the partitions, but since it is private 
I can change it.






[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests

2023-02-15 Thread via GitHub


jolshan commented on code in PR #13231:
URL: https://github.com/apache/kafka/pull/13231#discussion_r1107662689


##
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java:
##
@@ -66,24 +97,22 @@ public Builder(final String transactionalId,
 AddPartitionsToTxnTopicCollection topics = new 
AddPartitionsToTxnTopicCollection();
 for (Map.Entry> partitionEntry : 
partitionMap.entrySet()) {
 topics.add(new AddPartitionsToTxnTopic()
-   .setName(partitionEntry.getKey())
-   .setPartitions(partitionEntry.getValue()));
+.setName(partitionEntry.getKey())
+.setPartitions(partitionEntry.getValue()));
 }
-
-this.data = new AddPartitionsToTxnRequestData()
-.setTransactionalId(transactionalId)
-.setProducerId(producerId)
-.setProducerEpoch(producerEpoch)
-.setTopics(topics);
+return topics;
 }
 
 @Override
 public AddPartitionsToTxnRequest build(short version) {
-return new AddPartitionsToTxnRequest(data, version);
+short clampedVersion = (isClientRequest && version > 3) ? 3 : 
version;

Review Comment:
   Hmm I didn't see such an option in the builder but maybe I'm missing 
something. 
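
   For comparison, the two approaches discussed in this thread can be sketched 
side by side. This is a self-contained illustration with hypothetical names 
(`VersionCapSketch`, `CappedBuilder`, `negotiate`); it does not use Kafka's 
request classes, and whether the real builder exposes such a cap is exactly 
the open question above.

```java
// Hypothetical stand-ins; these are not Kafka classes.
class VersionCapSketch {

    // Option A (as in the diff): clamp the version inside build().
    static short clampInBuild(boolean isClientRequest, short requestedVersion) {
        return (isClientRequest && requestedVersion > 3) ? (short) 3 : requestedVersion;
    }

    // Option B (the reviewer's suggestion): the builder declares its allowed
    // range up front, so a version above the cap is never selected.
    static final class CappedBuilder {
        final short oldestAllowedVersion;
        final short latestAllowedVersion;

        CappedBuilder(short oldestAllowedVersion, short latestAllowedVersion) {
            this.oldestAllowedVersion = oldestAllowedVersion;
            this.latestAllowedVersion = latestAllowedVersion;
        }

        // Picks the highest version supported by both the builder and the broker.
        short negotiate(short brokerMin, short brokerMax) {
            short version = (short) Math.min(latestAllowedVersion, brokerMax);
            if (version < Math.max(oldestAllowedVersion, brokerMin)) {
                throw new IllegalStateException("No overlapping version");
            }
            return version;
        }
    }

    public static void main(String[] args) {
        // A client request asking for version 4 is silently built as version 3.
        System.out.println(clampInBuild(true, (short) 4));
        // With a capped builder, negotiation never yields more than 3.
        System.out.println(new CappedBuilder((short) 0, (short) 3).negotiate((short) 0, (short) 4));
    }
}
```

   With option B the caller of `build(version)` is never handed a version 
different from the one it asked for, which is the oddity raised in the review.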






[GitHub] [kafka] gharris1727 commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

2023-02-15 Thread via GitHub


gharris1727 commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1107664475


##
connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java:
##
@@ -135,38 +135,40 @@ public void testFlushFailureReplacesOffsets() throws 
Exception {
 // First time the write fails
 expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, true, null);
 writer.offset(OFFSET_KEY, OFFSET_VALUE);
-assertTrue(writer.beginFlush());
+assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
 writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
 verify(callback).onCompletion(eq(EXCEPTION), isNull());
 
 // Second time it succeeds
 expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, false, null);
-assertTrue(writer.beginFlush());
+assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
 writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
 verify(callback).onCompletion(isNull(), isNull());
 
 // Third time it has no data to flush so we won't get past beginFlush()
-assertFalse(writer.beginFlush());
+assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));
 }
 
 @Test
-public void testAlreadyFlushing() {
+public void testAlreadyFlushing() throws InterruptedException, 
TimeoutException {
 @SuppressWarnings("unchecked")
 final Callback callback = mock(Callback.class);
 // Trigger the send, but don't invoke the callback so we'll still be 
mid-flush
 CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
 expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, 
OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown);
 
 writer.offset(OFFSET_KEY, OFFSET_VALUE);
-assertTrue(writer.beginFlush());
+assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS));

Review Comment:
   Oh, I understand now, thanks.






[GitHub] [kafka] gharris1727 commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush

2023-02-15 Thread via GitHub


gharris1727 commented on code in PR #13208:
URL: https://github.com/apache/kafka/pull/13208#discussion_r1107669147


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java:
##
@@ -100,23 +104,45 @@ private boolean flushing() {
 
 /**
  * Performs the first step of a flush operation, snapshotting the current 
state. This does not
- * actually initiate the flush with the underlying storage.
+ * actually initiate the flush with the underlying storage. Ensures that 
any previous flush operations
+ * have finished before beginning a new flush.
  *
  * @return true if a flush was initiated, false if no data was available
+ * @throws ConnectException if the previous flush is not complete before 
this method is called
  */
-public synchronized boolean beginFlush() {
-if (flushing()) {
-log.error("Invalid call to OffsetStorageWriter flush() while 
already flushing, the "
+public boolean beginFlush() {
+try {
+return beginFlush(0, TimeUnit.NANOSECONDS);
+} catch (InterruptedException | TimeoutException e) {
+log.error("Invalid call to OffsetStorageWriter beginFlush() while 
already flushing, the "
 + "framework should not allow this");
 throw new ConnectException("OffsetStorageWriter is already 
flushing");
 }
+}
 
-if (data.isEmpty())
-return false;
-
-toFlush = data;
-data = new HashMap<>();
-return true;
+/**
+ * Performs the first step of a flush operation, snapshotting the current 
state. This does not
+ * actually initiate the flush with the underlying storage. Ensures that 
any previous flush operations
+ * have finished before beginning a new flush.
+ *
+ * @param timeout A maximum duration to wait for previous flushes to 
finish before giving up on waiting
+ * @param timeUnit Units of the timeout argument
+ * @return true if a flush was initiated, false if no data was available
+ * @throws InterruptedException if this thread was interrupted while 
waiting for the previous flush to complete
+ * @throws TimeoutException if the {@code timeout} elapses before previous 
flushes are complete.
+ */
+public boolean beginFlush(long timeout, TimeUnit timeUnit) throws 
InterruptedException, TimeoutException {
+if (flushInProgress.tryAcquire(Math.max(0, timeout), timeUnit)) {
+synchronized (this) {
+if (data.isEmpty())
+return false;

Review Comment:
   Oh wow that's pretty serious, I added a unit test that targets this release.
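
   One hazard with this shape is that the early `return false` happens after a 
successful `tryAcquire`, so the permit has to be handed back on that path or 
every later flush would wait until it times out. The excerpt does not say 
whether this is the issue the comment refers to, so the following is only a 
minimal sketch of the pattern, assuming a single-permit 
`java.util.concurrent.Semaphore`. The class `FlushGuardSketch` and its 
`completeFlush` method are hypothetical simplifications, not the real writer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical simplification of a writer that allows one flush at a time.
class FlushGuardSketch {
    private final Semaphore flushInProgress = new Semaphore(1);
    private Map<String, String> data = new HashMap<>();
    private Map<String, String> toFlush = null;

    synchronized void offset(String key, String value) {
        data.put(key, value);
    }

    boolean beginFlush(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        if (!flushInProgress.tryAcquire(Math.max(0, timeout), unit)) {
            throw new TimeoutException("Timed out waiting for the previous flush to finish");
        }
        synchronized (this) {
            if (data.isEmpty()) {
                // No flush will run, so nothing would ever release the permit later.
                flushInProgress.release();
                return false;
            }
            // Snapshot the current state; the permit stays held until completeFlush().
            toFlush = data;
            data = new HashMap<>();
            return true;
        }
    }

    // Called when the snapshot has been written (or the write has failed).
    synchronized void completeFlush() {
        if (toFlush != null) {
            toFlush = null;
            flushInProgress.release();
        }
    }

    public static void main(String[] args) throws Exception {
        FlushGuardSketch writer = new FlushGuardSketch();
        System.out.println(writer.beginFlush(0, TimeUnit.MILLISECONDS)); // no data yet
        writer.offset("key", "value");
        System.out.println(writer.beginFlush(0, TimeUnit.MILLISECONDS)); // flush begins
        writer.completeFlush();
    }
}
```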






[GitHub] [kafka] vcrfxia commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

2023-02-15 Thread via GitHub


vcrfxia commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107672610


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final 
StateStore root) {
 
 // VisibleForTesting
 void restoreBatch(final Collection> 
records) {
-// advance stream time to the max timestamp in the batch
+// copy the observed stream time, for use in deciding whether to drop 
records during restore,
+// when records have exceeded the store's grace period.
+long streamTimeForRestore = observedStreamTime;

Review Comment:
   > Are you proposing that doPut() takes stream time as a parameter, so that 
during normal put() operation we pass observedStreamTime and during restore we 
pass endOfBatchStreamTime, which means we can rename streamTimeForRestore to be 
observedStreamTime instead?
   
   Went ahead and made this update in the latest commit. Can revise if it's not 
what you had envisioned.
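
   To make the shape of that change concrete, here is a rough sketch, assuming `doPut()` receives the stream time as a parameter: `put()` passes the running `observedStreamTime`, and restore passes the end-of-batch value. `GracePeriodSketch`, `Rec`, and `acceptedKeys` are hypothetical names, and the model is far simpler than `RocksDBVersionedStore`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model; not the RocksDBVersionedStore implementation.
class GracePeriodSketch {
    static final class Rec {
        final String key;
        final long timestamp;
        Rec(String key, long timestamp) { this.key = key; this.timestamp = timestamp; }
    }

    private final long gracePeriod;
    private long observedStreamTime = -1L;            // no record seen yet
    private final List<String> accepted = new ArrayList<>();

    GracePeriodSketch(long gracePeriod) { this.gracePeriod = gracePeriod; }

    // Normal write path: grace period is checked against the running stream time.
    void put(String key, long timestamp) {
        if (timestamp < observedStreamTime - gracePeriod) return;   // expired, drop
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        doPut(observedStreamTime, key, timestamp);
    }

    // Restore path: same grace-period check per record, but doPut() sees the
    // largest timestamp in the whole batch.
    void restoreBatch(List<Rec> records) {
        long endOfBatchStreamTime = observedStreamTime;
        for (Rec r : records) {
            endOfBatchStreamTime = Math.max(endOfBatchStreamTime, r.timestamp);
        }
        for (Rec r : records) {
            if (r.timestamp < observedStreamTime - gracePeriod) continue;   // expired, drop
            observedStreamTime = Math.max(observedStreamTime, r.timestamp);
            doPut(endOfBatchStreamTime, r.key, r.timestamp);
        }
    }

    // Shared helper. A real store would use streamTime for history-retention
    // decisions; this sketch only records that the write was accepted.
    private void doPut(long streamTime, String key, long timestamp) {
        accepted.add(key);
    }

    List<String> acceptedKeys() { return accepted; }
}
```

   With a grace period of 100, restoring timestamps 310, 210, 209 in that order keeps the first two and drops the third, while restoring 200 and then 310 keeps both, because the grace-period check uses the running stream time rather than the end-of-batch one.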






[GitHub] [kafka] vcrfxia commented on pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

2023-02-15 Thread via GitHub


vcrfxia commented on PR #13243:
URL: https://github.com/apache/kafka/pull/13243#issuecomment-1431970157

   > One more thought: should we add verification about the 
"droppedRecordSensor" into all unit tests that drop records?
   
   Included this test update in the latest commit. I believe I've 
addressed/responded to all outstanding comments with the latest commit.





[GitHub] [kafka] C0urante commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest

2023-02-15 Thread via GitHub


C0urante commented on code in PR #13191:
URL: https://github.com/apache/kafka/pull/13191#discussion_r1107510674


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -235,115 +236,100 @@ public void testMetricsGroup() {
 public void testSendRecordsConvertsData() {
 createWorkerTask();
 
-List records = new ArrayList<>();
 // Can just use the same record for key and value
-records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD));
-
-Capture> sent = 
expectSendRecordAnyTimes();
+List records = Collections.singletonList(
+new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD)
+);
 
+expectSendRecord(emptyHeaders());
 expectTopicCreation(TOPIC);
 
-PowerMock.replayAll();
-
 workerTask.toSend = records;
 workerTask.sendRecords();
+
+ArgumentCaptor> sent = 
verifySendRecord();
+
 assertEquals(SERIALIZED_KEY, sent.getValue().key());
 assertEquals(SERIALIZED_RECORD, sent.getValue().value());
 
-PowerMock.verifyAll();
+verifyTaskGetTopic();
 }
 
 @Test
 public void testSendRecordsPropagatesTimestamp() {
 final Long timestamp = System.currentTimeMillis();
-
 createWorkerTask();
 
-List records = Collections.singletonList(
-new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD, timestamp)
-);
-
-Capture> sent = 
expectSendRecordAnyTimes();
-
+expectSendRecord(emptyHeaders());
 expectTopicCreation(TOPIC);
 
-PowerMock.replayAll();
-
-workerTask.toSend = records;
+workerTask.toSend = Collections.singletonList(
+new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, 
KEY, RECORD_SCHEMA, RECORD, timestamp)
+);
 workerTask.sendRecords();
+
+ArgumentCaptor> sent = 
verifySendRecord();
 assertEquals(timestamp, sent.getValue().timestamp());
 
-PowerMock.verifyAll();
+verifyTaskGetTopic();
 }
 
 @Test
 public void testSendRecordsCorruptTimestamp() {
 final Long timestamp = -3L;
 createWorkerTask();
 
-List records = Collections.singletonList(
+expectSendRecord(emptyHeaders());
+expectTopicCreation(TOPIC);

Review Comment:
   Why is this added? We're testing a scenario where the task fails on an 
invalid record timestamp, so it should never get to the point of attempting to 
create a topic.



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java:
##
@@ -639,144 +644,112 @@ public void 
testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() {
 SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, 
KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD);
 
-expectPreliminaryCalls();
-
EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap());
+expectSendRecord(emptyHeaders());
 
-Capture newTopicCapture = EasyMock.newCapture();
-
EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC));
-
-expectSendRecord();
-expectSendRecord();
-
-PowerMock.replayAll();
+when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap());
+
when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC));
 
 workerTask.toSend = Arrays.asList(record1, record2);
 workerTask.sendRecords();
+
+ArgumentCaptor> sent = 
verifySendRecord(2);
+
+List> capturedValues = 
sent.getAllValues();
+assertEquals(2, capturedValues.size());
 }
 
-private Capture> expectSendRecord(
-String topic,
-boolean anyTimes,
-Headers headers
-) {
+private void expectSendRecord(Headers headers) {
 if (headers != null)
-expectConvertHeadersAndKeyValue(topic, anyTimes, headers);
+expectConvertHeadersAndKeyValue(headers);
 
-expectApplyTransformationChain(anyTimes);
+expectApplyTransformationChain();
 
-Capture> sent = EasyMock.newCapture();
-
-IExpectationSetters> expect = EasyMock.expect(
-producer.send(EasyMock.capture(sent), 
EasyMock.capture(producerCallbacks)));
+expectTaskGetTopic();
+}
 
-IAnswer> expectResponse = () -> {
-synchronized (producerCallbacks) {
-for (Callback cb : producerCallbacks.getValues()) {
-cb.onCompletion(new RecordMetadata(new 
TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null);
-  

[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores

2023-02-15 Thread via GitHub


vcrfxia commented on code in PR #13252:
URL: https://github.com/apache/kafka/pull/13252#discussion_r1107700533


##
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java:
##
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+import java.util.Objects;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.VersionedBytesStore;
+import org.apache.kafka.streams.state.VersionedKeyValueStore;
+import org.apache.kafka.streams.state.VersionedRecord;
+
+/**
+ * A metered {@link VersionedKeyValueStore} wrapper that is used for recording 
operation
+ * metrics, and hence its inner {@link VersionedBytesStore} implementation 
does not need to provide
+ * its own metrics collecting functionality. The inner {@code 
VersionedBytesStore} of this class
+ * is a {@link KeyValueStore} of type , so we use {@link 
Serde}s
+ * to convert from > to 
. In particular,
+ * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to 
a versioned key-value
+ * store requires putting a null value associated with a timestamp.
+ *
+ * @param  The key type
+ * @param  The (raw) value type
+ */
+public class MeteredVersionedKeyValueStore
+extends WrappedStateStore
+implements VersionedKeyValueStore {
+
+private final MeteredVersionedKeyValueStoreInternal internal;
+
+MeteredVersionedKeyValueStore(final VersionedBytesStore inner,
+  final String metricScope,
+  final Time time,
+  final Serde keySerde,
+  final Serde> 
valueSerde) {
+super(inner);
+internal = new MeteredVersionedKeyValueStoreInternal(inner, 
metricScope, time, keySerde, valueSerde);
+}
+
+/**
+ * Private helper class which represents the functionality of a {@link 
VersionedKeyValueStore}
+ * as a {@link TimestampedKeyValueStore} so that the bulk of the metering 
logic may be
+ * inherited from {@link MeteredKeyValueStore}. As a result, the 
implementation of
+ * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate 
from this
+ * {@link TimestampedKeyValueStore} representation of a versioned 
key-value store into the
+ * {@link VersionedKeyValueStore} interface itself.
+ */
+private class MeteredVersionedKeyValueStoreInternal
+extends MeteredKeyValueStore>
+implements TimestampedKeyValueStore {
+
+private final VersionedBytesStore inner;
+
+MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner,
+  final String metricScope,
+  final Time time,
+  final Serde keySerde,
+  final 
Serde> valueSerde) {
+super(inner, metricScope, time, keySerde, valueSerde);
+this.inner = inner;
+}
+
+@Override
+public void put(final K key, final ValueAndTimestamp value) {
+super.put(
+key,
+// versioned stores req

[GitHub] [kafka] mjsax commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

2023-02-15 Thread via GitHub


mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107704159


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final 
StateStore root) {
 
 // VisibleForTesting
 void restoreBatch(final Collection> 
records) {
-// advance stream time to the max timestamp in the batch
+// copy the observed stream time, for use in deciding whether to drop 
records during restore,
+// when records have exceeded the store's grace period.
+long streamTimeForRestore = observedStreamTime;

Review Comment:
   Did not have a concrete proposal. Should be fine I guess.
   
   Currently, `streamTime` is tracked per task (based on input records over all 
partitions). And yes, there are all kinds of tricky things like the ones you call out. 
Even with just a filter(), downstream processors see only a subset of the data, and 
their "internal stream-time" (if they have any) could be different (i.e., 
lagging). Caching has a similar effect.
   
   There is a proposal to let KS track streamTime per processor, too.
   
   Bottom line: it's complicated and needs a proper design and a KIP of its own...
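
   As a toy illustration of the lag described above (not how Kafka Streams is implemented), the sketch below assumes stream time is simply the largest timestamp seen so far and compares what the task sees with what a processor behind a filter sees. `StreamTimeLagSketch`, `Rec`, and `run` are hypothetical names.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical illustration only; Kafka Streams does not expose a class like this.
class StreamTimeLagSketch {
    static final class Rec {
        final String key;
        final long timestamp;
        Rec(String key, long timestamp) { this.key = key; this.timestamp = timestamp; }
    }

    // Returns {taskStreamTime, downstreamStreamTime} after processing the input in order.
    static long[] run(List<Rec> input, Predicate<Rec> filter) {
        long taskStreamTime = -1L;        // advances on every input record
        long downstreamStreamTime = -1L;  // advances only on records that pass the filter
        for (Rec r : input) {
            taskStreamTime = Math.max(taskStreamTime, r.timestamp);
            if (filter.test(r)) {
                downstreamStreamTime = Math.max(downstreamStreamTime, r.timestamp);
            }
        }
        return new long[] {taskStreamTime, downstreamStreamTime};
    }
}
```

   For input timestamps 10, 20, 30, 40 where the filter only passes the records at 10 and 30, the task-level value ends at 40 while the downstream value ends at 30.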






[GitHub] [kafka] mjsax commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

2023-02-15 Thread via GitHub


mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107705172


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
 verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
 }
 
+@Test
+public void shouldNotRestoreExpired() {
+final List records = new ArrayList<>();
+records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - 
GRACE_PERIOD)); // grace period has not elapsed
+records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - 
GRACE_PERIOD)); // grace period has elapsed, so this record should not be 
restored
+
+store.restoreBatch(getChangelogRecords(records));
+
+verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - 
GRACE_PERIOD);
+verifyGetNullFromStore("k2");
+}
+
+@Test
+public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() {
+final List records = new ArrayList<>();
+records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - 
GRACE_PERIOD)); // this record will be older than grace period by the end of 
the batch, but should still be restored
+records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+
+store.restoreBatch(getChangelogRecords(records));
+
+verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD);
+verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+}
+
+@Test
+public void shouldAllowZeroHistoryRetention() {
+// recreate store with zero history retention
+store.close();
+store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 0L, 
SEGMENT_INTERVAL);
+store.init((StateStoreContext) context, store);
+
+// put, get, and delete
+putToStore("k", "v", BASE_TIMESTAMP);
+verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", 
BASE_TIMESTAMP);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v", 
BASE_TIMESTAMP); // query in "future" is allowed
+
+// update existing record at same timestamp
+putToStore("k", "updated", BASE_TIMESTAMP);
+verifyGetValueFromStore("k", "updated", BASE_TIMESTAMP);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", 
BASE_TIMESTAMP);
+
+// put new record version
+putToStore("k", "v2", BASE_TIMESTAMP + 2);
+verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 2);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", 
BASE_TIMESTAMP + 2);
+
+// query in past (history retention expired) returns null
+verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1);
+
+// put in past (grace period expired) does not update the store

Review Comment:
   Was just an idea. Not a big deal to not have the test.






[GitHub] [kafka] mjsax commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

2023-02-15 Thread via GitHub


mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107707006


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##
@@ -458,8 +460,22 @@ public void writeLatestValues(final WriteBatch batch) 
throws RocksDBException {
 }
 }
 
+/**
+ * Helper method shared between put and restore.
+ * 
+ * This method does not check whether the record being put is expired 
based on grace period
+ * or not; that is the caller's responsibility. This method does, however, 
check whether the
+ * record is expired based on history retention, by using the current
+ * {@code observedStreamTime}, and returns without inserting into the 
store if so. It can be
+ * possible that a record is not expired based on grace period but is 
expired based on
+ * history retention, even though history retention is always at least the 
grace period,
+ * during restore because restore advances {@code observedStreamTime} to 
the largest timestamp
+ * in the entire restore batch at the beginning of restore, in order to 
optimize for not
+ * putting records into the store which will have expired by the end of 
the restore.

Review Comment:
   Thanks for adding this! Great addition!
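
   A small worked example of the case the Javadoc describes, assuming both checks are plain comparisons against "stream time minus the limit". The helper names and the exact boundary handling are assumptions made for illustration, not taken from the store's code.

```java
// Hypothetical helper names; the arithmetic mirrors the Javadoc above, not the store's code.
class ExpiryChecksSketch {
    // Grace period is checked by the caller against the running stream time.
    static boolean withinGracePeriod(long timestamp, long runningStreamTime, long gracePeriod) {
        return timestamp >= runningStreamTime - gracePeriod;
    }

    // History retention is checked against the stream time handed to the helper,
    // which during restore is the largest timestamp in the whole batch.
    static boolean withinHistoryRetention(long timestamp, long streamTime, long historyRetention) {
        return timestamp >= streamTime - historyRetention;
    }
}
```

   Take a grace period of 10 and a history retention of 20. A record with timestamp 95 that arrives when the running stream time is 100 passes the grace-period check (95 >= 90). If the restore batch ends at timestamp 150, the same record fails the history-retention check (95 < 130), even though history retention is larger than the grace period.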






[GitHub] [kafka] mjsax commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores

2023-02-15 Thread via GitHub


mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107711967


##
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
 verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
 }
 
+@Test
+public void shouldNotRestoreExpired() {
+final List records = new ArrayList<>();
+records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - 
GRACE_PERIOD)); // grace period has not elapsed
+records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - 
GRACE_PERIOD)); // grace period has elapsed, so this record should not be 
restored
+
+store.restoreBatch(getChangelogRecords(records));
+
+verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - 
GRACE_PERIOD);
+verifyGetNullFromStore("k2");
+}
+
+@Test
+public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() {
+final List records = new ArrayList<>();
+records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - 
GRACE_PERIOD)); // this record will be older than grace period by the end of 
the batch, but should still be restored
+records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+
+store.restoreBatch(getChangelogRecords(records));
+
+verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD);
+verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+}
+
+@Test
+public void shouldAllowZeroHistoryRetention() {
+// recreate store with zero history retention
+store.close();
+store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 0L, 
SEGMENT_INTERVAL);
+store.init((StateStoreContext) context, store);
+
+// put, get, and delete
+putToStore("k", "v", BASE_TIMESTAMP);
+verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", 
BASE_TIMESTAMP);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v", 
BASE_TIMESTAMP); // query in "future" is allowed
+
+// update existing record at same timestamp
+putToStore("k", "updated", BASE_TIMESTAMP);
+verifyGetValueFromStore("k", "updated", BASE_TIMESTAMP);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", 
BASE_TIMESTAMP);
+
+// put new record version
+putToStore("k", "v2", BASE_TIMESTAMP + 2);
+verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 2);
+verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", 
BASE_TIMESTAMP + 2);
+
+// query in past (history retention expired) returns null
+verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1);
+
+// put in past (grace period expired) does not update the store

Review Comment:
   Just saw that you added the test. It does not hurt to keep it. (We should not write 
tests based on knowing how the implementation works, but rather treat it as a 
"black box".)





