[GitHub] [kafka] showuon commented on a change in pull request #11288: MINOR: Fix error response generation
showuon commented on a change in pull request #11288: URL: https://github.com/apache/kafka/pull/11288#discussion_r703239913 ## File path: clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersRequest.java ## @@ -80,6 +80,7 @@ public DescribeProducersResponse getErrorResponse(int throttleTimeMs, Throwable .setErrorCode(error.code()) Review comment: should we add `.setErrorMessage(error.message())` here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
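For illustration, a rough sketch of what the suggested addition would look like (the `partitionResponse` builder and its setters are assumptions inferred from the diff context, not the verified generated class; `Errors` is `org.apache.kafka.common.protocol.Errors`):

```java
// Hedged sketch of the suggestion above: when building the per-partition error
// response, populate the human-readable message alongside the error code.
// `partitionResponse` and its setters are assumed from the diff context.
Errors error = Errors.forException(e);
partitionResponse
    .setErrorCode(error.code())
    .setErrorMessage(error.message()); // the addition proposed in this review comment
```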
[GitHub] [kafka] showuon commented on pull request #11288: MINOR: Fix error response generation
showuon commented on pull request #11288: URL: https://github.com/apache/kafka/pull/11288#issuecomment-914059309 Also, this PR title should be updated to link to JIRA tickets. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on pull request #11301: KAFKA-13276: Prefer KafkaFuture in admin Result constructors
tombentley commented on pull request #11301: URL: https://github.com/apache/kafka/pull/11301#issuecomment-914082991 @dajac @showuon thanks, I've addressed your comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on a change in pull request #11301: KAFKA-13276: Prefer KafkaFuture in admin Result constructors
tombentley commented on a change in pull request #11301: URL: https://github.com/apache/kafka/pull/11301#discussion_r703274931 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java ## @@ -109,7 +110,7 @@ private void completeExceptionally(K key, Throwable t) { } private KafkaFutureImpl futureOrThrow(K key) { -KafkaFutureImpl future = futures.get(key); +KafkaFutureImpl future = (KafkaFutureImpl) futures.get(key); Review comment: The typecast isn't actually necessary if we're willing to declare `public Map> all()` in `SimpleAdminApiFuture` and also in a handful of the `*Result` classes which use the `SimpleAdminApiFuture`. Those `*Result` classes have package-access constructors, so it would be compatible to do that. And that's actually the correct way to solve this. But I think there are other `Result` classes where that approach, if applied consistently across the other `*Result` classes, would be problematic. The precedent was set a long time ago to just use plain `Map` rather than the more correct `Map` in the Result classes. So I guess the typecast is the lesser evil, and I'm OK with a comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] patrickstuedi commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
patrickstuedi commented on a change in pull request #10798: URL: https://github.com/apache/kafka/pull/10798#discussion_r703309462 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ## @@ -505,6 +506,14 @@ private void closeOpenIterators() { } } +private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) { +ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length); Review comment: @vamossagar12 great to see you're about to add direct buffer support, I was actually also looking into this. Just a few comments: as pointed out already, one important aspect to keep in mind with direct buffers is that allocation and disposal is more expensive than with regular heap buffers, so it's important to re-use them; it's common practice to keep a pool of direct buffers around and re-use them on a per-call basis (most I/O libraries do this, e.g., Netty, Crail, etc.). You can keep queues of free and used buffers and move them around during state store operations. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
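As a minimal, self-contained sketch of the reuse idea (not the PR's code, and assuming single-threaded access per store): a small pool hands out direct buffers and takes them back after each call, allocating only when no pooled buffer is large enough.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Minimal direct-buffer pool sketch (assumption: single-threaded access per store).
final class DirectBufferPool {
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();

    // Borrow a direct buffer with at least `size` bytes of capacity, reusing one if possible.
    ByteBuffer acquire(final int size) {
        final ByteBuffer pooled = free.poll();
        if (pooled != null && pooled.capacity() >= size) {
            pooled.clear();
            return pooled;
        }
        // Allocation is the expensive path; it only happens when no pooled buffer fits.
        return ByteBuffer.allocateDirect(size);
    }

    // Return the buffer so the next call can reuse it instead of allocating.
    void release(final ByteBuffer buffer) {
        free.push(buffer);
    }
}
```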
[GitHub] [kafka] patrickstuedi commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
patrickstuedi commented on a change in pull request #10798: URL: https://github.com/apache/kafka/pull/10798#discussion_r703312659 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ## @@ -505,6 +506,14 @@ private void closeOpenIterators() { } } +private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) { +ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length); Review comment: Also, we should try to avoid serializing data into byte[] arrays and then copy the data into directBuffers. Instead we should serialize directly into "direct" ByteBuffers. For this we might need to have RocksDBStore implement a ByteBuffer interface, e.g., KeyValueStore, or anything similar... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
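As a self-contained illustration of the difference (plain JDK code, not tied to the RocksDB or Streams APIs): encoding straight into a direct buffer avoids the intermediate on-heap byte[] and the extra copy.

```java
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.StandardCharsets;

final class DirectSerializationSketch {
    // Copying path: serialize to a heap byte[] first, then copy into the direct buffer.
    static ByteBuffer viaHeapArray(final String value) {
        final byte[] bytes = value.getBytes(StandardCharsets.UTF_8); // heap allocation + copy
        final ByteBuffer direct = ByteBuffer.allocateDirect(bytes.length);
        direct.put(bytes).flip();
        return direct;
    }

    // Direct path: encode straight into a (reused) direct buffer, skipping the byte[].
    // Assumes the buffer is large enough; overflow handling omitted for brevity.
    static ByteBuffer directEncode(final String value, final ByteBuffer reusableDirect) {
        reusableDirect.clear();
        StandardCharsets.UTF_8.newEncoder()
            .encode(CharBuffer.wrap(value), reusableDirect, true);
        reusableDirect.flip();
        return reusableDirect;
    }
}
```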
[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
vamossagar12 commented on a change in pull request #10798: URL: https://github.com/apache/kafka/pull/10798#discussion_r70661 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ## @@ -505,6 +506,14 @@ private void closeOpenIterators() { } } +private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) { +ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length); Review comment: > @vamossagar12 great to see you're about to add direct buffer support, I was actually also looking into this. Just a few comments: as pointed out already, one important aspect to keep in mind with direct buffers is that allocation and disposal is more expensive than with regular heap buffers, so it's important to re-use them; it's common practice to keep a pool of direct buffers around and re-use them on a per-call basis (most I/O libraries do this, e.g., Netty, Crail, etc.). You can keep queues of free and used buffers and move them around during state store operations. Thanks @patrickstuedi, that's a great suggestion. I think it makes sense given the expense in allocation/disposal. Couple of questions there: 1) I think the factor to consider there would be how to manage the pool of used/free buffers? As already pointed out by @cadonna above, the only case where we can expect concurrent access to state stores would be for IQ; otherwise it's single threaded. For the single-threaded case, it's going to be straightforward, but for IQ, what happens if a request comes in and none of the direct byte buffers are `free`? Do we re-allocate on the fly? 2) This might be a small issue but calling it out. We need to allocate some capacity to the direct byte buffer. Currently, I am iterating over the list of keys and values and assigning the max length, which again may not be the most efficient way. If I were to create the pool, then I would need to know the capacity to allocate upfront. Can this be exposed as a config for the users in that case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
vamossagar12 commented on a change in pull request #10798: URL: https://github.com/apache/kafka/pull/10798#discussion_r703336198 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ## @@ -505,6 +506,14 @@ private void closeOpenIterators() { } } +private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) { +ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length); Review comment: > Also, we should try to avoid serializing data into byte[] arrays and then copy the data into directBuffers. Instead we should serialize directly into "direct" ByteBuffers. For this we might need to have RocksDBStore implement a ByteBuffer interface, e.g., KeyValueStore, or anything similar... Yeah, I did play around with that idea, but the intent was to not change the public APIs. If that makes sense and I can do another round of benchmarking for it, then we will probably need a KIP. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
vamossagar12 commented on a change in pull request #10798: URL: https://github.com/apache/kafka/pull/10798#discussion_r703336198 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ## @@ -505,6 +506,14 @@ private void closeOpenIterators() { } } +private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) { +ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length); Review comment: > Also, we should try to avoid serializing data into byte[] arrays and then copy the data into directBuffers. Instead we should serialize directly into "direct" ByteBuffers. For this we might need to have RocksDBStore implement a ByteBuffer interface, e.g., KeyValueStore, or anything similar... Yeah, I did play around with that idea, but the intent was to not change the public APIs. If that makes sense and I can do another round of benchmarking for it, then we will probably need a KIP. In fact, we could even go one step further and add support for only direct ByteBuffers, if that makes sense. We could throw errors if the ByteBuffer isn't direct, for example, similar to the RocksDB implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] apenam opened a new pull request #11305: Make classpath smaller
apenam opened a new pull request #11305: URL: https://github.com/apache/kafka/pull/11305 Fix `The input line is too long. The syntax of the command is incorrect` error when running in a Windows environment. *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] TomerWizman commented on a change in pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB
TomerWizman commented on a change in pull request #11250: URL: https://github.com/apache/kafka/pull/11250#discussion_r703348241 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java ## @@ -249,7 +251,7 @@ public int maxFileOpeningThreads() { @Override public Options setMaxTotalWalSize(final long maxTotalWalSize) { -dbOptions.setMaxTotalWalSize(maxTotalWalSize); +LOGGER.warn("WAL is explicitly disabled by Streams in RocksDB. Setting option 'maxTotalWalSize' will be ignored"); Review comment: @cadonna Sorry for the late reply, I was on vacation. I added the rest of the WAL options to the test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] tombentley commented on a change in pull request #11302: KAFKA-13243: KIP-773 Differentiate metric latency measured in ms and ns
tombentley commented on a change in pull request #11302: URL: https://github.com/apache/kafka/pull/11302#discussion_r703359283 ## File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java ## @@ -1272,12 +1274,27 @@ private Meter createMeter(Metrics metrics, String groupName, Map metricTags, +/** + * This method generates `time-total` metrics with a couple of errors, no `-ns` suffix and no dash between basename Review comment: ```suggestion * This method generates `time-total` metrics but has a couple of deficiencies: no `-ns` suffix and no dash between basename ``` ## File path: docs/ops.html ## @@ -1890,6 +1890,16 @@ Common monitoring me The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) + +io-wait-time-ns-total +The total time the I/O thread spent waiting in nanoseconds + kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) + + +io-waittime-total +*Deprecated* The total time the I/O thread spent waiting in nanoseconds Review comment: No harm in mentioning that the replacement is `io-wait-time-ns-total`. ## File path: docs/ops.html ## @@ -1900,6 +1910,16 @@ Common monitoring me The average length of time for I/O per select call in nanoseconds. kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) + +io-time-ns-total +The total time the I/O thread spent doing I/O in nanoseconds + kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+) + + +iotime-total +*Deprecated* The total time the I/O thread spent doing I/O in nanoseconds Review comment: Again, mention the replacement. ## File path: docs/ops.html ## @@ -2072,6 +2092,16 @@ < The fraction of time an appender waits for space allocation. kafka.producer:type=producer-metrics,client-id=([-.\w]+) + +bufferpool-wait-time-total +*Deprecated* The total time an appender waits for space allocation in nanoseconds. Review comment: Same comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jlprat commented on pull request #11254: KAFKA-2424: Introduce Scalafix linter
jlprat commented on pull request #11254: URL: https://github.com/apache/kafka/pull/11254#issuecomment-914189101 Rebased the changes -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a change in pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB
cadonna commented on a change in pull request #11250: URL: https://github.com/apache/kafka/pull/11250#discussion_r703386486 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java ## @@ -126,7 +147,7 @@ private void verifyDBOptionsMethodCall(final Method method) throws Exception { } catch (final InvocationTargetException undeclaredMockMethodCall) { assertThat(undeclaredMockMethodCall.getCause(), instanceOf(AssertionError.class)); assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(), -matchesPattern("Unexpected method call DBOptions\\." + method.getName() + "((.*\n*)*):")); +matchesPattern("Unexpected method call DBOptions\\." + method.getName() + "((.*\n*)*):")); Review comment: Could you please revert this change since it is unrelated to the PR? ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java ## @@ -93,7 +114,7 @@ public void shouldOverwriteAllOptionsMethods() throws Exception { for (final Method method : Options.class.getMethods()) { if (!ignoreMethods.contains(method.getName())) { RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class -.getDeclaredMethod(method.getName(), method.getParameterTypes()); +.getDeclaredMethod(method.getName(), method.getParameterTypes()); Review comment: Could you please revert this change since it is unrelated to the PR? ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java ## @@ -306,4 +327,33 @@ public String name() { return parameters; } + +@Test +public void shouldLogWarningWhenSettingWalOptions() throws Exception { + +try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class)) { + +final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter adapter += new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), new ColumnFamilyOptions()); + +for (final Method method : RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class.getDeclaredMethods()) { +if (walRelatedMethods.contains(method.getName())) { +method.invoke(adapter, getDBOptionsParameters(method.getParameterTypes())); +} +} + +final List walOptions = Arrays.asList("walDir", "walFilter", "walRecoveryMode", "walBytesPerSync", "walSizeLimitMB", "manualWalFlush", "maxTotalWalSize", "walTtlSeconds"); Review comment: nit: Would it be possible to extract these options from `walRelatedMethods`? ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java ## @@ -113,7 +134,7 @@ public void shouldForwardAllDbOptionsCalls() throws Exception { private void verifyDBOptionsMethodCall(final Method method) throws Exception { final DBOptions mockedDbOptions = mock(DBOptions.class); final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter optionsFacadeDbOptions -= new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(mockedDbOptions, new ColumnFamilyOptions()); += new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(mockedDbOptions, new ColumnFamilyOptions()); Review comment: Could you please revert this change since it is unrelated to the PR?
## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java ## @@ -218,7 +239,7 @@ public void shouldForwardAllColumnFamilyCalls() throws Exception { private void verifyColumnFamilyOptionsMethodCall(final Method method) throws Exception { final ColumnFamilyOptions mockedColumnFamilyOptions = mock(ColumnFamilyOptions.class); final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter optionsFacadeColumnFamilyOptions -= new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), mockedColumnFamilyOptions); += new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), mockedColumnFamilyOptions); Review comment: Could you please revert this change since it is unrelated to the PR? ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java ## @@ -306,4 +327,33 @@ public String name()
[GitHub] [kafka] jlprat commented on pull request #11302: KAFKA-13243: KIP-773 Differentiate metric latency measured in ms and ns
jlprat commented on pull request #11302: URL: https://github.com/apache/kafka/pull/11302#issuecomment-914195040 Thanks for your feedback, @tombentley. Feel free to review the changes. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] patrickstuedi commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
patrickstuedi commented on a change in pull request #10798: URL: https://github.com/apache/kafka/pull/10798#discussion_r703439679 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ## @@ -505,6 +506,14 @@ private void closeOpenIterators() { } } +private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) { +ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length); Review comment: Re (1) if all use cases are single threaded then yes, we can allocate some buffer(s) as part of the store. Otherwise, if you need to support multiple concurrent ops, then you could pre-populate a queue with N buffers, and N becomes the maximum number of concurrent requests you can serve. If your queue is empty then a request would have to wait until at least one of the outstanding requests completes and adds its buffer back to the queue. Again, that might all not be needed given the API is single-threaded. Re (2), is there a max-size, maybe given by the maximum Kafka message size that is configured (if such a limit exists and is not too big)? If we don't want to change the API (I guess it would be the RocksDBStore interface that would be changed, which is not exposed I think, but still) then splitting this work into part I, where we copy heap to direct buffers, and then part II, where we directly serialize into direct buffers, is a way to go. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
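A self-contained sketch of the bounded-pool idea described above (assuming callers may block): N pre-allocated direct buffers cap the number of concurrent requests, and a request waits when the pool is empty.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch: a fixed pool of N direct buffers; concurrent callers beyond N block in acquire().
final class BoundedDirectBufferPool {
    private final BlockingQueue<ByteBuffer> free;

    BoundedDirectBufferPool(final int poolSize, final int bufferCapacity) {
        free = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            free.add(ByteBuffer.allocateDirect(bufferCapacity)); // allocation cost paid once, up front
        }
    }

    ByteBuffer acquire() throws InterruptedException {
        return free.take(); // blocks until an outstanding request releases a buffer
    }

    void release(final ByteBuffer buffer) {
        buffer.clear();
        free.add(buffer); // hand the buffer back for the next request
    }
}
```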
[GitHub] [kafka] TomerWizman commented on a change in pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB
TomerWizman commented on a change in pull request #11250: URL: https://github.com/apache/kafka/pull/11250#discussion_r703445569 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java ## @@ -93,7 +114,7 @@ public void shouldOverwriteAllOptionsMethods() throws Exception { for (final Method method : Options.class.getMethods()) { if (!ignoreMethods.contains(method.getName())) { RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class -.getDeclaredMethod(method.getName(), method.getParameterTypes()); +.getDeclaredMethod(method.getName(), method.getParameterTypes()); Review comment: @cadonna done, no idea how it came in... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] TomerWizman commented on a change in pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB
TomerWizman commented on a change in pull request #11250: URL: https://github.com/apache/kafka/pull/11250#discussion_r703445686 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java ## @@ -113,7 +134,7 @@ public void shouldForwardAllDbOptionsCalls() throws Exception { private void verifyDBOptionsMethodCall(final Method method) throws Exception { final DBOptions mockedDbOptions = mock(DBOptions.class); final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter optionsFacadeDbOptions -= new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(mockedDbOptions, new ColumnFamilyOptions()); += new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(mockedDbOptions, new ColumnFamilyOptions()); Review comment: @cadonna done, no idea how it came in... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] TomerWizman commented on a change in pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB
TomerWizman commented on a change in pull request #11250: URL: https://github.com/apache/kafka/pull/11250#discussion_r703445767 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java ## @@ -126,7 +147,7 @@ private void verifyDBOptionsMethodCall(final Method method) throws Exception { } catch (final InvocationTargetException undeclaredMockMethodCall) { assertThat(undeclaredMockMethodCall.getCause(), instanceOf(AssertionError.class)); assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(), -matchesPattern("Unexpected method call DBOptions\\." + method.getName() + "((.*\n*)*):")); +matchesPattern("Unexpected method call DBOptions\\." + method.getName() + "((.*\n*)*):")); Review comment: @cadonna done, no idea how it came in... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] TomerWizman commented on a change in pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB
TomerWizman commented on a change in pull request #11250: URL: https://github.com/apache/kafka/pull/11250#discussion_r703454226 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java ## @@ -306,4 +327,33 @@ public String name() { return parameters; } + +@Test +public void shouldLogWarningWhenSettingWalOptions() throws Exception { + +try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class)) { + +final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter adapter += new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), new ColumnFamilyOptions()); + +for (final Method method : RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class.getDeclaredMethods()) { +if (walRelatedMethods.contains(method.getName())) { +method.invoke(adapter, getDBOptionsParameters(method.getParameterTypes())); +} +} + +final List walOptions = Arrays.asList("walDir", "walFilter", "walRecoveryMode", "walBytesPerSync", "walSizeLimitMB", "manualWalFlush", "maxTotalWalSize", "walTtlSeconds"); Review comment: @cadonna I thought about it also. To do that I need to remove the "set" from the method name and de-capitalize the first letter of the option. I think this is a bit overkill because: 1. I got checkstyle errors trying to use something like toLowerCase()... 2. It's going to be ugly to do it without additional StringUtils helper libs (like Apache commons-lang StringUtils, which I saw we don't have) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
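For reference, the derivation itself needs only the JDK; a minimal sketch of stripping the `set` prefix and de-capitalizing is below (it assumes `walRelatedMethods` holds setter names like `setWalDir`, and uses `Character.toLowerCase`, which sidesteps the locale-sensitive `String.toLowerCase()` that checkstyle flags). Whether the extra indirection is worth it in the test is a separate question.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch: derive option names ("walDir", ...) from setter names ("setWalDir", ...).
final class OptionNames {
    static List<String> fromSetters(final List<String> setterNames) {
        return setterNames.stream()
            .filter(name -> name.startsWith("set"))
            // drop "set" and lower-case the first remaining character
            .map(name -> Character.toLowerCase(name.charAt(3)) + name.substring(4))
            .collect(Collectors.toList());
    }
}
```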
[GitHub] [kafka] cadonna commented on a change in pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB
cadonna commented on a change in pull request #11250: URL: https://github.com/apache/kafka/pull/11250#discussion_r703455477 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java ## @@ -306,4 +327,33 @@ public String name() { return parameters; } + +@Test +public void shouldLogWarningWhenSettingWalOptions() throws Exception { + +try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class)) { + +final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter adapter += new RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), new ColumnFamilyOptions()); + +for (final Method method : RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class.getDeclaredMethods()) { +if (walRelatedMethods.contains(method.getName())) { +method.invoke(adapter, getDBOptionsParameters(method.getParameterTypes())); +} +} + +final List walOptions = Arrays.asList("walDir", "walFilter", "walRecoveryMode", "walBytesPerSync", "walSizeLimitMB", "manualWalFlush", "maxTotalWalSize", "walTtlSeconds"); Review comment: Fair enough! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
vamossagar12 commented on a change in pull request #10798: URL: https://github.com/apache/kafka/pull/10798#discussion_r703469853 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ## @@ -505,6 +506,14 @@ private void closeOpenIterators() { } } +private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) { +ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length); Review comment: I see. I think barring Interactive Queries, the other accesses are single threaded. Regarding the max-size, TBH I am not aware of one. Also, regarding changing the API, yeah, RocksDBStore isn't exposed, so the new APIs will have to be in KeyValueStore with a default implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-13262) Mock Clients Now Have Final close() Methods
[ https://issues.apache.org/jira/browse/KAFKA-13262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma reassigned KAFKA-13262: --- Assignee: Ismael Juma > Mock Clients Now Have Final close() Methods > --- > > Key: KAFKA-13262 > URL: https://issues.apache.org/jira/browse/KAFKA-13262 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.0.0 >Reporter: Gary Russell >Assignee: Ismael Juma >Priority: Blocker > > Subclasses can no longer clean up resources when the consumer is closed. > Caused by > https://github.com/apache/kafka/commit/2f3600198722dd5a01a210bc78b7d43b33967c7f -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma opened a new pull request #11306: MINOR: Update build dependencies (Q3 2021)
ijuma opened a new pull request #11306: URL: https://github.com/apache/kafka/pull/11306 TBD: release notes, etc Note that checkstyle will be upgraded in a separate PR as it requires code changes in a large number of files. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma opened a new pull request #11307: KAFKA-13262: Remove final from `MockConsumer.close()` and delegate implementation
ijuma opened a new pull request #11307: URL: https://github.com/apache/kafka/pull/11307 I added the final via 2f3600198722 to catch overriding mistakes since the implementation was moved from the deprecated and overloaded `close` with two parameters to the no-arg `close`. I didn't realize then that `MockConsumer` is a public API (seems like a bit of a mistake since we tweak the implementation and sometimes add methods without a KIP). Given that this is a public API, I have also moved the implementation of `close` to the one-arg overload. This makes it easier for a subclass to have specific overriding behavior depending on the timeout. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11307: KAFKA-13262: Remove final from `MockConsumer.close()` and delegate implementation
dajac commented on a change in pull request #11307: URL: https://github.com/apache/kafka/pull/11307#discussion_r703548948 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java ## @@ -446,8 +447,8 @@ public synchronized void resume(Collection partitions) { } @Override -public final synchronized void close() { -this.closed = true; +public synchronized void close() { Review comment: Should we move the `synchronized` to the other `close` method or add it there as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #11307: KAFKA-13262: Remove final from `MockConsumer.close()` and delegate implementation
ijuma commented on a change in pull request #11307: URL: https://github.com/apache/kafka/pull/11307#discussion_r703555946 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java ## @@ -446,8 +447,8 @@ public synchronized void resume(Collection partitions) { } @Override -public final synchronized void close() { -this.closed = true; +public synchronized void close() { Review comment: Makes sense, done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
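A minimal, self-contained sketch of the resulting pattern (not the actual MockConsumer code; the zero timeout and class names here are illustrative): the no-arg close delegates to the one-arg overload, and both are synchronized, so a subclass only needs to override the one-arg method to clean up resources.

```java
import java.time.Duration;

// Sketch of the close() delegation pattern discussed above.
class CloseableMock implements AutoCloseable {
    private boolean closed = false;

    @Override
    public synchronized void close() {
        close(Duration.ofMillis(0)); // delegate so overriding the one-arg method is enough
    }

    public synchronized void close(final Duration timeout) {
        // Subclasses override this to release resources, optionally honoring the timeout.
        closed = true;
    }

    public synchronized boolean closed() {
        return closed;
    }
}
```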
[GitHub] [kafka] C0urante commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed
C0urante commented on pull request #10112: URL: https://github.com/apache/kafka/pull/10112#issuecomment-914348989 @rhauch Overall that looks good to me. It's an elegant solution to the tricky problem you noted about the opacity of task-provided source offsets w/r/t ordering. I'm a little worried about offset commits taking longer and longer with the more sophisticated approach you proposed (where we would unconditionally iterate over every record in the batch, instead of only until the first unacknowledged record). It's true that there would be natural back pressure from the producer as its `buffer.memory` gets eaten up, but with the default of 32MB, it still seems possible for a large number of unacknowledged records to build up. If this does happen, then offset commits may end up exceeding the `offset.flush.timeout.ms` for the worker, which may cause issues with the current model where a single shared, worker-global thread is used for offset commits of all tasks. If this is a valid concern and we'd like to take it into account for now, I can think of a couple ways to handle it off the top of my head: 1. Use the simpler approach that blocks offset commits across the board if a single record remains unacknowledged for a long period of time (which may realistically be a problem if a single partition out of many is unavailable for some reason). 2. Enable concurrent offset commits by multiple tasks. 3. Instead of a single dequeue per task, use a `ConcurrentMap, Queue>` that stores a single dequeue per unique source partition. This would allow us to iterate over the bare minimum number of records for every single offset commit and not spend time, for example, on accumulated records for unavailable Kafka partitions. We'd still have to iterate over those records eventually if the Kafka partition came back online, but that iteration would only ever occur once, instead of once for every offset commit. I think option 3 may be warranted, although it's still possible that offset commits take a long time if 32MB worth of records end up getting queued. Option 2 may be worth implementing or at least considering as a follow-up item to handle this case. Thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
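To make option 3 concrete, here is a rough, self-contained sketch (simplified types; `SubmittedRecord`, the `acked` flag, and the class name are hypothetical stand-ins for the PR's bookkeeping, and synchronization between the task thread and the committer thread is omitted): committable offsets are computed per source partition, so a stalled partition only ever costs one head-of-queue check.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of option 3: one deque of in-flight records per source partition.
final class PerPartitionOffsetTracker {
    // Stand-in for the task's submitted-record bookkeeping; not Connect's real classes.
    static final class SubmittedRecord {
        final Map<String, Object> sourceOffset;
        volatile boolean acked;
        SubmittedRecord(final Map<String, Object> sourceOffset) { this.sourceOffset = sourceOffset; }
    }

    private final ConcurrentMap<Map<String, Object>, Deque<SubmittedRecord>> byPartition =
        new ConcurrentHashMap<>();

    void submit(final Map<String, Object> sourcePartition, final SubmittedRecord record) {
        byPartition.computeIfAbsent(sourcePartition, p -> new ArrayDeque<>()).addLast(record);
    }

    // For each partition, pop acknowledged records from the head and keep the last acked
    // offset; an unacknowledged head stops iteration for that partition only.
    Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
        final Map<Map<String, Object>, Map<String, Object>> offsets = new HashMap<>();
        byPartition.forEach((partition, queue) -> {
            SubmittedRecord lastAcked = null;
            while (!queue.isEmpty() && queue.peekFirst().acked) {
                lastAcked = queue.pollFirst();
            }
            if (lastAcked != null) {
                offsets.put(partition, lastAcked.sourceOffset);
            }
        });
        return offsets;
    }
}
```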
[jira] [Updated] (KAFKA-13243) Differentiate metric latency measured in millis and nanos
[ https://issues.apache.org/jira/browse/KAFKA-13243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josep Prat updated KAFKA-13243: --- Fix Version/s: 3.1.0 > Differentiate metric latency measured in millis and nanos > - > > Key: KAFKA-13243 > URL: https://issues.apache.org/jira/browse/KAFKA-13243 > Project: Kafka > Issue Type: Improvement > Components: metrics >Reporter: Guozhang Wang >Assignee: Josep Prat >Priority: Major > Labels: needs-kip > Fix For: 3.1.0 > > > Today most of the client latency metrics are measured in millis, and some in > nanos. For those measured in nanos we usually differentiate them by having a > `-ns` suffix in the metric names, e.g. `io-wait-time-ns-avg` and > `io-time-ns-avg`. But there are a few that we obviously forgot to follow this > pattern, e.g. `io-wait-time-total`: it is inconsistent where `avg` has `-ns` > suffix and `total` has not. I did a quick search and found just three of them: > * bufferpool-wait-time-total -> bufferpool-wait-time-ns-total > * io-wait-time-total -> io-wait-time-ns-total > * iotime-total -> io-time-ns-total (note that there are two inconsistencies > on naming, the average metric is `io-time-ns-avg` whereas total is > `iotime-total`, I suggest we use `io-time` instead of `iotime` for both). > We should change their name accordingly with the `-ns` suffix as well. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] viktorsomogyi commented on pull request #11276: KAFKA-13240: Disable HTTP TRACE Method in Connect
viktorsomogyi commented on pull request #11276: URL: https://github.com/apache/kafka/pull/11276#issuecomment-914396172 Rebased the branch; it was conflicting with another change that replaced EasyMock/PowerMock with Mockito, so I had to change that part of my tests too. (Ran them locally successfully.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11213: KAFKA-13201: Convert KTable suppress to new PAPI
vvcephei commented on a change in pull request #11213: URL: https://github.com/apache/kafka/pull/11213#discussion_r703626660 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java ## @@ -150,16 +153,16 @@ public void init(final org.apache.kafka.streams.processor.ProcessorContext conte } @Override -public void process(final K key, final Change value) { +public void process(final Record> record) { observedStreamTime = Math.max(observedStreamTime, internalProcessorContext.timestamp()); -buffer(key, value); +buffer(record); enforceConstraints(); } -private void buffer(final K key, final Change value) { -final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key); +private void buffer(final Record> record) { +final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, record.key()); -buffer.put(bufferTime, key, value, internalProcessorContext.recordContext()); +buffer.put(bufferTime, record.key(), record.value(), internalProcessorContext.recordContext()); Review comment: Thanks, @jeqo , I can see that my tone in that comment didn't encompass the scope of the change I was proposing. I'm fine if we want to delay that refactor for later work and focus on the Processor API itself in this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #11213: KAFKA-13201: Convert KTable suppress to new PAPI
vvcephei commented on a change in pull request #11213: URL: https://github.com/apache/kafka/pull/11213#discussion_r703627436 ## File path: streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java ## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.processor.MockProcessorContext; Review comment: Sounds good. I think we can afford to cross that bridge later on as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13277) Serialization of long tagged string in request/response throws BufferOverflowException
Rajini Sivaram created KAFKA-13277: -- Summary: Serialization of long tagged string in request/response throws BufferOverflowException Key: KAFKA-13277 URL: https://issues.apache.org/jira/browse/KAFKA-13277 Project: Kafka Issue Type: Bug Components: clients Reporter: Rajini Sivaram Assignee: Rajini Sivaram Fix For: 3.0.1 Size computation for tagged strings in the message generator is incorrect and hence it works only for small strings (126 bytes or so) where the length happens to be correct. -- This message was sent by Atlassian Jira (v8.3.4#803005)
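For background on the "126 bytes or so" boundary: this is standard unsigned-varint arithmetic, not the message generator's actual size computation. A compact string is written as an unsigned varint of length + 1 followed by the bytes, so the length prefix grows from one byte to two once length + 1 exceeds 127.

```java
// Standard unsigned-varint size arithmetic; illustrates the boundary only,
// not the generator's actual size computation.
final class VarintSizes {
    static int sizeOfUnsignedVarint(int value) {
        int bytes = 1;
        while ((value & 0xFFFFFF80) != 0) {
            bytes++;
            value >>>= 7;
        }
        return bytes;
    }

    // A compact string is encoded as varint(length + 1) followed by the bytes.
    static int sizeOfCompactString(final int length) {
        return sizeOfUnsignedVarint(length + 1) + length;
    }

    public static void main(final String[] args) {
        System.out.println(sizeOfCompactString(126)); // 1 + 126: one-byte length prefix
        System.out.println(sizeOfCompactString(127)); // 2 + 127: prefix grows to two bytes
    }
}
```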
[GitHub] [kafka] ijuma merged pull request #11303: MINOR: Upgrade compression libraries (Q3 2021)
ijuma merged pull request #11303: URL: https://github.com/apache/kafka/pull/11303 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13277) Serialization of long tagged string in request/response throws BufferOverflowException
[ https://issues.apache.org/jira/browse/KAFKA-13277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411309#comment-17411309 ] Ismael Juma commented on KAFKA-13277: - [~rsivaram] isn't this a blocker for 3.0? > Serialization of long tagged string in request/response throws > BufferOverflowException > -- > > Key: KAFKA-13277 > URL: https://issues.apache.org/jira/browse/KAFKA-13277 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 3.0.1 > > > Size computation for tagged strings in the message generator is incorrect and > hence it works only for small strings (126 bytes or so) where the length > happens to be correct. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on a change in pull request #11213: KAFKA-13201: Convert KTable suppress to new PAPI
vvcephei commented on a change in pull request #11213: URL: https://github.com/apache/kafka/pull/11213#discussion_r703630483 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java ## @@ -198,7 +201,7 @@ private void emit(final TimeOrderedKeyValueBuffer.Eviction toEmit) { final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext(); internalProcessorContext.setRecordContext(toEmit.recordContext()); try { -internalProcessorContext.forward(toEmit.key(), toEmit.value()); +internalProcessorContext.forward(toEmit.record()); Review comment: I think this might actually be a bug. IIRC, the Record forward call overrides the context, so you might have to actually set all the fields in Record from the context when you construct it in `toEmit.record()`. Specifically, I think we might be dropping the headers here. I might also be wrong, so you might want to double-check me first. It looks like we were never testing whether we propagate headers through Suppress. If it turns out that we never were, then there's also no problem. The case to look out for is if we _were_, but now we no longer are. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
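If the headers are indeed being dropped, the fix would roughly amount to rebuilding the forwarded record from the buffered context. A hedged sketch against the names visible in the diff (Eviction, ProcessorRecordContext, the new-PAPI Record), not the actual patch:

```java
// Hedged sketch, not the actual change: rebuild the Record from the saved
// ProcessorRecordContext so the original timestamp and headers survive the forward.
final ProcessorRecordContext ctx = toEmit.recordContext();
internalProcessorContext.forward(
    new Record<>(toEmit.key(), toEmit.value(), ctx.timestamp(), ctx.headers())
);
```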
[jira] [Commented] (KAFKA-13277) Serialization of long tagged string in request/response throws BufferOverflowException
[ https://issues.apache.org/jira/browse/KAFKA-13277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411311#comment-17411311 ] Rajini Sivaram commented on KAFKA-13277: [~ijuma] Yes, it will be good include in 3.0. > Serialization of long tagged string in request/response throws > BufferOverflowException > -- > > Key: KAFKA-13277 > URL: https://issues.apache.org/jira/browse/KAFKA-13277 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 3.0.1 > > > Size computation for tagged strings in the message generator is incorrect and > hence it works only for small strings (126 bytes or so) where the length > happens to be correct. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13277) Serialization of long tagged string in request/response throws BufferOverflowException
[ https://issues.apache.org/jira/browse/KAFKA-13277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-13277: --- Priority: Blocker (was: Major) > Serialization of long tagged string in request/response throws > BufferOverflowException > -- > > Key: KAFKA-13277 > URL: https://issues.apache.org/jira/browse/KAFKA-13277 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Blocker > Fix For: 3.0.0 > > > Size computation for tagged strings in the message generator is incorrect and > hence it works only for small strings (126 bytes or so) where the length > happens to be correct. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13277) Serialization of long tagged string in request/response throws BufferOverflowException
[ https://issues.apache.org/jira/browse/KAFKA-13277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram updated KAFKA-13277: --- Fix Version/s: (was: 3.0.1) 3.0.0 > Serialization of long tagged string in request/response throws > BufferOverflowException > -- > > Key: KAFKA-13277 > URL: https://issues.apache.org/jira/browse/KAFKA-13277 > Project: Kafka > Issue Type: Bug > Components: clients >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > Fix For: 3.0.0 > > > Size computation for tagged strings in the message generator is incorrect and > hence it works only for small strings (126 bytes or so) where the length > happens to be correct. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13266) `InitialFetchState` should be created after partition is removed from the fetchers
[ https://issues.apache.org/jira/browse/KAFKA-13266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-13266: Priority: Blocker (was: Critical) > `InitialFetchState` should be created after partition is removed from the > fetchers > -- > > Key: KAFKA-13266 > URL: https://issues.apache.org/jira/browse/KAFKA-13266 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: David Jacot >Assignee: David Jacot >Priority: Blocker > Fix For: 3.0.0 > > > `ReplicationTest.test_replication_with_broker_failure` in KRaft mode > sometimes fails with the following error in the log: > {noformat} > [2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Unexpected error occurred while processing data for partition > __consumer_offsets-1 at offset 31727 > (kafka.server.ReplicaFetcherThread)java.lang.IllegalStateException: Offset > mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end > offset = 31728. at > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$8(AbstractFetcherThread.scala:545) > at scala.Option.foreach(Option.scala:437) at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:533) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7$adapted(AbstractFetcherThread.scala:532) > at > kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) > at > scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359) > at > scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355) > at > scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:532) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215) > at scala.Option.foreach(Option.scala:437) at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197) at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)[2021-08-31 > 11:31:25,093] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] > Partition __consumer_offsets-1 marked as failed > (kafka.server.ReplicaFetcherThread) > {noformat} > The issue is due to a race condition in > `ReplicaManager#applyLocalFollowersDelta`. The `InitialFetchState` is created > and populated before the partition is removed from the fetcher threads. This > means that the fetch offset of the `InitialFetchState` could be outdated when > the fetcher threads are re-started because the fetcher threads could have > incremented the log end offset in between. > The partitions must be removed from the fetcher threads before the > `InitialFetchStates` are created. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13266) `InitialFetchState` should be created after partition is removed from the fetchers
[ https://issues.apache.org/jira/browse/KAFKA-13266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-13266: Fix Version/s: 3.0.0 > `InitialFetchState` should be created after partition is removed from the > fetchers > -- > > Key: KAFKA-13266 > URL: https://issues.apache.org/jira/browse/KAFKA-13266 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: David Jacot >Assignee: David Jacot >Priority: Critical > Fix For: 3.0.0 > > > `ReplicationTest.test_replication_with_broker_failure` in KRaft mode > sometimes fails with the following error in the log: > {noformat} > [2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Unexpected error occurred while processing data for partition > __consumer_offsets-1 at offset 31727 > (kafka.server.ReplicaFetcherThread)java.lang.IllegalStateException: Offset > mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end > offset = 31728. at > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$8(AbstractFetcherThread.scala:545) > at scala.Option.foreach(Option.scala:437) at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:533) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7$adapted(AbstractFetcherThread.scala:532) > at > kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) > at > scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359) > at > scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355) > at > scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:532) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215) > at scala.Option.foreach(Option.scala:437) at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197) at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)[2021-08-31 > 11:31:25,093] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] > Partition __consumer_offsets-1 marked as failed > (kafka.server.ReplicaFetcherThread) > {noformat} > The issue is due to a race condition in > `ReplicaManager#applyLocalFollowersDelta`. The `InitialFetchState` is created > and populated before the partition is removed from the fetcher threads. This > means that the fetch offset of the `InitialFetchState` could be outdated when > the fetcher threads are re-started because the fetcher threads could have > incremented the log end offset in between. > The partitions must be removed from the fetcher threads before the > `InitialFetchStates` are created. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13276) Public DescribeConsumerGroupsResult constructor refers to KafkaFutureImpl
[ https://issues.apache.org/jira/browse/KAFKA-13276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-13276: Fix Version/s: 3.0.0 > Public DescribeConsumerGroupsResult constructor refers to KafkaFutureImpl > - > > Key: KAFKA-13276 > URL: https://issues.apache.org/jira/browse/KAFKA-13276 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Tom Bentley >Priority: Blocker > Fix For: 3.0.0 > > > The new public DescribeConsumerGroupsResult constructor refers to the > non-public API KafkaFutureImpl -- This message was sent by Atlassian Jira (v8.3.4#803005)
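A hedged sketch of the shape of the fix, with a placeholder `GroupDescription` type: declaring the constructor parameter as the public `KafkaFuture` interface rather than the internal `KafkaFutureImpl` class keeps the public constructor free of non-public API.

```java
import java.util.Map;
import org.apache.kafka.common.KafkaFuture;

// Placeholder value type standing in for the real description class.
class GroupDescription { }

// Sketch only: a result wrapper whose public constructor accepts the public
// KafkaFuture interface instead of the internal KafkaFutureImpl class.
class DescribeGroupsResultSketch {
    private final Map<String, KafkaFuture<GroupDescription>> futures;

    DescribeGroupsResultSketch(final Map<String, KafkaFuture<GroupDescription>> futures) {
        this.futures = futures;
    }

    Map<String, KafkaFuture<GroupDescription>> describedGroups() {
        return futures;
    }
}
```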
[jira] [Commented] (KAFKA-13260) FindCoordinator errorCounts does not handle v4
[ https://issues.apache.org/jira/browse/KAFKA-13260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411315#comment-17411315 ] Ismael Juma commented on KAFKA-13260: - 3.0.0 hasn't been released yet, so it's a bit weird to say it affects it. > FindCoordinator errorCounts does not handle v4 > -- > > Key: KAFKA-13260 > URL: https://issues.apache.org/jira/browse/KAFKA-13260 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.0.0 >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > > When using batch find coordinator (>=v4), errorCounts() does not correctly > compute the error count. -- This message was sent by Atlassian Jira (v8.3.4#803005)
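A simplified Java sketch of what handling v4 entails, using made-up response fields rather than the real `FindCoordinatorResponse` accessors: in the batched protocol the error codes live on the per-coordinator entries, so counting only the top-level error misses them.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch with hypothetical fields: in FindCoordinator v4+ the response carries a
// list of coordinators, each with its own error code, instead of a single
// top-level error code.
class FindCoordinatorErrorCountsSketch {
    record Coordinator(String key, short errorCode) { }

    static Map<Short, Integer> errorCounts(short topLevelErrorCode, List<Coordinator> coordinators) {
        Map<Short, Integer> counts = new HashMap<>();
        if (coordinators.isEmpty()) {
            // Pre-v4: a single coordinator and a single top-level error code.
            counts.merge(topLevelErrorCode, 1, Integer::sum);
        } else {
            // v4+: one error code per requested coordinator key.
            for (Coordinator coordinator : coordinators) {
                counts.merge(coordinator.errorCode(), 1, Integer::sum);
            }
        }
        return counts;
    }
}
```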
[jira] [Updated] (KAFKA-13266) `InitialFetchState` should be created after partition is removed from the fetchers
[ https://issues.apache.org/jira/browse/KAFKA-13266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-13266: Affects Version/s: (was: 3.0.0) > `InitialFetchState` should be created after partition is removed from the > fetchers > -- > > Key: KAFKA-13266 > URL: https://issues.apache.org/jira/browse/KAFKA-13266 > Project: Kafka > Issue Type: Bug >Reporter: David Jacot >Assignee: David Jacot >Priority: Blocker > Fix For: 3.0.0 > > > `ReplicationTest.test_replication_with_broker_failure` in KRaft mode > sometimes fails with the following error in the log: > {noformat} > [2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, > fetcherId=0] Unexpected error occurred while processing data for partition > __consumer_offsets-1 at offset 31727 > (kafka.server.ReplicaFetcherThread)java.lang.IllegalStateException: Offset > mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end > offset = 31728. at > kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$8(AbstractFetcherThread.scala:545) > at scala.Option.foreach(Option.scala:437) at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:533) > at > kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7$adapted(AbstractFetcherThread.scala:532) > at > kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62) > at > scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359) > at > scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355) > at > scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309) > at > kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:532) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216) > at > kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215) > at scala.Option.foreach(Option.scala:437) at > kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215) > at > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197) at > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)[2021-08-31 > 11:31:25,093] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] > Partition __consumer_offsets-1 marked as failed > (kafka.server.ReplicaFetcherThread) > {noformat} > The issue is due to a race condition in > `ReplicaManager#applyLocalFollowersDelta`. The `InitialFetchState` is created > and populated before the partition is removed from the fetcher threads. This > means that the fetch offset of the `InitialFetchState` could be outdated when > the fetcher threads are re-started because the fetcher threads could have > incremented the log end offset in between. > The partitions must be removed from the fetcher threads before the > `InitialFetchStates` are created. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13260) FindCoordinator errorCounts does not handle v4
[ https://issues.apache.org/jira/browse/KAFKA-13260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-13260: Affects Version/s: (was: 3.0.0) > FindCoordinator errorCounts does not handle v4 > -- > > Key: KAFKA-13260 > URL: https://issues.apache.org/jira/browse/KAFKA-13260 > Project: Kafka > Issue Type: Improvement >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > Fix For: 3.0.0 > > > When using batch find coordinator (>=v4), errorCounts() does not correctly > compute the error count. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13260) FindCoordinator errorCounts does not handle v4
[ https://issues.apache.org/jira/browse/KAFKA-13260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-13260: Fix Version/s: 3.0.0 > FindCoordinator errorCounts does not handle v4 > -- > > Key: KAFKA-13260 > URL: https://issues.apache.org/jira/browse/KAFKA-13260 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.0.0 >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > Fix For: 3.0.0 > > > When using batch find coordinator (>=v4), errorCounts() does not correctly > compute the error count. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Assigned] (KAFKA-13276) Public DescribeConsumerGroupsResult constructor refers to KafkaFutureImpl
[ https://issues.apache.org/jira/browse/KAFKA-13276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma reassigned KAFKA-13276: --- Assignee: Tom Bentley > Public DescribeConsumerGroupsResult constructor refers to KafkaFutureImpl > - > > Key: KAFKA-13276 > URL: https://issues.apache.org/jira/browse/KAFKA-13276 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.0.0 >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Blocker > Fix For: 3.0.0 > > > The new public DescribeConsumerGroupsResult constructor refers to the > non-public API KafkaFutureImpl -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13260) FindCoordinator errorCounts does not handle v4
[ https://issues.apache.org/jira/browse/KAFKA-13260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-13260: Priority: Blocker (was: Major) > FindCoordinator errorCounts does not handle v4 > -- > > Key: KAFKA-13260 > URL: https://issues.apache.org/jira/browse/KAFKA-13260 > Project: Kafka > Issue Type: Improvement >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Blocker > Fix For: 3.0.0 > > > When using batch find coordinator (>=v4), errorCounts() does not correctly > compute the error count. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13259) DescribeProducers response does not include an error when it failed
[ https://issues.apache.org/jira/browse/KAFKA-13259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-13259: Fix Version/s: 3.0.0 > DescribeProducers response does not include an error when it failed > --- > > Key: KAFKA-13259 > URL: https://issues.apache.org/jira/browse/KAFKA-13259 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.0.0, 2.8.0 >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Blocker > Fix For: 3.0.0 > > > This is broken since 2.8 but this is directly visible by users in 3.0 because > of the new Admin.describeProducers() API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13259) DescribeProducers response does not include an error when it failed
[ https://issues.apache.org/jira/browse/KAFKA-13259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-13259: Affects Version/s: (was: 3.0.0) > DescribeProducers response does not include an error when it failed > --- > > Key: KAFKA-13259 > URL: https://issues.apache.org/jira/browse/KAFKA-13259 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.8.0 >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Blocker > Fix For: 3.0.0 > > > This is broken since 2.8 but this is directly visible by users in 3.0 because > of the new Admin.describeProducers() API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13258) AlterClientQuotas response does not include an error when it failed
[ https://issues.apache.org/jira/browse/KAFKA-13258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-13258: Fix Version/s: 3.0.0 > AlterClientQuotas response does not include an error when it failed > --- > > Key: KAFKA-13258 > URL: https://issues.apache.org/jira/browse/KAFKA-13258 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.8.0 >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > Fix For: 3.0.0 > > > This causes `admin.alterClientQuotas()` to not return an error even if the > call failed due to insufficient permissions for example. -- This message was sent by Atlassian Jira (v8.3.4#803005)
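For context, a minimal usage sketch with the public admin API (broker address, principal, and quota value are placeholders): the returned futures are the only place an authorization failure can surface, so when the response omits the error the final `get()` succeeds even though the broker rejected the request.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;

public class AlterClientQuotasExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker

        try (Admin admin = Admin.create(props)) {
            ClientQuotaEntity entity = new ClientQuotaEntity(
                Map.of(ClientQuotaEntity.USER, "alice")); // placeholder principal
            ClientQuotaAlteration alteration = new ClientQuotaAlteration(
                entity,
                Collections.singleton(new ClientQuotaAlteration.Op("consumer_byte_rate", 1024.0 * 1024.0)));

            // The futures are the only error signal for this API; the bug described
            // above makes this get() succeed even when the broker rejected the request.
            admin.alterClientQuotas(Collections.singleton(alteration)).all().get();
        }
    }
}
```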
[jira] [Updated] (KAFKA-13259) DescribeProducers response does not include an error when it failed
[ https://issues.apache.org/jira/browse/KAFKA-13259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-13259: Fix Version/s: 2.8.1 > DescribeProducers response does not include an error when it failed > --- > > Key: KAFKA-13259 > URL: https://issues.apache.org/jira/browse/KAFKA-13259 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.8.0 >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Blocker > Fix For: 3.0.0, 2.8.1 > > > This is broken since 2.8 but this is directly visible by users in 3.0 because > of the new Admin.describeProducers() API. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13258) AlterClientQuotas response does not include an error when it failed
[ https://issues.apache.org/jira/browse/KAFKA-13258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-13258: Affects Version/s: (was: 3.0.0) > AlterClientQuotas response does not include an error when it failed > --- > > Key: KAFKA-13258 > URL: https://issues.apache.org/jira/browse/KAFKA-13258 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.8.0 >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Major > > This causes `admin.alterClientQuotas()` to not return an error even if the > call failed due to insufficient permissions for example. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13258) AlterClientQuotas response does not include an error when it failed
[ https://issues.apache.org/jira/browse/KAFKA-13258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-13258: Fix Version/s: 2.8.1 > AlterClientQuotas response does not include an error when it failed > --- > > Key: KAFKA-13258 > URL: https://issues.apache.org/jira/browse/KAFKA-13258 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.8.0 >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Blocker > Fix For: 3.0.0, 2.8.1 > > > This causes `admin.alterClientQuotas()` to not return an error even if the > call failed due to insufficient permissions for example. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-13258) AlterClientQuotas response does not include an error when it failed
[ https://issues.apache.org/jira/browse/KAFKA-13258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-13258: Priority: Blocker (was: Major) > AlterClientQuotas response does not include an error when it failed > --- > > Key: KAFKA-13258 > URL: https://issues.apache.org/jira/browse/KAFKA-13258 > Project: Kafka > Issue Type: Improvement >Affects Versions: 2.8.0 >Reporter: Mickael Maison >Assignee: Mickael Maison >Priority: Blocker > Fix For: 3.0.0 > > > This causes `admin.alterClientQuotas()` to not return an error even if the > call failed due to insufficient permissions for example. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma commented on pull request #11300: KAFKA-13258/13259/13260: Fix error response generation
ijuma commented on pull request #11300: URL: https://github.com/apache/kafka/pull/11300#issuecomment-914446506 @mimaison I synced with @kkonstantine and he said he's ok with this being merged to 3.0. He will update the release thread soon, but let's get this in when we're ready to avoid delays. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rajinisivaram opened a new pull request #11308: KAFKA-13277; Fix size calculation for tagged string fields in message generator
rajinisivaram opened a new pull request #11308: URL: https://github.com/apache/kafka/pull/11308 Size calculation for tagged fields is currently incorrect and works only for small strings. This results in BufferOverflowException when serializing requests with large strings in tagged fields. The PR fixes size calculation to match the bytes written. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
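To illustrate why the size depends on the string length, a hedged arithmetic sketch (not the generator code): compact strings carry an unsigned-varint length prefix, so the prefix grows beyond one byte for larger strings, and a size calculation that assumes a one-byte prefix under-counts and can lead to the BufferOverflowException described above.

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: how many bytes an unsigned varint takes, and how the size
// of a compact (varint-length-prefixed) string grows with its length.
public class TaggedStringSizeSketch {
    static int unsignedVarintSize(int value) {
        int bytes = 1;
        while ((value & 0xffffff80) != 0) {   // more than 7 significant bits left
            bytes++;
            value >>>= 7;
        }
        return bytes;
    }

    static int compactStringSize(String s) {
        int utf8Length = s.getBytes(StandardCharsets.UTF_8).length;
        // Compact strings encode (length + 1) as an unsigned varint before the bytes.
        return unsignedVarintSize(utf8Length + 1) + utf8Length;
    }

    public static void main(String[] args) {
        String small = "a".repeat(10);
        String large = "a".repeat(5000);
        // The small string needs a 1-byte length prefix, the large one needs 2 bytes,
        // so hard-coding a 1-byte prefix under-counts for large strings.
        System.out.println(compactStringSize(small)); // 11
        System.out.println(compactStringSize(large)); // 5002
    }
}
```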
[jira] [Commented] (KAFKA-13248) Class name mismatch for LoggerFactory.getLogger method in TimeOrderedWindowStoreBuilder.java
[ https://issues.apache.org/jira/browse/KAFKA-13248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411347#comment-17411347 ] Guozhang Wang commented on KAFKA-13248: --- [~sider-bughunter] Thanks for the find. I'm currently working on KAFKA-13216 in which I've used a kv-store instead of a window store, and hence I've renamed the whole class as well. Do you mind if I just incorporate your findings in the other PR? > Class name mismatch for LoggerFactory.getLogger method in > TimeOrderedWindowStoreBuilder.java > > > Key: KAFKA-13248 > URL: https://issues.apache.org/jira/browse/KAFKA-13248 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: S. B. Hunter >Priority: Minor > Attachments: KAFKA-13248.1.patch > > > I have noticed a mismatch with the class name passed to the > LoggerFactory.getLogger method. This would make it hard to track the source > of log messages. > public class {color:#00875a}TimeOrderedWindowStoreBuilder{color} > extends AbstractStoreBuilder> { > private final Logger log = > LoggerFactory.getLogger({color:#de350b}WindowStoreBuilder{color}.class); > private final WindowBytesStoreSupplier storeSupplier; > public {color:#00875a}TimeOrderedWindowStoreBuilder{color}(final > WindowBytesStoreSupplier storeSupplier, > final Serde keySerde, -- This message was sent by Atlassian Jira (v8.3.4#803005)
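The shape of the fix is a one-liner; a hedged sketch follows (the attached patch is authoritative, and per the comment above the class may be renamed anyway):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of the fix: create the logger for the enclosing class so log output is
// attributed to the builder it actually comes from, not to WindowStoreBuilder.
class TimeOrderedWindowStoreBuilderLoggingSketch {
    private final Logger log = LoggerFactory.getLogger(TimeOrderedWindowStoreBuilderLoggingSketch.class);
}
```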
[GitHub] [kafka] spena commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store
spena commented on a change in pull request #11252: URL: https://github.com/apache/kafka/pull/11252#discussion_r703675892 ## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedTimestampedKeyAndJoinSideSerializerTest.java ## @@ -24,49 +24,49 @@ import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertThrows; -public class KeyAndJoinSideSerializerTest { +public class TimestampedTimestampedKeyAndJoinSideSerializerTest { Review comment: The class starts with repeated timestamped -> `TimestampedTimestamped...` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeqo commented on a change in pull request #11213: KAFKA-13201: Convert KTable suppress to new PAPI
jeqo commented on a change in pull request #11213: URL: https://github.com/apache/kafka/pull/11213#discussion_r703713785 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java ## @@ -198,7 +201,7 @@ private void emit(final TimeOrderedKeyValueBuffer.Eviction toEmit) { final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext(); internalProcessorContext.setRecordContext(toEmit.recordContext()); try { -internalProcessorContext.forward(toEmit.key(), toEmit.value()); +internalProcessorContext.forward(toEmit.record()); Review comment: It does, yet. I just tested in one of the methods. Adding the change and updating a test to cover this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jeqo commented on a change in pull request #11213: KAFKA-13201: Convert KTable suppress to new PAPI
jeqo commented on a change in pull request #11213: URL: https://github.com/apache/kafka/pull/11213#discussion_r703713785 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java ## @@ -198,7 +201,7 @@ private void emit(final TimeOrderedKeyValueBuffer.Eviction toEmit) { final ProcessorRecordContext prevRecordContext = internalProcessorContext.recordContext(); internalProcessorContext.setRecordContext(toEmit.recordContext()); try { -internalProcessorContext.forward(toEmit.key(), toEmit.value()); +internalProcessorContext.forward(toEmit.record()); Review comment: It does, yes. I just tested in one of the methods. Adding the change and updating a test to cover this. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rhauch commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed
rhauch commented on pull request #10112: URL: https://github.com/apache/kafka/pull/10112#issuecomment-914547745 @C0urante, thanks for the feedback on my suggestion. I like your option 3, because it does allow the iteration to stop on each source partition as soon as it encounters the first unacknowledged record in each queue. I also think that the behavior with the suggested approach and your option 3 is still a lot better than the current situation. One question, though: you mention that it might be a problem if iterating over the submitted records takes longer than `offset.flush.timeout.ms`. But IIUC the `offset.flush.timeout.ms` would actually not be used anymore, as there actually are no timeouts as the offset commit thread doesn't block anymore. So, worst case, if task A has a ton of submitted records that have to be iterated over (e.g., fast producer and fast source task), it might slow the committing of offsets for other tasks. (Again, this is not any worse than the current behavior.) But your option 2 would help with this at the risk of using more threads, and so we may also want to consider this to help ensure that no slowly-proceeding producer of a task blocks other offset commits. Of course, another option might be to incur the iteration on the worker source task thread. That would essentially move the use of the queue(s) to the worker source task thread, tho we still need to get the offsets to the offset commit thread and so would likely have to keep the synchronization blocks around the offset writer snapshot. On one hand, that's putting more work onto the worker source task thread and making the offset thread super straightforward (snapshot and write); on the other it's putting the onus on the worker source task thread. Thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
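A rough Java sketch of the per-partition queue idea discussed above, with illustrative names rather than Connect's actual classes: each source partition keeps a queue of submitted records, and the offset commit pass drains each queue only up to the first record the producer has not acknowledged, so it never blocks.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: track records per source partition and compute committable
// offsets by draining each queue up to (not past) the first unacknowledged record.
public class SubmittedRecordsSketch {
    static final class SubmittedRecord {
        final Map<String, Object> offset;
        volatile boolean acked;
        SubmittedRecord(Map<String, Object> offset) { this.offset = offset; }
        void ack() { this.acked = true; }   // called from the producer callback
    }

    private final Map<String, Deque<SubmittedRecord>> records = new HashMap<>();

    // Called from the task thread when a record is dispatched to the producer.
    public synchronized SubmittedRecord submit(String sourcePartition, Map<String, Object> offset) {
        SubmittedRecord record = new SubmittedRecord(offset);
        records.computeIfAbsent(sourcePartition, p -> new ArrayDeque<>()).add(record);
        return record;
    }

    // Called from the offset commit thread; it stops scanning each queue at the
    // first record the producer has not acknowledged yet, so it never blocks.
    public synchronized Map<String, Map<String, Object>> committableOffsets() {
        Map<String, Map<String, Object>> committable = new HashMap<>();
        for (Map.Entry<String, Deque<SubmittedRecord>> entry : records.entrySet()) {
            Deque<SubmittedRecord> queue = entry.getValue();
            Map<String, Object> lastAcked = null;
            while (!queue.isEmpty() && queue.peekFirst().acked) {
                lastAcked = queue.pollFirst().offset;
            }
            if (lastAcked != null) {
                committable.put(entry.getKey(), lastAcked);
            }
        }
        return committable;
    }
}
```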
[GitHub] [kafka] ijuma commented on pull request #11307: KAFKA-13262: Remove final from `MockConsumer.close()` and delegate implementation
ijuma commented on pull request #11307: URL: https://github.com/apache/kafka/pull/11307#issuecomment-914551974 Unrelated test failures: > Build / JDK 17 and Scala 2.13 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldPrefixAllInternalTopicNamesWithNamedTopology > Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldPrefixAllInternalTopicNamesWithNamedTopology > Build / JDK 8 and Scala 2.12 / org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldPrefixAllInternalTopicNamesWithNamedTopology > -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma merged pull request #11307: KAFKA-13262: Remove final from `MockConsumer.close()` and delegate implementation
ijuma merged pull request #11307: URL: https://github.com/apache/kafka/pull/11307 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #11307: KAFKA-13262: Remove final from `MockConsumer.close()` and delegate implementation
ijuma commented on pull request #11307: URL: https://github.com/apache/kafka/pull/11307#issuecomment-914552427 Merged to master and cherry-picked to 3.0. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #11308: KAFKA-13277; Fix size calculation for tagged string fields in message generator
cmccabe merged pull request #11308: URL: https://github.com/apache/kafka/pull/11308 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ccding commented on pull request #11293: MINOR: defineInternal for KIP-405 configs
ccding commented on pull request #11293: URL: https://github.com/apache/kafka/pull/11293#issuecomment-914613398 It failed here https://github.com/apache/kafka/blob/9d107c174bfb23e5ce0daca9a59796f018f627c6/core[…]t/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala ``` Expected :HashSet(message.timestamp.difference.max.ms, follower.replication.throttled.replicas, file.delete.delay.ms, compression.type, unclean.leader.election.enable, flush.messages, delete.retention.ms, max.message.bytes, segment.index.bytes, segment.jitter.ms, min.cleanable.dirty.ratio, retention.bytes, message.downconversion.enable, max.compaction.lag.ms, min.compaction.lag.ms, flush.ms, cleanup.policy, message.timestamp.type, retention.ms, min.insync.replicas, message.format.version, leader.replication.throttled.replicas, preallocate, index.interval.bytes, segment.bytes, segment.ms) Actual :HashSet(message.timestamp.difference.max.ms, follower.replication.throttled.replicas, file.delete.delay.ms, compression.type, unclean.leader.election.enable, flush.messages, delete.retention.ms, local.retention.ms, max.message.bytes, segment.index.bytes, segment.jitter.ms, min.cleanable.dirty.ratio, retention.bytes, message.downconversion.enable, max.compaction.lag.ms, min.compaction.lag.ms, remote.storage.enable, flush.ms, cleanup.policy, message.timestamp.type, retention.ms, min.insync.replicas, message.format.version, leader.replication.throttled.replicas, local.retention.bytes, preallocate, index.interval.bytes, segment.bytes, segment.ms) ``` the diff between Expected and Actual is the three defineInternal configs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ccding edited a comment on pull request #11293: MINOR: defineInternal for KIP-405 configs
ccding edited a comment on pull request #11293: URL: https://github.com/apache/kafka/pull/11293#issuecomment-914613398 It failed here https://github.com/apache/kafka/blob/9d107c174bfb23e5ce0daca9a59796f018f627c6/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala#L432 ``` Expected :HashSet(message.timestamp.difference.max.ms, follower.replication.throttled.replicas, file.delete.delay.ms, compression.type, unclean.leader.election.enable, flush.messages, delete.retention.ms, max.message.bytes, segment.index.bytes, segment.jitter.ms, min.cleanable.dirty.ratio, retention.bytes, message.downconversion.enable, max.compaction.lag.ms, min.compaction.lag.ms, flush.ms, cleanup.policy, message.timestamp.type, retention.ms, min.insync.replicas, message.format.version, leader.replication.throttled.replicas, preallocate, index.interval.bytes, segment.bytes, segment.ms) Actual :HashSet(message.timestamp.difference.max.ms, follower.replication.throttled.replicas, file.delete.delay.ms, compression.type, unclean.leader.election.enable, flush.messages, delete.retention.ms, local.retention.ms, max.message.bytes, segment.index.bytes, segment.jitter.ms, min.cleanable.dirty.ratio, retention.bytes, message.downconversion.enable, max.compaction.lag.ms, min.compaction.lag.ms, remote.storage.enable, flush.ms, cleanup.policy, message.timestamp.type, retention.ms, min.insync.replicas, message.format.version, leader.replication.throttled.replicas, local.retention.bytes, preallocate, index.interval.bytes, segment.bytes, segment.ms) ``` the diff between Expected and Actual is the three defineInternal configs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.
junrao commented on a change in pull request #11033: URL: https://github.com/apache/kafka/pull/11033#discussion_r703798708 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java ## @@ -129,37 +132,44 @@ public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmen } // Publish the message to the topic. - doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate); +return doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate); } finally { lock.readLock().unlock(); } } @Override -public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) +public CompletableFuture putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) throws RemoteStorageException { Objects.requireNonNull(remotePartitionDeleteMetadata, "remotePartitionDeleteMetadata can not be null"); lock.readLock().lock(); try { ensureInitializedAndNotClosed(); - doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), remotePartitionDeleteMetadata); +return doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), remotePartitionDeleteMetadata); } finally { lock.readLock().unlock(); } } -private void doPublishMetadata(TopicIdPartition topicIdPartition, RemoteLogMetadata remoteLogMetadata) +private CompletableFuture doPublishMetadata(TopicIdPartition topicIdPartition, + RemoteLogMetadata remoteLogMetadata) throws RemoteStorageException { log.debug("Publishing metadata for partition: [{}] with context: [{}]", topicIdPartition, remoteLogMetadata); try { // Publish the message to the topic. -RecordMetadata recordMetadata = producerManager.publishMessage(remoteLogMetadata); -// Wait until the consumer catches up with this offset. This will ensure read-after-write consistency -// semantics. -consumerManager.waitTillConsumptionCatchesUp(recordMetadata); +CompletableFuture produceFuture = new CompletableFuture<>(); +producerManager.publishMessage(remoteLogMetadata, produceFuture); +return produceFuture.thenApplyAsync((Function) recordMetadata -> { +try { + consumerManager.waitTillConsumptionCatchesUp(recordMetadata); +} catch (TimeoutException e) { +throw new KafkaException(e); +} +return null; +}).toCompletableFuture(); Review comment: Is toCompletableFuture() needed since thenApplyAsync() returns CompletableFuture already? ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java ## @@ -129,37 +132,44 @@ public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmen } // Publish the message to the topic. 
- doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate); +return doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(), segmentMetadataUpdate); } finally { lock.readLock().unlock(); } } @Override -public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) +public CompletableFuture putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) throws RemoteStorageException { Objects.requireNonNull(remotePartitionDeleteMetadata, "remotePartitionDeleteMetadata can not be null"); lock.readLock().lock(); try { ensureInitializedAndNotClosed(); - doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), remotePartitionDeleteMetadata); +return doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), remotePartitionDeleteMetadata); } finally { lock.readLock().unlock(); } } -private void doPublishMetadata(TopicIdPartition topicIdPartition, RemoteLogMetadata remoteLogMetadata) +private CompletableFuture doPublishMetadata(TopicIdPartition topicIdPartition, + RemoteLogMetadata remoteLogMetadata) throws RemoteStorageException { log.debug("Publishing metadata for partition: [{}] with context: [{}]", topicIdPartition, remoteLogMetadat
[GitHub] [kafka] guozhangwang commented on pull request #11297: KAFKA-10038: Supports default client.id for ConsoleConsumer, ProducerPerformance, ConsumerPerformance
guozhangwang commented on pull request #11297: URL: https://github.com/apache/kafka/pull/11297#issuecomment-914619982 Thanks @jasonyanwenl , the PR lgtm, and thank you also for adding the unit tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #11297: KAFKA-10038: Supports default client.id for ConsoleConsumer, ProducerPerformance, ConsumerPerformance
guozhangwang merged pull request #11297: URL: https://github.com/apache/kafka/pull/11297 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang merged pull request #11304: MINOR: Supplement unit test for KAFKA-13175 : Optimization TopicExis…
guozhangwang merged pull request #11304: URL: https://github.com/apache/kafka/pull/11304 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2
[ https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411480#comment-17411480 ] Ashish Patil commented on KAFKA-9366: - Thanks, [~dongjin] > Upgrade log4j to log4j2 > --- > > Key: KAFKA-9366 > URL: https://issues.apache.org/jira/browse/KAFKA-9366 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0 >Reporter: leibo >Assignee: Dongjin Lee >Priority: Critical > Labels: needs-kip > Fix For: 3.1.0 > > > h2. CVE-2019-17571 Detail > Included in Log4j 1.2 is a SocketServer class that is vulnerable to > deserialization of untrusted data which can be exploited to remotely execute > arbitrary code when combined with a deserialization gadget when listening to > untrusted network traffic for log data. This affects Log4j versions up to 1.2 > up to 1.2.17. > > [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571] > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] guozhangwang commented on a change in pull request #11302: KAFKA-13243: KIP-773 Differentiate metric latency measured in ms and ns
guozhangwang commented on a change in pull request #11302: URL: https://github.com/apache/kafka/pull/11302#discussion_r703822498 ## File path: docs/upgrade.html ## @@ -22,9 +22,12 @@ Notable changes in 3.1.0 Apache Kafka supports Java 17. +The following metrics have been deprecated: bufferpool-wait-time-total, io-waittime-total, Review comment: nit: also add a ref link to the KIP? ## File path: docs/upgrade.html ## @@ -22,9 +22,12 @@ Notable changes in 3.1.0 Apache Kafka supports Java 17. +The following metrics have been deprecated: bufferpool-wait-time-total, io-waittime-total, +and iotime-total. Please use bufferpool-wait-time-ns-total, io-wait-time-ns-total, +and io-time-ns-total instead. -Notable changes in 3.0.0 +Notable changes in 3.0.0 Review comment: Nice find. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
guozhangwang commented on pull request #10798: URL: https://github.com/apache/kafka/pull/10798#issuecomment-914624644 Okay, let me re-trigger the tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values
C0urante commented on pull request #10566: URL: https://github.com/apache/kafka/pull/10566#issuecomment-914640232 Hey Gunnar! Sorry for the delay. > Initializing a Struct with a builder rather than a Schema seems to be an odd thing to do, no? Indeed it is rather odd, but it's still possible at the moment and I've definitely seen it in use (intentionally or otherwise...) in at least one fairly large and popular code base. If we can avoid breaking projects that use this pattern (strange as it is) I think that'd be best. I haven't spent too much time thinking about a clean solution but it's at least possible to satisfy both cases with the following: - The change proposed in this PR - The change I proposed above where the order of schemas in the comparison in `ConnectSchema::validateValue` is reversed - The `ConnectSchema::equals` method is altered to accept any `Schema` instance and only operate on the public methods exposed by the `Schema` interface instead of internal variables for comparison of the schemas' fields, key schema, and value schema This would be fairly involved for such a small bug so hopefully there's something less invasive out there, but at least it's possible to favor one use case without compromising the other. As an alternative, could the connector use the `SchemaBuilder` when instantiating the `Struct` object, instead of calling `build` and using a `ConnectSchema`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante edited a comment on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values
C0urante edited a comment on pull request #10566: URL: https://github.com/apache/kafka/pull/10566#issuecomment-914640232 Hey Gunnar! Sorry for the delay. > Initializing a Struct with a builder rather than a Schema seems to be an odd thing to do, no? Indeed it is rather odd, but it's still possible at the moment and I've definitely seen it in use (intentionally or otherwise...) in at least one fairly large and popular code base. If we can avoid breaking projects that use this pattern (strange as it is) I think that'd be best. I haven't spent too much time thinking about a clean solution but it's at least possible to satisfy both cases with the following: - The change proposed in this PR - The change I proposed above where the order of schemas in the comparison in `ConnectSchema::validateValue` is reversed - The `ConnectSchema::equals` method is altered to accept any `Schema` instance and only operate on the public methods exposed by the `Schema` interface instead of internal variables for comparison of the schemas' fields, key schema, and value schema I've verified this locally with the following two test cases: ```java @Test public void testDefaultValueStructSchema() { SchemaBuilder builder = SchemaBuilder.struct() .field("f1", Schema.BOOLEAN_SCHEMA); Struct defaultValue = new Struct(builder.build()); // the Struct receives a schema, not a builder defaultValue.put("f1", true); builder.defaultValue(defaultValue) .build(); } @Test public void testDefaultValueStructSchemaBuilder() { SchemaBuilder builder = SchemaBuilder.struct() .field("f1", Schema.BOOLEAN_SCHEMA); Struct defaultValue = new Struct(builder); defaultValue.put("f1", true); builder.defaultValue(defaultValue).build(); } ``` This would be fairly involved for such a small bug so hopefully there's something less invasive out there, but at least it's possible to favor one use case without compromising the other. As an alternative, could the connector use the `SchemaBuilder` when instantiating the `Struct` object, instead of calling `build` and using a `ConnectSchema`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dejan2609 commented on pull request #10967: KAFKA-12770: CheckStyle team needs this feature (in order to include Kafka into their regression suite)
dejan2609 commented on pull request #10967: URL: https://github.com/apache/kafka/pull/10967#issuecomment-914653605 Adding a comment (just to bring this PR to the surface). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #11213: KAFKA-13201: Convert KTable suppress to new PAPI
vvcephei commented on pull request #11213: URL: https://github.com/apache/kafka/pull/11213#issuecomment-914663785 Java 17 errored out: ``` [2021-09-07T21:03:23.475Z] * What went wrong: [2021-09-07T21:03:23.475Z] Execution failed for task ':core:integrationTest'. [2021-09-07T21:03:23.475Z] > Process 'Gradle Test Executor 129' finished with non-zero exit value 1 [2021-09-07T21:03:23.475Z] This problem might be caused by incorrect test process configuration. [2021-09-07T21:03:23.475Z] Please refer to the test execution section in the User Manual at https://docs.gradle.org/7.2/userguide/java_testing.html#sec:test_execution ``` But Java 8 and 11 passed, so I'll go ahead and merge (since Java 17 itself is new to this project). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei merged pull request #11213: KAFKA-13201: Convert KTable suppress to new PAPI
vvcephei merged pull request #11213: URL: https://github.com/apache/kafka/pull/11213 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13256) Possible NPE in ConfigDef when rendering (enriched) RST or HTML when documentation is not set/NULL
[ https://issues.apache.org/jira/browse/KAFKA-13256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411582#comment-17411582 ] Matthias J. Sax commented on KAFKA-13256: - [~rk3rn3r] Thanks for reporting this issue and for the PR. I am not familiar with the details of this tool, so my question is, for what cases can the key actually be {{null}}? > Possible NPE in ConfigDef when rendering (enriched) RST or HTML when > documentation is not set/NULL > -- > > Key: KAFKA-13256 > URL: https://issues.apache.org/jira/browse/KAFKA-13256 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.8.0 >Reporter: René Kerner >Priority: Major > Original Estimate: 0.5h > Remaining Estimate: 0.5h > > While working on Debezium I discovered the following issue: > When Kafka's ConfigDef renders the HTML or RST documentation representation > of the config definition, it requires `ConfigKey.documentation` member > variable to be a java.lang.String instance that's set to an actual value > different than NULL, else NPE happens: > {code:java} > b.append(key.documentation.replaceAll("\n", "")); > {code} > {code:java} > for (String docLine : key.documentation.split("\n")) { > {code} > > When `documentation` is not set/NULL I suggest to either set a valid String > like "No documentation available" or skip that config key. > > I could provide a PR to fix this soon. -- This message was sent by Atlassian Jira (v8.3.4#803005)
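A hedged sketch of the first option suggested above (fall back to a placeholder string); the alternative of skipping undocumented keys would touch the rendering loops instead:

```java
// Sketch of a null-safe guard mirroring the reporter's suggestion; the actual
// fix may instead skip config keys whose documentation is not set.
final class ConfigDocRenderingSketch {
    static String safeDocumentation(final String documentation) {
        return documentation == null ? "No documentation available." : documentation;
    }

    // The rendering sites quoted above would then use, e.g.:
    //   b.append(safeDocumentation(key.documentation).replaceAll("\n", ""));
    //   for (String docLine : safeDocumentation(key.documentation).split("\n")) { ... }
}
```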
[jira] [Commented] (KAFKA-10493) KTable out-of-order updates are not being ignored
[ https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411592#comment-17411592 ] Matthias J. Sax commented on KAFKA-10493: - Agreed. The original API only had `builder.table()` and the assumption was that people would follow the "single writer principle" to avoid out-of-order data in the input topic. When we added `toTable()` the case you describe was actually discussed, as it was a concern... In the end, we decided to add `toTable()` anyway, and leave it to the user's responsibility to be aware of the issue. In the end, from my POV only "versioned tables" will solve the issue correctly, and we are already working on some design/prototyping. > KTable out-of-order updates are not being ignored > - > > Key: KAFKA-10493 > URL: https://issues.apache.org/jira/browse/KAFKA-10493 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0 >Reporter: Pedro Gontijo >Assignee: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > Attachments: KTableOutOfOrderBug.java, out-of-order-table.png > > > On a materialized KTable, out-of-order records for a given key (records which > timestamp are older than the current value in store) are not being ignored > but used to update the local store value and also being forwarded. > I believe the bug is here: > [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77] > It should return true, not false (see javadoc) > The bug impacts here: > [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148] > I have attached a simple stream app that shows the issue happening. > Thank you! -- This message was sent by Atlassian Jira (v8.3.4#803005)
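For readers following the discussion, a simplified sketch of the intended semantics with placeholder types (not the actual `KTableSource` or serializer code): an update whose timestamp is older than the stored one should be ignored rather than written and forwarded.

```java
// Simplified illustration of the intended out-of-order handling for a
// materialized KTable: keep the stored value if the incoming record is older.
final class KTableUpsertSketch {
    record ValueAndTimestamp<V>(V value, long timestamp) { }

    static <V> ValueAndTimestamp<V> upsert(ValueAndTimestamp<V> stored,
                                           V newValue,
                                           long newTimestamp) {
        if (stored != null && newTimestamp < stored.timestamp()) {
            // Out-of-order update: ignore it, do not overwrite the store or forward it.
            return stored;
        }
        return new ValueAndTimestamp<>(newValue, newTimestamp);
    }
}
```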
[GitHub] [kafka] ableegoldman commented on a change in pull request #11218: MINOR: optimize performAssignment to skip unnecessary check
ableegoldman commented on a change in pull request #11218: URL: https://github.com/apache/kafka/pull/11218#discussion_r703933946 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java ## @@ -48,6 +53,16 @@ public class ConsumerConfig extends AbstractConfig { private static final ConfigDef CONFIG; +// a list contains all the in-product assignor names. Should be updated when new assignor added. +// This is to help optimize the ConsumerCoordinator#performAssignment +public static final List IN_PRODUCT_ASSIGNOR_NAMES = +Collections.unmodifiableList(Arrays.asList( +RANGE_ASSIGNOR_NAME, +ROUNDROBIN_ASSIGNOR_NAME, +STICKY_ASSIGNOR_NAME, +COOPERATIVE_STICKY_ASSIGNOR_NAME Review comment: No, I think it's correct to leave the Streams assignor out of this. Though I think it technically may not matter at the moment, since the Streams assignor will only assign topics from its own subscription and ignores the subscribed topics of the other members, we may want the flexibility to do something like this in the future. ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java ## @@ -48,6 +53,16 @@ public class ConsumerConfig extends AbstractConfig { private static final ConfigDef CONFIG; +// a list contains all the in-product assignor names. Should be updated when new assignor added. +// This is to help optimize the ConsumerCoordinator#performAssignment +public static final List IN_PRODUCT_ASSIGNOR_NAMES = Review comment: nit: this name is a little funky, can we come up with something that describes what this list actually means? The only things I can think of are a bit clunky, but maybe `ASSIGN_FROM_SUBSCRIBED_ASSIGNORS` or `SUBSCRIBED_TOPICS_ASSIGNORS` or whatever sounds good to you 🙂 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #11294: KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers
hachikuji commented on a change in pull request #11294: URL: https://github.com/apache/kafka/pull/11294#discussion_r703942630 ## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ## @@ -3242,6 +3260,92 @@ class ReplicaManagerTest { } } + @Test + def testDeltaFollowerStopFetcherBeforeCreatingInitialFetchOffset(): Unit = { +val localId = 1 +val otherId = localId + 1 +val topicPartition = new TopicPartition("foo", 0) + +val mockReplicaFetcherManager = Mockito.mock(classOf[ReplicaFetcherManager]) +val replicaManager = setupReplicaManagerWithMockedPurgatories( + timer = new MockTimer(time), + brokerId = localId, + mockReplicaFetcherManager = Some(mockReplicaFetcherManager) +) + +try { + // The first call to removeFetcherForPartitions should be ignored. + Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions( +Set(topicPartition)) + ).thenReturn(Map.empty[TopicPartition, PartitionFetchState]) + + // Make the local replica the follower + var followerTopicsDelta = topicsCreateDelta(localId, false) + var followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) + replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta) + + // Check the state of that partition + val HostedPartition.Online(followerPartition) = replicaManager.getPartition(topicPartition) + assertFalse(followerPartition.isLeader) + assertEquals(0, followerPartition.getLeaderEpoch) + assertEquals(0, followerPartition.localLogOrException.logEndOffset) + + // Verify that addFetcherForPartitions was called with the correct + // init offset. + Mockito.verify(mockReplicaFetcherManager, Mockito.times(1)) +.addFetcherForPartitions( + Map(topicPartition -> InitialFetchState( +leader = BrokerEndPoint(otherId, "localhost", 9093), +currentLeaderEpoch = 0, +initOffset = 0 + )) +) + + // The second call to removeFetcherForPartitions simulate the case + // where the fetcher write to the log before being shutdown. + Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions( +Set(topicPartition)) + ).thenAnswer { _ => +replicaManager.getPartition(topicPartition) match { + case HostedPartition.Online(partition) => +partition.appendRecordsToFollowerOrFutureReplica( + records = MemoryRecords.withRecords(CompressionType.NONE, 0, +new SimpleRecord("first message".getBytes)), + isFuture = false +) + + case _ => +} + +Map.empty[TopicPartition, PartitionFetchState] + } + + // Apply changes that bumps the leader epoch. + followerTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), localId, false) + followerMetadataImage = imageFromTopics(followerTopicsDelta.apply()) + replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta) + + assertFalse(followerPartition.isLeader) + assertEquals(1, followerPartition.getLeaderEpoch) + assertEquals(1, followerPartition.localLogOrException.logEndOffset) + + // Verify that addFetcherForPartitions was called with the correct + // init offset. + Mockito.verify(mockReplicaFetcherManager, Mockito.times(1)) +.addFetcherForPartitions( + Map(topicPartition -> InitialFetchState( +leader = BrokerEndPoint(otherId, "localhost", 9093), +currentLeaderEpoch = 1, +initOffset = 1 + )) +) +} finally { + replicaManager.shutdown() +} + +TestUtils.assertNoNonDaemonThreads(this.getClass.getName) Review comment: nit: is it worthwhile moving this to an `@AfterEach`? 
## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -2220,7 +2210,22 @@ class ReplicaManager(val config: KafkaConfig, replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.keySet) Review comment: Perhaps it's worth a comment here that stopping the fetchers is required so that we can initialize the fetch position correctly. It's a subtle point which we might miss again in the future. ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -2229,15 +2234,14 @@ class ReplicaManager(val config: KafkaConfig, } def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = { -stopPartitions(topicPartitions.map { tp => tp -> true }.toMap).foreach { - case (topicPartition, e) => -if (e.isInstanceOf[KafkaStorageException]) { - stateChangeLogger.error(s"Unable to delete stray replica $topicPartition because " + -"the local replica for the partition is in an offline log directory") -} else { - stateChangeLogger.error(s"Unable to delete
[GitHub] [kafka] ableegoldman commented on a change in pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order
ableegoldman commented on a change in pull request #11292: URL: https://github.com/apache/kafka/pull/11292#discussion_r703945833 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/WindowStoreFetchIntegrationTest.java ## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.integration; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.TimeWindowedKStream; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.WindowBytesStoreSupplier; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.test.TestUtils; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; +import java.util.function.Supplier; + +import static java.time.Duration.ofMillis; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; + +@RunWith(Parameterized.class) +public class WindowStoreFetchIntegrationTest { +private enum StoreType { InMemory, RocksDB, Timed }; +private static final String STORE_NAME = "store"; +private static final int DATA_SIZE = 5; +private static final long WINDOW_SIZE = 500L; +private static final long RETENTION_MS = 1L; + +private StoreType storeType; +private boolean enableLogging; +private boolean enableCaching; +private boolean forward; + +private LinkedList, Long>> expectedRecords; +private LinkedList> records; +private Properties streamsConfig; + +private TimeWindowedKStream windowedStream; + +public 
WindowStoreFetchIntegrationTest(final StoreType storeType, final boolean enableLogging, final boolean enableCaching, final boolean forward) { Review comment: Seems more like a unit test than an integration test (though a particularly thorough one 😜) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13269) Kafka Streams Aggregation data loss between instance restarts and rebalances
[ https://issues.apache.org/jira/browse/KAFKA-13269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411606#comment-17411606 ] A. Sophie Blee-Goldman commented on KAFKA-13269: Hey [~rohitbobade], thanks for the report. Unfortunately I don't think the acceptable recovery lag is directly responsible, as that config is only used within the assignor to figure out the placement of tasks. Assigning a task as "Active" just means that the instance should try to process it, the task still has to go through restoration if it's anything less than 100% caught up with the end of the changelog. Wonder if this might be due to https://issues.apache.org/jira/browse/KAFKA-13249? > Kafka Streams Aggregation data loss between instance restarts and rebalances > > > Key: KAFKA-13269 > URL: https://issues.apache.org/jira/browse/KAFKA-13269 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.2 >Reporter: Rohit Bobade >Priority: Major > > Using Kafka Streams 2.6.2 and doing count based aggregation of messages. Also > setting Processing Guarantee - EXACTLY_ONCE_BETA and > NUM_STANDBY_REPLICAS_CONFIG = 1. Sending some messages and restarting > instances in middle while processing to test fault tolerance. The output > count is incorrect because of data loss while restoring state. > It looks like the streams task becomes active and starts processing even when > the state is not fully restored but is within the acceptable recovery lag > (default is 1) This results in data loss > {quote}A stateful active task is assigned to an instance only when its state > is within the configured acceptable.recovery.lag, if one exists > {quote} > [https://docs.confluent.io/platform/current/streams/developer-guide/running-app.html?_ga=2.33073014.912824567.1630441414-1598368976.1615841473#state-restoration-during-workload-rebalance] > [https://docs.confluent.io/platform/current/installation/configuration/streams-configs.html#streamsconfigs_acceptable.recovery.lag] > Setting acceptable.recovery.lag to 0 and re-running the chaos tests gives the > correct result. > Related KIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-Computingthemost-caught-upinstances] > Just want to get some thoughts on this use case from the Kafka team or if > anyone has encountered similar issue -- This message was sent by Atlassian Jira (v8.3.4#803005)
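For reference, a minimal sketch of how the configs mentioned above appear in a Streams setup; the application id and bootstrap servers are hypothetical, and `acceptable.recovery.lag` is set to 0 here only because the reporter observed correct results with that value, not as a recommended fix:
```java
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public class RecoveryLagConfigSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        // hypothetical application id and bootstrap servers
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "aggregation-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // EOS-beta and one standby replica, as described in the report
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_BETA);
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        // acceptable.recovery.lag: an instance may be assigned an active task
        // when its local state is within this many offsets of the changelog end;
        // the reporter only saw correct counts after setting it to 0
        props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0L);
    }
}
```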
[GitHub] [kafka] ableegoldman commented on a change in pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file
ableegoldman commented on a change in pull request #11283: URL: https://github.com/apache/kafka/pull/11283#discussion_r703953000 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -565,7 +565,7 @@ public void closeCleanAndRecycleState() { protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) { // commitNeeded indicates we may have processed some records since last commit // and hence we need to refresh checkpointable offsets regardless whether we should checkpoint or not -if (commitNeeded) { +if (commitNeeded || enforceCheckpoint) { Review comment: Fair enough 🙂 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #11242: [WIP] MINOR: POC for KIP-591: Add config to set default store impl class
guozhangwang commented on pull request #11242: URL: https://github.com/apache/kafka/pull/11242#issuecomment-914789772 Thanks for the POC @showuon! I made a pass and here are some high-level thoughts:

1) On the PAPI layer, users today can only add stores via Topology#addStateStore in which a `StoreBuilder` is required, and from the public APIs they are most likely going to create one from the `Stores` factory. Note that `KeyValueStoreBuilder` is not a public API and hence cannot really be used by PAPI users. In other words, in the PAPI today users would have to specify the store types explicitly via the `Topology#addStateStore` API anyways, and that's also why we originally only considered this KIP for the DSL. If we want to benefit the PAPI users, we would probably need to add new functions in the `Stores` factory which would not specify if it is persistent or in-memory, but only the type (kv, window, etc.). That would require a larger KIP and maybe we can exclude that for this one.

2) I personally would prefer as few public API changes/additions as possible for KIPs, since it's always easier to add later when we find it's not sufficient, than to add first and regret later :) Plus with more public APIs it always gets clumsier. For that, my original thought would be, from the user's perspective, if the `Materialized` construct does not have a supplier set, then Streams would use the supplier as configured by the config. I.e. we should work with all three cases below:
```
// explicitly set the store
stream.groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("some-name")));

// only set the store type
stream.groupByKey().count(Materialized.as(StoreType.IN_MEMORY));

// do not set the store type at all, rely on the config
stream.groupByKey().count(Materialized.as("some-name"));
```
I understand that today the config is not available yet during the DSL's StreamsBuilder phase, but only during the build() phase. What I was thinking is that during the StreamsBuilder's topology-construction phase, e.g. inside the `materialize()` function, instead of
```
if (supplier == null) {
    final String name = materialized.storeName();
    supplier = Stores.persistentTimestampedKeyValueStore(name);
}
```
we would defer the decision when no supplier is given. That means, for the third case above, we would need some "placeholder" StoreSupplier as it is "to be decided", and then during `buildAndOptimizeTopology` we would fill in the placeholders based on the configs. For the other two cases, we would derive the supplier immediately. With that we would not need the `StoreImplementation` interface.

In general I'm leaning towards us, as the developers of KS, eating more complexity bullets so that we can leave the public interface as succinct as possible for the users. As for passing in the config at the `StreamsBuilder` constructor instead of the build phase, I think that would work too, and would be much simpler, but again it would change the public API pattern as well, which makes the `build(properties)` function deprecated and the other deprecated `build()` back to normal? (Maybe cc @ableegoldman as well since she's working on another project which would benefit if we pass in the config at the constructor phase.)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
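For illustration, a rough, hypothetical sketch of the "placeholder" StoreSupplier idea described above; the class, enum, and method names are invented for this sketch and are not code from the PR or from KIP-591:
```java
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;

// Hypothetical placeholder used while building the DSL topology: the store
// name is known up front, but whether it is persistent or in-memory is
// deferred until the config is available during buildAndOptimizeTopology().
final class DeferredKeyValueStoreSupplier {

    enum StoreType { ROCKS_DB, IN_MEMORY }

    private final String name;
    private KeyValueBytesStoreSupplier resolved;

    DeferredKeyValueStoreSupplier(final String name) {
        this.name = name;
    }

    // Called once the configured default store type (e.g. a "default store
    // implementation" style config) is known; before that point the topology
    // only carries this placeholder.
    KeyValueBytesStoreSupplier resolve(final StoreType configuredDefault) {
        if (resolved == null) {
            resolved = configuredDefault == StoreType.IN_MEMORY
                ? Stores.inMemoryKeyValueStore(name)
                : Stores.persistentTimestampedKeyValueStore(name);
        }
        return resolved;
    }
}
```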
[jira] [Commented] (KAFKA-13268) Add more integration tests for Table Table FK joins with repartitioning
[ https://issues.apache.org/jira/browse/KAFKA-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411619#comment-17411619 ] A. Sophie Blee-Goldman commented on KAFKA-13268: What's the name of that testing framework where you just define the parameter space and then it runs all the permutations of those? We should start using that in Streams, especially for new operators and configurations since we've been bitten a few times by a specific topology we don't have test coverage for running into some weird edge case. Would be nice if we could just append to this framework any time we add a new operator, and it will run tests with all configurations of that operator against all (or many/a randomized set) combinations of other operators > Add more integration tests for Table Table FK joins with repartitioning > --- > > Key: KAFKA-13268 > URL: https://issues.apache.org/jira/browse/KAFKA-13268 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: Guozhang Wang >Priority: Major > > We should add to the FK join multipartition integration test with a > Repartitioned for: > 1) just the new partition count > 2) a custom partitioner > This is to test if there's a bug where the internal topics don't pick up a > partitioner provided that way. -- This message was sent by Atlassian Jira (v8.3.4#803005)
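As a rough illustration of the "define the parameter space, run all permutations" idea (not the specific framework being asked about), a plain JUnit 4 `@Parameters` data method can enumerate the cartesian product by hand, along the lines of what the window-store fetch test earlier in this thread parameterizes over; the class and parameter names here are hypothetical:
```java
import org.junit.runners.Parameterized.Parameters;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class ParameterSpaceSketch {

    enum StoreType { IN_MEMORY, ROCKS_DB, TIMED }

    // Enumerates every combination of store type, logging, caching, and
    // iteration direction; each Object[] becomes one test instance when used
    // with @RunWith(Parameterized.class).
    @Parameters(name = "store={0} logging={1} caching={2} forward={3}")
    public static Collection<Object[]> data() {
        final List<Object[]> combinations = new ArrayList<>();
        for (final StoreType storeType : StoreType.values()) {
            for (final boolean logging : Arrays.asList(true, false)) {
                for (final boolean caching : Arrays.asList(true, false)) {
                    for (final boolean forward : Arrays.asList(true, false)) {
                        combinations.add(new Object[] {storeType, logging, caching, forward});
                    }
                }
            }
        }
        return combinations;
    }
}
```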
[GitHub] [kafka] showuon commented on a change in pull request #11218: MINOR: optimize performAssignment to skip unnecessary check
showuon commented on a change in pull request #11218: URL: https://github.com/apache/kafka/pull/11218#discussion_r703970666 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java ## @@ -48,6 +53,16 @@ public class ConsumerConfig extends AbstractConfig { private static final ConfigDef CONFIG; +// a list contains all the in-product assignor names. Should be updated when new assignor added. +// This is to help optimize the ConsumerCoordinator#performAssignment +public static final List IN_PRODUCT_ASSIGNOR_NAMES = Review comment: `ASSIGN_FROM_SUBSCRIBED_ASSIGNORS` sounds good to me. Updated. Let's wait for the jenkins build. Thank you! :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13248) Class name mismatch for LoggerFactory.getLogger method in TimeOrderedWindowStoreBuilder.java
[ https://issues.apache.org/jira/browse/KAFKA-13248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411631#comment-17411631 ] S. B. Hunter commented on KAFKA-13248: -- No. Go right ahead. Thank you for handling my findings swiftly. > Class name mismatch for LoggerFactory.getLogger method in > TimeOrderedWindowStoreBuilder.java > > > Key: KAFKA-13248 > URL: https://issues.apache.org/jira/browse/KAFKA-13248 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: S. B. Hunter >Priority: Minor > Attachments: KAFKA-13248.1.patch > > > I have noticed a mismatch with the class name passed to the > LoggerFactory.getLogger method. This would make it hard to track the source > of log messages. > public class {color:#00875a}TimeOrderedWindowStoreBuilder{color} > extends AbstractStoreBuilder> { > private final Logger log = > LoggerFactory.getLogger({color:#de350b}WindowStoreBuilder{color}.class); > private final WindowBytesStoreSupplier storeSupplier; > public {color:#00875a}TimeOrderedWindowStoreBuilder{color}(final > WindowBytesStoreSupplier storeSupplier, > final Serde keySerde, -- This message was sent by Atlassian Jira (v8.3.4#803005)
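For clarity, the mismatch in the quoted snippet is just the class literal passed to `LoggerFactory.getLogger`; a minimal sketch of the corrected declaration (generics and the store-builder superclass omitted here, as they are in the quoted excerpt) would be:
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TimeOrderedWindowStoreBuilder {
    // Before: LoggerFactory.getLogger(WindowStoreBuilder.class), which makes
    // log lines from this builder appear to originate from WindowStoreBuilder.
    // After: pass the enclosing class so the log source is reported correctly.
    private final Logger log = LoggerFactory.getLogger(TimeOrderedWindowStoreBuilder.class);
}
```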
[GitHub] [kafka] ableegoldman commented on a change in pull request #11278: KAFKA-12648: Enforce size limits for each task's cache
ableegoldman commented on a change in pull request #11278: URL: https://github.com/apache/kafka/pull/11278#discussion_r703973474 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java ## @@ -84,13 +85,34 @@ public synchronized void resize(final long newCacheSizeBytes) { while (sizeBytes() > maxCacheSizeBytes) { final NamedCache cache = circularIterator.next(); cache.evict(); -numEvicts++; +numEvicts.incrementAndGet(); } } else { log.debug("Cache size was expanded to {}", newCacheSizeBytes); } } +public synchronized void resize(final Map newCacheSizes) { +maxCacheSizeBytes = newCacheSizes.values().stream().reduce(0L, Long::sum); +log.debug("Cache size was changed to {}", newCacheSizes); +for (final Map.Entry taskMaxSize: newCacheSizes.entrySet()) { +for (final Map.Entry cache: caches.entrySet()) { +if (cache.getKey().contains(taskMaxSize.getKey())) { +cache.getValue().setMaxBytes(taskMaxSize.getValue()); +} +} +} +if (caches.values().isEmpty()) { +return; +} +final CircularIterator circularIterator = new CircularIterator<>(caches.values()); +while (sizeBytes() > maxCacheSizeBytes) { +final NamedCache cache = circularIterator.next(); +cache.evict(); +numEvicts.incrementAndGet(); +} Review comment: nit: we do this same thing in the other `#resize` for thread count changes, can you factor it out into a helper method? Then I think we can narrow the scope and make only that helper synchronized (should double check that though) ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java ## @@ -84,13 +85,34 @@ public synchronized void resize(final long newCacheSizeBytes) { while (sizeBytes() > maxCacheSizeBytes) { final NamedCache cache = circularIterator.next(); cache.evict(); -numEvicts++; +numEvicts.incrementAndGet(); } } else { log.debug("Cache size was expanded to {}", newCacheSizeBytes); } } +public synchronized void resize(final Map newCacheSizes) { +maxCacheSizeBytes = newCacheSizes.values().stream().reduce(0L, Long::sum); +log.debug("Cache size was changed to {}", newCacheSizes); +for (final Map.Entry taskMaxSize: newCacheSizes.entrySet()) { +for (final Map.Entry cache: caches.entrySet()) { +if (cache.getKey().contains(taskMaxSize.getKey())) { +cache.getValue().setMaxBytes(taskMaxSize.getValue()); +} +} +} +if (caches.values().isEmpty()) { Review comment: Any reason this checks emptiness of `caches.values()` instead of `caches.keys()`? ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java ## @@ -43,7 +44,7 @@ // internal stats private long numPuts = 0; private long numGets = 0; -private long numEvicts = 0; +private AtomicLong numEvicts = new AtomicLong(0); Review comment: why make this atomic, we're still only ever evicting/accessing this from the actual StreamThread right? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -502,7 +504,8 @@ public StreamThread(final Time time, this.assignmentErrorCode = assignmentErrorCode; this.shutdownErrorHook = shutdownErrorHook; this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler; -this.cacheResizer = cacheResizer; +this.threadCache = threadCache; +cacheSizes = new ConcurrentHashMap<>(); Review comment: Does this need to be a concurrent map? 
Seems to only be accessed by the StreamThread itself ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java ## @@ -247,4 +247,6 @@ default boolean commitRequested() { * @return This returns the time the task started idling. If it is not idling it returns empty. */ Optional timeCurrentIdlingStarted(); + +long maxBuffer(); Review comment: Should probably specify what kind of buffer in the name (esp. with KIP-770 adding another relevant buffer type) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
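A rough sketch of the helper-method refactor suggested in the first comment above, using simplified stand-in types rather than the real `ThreadCache`/`NamedCache` classes; the method name and field shapes are assumptions for illustration only:
```java
import java.util.Collection;
import java.util.Iterator;

// Simplified stand-ins for the real ThreadCache/NamedCache types, just to
// show the shape of the shared eviction loop both resize() variants could call.
final class EvictionSketch {

    interface NamedCache {
        long sizeBytes();
        void evict();
    }

    private long maxCacheSizeBytes;   // updated by the resize() callers
    private long numEvicts;
    private final Collection<NamedCache> caches;

    EvictionSketch(final Collection<NamedCache> caches, final long maxCacheSizeBytes) {
        this.caches = caches;
        this.maxCacheSizeBytes = maxCacheSizeBytes;
    }

    private long sizeBytes() {
        return caches.stream().mapToLong(NamedCache::sizeBytes).sum();
    }

    // The helper both resize(long) and resize(Map) would delegate to:
    // evict round-robin until the total size fits under the new maximum.
    private synchronized void maybeEvictToFitMaxCacheSize() {
        if (caches.isEmpty()) {
            return;
        }
        Iterator<NamedCache> iterator = caches.iterator();
        while (sizeBytes() > maxCacheSizeBytes) {
            if (!iterator.hasNext()) {
                iterator = caches.iterator(); // wrap around, like CircularIterator
            }
            iterator.next().evict();
            numEvicts++;
        }
    }
}
```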
[GitHub] [kafka] ableegoldman commented on a change in pull request #11218: MINOR: optimize performAssignment to skip unnecessary check
ableegoldman commented on a change in pull request #11218: URL: https://github.com/apache/kafka/pull/11218#discussion_r703978721 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java ## @@ -48,6 +53,16 @@ public class ConsumerConfig extends AbstractConfig { private static final ConfigDef CONFIG; +// a list contains all the in-product assignor names. Should be updated when new assignor added. +// This is to help optimize the ConsumerCoordinator#performAssignment +public static final List IN_PRODUCT_ASSIGNOR_NAMES = Review comment: Alrighty, just give me a ping when the build is done! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12963) Improve error message for Class cast exception
[ https://issues.apache.org/jira/browse/KAFKA-12963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411644#comment-17411644 ] A. Sophie Blee-Goldman commented on KAFKA-12963: {quote}I can imagine that a class cast exception of types A and B for processor KSTREAM-TRANSFORM-16 would be a bit of a head-scratcher {quote} Very fair, that's why we recommend naming all operators ;) . But you should be able to figure out what it is from the topology description if necessary, along with the upstream topics. I know it's not the best experience but I think trying to put the topic into this message would end up with a lot of checking specific topological patterns in a way that would be hard not to accidentally mess up when changing things in the future. Still, if you'd like to have a crack at it yourself then definitely go for it! Otherwise maybe split that into a separate feature request ticket and see if someone else does > Improve error message for Class cast exception > -- > > Key: KAFKA-12963 > URL: https://issues.apache.org/jira/browse/KAFKA-12963 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.7.0 >Reporter: Rasmus Helbig Hansen >Assignee: Andrew Lapidas >Priority: Minor > Fix For: 3.1.0 > > > After a topology change and starting the application again, we got this type > of error message: > [g9z-StreamThread-1] ERROR > org.apache.kafka.streams.processor.internals.TaskManager - stream-thread > [g9z-StreamThread-1] Failed to process stream task 1_12 due to the following > error: > org.apache.kafka.streams.errors.StreamsException: ClassCastException > invoking Processor. Do the Processor's input types match the deserialized > types? Check the Serde setup and change the default Serdes in StreamConfig or > provide correct Serdes via method parameters. Make sure the Processor can > accept the deserialized input of type key: org.acme.SomeKey, and value: > org.acme.SomeValue. > Note that although incorrect Serdes are a common cause of error, the cast > exception might have another cause (in user code, for example). For example, > if a processor wires in a store, but casts the generics incorrectly, a class > cast exception could be raised during processing, but the cause would not be > wrong Serdes. 
> at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172) > at > org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55) > at > org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86) > at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512) > Caused by: java.lang.ClassCastException: class org.acme.Som
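As background on "naming all operators": the DSL lets you attach explicit names via `Named`, `Consumed#withName`, and `Produced#withName`, so that processor nodes carry recognizable names (rather than e.g. KSTREAM-TRANSFORM-16) in the topology description and in error messages. A minimal sketch with hypothetical topic names and a trivial topology:
```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

public class NamedOperatorsSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, String> input = builder.stream(
            "orders", // hypothetical topic
            Consumed.with(Serdes.String(), Serdes.String()).withName("orders-source"));

        input
            .filter((key, value) -> value != null, Named.as("drop-null-values"))
            .mapValues(String::toUpperCase, Named.as("uppercase-values"))
            .to("orders-upper", Produced.with(Serdes.String(), Serdes.String()).withName("orders-sink"));

        // The described topology now shows the given names, so an exception
        // raised in a processor can be traced back to e.g. "uppercase-values"
        // instead of an auto-generated node name.
        System.out.println(builder.build().describe());
    }
}
```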
[GitHub] [kafka] ableegoldman merged pull request #11214: KAFKA-12994 Migrate JoinWindowsTest and SessionWindowsTest to new API
ableegoldman merged pull request #11214: URL: https://github.com/apache/kafka/pull/11214 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13267) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
[ https://issues.apache.org/jira/browse/KAFKA-13267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-13267: Component/s: streams > InvalidPidMappingException: The producer attempted to use a producer id which > is not currently assigned to its transactional id > --- > > Key: KAFKA-13267 > URL: https://issues.apache.org/jira/browse/KAFKA-13267 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Gilles Philippart >Priority: Major > > We're using Confluent Cloud and Kafka Streams 2.8.0 and we've seen these > errors pop up in apps using EOS: > {code:java} > InvalidPidMappingException: The producer attempted to use a producer id which > is not currently assigned to its transactional id > {code} > Full stack trace: > {code:java} > Error encountered sending record to topic ola-update-1 for task 4_7 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. Exception handler choose to FAIL the processing, no more > records would be sent. > RecordCollectorImpl.java 226 recordSendError(...) > RecordCollectorImpl.java:226:in `recordSendError' > RecordCollectorImpl.java 196 lambda$send$0(...) > RecordCollectorImpl.java:196:in `lambda$send$0' > KafkaProducer.java 1365 onCompletion(...) > KafkaProducer.java:1365:in `onCompletion' > ProducerBatch.java 231 completeFutureAndFireCallbacks(...) > ProducerBatch.java:231:in `completeFutureAndFireCallbacks' > ProducerBatch.java 159 abort(...) > ProducerBatch.java:159:in `abort' > RecordAccumulator.java 763 abortBatches(...) > RecordAccumulator.java:763:in `abortBatches' > More (5 lines) > Nested Exceptionsorg.apache.kafka.streams.errors.StreamsException: Error > encountered sending record to topic ola-update-1 for task 4_7 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. Exception handler choose to FAIL the processing, no more > records would be sent. > RecordCollectorImpl.java 226 recordSendError(...) > RecordCollectorImpl.java:226:in `recordSendError' > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > {code} > I've seen that KAFKA-6821 described the same problem on an earlier version of > Kafka and was closed due to the subsequent works on EOS. > Another ticket raised recently shows that the exception is still occurring > (but the ticket wasn't raised for that specific error): KAFKA-12774 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-13267) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
[ https://issues.apache.org/jira/browse/KAFKA-13267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411666#comment-17411666 ] Matthias J. Sax commented on KAFKA-13267: - [~guozhang] [~hachikuji] – Given the error message, this might actually be a broker or producer bug, rather than a Stream issue? > InvalidPidMappingException: The producer attempted to use a producer id which > is not currently assigned to its transactional id > --- > > Key: KAFKA-13267 > URL: https://issues.apache.org/jira/browse/KAFKA-13267 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Gilles Philippart >Priority: Major > > We're using Confluent Cloud and Kafka Streams 2.8.0 and we've seen these > errors pop up in apps using EOS: > {code:java} > InvalidPidMappingException: The producer attempted to use a producer id which > is not currently assigned to its transactional id > {code} > Full stack trace: > {code:java} > Error encountered sending record to topic ola-update-1 for task 4_7 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. Exception handler choose to FAIL the processing, no more > records would be sent. > RecordCollectorImpl.java 226 recordSendError(...) > RecordCollectorImpl.java:226:in `recordSendError' > RecordCollectorImpl.java 196 lambda$send$0(...) > RecordCollectorImpl.java:196:in `lambda$send$0' > KafkaProducer.java 1365 onCompletion(...) > KafkaProducer.java:1365:in `onCompletion' > ProducerBatch.java 231 completeFutureAndFireCallbacks(...) > ProducerBatch.java:231:in `completeFutureAndFireCallbacks' > ProducerBatch.java 159 abort(...) > ProducerBatch.java:159:in `abort' > RecordAccumulator.java 763 abortBatches(...) > RecordAccumulator.java:763:in `abortBatches' > More (5 lines) > Nested Exceptionsorg.apache.kafka.streams.errors.StreamsException: Error > encountered sending record to topic ola-update-1 for task 4_7 due to: > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. Exception handler choose to FAIL the processing, no more > records would be sent. > RecordCollectorImpl.java 226 recordSendError(...) > RecordCollectorImpl.java:226:in `recordSendError' > org.apache.kafka.common.errors.InvalidPidMappingException: The producer > attempted to use a producer id which is not currently assigned to its > transactional id. > {code} > I've seen that KAFKA-6821 described the same problem on an earlier version of > Kafka and was closed due to the subsequent works on EOS. > Another ticket raised recently shows that the exception is still occurring > (but the ticket wasn't raised for that specific error): KAFKA-12774 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10038) ConsumerPerformance.scala supports the setting of client.id
[ https://issues.apache.org/jira/browse/KAFKA-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411669#comment-17411669 ] Yanwen Lin commented on KAFKA-10038: Hey [~guozhang], per this wiki: [https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest], when a PR is merged, the related Jira ticket will also be closed automatically. The PR in this ticket is linked and is already merged (PR: [https://github.com/apache/kafka/pull/11297]), but this ticket status is still PATCH AVAILABLE? Is there anything I missed? Or should I close this ticket manually? Thanks! > ConsumerPerformance.scala supports the setting of client.id > --- > > Key: KAFKA-10038 > URL: https://issues.apache.org/jira/browse/KAFKA-10038 > Project: Kafka > Issue Type: Improvement > Components: consumer, core >Affects Versions: 2.1.1 > Environment: Trunk branch >Reporter: tigertan >Assignee: Yanwen Lin >Priority: Minor > Labels: newbie, performance > > ConsumerPerformance.scala supports the setting of "client.id", which is a > reasonable requirement, and the way "console consumer" and "console producer" > handle "client.id" can be unified. "client.id" defaults to > "perf-consumer-client". > We often use client.id in quotas, if the script of > kafka-producer-perf-test.sh supports the setting of "client.id" , we can do > quota testing through scripts without writing our own consumer programs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10038) ConsumerPerformance.scala supports the setting of client.id
[ https://issues.apache.org/jira/browse/KAFKA-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411674#comment-17411674 ] Guozhang Wang commented on KAFKA-10038: --- Yeah that should happen.. let's wait for a bit for github bot. Or you can close it yourself too. > ConsumerPerformance.scala supports the setting of client.id > --- > > Key: KAFKA-10038 > URL: https://issues.apache.org/jira/browse/KAFKA-10038 > Project: Kafka > Issue Type: Improvement > Components: consumer, core >Affects Versions: 2.1.1 > Environment: Trunk branch >Reporter: tigertan >Assignee: Yanwen Lin >Priority: Minor > Labels: newbie, performance > > ConsumerPerformance.scala supports the setting of "client.id", which is a > reasonable requirement, and the way "console consumer" and "console producer" > handle "client.id" can be unified. "client.id" defaults to > "perf-consumer-client". > We often use client.id in quotas, if the script of > kafka-producer-perf-test.sh supports the setting of "client.id" , we can do > quota testing through scripts without writing our own consumer programs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10038) ConsumerPerformance.scala supports the setting of client.id
[ https://issues.apache.org/jira/browse/KAFKA-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411675#comment-17411675 ] Yanwen Lin commented on KAFKA-10038: Sure, thanks for your super fast reply. Let's wait at least for a while. > ConsumerPerformance.scala supports the setting of client.id > --- > > Key: KAFKA-10038 > URL: https://issues.apache.org/jira/browse/KAFKA-10038 > Project: Kafka > Issue Type: Improvement > Components: consumer, core >Affects Versions: 2.1.1 > Environment: Trunk branch >Reporter: tigertan >Assignee: Yanwen Lin >Priority: Minor > Labels: newbie, performance > > ConsumerPerformance.scala supports the setting of "client.id", which is a > reasonable requirement, and the way "console consumer" and "console producer" > handle "client.id" can be unified. "client.id" defaults to > "perf-consumer-client". > We often use client.id in quotas, if the script of > kafka-producer-perf-test.sh supports the setting of "client.id" , we can do > quota testing through scripts without writing our own consumer programs. -- This message was sent by Atlassian Jira (v8.3.4#803005)