[GitHub] [kafka] showuon commented on a change in pull request #11288: MINOR: Fix error response generation

2021-09-07 Thread GitBox


showuon commented on a change in pull request #11288:
URL: https://github.com/apache/kafka/pull/11288#discussion_r703239913



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/DescribeProducersRequest.java
##
@@ -80,6 +80,7 @@ public DescribeProducersResponse getErrorResponse(int 
throttleTimeMs, Throwable
 .setErrorCode(error.code())

Review comment:
   should we add `.setErrorMessage(error.message())` here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11288: MINOR: Fix error response generation

2021-09-07 Thread GitBox


showuon commented on pull request #11288:
URL: https://github.com/apache/kafka/pull/11288#issuecomment-914059309


   Also, this PR title should be updated to link to JIRA tickets. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #11301: KAFKA-13276: Prefer KafkaFuture in admin Result constructors

2021-09-07 Thread GitBox


tombentley commented on pull request #11301:
URL: https://github.com/apache/kafka/pull/11301#issuecomment-914082991


   @dajac @showuon thanks, I've addressed your comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on a change in pull request #11301: KAFKA-13276: Prefer KafkaFuture in admin Result constructors

2021-09-07 Thread GitBox


tombentley commented on a change in pull request #11301:
URL: https://github.com/apache/kafka/pull/11301#discussion_r703274931



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiFuture.java
##
@@ -109,7 +110,7 @@ private void completeExceptionally(K key, Throwable t) {
 }
 
 private KafkaFutureImpl<V> futureOrThrow(K key) {
-KafkaFutureImpl<V> future = futures.get(key);
+KafkaFutureImpl<V> future = (KafkaFutureImpl<V>) futures.get(key);

Review comment:
   The typecast isn't actually necessary if we're willing to declare 
`public Map<K, KafkaFutureImpl<V>> all()` in `SimpleAdminApiFuture` and 
also in the handful of `*Result` classes which use the 
`SimpleAdminApiFuture`. Those `*Result` classes have package-access 
constructors, so it would be compatible to do that, and that's actually the 
correct way to solve this. But I think there are other `*Result` classes where 
that approach would be problematic if we applied it consistently. The precedent 
was set a long time ago to just use a plain `Map<K, KafkaFuture<V>>` rather than 
the more correct `Map<K, KafkaFutureImpl<V>>` in the Result classes. 
So I guess the typecast is the lesser evil, and I'm OK with a comment.
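   
   For illustration, a self-contained sketch of the tradeoff being described, using simplified stand-in types rather than the actual Kafka classes: declaring the implementation type on `all()` is what removes the need for the cast.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   
   // Stand-in for KafkaFutureImpl<V>; purely illustrative.
   class FutureImpl<V> extends CompletableFuture<V> { }
   
   class SimpleFutureHolder<K, V> {
       private final Map<K, FutureImpl<V>> futures = new HashMap<>();
   
       // Declaring the implementation type here lets futureOrThrow read the
       // map without a typecast; exposing only the supertype (the precedent
       // in the Result classes) forces a cast back at the use site.
       public Map<K, FutureImpl<V>> all() {
           return futures;
       }
   
       private FutureImpl<V> futureOrThrow(K key) {
           FutureImpl<V> future = futures.get(key);
           if (future == null) {
               throw new IllegalArgumentException("Future for " + key + " was not requested");
           }
           return future;
       }
   }
   ```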




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] patrickstuedi commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

2021-09-07 Thread GitBox


patrickstuedi commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r703309462



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
 }
 }
 
+private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
   @vamossagar12 great to see you're about to add direct buffer support, I 
was actually also looking into this. Just a few comments: as pointed out 
already, one important aspect to keep in mind with direct buffers is that 
allocation and disposal are more expensive than with regular heap buffers, so 
it's important to re-use them. It's common practice to keep a pool of direct 
buffers around and re-use them on a per-call basis (most I/O libraries do this, 
e.g., Netty, Crail, etc.). You can keep queues of free and used buffers and 
move them around during state store operations. 
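   
   A minimal sketch of the pooling idea, assuming a fixed per-buffer capacity (a hypothetical helper class, not part of RocksDBStore):
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayDeque;
   
   // Hypothetical pool: pre-allocates direct buffers once and recycles them,
   // since allocating and disposing direct buffers is expensive relative to
   // regular heap buffers.
   final class DirectBufferPool {
       private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
       private final int bufferCapacity;
   
       DirectBufferPool(final int poolSize, final int bufferCapacity) {
           this.bufferCapacity = bufferCapacity;
           for (int i = 0; i < poolSize; i++) {
               free.add(ByteBuffer.allocateDirect(bufferCapacity));
           }
       }
   
       ByteBuffer acquire() {
           final ByteBuffer buffer = free.poll();
           // Fall back to a fresh allocation if the pool is exhausted.
           return buffer != null ? buffer : ByteBuffer.allocateDirect(bufferCapacity);
       }
   
       void release(final ByteBuffer buffer) {
           buffer.clear();
           free.add(buffer);
       }
   }
   ```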




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] patrickstuedi commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

2021-09-07 Thread GitBox


patrickstuedi commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r703312659



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
 }
 }
 
+private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
   Also, we should try to avoid serializing data into byte[] arrays and 
then copying the data into direct buffers. Instead we should serialize directly 
into "direct" ByteBuffers. For this we might need to have RocksDBStore 
implement a ByteBuffer-based interface, e.g., KeyValueStore<ByteBuffer, ByteBuffer>, or 
anything similar...
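   
   For illustration, one hypothetical shape such an interface could take (a sketch only; changing the public Streams store interfaces would need a KIP, as discussed below):
   
   ```java
   import java.nio.ByteBuffer;
   
   // Hypothetical ByteBuffer-oriented store surface: callers serialize
   // straight into direct buffers instead of materializing byte[] copies.
   interface DirectByteBufferKeyValueStore {
       void put(ByteBuffer key, ByteBuffer value);
   
       ByteBuffer get(ByteBuffer key);
   
       void delete(ByteBuffer key);
   }
   ```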




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

2021-09-07 Thread GitBox


vamossagar12 commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r70661



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
 }
 }
 
+private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
   > @vamossagar12 great to see you're about to add direct buffer support, 
I was actually also looking into this. Just a few comments: as pointed out 
already, one important aspect to keep in mind with direct buffers is that 
allocation and disposal are more expensive than with regular heap buffers, so 
it's important to re-use them. It's common practice to keep a pool of direct 
buffers around and re-use them on a per-call basis (most I/O libraries do this, 
e.g., Netty, Crail, etc.). You can keep queues of free and used buffers and 
move them around during state store operations.
   
   Thanks @patrickstuedi , that's a great suggestion. I think it makes sense 
given the expense of allocation/disposal. A couple of questions there:
   
   1) I think the main factor to consider is how to manage the pool of 
used/free buffers. As already pointed out by @cadonna above, the only case where 
we can expect concurrent access to state stores is IQ; otherwise 
it's single threaded. The single-threaded case is going to be straightforward, 
but for IQ, what happens if a request comes in and none of the direct 
byte buffers are `free`? Do we re-allocate on the fly?
   2) This might be a small issue but calling it out: we need to allocate some 
capacity to the direct byte buffer. Currently, I am iterating over the list of 
keys and values and assigning the max length, which may not be the most 
efficient way. If I were to create the pool, then I would need to know the 
capacity to allocate upfront. Can this be exposed as a config for the users in 
that case? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org





[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

2021-09-07 Thread GitBox


vamossagar12 commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r703336198



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
 }
 }
 
+private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
   > Also, we should try to avoid serializing data into byte[] arrays and 
then copying the data into direct buffers. Instead we should serialize directly 
into "direct" ByteBuffers. For this we might need to have RocksDBStore 
implement a ByteBuffer-based interface, e.g., KeyValueStore<ByteBuffer, ByteBuffer>, or 
anything similar...
   
   Yeah.. I did play around with that idea, but the intent was to not change the 
public APIs. If that makes sense and I can do another round of benchmarking for 
it, then we will probably need a KIP.
   
   In fact, we could even go one step further and add support for only 
DirectByteBuffers, if that makes sense. We could throw errors if the ByteBuffer 
isn't direct, for example, similar to the RocksDB implementation.
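   
   A tiny sketch of what such a direct-only guard could look like (hypothetical helper; RocksDB's own ByteBuffer-based APIs similarly expect direct buffers):
   
   ```java
   import java.nio.ByteBuffer;
   
   final class DirectBuffers {
       private DirectBuffers() { }
   
       // Reject heap buffers up front so misuse fails fast.
       static ByteBuffer requireDirect(final ByteBuffer buffer) {
           if (!buffer.isDirect()) {
               throw new IllegalArgumentException("A direct ByteBuffer is required");
           }
           return buffer;
       }
   }
   ```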
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] apenam opened a new pull request #11305: Make classpath smaller

2021-09-07 Thread GitBox


apenam opened a new pull request #11305:
URL: https://github.com/apache/kafka/pull/11305


   Fix the `The input line is too long. The syntax of the command is incorrect` 
error when running in a Windows environment.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] TomerWizman commented on a change in pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB

2021-09-07 Thread GitBox


TomerWizman commented on a change in pull request #11250:
URL: https://github.com/apache/kafka/pull/11250#discussion_r703348241



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##
@@ -249,7 +251,7 @@ public int maxFileOpeningThreads() {
 
 @Override
 public Options setMaxTotalWalSize(final long maxTotalWalSize) {
-dbOptions.setMaxTotalWalSize(maxTotalWalSize);
+LOGGER.warn("WAL is explicitly disabled by Streams in RocksDB. Setting 
option 'maxTotalWalSize' will be ignored");

Review comment:
   @cadonna 
   Sorry for the late reply, I was on vacation.
   I added the rest of the WAL options to the test. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on a change in pull request #11302: KAFKA-13243: KIP-773 Differentiate metric latency measured in ms and ns

2021-09-07 Thread GitBox


tombentley commented on a change in pull request #11302:
URL: https://github.com/apache/kafka/pull/11302#discussion_r703359283



##
File path: clients/src/main/java/org/apache/kafka/common/network/Selector.java
##
@@ -1272,12 +1274,27 @@ private Meter createMeter(Metrics metrics, String 
groupName,  Map<String, String> metricTags,
+/**
+ * This method generates `time-total` metrics with a couple of errors, 
no `-ns` suffix and no dash between basename

Review comment:
   ```suggestion
* This method generates `time-total` metrics but has a couple of 
deficiencies: no `-ns` suffix and no dash between basename
   ```

##
File path: docs/ops.html
##
@@ -1890,6 +1890,16 @@ Common monitoring me
 The average length of time the I/O thread spent waiting for a 
socket ready for reads or writes in nanoseconds.
 
kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
   
+  
+io-wait-time-ns-total
+The total time the I/O thread spent waiting in nanoseconds
+
kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
+  
+  
+io-waittime-total
+*Deprecated* The total time the I/O thread spent waiting in 
nanoseconds

Review comment:
   No harm in mentioning that the replacement is `io-wait-time-ns-total`.

##
File path: docs/ops.html
##
@@ -1900,6 +1910,16 @@ Common monitoring me
 The average length of time for I/O per select call in 
nanoseconds.
 
kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
   
+  
+io-time-ns-total
+The total time the I/O thread spent doing I/O in nanoseconds
+
kafka.[producer|consumer|connect]:type=[producer|consumer|connect]-metrics,client-id=([-.\w]+)
+  
+  
+iotime-total
+*Deprecated* The total time the I/O thread spent doing I/O 
in nanoseconds

Review comment:
   Again, mention the replacement.

##
File path: docs/ops.html
##
@@ -2072,6 +2092,16 @@ <
 The fraction of time an appender waits for space allocation.
 kafka.producer:type=producer-metrics,client-id=([-.\w]+)
   
+  
+bufferpool-wait-time-total
+*Deprecated* The total time an appender waits for space 
allocation in nanoseconds.

Review comment:
   Same comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #11254: KAFKA-2424: Introduce Scalafix linter

2021-09-07 Thread GitBox


jlprat commented on pull request #11254:
URL: https://github.com/apache/kafka/pull/11254#issuecomment-914189101


   Rebased the changes


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB

2021-09-07 Thread GitBox


cadonna commented on a change in pull request #11250:
URL: https://github.com/apache/kafka/pull/11250#discussion_r703386486



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
##
@@ -126,7 +147,7 @@ private void verifyDBOptionsMethodCall(final Method method) 
throws Exception {
 } catch (final InvocationTargetException undeclaredMockMethodCall) {
 assertThat(undeclaredMockMethodCall.getCause(), 
instanceOf(AssertionError.class));
 assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(),
-matchesPattern("Unexpected method call DBOptions\\." + 
method.getName() + "((.*\n*)*):"));
+matchesPattern("Unexpected method call DBOptions\\." + 
method.getName() + "((.*\n*)*):"));

Review comment:
   Could you please revert this change since it is unrelated to the PR?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
##
@@ -93,7 +114,7 @@ public void shouldOverwriteAllOptionsMethods() throws 
Exception {
 for (final Method method : Options.class.getMethods()) {
 if (!ignoreMethods.contains(method.getName())) {
 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class
-.getDeclaredMethod(method.getName(), 
method.getParameterTypes());
+.getDeclaredMethod(method.getName(), 
method.getParameterTypes());

Review comment:
   Could you please revert this change since it is unrelated to the PR?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
##
@@ -306,4 +327,33 @@ public String name() {
 
 return parameters;
 }
+
+@Test
+public void shouldLogWarningWhenSettingWalOptions() throws Exception {
+
+try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class))
 {
+
+final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter 
adapter
+= new 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), new 
ColumnFamilyOptions());
+
+for (final Method method : 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class.getDeclaredMethods())
 {
+if (walRelatedMethods.contains(method.getName())) {
+method.invoke(adapter, 
getDBOptionsParameters(method.getParameterTypes()));
+}
+}
+
+final List<String> walOptions = Arrays.asList("walDir", 
"walFilter", "walRecoveryMode", "walBytesPerSync", "walSizeLimitMB", 
"manualWalFlush", "maxTotalWalSize", "walTtlSeconds");

Review comment:
   nit: Would it be possible to extract these options from 
`walRelatedMethods`? 

##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
##
@@ -113,7 +134,7 @@ public void shouldForwardAllDbOptionsCalls() throws 
Exception {
 private void verifyDBOptionsMethodCall(final Method method) throws 
Exception {
 final DBOptions mockedDbOptions = mock(DBOptions.class);
 final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter 
optionsFacadeDbOptions
-= new 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(mockedDbOptions, new 
ColumnFamilyOptions());
+= new 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(mockedDbOptions, new 
ColumnFamilyOptions());

Review comment:
   Could you please revert this change since it is unrelated to the PR?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
##
@@ -218,7 +239,7 @@ public void shouldForwardAllColumnFamilyCalls() throws 
Exception {
 private void verifyColumnFamilyOptionsMethodCall(final Method method) 
throws Exception {
 final ColumnFamilyOptions mockedColumnFamilyOptions = 
mock(ColumnFamilyOptions.class);
 final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter 
optionsFacadeColumnFamilyOptions
-= new 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), 
mockedColumnFamilyOptions);
+= new 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), 
mockedColumnFamilyOptions);

Review comment:
   Could you please revert this change since it is unrelated to the PR?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
##
@@ -306,4 +327,33 @@ public String name()

[GitHub] [kafka] jlprat commented on pull request #11302: KAFKA-13243: KIP-773 Differentiate metric latency measured in ms and ns

2021-09-07 Thread GitBox


jlprat commented on pull request #11302:
URL: https://github.com/apache/kafka/pull/11302#issuecomment-914195040


   Thanks for your feedback @tombentley. Feel free to review the changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] patrickstuedi commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

2021-09-07 Thread GitBox


patrickstuedi commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r703439679



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
 }
 }
 
+private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
   Re (1) if all use cases are single threaded then yes, we can allocate 
some buffer(s) as part of the store. Otherwise, if you need to support multiple 
concurrent ops, you could pre-populate a queue with N buffers, and N 
becomes the maximum number of concurrent requests you can serve. If your 
queue is empty then a request would have to wait until at least one of the 
outstanding requests completes and adds its buffer back to the queue. Again, that 
might all not be needed given the API is single-threaded. 

   Re (2), is there a max size, maybe given by the maximum Kafka message size 
that is configured (if such a limit exists and is not too big)? 
   
   If we don't want to change the API (I guess it would be the RocksDBStore 
interface that would be changed, which is not exposed I think, but still), then 
splitting this work into a part I where we copy heap to direct buffers, and 
a part II where we serialize directly into direct buffers, is the way to go.
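   
   A minimal sketch of the bounded variant described in (1), assuming N pre-allocated buffers of a fixed capacity (hypothetical helper; the blocking behaviour only matters for the concurrent IQ case):
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.BlockingQueue;
   
   // Hypothetical bounded pool: N buffers cap the number of concurrent
   // requests; acquire() blocks until an outstanding request releases one.
   final class BoundedDirectBufferPool {
       private final BlockingQueue<ByteBuffer> free;
   
       BoundedDirectBufferPool(final int n, final int bufferCapacity) {
           free = new ArrayBlockingQueue<>(n);
           for (int i = 0; i < n; i++) {
               free.add(ByteBuffer.allocateDirect(bufferCapacity));
           }
       }
   
       ByteBuffer acquire() throws InterruptedException {
           return free.take();
       }
   
       void release(final ByteBuffer buffer) {
           buffer.clear();
           free.add(buffer);
       }
   }
   ```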




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] TomerWizman commented on a change in pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB

2021-09-07 Thread GitBox


TomerWizman commented on a change in pull request #11250:
URL: https://github.com/apache/kafka/pull/11250#discussion_r703445569



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
##
@@ -93,7 +114,7 @@ public void shouldOverwriteAllOptionsMethods() throws 
Exception {
 for (final Method method : Options.class.getMethods()) {
 if (!ignoreMethods.contains(method.getName())) {
 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class
-.getDeclaredMethod(method.getName(), 
method.getParameterTypes());
+.getDeclaredMethod(method.getName(), 
method.getParameterTypes());

Review comment:
   @cadonna 
   done, no idea how it came in...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] TomerWizman commented on a change in pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB

2021-09-07 Thread GitBox


TomerWizman commented on a change in pull request #11250:
URL: https://github.com/apache/kafka/pull/11250#discussion_r703445686



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
##
@@ -113,7 +134,7 @@ public void shouldForwardAllDbOptionsCalls() throws 
Exception {
 private void verifyDBOptionsMethodCall(final Method method) throws 
Exception {
 final DBOptions mockedDbOptions = mock(DBOptions.class);
 final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter 
optionsFacadeDbOptions
-= new 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(mockedDbOptions, new 
ColumnFamilyOptions());
+= new 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(mockedDbOptions, new 
ColumnFamilyOptions());

Review comment:
   @cadonna 
   done, no idea how it came in...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] TomerWizman commented on a change in pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB

2021-09-07 Thread GitBox


TomerWizman commented on a change in pull request #11250:
URL: https://github.com/apache/kafka/pull/11250#discussion_r703445767



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
##
@@ -126,7 +147,7 @@ private void verifyDBOptionsMethodCall(final Method method) 
throws Exception {
 } catch (final InvocationTargetException undeclaredMockMethodCall) {
 assertThat(undeclaredMockMethodCall.getCause(), 
instanceOf(AssertionError.class));
 assertThat(undeclaredMockMethodCall.getCause().getMessage().trim(),
-matchesPattern("Unexpected method call DBOptions\\." + 
method.getName() + "((.*\n*)*):"));
+matchesPattern("Unexpected method call DBOptions\\." + 
method.getName() + "((.*\n*)*):"));

Review comment:
   @cadonna 
   done, no idea how it came in...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] TomerWizman commented on a change in pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB

2021-09-07 Thread GitBox


TomerWizman commented on a change in pull request #11250:
URL: https://github.com/apache/kafka/pull/11250#discussion_r703454226



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
##
@@ -306,4 +327,33 @@ public String name() {
 
 return parameters;
 }
+
+@Test
+public void shouldLogWarningWhenSettingWalOptions() throws Exception {
+
+try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class))
 {
+
+final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter 
adapter
+= new 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), new 
ColumnFamilyOptions());
+
+for (final Method method : 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class.getDeclaredMethods())
 {
+if (walRelatedMethods.contains(method.getName())) {
+method.invoke(adapter, 
getDBOptionsParameters(method.getParameterTypes()));
+}
+}
+
+final List<String> walOptions = Arrays.asList("walDir", 
"walFilter", "walRecoveryMode", "walBytesPerSync", "walSizeLimitMB", 
"manualWalFlush", "maxTotalWalSize", "walTtlSeconds");

Review comment:
   @cadonna 
   I thought about it also.
   To do that I would need to remove the "set" from the method name and 
de-capitalize the first letter of the option. 
   I think this is a bit overkill because:
   1. I got checkstyle errors trying to use something like toLowerCase()... 
   2. It's going to be ugly to do without additional StringUtils helper libs 
(like Apache commons-lang StringUtils, which I saw we don't have)
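   
   For what it's worth, a sketch of the derivation being discussed, using only `Character` methods (a hypothetical helper; `Character.toLowerCase(char)` avoids the locale-sensitive `String.toLowerCase()` that checkstyle presumably flagged):
   
   ```java
   final class WalOptionNames {
       private WalOptionNames() { }
   
       // Derive the option name from its setter: "setMaxTotalWalSize" -> "maxTotalWalSize".
       static String optionNameFromSetter(final String setterName) {
           final String stripped = setterName.substring("set".length());
           return Character.toLowerCase(stripped.charAt(0)) + stripped.substring(1);
       }
   }
   ```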





-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #11250: Kafka 12766 - Disabling WAL-related Options in RocksDB

2021-09-07 Thread GitBox


cadonna commented on a change in pull request #11250:
URL: https://github.com/apache/kafka/pull/11250#discussion_r703455477



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java
##
@@ -306,4 +327,33 @@ public String name() {
 
 return parameters;
 }
+
+@Test
+public void shouldLogWarningWhenSettingWalOptions() throws Exception {
+
+try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class))
 {
+
+final RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter 
adapter
+= new 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter(new DBOptions(), new 
ColumnFamilyOptions());
+
+for (final Method method : 
RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.class.getDeclaredMethods())
 {
+if (walRelatedMethods.contains(method.getName())) {
+method.invoke(adapter, 
getDBOptionsParameters(method.getParameterTypes()));
+}
+}
+
+final List<String> walOptions = Arrays.asList("walDir", 
"walFilter", "walRecoveryMode", "walBytesPerSync", "walSizeLimitMB", 
"manualWalFlush", "maxTotalWalSize", "walTtlSeconds");

Review comment:
   Fair enough!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

2021-09-07 Thread GitBox


vamossagar12 commented on a change in pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#discussion_r703469853



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##
@@ -505,6 +506,14 @@ private void closeOpenIterators() {
 }
 }
 
+private ByteBuffer createDirectByteBufferAndPut(byte[] bytes) {
+ByteBuffer directBuffer = ByteBuffer.allocateDirect(bytes.length);

Review comment:
   I see. I think, barring Interactive Queries, the other accesses are 
single threaded. 
   Regarding the max size, TBH I am not aware of one. 
   Also, regarding changing the API: yeah, the RocksDB store isn't exposed, so the 
new APIs would have to be in KVStore with a default implementation. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-13262) Mock Clients Now Have Final close() Methods

2021-09-07 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13262?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-13262:
---

Assignee: Ismael Juma

> Mock Clients Now Have Final close() Methods
> ---
>
> Key: KAFKA-13262
> URL: https://issues.apache.org/jira/browse/KAFKA-13262
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Gary Russell
>Assignee: Ismael Juma
>Priority: Blocker
>
> Subclasses can no longer clean up resources when the consumer is closed.
> Caused by 
> https://github.com/apache/kafka/commit/2f3600198722dd5a01a210bc78b7d43b33967c7f



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma opened a new pull request #11306: MINOR: Update build dependencies (Q3 2021)

2021-09-07 Thread GitBox


ijuma opened a new pull request #11306:
URL: https://github.com/apache/kafka/pull/11306


   TBD: release notes, etc
   
   Note that checkstyle will be upgraded in a separate PR as it requires
   code changes in a large number of files.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma opened a new pull request #11307: KAFKA-13262: Remove final from `MockConsumer.close()` and delegate implementation

2021-09-07 Thread GitBox


ijuma opened a new pull request #11307:
URL: https://github.com/apache/kafka/pull/11307


   I added the final via 2f3600198722 to catch overriding mistakes
   since the implementation was moved from the deprecated and
   overloaded `close` with two parameters to the no-arg
   `close`.
   
   I didn't realize then that `MockConsumer` is a public
   API (seems like a bit of a mistake since we tweak the
   implementation and sometimes add methods without a KIP).
   
   Given that this is a public API, I have also moved the implementation
   of `close` to the one arg overload. This makes it easier for a
   subclass to have specific overriding behavior depending on the
   timeout.
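   
   A minimal sketch of the delegation described above (a simplified stand-in, not the actual MockConsumer code; the default timeout value is illustrative):
   
   ```java
   import java.time.Duration;
   
   // The no-arg close delegates to the one-arg overload, so a subclass can
   // override behaviour based on the timeout in a single place.
   class MockClient {
       private boolean closed = false;
   
       public synchronized void close() {
           close(Duration.ofMillis(30_000)); // illustrative default timeout
       }
   
       public synchronized void close(final Duration timeout) {
           this.closed = true;
       }
   
       public synchronized boolean closed() {
           return closed;
       }
   }
   ```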
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11307: KAFKA-13262: Remove final from `MockConsumer.close()` and delegate implementation

2021-09-07 Thread GitBox


dajac commented on a change in pull request #11307:
URL: https://github.com/apache/kafka/pull/11307#discussion_r703548948



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
##
@@ -446,8 +447,8 @@ public synchronized void resume(Collection<TopicPartition> partitions) {
 }
 
 @Override
-public final synchronized void close() {
-this.closed = true;
+public synchronized void close() {

Review comment:
   Should we move the `synchronized` to the other `close` method or add it 
there as well?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #11307: KAFKA-13262: Remove final from `MockConsumer.close()` and delegate implementation

2021-09-07 Thread GitBox


ijuma commented on a change in pull request #11307:
URL: https://github.com/apache/kafka/pull/11307#discussion_r703555946



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
##
@@ -446,8 +447,8 @@ public synchronized void resume(Collection<TopicPartition> partitions) {
 }
 
 @Override
-public final synchronized void close() {
-this.closed = true;
+public synchronized void close() {

Review comment:
   Makes sense, done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

2021-09-07 Thread GitBox


C0urante commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-914348989


   @rhauch Overall that looks good to me. It's an elegant solution to the 
tricky problem you noted about the opacity of task-provided source offsets 
w/r/t ordering.
   
   I'm a little worried about offset commits taking longer and longer with the 
more sophisticated approach you proposed (where we would unconditionally 
iterate over every record in the batch, instead of only until the first 
unacknowledged record). It's true that there would be natural back pressure 
from the producer as its `buffer.memory` gets eaten up, but with the default of 
32MB, it still seems possible for a large number of unacknowledged records to 
build up. If this does happen, then offset commits may end up exceeding the 
`offset.flush.timeout.ms` for the worker, which may cause issues with the 
current model where a single shared, worker-global thread is used for offset 
commits of all tasks.
   
   If this is a valid concern and we'd like to take it into account for now, I 
can think of a couple ways to handle it off the top of my head:
   1. Use the simpler approach that blocks offset commits across the board if a 
single record remains unacknowledged for a long period of time (which may 
realistically be a problem if a single partition out of many is unavailable for 
some reason).
   2. Enable concurrent offset commits by multiple tasks.
   3. Instead of a single deque per task, use a `ConcurrentMap<Map<String, Object>, Queue<SourceRecord>>` that stores a single deque per unique source 
partition. This would allow us to iterate over the bare minimum number of 
records for every single offset commit and not spend time, for example, on 
accumulated records for unavailable Kafka partitions. We'd still have to 
iterate over those records eventually if the Kafka partition came back online, 
but that iteration would only ever occur once, instead of once for every offset 
commit.
   
   I think option 3 may be warranted, although it's still possible that offset 
commits take a long time if 32MB worth of records end up getting queued. Option 
2 may be worth implementing or at least considering as a follow-up item to 
handle this case.
   
   Thoughts?
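   
   (For concreteness, a rough sketch of the data structure in option 3; the record type parameter and all names here are hypothetical:)
   
   ```java
   import java.util.Map;
   import java.util.Queue;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.concurrent.ConcurrentMap;
   
   // One queue of unacknowledged records per unique source partition, so an
   // offset commit only walks the head of each queue instead of one big deque.
   final class PerPartitionPending<R> {
       private final ConcurrentMap<Map<String, Object>, Queue<R>> pending =
           new ConcurrentHashMap<>();
   
       void submitted(final Map<String, Object> sourcePartition, final R record) {
           pending.computeIfAbsent(sourcePartition, p -> new ConcurrentLinkedQueue<>())
                  .add(record);
       }
   
       // Records for a partition are drained front-to-back as they are acknowledged.
       Queue<R> pendingFor(final Map<String, Object> sourcePartition) {
           return pending.getOrDefault(sourcePartition, new ConcurrentLinkedQueue<>());
       }
   }
   ```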


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13243) Differentiate metric latency measured in millis and nanos

2021-09-07 Thread Josep Prat (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josep Prat updated KAFKA-13243:
---
Fix Version/s: 3.1.0

> Differentiate metric latency measured in millis and nanos
> -
>
> Key: KAFKA-13243
> URL: https://issues.apache.org/jira/browse/KAFKA-13243
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Reporter: Guozhang Wang
>Assignee: Josep Prat
>Priority: Major
>  Labels: needs-kip
> Fix For: 3.1.0
>
>
> Today most of the client latency metrics are measured in millis, and some in 
> nanos. For those measured in nanos we usually differentiate them by having a 
> `-ns` suffix in the metric names, e.g. `io-wait-time-ns-avg` and 
> `io-time-ns-avg`. But there are a few that we obviously forgot to follow this 
> pattern, e.g. `io-wait-time-total`: it is inconsistent where `avg` has `-ns` 
> suffix and `total` has not. I did a quick search and found just three of them:
> * bufferpool-wait-time-total -> bufferpool-wait-time-ns-total
> * io-wait-time-total -> io-wait-time-ns-total
> * iotime-total -> io-time-ns-total (note that there are two inconsistencies 
> on naming, the average metric is `io-time-ns-avg` whereas total is 
> `iotime-total`, I suggest we use `io-time` instead of `iotime` for both).
> We should change their name accordingly with the `-ns` suffix as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] viktorsomogyi commented on pull request #11276: KAFKA-13240: Disable HTTP TRACE Method in Connect

2021-09-07 Thread GitBox


viktorsomogyi commented on pull request #11276:
URL: https://github.com/apache/kafka/pull/11276#issuecomment-914396172


   Rebased the branch; it was conflicting with another change that replaced 
EasyMock/PowerMock with Mockito, so I had to change that part of my tests too. 
(Ran them locally successfully.)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #11213: KAFKA-13201: Convert KTable suppress to new PAPI

2021-09-07 Thread GitBox


vvcephei commented on a change in pull request #11213:
URL: https://github.com/apache/kafka/pull/11213#discussion_r703626660



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
##
@@ -150,16 +153,16 @@ public void init(final 
org.apache.kafka.streams.processor.ProcessorContext conte
 }
 
 @Override
-public void process(final K key, final Change<V> value) {
+public void process(final Record<K, Change<V>> record) {
 observedStreamTime = Math.max(observedStreamTime, internalProcessorContext.timestamp());
-buffer(key, value);
+buffer(record);
 enforceConstraints();
 }
 
-private void buffer(final K key, final Change<V> value) {
-final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, key);
+private void buffer(final Record<K, Change<V>> record) {
+final long bufferTime = bufferTimeDefinition.time(internalProcessorContext, record.key());
 
-buffer.put(bufferTime, key, value, internalProcessorContext.recordContext());
+buffer.put(bufferTime, record.key(), record.value(), internalProcessorContext.recordContext());

Review comment:
   Thanks, @jeqo , I can see that my tone in that comment didn't encompass 
the scope of the change I was proposing.
   
   I'm fine if we want to delay that refactor for later work and focus on the 
Processor API itself in this PR.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #11213: KAFKA-13201: Convert KTable suppress to new PAPI

2021-09-07 Thread GitBox


vvcephei commented on a change in pull request #11213:
URL: https://github.com/apache/kafka/pull/11213#discussion_r703627436



##
File path: 
streams/src/test/java/org/apache/kafka/test/MockInternalNewProcessorContext.java
##
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.processor.MockProcessorContext;

Review comment:
   Sounds good. I think we can afford to cross that bridge later on as well.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13277) Serialization of long tagged string in request/response throws BufferOverflowException

2021-09-07 Thread Rajini Sivaram (Jira)
Rajini Sivaram created KAFKA-13277:
--

 Summary: Serialization of long tagged string in request/response 
throws BufferOverflowException
 Key: KAFKA-13277
 URL: https://issues.apache.org/jira/browse/KAFKA-13277
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 3.0.1


Size computation for tagged strings in the message generator is incorrect and 
hence it works only for small strings (126 bytes or so) where the length 
happens to be correct.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma merged pull request #11303: MINOR: Upgrade compression libraries (Q3 2021)

2021-09-07 Thread GitBox


ijuma merged pull request #11303:
URL: https://github.com/apache/kafka/pull/11303


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13277) Serialization of long tagged string in request/response throws BufferOverflowException

2021-09-07 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411309#comment-17411309
 ] 

Ismael Juma commented on KAFKA-13277:
-

[~rsivaram] isn't this a blocker for 3.0?

> Serialization of long tagged string in request/response throws 
> BufferOverflowException
> --
>
> Key: KAFKA-13277
> URL: https://issues.apache.org/jira/browse/KAFKA-13277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 3.0.1
>
>
> Size computation for tagged strings in the message generator is incorrect and 
> hence it works only for small strings (126 bytes or so) where the length 
> happens to be correct.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #11213: KAFKA-13201: Convert KTable suppress to new PAPI

2021-09-07 Thread GitBox


vvcephei commented on a change in pull request #11213:
URL: https://github.com/apache/kafka/pull/11213#discussion_r703630483



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
##
@@ -198,7 +201,7 @@ private void emit(final TimeOrderedKeyValueBuffer.Eviction<K, V> toEmit) {
 final ProcessorRecordContext prevRecordContext = 
internalProcessorContext.recordContext();
 
internalProcessorContext.setRecordContext(toEmit.recordContext());
 try {
-internalProcessorContext.forward(toEmit.key(), 
toEmit.value());
+internalProcessorContext.forward(toEmit.record());

Review comment:
   I think this might actually be a bug. IIRC, the Record forward call 
overrides the context, so you might have to actually set all the fields in 
Record from the context when you construct it in `toEmit.record()`. 
Specifically, I think we might be dropping the headers here.
   
   I might also be wrong, so you might want to double-check me first.
   
   It looks like we were never testing whether we propagate headers through 
Suppress. If it turns out that we never were, then there's also no problem. The 
case to look out for is if we _were_, but now we no longer are.
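   
   For reference, a hedged sketch of what carrying the stored context's headers into the forwarded record could look like (assuming `toEmit.record()` currently drops them; this would need to be double-checked against the buffer implementation):
   
   ```java
   // Rebuild the forwarded Record from the stored ProcessorRecordContext so
   // the original timestamp and headers are propagated rather than dropped.
   final ProcessorRecordContext ctx = toEmit.recordContext();
   internalProcessorContext.forward(
       new Record<>(toEmit.key(), toEmit.value(), ctx.timestamp(), ctx.headers()));
   ```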




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13277) Serialization of long tagged string in request/response throws BufferOverflowException

2021-09-07 Thread Rajini Sivaram (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411311#comment-17411311
 ] 

Rajini Sivaram commented on KAFKA-13277:


[~ijuma] Yes, it will be good to include in 3.0.

> Serialization of long tagged string in request/response throws 
> BufferOverflowException
> --
>
> Key: KAFKA-13277
> URL: https://issues.apache.org/jira/browse/KAFKA-13277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 3.0.1
>
>
> Size computation for tagged strings in the message generator is incorrect and 
> hence it works only for small strings (126 bytes or so) where the length 
> happens to be correct.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13277) Serialization of long tagged string in request/response throws BufferOverflowException

2021-09-07 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-13277:
---
Priority: Blocker  (was: Major)

> Serialization of long tagged string in request/response throws 
> BufferOverflowException
> --
>
> Key: KAFKA-13277
> URL: https://issues.apache.org/jira/browse/KAFKA-13277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 3.0.0
>
>
> Size computation for tagged strings in the message generator is incorrect and 
> hence it works only for small strings (126 bytes or so) where the length 
> happens to be correct.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13277) Serialization of long tagged string in request/response throws BufferOverflowException

2021-09-07 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-13277:
---
Fix Version/s: (was: 3.0.1)
   3.0.0

> Serialization of long tagged string in request/response throws 
> BufferOverflowException
> --
>
> Key: KAFKA-13277
> URL: https://issues.apache.org/jira/browse/KAFKA-13277
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 3.0.0
>
>
> Size computation for tagged strings in the message generator is incorrect and 
> hence it works only for small strings (126 bytes or so) where the length 
> happens to be correct.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13266) `InitialFetchState` should be created after partition is removed from the fetchers

2021-09-07 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-13266:

Priority: Blocker  (was: Critical)

> `InitialFetchState` should be created after partition is removed from the 
> fetchers
> --
>
> Key: KAFKA-13266
> URL: https://issues.apache.org/jira/browse/KAFKA-13266
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Blocker
> Fix For: 3.0.0
>
>
>  `ReplicationTest.test_replication_with_broker_failure` in KRaft mode 
> sometimes fails with the following error in the log:
> {noformat}
> [2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Unexpected error occurred while processing data for partition 
> __consumer_offsets-1 at offset 31727 
> (kafka.server.ReplicaFetcherThread)java.lang.IllegalStateException: Offset 
> mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end 
> offset = 31728. at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$8(AbstractFetcherThread.scala:545)
>  at scala.Option.foreach(Option.scala:437) at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:533)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7$adapted(AbstractFetcherThread.scala:532)
>  at 
> kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
>  at 
> scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
>  at 
> scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
>  at 
> scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:532)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215)
>  at scala.Option.foreach(Option.scala:437) at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215)
>  at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197) at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)[2021-08-31 
> 11:31:25,093] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] 
> Partition __consumer_offsets-1 marked as failed 
> (kafka.server.ReplicaFetcherThread)
> {noformat}
>  The issue is due to a race condition in 
> `ReplicaManager#applyLocalFollowersDelta`. The `InitialFetchState` is created 
> and populated before the partition is removed from the fetcher threads. This 
> means that the fetch offset of the `InitialFetchState` could be outdated when 
> the fetcher threads are re-started because the fetcher threads could have 
> incremented the log end offset in between.
> The partitions must be removed from the fetcher threads before the 
> `InitialFetchStates` are created.
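>
> The ordering matters because a still-running fetcher can append to the log
> between the moment the fetch offset is captured and the moment the fetcher
> actually stops. A self-contained sketch of the race (all names are stand-ins,
> not the real ReplicaManager code):
> {code:java}
> final class FetcherRaceSketch {
>     long logEndOffset = 31727; // advanced concurrently by the fetcher thread
>
>     long brokenOrder() {
>         long initOffset = logEndOffset; // captured before the fetcher stops
>         stopFetcher();                  // fetcher may append once more here
>         return initOffset;              // stale: now behind logEndOffset
>     }
>
>     long fixedOrder() {
>         stopFetcher();                  // no appends possible after this point
>         return logEndOffset;            // offset is stable when captured
>     }
>
>     void stopFetcher() { logEndOffset += 1; } // simulates one late append
> }
> {code}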



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13266) `InitialFetchState` should be created after partition is removed from the fetchers

2021-09-07 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-13266:

Fix Version/s: 3.0.0

> `InitialFetchState` should be created after partition is removed from the 
> fetchers
> --
>
> Key: KAFKA-13266
> URL: https://issues.apache.org/jira/browse/KAFKA-13266
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Critical
> Fix For: 3.0.0
>
>
>  `ReplicationTest.test_replication_with_broker_failure` in KRaft mode 
> sometimes fails with the following error in the log:
> {noformat}
> [2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Unexpected error occurred while processing data for partition 
> __consumer_offsets-1 at offset 31727 
> (kafka.server.ReplicaFetcherThread)java.lang.IllegalStateException: Offset 
> mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end 
> offset = 31728. at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$8(AbstractFetcherThread.scala:545)
>  at scala.Option.foreach(Option.scala:437) at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:533)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7$adapted(AbstractFetcherThread.scala:532)
>  at 
> kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
>  at 
> scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
>  at 
> scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
>  at 
> scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:532)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215)
>  at scala.Option.foreach(Option.scala:437) at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215)
>  at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197) at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)[2021-08-31 
> 11:31:25,093] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] 
> Partition __consumer_offsets-1 marked as failed 
> (kafka.server.ReplicaFetcherThread)
> {noformat}
>  The issue is due to a race condition in 
> `ReplicaManager#applyLocalFollowersDelta`. The `InitialFetchState` is created 
> and populated before the partition is removed from the fetcher threads. This 
> means that the fetch offset of the `InitialFetchState` could be outdated when 
> the fetcher threads are re-started because the fetcher threads could have 
> incremented the log end offset in between.
> The partitions must be removed from the fetcher threads before the 
> `InitialFetchStates` are created.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13276) Public DescribeConsumerGroupsResult constructor refers to KafkaFutureImpl

2021-09-07 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-13276:

Fix Version/s: 3.0.0

> Public DescribeConsumerGroupsResult constructor refers to KafkaFutureImpl
> -
>
> Key: KAFKA-13276
> URL: https://issues.apache.org/jira/browse/KAFKA-13276
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Tom Bentley
>Priority: Blocker
> Fix For: 3.0.0
>
>
> The new public DescribeConsumerGroupsResult constructor refers to the 
> non-public API KafkaFutureImpl
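>
> A sketch of the intended shape (illustrative; the exact signature is up to the
> PR): the public constructor should accept the public KafkaFuture interface so
> that callers and test code never touch the internal implementation class.
> {code:java}
> import java.util.Map;
> import org.apache.kafka.clients.admin.ConsumerGroupDescription;
> import org.apache.kafka.common.KafkaFuture;
>
> public class DescribeConsumerGroupsResult {
>     private final Map<String, KafkaFuture<ConsumerGroupDescription>> futures;
>
>     public DescribeConsumerGroupsResult(
>             final Map<String, KafkaFuture<ConsumerGroupDescription>> futures) {
>         this.futures = futures; // interface type, not KafkaFutureImpl
>     }
> }
> {code}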



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13260) FindCoordinator errorCounts does not handle v4

2021-09-07 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411315#comment-17411315
 ] 

Ismael Juma commented on KAFKA-13260:
-

3.0.0 hasn't been released yet, so it's a bit weird to say it affects it.

> FindCoordinator errorCounts does not handle v4
> --
>
> Key: KAFKA-13260
> URL: https://issues.apache.org/jira/browse/KAFKA-13260
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.0.0
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>
> When using batch find coordinator (>=v4), errorCounts() does not correctly 
> compute the error count.
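>
> A minimal sketch of the required aggregation (illustrative, not the committed
> patch): from v4 onwards a single response carries one entry per requested key,
> each with its own error code, so counts must be tallied across all entries
> rather than read from the single top-level error field used by v3 and below.
> {code:java}
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
>
> final class ErrorCountsSketch {
>     static Map<Short, Integer> errorCounts(List<Short> perCoordinatorErrors,
>                                            short topLevelError) {
>         Map<Short, Integer> counts = new HashMap<>();
>         if (perCoordinatorErrors.isEmpty()) {  // v3 and below: one top-level error
>             counts.put(topLevelError, 1);
>         } else {                               // v4+: one error per coordinator entry
>             for (short code : perCoordinatorErrors) {
>                 counts.merge(code, 1, Integer::sum);
>             }
>         }
>         return counts;
>     }
> }
> {code}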



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13266) `InitialFetchState` should be created after partition is removed from the fetchers

2021-09-07 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-13266:

Affects Version/s: (was: 3.0.0)

> `InitialFetchState` should be created after partition is removed from the 
> fetchers
> --
>
> Key: KAFKA-13266
> URL: https://issues.apache.org/jira/browse/KAFKA-13266
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Blocker
> Fix For: 3.0.0
>
>
>  `ReplicationTest.test_replication_with_broker_failure` in KRaft mode 
> sometimes fails with the following error in the log:
> {noformat}
> [2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Unexpected error occurred while processing data for partition 
> __consumer_offsets-1 at offset 31727 
> (kafka.server.ReplicaFetcherThread)java.lang.IllegalStateException: Offset 
> mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end 
> offset = 31728. at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$8(AbstractFetcherThread.scala:545)
>  at scala.Option.foreach(Option.scala:437) at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:533)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7$adapted(AbstractFetcherThread.scala:532)
>  at 
> kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
>  at 
> scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:359)
>  at 
> scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:355)
>  at 
> scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:309)
>  at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:532)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:216)
>  at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:215)
>  at scala.Option.foreach(Option.scala:437) at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:215)
>  at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:197) at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:99)[2021-08-31 
> 11:31:25,093] WARN [ReplicaFetcher replicaId=1, leaderId=2, fetcherId=0] 
> Partition __consumer_offsets-1 marked as failed 
> (kafka.server.ReplicaFetcherThread)
> {noformat}
>  The issue is due to a race condition in 
> `ReplicaManager#applyLocalFollowersDelta`. The `InitialFetchState` is created 
> and populated before the partition is removed from the fetcher threads. This 
> means that the fetch offset of the `InitialFetchState` could be outdated when 
> the fetcher threads are re-started because the fetcher threads could have 
> incremented the log end offset in between.
> The partitions must be removed from the fetcher threads before the 
> `InitialFetchStates` are created.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13260) FindCoordinator errorCounts does not handle v4

2021-09-07 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-13260:

Affects Version/s: (was: 3.0.0)

> FindCoordinator errorCounts does not handle v4
> --
>
> Key: KAFKA-13260
> URL: https://issues.apache.org/jira/browse/KAFKA-13260
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 3.0.0
>
>
> When using batch find coordinator (>=v4), errorCounts() does not correctly 
> compute the error count.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13260) FindCoordinator errorCounts does not handle v4

2021-09-07 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-13260:

Fix Version/s: 3.0.0

> FindCoordinator errorCounts does not handle v4
> --
>
> Key: KAFKA-13260
> URL: https://issues.apache.org/jira/browse/KAFKA-13260
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.0.0
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 3.0.0
>
>
> When using batch find coordinator (>=v4), errorCounts() does not correctly 
> compute the error count.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-13276) Public DescribeConsumerGroupsResult constructor refers to KafkaFutureImpl

2021-09-07 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma reassigned KAFKA-13276:
---

Assignee: Tom Bentley

> Public DescribeConsumerGroupsResult constructor refers to KafkaFutureImpl
> -
>
> Key: KAFKA-13276
> URL: https://issues.apache.org/jira/browse/KAFKA-13276
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.0
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Blocker
> Fix For: 3.0.0
>
>
> The new public DescribeConsumerGroupsResult constructor refers to the 
> non-public API KafkaFutureImpl



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13260) FindCoordinator errorCounts does not handle v4

2021-09-07 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-13260:

Priority: Blocker  (was: Major)

> FindCoordinator errorCounts does not handle v4
> --
>
> Key: KAFKA-13260
> URL: https://issues.apache.org/jira/browse/KAFKA-13260
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Blocker
> Fix For: 3.0.0
>
>
> When using batch find coordinator (>=v4), errorCounts() does not correctly 
> compute the error count.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13259) DescribeProducers response does not include an error when it failed

2021-09-07 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-13259:

Fix Version/s: 3.0.0

> DescribeProducers response does not include an error when it failed
> ---
>
> Key: KAFKA-13259
> URL: https://issues.apache.org/jira/browse/KAFKA-13259
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.0.0, 2.8.0
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Blocker
> Fix For: 3.0.0
>
>
> This has been broken since 2.8, but it is directly visible to users in 3.0 because 
> of the new Admin.describeProducers() API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13259) DescribeProducers response does not include an error when it failed

2021-09-07 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-13259:

Affects Version/s: (was: 3.0.0)

> DescribeProducers response does not include an error when it failed
> ---
>
> Key: KAFKA-13259
> URL: https://issues.apache.org/jira/browse/KAFKA-13259
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.8.0
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Blocker
> Fix For: 3.0.0
>
>
> This has been broken since 2.8, but it is directly visible to users in 3.0 because 
> of the new Admin.describeProducers() API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13258) AlterClientQuotas response does not include an error when it failed

2021-09-07 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-13258:

Fix Version/s: 3.0.0

> AlterClientQuotas response does not include an error when it failed
> ---
>
> Key: KAFKA-13258
> URL: https://issues.apache.org/jira/browse/KAFKA-13258
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.8.0
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
> Fix For: 3.0.0
>
>
> This causes `admin.alterClientQuotas()` to not return an error even if the 
> call failed, for example due to insufficient permissions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13259) DescribeProducers response does not include an error when it failed

2021-09-07 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-13259:

Fix Version/s: 2.8.1

> DescribeProducers response does not include an error when it failed
> ---
>
> Key: KAFKA-13259
> URL: https://issues.apache.org/jira/browse/KAFKA-13259
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.8.0
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Blocker
> Fix For: 3.0.0, 2.8.1
>
>
> This has been broken since 2.8, but it is directly visible to users in 3.0 because 
> of the new Admin.describeProducers() API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13258) AlterClientQuotas response does not include an error when it failed

2021-09-07 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-13258:

Affects Version/s: (was: 3.0.0)

> AlterClientQuotas response does not include an error when it failed
> ---
>
> Key: KAFKA-13258
> URL: https://issues.apache.org/jira/browse/KAFKA-13258
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.8.0
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>
> This causes `admin.alterClientQuotas()` to not return an error even if the 
> call failed, for example due to insufficient permissions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13258) AlterClientQuotas response does not include an error when it failed

2021-09-07 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-13258:

Fix Version/s: 2.8.1

> AlterClientQuotas response does not include an error when it failed
> ---
>
> Key: KAFKA-13258
> URL: https://issues.apache.org/jira/browse/KAFKA-13258
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.8.0
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Blocker
> Fix For: 3.0.0, 2.8.1
>
>
> This causes `admin.alterClientQuotas()` to not return an error even if the 
> call failed, for example due to insufficient permissions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13258) AlterClientQuotas response does not include an error when it failed

2021-09-07 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-13258:

Priority: Blocker  (was: Major)

> AlterClientQuotas response does not include an error when it failed
> ---
>
> Key: KAFKA-13258
> URL: https://issues.apache.org/jira/browse/KAFKA-13258
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.8.0
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Blocker
> Fix For: 3.0.0
>
>
> This causes `admin.alterClientQuotas()` to not return an error even if the 
> call failed, for example due to insufficient permissions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #11300: KAFKA-13258/13259/13260: Fix error response generation

2021-09-07 Thread GitBox


ijuma commented on pull request #11300:
URL: https://github.com/apache/kafka/pull/11300#issuecomment-914446506


   @mimaison I synced with @kkonstantine and he said he's ok with this being 
merged to 3.0. He will update the release thread soon, but let's get this in 
when we're ready to avoid delays.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram opened a new pull request #11308: KAFKA-13277; Fix size calculation for tagged string fields in message generator

2021-09-07 Thread GitBox


rajinisivaram opened a new pull request #11308:
URL: https://github.com/apache/kafka/pull/11308


   Size calculation for tagged fields is currently incorrect and works only for 
small strings. This results in a BufferOverflowException when serializing 
requests with large strings in tagged fields. The PR fixes the size calculation 
to match the bytes written.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13248) Class name mismatch for LoggerFactory.getLogger method in TimeOrderedWindowStoreBuilder.java

2021-09-07 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411347#comment-17411347
 ] 

Guozhang Wang commented on KAFKA-13248:
---

[~sider-bughunter] Thanks for the find. I'm currently working on KAFKA-13216 in 
which I've used a kv-store instead of a window store, and hence I've renamed 
the whole class as well. Do you mind if I just incorporate your findings in the 
other PR?

> Class name mismatch for LoggerFactory.getLogger method in 
> TimeOrderedWindowStoreBuilder.java
> 
>
> Key: KAFKA-13248
> URL: https://issues.apache.org/jira/browse/KAFKA-13248
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: S. B. Hunter
>Priority: Minor
> Attachments: KAFKA-13248.1.patch
>
>
> I have noticed a mismatch with the class name passed to the 
> LoggerFactory.getLogger method. This would make it hard to track the source 
> of log messages.
> public class TimeOrderedWindowStoreBuilder<K, V> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> {
>     private final Logger log = LoggerFactory.getLogger(WindowStoreBuilder.class);
>     private final WindowBytesStoreSupplier storeSupplier;
>     public TimeOrderedWindowStoreBuilder(final WindowBytesStoreSupplier storeSupplier,
>                                          final Serde<K> keySerde,



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] spena commented on a change in pull request #11252: KAFKA-13216: Use a KV with list serde for the shared store

2021-09-07 Thread GitBox


spena commented on a change in pull request #11252:
URL: https://github.com/apache/kafka/pull/11252#discussion_r703675892



##
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedTimestampedKeyAndJoinSideSerializerTest.java
##
@@ -24,49 +24,49 @@
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertThrows;
 
-public class KeyAndJoinSideSerializerTest {
+public class TimestampedTimestampedKeyAndJoinSideSerializerTest {

Review comment:
   The class name starts with a repeated "Timestamped" -> `TimestampedTimestamped...`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeqo commented on a change in pull request #11213: KAFKA-13201: Convert KTable suppress to new PAPI

2021-09-07 Thread GitBox


jeqo commented on a change in pull request #11213:
URL: https://github.com/apache/kafka/pull/11213#discussion_r703713785



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
##
@@ -198,7 +201,7 @@ private void emit(final 
TimeOrderedKeyValueBuffer.Eviction toEmit) {
 final ProcessorRecordContext prevRecordContext = 
internalProcessorContext.recordContext();
 
internalProcessorContext.setRecordContext(toEmit.recordContext());
 try {
-internalProcessorContext.forward(toEmit.key(), 
toEmit.value());
+internalProcessorContext.forward(toEmit.record());

Review comment:
   It does, yes. I just tested in one of the methods. Adding the change and 
updating a test to cover this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeqo commented on a change in pull request #11213: KAFKA-13201: Convert KTable suppress to new PAPI

2021-09-07 Thread GitBox


jeqo commented on a change in pull request #11213:
URL: https://github.com/apache/kafka/pull/11213#discussion_r703713785



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorSupplier.java
##
@@ -198,7 +201,7 @@ private void emit(final 
TimeOrderedKeyValueBuffer.Eviction toEmit) {
 final ProcessorRecordContext prevRecordContext = 
internalProcessorContext.recordContext();
 
internalProcessorContext.setRecordContext(toEmit.recordContext());
 try {
-internalProcessorContext.forward(toEmit.key(), 
toEmit.value());
+internalProcessorContext.forward(toEmit.record());

Review comment:
   It does, yes. I just tested in one of the methods. Adding the change and 
updating a test to cover this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

2021-09-07 Thread GitBox


rhauch commented on pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#issuecomment-914547745


   @C0urante, thanks for the feedback on my suggestion. I like your option 3, 
because it does allow the iteration to stop on each source partition as soon as 
it encounters the first unacknowledged record in each queue. I also think that 
the behavior with the suggested approach and your option 3 is still a lot 
better than the current situation. 
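   
   For what it's worth, a rough sketch of how option 3 could look (all names 
here are stand-ins, not Connect's actual internals, and `submittedRecords` is 
assumed to be a per-partition map of deques): each queue is drained only up to 
its first unacknowledged record, so the iteration cost is bounded by the 
committable prefix.
   
   ```java
   import java.util.Deque;
   import java.util.HashMap;
   import java.util.Map;

   // Stand-in type: one dispatched source record plus its offset bookkeeping.
   interface SubmittedRecord {
       boolean acked();
       Map<String, ?> sourcePartition();
       Map<String, ?> sourceOffset();
   }

   final class CommittableOffsets {
       // Collect committable offsets: drain each partition's queue only up to
       // its first unacknowledged record, then stop for that queue.
       static Map<Map<String, ?>, Map<String, ?>> collect(
               Map<String, Deque<SubmittedRecord>> submittedRecords) {
           Map<Map<String, ?>, Map<String, ?>> committable = new HashMap<>();
           for (Deque<SubmittedRecord> queue : submittedRecords.values()) {
               SubmittedRecord r;
               while ((r = queue.peek()) != null && r.acked()) {
                   committable.put(r.sourcePartition(), r.sourceOffset());
                   queue.poll(); // acknowledged: its offset can now be committed
               }
           }
           return committable;
       }
   }
   ```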
   
   One question, though: you mention that it might be a problem if iterating 
over the submitted records takes longer than `offset.flush.timeout.ms`. But 
IIUC `offset.flush.timeout.ms` would no longer be used at all, since the offset 
commit thread no longer blocks and so there are no timeouts. So, 
worst case, if task A has a ton of submitted records that have to be iterated 
over (e.g., fast producer and fast source task), it might slow the committing 
of offsets for other tasks. (Again, this is not any worse than the current 
behavior.) But your option 2 would help with this at the risk of using more 
threads, and so we may also want to consider this to help ensure that no 
slowly-proceeding producer of a task blocks other offset commits.
   
   Of course, another option might be to incur the iteration on the worker 
source task thread. That would essentially move the use of the queue(s) to the 
worker source task thread, tho we still need to get the offsets to the offset 
commit thread and so would likely have to keep the synchronization blocks 
around the offset writer snapshot. On one hand, that's putting more work onto 
the worker source task thread and making the offset thread super 
straightforward (snapshot and write); on the other it's putting the onus on the 
worker source task thread.
   
   Thoughts?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #11307: KAFKA-13262: Remove final from `MockConsumer.close()` and delegate implementation

2021-09-07 Thread GitBox


ijuma commented on pull request #11307:
URL: https://github.com/apache/kafka/pull/11307#issuecomment-914551974


   Unrelated test failures:
   
   > Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldPrefixAllInternalTopicNamesWithNamedTopology
   > Build / JDK 8 and Scala 2.12 / 
org.apache.kafka.streams.integration.NamedTopologyIntegrationTest.shouldPrefixAllInternalTopicNamesWithNamedTopology
   > 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #11307: KAFKA-13262: Remove final from `MockConsumer.close()` and delegate implementation

2021-09-07 Thread GitBox


ijuma merged pull request #11307:
URL: https://github.com/apache/kafka/pull/11307


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #11307: KAFKA-13262: Remove final from `MockConsumer.close()` and delegate implementation

2021-09-07 Thread GitBox


ijuma commented on pull request #11307:
URL: https://github.com/apache/kafka/pull/11307#issuecomment-914552427


   Merged to master and cherry-picked to 3.0.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #11308: KAFKA-13277; Fix size calculation for tagged string fields in message generator

2021-09-07 Thread GitBox


cmccabe merged pull request #11308:
URL: https://github.com/apache/kafka/pull/11308


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding commented on pull request #11293: MINOR: defineInternal for KIP-405 configs

2021-09-07 Thread GitBox


ccding commented on pull request #11293:
URL: https://github.com/apache/kafka/pull/11293#issuecomment-914613398


   It failed here 
https://github.com/apache/kafka/blob/9d107c174bfb23e5ce0daca9a59796f018f627c6/core[…]t/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala
   ```
   Expected :HashSet(message.timestamp.difference.max.ms, 
follower.replication.throttled.replicas, file.delete.delay.ms, 
compression.type, unclean.leader.election.enable, flush.messages, 
delete.retention.ms, max.message.bytes, segment.index.bytes, segment.jitter.ms, 
min.cleanable.dirty.ratio, retention.bytes, message.downconversion.enable, 
max.compaction.lag.ms, min.compaction.lag.ms, flush.ms, cleanup.policy, 
message.timestamp.type, retention.ms, min.insync.replicas, 
message.format.version, leader.replication.throttled.replicas, preallocate, 
index.interval.bytes, segment.bytes, segment.ms)
   Actual   :HashSet(message.timestamp.difference.max.ms, 
follower.replication.throttled.replicas, file.delete.delay.ms, 
compression.type, unclean.leader.election.enable, flush.messages, 
delete.retention.ms, local.retention.ms, max.message.bytes, 
segment.index.bytes, segment.jitter.ms, min.cleanable.dirty.ratio, 
retention.bytes, message.downconversion.enable, max.compaction.lag.ms, 
min.compaction.lag.ms, remote.storage.enable, flush.ms, cleanup.policy, 
message.timestamp.type, retention.ms, min.insync.replicas, 
message.format.version, leader.replication.throttled.replicas, 
local.retention.bytes, preallocate, index.interval.bytes, segment.bytes, 
segment.ms)
   ```
   the diff between Expected and Actual is the three defineInternal configs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ccding edited a comment on pull request #11293: MINOR: defineInternal for KIP-405 configs

2021-09-07 Thread GitBox


ccding edited a comment on pull request #11293:
URL: https://github.com/apache/kafka/pull/11293#issuecomment-914613398


   It failed here 
https://github.com/apache/kafka/blob/9d107c174bfb23e5ce0daca9a59796f018f627c6/core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala#L432
   ```
   Expected :HashSet(message.timestamp.difference.max.ms, 
follower.replication.throttled.replicas, file.delete.delay.ms, 
compression.type, unclean.leader.election.enable, flush.messages, 
delete.retention.ms, max.message.bytes, segment.index.bytes, segment.jitter.ms, 
min.cleanable.dirty.ratio, retention.bytes, message.downconversion.enable, 
max.compaction.lag.ms, min.compaction.lag.ms, flush.ms, cleanup.policy, 
message.timestamp.type, retention.ms, min.insync.replicas, 
message.format.version, leader.replication.throttled.replicas, preallocate, 
index.interval.bytes, segment.bytes, segment.ms)
   Actual   :HashSet(message.timestamp.difference.max.ms, 
follower.replication.throttled.replicas, file.delete.delay.ms, 
compression.type, unclean.leader.election.enable, flush.messages, 
delete.retention.ms, local.retention.ms, max.message.bytes, 
segment.index.bytes, segment.jitter.ms, min.cleanable.dirty.ratio, 
retention.bytes, message.downconversion.enable, max.compaction.lag.ms, 
min.compaction.lag.ms, remote.storage.enable, flush.ms, cleanup.policy, 
message.timestamp.type, retention.ms, min.insync.replicas, 
message.format.version, leader.replication.throttled.replicas, 
local.retention.bytes, preallocate, index.interval.bytes, segment.bytes, 
segment.ms)
   ```
   the diff between Expected and Actual is the three defineInternal configs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #11033: KAFKA-12988 Asynchronous API support for RemoteLogMetadataManager add/update methods.

2021-09-07 Thread GitBox


junrao commented on a change in pull request #11033:
URL: https://github.com/apache/kafka/pull/11033#discussion_r703798708



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
##
@@ -129,37 +132,44 @@ public void 
updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmen
 }
 
 // Publish the message to the topic.
-
doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(),
 segmentMetadataUpdate);
+return 
doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(),
 segmentMetadataUpdate);
 } finally {
 lock.readLock().unlock();
 }
 }
 
 @Override
-public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata 
remotePartitionDeleteMetadata)
+public CompletableFuture 
putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata 
remotePartitionDeleteMetadata)
 throws RemoteStorageException {
 Objects.requireNonNull(remotePartitionDeleteMetadata, 
"remotePartitionDeleteMetadata can not be null");
 
 lock.readLock().lock();
 try {
 ensureInitializedAndNotClosed();
 
-
doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), 
remotePartitionDeleteMetadata);
+return 
doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), 
remotePartitionDeleteMetadata);
 } finally {
 lock.readLock().unlock();
 }
 }
 
-private void doPublishMetadata(TopicIdPartition topicIdPartition, 
RemoteLogMetadata remoteLogMetadata)
+private CompletableFuture doPublishMetadata(TopicIdPartition 
topicIdPartition,
+  RemoteLogMetadata 
remoteLogMetadata)
 throws RemoteStorageException {
 log.debug("Publishing metadata for partition: [{}] with context: 
[{}]", topicIdPartition, remoteLogMetadata);
 
 try {
 // Publish the message to the topic.
-RecordMetadata recordMetadata = 
producerManager.publishMessage(remoteLogMetadata);
-// Wait until the consumer catches up with this offset. This will 
ensure read-after-write consistency
-// semantics.
-consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
+CompletableFuture<RecordMetadata> produceFuture = new CompletableFuture<>();
+producerManager.publishMessage(remoteLogMetadata, produceFuture);
+return produceFuture.thenApplyAsync((Function) recordMetadata -> {
+try {
+
consumerManager.waitTillConsumptionCatchesUp(recordMetadata);
+} catch (TimeoutException e) {
+throw new KafkaException(e);
+}
+return null;
+}).toCompletableFuture();

Review comment:
   Is toCompletableFuture() needed, since thenApplyAsync() already returns a 
CompletableFuture?
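   
   A quick self-contained check of that point with the plain JDK (not the Kafka 
code): `thenApplyAsync` already returns a `CompletableFuture`, and calling 
`toCompletableFuture()` on it simply returns `this`.
   
   ```java
   import java.util.concurrent.CompletableFuture;

   public class ThenApplyCheck {
       public static void main(String[] args) {
           CompletableFuture<String> f = CompletableFuture.completedFuture("md");
           // thenApplyAsync already yields a CompletableFuture, not just a CompletionStage
           CompletableFuture<Void> g = f.thenApplyAsync(md -> null);
           System.out.println(g == g.toCompletableFuture()); // prints true
       }
   }
   ```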

##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
##
@@ -129,37 +132,44 @@ public void 
updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmen
 }
 
 // Publish the message to the topic.
-
doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(),
 segmentMetadataUpdate);
+return 
doPublishMetadata(segmentMetadataUpdate.remoteLogSegmentId().topicIdPartition(),
 segmentMetadataUpdate);
 } finally {
 lock.readLock().unlock();
 }
 }
 
 @Override
-public void putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata 
remotePartitionDeleteMetadata)
+public CompletableFuture 
putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata 
remotePartitionDeleteMetadata)
 throws RemoteStorageException {
 Objects.requireNonNull(remotePartitionDeleteMetadata, 
"remotePartitionDeleteMetadata can not be null");
 
 lock.readLock().lock();
 try {
 ensureInitializedAndNotClosed();
 
-
doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), 
remotePartitionDeleteMetadata);
+return 
doPublishMetadata(remotePartitionDeleteMetadata.topicIdPartition(), 
remotePartitionDeleteMetadata);
 } finally {
 lock.readLock().unlock();
 }
 }
 
-private void doPublishMetadata(TopicIdPartition topicIdPartition, 
RemoteLogMetadata remoteLogMetadata)
+private CompletableFuture doPublishMetadata(TopicIdPartition 
topicIdPartition,
+  RemoteLogMetadata 
remoteLogMetadata)
 throws RemoteStorageException {
 log.debug("Publishing metadata for partition: [{}] with context: 
[{}]", topicIdPartition, remoteLogMetadat

[GitHub] [kafka] guozhangwang commented on pull request #11297: KAFKA-10038: Supports default client.id for ConsoleConsumer, ProducerPerformance, ConsumerPerformance

2021-09-07 Thread GitBox


guozhangwang commented on pull request #11297:
URL: https://github.com/apache/kafka/pull/11297#issuecomment-914619982


   Thanks @jasonyanwenl, the PR lgtm, and thank you also for adding the unit 
tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #11297: KAFKA-10038: Supports default client.id for ConsoleConsumer, ProducerPerformance, ConsumerPerformance

2021-09-07 Thread GitBox


guozhangwang merged pull request #11297:
URL: https://github.com/apache/kafka/pull/11297


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #11304: MINOR: Supplement unit test for KAFKA-13175 : Optimization TopicExis…

2021-09-07 Thread GitBox


guozhangwang merged pull request #11304:
URL: https://github.com/apache/kafka/pull/11304


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2

2021-09-07 Thread Ashish Patil (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411480#comment-17411480
 ] 

Ashish Patil commented on KAFKA-9366:
-

Thanks, [~dongjin]

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.1.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #11302: KAFKA-13243: KIP-773 Differentiate metric latency measured in ms and ns

2021-09-07 Thread GitBox


guozhangwang commented on a change in pull request #11302:
URL: https://github.com/apache/kafka/pull/11302#discussion_r703822498



##
File path: docs/upgrade.html
##
@@ -22,9 +22,12 @@
 Notable changes in 
3.1.0
 
 Apache Kafka supports Java 17.
+The following metrics have been deprecated: 
bufferpool-wait-time-total, io-waittime-total,

Review comment:
   nit: also add a ref link to the KIP?

##
File path: docs/upgrade.html
##
@@ -22,9 +22,12 @@
 Notable changes in 
3.1.0
 
 Apache Kafka supports Java 17.
+The following metrics have been deprecated: 
bufferpool-wait-time-total, io-waittime-total,
+and iotime-total. Please use 
bufferpool-wait-time-ns-total, io-wait-time-ns-total,
+and io-time-ns-total instead.
 
 
-Notable changes in 
3.0.0
+Notable changes in 
3.0.0

Review comment:
   Nice find.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

2021-09-07 Thread GitBox


guozhangwang commented on pull request #10798:
URL: https://github.com/apache/kafka/pull/10798#issuecomment-914624644


   Okay, let me re-trigger the tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values

2021-09-07 Thread GitBox


C0urante commented on pull request #10566:
URL: https://github.com/apache/kafka/pull/10566#issuecomment-914640232


   Hey Gunnar! Sorry for the delay.
   
   > Initializing a Struct with a builder rather than a Schema seems to be an 
odd thing to do, no?
   
   Indeed it is rather odd, but it's still possible at the moment and I've 
definitely seen it in use (intentionally or otherwise...) in at least one 
fairly large and popular code base. If we can avoid breaking projects that use 
this pattern (strange as it is) I think that'd be best.
   
   I haven't spent too much time thinking about a clean solution but it's at 
least possible to satisfy both cases with the following:
   - The change proposed in this PR
   - The change I proposed above where the order of schemas in the comparison 
in `ConnectSchema::validateValue` is reversed
   - The `ConnectSchema::equals` method is altered to accept any `Schema` 
instance and only operate on the public methods exposed by the `Schema` 
interface instead of internal variables for comparison of the schemas' fields, 
key schema, and value schema
   
   This would be fairly involved for such a small bug so hopefully there's 
something less invasive out there, but at least it's possible to favor one use 
case without compromising the other.
   
   As an alternative, could the connector use the `SchemaBuilder` when 
instantiating the `Struct` object, instead of calling `build` and using a 
`ConnectSchema`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante edited a comment on pull request #10566: KAFKA-12694 Avoid schema mismatch DataException when validating default values

2021-09-07 Thread GitBox


C0urante edited a comment on pull request #10566:
URL: https://github.com/apache/kafka/pull/10566#issuecomment-914640232


   Hey Gunnar! Sorry for the delay.
   
   > Initializing a Struct with a builder rather than a Schema seems to be an 
odd thing to do, no?
   
   Indeed it is rather odd, but it's still possible at the moment and I've 
definitely seen it in use (intentionally or otherwise...) in at least one 
fairly large and popular code base. If we can avoid breaking projects that use 
this pattern (strange as it is) I think that'd be best.
   
   I haven't spent too much time thinking about a clean solution but it's at 
least possible to satisfy both cases with the following:
   - The change proposed in this PR
   - The change I proposed above where the order of schemas in the comparison 
in `ConnectSchema::validateValue` is reversed
   - The `ConnectSchema::equals` method is altered to accept any `Schema` 
instance and only operate on the public methods exposed by the `Schema` 
interface instead of internal variables for comparison of the schemas' fields, 
key schema, and value schema
   
   I've verified this locally with the following two test cases:
   
   ```java
   @Test
   public void testDefaultValueStructSchema() {
   SchemaBuilder builder = SchemaBuilder.struct()
   .field("f1", Schema.BOOLEAN_SCHEMA);
   
   Struct defaultValue = new Struct(builder.build()); // the Struct 
receives a schema, not a builder
   defaultValue.put("f1", true);
   
   builder.defaultValue(defaultValue)
   .build();
   }
   
   @Test
   public void testDefaultValueStructSchemaBuilder() {
   SchemaBuilder builder = SchemaBuilder.struct()
   .field("f1", Schema.BOOLEAN_SCHEMA);
   
   Struct defaultValue = new Struct(builder);
   defaultValue.put("f1", true);
   
   builder.defaultValue(defaultValue).build();
   }
   ```
   
   This would be fairly involved for such a small bug so hopefully there's 
something less invasive out there, but at least it's possible to favor one use 
case without compromising the other.
   
   As an alternative, could the connector use the `SchemaBuilder` when 
instantiating the `Struct` object, instead of calling `build` and using a 
`ConnectSchema`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dejan2609 commented on pull request #10967: KAFKA-12770: CheckStyle team needs this feature (in order to include Kafka into their regression suite)

2021-09-07 Thread GitBox


dejan2609 commented on pull request #10967:
URL: https://github.com/apache/kafka/pull/10967#issuecomment-914653605


   Adding a comment (just to bring this PR to the surface).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #11213: KAFKA-13201: Convert KTable suppress to new PAPI

2021-09-07 Thread GitBox


vvcephei commented on pull request #11213:
URL: https://github.com/apache/kafka/pull/11213#issuecomment-914663785


   Java 17 errored out:
   
   ```
   [2021-09-07T21:03:23.475Z] * What went wrong:
   
   [2021-09-07T21:03:23.475Z] Execution failed for task ':core:integrationTest'.
   
   [2021-09-07T21:03:23.475Z] > Process 'Gradle Test Executor 129' finished 
with non-zero exit value 1
   
   [2021-09-07T21:03:23.475Z]   This problem might be caused by incorrect test 
process configuration.
   
   [2021-09-07T21:03:23.475Z]   Please refer to the test execution section in 
the User Manual at 
https://docs.gradle.org/7.2/userguide/java_testing.html#sec:test_execution
   
   ```
   
   But Java 8 and 11 passed, so I'll go ahead and merge (since Java 17 itself 
is new to this project).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #11213: KAFKA-13201: Convert KTable suppress to new PAPI

2021-09-07 Thread GitBox


vvcephei merged pull request #11213:
URL: https://github.com/apache/kafka/pull/11213


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13256) Possible NPE in ConfigDef when rendering (enriched) RST or HTML when documentation is not set/NULL

2021-09-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411582#comment-17411582
 ] 

Matthias J. Sax commented on KAFKA-13256:
-

[~rk3rn3r] Thanks for reporting this issue and for the PR. I am not familiar 
with the details of this tool, so my question is, for what cases can the key 
actually be {{null}}? 

> Possible NPE in ConfigDef when rendering (enriched) RST or HTML when 
> documentation is not set/NULL
> --
>
> Key: KAFKA-13256
> URL: https://issues.apache.org/jira/browse/KAFKA-13256
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.8.0
>Reporter: René Kerner
>Priority: Major
>   Original Estimate: 0.5h
>  Remaining Estimate: 0.5h
>
> While working on Debezium I discovered the following issue:
> When Kafka's ConfigDef renders the HTML or RST documentation representation 
> of the config definition, it requires the `ConfigKey.documentation` member 
> variable to be a non-null java.lang.String instance, otherwise an NPE 
> occurs:
> {code:java}
>  b.append(key.documentation.replaceAll("\n", ""));
> {code}
> {code:java}
>  for (String docLine : key.documentation.split("\n")) {
> {code}
>  
> When `documentation` is not set/NULL, I suggest either setting a valid String 
> like "No documentation available" or skipping that config key.
>  
> I could provide a PR to fix this soon.
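>
> A minimal sketch of the guard (one possible fix, following the suggestion
> above; not necessarily the one a PR would take):
> {code:java}
> // Fall back to a placeholder string when the documentation was never set.
> String doc = key.documentation == null ? "No documentation available" : key.documentation;
> b.append(doc.replaceAll("\n", ""));
> {code}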



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10493) KTable out-of-order updates are not being ignored

2021-09-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411592#comment-17411592
 ] 

Matthias J. Sax commented on KAFKA-10493:
-

Agreed. The original API only had `builder.table()` and the assumption was that 
people would follow the "single writer principle" to avoid out-of-order data in 
the input topic. When we added `toTable()` the case you describe was actually 
discussed, as it was a concern... In the end, we decided to add `toTable()` 
anyway, and leave it to the user's responsibility to be aware of the issue.

Ultimately, from my POV only "versioned tables" will solve the issue correctly, 
and we are already working on some design/prototyping.

> KTable out-of-order updates are not being ignored
> -
>
> Key: KAFKA-10493
> URL: https://issues.apache.org/jira/browse/KAFKA-10493
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Pedro Gontijo
>Assignee: Matthias J. Sax
>Priority: Blocker
> Fix For: 4.0.0
>
> Attachments: KTableOutOfOrderBug.java, out-of-order-table.png
>
>
> On a materialized KTable, out-of-order records for a given key (records whose 
> timestamp is older than the current value in the store) are not being ignored 
> but used to update the local store value and also being forwarded.
> I believe the bug is here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L77]
>  It should return true, not false (see javadoc)
> The bug impacts here: 
> [https://github.com/apache/kafka/blob/2.6.0/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableSource.java#L142-L148]
> I have attached a simple stream app that shows the issue happening.
> Thank you!
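>
> A reduced sketch of the comparison being described (illustrative; see the
> linked serializer for the real code): an update should be dropped when its
> timestamp is strictly older than the stored one, so the "keep" check must
> accept newer-or-equal timestamps.
> {code:java}
> // Returns true when the new record should be kept (stored and forwarded).
> static boolean shouldRetainNewRecord(long storedTimestamp, long newTimestamp) {
>     return newTimestamp >= storedTimestamp; // strictly older updates are ignored
> }
> {code}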



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #11218: MINOR: optimize performAssignment to skip unnecessary check

2021-09-07 Thread GitBox


ableegoldman commented on a change in pull request #11218:
URL: https://github.com/apache/kafka/pull/11218#discussion_r703933946



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
##
@@ -48,6 +53,16 @@
 public class ConsumerConfig extends AbstractConfig {
 private static final ConfigDef CONFIG;
 
+// a list containing all the in-product assignor names; should be updated 
when a new assignor is added.
+// This is to help optimize the ConsumerCoordinator#performAssignment
+public static final List<String> IN_PRODUCT_ASSIGNOR_NAMES =
+Collections.unmodifiableList(Arrays.asList(
+RANGE_ASSIGNOR_NAME,
+ROUNDROBIN_ASSIGNOR_NAME,
+STICKY_ASSIGNOR_NAME,
+COOPERATIVE_STICKY_ASSIGNOR_NAME

Review comment:
   No, I think it's correct to leave the Streams assignor out of this. 
Though I think it technically may not matter at the moment, since the Streams 
assignor will only assign topics from its own subscription and ignores the 
subscribed topics of the other members, we may want the flexibility to do 
something like this in the future.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
##
@@ -48,6 +53,16 @@
 public class ConsumerConfig extends AbstractConfig {
 private static final ConfigDef CONFIG;
 
+// a list containing all the in-product assignor names; should be updated 
when a new assignor is added.
+// This is to help optimize the ConsumerCoordinator#performAssignment
+public static final List<String> IN_PRODUCT_ASSIGNOR_NAMES =

Review comment:
   nit: this name is a little funky, can we come up with something that 
describes what this list actually means? The only things I can think of are a 
bit clunky, but maybe `ASSIGN_FROM_SUBSCRIBED_ASSIGNORS` or 
`SUBSCRIBED_TOPICS_ASSIGNORS` or whatever sounds good to you 🙂 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #11294: KAFKA-13266; `InitialFetchState` should be created after partition is removed from the fetchers

2021-09-07 Thread GitBox


hachikuji commented on a change in pull request #11294:
URL: https://github.com/apache/kafka/pull/11294#discussion_r703942630



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -3242,6 +3260,92 @@ class ReplicaManagerTest {
 }
   }
 
+  @Test
+  def testDeltaFollowerStopFetcherBeforeCreatingInitialFetchOffset(): Unit = {
+val localId = 1
+val otherId = localId + 1
+val topicPartition = new TopicPartition("foo", 0)
+
+val mockReplicaFetcherManager = 
Mockito.mock(classOf[ReplicaFetcherManager])
+val replicaManager = setupReplicaManagerWithMockedPurgatories(
+  timer = new MockTimer(time),
+  brokerId = localId,
+  mockReplicaFetcherManager = Some(mockReplicaFetcherManager)
+)
+
+try {
+  // The first call to removeFetcherForPartitions should be ignored.
+  Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions(
+Set(topicPartition))
+  ).thenReturn(Map.empty[TopicPartition, PartitionFetchState])
+
+  // Make the local replica the follower
+  var followerTopicsDelta = topicsCreateDelta(localId, false)
+  var followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+  replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+  // Check the state of that partition
+  val HostedPartition.Online(followerPartition) = 
replicaManager.getPartition(topicPartition)
+  assertFalse(followerPartition.isLeader)
+  assertEquals(0, followerPartition.getLeaderEpoch)
+  assertEquals(0, followerPartition.localLogOrException.logEndOffset)
+
+  // Verify that addFetcherForPartitions was called with the correct
+  // init offset.
+  Mockito.verify(mockReplicaFetcherManager, Mockito.times(1))
+.addFetcherForPartitions(
+  Map(topicPartition -> InitialFetchState(
+leader = BrokerEndPoint(otherId, "localhost", 9093),
+currentLeaderEpoch = 0,
+initOffset = 0
+  ))
+)
+
+  // The second call to removeFetcherForPartitions simulates the case
+  // where the fetcher writes to the log before being shut down.
+  Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions(
+Set(topicPartition))
+  ).thenAnswer { _ =>
+replicaManager.getPartition(topicPartition) match {
+  case HostedPartition.Online(partition) =>
+partition.appendRecordsToFollowerOrFutureReplica(
+  records = MemoryRecords.withRecords(CompressionType.NONE, 0,
+new SimpleRecord("first message".getBytes)),
+  isFuture = false
+)
+
+  case _ =>
+}
+
+Map.empty[TopicPartition, PartitionFetchState]
+  }
+
+  // Apply changes that bump the leader epoch.
+  followerTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(), 
localId, false)
+  followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+  replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+  assertFalse(followerPartition.isLeader)
+  assertEquals(1, followerPartition.getLeaderEpoch)
+  assertEquals(1, followerPartition.localLogOrException.logEndOffset)
+
+  // Verify that addFetcherForPartitions was called with the correct
+  // init offset.
+  Mockito.verify(mockReplicaFetcherManager, Mockito.times(1))
+.addFetcherForPartitions(
+  Map(topicPartition -> InitialFetchState(
+leader = BrokerEndPoint(otherId, "localhost", 9093),
+currentLeaderEpoch = 1,
+initOffset = 1
+  ))
+)
+} finally {
+  replicaManager.shutdown()
+}
+
+TestUtils.assertNoNonDaemonThreads(this.getClass.getName)

Review comment:
   nit: is it worthwhile moving this to an `@AfterEach`?
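
   For reference, the suggested pattern would look roughly like this (sketched 
in Java/JUnit 5 here; the test itself is Scala, but the annotation works the 
same way):

```
import org.junit.jupiter.api.AfterEach;

class ReplicaManagerTestSketch {

    // Runs after every test method, so individual tests no longer need to
    // repeat the thread-leak assertion at the end of their bodies.
    @AfterEach
    void verifyNoNonDaemonThreadsLeft() {
        // e.g. TestUtils.assertNoNonDaemonThreads(getClass().getName());
    }
}
```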

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -2220,7 +2210,22 @@ class ReplicaManager(val config: KafkaConfig,
 
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.keySet)

Review comment:
   Perhaps it's worth a comment here that stopping the fetchers is required 
so that we can initialize the fetch position correctly. It's a subtle point 
which we might miss again in the future.

##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -2229,15 +2234,14 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = {
-stopPartitions(topicPartitions.map { tp => tp -> true }.toMap).foreach {
-  case (topicPartition, e) =>
-if (e.isInstanceOf[KafkaStorageException]) {
-  stateChangeLogger.error(s"Unable to delete stray replica 
$topicPartition because " +
-"the local replica for the partition is in an offline log 
directory")
-} else {
-  stateChangeLogger.error(s"Unable to delete

[GitHub] [kafka] ableegoldman commented on a change in pull request #11292: KAFKA-13264: fix inMemoryWindowStore backward fetch not in reversed order

2021-09-07 Thread GitBox


ableegoldman commented on a change in pull request #11292:
URL: https://github.com/apache/kafka/pull/11292#discussion_r703945833



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/WindowStoreFetchIntegrationTest.java
##
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.TimeWindowedKStream;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.test.TestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.function.Supplier;
+
+import static java.time.Duration.ofMillis;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+
+@RunWith(Parameterized.class)
+public class WindowStoreFetchIntegrationTest {
+private enum StoreType { InMemory, RocksDB, Timed };
+private static final String STORE_NAME = "store";
+private static final int DATA_SIZE = 5;
+private static final long WINDOW_SIZE = 500L;
+private static final long RETENTION_MS = 1L;
+
+private StoreType storeType;
+private boolean enableLogging;
+private boolean enableCaching;
+private boolean forward;
+
+private LinkedList<KeyValue<Windowed<String>, Long>> expectedRecords;
+private LinkedList<KeyValue<String, String>> records;
+private Properties streamsConfig;
+
+private TimeWindowedKStream<String, String> windowedStream;
+
+public WindowStoreFetchIntegrationTest(final StoreType storeType, final 
boolean enableLogging, final boolean enableCaching, final boolean forward) {

Review comment:
   Seems more like a unit test than an integration test (though a particularly 
thorough one 😜)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13269) Kafka Streams Aggregation data loss between instance restarts and rebalances

2021-09-07 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411606#comment-17411606
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13269:


Hey [~rohitbobade], thanks for the report. Unfortunately I don't think the 
acceptable recovery lag is directly responsible, as that config is only used 
within the assignor to figure out the placement of tasks. Assigning a task as 
"Active" just means that the instance should try to process it; the task still 
has to go through restoration if it's anything less than 100% caught up with 
the end of the changelog. 

Wonder if this might be due to 
https://issues.apache.org/jira/browse/KAFKA-13249?
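
For anyone reproducing this, the workaround mentioned in the report amounts to 
the following (a sketch; the application id and the rest of the configuration 
are assumed):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class RecoveryLagConfigSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "chaos-test-app"); // hypothetical id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Only assign stateful active tasks to instances that are fully
        // caught up, instead of anything within the default lag threshold.
        props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 0L);
        System.out.println(props);
    }
}
{code}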

> Kafka Streams Aggregation data loss between instance restarts and rebalances
> 
>
> Key: KAFKA-13269
> URL: https://issues.apache.org/jira/browse/KAFKA-13269
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.2
>Reporter: Rohit Bobade
>Priority: Major
>
> Using Kafka Streams 2.6.2 and doing count-based aggregation of messages, with 
> processing guarantee EXACTLY_ONCE_BETA and NUM_STANDBY_REPLICAS_CONFIG = 1. 
> Sending some messages and restarting instances in the middle of processing to 
> test fault tolerance. The output count is incorrect because of data loss 
> while restoring state.
> It looks like the streams task becomes active and starts processing even when 
> the state is not fully restored but is within the acceptable recovery lag 
> (default is 10000). This results in data loss
> {quote}A stateful active task is assigned to an instance only when its state 
> is within the configured acceptable.recovery.lag, if one exists
> {quote}
> [https://docs.confluent.io/platform/current/streams/developer-guide/running-app.html?_ga=2.33073014.912824567.1630441414-1598368976.1615841473#state-restoration-during-workload-rebalance]
> [https://docs.confluent.io/platform/current/installation/configuration/streams-configs.html#streamsconfigs_acceptable.recovery.lag]
> Setting acceptable.recovery.lag to 0 and re-running the chaos tests gives the 
> correct result.
> Related KIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams#KIP441:SmoothScalingOutforKafkaStreams-Computingthemost-caught-upinstances]
> Just want to get some thoughts on this use case from the Kafka team or if 
> anyone has encountered similar issue



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #11283: KAFKA-13249: Always update changelog offsets before writing the checkpoint file

2021-09-07 Thread GitBox


ableegoldman commented on a change in pull request #11283:
URL: https://github.com/apache/kafka/pull/11283#discussion_r703953000



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -565,7 +565,7 @@ public void closeCleanAndRecycleState() {
 protected void maybeWriteCheckpoint(final boolean enforceCheckpoint) {
 // commitNeeded indicates we may have processed some records since the 
last commit,
 // and hence we need to refresh checkpointable offsets regardless of 
whether we should checkpoint or not
-if (commitNeeded) {
+if (commitNeeded || enforceCheckpoint) {

Review comment:
   Fair enough 🙂 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #11242: [WIP] MINOR: POC for KIP-591: Add config to set default store impl class

2021-09-07 Thread GitBox


guozhangwang commented on pull request #11242:
URL: https://github.com/apache/kafka/pull/11242#issuecomment-914789772


   Thanks for the POC @showuon ! I made a pass and here are some high-level 
thoughts:
   
   1) On the PAPI layer, users today can only add stores via 
Topology#addStateStore, in which a `StoreBuilder` is required, and from the 
public APIs they are most likely going to create one from the `Stores` factory. 
Note that `KeyValueStoreBuilder` is not a public API and hence cannot really be 
used by PAPI users. In other words, in the PAPI today users have to specify the 
store types explicitly via the `Topology#addStateStore` API anyway, and that's 
also why we only considered this KIP for the DSL originally. If we want to 
benefit the PAPI users, we would probably need to add new functions to the 
`Stores` factory that do not specify whether the store is persistent or 
in-memory, but only its type (kv, window, etc.). That would require a larger 
KIP, so maybe we can exclude that from this one.
   
   2) I personally would prefer as few public API changes/additions as 
possible for KIPs, since it's always easier to add later when we find something 
is not sufficient than to add first and regret later :) Plus, with more public 
APIs things always get clumsier. For that, my original thought would be, from 
the user's perspective: if the `Materialized` construct does not have a 
supplier set, then Streams would use the supplier configured via the config.
   
   I.e. we should work with all three cases below:
   
   ```
   // explicitly set the store
   
stream.groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("some-name"));

   // only set the store type
   stream.groupByKey().count(Materialized.as(StoreType.IN_MEMORY));
   
   // do not set the store type at all, rely on the config
   stream.groupByKey().count(Materialized.as("some-name"));
   ```
   
   I understand that today the config is not available yet during the DSL's 
StreamsBuilder phase, but only during the build() phase. What I was thinking is 
that during the StreamsBuilder's topology-construction phase, e.g. inside the 
`materialize()` function, instead of 
   
   ```
   if (supplier == null) {
   final String name = materialized.storeName();
   supplier = Stores.persistentTimestampedKeyValueStore(name);
   }
   ```
   
   we would leave the supplier unresolved. That means, for the third case 
above, we would need some "placeholder" StoreSupplier marking it as "to be 
decided", and then during `buildAndOptimizeTopology` we would fill in the 
placeholders based on the configs. For the other two cases, we would derive the 
supplier immediately. With that we would not need the `StoreImplementation` 
interface. 
   
   In general I'm leaning towards having us, as the developers of KS, eat more 
complexity bullets so that we can keep the public interface as succinct as 
possible for users. As for passing in the config at the `StreamsBuilder` 
constructor instead of the build phase, I think that would work too, and would 
be much simpler, but it would also change the public API pattern: it would make 
the `build(properties)` function deprecated and bring the other, deprecated 
`build()` back to normal. Maybe cc @ableegoldman as well, since she's working 
on another project which would benefit if we pass in the config at the 
constructor phase.
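
   To make the placeholder idea concrete, here is a rough sketch (all names 
hypothetical, not actual Kafka Streams internals) of deferring the supplier 
decision until the config is known:

```
import java.util.Properties;

public class PlaceholderSupplierSketch {

    interface StoreSupplier { String describe(); }

    // Marker meaning "to be decided from the config during build()".
    static final StoreSupplier TO_BE_DECIDED = () -> "<placeholder>";

    // Hypothetical resolution step, run during buildAndOptimizeTopology().
    static StoreSupplier resolve(final StoreSupplier supplier, final Properties config) {
        if (supplier != TO_BE_DECIDED) {
            return supplier; // cases 1 and 2: already decided at DSL time
        }
        // case 3: fall back to the configured default store implementation
        final String type = config.getProperty("default.store.type", "rocksdb"); // hypothetical key
        return "in-memory".equals(type)
            ? () -> "in-memory-store"
            : () -> "rocksdb-store";
    }

    public static void main(final String[] args) {
        final Properties config = new Properties();
        config.setProperty("default.store.type", "in-memory");
        System.out.println(resolve(TO_BE_DECIDED, config).describe()); // in-memory-store
    }
}
```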
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13268) Add more integration tests for Table Table FK joins with repartitioning

2021-09-07 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13268?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411619#comment-17411619
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13268:


What's the name of that testing framework where you just define the parameter 
space and it runs all the permutations of those parameters? We should start 
using that in Streams, especially for new operators and configurations, since 
we've been bitten a few times by some specific topology that we lacked test 
coverage for running into a weird edge case. 

Would be nice if we could just append to this framework any time we add a new 
operator, and have it run tests with all configurations of that operator 
against all (or many, or a randomized set of) combinations of other operators.
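
In the meantime, plain JUnit 4 Parameterized can brute-force a cross-product; 
a sketch (illustrative dimensions, not the framework being asked about):

{code:java}
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Enumerate the full parameter space so every combination runs as its own test.
@RunWith(Parameterized.class)
public class PermutationSketchTest {

    @Parameterized.Parameters(name = "caching={0}, logging={1}")
    public static Collection<Object[]> parameters() {
        final List<Object[]> combos = new ArrayList<>();
        for (final boolean caching : new boolean[] {true, false}) {
            for (final boolean logging : new boolean[] {true, false}) {
                combos.add(new Object[] {caching, logging});
            }
        }
        return combos;
    }

    private final boolean caching;
    private final boolean logging;

    public PermutationSketchTest(final boolean caching, final boolean logging) {
        this.caching = caching;
        this.logging = logging;
    }

    @Test
    public void runsForEveryCombination() {
        // A real test would build a topology with these knobs and assert on it.
        System.out.println("caching=" + caching + ", logging=" + logging);
    }
}
{code}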

> Add more integration tests for Table Table FK joins with repartitioning
> ---
>
> Key: KAFKA-13268
> URL: https://issues.apache.org/jira/browse/KAFKA-13268
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Priority: Major
>
> We should extend the FK join multipartition integration test with a 
> Repartitioned for:
> 1) just the new partition count
> 2) a custom partitioner
> This is to test if there's a bug where the internal topics don't pick up a 
> partitioner provided that way.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on a change in pull request #11218: MINOR: optimize performAssignment to skip unnecessary check

2021-09-07 Thread GitBox


showuon commented on a change in pull request #11218:
URL: https://github.com/apache/kafka/pull/11218#discussion_r703970666



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
##
@@ -48,6 +53,16 @@
 public class ConsumerConfig extends AbstractConfig {
 private static final ConfigDef CONFIG;
 
+// a list containing all the built-in assignor names; should be updated 
when a new assignor is added.
+// This is to help optimize the ConsumerCoordinator#performAssignment
+public static final List<String> IN_PRODUCT_ASSIGNOR_NAMES =

Review comment:
   `ASSIGN_FROM_SUBSCRIBED_ASSIGNORS` sounds good to me. Updated. Let's 
wait for the jenkins build. Thank you! :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13248) Class name mismatch for LoggerFactory.getLogger method in TimeOrderedWindowStoreBuilder.java

2021-09-07 Thread S. B. Hunter (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13248?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411631#comment-17411631
 ] 

S. B. Hunter commented on KAFKA-13248:
--

No. Go right ahead. Thank you for handling my findings swiftly.
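
The fix presumably boils down to passing the enclosing class itself; a sketch 
of the corrected pattern (class name illustrative):

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggerNameSketch {
    // Pass the enclosing class, so log lines are attributed to
    // TimeOrderedWindowStoreBuilder rather than WindowStoreBuilder.
    private final Logger log = LoggerFactory.getLogger(LoggerNameSketch.class);
}
{code}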

> Class name mismatch for LoggerFactory.getLogger method in 
> TimeOrderedWindowStoreBuilder.java
> 
>
> Key: KAFKA-13248
> URL: https://issues.apache.org/jira/browse/KAFKA-13248
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: S. B. Hunter
>Priority: Minor
> Attachments: KAFKA-13248.1.patch
>
>
> I have noticed a mismatch with the class name passed to the 
> LoggerFactory.getLogger method. This would make it hard to track the source 
> of log messages.
> public class TimeOrderedWindowStoreBuilder<K, V> 
> extends AbstractStoreBuilder<K, V, WindowStore<K, V>> {
>  private final Logger log = 
> LoggerFactory.getLogger(WindowStoreBuilder.class);  // should be TimeOrderedWindowStoreBuilder.class
> private final WindowBytesStoreSupplier storeSupplier;
> public TimeOrderedWindowStoreBuilder(final 
> WindowBytesStoreSupplier storeSupplier,
>  final Serde<K> keySerde,



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #11278: KAFKA-12648: Enforce size limits for each task's cache

2021-09-07 Thread GitBox


ableegoldman commented on a change in pull request #11278:
URL: https://github.com/apache/kafka/pull/11278#discussion_r703973474



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##
@@ -84,13 +85,34 @@ public synchronized void resize(final long 
newCacheSizeBytes) {
 while (sizeBytes() > maxCacheSizeBytes) {
 final NamedCache cache = circularIterator.next();
 cache.evict();
-numEvicts++;
+numEvicts.incrementAndGet();
 }
 } else {
 log.debug("Cache size was expanded to {}", newCacheSizeBytes);
 }
 }
 
+public synchronized void resize(final Map<String, Long> newCacheSizes) {
+maxCacheSizeBytes = newCacheSizes.values().stream().reduce(0L, 
Long::sum);
+log.debug("Cache size was changed to {}", newCacheSizes);
+for (final Map.Entry<String, Long> taskMaxSize: 
newCacheSizes.entrySet()) {
+for (final Map.Entry<String, NamedCache> cache: caches.entrySet()) 
{
+if (cache.getKey().contains(taskMaxSize.getKey())) {
+cache.getValue().setMaxBytes(taskMaxSize.getValue());
+}
+}
+}
+if (caches.values().isEmpty()) {
+return;
+}
+final CircularIterator<NamedCache> circularIterator = new 
CircularIterator<>(caches.values());
+while (sizeBytes() > maxCacheSizeBytes) {
+final NamedCache cache = circularIterator.next();
+cache.evict();
+numEvicts.incrementAndGet();
+}

Review comment:
   nit: we do this same thing in the other `#resize` for thread-count 
changes; can you factor it out into a helper method? Then I think we can narrow 
the scope and make only that helper synchronized (we should double-check that, 
though).
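
   Something like the following shape is what's being suggested (a sketch with 
simplified types, not the actual ThreadCache):

```
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class EvictionHelperSketch {

    static class NamedCache {
        long sizeBytes = 100;
        void evict() { sizeBytes -= 10; } // stand-in for real cache eviction
    }

    private final List<NamedCache> caches = new ArrayList<>();
    private long maxCacheSizeBytes = 0;
    private long numEvicts = 0;

    private long sizeBytes() {
        return caches.stream().mapToLong(c -> c.sizeBytes).sum();
    }

    // The loop both resize() overloads could share; making only this helper
    // synchronized would narrow the lock scope, as suggested above.
    private synchronized void evictWhileOverCapacity() {
        if (caches.isEmpty()) {
            return;
        }
        Iterator<NamedCache> it = caches.iterator();
        while (sizeBytes() > maxCacheSizeBytes) {
            if (!it.hasNext()) {
                it = caches.iterator(); // wrap around, like CircularIterator
            }
            it.next().evict();
            numEvicts++;
        }
    }
}
```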

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##
@@ -84,13 +85,34 @@ public synchronized void resize(final long 
newCacheSizeBytes) {
 while (sizeBytes() > maxCacheSizeBytes) {
 final NamedCache cache = circularIterator.next();
 cache.evict();
-numEvicts++;
+numEvicts.incrementAndGet();
 }
 } else {
 log.debug("Cache size was expanded to {}", newCacheSizeBytes);
 }
 }
 
+public synchronized void resize(final Map<String, Long> newCacheSizes) {
+maxCacheSizeBytes = newCacheSizes.values().stream().reduce(0L, 
Long::sum);
+log.debug("Cache size was changed to {}", newCacheSizes);
+for (final Map.Entry<String, Long> taskMaxSize: 
newCacheSizes.entrySet()) {
+for (final Map.Entry<String, NamedCache> cache: caches.entrySet()) 
{
+if (cache.getKey().contains(taskMaxSize.getKey())) {
+cache.getValue().setMaxBytes(taskMaxSize.getValue());
+}
+}
+}
+if (caches.values().isEmpty()) {

Review comment:
   Any reason this checks emptiness of `caches.values()` instead of 
`caches.keys()`?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java
##
@@ -43,7 +44,7 @@
 // internal stats
 private long numPuts = 0;
 private long numGets = 0;
-private long numEvicts = 0;
+private AtomicLong numEvicts = new AtomicLong(0);

Review comment:
   Why make this atomic? We're still only ever evicting/accessing this from 
the actual StreamThread, right?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -502,7 +504,8 @@ public StreamThread(final Time time,
 this.assignmentErrorCode = assignmentErrorCode;
 this.shutdownErrorHook = shutdownErrorHook;
 this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
-this.cacheResizer = cacheResizer;
+this.threadCache = threadCache;
+cacheSizes = new ConcurrentHashMap<>();

Review comment:
   Does this need to be a concurrent map? Seems to only be accessed by the 
StreamThread itself

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##
@@ -247,4 +247,6 @@ default boolean commitRequested() {
  * @return This returns the time the task started idling. If it is not 
idling it returns empty.
  */
 Optional timeCurrentIdlingStarted();
+
+long maxBuffer();

Review comment:
   Should probably specify what kind of buffer in the name (esp. with 
KIP-770 adding another relevant buffer type)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11218: MINOR: optimize performAssignment to skip unnecessary check

2021-09-07 Thread GitBox


ableegoldman commented on a change in pull request #11218:
URL: https://github.com/apache/kafka/pull/11218#discussion_r703978721



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
##
@@ -48,6 +53,16 @@
 public class ConsumerConfig extends AbstractConfig {
 private static final ConfigDef CONFIG;
 
+// a list containing all the built-in assignor names; should be updated 
when a new assignor is added.
+// This is to help optimize the ConsumerCoordinator#performAssignment
+public static final List<String> IN_PRODUCT_ASSIGNOR_NAMES =

Review comment:
   Alrighty, just give me a ping when the build is done!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12963) Improve error message for Class cast exception

2021-09-07 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411644#comment-17411644
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12963:


{quote}I can imagine that a class cast exception of types A and B for processor 
KSTREAM-TRANSFORM-16 would be a bit of a head-scratcher
{quote}
Very fair; that's why we recommend naming all operators ;) But you should be 
able to figure out what it is from the topology description if necessary, along 
with the upstream topics. I know it's not the best experience, but I think 
trying to put the topic into this message would end up requiring a lot of 
checks for specific topological patterns, in a way that would be hard not to 
accidentally mess up when changing things in the future. Still, if you'd like 
to have a crack at it yourself then definitely go for it! Otherwise maybe split 
that into a separate feature request ticket and see if someone else does.
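
For reference, naming DSL operators so that errors point at something 
recognizable looks like this (topic and operator names illustrative):

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;

public class NamedOperatorsExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> input = builder.stream(
            "orders", Consumed.with(Serdes.String(), Serdes.String()));
        // With an explicit name, a failure surfaces as "orders-upper-mapper"
        // instead of an auto-generated KSTREAM-MAPVALUES-00000000xx id.
        input.mapValues(v -> v.toUpperCase(), Named.as("orders-upper-mapper"))
             .to("orders-upper");
        System.out.println(builder.build().describe());
    }
}
{code}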

> Improve error message for Class cast exception
> --
>
> Key: KAFKA-12963
> URL: https://issues.apache.org/jira/browse/KAFKA-12963
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Rasmus Helbig Hansen
>Assignee: Andrew Lapidas
>Priority: Minor
> Fix For: 3.1.0
>
>
> After a topology change and starting the application again, we got this type 
> of error message:
>  [g9z-StreamThread-1] ERROR 
> org.apache.kafka.streams.processor.internals.TaskManager  - stream-thread 
> [g9z-StreamThread-1] Failed to process stream task 1_12 due to the following 
> error:
>  org.apache.kafka.streams.errors.StreamsException: ClassCastException 
> invoking Processor. Do the Processor's input types match the deserialized 
> types? Check the Serde setup and change the default Serdes in StreamConfig or 
> provide correct Serdes via method parameters. Make sure the Processor can 
> accept the deserialized input of type key: org.acme.SomeKey, and value: 
> org.acme.SomeValue.
>  Note that although incorrect Serdes are a common cause of error, the cast 
> exception might have another cause (in user code, for example). For example, 
> if a processor wires in a store, but casts the generics incorrectly, a class 
> cast exception could be raised during processing, but the cause would not be 
> wrong Serdes.
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>  at 
> org.apache.kafka.streams.kstream.internals.KStreamJoinWindow$KStreamJoinWindowProcessor.process(KStreamJoinWindow.java:55)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:273)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>  at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:703)
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:703)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1105)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:647)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
>  Caused by: java.lang.ClassCastException: class org.acme.Som

[GitHub] [kafka] ableegoldman merged pull request #11214: KAFKA-12994 Migrate JoinWindowsTest and SessionWindowsTest to new API

2021-09-07 Thread GitBox


ableegoldman merged pull request #11214:
URL: https://github.com/apache/kafka/pull/11214


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13267) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

2021-09-07 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13267:

Component/s: streams

> InvalidPidMappingException: The producer attempted to use a producer id which 
> is not currently assigned to its transactional id
> ---
>
> Key: KAFKA-13267
> URL: https://issues.apache.org/jira/browse/KAFKA-13267
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Gilles Philippart
>Priority: Major
>
> We're using Confluent Cloud and Kafka Streams 2.8.0 and we've seen these 
> errors pop up in apps using EOS:
> {code:java}
> InvalidPidMappingException: The producer attempted to use a producer id which 
> is not currently assigned to its transactional id
> {code}
> Full stack trace:
> {code:java}
> Error encountered sending record to topic ola-update-1 for task 4_7 due to: 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id. Exception handler choose to FAIL the processing, no more 
> records would be sent.
> RecordCollectorImpl.java  226  recordSendError(...)
> RecordCollectorImpl.java:226:in `recordSendError'
> RecordCollectorImpl.java  196  lambda$send$0(...)
> RecordCollectorImpl.java:196:in `lambda$send$0'
> KafkaProducer.java  1365  onCompletion(...)
> KafkaProducer.java:1365:in `onCompletion'
> ProducerBatch.java  231  completeFutureAndFireCallbacks(...)
> ProducerBatch.java:231:in `completeFutureAndFireCallbacks'
> ProducerBatch.java  159  abort(...)
> ProducerBatch.java:159:in `abort'
> RecordAccumulator.java  763  abortBatches(...)
> RecordAccumulator.java:763:in `abortBatches'
> More (5 lines)
> Nested Exceptionsorg.apache.kafka.streams.errors.StreamsException: Error 
> encountered sending record to topic ola-update-1 for task 4_7 due to: 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id. Exception handler choose to FAIL the processing, no more 
> records would be sent.
> RecordCollectorImpl.java  226  recordSendError(...)
> RecordCollectorImpl.java:226:in `recordSendError'
>  org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> {code}
> I've seen that KAFKA-6821 described the same problem on an earlier version of 
> Kafka and was closed due to the subsequent work on EOS.
> Another ticket raised recently shows that the exception is still occurring 
> (but the ticket wasn't raised for that specific error): KAFKA-12774



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13267) InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id

2021-09-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411666#comment-17411666
 ] 

Matthias J. Sax commented on KAFKA-13267:
-

[~guozhang] [~hachikuji] – Given the error message, this might actually be a 
broker or producer bug, rather than a Streams issue? 

> InvalidPidMappingException: The producer attempted to use a producer id which 
> is not currently assigned to its transactional id
> ---
>
> Key: KAFKA-13267
> URL: https://issues.apache.org/jira/browse/KAFKA-13267
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Gilles Philippart
>Priority: Major
>
> We're using Confluent Cloud and Kafka Streams 2.8.0 and we've seen these 
> errors pop up in apps using EOS:
> {code:java}
> InvalidPidMappingException: The producer attempted to use a producer id which 
> is not currently assigned to its transactional id
> {code}
> Full stack trace:
> {code:java}
> Error encountered sending record to topic ola-update-1 for task 4_7 due to: 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id. Exception handler choose to FAIL the processing, no more 
> records would be sent.
> RecordCollectorImpl.java  226  recordSendError(...)
> RecordCollectorImpl.java:226:in `recordSendError'
> RecordCollectorImpl.java  196  lambda$send$0(...)
> RecordCollectorImpl.java:196:in `lambda$send$0'
> KafkaProducer.java  1365  onCompletion(...)
> KafkaProducer.java:1365:in `onCompletion'
> ProducerBatch.java  231  completeFutureAndFireCallbacks(...)
> ProducerBatch.java:231:in `completeFutureAndFireCallbacks'
> ProducerBatch.java  159  abort(...)
> ProducerBatch.java:159:in `abort'
> RecordAccumulator.java  763  abortBatches(...)
> RecordAccumulator.java:763:in `abortBatches'
> More (5 lines)
> Nested Exceptionsorg.apache.kafka.streams.errors.StreamsException: Error 
> encountered sending record to topic ola-update-1 for task 4_7 due to: 
> org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id. Exception handler choose to FAIL the processing, no more 
> records would be sent.
> RecordCollectorImpl.java  226  recordSendError(...)
> RecordCollectorImpl.java:226:in `recordSendError'
>  org.apache.kafka.common.errors.InvalidPidMappingException: The producer 
> attempted to use a producer id which is not currently assigned to its 
> transactional id.
> {code}
> I've seen that KAFKA-6821 described the same problem on an earlier version of 
> Kafka and was closed due to the subsequent work on EOS.
> Another ticket raised recently shows that the exception is still occurring 
> (but the ticket wasn't raised for that specific error): KAFKA-12774



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10038) ConsumerPerformance.scala supports the setting of client.id

2021-09-07 Thread Yanwen Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411669#comment-17411669
 ] 

Yanwen Lin commented on KAFKA-10038:


Hey [~guozhang], per this wiki: 
[https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes#ContributingCodeChanges-PullRequest],
 when a PR is merged, the related Jira ticket will also be closed automatically.

The PR in this ticket is linked and already merged (PR: 
[https://github.com/apache/kafka/pull/11297]), but the ticket's status is still 
PATCH AVAILABLE. Is there anything I missed? Or should I close this ticket 
manually? Thanks!

> ConsumerPerformance.scala supports the setting of client.id
> ---
>
> Key: KAFKA-10038
> URL: https://issues.apache.org/jira/browse/KAFKA-10038
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, core
>Affects Versions: 2.1.1
> Environment: Trunk branch
>Reporter: tigertan
>Assignee: Yanwen Lin
>Priority: Minor
>  Labels: newbie, performance
>
> ConsumerPerformance.scala supports the setting of "client.id", which is a 
> reasonable requirement, and the way "console consumer" and "console producer" 
> handle "client.id" can be unified. "client.id" defaults to 
> "perf-consumer-client".
> We often use client.id in quotas; if the script 
> kafka-producer-perf-test.sh supports the setting of "client.id", we can do 
> quota testing through scripts without writing our own consumer programs. 
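
For illustration, matching a client-id quota from a consumer application 
amounts to setting the property below (group and client ids illustrative; per 
the description above, the perf tool defaults client.id to 
"perf-consumer-client"):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ClientIdQuotaSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "quota-test-group"); // illustrative
        // client.id is what client quotas are matched against.
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "quota-test-client");
        System.out.println(props);
    }
}
{code}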



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10038) ConsumerPerformance.scala supports the setting of client.id

2021-09-07 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411674#comment-17411674
 ] 

Guozhang Wang commented on KAFKA-10038:
---

Yeah, that should happen... let's wait a bit for the GitHub bot. Or you can 
close it yourself.

> ConsumerPerformance.scala supports the setting of client.id
> ---
>
> Key: KAFKA-10038
> URL: https://issues.apache.org/jira/browse/KAFKA-10038
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, core
>Affects Versions: 2.1.1
> Environment: Trunk branch
>Reporter: tigertan
>Assignee: Yanwen Lin
>Priority: Minor
>  Labels: newbie, performance
>
> ConsumerPerformance.scala supports the setting of "client.id", which is a 
> reasonable requirement, and the way "console consumer" and "console producer" 
> handle "client.id" can be unified. "client.id" defaults to 
> "perf-consumer-client".
> We often use client.id in quotas, if the script of 
> kafka-producer-perf-test.sh supports the setting of "client.id" , we can do 
> quota testing through scripts without writing our own consumer programs. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10038) ConsumerPerformance.scala supports the setting of client.id

2021-09-07 Thread Yanwen Lin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17411675#comment-17411675
 ] 

Yanwen Lin commented on KAFKA-10038:


Sure, thanks for your super-fast reply. Let's wait for a while, at least.

> ConsumerPerformance.scala supports the setting of client.id
> ---
>
> Key: KAFKA-10038
> URL: https://issues.apache.org/jira/browse/KAFKA-10038
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, core
>Affects Versions: 2.1.1
> Environment: Trunk branch
>Reporter: tigertan
>Assignee: Yanwen Lin
>Priority: Minor
>  Labels: newbie, performance
>
> ConsumerPerformance.scala supports the setting of "client.id", which is a 
> reasonable requirement, and the way "console consumer" and "console producer" 
> handle "client.id" can be unified. "client.id" defaults to 
> "perf-consumer-client".
> We often use client.id in quotas, if the script of 
> kafka-producer-perf-test.sh supports the setting of "client.id" , we can do 
> quota testing through scripts without writing our own consumer programs. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

