[GitHub] [kafka] omkreddy commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
omkreddy commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1106751131 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3296,17 +3296,23 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleDescribeUserScramCredentialsRequest(request: RequestChannel.Request): Unit = { Review Comment: As I mentioned above, ControllerAPI also should support DescribeUserScramCredentialsRequest API ## core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala: ## @@ -260,11 +271,13 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest { .setSalt(saltBytes) .setSaltedPassword(saltedPasswordBytes), ))).build() -val results1 = sendAlterUserScramCredentialsRequest(request1).data.results -assertEquals(2, results1.size) -checkNoErrorsAlteringCredentials(results1) -checkUserAppearsInAlterResults(results1, user1) -checkUserAppearsInAlterResults(results1, user2) +val results1_1 = sendAlterUserScramCredentialsRequest(request1_1).data.results +assertEquals(2, results1_1.size) +checkNoErrorsAlteringCredentials(results1_1) +checkUserAppearsInAlterResults(results1_1, user1) +checkUserAppearsInAlterResults(results1_1, user2) + +Thread.sleep(1) Review Comment: we generally use 'TestUtils.waitForCondition` in tests ## metadata/src/main/java/org/apache/kafka/image/ScramImage.java: ## @@ -50,6 +58,63 @@ public void write(ImageWriter writer, ImageWriterOptions options) { } } +public DescribeUserScramCredentialsResponseData describe(DescribeUserScramCredentialsRequestData request) { + +List users = request.users(); +Map uniqueUsers = new HashMap(); + +if ((users == null) || (users.size() == 0)) { +System.out.println("Describe : get all the users"); +// If there are no users listed then get all the users +for (Map scramCredentialDataSet : mechanisms.values()) { +for (String user : scramCredentialDataSet.keySet()) { +uniqueUsers.put(user, false); +} +} +} else { +// Filter out duplicates +for (UserName user : users) { +if (uniqueUsers.containsKey(user.name())) { +uniqueUsers.put(user.name(), true); +} else { +uniqueUsers.put(user.name(), false); +} +} +} + +DescribeUserScramCredentialsResponseData retval = new DescribeUserScramCredentialsResponseData(); + +for (Map.Entry user : uniqueUsers.entrySet()) { +DescribeUserScramCredentialsResult result = + new DescribeUserScramCredentialsResult().setUser(user.getKey()); + +if (user.getValue() == false) { +List credentialInfos = new ArrayList(); + +boolean datafound = false; +for (Map.Entry> mechanismsEntry : mechanisms.entrySet()) { +Map credentialDataSet = mechanismsEntry.getValue(); +if (credentialDataSet.containsKey(user.getKey())) { +credentialInfos.add(new CredentialInfo().setMechanism(mechanismsEntry.getKey().type()) + .setIterations(credentialDataSet.get(user.getKey()).iterations())); +datafound = true; +} +} +if (datafound) { +result.setCredentialInfos(credentialInfos); +} else { +result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code()) + .setErrorMessage("attemptToDescribeUserThatDoesNotExist: " + user.getKey()); Review Comment: Can we use same error message as https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ZkAdminManager.scala#L802 ## metadata/src/main/java/org/apache/kafka/image/ScramImage.java: ## @@ -50,6 +58,63 @@ public void write(ImageWriter writer, ImageWriterOptions options) { } } +public DescribeUserScramCredentialsResponseData describe(DescribeUserScramCredentialsRequestData request) { + +List users = request.users(); +Map uniqueUsers = new 
HashMap(); + +if ((users == null) || (users.size() == 0)) { +System.out.println("Describe : get all the users"); +// If there are no users listed then get all the users +for (Map scramCredentialDataSet : mechanisms.values()) { +for (String user : scramCredentialDataSet.keySet()) { +uniqueUsers.put(user, false); +} +} +} else { +// Filter out duplicates +for (UserName user : users) { +
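The angle-bracket generics in the quoted ScramImage diff are stripped in this rendering (for example, `Map uniqueUsers = new HashMap()` was originally a `Map<String, Boolean>`). A self-contained sketch of the describe logic under review, with stand-in types in place of the PR's ScramMechanism, ScramCredentialData and generated request/response classes, and a plain map as the result:

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in types: the PR itself uses ScramMechanism, ScramCredentialData and the
// generated DescribeUserScramCredentials* message classes instead of these.
final class ScramDescribeSketch {
    // mechanism name -> (user name -> iteration count)
    private final Map<String, Map<String, Integer>> mechanisms = new HashMap<>();

    // Returns user -> "mechanism/iterations" entries; a null value marks a duplicate
    // request entry and an empty list marks a user with no credentials at all.
    Map<String, List<String>> describe(List<String> requestedUsers) {
        Map<String, Boolean> uniqueUsers = new HashMap<>();
        if (requestedUsers == null || requestedUsers.isEmpty()) {
            // No users listed: describe every user that has at least one credential.
            for (Map<String, Integer> credentialsByUser : mechanisms.values()) {
                for (String user : credentialsByUser.keySet()) {
                    uniqueUsers.put(user, false);
                }
            }
        } else {
            // Flag duplicates in the request, as the PR does with its uniqueUsers map.
            for (String user : requestedUsers) {
                uniqueUsers.put(user, uniqueUsers.containsKey(user));
            }
        }

        Map<String, List<String>> results = new HashMap<>();
        for (Map.Entry<String, Boolean> user : uniqueUsers.entrySet()) {
            if (user.getValue()) {
                results.put(user.getKey(), null); // duplicate -> error result in the real code
                continue;
            }
            List<String> credentialInfos = new ArrayList<>();
            for (Map.Entry<String, Map<String, Integer>> mechanism : mechanisms.entrySet()) {
                Integer iterations = mechanism.getValue().get(user.getKey());
                if (iterations != null) {
                    credentialInfos.add(mechanism.getKey() + "/" + iterations);
                }
            }
            results.put(user.getKey(), credentialInfos);
        }
        return results;
    }
}
{code}

As in the PR, an empty or null user list means "describe everything", duplicate request entries are flagged, and a user with no credentials under any mechanism maps to a RESOURCE_NOT_FOUND error in the real response.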
[GitHub] [kafka] omkreddy commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
omkreddy commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1106755866 ## core/src/test/scala/unit/kafka/server/AlterUserScramCredentialsRequestTest.scala: ## @@ -260,11 +271,13 @@ class AlterUserScramCredentialsRequestTest extends BaseRequestTest { .setSalt(saltBytes) .setSaltedPassword(saltedPasswordBytes), ))).build() -val results1 = sendAlterUserScramCredentialsRequest(request1).data.results -assertEquals(2, results1.size) -checkNoErrorsAlteringCredentials(results1) -checkUserAppearsInAlterResults(results1, user1) -checkUserAppearsInAlterResults(results1, user2) +val results1_1 = sendAlterUserScramCredentialsRequest(request1_1).data.results +assertEquals(2, results1_1.size) +checkNoErrorsAlteringCredentials(results1_1) +checkUserAppearsInAlterResults(results1_1, user1) +checkUserAppearsInAlterResults(results1_1, user2) + +Thread.sleep(1) Review Comment: we generally use 'TestUtils.waitForCondition` in tests to wait for condition -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
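The suggestion is to poll for the expected state rather than sleep for a fixed interval. A minimal stand-in for that pattern is sketched below; Kafka's own test utility class (org.apache.kafka.test.TestUtils) provides a richer waitForCondition helper that the reviewer is referring to, so treat this as an illustration of the idea rather than the API to use in the PR.

{code:java}
// Minimal stand-in for the poll-until-true pattern used throughout Kafka tests.
final class WaitUtil {
    interface Condition {
        boolean met() throws Exception;
    }

    static void waitForCondition(Condition condition, long maxWaitMs, String details) throws Exception {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.met()) {
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("Timed out waiting for: " + details);
            }
            Thread.sleep(100); // short poll instead of one fixed sleep
        }
    }
}

// Hypothetical usage in the test above, instead of Thread.sleep(1):
// WaitUtil.waitForCondition(() -> describedUsers().containsAll(expectedUsers),
//         15_000, "SCRAM credential records were not applied in time");
{code}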
[jira] [Commented] (KAFKA-14713) Kafka Streams global table startup takes too long
[ https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17688945#comment-17688945 ] Tamas commented on KAFKA-14713: --- Hi [~mjsax] looks similar, but not exactly. They see this issue with exactly_once_beta processing guarantee, while we have it with the default at_least_once. For us the important part is that is issue is fixed as soon as possible, because right now I am between a rock and a hard place because of it. > Kafka Streams global table startup takes too long > - > > Key: KAFKA-14713 > URL: https://issues.apache.org/jira/browse/KAFKA-14713 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Tamas >Priority: Critical > > *Some context first* > We have a spring based kafka streams application. This application is > listening to two topics. Let's call them apartment and visitor. The > apartments are stored in a global table, while the visitors are in the stream > we are processing, and at one point we are joining the visitor stream > together with the apartment table. In our test environment, both topics > contain 10 partitions. > *Issue* > At first deployment, everything goes fine, the global table is built and all > entries in the stream are processed. > After everything is finished, we shut down the application, restart it and > send out a new set of visitors. The application seemingly does not respond. > After some more debugging it turned out that it simply takes 5 minutes to > start up, because the global table takes 30 seconds (default value for the > global request timeout) to accept that there are no messages in the apartment > topics, for each and every partition. If we send out the list of apartments > as new messages, the application starts up immediately. > To make matters worse, we have clients with 96 partitions, where the startup > time would be 48 minutes. Not having messages in the topics between > application shutdown and restart is a valid use case, so this is quite a big > problem. > *Possible workarounds* > We could reduce the request timeout, but since this value is not specific for > the global table initialization, but a global request timeout for a lot of > things, we do not know what else it will affect, so we are not very keen on > doing that. Even then, it would mean a 1.5 minute delay for this particular > client (more if we will have other use cases in the future where we will need > to use more global tables), which is far too much, considering that the > application would be able to otherwise start in about 20 seconds. > *Potential solutions we see* > # Introduce a specific global table initialization timeout in > GlobalStateManagerImpl. Then we would be able to safely modify that value > without fear of making some other part of kafka unstable. > # Parallelize the initialization of the global table partitions in > GlobalStateManagerImpl: knowing that the delay at startup is constant instead > of linear with the number of partitions would be a huge help. > # As long as we receive a response, accept the empty map in the > KafkaConsumer, and continue instead of going into a busy-waiting loop. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14713) Kafka Streams global table startup takes too long
[ https://issues.apache.org/jira/browse/KAFKA-14713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17688945#comment-17688945 ] Tamas edited comment on KAFKA-14713 at 2/15/23 8:52 AM: Hi [~mjsax] looks similar, but not exactly. They see this issue with exactly_once_beta processing guarantee, while we have it with the default at_least_once. For us the important part is that is issue is fixed (or at least a safe workaround is provided) as soon as possible, because right now I am between a rock and a hard place because of it. was (Author: JIRAUSER298942): Hi [~mjsax] looks similar, but not exactly. They see this issue with exactly_once_beta processing guarantee, while we have it with the default at_least_once. For us the important part is that is issue is fixed as soon as possible, because right now I am between a rock and a hard place because of it. > Kafka Streams global table startup takes too long > - > > Key: KAFKA-14713 > URL: https://issues.apache.org/jira/browse/KAFKA-14713 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Tamas >Priority: Critical > > *Some context first* > We have a spring based kafka streams application. This application is > listening to two topics. Let's call them apartment and visitor. The > apartments are stored in a global table, while the visitors are in the stream > we are processing, and at one point we are joining the visitor stream > together with the apartment table. In our test environment, both topics > contain 10 partitions. > *Issue* > At first deployment, everything goes fine, the global table is built and all > entries in the stream are processed. > After everything is finished, we shut down the application, restart it and > send out a new set of visitors. The application seemingly does not respond. > After some more debugging it turned out that it simply takes 5 minutes to > start up, because the global table takes 30 seconds (default value for the > global request timeout) to accept that there are no messages in the apartment > topics, for each and every partition. If we send out the list of apartments > as new messages, the application starts up immediately. > To make matters worse, we have clients with 96 partitions, where the startup > time would be 48 minutes. Not having messages in the topics between > application shutdown and restart is a valid use case, so this is quite a big > problem. > *Possible workarounds* > We could reduce the request timeout, but since this value is not specific for > the global table initialization, but a global request timeout for a lot of > things, we do not know what else it will affect, so we are not very keen on > doing that. Even then, it would mean a 1.5 minute delay for this particular > client (more if we will have other use cases in the future where we will need > to use more global tables), which is far too much, considering that the > application would be able to otherwise start in about 20 seconds. > *Potential solutions we see* > # Introduce a specific global table initialization timeout in > GlobalStateManagerImpl. Then we would be able to safely modify that value > without fear of making some other part of kafka unstable. > # Parallelize the initialization of the global table partitions in > GlobalStateManagerImpl: knowing that the delay at startup is constant instead > of linear with the number of partitions would be a huge help. 
> # As long as we receive a response, accept the empty map in the > KafkaConsumer, and continue instead of going into a busy-waiting loop. -- This message was sent by Atlassian Jira (v8.20.10#820010)
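If the startup delay is indeed driven by the global consumer's request.timeout.ms, one way to narrow the reporter's workaround is to scope a shorter timeout to the global consumer only, leaving the main consumer, restore consumer, producer and admin client untouched. This is an untested sketch of that idea, not a confirmed fix for the reported issue; the application id, broker address and timeout value are placeholders.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class GlobalConsumerTimeoutExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "apartment-visitor-join"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // placeholder

        // Apply the shorter request timeout to the global consumer only, so the
        // general client timeout the reporter is worried about is left alone.
        props.put(StreamsConfig.globalConsumerPrefix(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), 5_000);

        // new KafkaStreams(topology, props) would then pick this up at startup.
    }
}
{code}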
[jira] [Updated] (KAFKA-14704) Follower should truncate before incrementing high watermark
[ https://issues.apache.org/jira/browse/KAFKA-14704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-14704: Fix Version/s: 2.8.3 3.2.4 3.1.3 3.0.3 > Follower should truncate before incrementing high watermark > --- > > Key: KAFKA-14704 > URL: https://issues.apache.org/jira/browse/KAFKA-14704 > Project: Kafka > Issue Type: Bug >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > Fix For: 3.2.4, 3.1.3, 3.0.3, 3.5.0, 3.4.1, 3.3.3, 2.8.3 > > > When a leader becomes a follower, it is likely that it has uncommitted > records in its log. When it reaches out to the leader, the leader will detect > that they have diverged and it will return the diverging epoch and offset. > The follower truncates it log based on this. > There is a small caveat in this process. When the leader return the diverging > epoch and offset, it also includes its high watermark, low watermark, start > offset and end offset. The current code in the `AbstractFetcherThread` works > as follow. First it process the partition data and then it checks whether > there is a diverging epoch/offset. The former may accidentally expose > uncommitted records as this step updates the local watermark to whatever is > received from the leader. As the follower, or the former leader, may have > uncommitted records, it will be able to updated the high watermark to a > larger offset if the leader has a higher watermark than the current local > one. This result in exposing uncommitted records until the log is finally > truncated. The time window is short but a fetch requests coming at the right > time to the follower could read those records. This is especially true for > clients out there which uses recent versions of the fetch request but without > implementing KIP-320. > When this happens, the follower logs the following message: `Non-monotonic > update of high watermark from (offset=21437 segment=[20998:98390]) to > (offset=21434 segment=[20998:97843])`. > This patch proposes to mitigate the issue by starting by checking on whether > a diverging epoch/offset is provided by the leader and skip processing the > partition data if it is. This basically means that the first fetch request > will result in truncating the log and a subsequent fetch request will update > the log/high watermarks. -- This message was sent by Atlassian Jira (v8.20.10#820010)
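A simplified model of the reordering this patch proposes, written as illustrative Java rather than the actual Scala AbstractFetcherThread code; the class and field names are invented for the sketch.

{code:java}
import java.util.Optional;

// Illustrative model only; names are invented and the real logic lives in AbstractFetcherThread.
final class PartitionFetchData {
    final Optional<Long> divergingEndOffset; // set when the leader detects divergence
    final long highWatermark;
    final long recordCount; // stand-in for the fetched batch

    PartitionFetchData(Optional<Long> divergingEndOffset, long highWatermark, long recordCount) {
        this.divergingEndOffset = divergingEndOffset;
        this.highWatermark = highWatermark;
        this.recordCount = recordCount;
    }
}

final class FollowerFetchSketch {
    long logEndOffset = 0L;
    long highWatermark = 0L;

    void handleFetchResponse(PartitionFetchData data) {
        if (data.divergingEndOffset.isPresent()) {
            // Patched order: truncate first and skip the rest of the processing, so the
            // leader's high watermark is never applied on top of uncommitted local records.
            logEndOffset = Math.min(logEndOffset, data.divergingEndOffset.get());
            return;
        }
        logEndOffset += data.recordCount;
        // Safe only once any divergence has been resolved in an earlier fetch round.
        highWatermark = Math.min(data.highWatermark, logEndOffset);
    }
}
{code}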
[GitHub] [kafka] satishd opened a new pull request, #13255: KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module.
satishd opened a new pull request, #13255: URL: https://github.com/apache/kafka/pull/13255 KAFKA 14714: Move/Rewrite RollParams, LogAppendInfo, and LeaderHwChange to storage module. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils
vamossagar12 commented on PR #13158: URL: https://github.com/apache/kafka/pull/13158#issuecomment-1430997152 > Hi @vamossagar12 this looks good, but I still think we should move `GetOffsetShellParsingTest` to `TopicPartitionFilterTest` removing any reference to `GetOffsetShell` (there is already a test for that) and create a new `PartitionFilterTest`, which should be similar to `TopicFilterTest`. Thanks @fvaleri . I tried doing this today. A problem that I see is that `GetOffsetShellParsingTest` makes use of 2 methods from `GetOffsetShell` namely `createTopicPartitionFilterWithTopicAndPartitionPattern` and `createTopicPartitionFilterWithPatternList`. Those 2 methods are also eventually used in `GetOffsetShell` when creating `topicPartitionFilter`. Ideally these 2 methods can reside in `ToolsUtils` and have both tests and `GetOffsetShell` reference it. I see there's also [Move GetOffsetShell to tools](https://issues.apache.org/jira/browse/KAFKA-14581). Can we do the move as part of that effort or here? WDYS? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
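For readers following the thread: the two helpers in question build a predicate over (topic, partition) pairs from the command-line patterns. A rough illustration of what such a filter does, not the actual GetOffsetShell code:

{code:java}
import java.util.Set;
import java.util.function.BiPredicate;
import java.util.regex.Pattern;

// Illustration only: matches topics by regex and optionally restricts partitions.
final class TopicPartitionFilterSketch {
    static BiPredicate<String, Integer> filter(String topicRegex, Set<Integer> partitions) {
        Pattern topicPattern = Pattern.compile(topicRegex);
        return (topic, partition) -> topicPattern.matcher(topic).matches()
                && (partitions.isEmpty() || partitions.contains(partition));
    }

    public static void main(String[] args) {
        BiPredicate<String, Integer> f = filter("events.*", Set.of(0, 1));
        System.out.println(f.test("events-clicks", 0)); // true
        System.out.println(f.test("orders", 0));        // false
    }
}
{code}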
[GitHub] [kafka] showuon commented on pull request #13238: KAFKA-14708: Use Java thread instead of kafka library for example purpose
showuon commented on PR #13238: URL: https://github.com/apache/kafka/pull/13238#issuecomment-1431011866 @philipnee, there's a spotbugs error, could you help fix it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] benru89 commented on pull request #11442: KAFKA-7883 add schema.namespace support to SetSchemaMetadata SMT in Kafka Connect
benru89 commented on PR #11442: URL: https://github.com/apache/kafka/pull/11442#issuecomment-1431013263 What's the status of this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-14716) Connect schema does not allow struct default values
[ https://issues.apache.org/jira/browse/KAFKA-14716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban resolved KAFKA-14716. -- Resolution: Duplicate > Connect schema does not allow struct default values > --- > > Key: KAFKA-14716 > URL: https://issues.apache.org/jira/browse/KAFKA-14716 > Project: Kafka > Issue Type: Bug >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > The ConnectSchema API should allow specifying a composite (struct) default > value for a field, but with the current API, it is impossible to do so. > # There is a circular dependency between creating a struct as a default > value and creating the schema which holds it as the default value. The Struct > constructor expects a Schema object, and the default value setter of > SchemaBuilder checks schema conformity by using the > ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This > can only be bypassed if the struct references a SchemaBuilder instance, and > defaultValue is called on that builder instance, but this goes against the > Struct docs stating that "Struct objects must specify a complete \{@link > Schema} up front". > # ConnectSchema.equals is not prepared to be used with other Schema > implementations, so equals checks between ConnectSchema and SchemaBuilder > instances will always fail. This is only causing an issue if equals has to be > used for schema conformity checks. > Code examples: > Working code (mind that the schema referenced by the Struct is a > SchemaBuilder, and it is mutated after the Struct is constructed): > {code:java} > @Test > public void testCompositeDefault() { > SchemaBuilder nestedSchema = SchemaBuilder.struct() > .field("bar", Schema.STRING_SCHEMA); > Struct nestedDefault = new Struct(nestedSchema); > nestedDefault.put("bar", "default_value"); > Schema schema = SchemaBuilder.struct() > .field("foo", > nestedSchema > .defaultValue(nestedDefault) > .build() > ) > .build(); > } {code} > Not working code (but better aligned with the current API and docs - 2 > separate Schema instances used by the Struct and the field, only diff is the > default value between the 2): > {code:java} > @Test > public void testCompositeDefault() { > Schema nestedSchema = SchemaBuilder.struct() > .field("bar", Schema.STRING_SCHEMA) > .build(); > Struct nestedDefault = new Struct(nestedSchema); > nestedDefault.put("bar", "default_value"); > Schema schema = SchemaBuilder.struct() > .field("foo", > SchemaBuilder > .struct() > .field("bar", Schema.STRING_SCHEMA) > .defaultValue(nestedDefault) > .build() > ) > .build(); > }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14716) Connect schema does not allow struct default values
[ https://issues.apache.org/jira/browse/KAFKA-14716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17688979#comment-17688979 ] Daniel Urban commented on KAFKA-14716: -- [~ChrisEgerton] Indeed, it is, thanks for pointing that out - will try to comment on the PR of KAFKA-12694 > Connect schema does not allow struct default values > --- > > Key: KAFKA-14716 > URL: https://issues.apache.org/jira/browse/KAFKA-14716 > Project: Kafka > Issue Type: Bug >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > The ConnectSchema API should allow specifying a composite (struct) default > value for a field, but with the current API, it is impossible to do so. > # There is a circular dependency between creating a struct as a default > value and creating the schema which holds it as the default value. The Struct > constructor expects a Schema object, and the default value setter of > SchemaBuilder checks schema conformity by using the > ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This > can only be bypassed if the struct references a SchemaBuilder instance, and > defaultValue is called on that builder instance, but this goes against the > Struct docs stating that "Struct objects must specify a complete \{@link > Schema} up front". > # ConnectSchema.equals is not prepared to be used with other Schema > implementations, so equals checks between ConnectSchema and SchemaBuilder > instances will always fail. This is only causing an issue if equals has to be > used for schema conformity checks. > Code examples: > Working code (mind that the schema referenced by the Struct is a > SchemaBuilder, and it is mutated after the Struct is constructed): > {code:java} > @Test > public void testCompositeDefault() { > SchemaBuilder nestedSchema = SchemaBuilder.struct() > .field("bar", Schema.STRING_SCHEMA); > Struct nestedDefault = new Struct(nestedSchema); > nestedDefault.put("bar", "default_value"); > Schema schema = SchemaBuilder.struct() > .field("foo", > nestedSchema > .defaultValue(nestedDefault) > .build() > ) > .build(); > } {code} > Not working code (but better aligned with the current API and docs - 2 > separate Schema instances used by the Struct and the field, only diff is the > default value between the 2): > {code:java} > @Test > public void testCompositeDefault() { > Schema nestedSchema = SchemaBuilder.struct() > .field("bar", Schema.STRING_SCHEMA) > .build(); > Struct nestedDefault = new Struct(nestedSchema); > nestedDefault.put("bar", "default_value"); > Schema schema = SchemaBuilder.struct() > .field("foo", > SchemaBuilder > .struct() > .field("bar", Schema.STRING_SCHEMA) > .defaultValue(nestedDefault) > .build() > ) > .build(); > }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] urbandan commented on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values
urbandan commented on PR #10566: URL: https://github.com/apache/kafka/pull/10566#issuecomment-1431030129 I would propose a 4th option as well: changing the ConnectSchema#validateValue method to use a different logic for Struct default values: 1. Use the Schema methods 2. Ignore optional and default value The benefit would be that it wouldn't change the existing equals logic, existing code utilizing SchemaBuilder would keep working, and it would fix the ambiguity around optionality and default being part of the schema check (while in reality, they should belong to the field, and not the schema). I have a working change with this 4th option, please let me know if this logic is acceptable, and I'll submit a PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
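A rough sketch of that fourth option, paraphrasing the comment rather than quoting the actual patch: walk two schemas only through the Schema interface, recursing into composite types and ignoring optionality and default values (logical-type names and versions are also ignored in this simplification).

{code:java}
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;

// Sketch: compare schemas via the Schema interface, ignoring isOptional() and
// defaultValue(), so a Struct built against a SchemaBuilder can validate
// against the finished ConnectSchema.
final class LenientSchemaMatch {
    static boolean matches(Schema expected, Schema actual) {
        if (expected.type() != actual.type()) {
            return false;
        }
        switch (expected.type()) {
            case STRUCT:
                if (expected.fields().size() != actual.fields().size()) {
                    return false;
                }
                for (Field field : expected.fields()) {
                    Field other = actual.field(field.name());
                    if (other == null || !matches(field.schema(), other.schema())) {
                        return false;
                    }
                }
                return true;
            case ARRAY:
                return matches(expected.valueSchema(), actual.valueSchema());
            case MAP:
                return matches(expected.keySchema(), actual.keySchema())
                        && matches(expected.valueSchema(), actual.valueSchema());
            default:
                return true; // primitives: matching type is enough for this sketch
        }
    }
}
{code}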
[jira] [Updated] (KAFKA-14590) Move DelegationTokenCommand to tools
[ https://issues.apache.org/jira/browse/KAFKA-14590?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gantigmaa Selenge updated KAFKA-14590: -- Fix Version/s: 3.5.0 Affects Version/s: (was: 3.5.0) > Move DelegationTokenCommand to tools > > > Key: KAFKA-14590 > URL: https://issues.apache.org/jira/browse/KAFKA-14590 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Gantigmaa Selenge >Priority: Major > Fix For: 3.5.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14592) Move FeatureCommand to tools
[ https://issues.apache.org/jira/browse/KAFKA-14592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17689019#comment-17689019 ] Federico Valeri commented on KAFKA-14592: - Here we should also add the missing BAT wrapper script. > Move FeatureCommand to tools > > > Key: KAFKA-14592 > URL: https://issues.apache.org/jira/browse/KAFKA-14592 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Gantigmaa Selenge >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] mimaison commented on pull request #11442: KAFKA-7883 add schema.namespace support to SetSchemaMetadata SMT in Kafka Connect
mimaison commented on PR #11442: URL: https://github.com/apache/kafka/pull/11442#issuecomment-1431141433 To be able to merge this, the associated [KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-855%3A+Add+schema.namespace+parameter+to+SetSchemaMetadata+SMT+in+Kafka+Connect) must be voted. In the [discussion thread](https://lists.apache.org/thread/3hkd9lljobf9rl56ogjpcbo4ldoxcz5n) for the KIP, a few questions have not been answered yet by the author. Until these are answered, it's unlikely there will be any votes, so this will stay stuck. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dejan2609 commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8
dejan2609 commented on PR #13205: URL: https://github.com/apache/kafka/pull/13205#issuecomment-1431231802 @ijuma some findings: the build fails due to issues with Gradle wrapper bootstrapping. I will post more details today. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] chia7712 commented on a diff in pull request #13248: KAFKA-14717 KafkaStreams can' get running if the rebalance happens be…
chia7712 commented on code in PR #13248: URL: https://github.com/apache/kafka/pull/13248#discussion_r1107014705 ## streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java: ## @@ -257,6 +258,23 @@ private Thread adjustCountHelperThread(final KafkaStreams kafkaStreams, final in }); } +@Test +public void testRebalanceHappensBeforeStreamThreadGetDown() throws Exception { +final Properties prop = new Properties(); +prop.putAll(properties); +// make rebalance happen quickly +prop.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 200); Review Comment: > While this test seems would be pass some times even without the fix, is that right? You are right. I have updated the test to make sure it always fails without the fix. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12473) Make the "cooperative-sticky, range" as the default assignor
[ https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-12473: -- Fix Version/s: (was: 3.5.0) > Make the "cooperative-sticky, range" as the default assignor > > > Key: KAFKA-12473 > URL: https://issues.apache.org/jira/browse/KAFKA-12473 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: A. Sophie Blee-Goldman >Assignee: Luke Chen >Priority: Critical > Labels: kip > > Now that 3.0 is coming up, we can change the default > ConsumerPartitionAssignor to something better than the RangeAssignor. The > original plan was to switch over to the StickyAssignor, but now that we have > incremental cooperative rebalancing we should consider using the new > CooperativeStickyAssignor instead: this will enable the consumer group to > follow the COOPERATIVE protocol, improving the rebalancing experience OOTB. > KIP: > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-12473) Make the "cooperative-sticky, range" as the default assignor
[ https://issues.apache.org/jira/browse/KAFKA-12473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17689070#comment-17689070 ] Luke Chen commented on KAFKA-12473: --- Removed the fixed version value until we have some progress. Thanks. > Make the "cooperative-sticky, range" as the default assignor > > > Key: KAFKA-12473 > URL: https://issues.apache.org/jira/browse/KAFKA-12473 > Project: Kafka > Issue Type: Improvement > Components: consumer >Reporter: A. Sophie Blee-Goldman >Assignee: Luke Chen >Priority: Critical > Labels: kip > > Now that 3.0 is coming up, we can change the default > ConsumerPartitionAssignor to something better than the RangeAssignor. The > original plan was to switch over to the StickyAssignor, but now that we have > incremental cooperative rebalancing we should consider using the new > CooperativeStickyAssignor instead: this will enable the consumer group to > follow the COOPERATIVE protocol, improving the rebalancing experience OOTB. > KIP: > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248 -- This message was sent by Atlassian Jira (v8.20.10#820010)
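Until the default changes, a consumer application can already opt in to the "cooperative-sticky, range" pairing this ticket proposes by listing both assignors, which also keeps a rolling upgrade from the range assignor safe. The broker address, group id and topic below are placeholders.

{code:java}
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CooperativeStickyOptIn {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Listing both assignors lets a group roll from RANGE to COOPERATIVE without downtime:
        // the group only moves to the cooperative protocol once every member lists it.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                Arrays.asList(CooperativeStickyAssignor.class.getName(), RangeAssignor.class.getName()));
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("example-topic")); // placeholder
        }
    }
}
{code}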
[jira] [Created] (KAFKA-14720) Tools migration guidelines
Federico Valeri created KAFKA-14720: --- Summary: Tools migration guidelines Key: KAFKA-14720 URL: https://issues.apache.org/jira/browse/KAFKA-14720 Project: Kafka Issue Type: Improvement Reporter: Federico Valeri The tools migration effort is ongoing and being tracked in KAFKA-14525. This is part of a bigger initiative to split the core module into multiple modules (e.g. storage, network, security, tools), which is being tracked in KAFKA-14524. The plan is to migrate tools and related classes in a fully compatible way from kafka.tools and kafka.admin packages (core module) to org.apache.kafka.tools package (tools module). While kicking off this activity, we identified a number of potential compatibility issues: * Missing wrapper: some tools do not have a wrapper script. There are system tests that directly refer to the tool's fully qualified class name (FQCN) and expect the old package name when running on old Kafka releases. They are often used for troubleshooting or automation through the “kafka-run-class.sh” script which takes the FQCN as input parameter. * SPI argument: some tools have arguments for setting a custom SPI implementation to be used in place of the default implementation. Any custom SPI implementation depends on the old package name. * Broken tool: some tools do not work on supported releases. * Core dependency: some tools require access to non-trivial core classes that should be migrated first. See KIP-906 for more information. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14720) Tools migration guidelines
[ https://issues.apache.org/jira/browse/KAFKA-14720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Federico Valeri updated KAFKA-14720: Description: The tools migration effort is ongoing and being tracked in KAFKA-14525. This is part of a bigger initiative to split the core module into multiple modules (e.g. storage, network, security, tools), which is being tracked in KAFKA-14524. The plan is to migrate tools and related classes in a fully compatible way from kafka.tools and kafka.admin packages (core module) to org.apache.kafka.tools package (tools module). While kicking off this activity, we identified a number of potential compatibility issues: * Missing wrapper: some tools do not have a wrapper script. There are system tests that directly refer to the tool's fully qualified class name (FQCN) and expect the old package name when running on old Kafka releases. They are often used for troubleshooting or automation through the kafka-run-class.sh script which takes the FQCN as input parameter. * SPI argument: some tools have arguments for setting a custom SPI implementation to be used in place of the default implementation. Any custom SPI implementation depends on the old package name. * Broken tool: some tools do not work on supported releases. * Core dependency: some tools require access to non-trivial core classes that should be migrated first. See KIP-906 for more information. was: The tools migration effort is ongoing and being tracked in KAFKA-14525. This is part of a bigger initiative to split the core module into multiple modules (e.g. storage, network, security, tools), which is being tracked in KAFKA-14524. The plan is to migrate tools and related classes in a fully compatible way from kafka.tools and kafka.admin packages (core module) to org.apache.kafka.tools package (tools module). While kicking off this activity, we identified a number of potential compatibility issues: * Missing wrapper: some tools do not have a wrapper script. There are system tests that directly refer to the tool's fully qualified class name (FQCN) and expect the old package name when running on old Kafka releases. They are often used for troubleshooting or automation through the “kafka-run-class.sh” script which takes the FQCN as input parameter. * SPI argument: some tools have arguments for setting a custom SPI implementation to be used in place of the default implementation. Any custom SPI implementation depends on the old package name. * Broken tool: some tools do not work on supported releases. * Core dependency: some tools require access to non-trivial core classes that should be migrated first. See KIP-906 for more information. > Tools migration guidelines > -- > > Key: KAFKA-14720 > URL: https://issues.apache.org/jira/browse/KAFKA-14720 > Project: Kafka > Issue Type: Improvement >Reporter: Federico Valeri >Priority: Major > > The tools migration effort is ongoing and being tracked in KAFKA-14525. This > is part of a bigger initiative to split the core module into multiple modules > (e.g. storage, network, security, tools), which is being tracked in > KAFKA-14524. > The plan is to migrate tools and related classes in a fully compatible way > from kafka.tools and kafka.admin packages (core module) to > org.apache.kafka.tools package (tools module). > While kicking off this activity, we identified a number of potential > compatibility issues: > * Missing wrapper: some tools do not have a wrapper script. 
There are system > tests that directly refer to the tool's fully qualified class name (FQCN) and > expect the old package name when running on old Kafka releases. They are > often used for troubleshooting or automation through the kafka-run-class.sh > script which takes the FQCN as input parameter. > * SPI argument: some tools have arguments for setting a custom SPI > implementation to be used in place of the default implementation. Any custom > SPI implementation depends on the old package name. > * Broken tool: some tools do not work on supported releases. > * Core dependency: some tools require access to non-trivial core classes that > should be migrated first. > See KIP-906 for more information. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14720) Tools migration guidelines
[ https://issues.apache.org/jira/browse/KAFKA-14720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Federico Valeri reassigned KAFKA-14720: --- Assignee: Federico Valeri > Tools migration guidelines > -- > > Key: KAFKA-14720 > URL: https://issues.apache.org/jira/browse/KAFKA-14720 > Project: Kafka > Issue Type: Improvement >Reporter: Federico Valeri >Assignee: Federico Valeri >Priority: Major > > The tools migration effort is ongoing and being tracked in KAFKA-14525. This > is part of a bigger initiative to split the core module into multiple modules > (e.g. storage, network, security, tools), which is being tracked in > KAFKA-14524. > The plan is to migrate tools and related classes in a fully compatible way > from kafka.tools and kafka.admin packages (core module) to > org.apache.kafka.tools package (tools module). > While kicking off this activity, we identified a number of potential > compatibility issues: > * Missing wrapper: some tools do not have a wrapper script. There are system > tests that directly refer to the tool's fully qualified class name (FQCN) and > expect the old package name when running on old Kafka releases. They are > often used for troubleshooting or automation through the kafka-run-class.sh > script which takes the FQCN as input parameter. > * SPI argument: some tools have arguments for setting a custom SPI > implementation to be used in place of the default implementation. Any custom > SPI implementation depends on the old package name. > * Broken tool: some tools do not work on supported releases. > * Core dependency: some tools require access to non-trivial core classes that > should be migrated first. > See KIP-906 for more information. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dejan2609 commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8
dejan2609 commented on PR #13205: URL: https://github.com/apache/kafka/pull/13205#issuecomment-1431370263 Update: Gradle wrapper bootstrapping is OK now, but the spotless Scala checks are failing... Searching for a solution. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #13247: KAFKA-14595 Move value objects of ReassignPartitionsCommand to java
ijuma commented on PR #13247: URL: https://github.com/apache/kafka/pull/13247#issuecomment-1431422276 If you want to do it in small steps, one way is that you introduce the new classes, but you do not update the command to use them. That way you can put them in the right destination from the start. In any case, I'll leave it to @mimaison to say how he'd prefer it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14720) Tools migration guidelines
[ https://issues.apache.org/jira/browse/KAFKA-14720?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Federico Valeri updated KAFKA-14720: Description: The tools migration effort is ongoing and being tracked in KAFKA-14525. This is part of a bigger initiative to split the core module into multiple modules (e.g. storage, network, security, tools), which is being tracked in KAFKA-14524. The plan is to migrate tools and related classes in a fully compatible way from kafka.tools and kafka.admin packages (core module) to org.apache.kafka.tools package (tools module). While kicking off this activity, we identified a number of potential compatibility issues: * Missing wrapper: some tools do not have a wrapper script. There are system tests that directly refer to the tool's fully qualified class name (FQCN) and expect the old package name when running on old Kafka releases. They are often used for troubleshooting or automation through the kafka-run-class.sh script which takes the FQCN as input parameter. * SPI argument: some tools have arguments for setting a custom SPI implementation to be used in place of the default implementation. Any custom SPI implementation depends on the old package name. * Broken tool: some tools do not work on supported releases. * Core dependency: some tools require access to non-trivial core classes that should be migrated first. See KIP-906 for more information. https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines was: The tools migration effort is ongoing and being tracked in KAFKA-14525. This is part of a bigger initiative to split the core module into multiple modules (e.g. storage, network, security, tools), which is being tracked in KAFKA-14524. The plan is to migrate tools and related classes in a fully compatible way from kafka.tools and kafka.admin packages (core module) to org.apache.kafka.tools package (tools module). While kicking off this activity, we identified a number of potential compatibility issues: * Missing wrapper: some tools do not have a wrapper script. There are system tests that directly refer to the tool's fully qualified class name (FQCN) and expect the old package name when running on old Kafka releases. They are often used for troubleshooting or automation through the kafka-run-class.sh script which takes the FQCN as input parameter. * SPI argument: some tools have arguments for setting a custom SPI implementation to be used in place of the default implementation. Any custom SPI implementation depends on the old package name. * Broken tool: some tools do not work on supported releases. * Core dependency: some tools require access to non-trivial core classes that should be migrated first. See KIP-906 for more information. > Tools migration guidelines > -- > > Key: KAFKA-14720 > URL: https://issues.apache.org/jira/browse/KAFKA-14720 > Project: Kafka > Issue Type: Improvement >Reporter: Federico Valeri >Assignee: Federico Valeri >Priority: Major > > The tools migration effort is ongoing and being tracked in KAFKA-14525. This > is part of a bigger initiative to split the core module into multiple modules > (e.g. storage, network, security, tools), which is being tracked in > KAFKA-14524. > The plan is to migrate tools and related classes in a fully compatible way > from kafka.tools and kafka.admin packages (core module) to > org.apache.kafka.tools package (tools module). 
> While kicking off this activity, we identified a number of potential > compatibility issues: > * Missing wrapper: some tools do not have a wrapper script. There are system > tests that directly refer to the tool's fully qualified class name (FQCN) and > expect the old package name when running on old Kafka releases. They are > often used for troubleshooting or automation through the kafka-run-class.sh > script which takes the FQCN as input parameter. > * SPI argument: some tools have arguments for setting a custom SPI > implementation to be used in place of the default implementation. Any custom > SPI implementation depends on the old package name. > * Broken tool: some tools do not work on supported releases. > * Core dependency: some tools require access to non-trivial core classes that > should be migrated first. > See KIP-906 for more information. > https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] nizhikov commented on pull request #13247: KAFKA-14595 Move value objects of ReassignPartitionsCommand to java
nizhikov commented on PR #13247: URL: https://github.com/apache/kafka/pull/13247#issuecomment-1431428999 Hello @ijuma > If you want to do it in small steps It's more about simplifying review than my personal preferences :) For now, I introduced Java classes from `ReassignPartitionsCommand` in the core module. It seems we can review and merge the current changes and then move on to the actual command Scala -> Java transformation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1107184550 ## metadata/src/main/java/org/apache/kafka/image/ScramImage.java: ## @@ -50,6 +58,63 @@ public void write(ImageWriter writer, ImageWriterOptions options) { } } +public DescribeUserScramCredentialsResponseData describe(DescribeUserScramCredentialsRequestData request) { + +List users = request.users(); +Map uniqueUsers = new HashMap(); + +if ((users == null) || (users.size() == 0)) { +System.out.println("Describe : get all the users"); +// If there are no users listed then get all the users +for (Map scramCredentialDataSet : mechanisms.values()) { +for (String user : scramCredentialDataSet.keySet()) { +uniqueUsers.put(user, false); +} +} +} else { +// Filter out duplicates +for (UserName user : users) { +if (uniqueUsers.containsKey(user.name())) { +uniqueUsers.put(user.name(), true); +} else { +uniqueUsers.put(user.name(), false); +} +} +} + +DescribeUserScramCredentialsResponseData retval = new DescribeUserScramCredentialsResponseData(); + +for (Map.Entry user : uniqueUsers.entrySet()) { +DescribeUserScramCredentialsResult result = + new DescribeUserScramCredentialsResult().setUser(user.getKey()); + +if (user.getValue() == false) { +List credentialInfos = new ArrayList(); + +boolean datafound = false; +for (Map.Entry> mechanismsEntry : mechanisms.entrySet()) { Review Comment: No, in our implementation we will never create a `Map` for ScramMechanism.UNKNOWN because we never populate any credentials for an unknown mechanism. Thus I can iterate over the maps knowing that they are all valid. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1107193412 ## metadata/src/main/java/org/apache/kafka/image/ScramImage.java: ## @@ -50,6 +58,63 @@ public void write(ImageWriter writer, ImageWriterOptions options) { } } +public DescribeUserScramCredentialsResponseData describe(DescribeUserScramCredentialsRequestData request) { + +List users = request.users(); +Map uniqueUsers = new HashMap(); + +if ((users == null) || (users.size() == 0)) { +System.out.println("Describe : get all the users"); +// If there are no users listed then get all the users +for (Map scramCredentialDataSet : mechanisms.values()) { +for (String user : scramCredentialDataSet.keySet()) { +uniqueUsers.put(user, false); +} +} +} else { +// Filter out duplicates +for (UserName user : users) { +if (uniqueUsers.containsKey(user.name())) { +uniqueUsers.put(user.name(), true); +} else { +uniqueUsers.put(user.name(), false); +} +} +} + +DescribeUserScramCredentialsResponseData retval = new DescribeUserScramCredentialsResponseData(); + +for (Map.Entry user : uniqueUsers.entrySet()) { +DescribeUserScramCredentialsResult result = + new DescribeUserScramCredentialsResult().setUser(user.getKey()); + +if (user.getValue() == false) { +List credentialInfos = new ArrayList(); + +boolean datafound = false; +for (Map.Entry> mechanismsEntry : mechanisms.entrySet()) { +Map credentialDataSet = mechanismsEntry.getValue(); +if (credentialDataSet.containsKey(user.getKey())) { +credentialInfos.add(new CredentialInfo().setMechanism(mechanismsEntry.getKey().type()) + .setIterations(credentialDataSet.get(user.getKey()).iterations())); +datafound = true; +} +} +if (datafound) { +result.setCredentialInfos(credentialInfos); +} else { +result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code()) Review Comment: The request contains a list of users. If the list is empty then the request is to describe all the users. I personally think this is a security issue but I'm just implementing what was there for Zk. If there are no users with SCRAM credentials and the request is to describe all the users, then an empty response is returned. It is not an error. This is tested in the unit test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1107200727 ## metadata/src/main/java/org/apache/kafka/image/ScramImage.java: ## @@ -50,6 +58,63 @@ public void write(ImageWriter writer, ImageWriterOptions options) { } } +public DescribeUserScramCredentialsResponseData describe(DescribeUserScramCredentialsRequestData request) { + +List users = request.users(); +Map uniqueUsers = new HashMap(); + +if ((users == null) || (users.size() == 0)) { +System.out.println("Describe : get all the users"); +// If there are no users listed then get all the users +for (Map scramCredentialDataSet : mechanisms.values()) { +for (String user : scramCredentialDataSet.keySet()) { +uniqueUsers.put(user, false); +} +} +} else { +// Filter out duplicates +for (UserName user : users) { +if (uniqueUsers.containsKey(user.name())) { +uniqueUsers.put(user.name(), true); +} else { +uniqueUsers.put(user.name(), false); +} +} +} + +DescribeUserScramCredentialsResponseData retval = new DescribeUserScramCredentialsResponseData(); + +for (Map.Entry user : uniqueUsers.entrySet()) { +DescribeUserScramCredentialsResult result = + new DescribeUserScramCredentialsResult().setUser(user.getKey()); + +if (user.getValue() == false) { +List credentialInfos = new ArrayList(); + +boolean datafound = false; +for (Map.Entry> mechanismsEntry : mechanisms.entrySet()) { +Map credentialDataSet = mechanismsEntry.getValue(); +if (credentialDataSet.containsKey(user.getKey())) { +credentialInfos.add(new CredentialInfo().setMechanism(mechanismsEntry.getKey().type()) + .setIterations(credentialDataSet.get(user.getKey()).iterations())); +datafound = true; +} +} +if (datafound) { +result.setCredentialInfos(credentialInfos); +} else { +result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code()) Review Comment: I'm not sure why there is an illegalUsers error in the Zk case as it explicitly tests for users.get.isEmtpy for the describe all case. https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/core/src/main/scala/kafka/server/ZkAdminManager.scala#L805 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1107211109 ## metadata/src/main/java/org/apache/kafka/image/ScramImage.java: ## @@ -50,6 +58,63 @@ public void write(ImageWriter writer, ImageWriterOptions options) { } } +public DescribeUserScramCredentialsResponseData describe(DescribeUserScramCredentialsRequestData request) { + +List users = request.users(); +Map uniqueUsers = new HashMap(); + +if ((users == null) || (users.size() == 0)) { +System.out.println("Describe : get all the users"); +// If there are no users listed then get all the users +for (Map scramCredentialDataSet : mechanisms.values()) { +for (String user : scramCredentialDataSet.keySet()) { +uniqueUsers.put(user, false); +} +} +} else { +// Filter out duplicates +for (UserName user : users) { +if (uniqueUsers.containsKey(user.name())) { +uniqueUsers.put(user.name(), true); +} else { +uniqueUsers.put(user.name(), false); +} +} +} + +DescribeUserScramCredentialsResponseData retval = new DescribeUserScramCredentialsResponseData(); + +for (Map.Entry user : uniqueUsers.entrySet()) { +DescribeUserScramCredentialsResult result = + new DescribeUserScramCredentialsResult().setUser(user.getKey()); + +if (user.getValue() == false) { +List credentialInfos = new ArrayList(); + +boolean datafound = false; +for (Map.Entry> mechanismsEntry : mechanisms.entrySet()) { +Map credentialDataSet = mechanismsEntry.getValue(); +if (credentialDataSet.containsKey(user.getKey())) { +credentialInfos.add(new CredentialInfo().setMechanism(mechanismsEntry.getKey().type()) + .setIterations(credentialDataSet.get(user.getKey()).iterations())); +datafound = true; +} +} +if (datafound) { +result.setCredentialInfos(credentialInfos); +} else { +result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code()) + .setErrorMessage("attemptToDescribeUserThatDoesNotExist: " + user.getKey()); Review Comment: Yes! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1107227490 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -3611,12 +3617,4 @@ object KafkaApis { private def unsupported(text: String): Exception = { new UnsupportedVersionException(s"Unsupported when using a Raft-based metadata quorum: $text") } - - private def notYetSupported(request: RequestChannel.Request): Exception = { Review Comment: The compiler throws an error if we try to keep them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1107273995 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java: ## @@ -16,40 +16,88 @@ */ package org.apache.kafka.connect.mirror; -import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.apache.kafka.connect.util.TopicAdmin; -import java.util.Map; -import java.util.HashMap; import java.util.Collections; -import java.time.Duration; +import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import java.util.concurrent.ConcurrentHashMap; /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */ class OffsetSyncStore implements AutoCloseable { -private final KafkaConsumer consumer; -private final Map offsetSyncs = new HashMap<>(); -private final TopicPartition offsetSyncTopicPartition; +private final KafkaBasedLog backingStore; +private final Map offsetSyncs = new ConcurrentHashMap<>(); +private final TopicAdmin admin; +private volatile boolean readToEnd = false; OffsetSyncStore(MirrorCheckpointConfig config) { -consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(), -new ByteArrayDeserializer(), new ByteArrayDeserializer()); -offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0); -consumer.assign(Collections.singleton(offsetSyncTopicPartition)); +Consumer consumer = null; +TopicAdmin admin = null; +KafkaBasedLog store; +try { +Consumer finalConsumer = consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig()); Review Comment: Ah yeah, much cleaner. Thanks! 👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1107275404 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java: ## @@ -134,9 +138,9 @@ public String version() { @Override public List poll() throws InterruptedException { try { -long deadline = System.currentTimeMillis() + interval.toMillis(); -while (!stopping && System.currentTimeMillis() < deadline) { -offsetSyncStore.update(pollTimeout); +if (stopping.await(pollTimeout.toMillis(), TimeUnit.MILLISECONDS)) { Review Comment: Ah yeah, totally right, the condition was correct. Sorry about that! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dejan2609 commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8
dejan2609 commented on PR #13205: URL: https://github.com/apache/kafka/pull/13205#issuecomment-1431575644 It seems that the Spotless Gradle plugin needs to be aligned with Gradle 8.0 (I filed a ticket here: https://github.com/diffplug/spotless/issues/1572). The thing is that they dropped support for direct Java 8 builds: https://github.com/diffplug/spotless/blob/main/plugin-gradle/CHANGES.md#6140---2023-01-26 (their suggestion for other teams is to use Java cross compilation: https://docs.gradle.org/8.0/userguide/building_java_projects.html#sec:java_cross_compilation). Kafka obviously still needs to build artifacts against Java 8, so maybe it would be a good idea to follow the Spotless team's suggestion. All in all, here is a plan for the Gradle 7 -->> 8 upgrade: - the Spotless team will release a Gradle 8.0 compatible version (most probably they will not backport the fix into the Spotless Gradle 6.13.x line) - in parallel I can try to drop JDK 8 usage (note: Java 8 compatible artifacts will still be generated) - once we make sure that Java cross compilation works, we can come back to this PR and bump Gradle (and the Spotless plugin version to 6.15+). @ijuma If it is ok with you I can start working towards this solution. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] clolov commented on pull request #13219: MINOR: Simplify JUnit assertions; remove accidental unnecessary code
clolov commented on PR #13219: URL: https://github.com/apache/kafka/pull/13219#issuecomment-1431608450 @divijvaidya, as suggested I have left only test changes in this pull request. Furthermore, I believe I have fixed all `assertEquals` which had arguments the wrong way around 😊 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1107329539 ## core/src/main/scala/kafka/server/ControllerApis.scala: ## @@ -99,6 +99,7 @@ class ControllerApis(val requestChannel: RequestChannel, case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request) case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => handleAlterPartitionReassignments(request) case ApiKeys.LIST_PARTITION_REASSIGNMENTS => handleListPartitionReassignments(request) +case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => handleAlterUserScramCredentials(request) Review Comment: We cannot test it until we update ControllerApis to handle METADATA requests, so I'm going to wait on adding it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] pprovenzano commented on a diff in pull request #13114: KAFKA-14084: SCRAM support in KRaft.
pprovenzano commented on code in PR #13114: URL: https://github.com/apache/kafka/pull/13114#discussion_r1107329945 ## metadata/src/main/java/org/apache/kafka/image/ScramImage.java: ## @@ -50,6 +58,63 @@ public void write(ImageWriter writer, ImageWriterOptions options) { } } +public DescribeUserScramCredentialsResponseData describe(DescribeUserScramCredentialsRequestData request) { + +List users = request.users(); +Map uniqueUsers = new HashMap(); + +if ((users == null) || (users.size() == 0)) { +System.out.println("Describe : get all the users"); +// If there are no users listed then get all the users +for (Map scramCredentialDataSet : mechanisms.values()) { +for (String user : scramCredentialDataSet.keySet()) { +uniqueUsers.put(user, false); +} +} +} else { +// Filter out duplicates +for (UserName user : users) { +if (uniqueUsers.containsKey(user.name())) { +uniqueUsers.put(user.name(), true); +} else { +uniqueUsers.put(user.name(), false); +} +} +} + +DescribeUserScramCredentialsResponseData retval = new DescribeUserScramCredentialsResponseData(); + +for (Map.Entry user : uniqueUsers.entrySet()) { +DescribeUserScramCredentialsResult result = + new DescribeUserScramCredentialsResult().setUser(user.getKey()); + +if (user.getValue() == false) { +List credentialInfos = new ArrayList(); + +boolean datafound = false; +for (Map.Entry> mechanismsEntry : mechanisms.entrySet()) { +Map credentialDataSet = mechanismsEntry.getValue(); +if (credentialDataSet.containsKey(user.getKey())) { +credentialInfos.add(new CredentialInfo().setMechanism(mechanismsEntry.getKey().type()) + .setIterations(credentialDataSet.get(user.getKey()).iterations())); +datafound = true; +} +} +if (datafound) { +result.setCredentialInfos(credentialInfos); +} else { +result.setErrorCode(Errors.RESOURCE_NOT_FOUND.code()) + .setErrorMessage("attemptToDescribeUserThatDoesNotExist: " + user.getKey()); Review Comment: Fixed! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1107345427 ## connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java: ## @@ -392,6 +400,17 @@ protected Consumer createConsumer() { return new KafkaConsumer<>(consumerConfigs); } +/** + * Test whether a topic partition should be read by this log. + * Overridden by subclasses when only a subset of the assigned partitions should be read into memory. + * By default, this will read all partitions. Review Comment: Some nits: ```suggestion * Signals whether a topic partition should be read by this log. Invoked on {@link #start() startup} once * for every partition found in the log's backing topic. * This method can be overridden by subclasses when only a subset of the assigned partitions * should be read into memory. By default, all partitions are read. ``` ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -701,43 +745,151 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName, int cnt = 0; for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++) for (int p = 0; p < numPartitions; p++) -cluster.kafka().produce(topicName, p, "key", "value-" + cnt++); +produce(cluster.kafka(), topicName, p, "key", "value-" + cnt++); } - + +/** + * Produce a test record to a Kafka cluster. + * This method allows subclasses to configure and use their own Kafka Producer instead of using the built-in default. + * @param cluster Kafka cluster that should receive the record + * @param topic Topic to send the record to, non-null + * @param partition Partition to send the record to, maybe null. + * @param key Kafka key for the record + * @param value Kafka value for the record + */ +protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) { +cluster.produce(topic, partition, key, value); +} + +protected static Map waitForCheckpointOnAllPartitions( +MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName +) throws InterruptedException { +AtomicReference> ret = new AtomicReference<>(); +waitForCondition( +() -> { +Map offsets = client.remoteConsumerOffsets( +consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000)); +for (int i = 0; i < NUM_PARTITIONS; i++) { +if (!offsets.containsKey(new TopicPartition(topicName, i))) { +log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i); +return false; +} +} +ret.set(offsets); +return true; +}, +CHECKPOINT_DURATION_MS, +String.format( +"Offsets for consumer group %s not translated from %s for topic %s", +consumerGroupName, +remoteClusterAlias, +topicName +) +); +return ret.get(); +} + /* * given consumer group, topics and expected number of records, make sure the consumer group * offsets are eventually synced to the expected offset numbers */ -protected static void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect, -Consumer consumer, List topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation) -throws InterruptedException { +protected static void waitForConsumerGroupFullSync( +EmbeddedConnectCluster connect, List topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation +) throws InterruptedException { try (Admin adminClient = connect.kafka().createAdminClient()) { -List tps = new ArrayList<>(NUM_PARTITIONS * topics.size()); +Map tps = new HashMap<>(NUM_PARTITIONS * topics.size()); for (int 
partitionIndex = 0; partitionIndex < NUM_PARTITIONS; partitionIndex++) { for (String topic : topics) { -tps.add(new TopicPartition(topic, partitionIndex)); +tps.put(new TopicPartition(topic, partitionIndex), OffsetSpec.latest()); } } long expectedTotalOffsets = numRecords * topics.size(); waitForCondition(() -> { Map consumerGroupOffsets = adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata()
[GitHub] [kafka] C0urante commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
C0urante commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1107352681 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -701,43 +745,151 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName, int cnt = 0; for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++) for (int p = 0; p < numPartitions; p++) -cluster.kafka().produce(topicName, p, "key", "value-" + cnt++); +produce(cluster.kafka(), topicName, p, "key", "value-" + cnt++); } - + +/** + * Produce a test record to a Kafka cluster. + * This method allows subclasses to configure and use their own Kafka Producer instead of using the built-in default. + * @param cluster Kafka cluster that should receive the record + * @param topic Topic to send the record to, non-null + * @param partition Partition to send the record to, maybe null. + * @param key Kafka key for the record + * @param value Kafka value for the record + */ +protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) { +cluster.produce(topic, partition, key, value); +} + +protected static Map waitForCheckpointOnAllPartitions( +MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName +) throws InterruptedException { +AtomicReference> ret = new AtomicReference<>(); +waitForCondition( +() -> { +Map offsets = client.remoteConsumerOffsets( +consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000)); +for (int i = 0; i < NUM_PARTITIONS; i++) { +if (!offsets.containsKey(new TopicPartition(topicName, i))) { +log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i); +return false; +} +} +ret.set(offsets); +return true; +}, +CHECKPOINT_DURATION_MS, +String.format( +"Offsets for consumer group %s not translated from %s for topic %s", +consumerGroupName, +remoteClusterAlias, +topicName +) +); +return ret.get(); +} + /* * given consumer group, topics and expected number of records, make sure the consumer group * offsets are eventually synced to the expected offset numbers */ -protected static void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect, -Consumer consumer, List topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation) -throws InterruptedException { +protected static void waitForConsumerGroupFullSync( +EmbeddedConnectCluster connect, List topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation +) throws InterruptedException { try (Admin adminClient = connect.kafka().createAdminClient()) { -List tps = new ArrayList<>(NUM_PARTITIONS * topics.size()); +Map tps = new HashMap<>(NUM_PARTITIONS * topics.size()); for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; partitionIndex++) { for (String topic : topics) { -tps.add(new TopicPartition(topic, partitionIndex)); +tps.put(new TopicPartition(topic, partitionIndex), OffsetSpec.latest()); } } long expectedTotalOffsets = numRecords * topics.size(); waitForCondition(() -> { Map consumerGroupOffsets = adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get(); -long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream() +long totalConsumerGroupOffsets = consumerGroupOffsets.values().stream() .mapToLong(OffsetAndMetadata::offset).sum(); -Map offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS); -long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum(); - +Map 
endOffsets = +adminClient.listOffsets(tps).all().get(); +long totalEndOffsets = endOffsets.values().stream() + .mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset).sum(); + +for (TopicPartition tp : endOffsets.keySet()) { +if (consumerGroupOffsets.containsKey(tp)) { +assertTrue(consumerGroup
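Editorial note: the full-sync helper above checks summed offsets and exact translation; the core "has the group caught up to the end offsets" condition it waits on can be sketched with the Admin client alone. The helper name below is made up, and per-partition catch-up is a simplification of what the test actually asserts.

```java
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class GroupSyncCheck {
    // Returns true once the group's committed offsets have caught up to the topic end offsets.
    static boolean groupCaughtUp(Admin admin, String groupId) throws ExecutionException, InterruptedException {
        Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();
        Map<TopicPartition, OffsetSpec> latestSpecs = committed.keySet().stream()
                .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
        Map<TopicPartition, ListOffsetsResultInfo> endOffsets = admin.listOffsets(latestSpecs).all().get();
        return committed.entrySet().stream()
                .allMatch(e -> e.getValue().offset() >= endOffsets.get(e.getKey()).offset());
    }
}
```

A condition like this is typically polled inside a wait-for-condition loop with a timeout rather than asserted once.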
[GitHub] [kafka] mumrah opened a new pull request, #13257: MINOR: Add ZK migration docs to the packaged docs
mumrah opened a new pull request, #13257: URL: https://github.com/apache/kafka/pull/13257 This patch brings in the ZK migration docs that were added for the 3.4 release. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8
ijuma commented on PR #13205: URL: https://github.com/apache/kafka/pull/13205#issuecomment-1431655901 An alternative would be to drop spotless until we drop support for Java 8 (Apache Kafka 4.0). What actually uses spotless today? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush
C0urante commented on code in PR #13208: URL: https://github.com/apache/kafka/pull/13208#discussion_r1106236517 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java: ## @@ -135,38 +135,40 @@ public void testFlushFailureReplacesOffsets() throws Exception { // First time the write fails expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, true, null); writer.offset(OFFSET_KEY, OFFSET_VALUE); -assertTrue(writer.beginFlush()); +assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); verify(callback).onCompletion(eq(EXCEPTION), isNull()); // Second time it succeeds expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, null); -assertTrue(writer.beginFlush()); +assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); verify(callback).onCompletion(isNull(), isNull()); // Third time it has no data to flush so we won't get past beginFlush() -assertFalse(writer.beginFlush()); +assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); } @Test -public void testAlreadyFlushing() { +public void testAlreadyFlushing() throws InterruptedException, TimeoutException { @SuppressWarnings("unchecked") final Callback callback = mock(Callback.class); // Trigger the send, but don't invoke the callback so we'll still be mid-flush CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1); expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown); writer.offset(OFFSET_KEY, OFFSET_VALUE); -assertTrue(writer.beginFlush()); +assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); Review Comment: Worth it to add a check to ensure that `beginFlush` times out if if has been invoked previously, before `doFlush` has also been called? ```suggestion assertThrows(TimeoutException.class, () -> writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush
C0urante commented on code in PR #13208: URL: https://github.com/apache/kafka/pull/13208#discussion_r1107391193 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java: ## @@ -365,6 +365,10 @@ public void execute() { } catch (InterruptedException e) { // Ignore and allow to exit. } catch (RuntimeException e) { +if (isCancelled()) { +log.debug("Skipping final offset commit as task has been cancelled"); +throw e; +} Review Comment: Honestly not sure why we didn't put this here to begin with. Nice 👍 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java: ## @@ -135,38 +135,40 @@ public void testFlushFailureReplacesOffsets() throws Exception { // First time the write fails expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, true, null); writer.offset(OFFSET_KEY, OFFSET_VALUE); -assertTrue(writer.beginFlush()); +assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); verify(callback).onCompletion(eq(EXCEPTION), isNull()); // Second time it succeeds expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, null); -assertTrue(writer.beginFlush()); +assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); verify(callback).onCompletion(isNull(), isNull()); // Third time it has no data to flush so we won't get past beginFlush() -assertFalse(writer.beginFlush()); +assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); } @Test -public void testAlreadyFlushing() { +public void testAlreadyFlushing() throws InterruptedException, TimeoutException { @SuppressWarnings("unchecked") final Callback callback = mock(Callback.class); // Trigger the send, but don't invoke the callback so we'll still be mid-flush CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1); expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown); writer.offset(OFFSET_KEY, OFFSET_VALUE); -assertTrue(writer.beginFlush()); +assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); Review Comment: Still think this may apply here ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java: ## @@ -100,23 +104,45 @@ private boolean flushing() { /** * Performs the first step of a flush operation, snapshotting the current state. This does not - * actually initiate the flush with the underlying storage. + * actually initiate the flush with the underlying storage. Ensures that any previous flush operations + * have finished before beginning a new flush. 
* * @return true if a flush was initiated, false if no data was available + * @throws ConnectException if the previous flush is not complete before this method is called */ -public synchronized boolean beginFlush() { -if (flushing()) { -log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the " +public boolean beginFlush() { +try { +return beginFlush(0, TimeUnit.NANOSECONDS); +} catch (InterruptedException | TimeoutException e) { +log.error("Invalid call to OffsetStorageWriter beginFlush() while already flushing, the " + "framework should not allow this"); throw new ConnectException("OffsetStorageWriter is already flushing"); } +} -if (data.isEmpty()) -return false; - -toFlush = data; -data = new HashMap<>(); -return true; +/** + * Performs the first step of a flush operation, snapshotting the current state. This does not + * actually initiate the flush with the underlying storage. Ensures that any previous flush operations + * have finished before beginning a new flush. + * + * @param timeout A maximum duration to wait for previous flushes to finish before giving up on waiting + * @param timeUnit Units of the timeout argument + * @return true if a flush was initiated, false if no data was available + * @throws InterruptedException if this thread was interrupted while waiting for the previous flush to complete + * @throws TimeoutException if the {@code timeout} elapses before previous flushes are complete. + */ +public boolean beginFlush(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException { +if (flushInProgress.tryAcqui
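Editorial note: the truncated diff above shows the new beginFlush(timeout, timeUnit) waiting on a tryAcquire call. Assuming that gate is a single-permit java.util.concurrent.Semaphore (an assumption; the cut-off line does not show the field type), the pattern looks roughly like this, with FlushGate and endFlush as hypothetical names rather than the real OffsetStorageWriter API.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// A single-permit gate: beginFlush either waits out a previous in-flight flush or times out.
public class FlushGate {
    private final Semaphore flushInProgress = new Semaphore(1);

    public boolean beginFlush(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
        if (!flushInProgress.tryAcquire(timeout, timeUnit)) {
            throw new TimeoutException("A previous flush is still in progress");
        }
        // ... snapshot the offsets to flush here; release the permit and return false if there is nothing to do ...
        return true;
    }

    // Called once the flush completes (successfully or not) so the next flush can begin.
    public void endFlush() {
        flushInProgress.release();
    }
}
```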
[jira] [Created] (KAFKA-14721) Kafka listener uses wrong login class
Daniel Urban created KAFKA-14721: Summary: Kafka listener uses wrong login class Key: KAFKA-14721 URL: https://issues.apache.org/jira/browse/KAFKA-14721 Project: Kafka Issue Type: Bug Affects Versions: 3.1.2 Reporter: Daniel Urban When trying to configure a single SASL_SSL listener with GSSAPI, Scram and OAuth support, we encounter an error at startup: {code:java} 2023-02-15 13:26:04,250 ERROR kafka.server.KafkaServer: [main]: [KafkaServer id=104] Fatal error during KafkaServer startup. Prepare to shutdown org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:107) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at kafka.network.Processor.(SocketServer.scala:861) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.network.SocketServer.newProcessor(SocketServer.scala:442) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:299) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) ~[scala-library-2.13.10.jar:?] at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:297) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:262) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:259) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575) ~[scala-library-2.13.10.jar:?] at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573) ~[scala-library-2.13.10.jar:?] at scala.collection.AbstractIterable.foreach(Iterable.scala:933) ~[scala-library-2.13.10.jar:?] at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:259) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.network.SocketServer.startup(SocketServer.scala:131) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.server.KafkaServer.startup(KafkaServer.scala:310) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.Kafka$.main(Kafka.scala:109) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at com.cloudera.kafka.wrap.Kafka$.$anonfun$main$1(Kafka.scala:107) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at com.cloudera.kafka.wrap.Kafka$.$anonfun$main$1$adapted(Kafka.scala:107) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at com.cloudera.kafka.wrap.Kafka$.runMain(Kafka.scala:118) [kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at com.cloudera.kafka.wrap.Kafka$.main(Kafka.scala:110) [kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at com.cloudera.kafka.wrap.Kafka.main(Kafka.scala) [kafka_2.13-3.1.2.7.1.9.0-15.jar:?] Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:309) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at org.apache.kafka.common.security.authenticator.LoginManager.(LoginManager.java:61) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] 
at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] ... 21 more{code} Using the following configs in a Kafka broker: jaas configuration file: {code:java} KafkaServer { com.sun.security.auth.module.Krb5LoginModule required doNotPrompt=true useKeyTab=true storeKey=true serviceName="kafka" keyTab="/var/KAFKA_BROKER/kafka.keytab" principal="kafka/hgiovr@SITE"; org.apache.kafka.common.security.scram.ScramLoginModule required; };{code} and the following properties: {code:java} listener.name.sasl_ssl.sasl.enabled.mechanisms=GSSAPI,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuth
[GitHub] [kafka] dejan2609 commented on pull request #13205: KAFKA-14680: gradle version upgrade 7 -->> 8
dejan2609 commented on PR #13205: URL: https://github.com/apache/kafka/pull/13205#issuecomment-1431760212 At the moment spotless is used via Jenkins CI server :arrow_right: Jenkinsfile 'spotlessScalaCheck' task execution: https://github.com/apache/kafka/blob/3.4.0/Jenkinsfile#L23 My suggestion is to: - remove 'spotlessScalaCheck' out of Jenkinsfile - put some comments into build.gradle that would prevent others from changing anything related to spotless until Kafka 4.0 comes into play (i.e. after Java 8 is dropped): - plugin definition: https://github.com/apache/kafka/blob/3.4.0/build.gradle#L33 - plugin configuration: https://github.com/apache/kafka/blob/3.4.0/build.gradle#L47 Note: this PR already removes task [dependency](https://github.com/apache/kafka/blob/3.4.0/build.gradle#L2084). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on a diff in pull request #13025: KAFKA-14299: Fix pause and resume with state updater
lucasbru commented on code in PR #13025: URL: https://github.com/apache/kafka/pull/13025#discussion_r1107498812 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java: ## @@ -190,7 +190,7 @@ public void clearTaskTimeout() { @Override public boolean commitNeeded() { -throw new UnsupportedOperationException("This task is read-only"); +return task.commitNeeded(); Review Comment: ``` 1) introduce a TaskManager.allProcessingTasks, 2) depending on the updater enabled flag, let maybeCommit call either this one or allTasks ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lucasbru commented on a diff in pull request #13025: KAFKA-14299: Fix pause and resume with state updater
lucasbru commented on code in PR #13025: URL: https://github.com/apache/kafka/pull/13025#discussion_r1107498812 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java: ## @@ -190,7 +190,7 @@ public void clearTaskTimeout() { @Override public boolean commitNeeded() { -throw new UnsupportedOperationException("This task is read-only"); +return task.commitNeeded(); Review Comment: > 1) introduce a TaskManager.allProcessingTasks, 2) depending on the updater enabled flag, let maybeCommit call either this one or allTasks let's do :this: I will update the PR tomorrow first thing -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14722) Make BooleanSerde public
Matthias J. Sax created KAFKA-14722: --- Summary: Make BooleanSerde public Key: KAFKA-14722 URL: https://issues.apache.org/jira/browse/KAFKA-14722 Project: Kafka Issue Type: Improvement Components: streams Reporter: Matthias J. Sax We introduced a "BooleanSerde" via [https://github.com/apache/kafka/pull/13249] as an internal class. We could make it public. -- This message was sent by Atlassian Jira (v8.20.10#820010)
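Editorial note: for context, a minimal sketch of what such a public serde could look like, mirroring the 0x01/0x00 single-byte encoding visible in the PR's review thread further down in this digest. The class name here is made up; this is not the class from the PR.

```java
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

// Encodes true/false as a single byte; a null value stays null (a tombstone).
public class SimpleBooleanSerde implements Serde<Boolean> {
    private static final byte TRUE = 0x01;
    private static final byte FALSE = 0x00;

    @Override
    public Serializer<Boolean> serializer() {
        return (topic, data) -> data == null ? null : new byte[] {data ? TRUE : FALSE};
    }

    @Override
    public Deserializer<Boolean> deserializer() {
        return (topic, bytes) -> {
            if (bytes == null) {
                return null;
            }
            if (bytes.length != 1) {
                throw new IllegalArgumentException("Expected exactly one byte but got " + bytes.length);
            }
            return bytes[0] != FALSE;
        };
    }
}
```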
[GitHub] [kafka] mjsax commented on a diff in pull request #13249: KAFKA-14491: [8/N] Add serdes for ValueAndTimestamp with null value
mjsax commented on code in PR #13249: URL: https://github.com/apache/kafka/pull/13249#discussion_r1107522774 ## streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampSerde.java: ## @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import static java.util.Objects.requireNonNull; + +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.kstream.internals.WrappingNullableSerde; +import org.apache.kafka.streams.state.ValueAndTimestamp; + +/** + * Similar to {@link ValueAndTimestampSerde} but this serde additionally supports (de)serializing + * {@link ValueAndTimestamp} instances for which the {@code value} is {@code null}. + * + * The serialized format is: + * + * + + + * + * where the boolean is needed in order to distinguish between null and empty values (i.e., between + * tombstones and {@code byte[0]} values). + */ +public class NullableValueAndTimestampSerde extends WrappingNullableSerde, Void, V> { +public NullableValueAndTimestampSerde(final Serde valueSerde) { +super( +new NullableValueAndTimestampSerializer<>(requireNonNull(valueSerde).serializer()), +new NullableValueAndTimestampDeserializer<>(requireNonNull(valueSerde).deserializer()) +); +} + +static final class BooleanSerde { +private static final byte TRUE = 0x01; +private static final byte FALSE = 0x00; + +static class BooleanSerializer implements Serializer { +@Override +public byte[] serialize(final String topic, final Boolean data) { +if (data == null) { +// actually want to return null here but spotbugs won't allow deserialization so Review Comment: Not sure if I can follow? We should return `null` and just make sure spotbug does not mess with us. What does it complain about? ## streams/src/main/java/org/apache/kafka/streams/state/internals/NullableValueAndTimestampDeserializer.java: ## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer; + +import java.util.Map; +import java.util.Objects; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.internals.NullableValueAndTimestampSerde.BooleanSerde.BooleanDeserializer; + +/** + * See {@link NullableValueAndTimestampSerde}. + */ +public class NullableValueAndTimestampDeserializer implements WrappingNullableDeserializer, Void, V> { +public final Deserializer valueDeserializer; +private final Deserializer timestampDeserializer; +private final Deserializer booleanDeserializer; + +NullableValueAndTimestampDeserializer(final Deserializer valueDeserializer) { +this.valueDeserializer = Objects.
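Editorial note: the serialized layout described in the NullableValueAndTimestampSerde javadoc quoted above (a boolean flag so that a null value can be told apart from an empty byte[0] value, plus the timestamp and the serialized value) can be pictured with the rough framing below. The field order and exact encoding in the PR may differ; this is only an illustration of why the extra boolean is needed.

```java
import java.nio.ByteBuffer;

// Illustrative framing only: [1-byte non-null flag][8-byte timestamp][serialized value bytes].
// The flag lets a reader distinguish a null value (tombstone) from an empty byte[0] value.
public class NullableFramingSketch {
    static byte[] frame(long timestamp, byte[] valueBytes) {
        int valueLength = valueBytes == null ? 0 : valueBytes.length;
        ByteBuffer buffer = ByteBuffer.allocate(1 + Long.BYTES + valueLength)
                .put((byte) (valueBytes == null ? 0 : 1))
                .putLong(timestamp);
        if (valueBytes != null) {
            buffer.put(valueBytes);
        }
        return buffer.array();
    }
}
```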
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores
vcrfxia commented on code in PR #13243: URL: https://github.com/apache/kafka/pull/13243#discussion_r1107567122 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) { // VisibleForTesting void restoreBatch(final Collection> records) { -// advance stream time to the max timestamp in the batch +// copy the observed stream time, for use in deciding whether to drop records during restore, +// when records have exceeded the store's grace period. +long streamTimeForRestore = observedStreamTime; Review Comment: > I guess the question is, what is the value of `observedStreamTime` when we start the restore? Are you saying it's `-1` and we basically "replay" `observedStreamTime` during restore? Yes, that's exactly right. `observedStreamTime` is tracked locally per store. It is initialized to `-1` and only updated on `put()` or during restore. (This is the same as the existing behavior for window stores today.) > Maybe best to update some variable names? Are you proposing that `doPut()` takes stream time as a parameter, so that during normal `put()` operation we pass `observedStreamTime` and during restore we pass `endOfBatchStreamTime`, which means we can rename `streamTimeForRestore` to be `observedStreamTime` instead? This SGTM, just want to check whether that's also what you have in mind, since we removed a number of parameters from `doPut()` in a previous PR revision in order to keep the parameter list small. > I guess follow up work (independent for this KIP) might be, to actually make use of KS runtime streamTime instead of tracking inside the store, and thus won't need `observedStreamTime` any longer, as we could look ahead to the "end-of-restore stream-time" (not just "end-of batch"). What's the scope of the "streamTime" which is tracked by the KS runtime? Is it per-task? Per-processor? Global? I'm wondering how this would work in situations with multiple partitions, or with multiple processors where some processors are expected to see new data earlier than other (downstream) processors. I guess we'd also need to implement the change from your other comment about not writing records which are expired (based on grace period) into the changelog topic first before we can make this change, otherwise we would not have a way to determine during restore whether records are expired or not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
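Editorial note: a compact way to picture the behavior discussed above (a per-store observedStreamTime that starts at -1 and is "replayed" during restore, with records older than stream time minus the grace period being dropped) is a small gate like the following. The class and method names are illustrative, not the store's real API.

```java
// Sketch of the "drop if outside the grace period" check.
public class GracePeriodGate {
    private long observedStreamTime = -1L; // advanced by puts, and replayed from -1 during restore
    private final long gracePeriodMs;

    public GracePeriodGate(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    // Returns false when the record should be dropped as expired.
    public boolean admit(long recordTimestampMs) {
        observedStreamTime = Math.max(observedStreamTime, recordTimestampMs);
        return recordTimestampMs >= observedStreamTime - gracePeriodMs;
    }
}
```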
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13192: KAFKA-14675: Extract metadata-related tasks from Fetcher into MetadataFetcher 1/4
guozhangwang commented on code in PR #13192: URL: https://github.com/apache/kafka/pull/13192#discussion_r1107569600 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/MetadataFetcher.java: ## @@ -0,0 +1,805 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.Metadata; +import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.StaleMetadataException; +import org.apache.kafka.clients.consumer.LogTruncationException; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.OffsetForEpochResult; +import org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPosition; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.IsolationLevel; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidTopicException; +import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion; +import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition; +import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse; +import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.common.requests.ListOffsetsResponse; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest; +import org.apache.kafka.common.utils.LogContext; +import 
org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +public class MetadataFetcher { + +private final Logger log; +private final ConsumerMetadata metadata; +private final SubscriptionState subscriptions; +private final ConsumerNetworkClient client; +private final Time time; +private final long retryBackoffMs; +private final long requestTimeoutMs; +private final IsolationLevel isolationLevel; +private final AtomicReference cachedListOffsetsException = new AtomicReference<>(); +private final AtomicReference cachedOffsetForLeaderException = new AtomicReference<>(); +private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient; +private final ApiVersions apiVersions; +private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1); + +public MetadataFetcher(LogContext logContext, + ConsumerNetworkClient client, + ConsumerMetadata metadata, + SubscriptionState subscriptions, + Time time, + long retryBackoffMs, + long requestTimeoutMs, + IsolationLevel isolationLevel, + ApiVersions apiVersions) { +this.log = logContext.logger(getClass()); +this.time = time; +this.cli
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1107587497 ## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java: ## @@ -101,15 +130,61 @@ public String toString() { public AddPartitionsToTxnRequest(final AddPartitionsToTxnRequestData data, short version) { super(ApiKeys.ADD_PARTITIONS_TO_TXN, version); this.data = data; +this.version = version; } - + +// Only used for versions < 4 public List partitions() { if (cachedPartitions != null) { return cachedPartitions; } cachedPartitions = Builder.getPartitions(data); return cachedPartitions; } + +private List partitionsForTransaction(String transaction) { +if (cachedPartitionsByTransaction == null) { +cachedPartitionsByTransaction = new HashMap<>(); +} + +return cachedPartitionsByTransaction.computeIfAbsent(transaction, txn -> { +List partitions = new ArrayList<>(); +for (AddPartitionsToTxnTopic topicCollection : data.transactions().find(txn).topics()) { +for (Integer partition : topicCollection.partitions()) { +partitions.add(new TopicPartition(topicCollection.name(), partition)); +} +} +return partitions; +}); +} + +public Map> partitionsByTransaction() { +if (cachedPartitionsByTransaction != null && cachedPartitionsByTransaction.size() == data.transactions().size()) { Review Comment: I'm not sure how that would happen -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
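Editorial note: for readers skimming the flattened diff, the caching idea under discussion is per-transaction memoization of the flattened partition list. The sketch below uses simplified types (the real code iterates AddPartitionsToTxnTopic collections rather than a plain map).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Memoizes the partition list per transactional id via computeIfAbsent,
// so repeated lookups for the same transaction do not re-flatten the request data.
public class PartitionsByTransactionCache {
    private final Map<String, List<TopicPartition>> cached = new HashMap<>();

    public List<TopicPartition> partitionsFor(String transactionalId,
                                              Map<String, List<TopicPartition>> partitionsByTxn) {
        return cached.computeIfAbsent(transactionalId, txn ->
                new ArrayList<>(partitionsByTxn.getOrDefault(txn, Collections.emptyList())));
    }
}
```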
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1107589437 ## core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala: ## @@ -352,7 +353,12 @@ class TransactionCoordinator(txnConfig: TransactionConfig, // this is an optimization: if the partitions are already in the metadata reply OK immediately Left(Errors.NONE) } else { - Right(coordinatorEpoch, txnMetadata.prepareAddPartitions(partitions.toSet, time.milliseconds())) + // If verifyOnly, we should have returned in the step above. If we didn't the partitions are not present in the transaction. Review Comment: yes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1107589884 ## clients/src/test/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponseTest.java: ## @@ -84,16 +88,59 @@ public void testParse() { topicCollection.add(topicResult); -AddPartitionsToTxnResponseData data = new AddPartitionsToTxnResponseData() - .setResults(topicCollection) - .setThrottleTimeMs(throttleTimeMs); -AddPartitionsToTxnResponse response = new AddPartitionsToTxnResponse(data); - for (short version : ApiKeys.ADD_PARTITIONS_TO_TXN.allVersions()) { Review Comment: I thought maybe we didn't want to redo the top steps every time but sure i can change it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1107590295 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -2383,68 +2384,101 @@ class KafkaApis(val requestChannel: RequestChannel, if (config.interBrokerProtocolVersion.isLessThan(version)) throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion.version} is less than the required version: ${version.version}") } - - def handleAddPartitionToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { + def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { ensureInterBrokerVersion(IBP_0_11_0_IV0) -val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest] -val transactionalId = addPartitionsToTxnRequest.data.transactionalId -val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala -if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => -addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)) -else { - val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() - val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() - val authorizedPartitions = mutable.Set[TopicPartition]() - - val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC, -partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic) - for (topicPartition <- partitionsToAdd) { -if (!authorizedTopics.contains(topicPartition.topic)) - unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED -else if (!metadataCache.contains(topicPartition)) - nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION -else - authorizedPartitions.add(topicPartition) +val lock = new Object +val addPartitionsToTxnRequest = if (request.context.apiVersion() < 4) request.body[AddPartitionsToTxnRequest].normalizeRequest() else request.body[AddPartitionsToTxnRequest] +val version = addPartitionsToTxnRequest.version +val responses = new AddPartitionsToTxnResultCollection() +val partitionsByTransaction = addPartitionsToTxnRequest.partitionsByTransaction() + +// Newer versions of the request should only come from other brokers. +if (version >= 4) authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) + +// V4 requests introduced batches of transactions. We need all transactions to be handled before sending the +// response so there are a few differences in handling errors and sending responses. +def createResponse(requestThrottleMs: Int): AbstractResponse = { + if (version < 4) { +// There will only be one response in data. Add it to the response data object. +val data = new AddPartitionsToTxnResponseData() +responses.forEach(result => { + data.setResults(result.topicResults()) + data.setThrottleTimeMs(requestThrottleMs) +}) +new AddPartitionsToTxnResponse(data) + } else { +new AddPartitionsToTxnResponse(new AddPartitionsToTxnResponseData().setThrottleTimeMs(requestThrottleMs).setResultsByTransaction(responses)) } +} - if (unauthorizedTopicErrors.nonEmpty || nonExistingTopicErrors.nonEmpty) { -// Any failed partition check causes the entire request to fail. 
We send the appropriate error codes for the -// partitions which failed, and an 'OPERATION_NOT_ATTEMPTED' error code for the partitions which succeeded -// the authorization check to indicate that they were not added to the transaction. -val partitionErrors = unauthorizedTopicErrors ++ nonExistingTopicErrors ++ - authorizedPartitions.map(_ -> Errors.OPERATION_NOT_ATTEMPTED) -requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new AddPartitionsToTxnResponse(requestThrottleMs, partitionErrors.asJava)) +val txns = addPartitionsToTxnRequest.data.transactions +def maybeSendResponse(): Unit = { + lock synchronized { +if (responses.size() == txns.size()) { + requestHelper.sendResponseMaybeThrottle(request, createResponse) Review Comment: no -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
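Editorial note: the handler above collects one result per transaction and sends the batched response only once responses.size() equals the number of transactions, guarded by a lock. The same "reply exactly once, after the last result arrives" idea can be sketched with an atomic countdown instead of a lock; the types below are stand-ins, not the KafkaApis code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class BatchResponseAggregator<T> {
    private final Map<String, T> results = new ConcurrentHashMap<>();
    private final AtomicInteger remaining;
    private final Runnable sendResponse;

    public BatchResponseAggregator(int expectedTransactions, Runnable sendResponse) {
        this.remaining = new AtomicInteger(expectedTransactions);
        this.sendResponse = sendResponse;
    }

    // Must be called exactly once per transactional id in the batch.
    public void complete(String transactionalId, T result) {
        results.put(transactionalId, result);
        if (remaining.decrementAndGet() == 0) {
            sendResponse.run(); // only the thread recording the last result sends the batched response
        }
    }
}
```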
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1107590667 ## core/src/main/scala/kafka/server/KafkaApis.scala: ## @@ -2383,68 +2384,101 @@ class KafkaApis(val requestChannel: RequestChannel, if (config.interBrokerProtocolVersion.isLessThan(version)) throw new UnsupportedVersionException(s"inter.broker.protocol.version: ${config.interBrokerProtocolVersion.version} is less than the required version: ${version.version}") } - - def handleAddPartitionToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { + def handleAddPartitionsToTxnRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { ensureInterBrokerVersion(IBP_0_11_0_IV0) -val addPartitionsToTxnRequest = request.body[AddPartitionsToTxnRequest] -val transactionalId = addPartitionsToTxnRequest.data.transactionalId -val partitionsToAdd = addPartitionsToTxnRequest.partitions.asScala -if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, transactionalId)) - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => -addPartitionsToTxnRequest.getErrorResponse(requestThrottleMs, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)) -else { - val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() - val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() - val authorizedPartitions = mutable.Set[TopicPartition]() - - val authorizedTopics = authHelper.filterByAuthorized(request.context, WRITE, TOPIC, -partitionsToAdd.filterNot(tp => Topic.isInternal(tp.topic)))(_.topic) - for (topicPartition <- partitionsToAdd) { -if (!authorizedTopics.contains(topicPartition.topic)) - unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED -else if (!metadataCache.contains(topicPartition)) - nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION -else - authorizedPartitions.add(topicPartition) +val lock = new Object Review Comment: i can do that -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1107592032 ## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java: ## @@ -101,15 +130,61 @@ public String toString() { public AddPartitionsToTxnRequest(final AddPartitionsToTxnRequestData data, short version) { super(ApiKeys.ADD_PARTITIONS_TO_TXN, version); this.data = data; +this.version = version; } - + +// Only used for versions < 4 public List partitions() { if (cachedPartitions != null) { return cachedPartitions; } cachedPartitions = Builder.getPartitions(data); return cachedPartitions; } + +private List partitionsForTransaction(String transaction) { Review Comment: I'm not sure I understand the question. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14723) Do not write expired store records to changelog
Victoria Xia created KAFKA-14723: Summary: Do not write expired store records to changelog Key: KAFKA-14723 URL: https://issues.apache.org/jira/browse/KAFKA-14723 Project: Kafka Issue Type: Improvement Components: streams Reporter: Victoria Xia Window stores and versioned stores both have concepts of "retention" and "expiration." Records which are expired are not written to the store, e.g., [this example|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java#L265-L266] for segments stores. However, these expired records are still written to the changelog topic, in the case of persistent stores. This does not cause any problems because the records are once again omitted from the store during restore, but it is inefficient. It'd be good to avoid writing expired records to the changelog topic in the first place. Another benefit is that doing so would allow us to simplify the restoration code for versioned stores (see [relevant discussion|https://github.com/apache/kafka/pull/13243#discussion_r1106568364]). The reason expired records are still written to the changelog topic is because the whether records are expired or not is only tracked at the innermost store layer, and not any of the outer store layers such as the changelogging layer. The innermost store layer keeps its own `observedStreamTime` which is advanced on calls to put() and during restoration, and uses this variable to determine when a record is expired. Because the return type from put() is void, the changelogging layer has no way to tell whether the inner store's put() actually put the record or dropped it as expired, and always writes to the changelog topic regardless. In order to avoid this, we could: * update the put() interface to return a boolean indicating whether the record was actually put or not, or * move the logic for determining when a record is expired into an outer store layer, or * reorder/restructure the wrapped store layers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14723) Do not write expired store records to changelog
[ https://issues.apache.org/jira/browse/KAFKA-14723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Victoria Xia updated KAFKA-14723: - Description: Window stores and versioned stores both have concepts of "retention" and "expiration." Records which are expired are not written to the store, e.g., [this example|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java#L265-L266] for segments stores. However, these expired records are still written to the changelog topic, in the case of persistent stores. This does not cause any problems because the records are once again omitted from the store during restore, but it is inefficient. It'd be good to avoid writing expired records to the changelog topic in the first place. Another benefit is that doing so would allow us to simplify the restoration code for versioned stores (see [relevant discussion|https://github.com/apache/kafka/pull/13243#discussion_r1106568364]). The reason expired records are still written to the changelog topic is because whether the records are expired or not is only tracked at the innermost store layer, and not any of the outer store layers such as the changelogging layer. The innermost store layer keeps its own `observedStreamTime` which is advanced on calls to put() and during restoration, and uses this variable to determine when a record is expired. Because the return type from put() is void, the changelogging layer has no way to tell whether the inner store's put() actually put the record or dropped it as expired, and always writes to the changelog topic regardless. In order to avoid this, we could: * update the put() interface to return a boolean indicating whether the record was actually put or not, or * move the logic for determining when a record is expired into an outer store layer, or * reorder/restructure the wrapped store layers. was: Window stores and versioned stores both have concepts of "retention" and "expiration." Records which are expired are not written to the store, e.g., [this example|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java#L265-L266] for segments stores. However, these expired records are still written to the changelog topic, in the case of persistent stores. This does not cause any problems because the records are once again omitted from the store during restore, but it is inefficient. It'd be good to avoid writing expired records to the changelog topic in the first place. Another benefit is that doing so would allow us to simplify the restoration code for versioned stores (see [relevant discussion|https://github.com/apache/kafka/pull/13243#discussion_r1106568364]). The reason expired records are still written to the changelog topic is because the whether records are expired or not is only tracked at the innermost store layer, and not any of the outer store layers such as the changelogging layer. The innermost store layer keeps its own `observedStreamTime` which is advanced on calls to put() and during restoration, and uses this variable to determine when a record is expired. Because the return type from put() is void, the changelogging layer has no way to tell whether the inner store's put() actually put the record or dropped it as expired, and always writes to the changelog topic regardless. 
In order to avoid this, we could: * update the put() interface to return a boolean indicating whether the record was actually put or not, or * move the logic for determining when a record is expired into an outer store layer, or * reorder/restructure the wrapped store layers. > Do not write expired store records to changelog > --- > > Key: KAFKA-14723 > URL: https://issues.apache.org/jira/browse/KAFKA-14723 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Victoria Xia >Priority: Major > > Window stores and versioned stores both have concepts of "retention" and > "expiration." Records which are expired are not written to the store, e.g., > [this > example|https://github.com/apache/kafka/blob/b9754747d6eaa029c4bb69b073d749ff8df15908/streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java#L265-L266] > for segments stores. However, these expired records are still written to the > changelog topic, in the case of persistent stores. This does not cause any > problems because the records are once again omitted from the store during > restore, but it is inefficient. It'd be good to avoid writing expired records > to the changelog topic in the
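As a concrete illustration of the first option listed in the ticket, here is a hedged Java sketch (the interfaces below are hypothetical stand-ins, not the actual Kafka Streams store interfaces): if the inner store's put() reports whether the record was accepted, the changelogging wrapper can skip the changelog write for records the inner store dropped as expired.

```java
// Hypothetical interfaces only -- the real Kafka Streams store interfaces differ.
interface VersionedBytesStore {
    /** @return true if the record was stored, false if dropped as expired */
    boolean put(byte[] key, byte[] value, long timestamp);
}

final class ChangeLoggingVersionedBytesStore implements VersionedBytesStore {
    // Hypothetical changelog abstraction standing in for the record collector.
    interface ChangelogWriter {
        void append(byte[] key, byte[] value, long timestamp);
    }

    private final VersionedBytesStore inner;   // e.g. the RocksDB-backed store
    private final ChangelogWriter changelog;

    ChangeLoggingVersionedBytesStore(VersionedBytesStore inner, ChangelogWriter changelog) {
        this.inner = inner;
        this.changelog = changelog;
    }

    @Override
    public boolean put(byte[] key, byte[] value, long timestamp) {
        boolean stored = inner.put(key, value, timestamp);
        if (stored) {
            // Only records the inner store actually kept reach the changelog topic.
            changelog.append(key, value, timestamp);
        }
        return stored;
    }
}
```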
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores
vcrfxia commented on code in PR #13243: URL: https://github.com/apache/kafka/pull/13243#discussion_r1107595402 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -297,6 +312,12 @@ void restoreBatch(final Collection> records) { // records into memory. how high this memory amplification will be is very much dependent // on the specific workload and the value of the "segment interval" parameter. for (final ConsumerRecord record : records) { +if (record.timestamp() < streamTimeForRestore - gracePeriod) { +// record is older than grace period and was therefore never written to the store Review Comment: Sounds good. Here's the ticket: https://issues.apache.org/jira/browse/KAFKA-14723 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
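For context, a simplified sketch of the restore-time check referenced in the diff above (field and method names are illustrative, not the actual RocksDBVersionedStore code): stream time is tracked across the changelog records seen so far, and a record older than stream time minus the grace period is skipped because the original put() would already have dropped it.

```java
// Illustrative only: decides per changelog record whether it should be
// restored, advancing the locally tracked stream time as records are seen.
final class RestoreGraceFilter {
    private final long gracePeriodMs;
    private long observedStreamTime = Long.MIN_VALUE;

    RestoreGraceFilter(long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    boolean shouldRestore(long recordTimestampMs) {
        // Advance stream time first, then check whether the record falls
        // outside the grace period relative to what has been observed so far.
        observedStreamTime = Math.max(observedStreamTime, recordTimestampMs);
        return recordTimestampMs >= observedStreamTime - gracePeriodMs;
    }
}
```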
[jira] [Updated] (KAFKA-14274) Fetcher refactor—split Fetcher into Fetcher and MetadataFetcher
[ https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14274: -- Summary: Fetcher refactor—split Fetcher into Fetcher and MetadataFetcher (was: Implement fetching logic) > Fetcher refactor—split Fetcher into Fetcher and MetadataFetcher > --- > > Key: KAFKA-14274 > URL: https://issues.apache.org/jira/browse/KAFKA-14274 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > > The fetch request and fetch processing should happen asynchronously. More > specifically, we have the background thread to send fetch requests > autonomously and relay the response back to the polling thread. The polling > thread collects these fetch requests and returns the ConsumerRecord. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14274) Split Fetcher into Fetcher and MetadataFetcher
[ https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14274: -- Summary: Split Fetcher into Fetcher and MetadataFetcher (was: Fetcher refactor—split Fetcher into Fetcher and MetadataFetcher) > Split Fetcher into Fetcher and MetadataFetcher > -- > > Key: KAFKA-14274 > URL: https://issues.apache.org/jira/browse/KAFKA-14274 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > > The fetch request and fetch processing should happen asynchronously. More > specifically, we have the background thread to send fetch requests > autonomously and relay the response back to the polling thread. The polling > thread collects these fetch requests and returns the ConsumerRecord. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14365) Extract common logic from Fetcher into FetcherUtils
[ https://issues.apache.org/jira/browse/KAFKA-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14365: -- Summary: Extract common logic from Fetcher into FetcherUtils (was: Refactor Fetcher to allow different implementations) > Extract common logic from Fetcher into FetcherUtils > --- > > Key: KAFKA-14365 > URL: https://issues.apache.org/jira/browse/KAFKA-14365 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > > The `Fetcher` API is used internally by the `KafkaConsumer` to fetch records > from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored `Fetcher`. In order to keep the > existing `KafkaConsumer` as untouched as possible, this Jira proposes to > refactor the `Fetcher` so as to allow other implementations to use the unit > tests and `KafkaConsumer`. > Here are the proposed steps: > # Extract out the common APIs used by the `KafkaConsumer` and related unit > tests into a new Java interface named `Fetcher` > # Rename the existing `Fetcher` as `KafkaFetcher` (or similar) > # Refactor the `KafkaConsumer`, `FetcherTest`, and other call sites to > primarily use the new `Fetcher` interface > A future pull request will add the new `Fetcher` implementation and tie it in > to the existing `FetcherTest` tests. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores
vcrfxia commented on code in PR #13243: URL: https://github.com/apache/kafka/pull/13243#discussion_r1107613538 ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java: ## @@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() { verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15); } +@Test +public void shouldNotRestoreExpired() { +final List records = new ArrayList<>(); +records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10)); +records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed +records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored + +store.restoreBatch(getChangelogRecords(records)); + +verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10); +verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD); +verifyGetNullFromStore("k2"); +} + +@Test +public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() { +final List records = new ArrayList<>(); +records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD)); // this record will be older than grace period by the end of the batch, but should still be restored +records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10)); + +store.restoreBatch(getChangelogRecords(records)); + +verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD); +verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10); +} + +@Test +public void shouldAllowZeroHistoryRetention() { +// recreate store with zero history retention +store.close(); +store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 0L, SEGMENT_INTERVAL); +store.init((StateStoreContext) context, store); + +// put, get, and delete +putToStore("k", "v", BASE_TIMESTAMP); +verifyGetValueFromStore("k", "v", BASE_TIMESTAMP); +verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP); +verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v", BASE_TIMESTAMP); // query in "future" is allowed + +// update existing record at same timestamp +putToStore("k", "updated", BASE_TIMESTAMP); +verifyGetValueFromStore("k", "updated", BASE_TIMESTAMP); +verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", BASE_TIMESTAMP); + +// put new record version +putToStore("k", "v2", BASE_TIMESTAMP + 2); +verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 2); +verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 2); + +// query in past (history retention expired) returns null +verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1); + +// put in past (grace period expired) does not update the store Review Comment: Hm, just realized it's not possible to add this case in a meaningful way. Suppose observed stream time is `t` and we put-in-past for an existing key at time `t-1`. We cannot query for the value of the key at time `t-1` because that is outside history retention. And if we query for the latest value of the key, then we'll get the record at time `t` regardless of whether the put at time `t-1` was properly rejected or not. We'd have to query the inner store in order to perform this check, which feels like overkill. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[jira] [Updated] (KAFKA-14675) Extract metadata-related tasks from Fetcher into MetadataFetcher
[ https://issues.apache.org/jira/browse/KAFKA-14675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14675: -- Parent: (was: KAFKA-14365) Issue Type: Improvement (was: Sub-task) > Extract metadata-related tasks from Fetcher into MetadataFetcher > > > Key: KAFKA-14675 > URL: https://issues.apache.org/jira/browse/KAFKA-14675 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > Extract from {{Fetcher}} the APIs that are related to metadata operations > into a new class named {{{}MetadataFetcher{}}}. This will allow the > refactoring of {{Fetcher}} and {{MetadataFetcher}} for the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14365) Extract common logic from Fetcher into FetcherUtils
[ https://issues.apache.org/jira/browse/KAFKA-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14365: -- Priority: Major (was: Minor) > Extract common logic from Fetcher into FetcherUtils > --- > > Key: KAFKA-14365 > URL: https://issues.apache.org/jira/browse/KAFKA-14365 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > The {{Fetcher}} class is used internally by the `KafkaConsumer` to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. In order to > keep the existing users of {{Fetcher}} as untouched as possible, this Jira > proposes to refactor the {{Fetcher}} by extracting out some common logic into > {{FetcherUtils}} to allow forthcoming implementations to use that common > logic. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14365) Extract common logic from Fetcher into FetcherUtils
[ https://issues.apache.org/jira/browse/KAFKA-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14365: -- Description: The {{Fetcher}} class is used internally by the `KafkaConsumer` to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored {{{}Fetcher{}}}. In order to keep the existing users of {{Fetcher}} as untouched as possible, this Jira proposes to refactor the {{Fetcher}} by extracting out some common logic into {{FetcherUtils}} to allow forthcoming implementations to use that common logic. (was: The `Fetcher` API is used internally by the `KafkaConsumer` to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored `Fetcher`. In order to keep the existing `KafkaConsumer` as untouched as possible, this Jira proposes to refactor the `Fetcher` so as to allow other implementations to use the unit tests and `KafkaConsumer`. Here are the proposed steps: # Extract out the common APIs used by the `KafkaConsumer` and related unit tests into a new Java interface named `Fetcher` # Rename the existing `Fetcher` as `KafkaFetcher` (or similar) # Refactor the `KafkaConsumer`, `FetcherTest`, and other call sites to primarily use the new `Fetcher` interface A future pull request will add the new `Fetcher` implementation and tie it in to the existing `FetcherTest` tests.) > Extract common logic from Fetcher into FetcherUtils > --- > > Key: KAFKA-14365 > URL: https://issues.apache.org/jira/browse/KAFKA-14365 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Minor > > The {{Fetcher}} class is used internally by the `KafkaConsumer` to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. In order to > keep the existing users of {{Fetcher}} as untouched as possible, this Jira > proposes to refactor the {{Fetcher}} by extracting out some common logic into > {{FetcherUtils}} to allow forthcoming implementations to use that common > logic. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14365) Extract common logic from Fetcher into FetcherUtils
[ https://issues.apache.org/jira/browse/KAFKA-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14365: -- Description: The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored {{{}Fetcher{}}}. In order to keep the existing users of {{Fetcher}} as untouched as possible, this Jira proposes to refactor the {{Fetcher}} by extracting out some common logic into {{FetcherUtils}} to allow forthcoming implementations to use that common logic. (was: The {{Fetcher}} class is used internally by the `KafkaConsumer` to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored {{{}Fetcher{}}}. In order to keep the existing users of {{Fetcher}} as untouched as possible, this Jira proposes to refactor the {{Fetcher}} by extracting out some common logic into {{FetcherUtils}} to allow forthcoming implementations to use that common logic.) > Extract common logic from Fetcher into FetcherUtils > --- > > Key: KAFKA-14365 > URL: https://issues.apache.org/jira/browse/KAFKA-14365 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. In order to > keep the existing users of {{Fetcher}} as untouched as possible, this Jira > proposes to refactor the {{Fetcher}} by extracting out some common logic into > {{FetcherUtils}} to allow forthcoming implementations to use that common > logic. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jeffkbkim commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1107618273 ## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java: ## @@ -101,15 +130,61 @@ public String toString() { public AddPartitionsToTxnRequest(final AddPartitionsToTxnRequestData data, short version) { super(ApiKeys.ADD_PARTITIONS_TO_TXN, version); this.data = data; +this.version = version; } - + +// Only used for versions < 4 public List partitions() { if (cachedPartitions != null) { return cachedPartitions; } cachedPartitions = Builder.getPartitions(data); return cachedPartitions; } + +private List partitionsForTransaction(String transaction) { Review Comment: sorry, i meant the return value `List`, it doesn't seem to be used -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14365) Extract common logic from Fetcher into FetcherUtils
[ https://issues.apache.org/jira/browse/KAFKA-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14365: -- Description: The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored {{{}Fetcher{}}}. This Jira proposes to refactor the {{Fetcher}} by extracting out some common logic into {{FetcherUtils}} to allow forthcoming implementations of fetcher to leverage that common logic. was:The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored {{{}Fetcher{}}}. In order to keep the existing users of {{Fetcher}} as untouched as possible, this Jira proposes to refactor the {{Fetcher}} by extracting out some common logic into {{FetcherUtils}} to allow forthcoming implementations to use that common logic. > Extract common logic from Fetcher into FetcherUtils > --- > > Key: KAFKA-14365 > URL: https://issues.apache.org/jira/browse/KAFKA-14365 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. > This Jira proposes to refactor the {{Fetcher}} by extracting out some common > logic into {{FetcherUtils}} to allow forthcoming implementations of fetcher > to leverage that common logic. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14675) Extract metadata-related tasks from Fetcher into MetadataFetcher
[ https://issues.apache.org/jira/browse/KAFKA-14675?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14675: -- Description: The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored {{{}Fetcher{}}}. This task covers the work to extract from {{Fetcher}} the APIs that are related to metadata operations into a new class named {{{}MetadataFetcher{}}}. This will allow the refactoring of {{Fetcher}} and {{MetadataFetcher}} for the new consumer. was:Extract from {{Fetcher}} the APIs that are related to metadata operations into a new class named {{{}MetadataFetcher{}}}. This will allow the refactoring of {{Fetcher}} and {{MetadataFetcher}} for the new consumer. > Extract metadata-related tasks from Fetcher into MetadataFetcher > > > Key: KAFKA-14675 > URL: https://issues.apache.org/jira/browse/KAFKA-14675 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. > This task covers the work to extract from {{Fetcher}} the APIs that are > related to metadata operations into a new class named > {{{}MetadataFetcher{}}}. This will allow the refactoring of {{Fetcher}} and > {{MetadataFetcher}} for the new consumer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14365) Extract common logic from Fetcher into FetcherUtils
[ https://issues.apache.org/jira/browse/KAFKA-14365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14365: -- Description: The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored {{{}Fetcher{}}}. This task includes refactoring {{Fetcher}} by extracting out some common logic into {{FetcherUtils}} to allow forthcoming implementations to leverage that common logic. was: The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored {{{}Fetcher{}}}. This Jira proposes to refactor the {{Fetcher}} by extracting out some common logic into {{FetcherUtils}} to allow forthcoming implementations of fetcher to leverage that common logic. > Extract common logic from Fetcher into FetcherUtils > --- > > Key: KAFKA-14365 > URL: https://issues.apache.org/jira/browse/KAFKA-14365 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. > This task includes refactoring {{Fetcher}} by extracting out some common > logic into {{FetcherUtils}} to allow forthcoming implementations to leverage > that common logic. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14274) Split Fetcher into Fetcher and MetadataFetcher
[ https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14274: -- Description: The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored {{{}Fetcher{}}}. The fetch request and fetch processing should happen asynchronously. More specifically, we have the background thread to send fetch requests autonomously and relay the response back to the polling thread. The polling thread collects these fetch requests and returns the ConsumerRecord. was:The fetch request and fetch processing should happen asynchronously. More specifically, we have the background thread to send fetch requests autonomously and relay the response back to the polling thread. The polling thread collects these fetch requests and returns the ConsumerRecord. > Split Fetcher into Fetcher and MetadataFetcher > -- > > Key: KAFKA-14274 > URL: https://issues.apache.org/jira/browse/KAFKA-14274 > Project: Kafka > Issue Type: Sub-task > Components: consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. > > The fetch request and fetch processing should happen asynchronously. More > specifically, we have the background thread to send fetch requests > autonomously and relay the response back to the polling thread. The polling > thread collects these fetch requests and returns the ConsumerRecord. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14274) Split Fetcher into Fetcher and MetadataFetcher
[ https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14274: -- Component/s: clients > Split Fetcher into Fetcher and MetadataFetcher > -- > > Key: KAFKA-14274 > URL: https://issues.apache.org/jira/browse/KAFKA-14274 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. > > The fetch request and fetch processing should happen asynchronously. More > specifically, we have the background thread to send fetch requests > autonomously and relay the response back to the polling thread. The polling > thread collects these fetch requests and returns the ConsumerRecord. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14274) Split Fetcher into Fetcher and MetadataFetcher
[ https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14274: -- Description: The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored {{{}Fetcher{}}}. The fetch request and fetch processing should happen asynchronously. More specifically, we have the background thread to send fetch requests autonomously and relay the response back to the foreground thread. The foreground thread collects these fetch responses in the form of {{{}CompletedFetch{}}}, decompresses the data, deserializes the data, and converts each {{Record}} into a {{ConsumerRecord}} for returning to the user. was: The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored {{{}Fetcher{}}}. The fetch request and fetch processing should happen asynchronously. More specifically, we have the background thread to send fetch requests autonomously and relay the response back to the polling thread. The polling thread collects these fetch requests and returns the ConsumerRecord. > Split Fetcher into Fetcher and MetadataFetcher > -- > > Key: KAFKA-14274 > URL: https://issues.apache.org/jira/browse/KAFKA-14274 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. > The fetch request and fetch processing should happen asynchronously. More > specifically, we have the background thread to send fetch requests > autonomously and relay the response back to the foreground thread. The > foreground thread collects these fetch responses in the form of > {{{}CompletedFetch{}}}, decompresses the data, deserializes the data, and > converts each {{Record}} into a {{ConsumerRecord}} for returning to the user. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14274) Introduce FetchRequestManager
[ https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14274: -- Summary: Introduce FetchRequestManager (was: Split Fetcher into Fetcher and MetadataFetcher) > Introduce FetchRequestManager > - > > Key: KAFKA-14274 > URL: https://issues.apache.org/jira/browse/KAFKA-14274 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. > The fetch request and fetch processing should happen asynchronously. More > specifically, we have the background thread to send fetch requests > autonomously and relay the response back to the foreground thread. The > foreground thread collects these fetch responses in the form of > {{{}CompletedFetch{}}}, decompresses the data, deserializes the data, and > converts each {{Record}} into a {{ConsumerRecord}} for returning to the user. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1107626067 ## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java: ## @@ -101,15 +130,61 @@ public String toString() { public AddPartitionsToTxnRequest(final AddPartitionsToTxnRequestData data, short version) { super(ApiKeys.ADD_PARTITIONS_TO_TXN, version); this.data = data; +this.version = version; } - + +// Only used for versions < 4 public List partitions() { if (cachedPartitions != null) { return cachedPartitions; } cachedPartitions = Builder.getPartitions(data); return cachedPartitions; } + +private List partitionsForTransaction(String transaction) { Review Comment: ?? are you saying the method isn't used? It's used on line 168 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
gharris1727 commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1107628693 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -701,43 +745,151 @@ protected void produceMessages(EmbeddedConnectCluster cluster, String topicName, int cnt = 0; for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++) for (int p = 0; p < numPartitions; p++) -cluster.kafka().produce(topicName, p, "key", "value-" + cnt++); +produce(cluster.kafka(), topicName, p, "key", "value-" + cnt++); } - + +/** + * Produce a test record to a Kafka cluster. + * This method allows subclasses to configure and use their own Kafka Producer instead of using the built-in default. + * @param cluster Kafka cluster that should receive the record + * @param topic Topic to send the record to, non-null + * @param partition Partition to send the record to, maybe null. + * @param key Kafka key for the record + * @param value Kafka value for the record + */ +protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) { +cluster.produce(topic, partition, key, value); +} + +protected static Map waitForCheckpointOnAllPartitions( +MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName +) throws InterruptedException { +AtomicReference> ret = new AtomicReference<>(); +waitForCondition( +() -> { +Map offsets = client.remoteConsumerOffsets( +consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000)); +for (int i = 0; i < NUM_PARTITIONS; i++) { +if (!offsets.containsKey(new TopicPartition(topicName, i))) { +log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i); +return false; +} +} +ret.set(offsets); +return true; +}, +CHECKPOINT_DURATION_MS, +String.format( +"Offsets for consumer group %s not translated from %s for topic %s", +consumerGroupName, +remoteClusterAlias, +topicName +) +); +return ret.get(); +} + /* * given consumer group, topics and expected number of records, make sure the consumer group * offsets are eventually synced to the expected offset numbers */ -protected static void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect, -Consumer consumer, List topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation) -throws InterruptedException { +protected static void waitForConsumerGroupFullSync( +EmbeddedConnectCluster connect, List topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation +) throws InterruptedException { try (Admin adminClient = connect.kafka().createAdminClient()) { -List tps = new ArrayList<>(NUM_PARTITIONS * topics.size()); +Map tps = new HashMap<>(NUM_PARTITIONS * topics.size()); for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; partitionIndex++) { for (String topic : topics) { -tps.add(new TopicPartition(topic, partitionIndex)); +tps.put(new TopicPartition(topic, partitionIndex), OffsetSpec.latest()); } } long expectedTotalOffsets = numRecords * topics.size(); waitForCondition(() -> { Map consumerGroupOffsets = adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get(); -long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream() +long totalConsumerGroupOffsets = consumerGroupOffsets.values().stream() .mapToLong(OffsetAndMetadata::offset).sum(); -Map offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS); -long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum(); - +Map 
endOffsets = +adminClient.listOffsets(tps).all().get(); +long totalEndOffsets = endOffsets.values().stream() + .mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset).sum(); + +for (TopicPartition tp : endOffsets.keySet()) { +if (consumerGroupOffsets.containsKey(tp)) { +assertTrue(consumerGr
[jira] [Updated] (KAFKA-14274) Introduce FetchRequestManager
[ https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14274: -- Description: The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored {{{}Fetcher{}}}. This task is to introduce a new class named {{FetchRequestManager}} that will be responsible for: # Formatting fetch requests to the background thread # Configuring the callback on fetch responses for the background thread The response handler will collect the fetch responses from the broker and create {{{}CompletedFetch{}}}, instances, much as is done in {{{}Fetcher{}}}. The newly introduced {{FetchUtils}} will be used for both {{FetchRequestManager}} and {{Fetcher}} to keep the logic as reusable as possible. The foreground logic will decompress the data into a {{{}Record{}}}, which will then be deserialized into a {{ConsumerRecord}} for returning to the user. was: The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored {{{}Fetcher{}}}. The fetch request and fetch processing should happen asynchronously. More specifically, we have the background thread to send fetch requests autonomously and relay the response back to the foreground thread. The foreground thread collects these fetch responses in the form of {{{}CompletedFetch{}}}, decompresses the data, deserializes the data, and converts each {{Record}} into a {{ConsumerRecord}} for returning to the user. > Introduce FetchRequestManager > - > > Key: KAFKA-14274 > URL: https://issues.apache.org/jira/browse/KAFKA-14274 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. > This task is to introduce a new class named {{FetchRequestManager}} that will > be responsible for: > # Formatting fetch requests to the background thread > # Configuring the callback on fetch responses for the background thread > The response handler will collect the fetch responses from the broker and > create {{{}CompletedFetch{}}}, instances, much as is done in {{{}Fetcher{}}}. > The newly introduced {{FetchUtils}} will be used for both > {{FetchRequestManager}} and {{Fetcher}} to keep the logic as reusable as > possible. > The foreground logic will decompress the data into a {{{}Record{}}}, which > will then be deserialized into a {{ConsumerRecord}} for returning to the user. -- This message was sent by Atlassian Jira (v8.20.10#820010)
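A rough sketch of the split described in this ticket, with entirely hypothetical names and signatures (the real FetchRequestManager differs): the background thread builds fetch requests and handles responses via a callback, while the foreground thread drains the completed fetches for decompression, deserialization, and conversion into consumer records.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical shape only -- not the consumer refactor's actual API.
final class FetchRequestManagerSketch<REQ, RESP, FETCH> {
    interface RequestBuilder<R> { R buildFetchRequest(); }
    interface ResponseParser<S, F> { F toCompletedFetch(S response); }

    private final RequestBuilder<REQ> requestBuilder;
    private final ResponseParser<RESP, FETCH> responseParser;
    private final Queue<FETCH> completedFetches = new ConcurrentLinkedQueue<>();

    FetchRequestManagerSketch(RequestBuilder<REQ> requestBuilder,
                              ResponseParser<RESP, FETCH> responseParser) {
        this.requestBuilder = requestBuilder;
        this.responseParser = responseParser;
    }

    // Background thread: decide what to fetch next.
    REQ poll() {
        return requestBuilder.buildFetchRequest();
    }

    // Background thread: callback invoked when a fetch response arrives;
    // the parsed result plays the role of a CompletedFetch.
    void handleResponse(RESP response) {
        completedFetches.add(responseParser.toCompletedFetch(response));
    }

    // Foreground thread: drain completed fetches for decompression,
    // deserialization, and conversion into ConsumerRecords.
    FETCH nextCompletedFetch() {
        return completedFetches.poll();
    }
}
```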
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
gharris1727 commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1107634292 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java: ## @@ -30,7 +30,7 @@ public class OffsetSyncStoreTest { static class FakeOffsetSyncStore extends OffsetSyncStore { FakeOffsetSyncStore() { -super(null, null); +super(); Review Comment: 1. I've added a null-check to the real start() method which allows us to call the real start method on the fake offset store. 2. Added 3. I removed the assertion messages as they were just visual noise, the comments seemed easy enough to interpret. 4. Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
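A hedged sketch of the null guard described in point 1 (the field and method names are assumptions, not the actual OffsetSyncStore code): the real store owns a consumer for the offset-syncs topic, the test fake is constructed without one, and the guard lets both run the same start() logic.

```java
// Hypothetical shape only, to show the guard; not the MirrorMaker 2 class itself.
class OffsetSyncStoreSketch {
    interface OffsetSyncConsumer { void readToEnd(); }

    private final OffsetSyncConsumer consumer; // null in the fake store used by tests
    private volatile boolean readToEnd = false;

    OffsetSyncStoreSketch(OffsetSyncConsumer consumer) {
        this.consumer = consumer;
    }

    void start() {
        if (consumer != null) {
            consumer.readToEnd(); // only the real store touches Kafka
        }
        readToEnd = true;         // shared bookkeeping runs in both cases
    }

    boolean isReadToEnd() {
        return readToEnd;
    }
}
```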
[jira] [Updated] (KAFKA-14274) Introduce FetchRequestManager to integrate fetch into new consumer threading refactor
[ https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14274: -- Summary: Introduce FetchRequestManager to integrate fetch into new consumer threading refactor (was: Introduce FetchRequestManager) > Introduce FetchRequestManager to integrate fetch into new consumer threading > refactor > - > > Key: KAFKA-14274 > URL: https://issues.apache.org/jira/browse/KAFKA-14274 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. > This task is to introduce a new class named {{FetchRequestManager}} that will > be responsible for: > # Formatting fetch requests to the background thread > # Configuring the callback on fetch responses for the background thread > The response handler will collect the fetch responses from the broker and > create {{{}CompletedFetch{}}}, instances, much as is done in {{{}Fetcher{}}}. > The newly introduced {{FetchUtils}} will be used for both > {{FetchRequestManager}} and {{Fetcher}} to keep the logic as reusable as > possible. > The foreground logic will decompress the data into a {{{}Record{}}}, which > will then be deserialized into a {{ConsumerRecord}} for returning to the user. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14274) Introduce FetchRequestManager to integrate fetch into new consumer threading refactor
[ https://issues.apache.org/jira/browse/KAFKA-14274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-14274: -- Parent: (was: KAFKA-14246) Issue Type: Improvement (was: Sub-task) > Introduce FetchRequestManager to integrate fetch into new consumer threading > refactor > - > > Key: KAFKA-14274 > URL: https://issues.apache.org/jira/browse/KAFKA-14274 > Project: Kafka > Issue Type: Improvement > Components: clients, consumer >Reporter: Philip Nee >Assignee: Kirk True >Priority: Major > > The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch > records from the brokers. There is ongoing work to create a new consumer > implementation with a significantly refactored threading model. The threading > refactor work requires a similarly refactored {{{}Fetcher{}}}. > This task is to introduce a new class named {{FetchRequestManager}} that will > be responsible for: > # Formatting fetch requests to the background thread > # Configuring the callback on fetch responses for the background thread > The response handler will collect the fetch responses from the broker and > create {{{}CompletedFetch{}}}, instances, much as is done in {{{}Fetcher{}}}. > The newly introduced {{FetchUtils}} will be used for both > {{FetchRequestManager}} and {{Fetcher}} to keep the logic as reusable as > possible. > The foreground logic will decompress the data into a {{{}Record{}}}, which > will then be deserialized into a {{ConsumerRecord}} for returning to the user. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14724) Port tests in FetcherTest to FetchRequestManagerTest
Kirk True created KAFKA-14724: - Summary: Port tests in FetcherTest to FetchRequestManagerTest Key: KAFKA-14724 URL: https://issues.apache.org/jira/browse/KAFKA-14724 Project: Kafka Issue Type: Improvement Reporter: Kirk True Assignee: Kirk True The {{Fetcher}} class is used internally by the {{KafkaConsumer}} to fetch records from the brokers. There is ongoing work to create a new consumer implementation with a significantly refactored threading model. The threading refactor work requires a similarly refactored {{{}Fetcher{}}}. This task involves copying the relevant tests from {{FetcherTest}} and modifying them to fit a new unit test named {{{}FetchRequestManagerTest{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13178: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag
gharris1727 commented on code in PR #13178: URL: https://github.com/apache/kafka/pull/13178#discussion_r1107647929 ## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ## @@ -578,7 +551,7 @@ public void testNoCheckpointsIfNoRecordsAreMirrored() throws InterruptedExceptio Map translatedOffsets = backupClient.remoteConsumerOffsets( consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L)); return translatedOffsets.containsKey(remoteTopicPartition(tp1, PRIMARY_CLUSTER_ALIAS)) && - translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS)); + !translatedOffsets.containsKey(remoteTopicPartition(tp2, PRIMARY_CLUSTER_ALIAS)); Review Comment: I added a read-to-end-and-commit-offsets that makes this topic able to be checkpointed, and reverts this change to the wait condition. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
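For reference, one way such a read-to-end-and-commit helper could look, sketched against the public consumer API (the helper name and configuration details are assumptions, not the test's actual code): consuming every partition to its end offset and committing gives the consumer group a committed position that MirrorMaker 2 can then translate into a checkpoint.

```java
import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Sketch only: consumerProps must carry bootstrap.servers and the group.id
// whose offsets should become checkpointable.
final class ReadToEndAndCommit {
    static void readToEndAndCommit(Properties consumerProps, Collection<TopicPartition> partitions) {
        try (KafkaConsumer<byte[], byte[]> consumer =
                 new KafkaConsumer<>(consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            consumer.assign(partitions);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            boolean caughtUp = false;
            while (!caughtUp) {
                consumer.poll(Duration.ofMillis(500)); // advance positions toward the end offsets
                caughtUp = partitions.stream()
                    .allMatch(tp -> consumer.position(tp) >= endOffsets.get(tp));
            }
            consumer.commitSync(); // commit the consumed positions for the group
        }
    }
}
```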
[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jeffkbkim commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1107649386 ## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java: ## @@ -101,15 +130,61 @@ public String toString() { public AddPartitionsToTxnRequest(final AddPartitionsToTxnRequestData data, short version) { super(ApiKeys.ADD_PARTITIONS_TO_TXN, version); this.data = data; +this.version = version; } - + +// Only used for versions < 4 public List partitions() { if (cachedPartitions != null) { return cachedPartitions; } cachedPartitions = Builder.getPartitions(data); return cachedPartitions; } + +private List partitionsForTransaction(String transaction) { Review Comment: i'm saying the method can be of type void, no? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
hachikuji commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1107606857 ## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java: ## @@ -35,21 +44,43 @@ public class AddPartitionsToTxnRequest extends AbstractRequest { private final AddPartitionsToTxnRequestData data; private List cachedPartitions = null; + +private Map> cachedPartitionsByTransaction = null; + +private final short version; public static class Builder extends AbstractRequest.Builder { public final AddPartitionsToTxnRequestData data; +public final boolean isClientRequest; -public Builder(final AddPartitionsToTxnRequestData data) { +// Only used for versions < 4 +public Builder(String transactionalId, + long producerId, + short producerEpoch, + List partitions) { super(ApiKeys.ADD_PARTITIONS_TO_TXN); -this.data = data; +this.isClientRequest = true; + +AddPartitionsToTxnTopicCollection topics = compileTopics(partitions); + +this.data = new AddPartitionsToTxnRequestData() +.setTransactionalId(transactionalId) +.setProducerId(producerId) +.setProducerEpoch(producerEpoch) +.setTopics(topics); } -public Builder(final String transactionalId, - final long producerId, - final short producerEpoch, - final List partitions) { +public Builder(AddPartitionsToTxnTransactionCollection transactions, + boolean verifyOnly) { super(ApiKeys.ADD_PARTITIONS_TO_TXN); +this.isClientRequest = false; +this.data = new AddPartitionsToTxnRequestData() +.setTransactions(transactions) +.setVerifyOnly(verifyOnly); +} + +private AddPartitionsToTxnTopicCollection compileTopics(final List partitions) { Review Comment: nit: how about `buildTxnTopicCollection`? ## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java: ## @@ -35,21 +44,43 @@ public class AddPartitionsToTxnRequest extends AbstractRequest { private final AddPartitionsToTxnRequestData data; private List cachedPartitions = null; + +private Map> cachedPartitionsByTransaction = null; + +private final short version; public static class Builder extends AbstractRequest.Builder { public final AddPartitionsToTxnRequestData data; +public final boolean isClientRequest; -public Builder(final AddPartitionsToTxnRequestData data) { +// Only used for versions < 4 +public Builder(String transactionalId, + long producerId, + short producerEpoch, + List partitions) { super(ApiKeys.ADD_PARTITIONS_TO_TXN); -this.data = data; +this.isClientRequest = true; + +AddPartitionsToTxnTopicCollection topics = compileTopics(partitions); + +this.data = new AddPartitionsToTxnRequestData() +.setTransactionalId(transactionalId) Review Comment: Does it make sense to set verifyOnly to false explicitly here? 
## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java: ## @@ -66,24 +97,22 @@ public Builder(final String transactionalId, AddPartitionsToTxnTopicCollection topics = new AddPartitionsToTxnTopicCollection(); for (Map.Entry> partitionEntry : partitionMap.entrySet()) { topics.add(new AddPartitionsToTxnTopic() - .setName(partitionEntry.getKey()) - .setPartitions(partitionEntry.getValue())); +.setName(partitionEntry.getKey()) +.setPartitions(partitionEntry.getValue())); } - -this.data = new AddPartitionsToTxnRequestData() -.setTransactionalId(transactionalId) -.setProducerId(producerId) -.setProducerEpoch(producerEpoch) -.setTopics(topics); +return topics; } @Override public AddPartitionsToTxnRequest build(short version) { -return new AddPartitionsToTxnRequest(data, version); +short clampedVersion = (isClientRequest && version > 3) ? 3 : version; Review Comment: It's a little strange to ignore the version. I think another way to do this is to set the `latestAllowedVersion` to 3 in the client builder. That will ensure that the client does not try to use a higher version even if the broker su
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1107662160 ## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java: ## @@ -101,15 +130,61 @@ public String toString() { public AddPartitionsToTxnRequest(final AddPartitionsToTxnRequestData data, short version) { super(ApiKeys.ADD_PARTITIONS_TO_TXN, version); this.data = data; +this.version = version; } - + +// Only used for versions < 4 public List partitions() { if (cachedPartitions != null) { return cachedPartitions; } cachedPartitions = Builder.getPartitions(data); return cachedPartitions; } + +private List partitionsForTransaction(String transaction) { Review Comment: Not as it is written; we return on line 150. I originally thought this could be useful if we just wanted the list of the partitions, but since it is private I can change it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
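For readers skimming the thread, here is a minimal, self-contained sketch of the pattern being debated: a private helper that lazily builds a per-transaction cache and returns one entry, which is why it is not simply void as written. All names below (TxnPartitionCacheSketch, rawData, partitionsBeingAdded) are illustrative assumptions, not the actual AddPartitionsToTxnRequest members.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative stand-in for a request object that caches partitions per transaction.
public class TxnPartitionCacheSketch {
    private final Map<String, List<Integer>> rawData = new HashMap<>();
    private Map<String, List<Integer>> cachedPartitionsByTransaction = null;

    // Builds the cache lazily and returns the partitions for one transaction;
    // making this void would force every caller to repeat the map lookup itself.
    private List<Integer> partitionsForTransaction(final String transaction) {
        if (cachedPartitionsByTransaction == null) {
            cachedPartitionsByTransaction = new HashMap<>(rawData);
        }
        return cachedPartitionsByTransaction.getOrDefault(transaction, Collections.emptyList());
    }

    // Public accessor that consumes the returned list directly.
    public List<Integer> partitionsBeingAdded(final String transaction) {
        return partitionsForTransaction(transaction);
    }
}
```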
[GitHub] [kafka] jolshan commented on a diff in pull request #13231: KAFKA-14402: Update AddPartitionsToTxn protocol to batch and handle verifyOnly requests
jolshan commented on code in PR #13231: URL: https://github.com/apache/kafka/pull/13231#discussion_r1107662689 ## clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java: ## @@ -66,24 +97,22 @@ public Builder(final String transactionalId, AddPartitionsToTxnTopicCollection topics = new AddPartitionsToTxnTopicCollection(); for (Map.Entry> partitionEntry : partitionMap.entrySet()) { topics.add(new AddPartitionsToTxnTopic() - .setName(partitionEntry.getKey()) - .setPartitions(partitionEntry.getValue())); +.setName(partitionEntry.getKey()) +.setPartitions(partitionEntry.getValue())); } - -this.data = new AddPartitionsToTxnRequestData() -.setTransactionalId(transactionalId) -.setProducerId(producerId) -.setProducerEpoch(producerEpoch) -.setTopics(topics); +return topics; } @Override public AddPartitionsToTxnRequest build(short version) { -return new AddPartitionsToTxnRequest(data, version); +short clampedVersion = (isClientRequest && version > 3) ? 3 : version; Review Comment: Hmm I didn't see such an option in the builder but maybe I'm missing something. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
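To make the trade-off in this exchange concrete, the sketch below contrasts silently clamping the negotiated version inside build() with declaring an allowed version range when the builder is constructed, so that negotiation can never hand the builder a higher version in the first place. It is plain, self-contained Java with made-up names; whether AbstractRequest.Builder exposes such a range parameter is exactly what the thread is working out, so treat this as an illustration of the idea rather than the PR's final shape.

```java
// Contrast: clamp-at-build-time vs. declare-an-allowed-range-up-front.
public class VersionRangeSketch {

    static class Builder {
        private final short oldestAllowed;
        private final short latestAllowed;

        Builder(final short oldestAllowed, final short latestAllowed) {
            this.oldestAllowed = oldestAllowed;
            this.latestAllowed = latestAllowed;
        }

        // Because the allowed range is declared up front, the negotiated version
        // is guaranteed to fall inside it, and build() never silently rewrites it.
        short build(final short negotiatedVersion) {
            if (negotiatedVersion < oldestAllowed || negotiatedVersion > latestAllowed) {
                throw new IllegalArgumentException("version " + negotiatedVersion
                        + " outside allowed range [" + oldestAllowed + ", " + latestAllowed + "]");
            }
            return negotiatedVersion;
        }
    }

    public static void main(String[] args) {
        // A "client" builder for the pre-batching request shape caps itself at v3,
        // so version negotiation can never reach it with v4+ even on newer brokers.
        Builder clientBuilder = new Builder((short) 0, (short) 3);
        System.out.println(clientBuilder.build((short) 3)); // ok
        // clientBuilder.build((short) 4) would throw instead of being silently clamped.
    }
}
```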
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush
gharris1727 commented on code in PR #13208: URL: https://github.com/apache/kafka/pull/13208#discussion_r1107664475 ## connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java: ## @@ -135,38 +135,40 @@ public void testFlushFailureReplacesOffsets() throws Exception { // First time the write fails expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, true, null); writer.offset(OFFSET_KEY, OFFSET_VALUE); -assertTrue(writer.beginFlush()); +assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); verify(callback).onCompletion(eq(EXCEPTION), isNull()); // Second time it succeeds expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, null); -assertTrue(writer.beginFlush()); +assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); verify(callback).onCompletion(isNull(), isNull()); // Third time it has no data to flush so we won't get past beginFlush() -assertFalse(writer.beginFlush()); +assertFalse(writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); } @Test -public void testAlreadyFlushing() { +public void testAlreadyFlushing() throws InterruptedException, TimeoutException { @SuppressWarnings("unchecked") final Callback callback = mock(Callback.class); // Trigger the send, but don't invoke the callback so we'll still be mid-flush CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1); expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, false, allowStoreCompleteCountdown); writer.offset(OFFSET_KEY, OFFSET_VALUE); -assertTrue(writer.beginFlush()); +assertTrue(writer.beginFlush(1000L, TimeUnit.MILLISECONDS)); Review Comment: Oh i understand now, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13208: KAFKA-5756: Wait for concurrent offset flush to complete before starting next flush
gharris1727 commented on code in PR #13208: URL: https://github.com/apache/kafka/pull/13208#discussion_r1107669147 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java: ## @@ -100,23 +104,45 @@ private boolean flushing() { /** * Performs the first step of a flush operation, snapshotting the current state. This does not - * actually initiate the flush with the underlying storage. + * actually initiate the flush with the underlying storage. Ensures that any previous flush operations + * have finished before beginning a new flush. * * @return true if a flush was initiated, false if no data was available + * @throws ConnectException if the previous flush is not complete before this method is called */ -public synchronized boolean beginFlush() { -if (flushing()) { -log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the " +public boolean beginFlush() { +try { +return beginFlush(0, TimeUnit.NANOSECONDS); +} catch (InterruptedException | TimeoutException e) { +log.error("Invalid call to OffsetStorageWriter beginFlush() while already flushing, the " + "framework should not allow this"); throw new ConnectException("OffsetStorageWriter is already flushing"); } +} -if (data.isEmpty()) -return false; - -toFlush = data; -data = new HashMap<>(); -return true; +/** + * Performs the first step of a flush operation, snapshotting the current state. This does not + * actually initiate the flush with the underlying storage. Ensures that any previous flush operations + * have finished before beginning a new flush. + * + * @param timeout A maximum duration to wait for previous flushes to finish before giving up on waiting + * @param timeUnit Units of the timeout argument + * @return true if a flush was initiated, false if no data was available + * @throws InterruptedException if this thread was interrupted while waiting for the previous flush to complete + * @throws TimeoutException if the {@code timeout} elapses before previous flushes are complete. + */ +public boolean beginFlush(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException { +if (flushInProgress.tryAcquire(Math.max(0, timeout), timeUnit)) { +synchronized (this) { +if (data.isEmpty()) +return false; Review Comment: Oh wow that's pretty serious, I added a unit test that targets this release. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
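Reading the diff above, the flagged problem appears to be that returning false after acquiring the flush-in-progress permit leaks the permit, so every later beginFlush() call would time out; the fix is to release the permit before returning when there is nothing to flush. Below is a minimal, self-contained sketch of that failure mode and fix; the names mirror the diff, but this is an assumption-based illustration, not the actual OffsetStorageWriter code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustrative stand-in for a writer that serializes flushes with a single permit.
public class FlushGateSketch {
    private final Semaphore flushInProgress = new Semaphore(1);
    private Map<String, String> data = new HashMap<>();
    private Map<String, String> toFlush = null;

    public boolean beginFlush(final long timeout, final TimeUnit unit)
            throws InterruptedException, TimeoutException {
        if (flushInProgress.tryAcquire(Math.max(0, timeout), unit)) {
            synchronized (this) {
                if (data.isEmpty()) {
                    flushInProgress.release(); // without this, the permit leaks and later flushes time out
                    return false;
                }
                toFlush = data;
                data = new HashMap<>();
                return true;
            }
        }
        throw new TimeoutException("Timed out waiting for a concurrent flush to finish");
    }

    // The permit is handed back when the in-flight flush completes or is cancelled.
    public synchronized void cancelFlush() {
        if (toFlush != null) {
            toFlush.putAll(data);
            data = toFlush;
            toFlush = null;
        }
        flushInProgress.release();
    }
}
```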
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores
vcrfxia commented on code in PR #13243: URL: https://github.com/apache/kafka/pull/13243#discussion_r1107672610 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) { // VisibleForTesting void restoreBatch(final Collection> records) { -// advance stream time to the max timestamp in the batch +// copy the observed stream time, for use in deciding whether to drop records during restore, +// when records have exceeded the store's grace period. +long streamTimeForRestore = observedStreamTime; Review Comment: > Are you proposing that doPut() takes stream time as a parameter, so that during normal put() operation we pass observedStreamTime and during restore we pass endOfBatchStreamTime, which means we can rename streamTimeForRestore to be observedStreamTime instead? Went ahead and made this update in the latest commit. Can revise if it's not what you had envisioned. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
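For context, a simplified, self-contained sketch of the refactor described here is shown below: doPut() takes the stream time to apply as a parameter, so the normal put() path passes the per-record observed stream time while restore computes the end-of-batch stream time once and reuses it for every record in the batch. Everything in the sketch (class name, String values, the expiry check) is an assumption for illustration, not the real RocksDBVersionedStore internals.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class VersionedStoreSketch {

    // Minimal changelog-record stand-in.
    public static final class Rec {
        final String key; final String value; final long timestamp;
        public Rec(final String key, final String value, final long timestamp) {
            this.key = key; this.value = value; this.timestamp = timestamp;
        }
    }

    private final long historyRetention;
    private long observedStreamTime = Long.MIN_VALUE;
    private final Map<String, TreeMap<Long, String>> versions = new HashMap<>();

    public VersionedStoreSketch(final long historyRetention) {
        this.historyRetention = historyRetention;
    }

    // Normal write path: advance stream time per record, then delegate.
    public void put(final String key, final String value, final long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        doPut(observedStreamTime, key, value, timestamp);
    }

    // Restore path: advance stream time once to the max timestamp of the batch,
    // then reuse that end-of-batch value for every record in the batch.
    public void restoreBatch(final List<Rec> records) {
        long endOfBatchStreamTime = observedStreamTime;
        for (final Rec r : records) {
            endOfBatchStreamTime = Math.max(endOfBatchStreamTime, r.timestamp);
        }
        observedStreamTime = endOfBatchStreamTime;
        for (final Rec r : records) {
            doPut(endOfBatchStreamTime, r.key, r.value, r.timestamp);
        }
    }

    // Shared helper: history-retention expiry is decided against whichever
    // stream time the caller passes in.
    private void doPut(final long streamTime, final String key, final String value, final long timestamp) {
        if (timestamp < streamTime - historyRetention) {
            return; // expired relative to the supplied stream time; skip the write
        }
        versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
    }
}
```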
[GitHub] [kafka] vcrfxia commented on pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores
vcrfxia commented on PR #13243: URL: https://github.com/apache/kafka/pull/13243#issuecomment-1431970157 > One more thought: should we add verification about the "droppedRecordSensor" into all unit tests that drop records? Included this test update in the latest commit. I believe I've addressed/responded to all outstanding comments with the latest commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #13191: KAFKA-14060: Replace EasyMock and PowerMock with Mockito in AbstractWorkerSourceTaskTest
C0urante commented on code in PR #13191: URL: https://github.com/apache/kafka/pull/13191#discussion_r1107510674 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ## @@ -235,115 +236,100 @@ public void testMetricsGroup() { public void testSendRecordsConvertsData() { createWorkerTask(); -List records = new ArrayList<>(); // Can just use the same record for key and value -records.add(new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)); - -Capture> sent = expectSendRecordAnyTimes(); +List records = Collections.singletonList( +new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD) +); +expectSendRecord(emptyHeaders()); expectTopicCreation(TOPIC); -PowerMock.replayAll(); - workerTask.toSend = records; workerTask.sendRecords(); + +ArgumentCaptor> sent = verifySendRecord(); + assertEquals(SERIALIZED_KEY, sent.getValue().key()); assertEquals(SERIALIZED_RECORD, sent.getValue().value()); -PowerMock.verifyAll(); +verifyTaskGetTopic(); } @Test public void testSendRecordsPropagatesTimestamp() { final Long timestamp = System.currentTimeMillis(); - createWorkerTask(); -List records = Collections.singletonList( -new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp) -); - -Capture> sent = expectSendRecordAnyTimes(); - +expectSendRecord(emptyHeaders()); expectTopicCreation(TOPIC); -PowerMock.replayAll(); - -workerTask.toSend = records; +workerTask.toSend = Collections.singletonList( +new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD, timestamp) +); workerTask.sendRecords(); + +ArgumentCaptor> sent = verifySendRecord(); assertEquals(timestamp, sent.getValue().timestamp()); -PowerMock.verifyAll(); +verifyTaskGetTopic(); } @Test public void testSendRecordsCorruptTimestamp() { final Long timestamp = -3L; createWorkerTask(); -List records = Collections.singletonList( +expectSendRecord(emptyHeaders()); +expectTopicCreation(TOPIC); Review Comment: Why is this added? We're testing a scenario where the task fails on an invalid record timestamp, it should never get to the point of attempting to create a topic. 
## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java: ## @@ -639,144 +644,112 @@ public void testTopicCreateSucceedsWhenCreateReturnsNewTopicFound() { SourceRecord record1 = new SourceRecord(PARTITION, OFFSET, TOPIC, 1, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); SourceRecord record2 = new SourceRecord(PARTITION, OFFSET, TOPIC, 2, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD); -expectPreliminaryCalls(); - EasyMock.expect(admin.describeTopics(TOPIC)).andReturn(Collections.emptyMap()); +expectSendRecord(emptyHeaders()); -Capture newTopicCapture = EasyMock.newCapture(); - EasyMock.expect(admin.createOrFindTopics(EasyMock.capture(newTopicCapture))).andReturn(createdTopic(TOPIC)); - -expectSendRecord(); -expectSendRecord(); - -PowerMock.replayAll(); +when(admin.describeTopics(TOPIC)).thenReturn(Collections.emptyMap()); + when(admin.createOrFindTopics(any(NewTopic.class))).thenReturn(createdTopic(TOPIC)); workerTask.toSend = Arrays.asList(record1, record2); workerTask.sendRecords(); + +ArgumentCaptor> sent = verifySendRecord(2); + +List> capturedValues = sent.getAllValues(); +assertEquals(2, capturedValues.size()); } -private Capture> expectSendRecord( -String topic, -boolean anyTimes, -Headers headers -) { +private void expectSendRecord(Headers headers) { if (headers != null) -expectConvertHeadersAndKeyValue(topic, anyTimes, headers); +expectConvertHeadersAndKeyValue(headers); -expectApplyTransformationChain(anyTimes); +expectApplyTransformationChain(); -Capture> sent = EasyMock.newCapture(); - -IExpectationSetters> expect = EasyMock.expect( -producer.send(EasyMock.capture(sent), EasyMock.capture(producerCallbacks))); +expectTaskGetTopic(); +} -IAnswer> expectResponse = () -> { -synchronized (producerCallbacks) { -for (Callback cb : producerCallbacks.getValues()) { -cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0, 0L, 0, 0), null); -
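The migrated tests above lean on Mockito's ArgumentCaptor plus verify(..., times(n)) to replace EasyMock's Capture/expect pattern. Here is a tiny, self-contained illustration of that capture-and-verify idiom (requires mockito-core on the classpath); the Sender interface and record strings are invented for the example and are not Connect classes.

```java
import java.util.List;
import org.mockito.ArgumentCaptor;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class CaptorSketch {
    public interface Sender {
        void send(String record);
    }

    public static void main(String[] args) {
        Sender producer = mock(Sender.class);

        // Code under test would normally trigger these sends.
        producer.send("record-1");
        producer.send("record-2");

        // Verify the expected number of invocations once and capture every argument.
        ArgumentCaptor<String> sent = ArgumentCaptor.forClass(String.class);
        verify(producer, times(2)).send(sent.capture());

        List<String> captured = sent.getAllValues();
        if (captured.size() != 2 || !captured.get(0).equals("record-1")) {
            throw new AssertionError("unexpected captured arguments: " + captured);
        }
        System.out.println("captured: " + captured);
    }
}
```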
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13252: KAFKA-14491: [11/N] Add metered wrapper for versioned stores
vcrfxia commented on code in PR #13252: URL: https://github.com/apache/kafka/pull/13252#discussion_r1107700533 ## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java: ## @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency; + +import java.util.Objects; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.SerdeGetter; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.query.PositionBound; +import org.apache.kafka.streams.query.Query; +import org.apache.kafka.streams.query.QueryConfig; +import org.apache.kafka.streams.query.QueryResult; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.TimestampedKeyValueStore; +import org.apache.kafka.streams.state.ValueAndTimestamp; +import org.apache.kafka.streams.state.VersionedBytesStore; +import org.apache.kafka.streams.state.VersionedKeyValueStore; +import org.apache.kafka.streams.state.VersionedRecord; + +/** + * A metered {@link VersionedKeyValueStore} wrapper that is used for recording operation + * metrics, and hence its inner {@link VersionedBytesStore} implementation does not need to provide + * its own metrics collecting functionality. The inner {@code VersionedBytesStore} of this class + * is a {@link KeyValueStore} of type, so we use {@link Serde}s + * to convert from > to . In particular, + * {@link NullableValueAndTimestampSerde} is used since putting a tombstone to a versioned key-value + * store requires putting a null value associated with a timestamp. 
+ * + * @param The key type + * @param The (raw) value type + */ +public class MeteredVersionedKeyValueStore +extends WrappedStateStore +implements VersionedKeyValueStore { + +private final MeteredVersionedKeyValueStoreInternal internal; + +MeteredVersionedKeyValueStore(final VersionedBytesStore inner, + final String metricScope, + final Time time, + final Serde keySerde, + final Serde> valueSerde) { +super(inner); +internal = new MeteredVersionedKeyValueStoreInternal(inner, metricScope, time, keySerde, valueSerde); +} + +/** + * Private helper class which represents the functionality of a {@link VersionedKeyValueStore} + * as a {@link TimestampedKeyValueStore} so that the bulk of the metering logic may be + * inherited from {@link MeteredKeyValueStore}. As a result, the implementation of + * {@link MeteredVersionedKeyValueStore} is a simple wrapper to translate from this + * {@link TimestampedKeyValueStore} representation of a versioned key-value store into the + * {@link VersionedKeyValueStore} interface itself. + */ +private class MeteredVersionedKeyValueStoreInternal +extends MeteredKeyValueStore> +implements TimestampedKeyValueStore { + +private final VersionedBytesStore inner; + +MeteredVersionedKeyValueStoreInternal(final VersionedBytesStore inner, + final String metricScope, + final Time time, + final Serde keySerde, + final Serde> valueSerde) { +super(inner, metricScope, time, keySerde, valueSerde); +this.inner = inner; +} + +@Override +public void put(final K key, final ValueAndTimestamp value) { +super.put( +key, +// versioned stores req
[GitHub] [kafka] mjsax commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores
mjsax commented on code in PR #13243: URL: https://github.com/apache/kafka/pull/13243#discussion_r1107704159 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) { // VisibleForTesting void restoreBatch(final Collection> records) { -// advance stream time to the max timestamp in the batch +// copy the observed stream time, for use in deciding whether to drop records during restore, +// when records have exceeded the store's grace period. +long streamTimeForRestore = observedStreamTime; Review Comment: Did not have a concrete proposal. Should be fine, I guess. Currently, `streamTime` is tracked per task (based on input records over all partitions). And yes, there are all kinds of tricky things that you call out. Even if we have a filter(), downstream processors see only a subset of the data, and their "internal stream-time (if they have any)" could be different (i.e., lagging). Caching has a similar effect. There is a proposal to let KS track streamTime per processor, too. Bottom line: it's complicated and needs proper design and a KIP by itself... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores
mjsax commented on code in PR #13243: URL: https://github.com/apache/kafka/pull/13243#discussion_r1107705172 ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java: ## @@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() { verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15); } +@Test +public void shouldNotRestoreExpired() { +final List records = new ArrayList<>(); +records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10)); +records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed +records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored + +store.restoreBatch(getChangelogRecords(records)); + +verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10); +verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD); +verifyGetNullFromStore("k2"); +} + +@Test +public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() { +final List records = new ArrayList<>(); +records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD)); // this record will be older than grace period by the end of the batch, but should still be restored +records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10)); + +store.restoreBatch(getChangelogRecords(records)); + +verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD); +verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10); +} + +@Test +public void shouldAllowZeroHistoryRetention() { +// recreate store with zero history retention +store.close(); +store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 0L, SEGMENT_INTERVAL); +store.init((StateStoreContext) context, store); + +// put, get, and delete +putToStore("k", "v", BASE_TIMESTAMP); +verifyGetValueFromStore("k", "v", BASE_TIMESTAMP); +verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP); +verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v", BASE_TIMESTAMP); // query in "future" is allowed + +// update existing record at same timestamp +putToStore("k", "updated", BASE_TIMESTAMP); +verifyGetValueFromStore("k", "updated", BASE_TIMESTAMP); +verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", BASE_TIMESTAMP); + +// put new record version +putToStore("k", "v2", BASE_TIMESTAMP + 2); +verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 2); +verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 2); + +// query in past (history retention expired) returns null +verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1); + +// put in past (grace period expired) does not update the store Review Comment: Was just an idea. Not a big deal to not have the test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores
mjsax commented on code in PR #13243: URL: https://github.com/apache/kafka/pull/13243#discussion_r1107707006 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java: ## @@ -458,8 +460,22 @@ public void writeLatestValues(final WriteBatch batch) throws RocksDBException { } } +/** + * Helper method shared between put and restore. + * + * This method does not check whether the record being put is expired based on grace period + * or not; that is the caller's responsibility. This method does, however, check whether the + * record is expired based on history retention, by using the current + * {@code observedStreamTime}, and returns without inserting into the store if so. It can be + * possible that a record is not expired based on grace period but is expired based on + * history retention, even though history retention is always at least the grace period, + * during restore because restore advances {@code observedStreamTime} to the largest timestamp + * in the entire restore batch at the beginning of restore, in order to optimize for not + * putting records into the store which will have expired by the end of the restore. Review Comment: Thanks for adding this! Great addition! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on a diff in pull request #13243: KAFKA-14491: [7/N] Enforce strict grace period for versioned stores
mjsax commented on code in PR #13243: URL: https://github.com/apache/kafka/pull/13243#discussion_r1107711967 ## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java: ## @@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() { verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15); } +@Test +public void shouldNotRestoreExpired() { +final List records = new ArrayList<>(); +records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10)); +records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed +records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored + +store.restoreBatch(getChangelogRecords(records)); + +verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10); +verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD); +verifyGetNullFromStore("k2"); +} + +@Test +public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() { +final List records = new ArrayList<>(); +records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD)); // this record will be older than grace period by the end of the batch, but should still be restored +records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10)); + +store.restoreBatch(getChangelogRecords(records)); + +verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD); +verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10); +} + +@Test +public void shouldAllowZeroHistoryRetention() { +// recreate store with zero history retention +store.close(); +store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 0L, SEGMENT_INTERVAL); +store.init((StateStoreContext) context, store); + +// put, get, and delete +putToStore("k", "v", BASE_TIMESTAMP); +verifyGetValueFromStore("k", "v", BASE_TIMESTAMP); +verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP); +verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v", BASE_TIMESTAMP); // query in "future" is allowed + +// update existing record at same timestamp +putToStore("k", "updated", BASE_TIMESTAMP); +verifyGetValueFromStore("k", "updated", BASE_TIMESTAMP); +verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", BASE_TIMESTAMP); + +// put new record version +putToStore("k", "v2", BASE_TIMESTAMP + 2); +verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 2); +verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 2); + +// query in past (history retention expired) returns null +verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1); + +// put in past (grace period expired) does not update the store Review Comment: Just saw you added the test. It does not hurt to keep it. (We should not write tests based on knowing how the implementation works, but rather treat it as a "black box".) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org